消息通讯——MQTT的入门和使用

Emqx简介

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 ==Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器==。

EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:

1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。
2. 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
4. 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。

官方文档:https://docs.emqx.cn/broker/v4.3/getting-started/install.html

MQTT是什么?

MQTT全称消息队列遥测传输 (Message Queuing Telemetry Transport)。其主要提供了订阅/发布两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议。

MQTT实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

Emqx安装

官方网站:https://www.emqx.cn/

安装步骤

  1. 下载地址:https://www.emqx.cn/downloads#broker

  2. 解压程序包

  3. 启动 EMQ X Broker

    进入到emqx解压后目录,进入bin目录,执行其下的命令脚本

    1
    2
    3
    4
    5
    6
    #启动emqx
    emqx start
    #查看emqx状态
    emqx status
    #停止 EMQ X Broker
    emqx stop
  4. 卸载 EMQ X Broker

    直接删除 EMQ X 目录即可

Emqx Dashboard插件

Emqx自带dashboard插件:通过Dashboard,你可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至

断开其连接,也可以动态加载和卸载指定插件。

除此之外,EMQ X Dashboard 还提供了规则引擎的可视化操作界面,同时集成了一个简易的 MQTT 客户端工具供用户测试使用。

当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,你可以访问 http://localhost:18083 来查看你的 Dashboard,默认用户名是admin,密码是 public

MQTT 设计了的3 QoS 等级

QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。

QoS 1:消息传递至少 1 次。

QoS 2:消息仅传送一次。

需要开放的端口

Emqx使用

Java使用mqtt

使用步骤如下

1. 导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>

2. 订阅者(App.java)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package cn.kt.mtqqdemo.mqtt;
/**
* Created by lcw.
* Date: 2021/4/12 13:57
* 描述:
*/
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.UUID;
public class App {
public static void main(String[] args) {
try {
//apollo地址
String HOST = "tcp://127.0.0.1:1883";
//要订阅的主题
String TOPIC1 = "ceshi";
//指你Apollo中的用户名密码
String userName = "admin";
String pwd = "123456";
String clientid = UUID.randomUUID().toString().replace("-", "");
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的连接对象
MqttConnectOptions options = new MqttConnectOptions();
//设置连接参数
//清除session回话
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
//超时设置
options.setConnectionTimeout(10);
//心跳保持时间
options.setKeepAliveInterval(20);
//遗嘱:当该客户端端口连接时,会向whb主题发布一条信息
options.setWill("nick", "我挂了,你加油".getBytes(), 1, true);
//监听对象:自己创建
client.setCallback(new PushCallback());
//打开连接
client.connect(options);
//设置消息级别
int[] Qos = {1};
//订阅主题
String[] topics = {TOPIC1};
client.subscribe(topics, Qos);

} catch (MqttException e) {
e.printStackTrace();
}
}
}

3. 发布者(sendOut.java)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package cn.kt.mtqqdemo.mqtt;
/**
* Created by lcw.
* Date: 2021/4/12 14:09
* 描述:
*/
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.UnsupportedEncodingException;
import java.util.Scanner;
public class SendOut {
//tcp://MQTT安装的服务器地址:MQTT定义的端口号
String HOST = "tcp://127.0.0.1:1883";
//定义一个主题
public static final String TOPIC = "ceshi";
// public static final String TOPIC = "abc";
//定义MQTT的ID,可以在MQTT服务配置中指定
private static final String clientid = "server1";
private MqttMessage message;
public static final String TOPIC1 = "topic1";
public static final String userName = "admin";
public static final String pwd = "123456";
public MqttClient client;
private MqttTopic topic;
public SendOut() {
try {
client = new MqttClient(HOST, clientid, new MemoryPersistence());
connect();
} catch (MqttException e) {
e.printStackTrace();
}
}

//发布消息
public void publish(MqttTopic topic, MqttMessage message) throws MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
//打印发送状态
System.out.println("message is published completely!" + token.isComplete());
}

//建立连接:参数与订阅端相似
private void connect() throws MqttException {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
client.setCallback(new PushCallback());
client.connect(options);
}

public static void main(String[] args) throws MqttException, UnsupportedEncodingException {
SendOut service = new SendOut();
Scanner sc = new Scanner(System.in);
service.topic = service.client.getTopic(TOPIC);
service.message = new MqttMessage();
//确保被收到一次
service.message.setQos(1);
service.message.setPayload("干嘛这么想不开,要在脸上贴个输字".getBytes("UTF-8"));
service.publish(service.topic, service.message);
}
}

4. 订阅消息回调(OnMessageCallback.java)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package cn.kt.mtqqdemo.mqtt;
/**
* Created by lcw.
* Date: 2021/4/12 13:58
* 描述:
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
}

public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}

5. 发布消息回调(PushCallback.java)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package cn.kt.mtqqdemo.mqtt;
/**
* Created by lcw.
* Date: 2021/4/12 14:01
* 描述:
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallback implements MqttCallback {

//连接丢失:一般用与重连
public void connectionLost(Throwable throwable) {
System.out.println("丢失连接");
}
//消息到达:指收到消息
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}

public void deliveryComplete(IMqttDeliveryToken token) {
//(发布)publish后会执行到这里,发送状态
System.out.println("deliveryComplete---------"
+ token.isComplete());
}
}

测试效果

  1. 发布者

  2. 订阅者

JS使用mqtt

引入mqttws31.js

可以下载:
链接:https://pan.baidu.com/s/1c9CfyhT4CSY2FEOa1OgxPw
提取码:siwg

也可以用对应的cdn 地址

1
2
3
4
<!-- For the plain library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<!-- For the minified library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
<!DOCTYPE html >
<html>

<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http - equiv="X-UA-Compatible" content="ie=edge">
<title> Document </title>
<link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="nofollow noopener" rel="stylesheet">
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/2.2.1/jquery.min.js"></script>
<script src="./js/mqttws31.js" type="text/javascript"></script>
<style>
#contentList li {
word-break: break-all;
word-wrap: break-word;
}
</style>
</head>

<body>
<div style="width: 900px;margin: 50px auto;">
<div class="form-group">
<label>评论人:</label>
<input type="text" class="form-control" id="user">
</div>

<div class="form-group">
<label>评论内容:</label>
<textarea class="form-control" id="content" style="word-break:break-all;word-wrap:break-word;"></textarea>
</div>

<div class="form-group">
<input type="button" value="发表评论" class="btn btn-primary" onclick="send()">
</div>

<div>
<ul id="contentList" class="list-group">
<!-- <li class="list-group-item">
<span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
</li> -->
</ul>
</div>
</div>

<script>
// http://192.168.3.181/
var hostname = '192.168.3.181',
port = 8083,
clientId = 'client-' + generateUUID(),
timeout = 1000,
keepAlive = 2000,
cleanSession = false,
ssl = false,
userName = 'Nick',
password = '12356',
topic = 'ceshi';
client = new Paho.MQTT.Client(hostname, port, clientId);
//建立客户端实例
var options = {
invocationContext: {
host: hostname,
port: port,
path: client.path,
clientId: clientId
},
timeout: timeout,
keepAliveInterval: keepAlive,
cleanSession: cleanSession,
useSSL: ssl,
userName: userName,
password: password,
onSuccess: onConnect,
onFailure: function(e) {
console.log(e);
}
};
client.connect(options);

//连接服务器并注册连接成功处理事件
function onConnect() {
console.log("onConnected");
client.subscribe(topic);
}

client.onConnectionLost = onConnectionLost;

//注册连接断开处理事件
client.onMessageArrived = onMessageArrived;

//注册消息接收处理事件
function onConnectionLost(responseObject) {
console.log(responseObject);
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:" + responseObject.errorMessage);
console.log("连接已断开");
}
}

//收到消息时处理事件
function onMessageArrived(message) {
var msg = message.payloadString;
var obj = JSON.parse(msg);
console.log("收到消息:" + obj);
/*
<li class="list-group-item">
<span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
</li>
*/
$('#contentList').append($(`<li class="list-group-item" > <span class="badge">评论人:` + obj.name + `,时间:` + obj.time + `</span>` + obj.content + `</li>`));
}

//点击发送按钮事件
function send() {
var name = document.getElementById("user").value;
var content = document.getElementById("content").value;
console.log('name :>> ', name);
console.log('content :>> ', content);
var time = new Date().Format("yyyy-MM-dd hh:mm:ss");
var getConment = {
name: name,
content: content,
time: time,
}
if (name) {
var str = getConment;
message = new Paho.MQTT.Message(JSON.stringify(str));
message.destinationName = topic;
client.send(message);
document.getElementById("content").value = "";
document.getElementById("user").value = "";
}
}

//生成UUID
function generateUUID() {
var d = new Date().getTime();
if (window.performance && typeof window.performance.now === "function") {
d += performance.now(); //use high-precision timer if available
}
var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
var r = (d + Math.random() * 16) % 16 | 0;
d = Math.floor(d / 16);
return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
});
return uuid;
}
//date时间格式化
Date.prototype.Format = function(fmt) {
var o = {
"M+": this.getMonth() + 1, //月份
"d+": this.getDate(), //日
"h+": this.getHours(), //小时
"m+": this.getMinutes(), //分
"s+": this.getSeconds(), //秒
"q+": Math.floor((this.getMonth() + 3) / 3), //季度
"S": this.getMilliseconds() //毫秒
};
if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length));
for (var k in o)
if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length)));
return fmt;
}
</script>
</body>
</html>

测试效果

页面效果

Java 连接mqtt订阅者收到消息