Emqx入门系列【3】客户端认证方式之-内置用户认证
创始人
2024-09-25 05:23:42
0

一、EMQX 认证方式

EMQ X 是一个高度可扩展的 MQTT 消息服务器,它支持多种认证和授权机制来确保安全性。以下是 EMQ X 支持的一些常见认证方式:

  1. 内置用户认证

    • EMQ X 拥有一个内置的用户数据库,可以存储用户名和密码,进行基本的认证。
  2. HTTP 认证

    • 通过 HTTP 接口进行认证,可以集成外部的用户系统。
  3. 数据库认证

    • 支持 MySQL、PostgreSQL、SQLite 等多种数据库,通过数据库查询进行用户认证。
  4. LDAP 认证

    • 支持与 LDAP 服务器集成,进行目录服务认证。
  5. JWT(JSON Web Tokens):

    • 支持 JWT 作为认证机制,适用于分布式系统和微服务架构。
  6. OAuth 2.0

    • 支持 OAuth 2.0 授权框架,可以与各种 OAuth 服务提供商集成。
  7. API 密钥

    • 通过 API 密钥进行认证,适用于不需要用户身份验证的简单场景。
  8. MQTT 证书认证

    • 支持 SSL/TLS 证书认证,适用于需要加密传输的场合。
  9. 访问控制列表(ACL)

    • 使用 ACL 管理用户或客户端的权限,控制对特定主题的访问。
  10. 外部脚本认证

    • 允许使用外部脚本进行复杂的认证逻辑处理。
  11. Kerberos 认证

    • 对于需要 Kerberos 认证的企业环境,EMQ X 也提供了相应的支持。
  12. Nest 认证

    • 支持 Nest 协议进行设备认证,适用于物联网设备。

每种认证方式都有其适用场景和配置方法。例如,数据库认证适用于需要集中管理用户信息的大型系统,而 MQTT 证书认证适用于需要加密通信的金融或医疗行业。

要配置认证方式,通常需要在 EMQ X 的配置文件 emqx.conf 中进行相应的设置,或者通过 EMQ X Dashboard 进行配置

 

二、EMQX常用认证方式(EMQX 界面配置)

1.内置用户认证

1.1 创建内置用户认证客户端认证

af9349c3bd384fd0ac29c1a98b0e16d8.png

1.2 选择数据库类型

ad8d2c68f8ba4cc082f8ee31fe841f8e.png

1.3 选择账号类型 这里选择username 

ff41cd13c291427aaf8bca2d46949bd8.png

1.4 创建用户

fdf6969d69f741f8a1564890045ac5a9.png

d94187024dc24e7bb19d97671b441bf5.png

1.5 java实现emqt集成

1.5.1 pom.xml 中引入 mqtt相关jar

  org.springframework.integration spring-integration-mqtt 

1.5.2  application.yml 配置mq连接信息

369f0f819076413dbbc0fb88b382d7ba.png

1.5.3  编写实现代码

MqttConfiguration.java
package com.chopin.mqtt.mqtt.config;  import com.chopin.mqtt.mqtt.client.MyMQTTClient; import com.chopin.mqtt.mqtt.utils.JwtUtil; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  /**  * @class MqttConfiguration  * @description  * @copyright Copyright: 2024-2030  * @creator chopin  * @create-time 2024/8/08 14:38  **/ @Configuration @Slf4j @Data public class MqttConfiguration {      @Value("${mqtt.host}")     String host;     @Value("${mqtt.username}")     String username;     @Value("${mqtt.password}")     String password;     @Value("${mqtt.clientId}")     String clientId;     @Value("${mqtt.timeout}")     int timeOut;     @Value("${mqtt.keepalive}")     int keepAlive;     @Value("${mqtt.topic1}")     public String topic1;     @Value("${mqtt.topic2}")     public String topic2;     @Value("${mqtt.topic3}")     public String topic3;     @Value("${mqtt.topic4}")     public String topic4;      @Bean//注入spring     public MyMQTTClient myMQTTClient() {         MyMQTTClient myMQTTClient = new MyMQTTClient(host, username, password, clientId, timeOut, keepAlive);          try {             myMQTTClient.connect();             //不同的主题             //   myMQTTClient.subscribe(topic1, 1);             //   myMQTTClient.subscribe(topic2, 1);             //   myMQTTClient.subscribe(topic3, 1);             //   myMQTTClient.subscribe(topic4, 1);          } catch (MqttException e) {             log.error("MQTT connect exception:{} ", e.getMessage());         }          return myMQTTClient;     }  } 
MyMQTTClient.java
package com.chopin.mqtt.mqtt.client;  import com.chopin.mqtt.mqtt.callback.MyMQTTCallback; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;  /**  * @class MyMQTTClient  * @description  * @copyright Copyright: 2024-2030  * @creator chopin  * @create-time 2024/8/08 14:41  **/ @Slf4j public class MyMQTTClient {     private static MqttClient client;     private String host;     private String username;     private String password;     private String clientId;     private int timeout;     private int keepalive;      public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {         this.host = host;         this.username = username;         this.password = password;         this.clientId = clientId;         this.timeout = timeOut;         this.keepalive = keepAlive;     }      public static MqttClient getClient() {         return client;     }      public static void setClient(MqttClient client) {         MyMQTTClient.client = client;     }      /**      * 设置mqtt连接参数      *      * @param username      * @param password      * @param timeout      * @param keepalive      * @return      */     public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {         MqttConnectOptions options = new MqttConnectOptions();         options.setUserName(username);         options.setPassword(password.toCharArray());         options.setConnectionTimeout(timeout);         options.setKeepAliveInterval(keepalive);         options.setCleanSession(true);         options.setAutomaticReconnect(true);         return options;     }      /**      * 连接mqtt服务端,得到MqttClient连接对象      */     public void connect() throws MqttException {         if (client == null) {             client = new MqttClient(host, clientId, new MemoryPersistence());             client.setCallback(new MyMQTTCallback(MyMQTTClient.this));         }         MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);         if (!client.isConnected()) {             client.connect(mqttConnectOptions);         } else {             client.disconnect();             client.connect(mqttConnectOptions);         }         log.info("MQTT connect success");//未发生异常,则连接成功     }      /**      * 发布,默认qos为0,非持久化      *      * @param pushMessage      * @param topic      */     public void publish(String pushMessage, String topic) {         publish(pushMessage, topic, 0, false);     }      /**      * 发布消息      *      * @param pushMessage      * @param topic      * @param qos      * @param retained:留存      */     public void publish(String pushMessage, String topic, int qos, boolean retained) {         MqttMessage message = new MqttMessage();         message.setPayload(pushMessage.getBytes());         message.setQos(qos);         message.setRetained(retained);         MqttTopic mqttTopic = MyMQTTClient.getClient().getTopic(topic);         if (null == mqttTopic) {             log.error("topic is not exist");         }         MqttDeliveryToken token;//Delivery:配送         synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充             try {                 token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件                 token.waitForCompletion(1000L);             } catch (Exception e) {                 e.printStackTrace();             }         }     }      /**      * 订阅某个主题      *      * @param topic      * @param qos      */     public void subscribe(String topic, int qos) {         try {             MyMQTTClient.getClient().subscribe(topic, qos);         } catch (MqttException e) {             e.printStackTrace();         }     }      /**      * 取消订阅主题      *      * @param topic 主题名称      */     public void cleanTopic(String topic) {         if (client != null && client.isConnected()) {             try {                 client.unsubscribe(topic);             } catch (MqttException e) {                 e.printStackTrace();             }         } else {             System.out.println("取消订阅失败!");         }     } }  
MyMQTTCallback.java 
package com.chopin.mqtt.mqtt.callback;  import com.chopin.mqtt.mqtt.client.MyMQTTClient; import com.chopin.mqtt.mqtt.config.MqttConfiguration; import com.chopin.mqtt.mqtt.dto.MqttMsg; import com.chopin.mqtt.mqtt.utils.SpringUtils; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import com.alibaba.fastjson.JSON; import java.util.Map;  /**  * @class MyMQTTCallback  * @description  * @copyright Copyright: 2024-2030  * @creator chopin  * @create-time 2024/8/08 14:43  **/ @Slf4j public class MyMQTTCallback implements MqttCallbackExtended {     //手动注入     private MqttConfiguration mqttConfiguration = SpringUtils.getBean(MqttConfiguration.class);     private MyMQTTClient myMQTTClient;     public MyMQTTCallback(MyMQTTClient myMQTTClient) {         this.myMQTTClient = myMQTTClient;     }     /**      * 丢失连接,可在这里做重连      * 只会调用一次      *      * @param throwable      */     @Override     public void connectionLost(Throwable throwable) {         log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());         long reconnectTimes = 1;         while (true) {             try {                 if (MyMQTTClient.getClient().isConnected()) {                     //判断已经重新连接成功  需要重新订阅主题 可以在这个if里面订阅主题  或者 connectComplete(方法里面)  看你们自己选择                     log.warn("mqtt reconnect success end  重新连接  重新订阅成功");                     return;                 }                 reconnectTimes+=1;                 log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);                 MyMQTTClient.getClient().reconnect();             } catch (MqttException e) {                 log.error("mqtt断连异常", e);             }             try {                 Thread.sleep(5000);             } catch (InterruptedException e1) {                 e1.printStackTrace();             }         }     }     /**      * @param topic      * @param mqttMessage      * @throws Exception      * subscribe后得到的消息会执行到这里面      */     @Override     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {         log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));         //发布消息主题         if (topic.contains("B/pick/warn/")){             MqttMsg mqttMsg =  JSON.parseObject(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8), MqttMsg.class);             //你自己的业务接口             log.info("接收消息主题 : {},接收消息内容 : {}", topic, JSON.toJSONString(mqttMsg));          }         if (topic.equals("embed/resp")){             Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));             //你自己的业务接口          }         //接收报警主题         if (topic.equals("embed/warn")){             Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));             //你自己的业务接口         }     }          /**      *连接成功后的回调 可以在这个方法执行 订阅主题  生成Bean的 MqttConfiguration方法中订阅主题 出现bug      *重新连接后  主题也需要再次订阅  将重新订阅主题放在连接成功后的回调 比较合理      * @param reconnect      * @param serverURI      */     @Override     public  void  connectComplete(boolean reconnect,String serverURI){         log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");         //订阅主题         myMQTTClient.subscribe(mqttConfiguration.topic1, 1);         myMQTTClient.subscribe("B/pick/warn/#", 1);         myMQTTClient.subscribe(mqttConfiguration.topic3, 1);         myMQTTClient.subscribe(mqttConfiguration.topic4, 1);     }     /**      * 消息到达后      * subscribe后,执行的回调函数      *      * @param s      * @param mqttMessage      * @throws Exception      */     /**      * publish后,配送完成后回调的方法      *      * @param iMqttDeliveryToken      */     @Override     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {         log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());     } } 

运行测试:

f73dca31ea364f209cc9d69ae2aac8c0.png

 

emqx 面板

a7561bd1e75d49e196e9a8499a202526.png

ws 数据订阅

5408079f23a747b9986e0aeccf23c7bf.png

c48f661f70874bd99b6732784fc4a262.png

 

相关内容

热门资讯

安卓系统怎么设壁纸桌面,轻松设... 亲爱的手机控们,你是不是也和我一样,对手机壁纸和桌面布局情有独钟?想要让你的安卓手机焕然一新,个性化...
安卓系统6.0怎么root教程... 亲爱的安卓用户们,你是否曾梦想过让你的手机拥有超能力?没错,我说的就是root!今天,就让我带你一步...
安卓2.2系统当贝市场,体验升... 你有没有想过,那些老旧的安卓设备,虽然已经不再流行,但它们在某个角落里,可能还在默默无闻地发挥着余热...
安卓14系统为啥卡顿,探究性能... 最近是不是发现你的安卓手机有点儿不给力了?打开应用慢吞吞的,滑动页面卡得像老牛拉车,这可真是让人头疼...
采集无锡安卓系统的公司,技术驱... 你有没有想过,在这个科技飞速发展的时代,手机操作系统的重要性简直不言而喻?而在中国,有一个城市,它的...
安卓新系统有问题,揭秘常见故障... 最近你的安卓手机是不是也遇到了点小麻烦?别急,让我来给你细细道来,看看这个新系统到底有哪些让人头疼的...
u盘安卓随身系统,U盘随身携带... 你有没有想过,把安卓系统随身携带,随时随地都能用上?没错,今天就要给你揭秘一个神奇的小玩意——u盘安...
安卓手机有俩系统吗,体验双重魅... 你有没有发现,安卓手机的世界里,好像藏着两个神秘的小秘密呢?没错,就是那个让人又爱又恨的安卓系统。今...
安卓系统恢复怎么搞,轻松应对系... 手机突然卡壳了,系统崩溃了,是不是瞬间感觉整个人都不好了?别慌,今天就来教你一招,让你的安卓手机重获...
windows和安卓系统的关系... 你有没有想过,为什么你的手机和电脑有时候会像好朋友一样,互相配合得天衣无缝?其实,这背后有一个大大的...
安卓11怎么降级系统,轻松还原... 你有没有发现,安卓11系统虽然功能强大,但有时候也会有点小闹心呢?比如,某些应用不支持,或者系统运行...
正版授权!游戏推荐斗牛房卡出售... 今 日消息,天蝎大厅房卡添加微信33549083 苹果今日发布了 iOS 16.1 正式版更新,简单...
推荐一款!金花房卡批发女娲大厅... 女娲大厅房卡更多详情添加微:33549083、 2、在商城页面中选择房卡选项。 3、根...
安卓系统误删无法开机,紧急修复... 手机里的照片、联系人、重要文件,说没就没了!最近,身边的朋友纷纷向我抱怨,他们的安卓手机因为误删数据...
玩家攻略,牛牛充值房卡新大圣/... 玩家攻略,牛牛充值房卡新大圣/新道游/微信链接房卡购买渠道新大圣/新道游是一款非常受欢迎的游戏,咨询...
一分钟了解!牛牛房卡出售红桃众... 红桃众娱是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:【3329006910】或QQ:332900...
IA解析/金花房卡批发玄灵大厅... IA解析/金花房卡批发玄灵大厅/微信链接房卡最低价格Sa9Ix苹果iPhone 17手机即将进入量产...
我来教你/牛牛房卡代理新518... 新518互娱房卡更多详情添加微:33549083、 2、在商城页面中选择房卡选项。 3...
正规平台有哪些,金花房卡代理零... 微信游戏中心:九天大厅房卡在哪里买打开微信,添加客服微信【88355042】,进入游戏中心或相关小程...
荣耀magic是安卓系统吗,探... 你有没有听说过荣耀Magic这款手机?最近它可是手机圈里的热门话题呢!很多人都在问,荣耀Magic是...