MQTT协议
创始人
2024-11-15 05:36:54
0

EMQX

MQTT协议介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的消息传输协议。它特别适用于物联网(IoT)设备之间的通信,因其轻量、高效、低带宽需求等特性,广泛应用于需要高实时性和高可靠性的场景中。

EMQX介绍

EMQX(原名EMQ)是一个开源的、高性能、分布式的MQTT消息中间件。它作为一个MQTT Broker,负责接收客户端设备发布的消息,并将这些消息转发给订阅了相应主题的其他客户端。EMQX支持大规模的并发连接和消息处理,非常适合物联网场景中的大规模设备通信需求 。

MQTTX介绍

MQTTX:客户端工具,用于连接EMQX服务器,模拟收纳消息。

部署启动EMQX

emqx文件夹:使用bin里面的emqx.md:(windows)

启动:emqx start

进入页面:localhos:18083

账号:admin,密码:public

如果连不上在bin目录下重置密码;

emqx start emqx ctl admins passwd admin admin123 

这里改为admin123

加密选择md5(举例)

连上后测试:

数据库查询:(存在mqtt_user表)

update mqtt_user set password = md5(‘123456’)

在MQTTX中连接测试。

springboot整合mqtt

springboot中实现客户端,和MQTTX中创建的客户端功能一致

项目依赖:

     org.springframework.boot     spring-boot-starter-integration        org.springframework.integration     spring-integration-mqtt  

配置类:MQTTX连接属性+ip+端口号(1883)+ 超时+是否自动重连+是否清理会话

注意:http协议是18083,mqtt协议是1883

@Data @ConfigurationProperties(prefix = "qf.emqx.mqtt") public class MqttProperties {  private String host = "localhost"; //EMQX服务器主机地址  private int port = 1883; //EMQX服务器端口  private String username; //连接EMQX服务器的账号  private String password; //连接EMQX服务器的密码  private String clientId = "qf_java2402"; //连接EMQX服务器的客户端ID  private String clientName = "client_test"; /** * 连接保持的时间 */ private int keepAlive = 60; /** * 连接超时的时间 */ private int timeout = 30; /** * 是否自动重连 */ private boolean reconnect = false; /** * 是否清理会话 */ private boolean cleanSession = true; }  
@Configuration @EnableConfigurationProperties(MqttProperties.class) public class MqttConfig {      @Bean     public MqttMsgClient mqttAcceptClient(MqttProperties properties) throws MqttException {         MqttMsgClient client =  new MqttMsgClient(properties);         client.subscribe("/qf/java2402", 1);         return client;     } } 

application.yml文件:

qf:   emqx:     mqtt:       username: qf       password: java2402 

接收消息和发送消息的客户端:

/**  * @Description : MQTT接受服务的客户端  */  public class MqttMsgClient {       private final MqttClient client;      public MqttMsgClient(MqttProperties properties) throws MqttException {         //MQTT协议使用的是tcp地址         String address = "tcp://" +properties.getHost() +":" + properties.getPort();         MqttClient client = new MqttClient(address, properties.getClientId(), new MemoryPersistence());         //Mqtt连接选项配置         MqttConnectOptions options = new MqttConnectOptions();         //设置连接使用的账号         options.setUserName(properties.getUsername());         //设置连接使用的密码         options.setPassword(properties.getPassword().toCharArray());         //设置连接超时的时间         options.setConnectionTimeout(properties.getTimeout());         options.setKeepAliveInterval(properties.getKeepAlive());         options.setAutomaticReconnect(properties.isReconnect());         options.setCleanSession(properties.isCleanSession());         //设置客户端接收到信息时处理的回调         client.setCallback(new MsgProcessor());         this.client = client;         //客户端建立连接         client.connect(options);         MqttMessage msg = new MqttMessage("这是Java代码发出来的消息".getBytes(StandardCharsets.UTF_8));         msg.setQos(1);         msg.setRetained(true);         client.publish("/qf/java2403", msg);     }      /**      * 订阅某个主题      *      * @param topic 主题      * @param qos   连接方式      */     public void subscribe(String topic, int qos) {         try {             client.subscribe(topic, qos);         } catch (MqttException e) {             e.printStackTrace();         }     }     /**      * 取消订阅某个主题      *      * @param topic      */     public void unsubscribe(String topic) {         try {             client.unsubscribe(topic);         } catch (MqttException e) {             e.printStackTrace();         }     } } 

处理消息的接口:

public class MsgProcessor implements MqttCallback {      @Override     public void connectionLost(Throwable cause) {         System.out.println(cause.getMessage());     }      @Override     public void messageArrived(String topic, MqttMessage message) throws Exception {         System.out.println("当前主题:" +topic);         byte[] payload = message.getPayload();//获取MQTT消息的载荷部分,也就是消息体         System.out.println(new String(payload));     }      @Override     public void deliveryComplete(IMqttDeliveryToken token) {      } } 

原理:客户端订阅主题,向EMQX里这个主题里发送消息,只要其他的客户端订阅了这个主题就能获得消息。

Qos(Quality of Service):消息服务质量

0:最多分发一次,分发依赖于网络能力

1:至少分发一次,得到应答后不会再发,如果没有得到应答就不断发送直到得到应答为止

2:只分发一次,类似于隔离级别为序列化,对性能消耗很大

MQTT基础概念:

会话:一次连接

订阅:客户端可以订阅多个主题,减少客户端连接开销

主题名:主题的名称

载荷:消息体

相关内容

热门资讯

微信炸金花购买房卡/微信斗牛如... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:8488009许多玩家在游戏中会购买房卡来享受...
炸金花微信群购买房卡/牛牛链接... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:33903369许多玩家在游戏中会购买房卡来享...
拼三张从哪里买房卡/新海贝大厅... 拼三张是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:55051770许多玩家在游戏中会购买房卡来享...
微信炸金花房卡一张多少钱/微信... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:8488009许多玩家在游戏中会购买房卡来享受...
微信链接炸金花房卡在哪买的/微... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:33903369许多玩家在游戏中会购买房卡来享...
微信群链接炸金花房卡/微信里斗... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:55051770许多玩家在游戏中会购买房卡来享...
怎么买炸金花房间链接房卡/微信... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:8488009许多玩家在游戏中会购买房卡来享受...
微信玩链接牛牛房卡/新人皇大厅... 斗牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:33903369许多玩家在游戏中会购买房卡来享受...
拼三张房卡链接去哪里买/橘子大... 拼三张是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:55051770许多玩家在游戏中会购买房卡来享...
微信玩炸金花怎么买房卡/欢乐游... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:8488009许多玩家在游戏中会购买房卡来享受...
炸金花房卡链接在哪买的/狂飙大... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:33903369许多玩家在游戏中会购买房卡来享...
如何创建牛牛房间卡/牛至尊大厅... 牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:55051770许多玩家在游戏中会购买房卡来享受...
微信里上玩拼三张购买房卡/神牛... 拼三张是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:8488009许多玩家在游戏中会购买房卡来享受...
微信里面斗牛链接房卡/九酷大厅... 斗牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:33903369许多玩家在游戏中会购买房卡来享受...
炸金花如何开好友房间房卡/微信... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:8488009许多玩家在游戏中会购买房卡来享受...
微信炸金花在哪里充值房卡/新天... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:33903369许多玩家在游戏中会购买房卡来享...
微信里面拼三张房卡哪里买/新皇... 拼三张是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:55051770许多玩家在游戏中会购买房卡来享...
微信群开牛牛房卡/新天地大厅牛... 牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:8488009许多玩家在游戏中会购买房卡来享受更...
微信打炸金花链接房卡怎么买/怎... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:33903369许多玩家在游戏中会购买房卡来享...
微信玩炸金花房卡怎么买/开牛牛... 炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:55051770许多玩家在游戏中会购买房卡来享...