技术学习 MQTT 消息通讯——MQTT的入门和使用 lycheeKing 2023-07-14 2024-10-27 Emqx简介 EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 ==Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器==。
EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:
1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。 2. 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。 3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。 4. 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。
官方文档:https://docs.emqx.cn/broker/v4.3/getting-started/install.html
MQTT是什么? MQTT全称消息队列遥测传输 (Message Queuing Telemetry Transport)。其主要提供了订阅/发布两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议。
MQTT实现方式 实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分: (1)Topic ,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload); (2)payload ,可以理解为消息的内容,是指订阅者具体要使用的内容。
Emqx安装 官方网站:https://www.emqx.cn/
安装步骤
下载地址:https://www.emqx.cn/downloads#broker
解压程序包
启动 EMQ X Broker
进入到emqx解压后目录,进入bin目录,执行其下的命令脚本
1 2 3 4 5 6 emqx start emqx status emqx stop
卸载 EMQ X Broker
直接删除 EMQ X 目录即可
Emqx Dashboard插件 Emqx自带dashboard 插件:通过Dashboard,你可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至
断开其连接,也可以动态加载和卸载指定插件。
除此之外,EMQ X Dashboard 还提供了规则引擎的可视化操作界面,同时集成了一个简易的 MQTT 客户端工具供用户测试使用。
当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,你可以访问 http://localhost:18083 来查看你的 Dashboard,默认用户名是admin,密码是 public 。
MQTT 设计了的3 QoS 等级 QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。
QoS 1:消息传递至少 1 次。
QoS 2:消息仅传送一次。
需要开放的端口
Emqx使用 Java使用mqtt
使用步骤如下
1. 导入依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-jpa</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.eclipse.paho</groupId > <artifactId > org.eclipse.paho.client.mqttv3</artifactId > <version > 1.2.2</version > </dependency >
2. 订阅者(App.java) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package cn.kt.mtqqdemo.mqtt;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.UUID;public class App { public static void main (String[] args) { try { String HOST = "tcp://127.0.0.1:1883" ; String TOPIC1 = "ceshi" ; String userName = "admin" ; String pwd = "123456" ; String clientid = UUID.randomUUID().toString().replace("-" , "" ); MqttClient client = new MqttClient (HOST, clientid, new MemoryPersistence ()); MqttConnectOptions options = new MqttConnectOptions (); options.setCleanSession(false ); options.setUserName(userName); options.setPassword(pwd.toCharArray()); options.setConnectionTimeout(10 ); options.setKeepAliveInterval(20 ); options.setWill("nick" , "我挂了,你加油" .getBytes(), 1 , true ); client.setCallback(new PushCallback ()); client.connect(options); int [] Qos = {1 }; String[] topics = {TOPIC1}; client.subscribe(topics, Qos); } catch (MqttException e) { e.printStackTrace(); } } }
3. 发布者(sendOut.java) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 package cn.kt.mtqqdemo.mqtt;import org.eclipse.paho.client.mqttv3.*;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.io.UnsupportedEncodingException;import java.util.Scanner;public class SendOut { String HOST = "tcp://127.0.0.1:1883" ; public static final String TOPIC = "ceshi" ; private static final String clientid = "server1" ; private MqttMessage message; public static final String TOPIC1 = "topic1" ; public static final String userName = "admin" ; public static final String pwd = "123456" ; public MqttClient client; private MqttTopic topic; public SendOut () { try { client = new MqttClient (HOST, clientid, new MemoryPersistence ()); connect(); } catch (MqttException e) { e.printStackTrace(); } } public void publish (MqttTopic topic, MqttMessage message) throws MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println("message is published completely!" + token.isComplete()); } private void connect () throws MqttException { MqttConnectOptions options = new MqttConnectOptions (); options.setCleanSession(false ); options.setUserName(userName); options.setPassword(pwd.toCharArray()); options.setConnectionTimeout(10 ); options.setKeepAliveInterval(20 ); client.setCallback(new PushCallback ()); client.connect(options); } public static void main (String[] args) throws MqttException, UnsupportedEncodingException { SendOut service = new SendOut (); Scanner sc = new Scanner (System.in); service.topic = service.client.getTopic(TOPIC); service.message = new MqttMessage (); service.message.setQos(1 ); service.message.setPayload("干嘛这么想不开,要在脸上贴个输字" .getBytes("UTF-8" )); service.publish(service.topic, service.message); } }
4. 订阅消息回调(OnMessageCallback.java) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package cn.kt.mtqqdemo.mqtt;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback { public void connectionLost (Throwable cause) { System.out.println("连接断开,可以做重连" ); } public void messageArrived (String topic, MqttMessage message) throws Exception { System.out.println("接收消息主题:" + topic); System.out.println("接收消息Qos:" + message.getQos()); System.out.println("接收消息内容:" + new String (message.getPayload())); } public void deliveryComplete (IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
5. 发布消息回调(PushCallback.java) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package cn.kt.mtqqdemo.mqtt; /** * Created by lcw. * Date: 2021/4/12 14:01 * 描述: */ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class PushCallback implements MqttCallback { //连接丢失:一般用与重连 public void connectionLost(Throwable throwable) { System.out.println("丢失连接"); } //消息到达:指收到消息 public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { //(发布)publish后会执行到这里,发送状态 System.out.println("deliveryComplete---------" + token.isComplete()); } }
测试效果
发布者
订阅者
JS使用mqtt 引入mqttws31.js 可以下载: 链接:https://pan.baidu.com/s/1c9CfyhT4CSY2FEOa1OgxPw 提取码:siwg
也可以用对应的cdn 地址
1 2 3 4 <!-- For the plain library--> <script src ="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type ="text/javascript" > </script > <!-- For the minified library--> <script src ="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type ="text/javascript" > </script >
代码如下 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 <!DOCTYPE html > <html > <head > <meta charset ="UTF-8" > <meta name ="viewport" content ="width=device-width, initial-scale=1.0" > <meta http - equiv ="X-UA-Compatible" content ="ie=edge" > <title > Document </title > <link href ="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel ="nofollow noopener" rel ="stylesheet" > <script src ="https://cdn.bootcdn.net/ajax/libs/jquery/2.2.1/jquery.min.js" > </script > <script src ="./js/mqttws31.js" type ="text/javascript" > </script > <style > #contentList li { word-break : break-all; word-wrap : break-word; } </style > </head > <body > <div style ="width: 900px;margin: 50px auto;" > <div class ="form-group" > <label > 评论人:</label > <input type ="text" class ="form-control" id ="user" > </div > <div class ="form-group" > <label > 评论内容:</label > <textarea class ="form-control" id ="content" style ="word-break:break-all;word-wrap:break-word;" > </textarea > </div > <div class ="form-group" > <input type ="button" value ="发表评论" class ="btn btn-primary" onclick ="send()" > </div > <div > <ul id ="contentList" class ="list-group" > </ul > </div > </div > <script > var hostname = '192.168.3.181' , port = 8083 , clientId = 'client-' + generateUUID (), timeout = 1000 , keepAlive = 2000 , cleanSession = false , ssl = false , userName = 'Nick' , password = '12356' , topic = 'ceshi' ; client = new Paho .MQTT .Client (hostname, port, clientId); var options = { invocationContext : { host : hostname, port : port, path : client.path , clientId : clientId }, timeout : timeout, keepAliveInterval : keepAlive, cleanSession : cleanSession, useSSL : ssl, userName : userName, password : password, onSuccess : onConnect, onFailure : function (e ) { console .log (e); } }; client.connect (options); function onConnect ( ) { console .log ("onConnected" ); client.subscribe (topic); } client.onConnectionLost = onConnectionLost; client.onMessageArrived = onMessageArrived; function onConnectionLost (responseObject ) { console .log (responseObject); if (responseObject.errorCode !== 0 ) { console .log ("onConnectionLost:" + responseObject.errorMessage ); console .log ("连接已断开" ); } } function onMessageArrived (message ) { var msg = message.payloadString ; var obj = JSON .parse (msg); console .log ("收到消息:" + obj); $('#contentList' ).append ($(`<li class="list-group-item" > <span class="badge">评论人:` + obj.name + `,时间:` + obj.time + `</span>` + obj.content + `</li>` )); } function send ( ) { var name = document .getElementById ("user" ).value ; var content = document .getElementById ("content" ).value ; console .log ('name :>> ' , name); console .log ('content :>> ' , content); var time = new Date ().Format ("yyyy-MM-dd hh:mm:ss" ); var getConment = { name : name, content : content, time : time, } if (name) { var str = getConment; message = new Paho .MQTT .Message (JSON .stringify (str)); message.destinationName = topic; client.send (message); document .getElementById ("content" ).value = "" ; document .getElementById ("user" ).value = "" ; } } function generateUUID ( ) { var d = new Date ().getTime (); if (window .performance && typeof window .performance .now === "function" ) { d += performance.now (); } var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx' .replace (/[xy]/g , function (c ) { var r = (d + Math .random () * 16 ) % 16 | 0 ; d = Math .floor (d / 16 ); return (c == 'x' ? r : (r & 0x3 | 0x8 )).toString (16 ); }); return uuid; } Date .prototype .Format = function (fmt ) { var o = { "M+" : this .getMonth () + 1 , "d+" : this .getDate (), "h+" : this .getHours (), "m+" : this .getMinutes (), "s+" : this .getSeconds (), "q+" : Math .floor ((this .getMonth () + 3 ) / 3 ), "S" : this .getMilliseconds () }; if (/(y+)/ .test (fmt)) fmt = fmt.replace (RegExp .$1 , (this .getFullYear () + "" ).substr (4 - RegExp .$1 .length )); for (var k in o) if (new RegExp ("(" + k + ")" ).test (fmt)) fmt = fmt.replace (RegExp .$1 , (RegExp .$1 .length == 1 ) ? (o[k]) : (("00" + o[k]).substr (("" + o[k]).length ))); return fmt; } </script > </body > </html >
测试效果 页面效果
Java 连接mqtt订阅者收到消息