作者 钟来

初始提交

正在显示 26 个修改的文件 包含 548 行增加98 行删除
  1 +## /{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/ 由代理服务器做权限控制,终端不需要传递
  2 +
  3 +## 终端订阅
  4 +/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/PUT/+ 写数据,需要返回执行结果
  5 +/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/GET_REQ/+ 获取数据的返回结果
  6 +/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/READ/+ 读数据,需要返回执行结果
  7 +
  8 +## 终端发布
  9 +/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/ALL_POST 全量上报数据,不需要返回
  10 +/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/ADD_POST 增量上报数据,不需要返回
  11 +/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/PUT_REQ/+ 写数据的执行结果
  12 +/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/GET/+ 获取数据
  13 +/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/READ_REQ/+ 读数据的执行结果
  14 +
  15 +##代理通知
  16 +/${{roleid}}/${{username}}/${{clientid}}/online 上下线通知
  17 +
  18 +##下位机发布
  19 +1、发布设备信息,对应主题:ALL_POST 或者 ADD_POST
  20 +# 描述:设备上电后发布主机信息
  21 +# rssi 设备强度(信号极好[-55— 0],信号好[-70— -55],信号一般[-85— -70],信号差[-100— -85])
  22 +# status 设备状态,固定为3,表示在线
  23 +# userId 用户的ID,1=admin
  24 +# firmwareVersion 固件版本
  25 +# longitude 经度,可选,使用设备定位时需要上传
  26 +# latitude 纬度,可选,使用设备定位时需要上传
  27 +# summary 摘要,可选,设备的配置信息等,json格式,对象可自定义
  28 +{
  29 + "0":{
  30 + "rssi": -43,
  31 + "firmwareVersion": 1.2,
  32 + "longitude": 0,
  33 + "latitude": 0,
  34 + "summary": {
  35 + "name": "wumei-smart",
  36 + "chip": "esp8266",
  37 + "author": "kerwincui",
  38 + "version": 1.2,
  39 + "createTime": "2022-06-06"
  40 + }
  41 + }
  42 +}
  43 +
  44 +2、发布实时监测,对应主题:ALL_POST 或者 ADD_POST
  45 +# 描述:定时提交数据,分全量提交和增量提交,全量提交会替换传输的指标同时删除没有传输的指标,增量提交只会替换传输的指标
  46 +# id 标识符,物模型中的属性,产品详情中查看标识符,对应id值
  47 +# value 属性对应的值
  48 +{
  49 + "1":{
  50 + "id1":"value1",
  51 + "id2":"value2",
  52 + "id3":"value3"
  53 + },
  54 + "3":{
  55 + "id1":"value1",
  56 + "id2":"value2",
  57 + "id3":"value3"
  58 + },
  59 + "4":{
  60 + "id1":"value1",
  61 + "id2":"value2",
  62 + "id3":"value3"
  63 + }
  64 +}
  65 +
  66 +3、发布获取设备信息,对应主题: GET/+
  67 +# 描述: 0表示主机,1 其它接入设备(开关"10_XXX"或者传感器(1_XXX))的编号,id1 指定属性或者配置
  68 +{
  69 + "1":"id1,id2,id3",
  70 + "0":"id1,id2,id3"
  71 +}
  72 +
  73 +4、发布主题PUT/+的回复主题PUT_REQ/+
  74 +# 描述:上位机发布PUT/+主题远程操作下位机,下位机要应答回复,+ 代表messageid必须唯一,推荐使用时间戳单位s,下位机回复messageid和PUT/+主题里的messageid一致不用改变
  75 +# code 操作结果(1成功,0失败)
  76 +# mgs 失败原因
  77 +{
  78 +"code":1,
  79 +"mgs":"DEVICE_NOT_FIND"
  80 +}
  81 +
  82 +5、发布主题 READ/+ 的回复主题 READ_REQ/+
  83 +# 描述:上位机发布 READ/+ 主题远程读取下位机,下位机要应答回复,+ 代表messageid必须唯一,推荐使用时间戳单位s,下位机回复 messageid 和 READ/+ 主题里的 messageid 一致不用改变
  84 +# code 操作结果(1成功,0失败)
  85 +# mgs 失败原因
  86 +# data 如果需要返回设备数据,data不为空,数据格式遵循ALL_POST和ADD_POST主题payload的规则
  87 +{
  88 +"code":1,
  89 +"mgs":"DEVICE_NOT_FIND"
  90 +"data":{{POST_PAYLOAD}}
  91 +}
  92 +
  93 +
  94 +##下位机订阅
  95 +# 描述:firmwareVersion版本号如果不一致表示升级
  96 +# restart 1重启,2复位,3恢复出厂值
  97 +# longitude,latitude 经纬度
  98 +# summary 自定义内容
  99 +# 第一层"0"代表主机值固定,"1"代表其它接入设备(开关"10_XXX"或者传感器(1_XXX)),编号可以自定义
  100 +1、订阅主题 PUT/+,上位机写操作
  101 +{
  102 + "0":{
  103 + "restart": 1,
  104 + "firmwareVersion": 1.2,
  105 + "longitude": 0,
  106 + "latitude": 0,
  107 + "summary": {
  108 + "name": "wumei-smart",
  109 + "chip": "esp8266",
  110 + "author": "kerwincui",
  111 + "version": 1.2,
  112 + "createTime": "2022-06-06"
  113 + }
  114 + },
  115 + "1":{
  116 + "id1":"value1",
  117 + "id2":"value2",
  118 + "id3":"value3"
  119 + },
  120 + "3":{
  121 + "id1":"value1",
  122 + "id2":"value2",
  123 + "id3":"value3"
  124 + },
  125 + "4":{
  126 + "id1":"value1",
  127 + "id2":"value2",
  128 + "id3":"value3"
  129 + }
  130 +}
  131 +
  132 +2、订阅主题 GET_REQ/+,下位机读取上位机信息时候,上位机返回的消息
  133 +# 描述:规则和 PUT/+ 主题一致,返回的数据根据 GET/+ 主题请求的 sensor_number 来生成
  134 +{
  135 + "0":{
  136 + "restart": 1,
  137 + "firmwareVersion": 1.2,
  138 + "longitude": 0,
  139 + "latitude": 0,
  140 + "summary": {
  141 + "name": "wumei-smart",
  142 + "chip": "esp8266",
  143 + "author": "kerwincui",
  144 + "version": 1.2,
  145 + "createTime": "2022-06-06"
  146 + }
  147 + },
  148 + "1":{
  149 + "id1":"value1",
  150 + "id2":"value2",
  151 + "id3":"value3"
  152 + },
  153 + "3":{
  154 + "id1":"value1",
  155 + "id2":"value2",
  156 + "id3":"value3"
  157 + },
  158 + "4":{
  159 + "id1":"value1",
  160 + "id2":"value2",
  161 + "id3":"value3"
  162 + }
  163 +}
  164 +
  165 +3、订阅主题 READ/+,上位机读操作
  166 +# 描述: 0表示主机,1 其它接入设备(开关"10_XXX"或者传感器(1_XXX))的编号,id1 指定属性或者配置
  167 +{
  168 + "1":"id1,id2,id3",
  169 + "0":"id1,id2,id3"
  170 +}
1 -package com.zhonglai.luhui.mqtt.agreement;  
2 -  
3 -import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;  
4 -import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;  
5 -import com.zhonglai.luhui.mqtt.comm.factory.Topic;  
6 -import com.zhonglai.luhui.mqtt.dto.topic.PutReqDto;  
7 -import org.springframework.stereotype.Service;  
8 -  
9 -@Service("PUT_REQ")  
10 -public class PutReqAgreement implements BusinessAgreement<PutReqDto> {  
11 - @Override  
12 - public ServerDto analysis(Topic topic, PutReqDto putDto) throws Exception {  
13 - putDto.setMessageid(topic.getMessageid());  
14 - putDto.setCode(1);  
15 - return putDto;  
16 - }  
17 -  
18 - @Override  
19 - public PutReqDto toData(byte[] bytes) {  
20 - String str = new String(bytes);  
21 - return new PutReqDto().setData(str);  
22 - }  
23 -}  
1 package com.zhonglai.luhui.mqtt.comm.agreement; 1 package com.zhonglai.luhui.mqtt.comm.agreement;
2 2
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
3 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement; 5 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
4 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreementFactory; 6 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreementFactory;
  7 +import com.zhonglai.luhui.mqtt.comm.factory.Topic;
  8 +import com.zhonglai.luhui.mqtt.comm.util.StringUtils;
5 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.stereotype.Component; 10 import org.springframework.stereotype.Component;
7 11
@@ -16,12 +20,21 @@ public class BusinessAgreementFactoryImpl implements BusinessAgreementFactory { @@ -16,12 +20,21 @@ public class BusinessAgreementFactoryImpl implements BusinessAgreementFactory {
16 private Map<String, BusinessAgreement> businessAgreementMap; 20 private Map<String, BusinessAgreement> businessAgreementMap;
17 21
18 @Override 22 @Override
19 - public BusinessAgreement createBusinessAgreement(String topicType) {  
20 - BusinessAgreement businessAgreement = businessAgreementMap.get(topicType.toUpperCase()); 23 + public BusinessAgreement createBusinessAgreement(Topic topic, BusinessDto businessDto) {
  24 + BusinessAgreement businessAgreement = businessAgreementMap.get(topic.getTopicType().toUpperCase());
21 if(null == businessAgreement) //没有找到就用默认的 25 if(null == businessAgreement) //没有找到就用默认的
22 { 26 {
23 businessAgreementMap.get("DEFAULT"); 27 businessAgreementMap.get("DEFAULT");
24 } 28 }
25 return businessAgreement; 29 return businessAgreement;
26 } 30 }
  31 +
  32 + public void toBusinessDto(String payloadtype,byte[] data)
  33 + {
  34 + if(StringUtils.isBlank(payloadtype))
  35 + {
  36 + payloadtype = "String";
  37 + }
  38 +
  39 + }
27 } 40 }
@@ -18,6 +18,10 @@ import org.springframework.data.redis.serializer.StringRedisSerializer; @@ -18,6 +18,10 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
18 public class RedisConfig { 18 public class RedisConfig {
19 @Value("${sys.redis.field:#{'luhui:mqttservice:device:'}") 19 @Value("${sys.redis.field:#{'luhui:mqttservice:device:'}")
20 public static String FIELD ; //域 20 public static String FIELD ; //域
  21 + public static String DEVICE = "device:"; //存放网关数据
  22 + public static String THINGS_MODEL = "things_model:"; //存放数据模型
  23 + public static String TERMINAL = "terminal:"; //存放终端数据
  24 +
21 25
22 @Bean 26 @Bean
23 public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){ 27 public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
  1 +package com.zhonglai.luhui.mqtt.comm.dto.business;
  2 +
  3 +import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
  4 +
  5 +public interface BusinessDto {
  6 + BusinessDto analyticalModel(String modeData);
  7 + ServerDto toServerDto();
  8 +}
  1 +package com.zhonglai.luhui.mqtt.comm.dto.business;
  2 +
  3 +public class BusinessDtoClassNew {
  4 + public static BusinessDto newBean(String payloadtype,byte[] data )
  5 + {
  6 + switch (payloadtype)
  7 + {
  8 + case "String":
  9 + return new StringBusinessDto(data);
  10 + case "Json":
  11 + return new JsonBusinessDto(data);
  12 + case "Byte":
  13 + return new ByteBusinessDto(data);
  14 + default:
  15 + return new StringBusinessDto(data);
  16 + }
  17 + }
  18 +}
  1 +package com.zhonglai.luhui.mqtt.comm.dto.business;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import lombok.Data;
  5 +import org.apache.commons.lang3.ArrayUtils;
  6 +
  7 +/**
  8 + * 字节payload内容协议
  9 + */
  10 +@Data
  11 +public class ByteBusinessDto implements BusinessDto{
  12 + private int functionCode; //功能码
  13 + private int verification; //校验码
  14 + private byte[] data; //数据
  15 +
  16 + private byte[] srcData; // 原始数据
  17 +
  18 + public ByteBusinessDto(byte[] data )
  19 + {
  20 + srcData = data;
  21 + }
  22 +
  23 + @Override
  24 + public BusinessDto analyticalModel(String modeData) {
  25 + Mode mode = JSONObject.parseObject(modeData,Mode.class);
  26 + this.functionCode = srcData[mode.functionCodeAdr];
  27 + this.verification = srcData[mode.verificationAdr];
  28 + this.data = ArrayUtils.subarray(srcData,mode.dataLeftAdr,srcData.length-mode.datarRightAdr);
  29 + return this;
  30 + }
  31 +
  32 + @Data
  33 + class Mode
  34 + {
  35 + int dataLeftAdr; //数据左起地址
  36 + int datarRightAdr; //数据右起地址
  37 + int functionCodeAdr; //功能码地址
  38 + int verificationAdr; //校验码地址
  39 + }
  40 +}
  1 +package com.zhonglai.luhui.mqtt.comm.dto.business;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
  5 +import lombok.Data;
  6 +
  7 +import java.io.UnsupportedEncodingException;
  8 +
  9 +/**
  10 + * json payload内容协议
  11 + */
  12 +@Data
  13 +public class JsonBusinessDto implements BusinessDto{
  14 + private byte[] srcData; // 原始数据
  15 +
  16 + private JSONObject data;
  17 + public JsonBusinessDto(byte[] data )
  18 + {
  19 + srcData = data;
  20 + }
  21 +
  22 + @Override
  23 + public BusinessDto analyticalModel(String modeData) {
  24 + try {
  25 + this.data = JSONObject.parseObject(new String(srcData,"UTF-8"),JSONObject.class);
  26 + } catch (UnsupportedEncodingException e) {
  27 + e.printStackTrace();
  28 + }
  29 + return this;
  30 + }
  31 +
  32 + @Override
  33 + public ServerDto toServerDto() {
  34 +
  35 + return null;
  36 + }
  37 +}
  1 +package com.zhonglai.luhui.mqtt.comm.dto.business;
  2 +
  3 +import lombok.Data;
  4 +
  5 +/**
  6 + * 字符串payload内容协议
  7 + */
  8 +@Data
  9 +public class StringBusinessDto implements BusinessDto{
  10 + private byte[] srcData; // 原始数据
  11 + public StringBusinessDto(byte[] data )
  12 + {
  13 + srcData = data;
  14 + }
  15 +
  16 + @Override
  17 + public BusinessDto analyticalModel(String modeData) {
  18 + return this;
  19 + }
  20 +}
@@ -106,6 +106,17 @@ public class IotDevice @@ -106,6 +106,17 @@ public class IotDevice
106 106
107 @ApiModelProperty("负载类型(String,Json,Bite16,Bite32)") 107 @ApiModelProperty("负载类型(String,Json,Bite16,Bite32)")
108 private String payload_type; 108 private String payload_type;
  109 + @ApiModelProperty("payload 协议模型")
  110 +
  111 + private String business_model; //payload 协议模型
  112 +
  113 + public String getBusiness_model() {
  114 + return business_model;
  115 + }
  116 +
  117 + public void setBusiness_model(String business_model) {
  118 + this.business_model = business_model;
  119 + }
109 120
110 public String getPayload_type() { 121 public String getPayload_type() {
111 return payload_type; 122 return payload_type;
@@ -8,5 +8,4 @@ import com.zhonglai.luhui.mqtt.comm.dto.ServerDto; @@ -8,5 +8,4 @@ import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
8 */ 8 */
9 public interface BusinessAgreement<T> { 9 public interface BusinessAgreement<T> {
10 ServerDto analysis(Topic topic, T data) throws Exception; //解析协议 10 ServerDto analysis(Topic topic, T data) throws Exception; //解析协议
11 - T toData(byte[] data);  
12 } 11 }
@@ -4,5 +4,5 @@ package com.zhonglai.luhui.mqtt.comm.factory; @@ -4,5 +4,5 @@ package com.zhonglai.luhui.mqtt.comm.factory;
4 * mqtt业务协议工厂 4 * mqtt业务协议工厂
5 */ 5 */
6 public interface BusinessAgreementFactory { 6 public interface BusinessAgreementFactory {
7 - BusinessAgreement createBusinessAgreement(String topicType); 7 + BusinessAgreement createBusinessAgreement(Topic topic );
8 } 8 }
1 package com.zhonglai.luhui.mqtt.comm.service; 1 package com.zhonglai.luhui.mqtt.comm.service;
2 2
3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto; 3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
  4 +import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
  5 +import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDtoClassNew;
  6 +import com.zhonglai.luhui.mqtt.comm.dto.iot.IotDevice;
4 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement; 7 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
5 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreementFactory; 8 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreementFactory;
6 import com.zhonglai.luhui.mqtt.comm.factory.Topic; 9 import com.zhonglai.luhui.mqtt.comm.factory.Topic;
7 import com.zhonglai.luhui.mqtt.comm.util.ByteUtil; 10 import com.zhonglai.luhui.mqtt.comm.util.ByteUtil;
  11 +import com.zhonglai.luhui.mqtt.service.db.DeviceService;
8 import lombok.SneakyThrows; 12 import lombok.SneakyThrows;
9 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 13 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
10 import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; 14 import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
@@ -30,6 +34,9 @@ public class MqttCallback implements MqttCallbackExtended { @@ -30,6 +34,9 @@ public class MqttCallback implements MqttCallbackExtended {
30 @Autowired 34 @Autowired
31 private CacheService cacheService; //数据缓存 35 private CacheService cacheService; //数据缓存
32 36
  37 + @Autowired
  38 + private DeviceService deviceService ;
  39 +
33 @SneakyThrows 40 @SneakyThrows
34 @Override 41 @Override
35 public void connectComplete(boolean b, String s) { 42 public void connectComplete(boolean b, String s) {
@@ -41,7 +48,7 @@ public class MqttCallback implements MqttCallbackExtended { @@ -41,7 +48,7 @@ public class MqttCallback implements MqttCallbackExtended {
41 @Override 48 @Override
42 public void connectionLost(Throwable throwable) { 49 public void connectionLost(Throwable throwable) {
43 //连接丢失 50 //连接丢失
44 - log.info("连接丢失"); 51 + log.error("连接丢失",throwable);
45 52
46 } 53 }
47 54
@@ -50,12 +57,18 @@ public class MqttCallback implements MqttCallbackExtended { @@ -50,12 +57,18 @@ public class MqttCallback implements MqttCallbackExtended {
50 //接收到消息 57 //接收到消息
51 log.info("接收到消息topc:{}, mqttMessage {},payload 十六进制 {}",s,mqttMessage, ByteUtil.hexStringToSpace(ByteUtil.toHexString(mqttMessage.getPayload()))); 58 log.info("接收到消息topc:{}, mqttMessage {},payload 十六进制 {}",s,mqttMessage, ByteUtil.hexStringToSpace(ByteUtil.toHexString(mqttMessage.getPayload())));
52 Topic topic = new Topic(s); 59 Topic topic = new Topic(s);
53 -  
54 - BusinessAgreement businessAgreement = businessAgreementFactory.createBusinessAgreement(topic.getTopicType());  
55 - try { 60 + if(null == topic)
  61 + {
  62 + log.error("消息{},topic为空,不做解析");
  63 + return;
  64 + }
56 //解析协议 65 //解析协议
57 byte[] data = mqttMessage.getPayload(); 66 byte[] data = mqttMessage.getPayload();
58 - ServerDto dto = businessAgreement.analysis(topic,businessAgreement.toData(data)); 67 + BusinessAgreement businessAgreement = businessAgreementFactory.createBusinessAgreement(topic);
  68 + try {
  69 + IotDevice iotDevice = deviceService.getDeviceById(topic.getClientid());
  70 + BusinessDto businessDto = BusinessDtoClassNew.newBean(topic.getPayloadtype(),data).analyticalModel(iotDevice.getBusiness_model());
  71 + ServerDto dto = businessAgreement.analysis(topic,businessDto.toServerDto());
59 if(null == dto) 72 if(null == dto)
60 { 73 {
61 return; 74 return;
1 package com.zhonglai.luhui.mqtt.controller; 1 package com.zhonglai.luhui.mqtt.controller;
2 2
  3 +import com.alibaba.fastjson.JSONObject;
3 import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService; 4 import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService;
4 import com.zhonglai.luhui.mqtt.comm.util.DateUtils; 5 import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
5 import com.zhonglai.luhui.mqtt.dto.Message; 6 import com.zhonglai.luhui.mqtt.dto.Message;
@@ -11,6 +12,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -11,6 +12,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
11 import org.springframework.beans.factory.annotation.Autowired; 12 import org.springframework.beans.factory.annotation.Autowired;
12 import org.springframework.web.bind.annotation.*; 13 import org.springframework.web.bind.annotation.*;
13 14
  15 +import java.util.Map;
  16 +
14 @Api(tags = "设备操作") 17 @Api(tags = "设备操作")
15 @RestController 18 @RestController
16 @RequestMapping("/device") 19 @RequestMapping("/device")
@@ -18,18 +21,25 @@ public class DeviceController { @@ -18,18 +21,25 @@ public class DeviceController {
18 @Autowired 21 @Autowired
19 private ClienNoticeService clienNoticeService; 22 private ClienNoticeService clienNoticeService;
20 23
21 - @ApiOperation("控制")  
22 - @RequestMapping(value = "control/{clienid}",method = RequestMethod.POST)  
23 - public Message control(@PathVariable String clienid, @RequestBody PutDto putDto) throws MqttException, InterruptedException { 24 + @ApiOperation("控制发16进制指令")
  25 + @RequestMapping(value = "controlHex/{clienid}",method = RequestMethod.POST)
  26 + public Message controlHex(@PathVariable String clienid, String data) throws MqttException, InterruptedException {
24 MqttMessage mqttMessage = new MqttMessage(); 27 MqttMessage mqttMessage = new MqttMessage();
25 - byte[] bs = hexStringToByte(putDto.getData().trim().toUpperCase());  
26 -// ByteBuf message = Unpooled.buffer(bs.length);  
27 -// message.writeBytes(bs); 28 + byte[] bs = hexStringToByte(data.trim().toUpperCase());
28 mqttMessage.setPayload(bs); 29 mqttMessage.setPayload(bs);
29 Message message = clienNoticeService.sendMessage(clienid,mqttMessage, DateUtils.getNowTimeMilly()+""); 30 Message message = clienNoticeService.sendMessage(clienid,mqttMessage, DateUtils.getNowTimeMilly()+"");
30 return message; 31 return message;
31 } 32 }
32 33
  34 + @ApiOperation("控制发json")
  35 + @RequestMapping(value = "control/{clienid}",method = RequestMethod.POST)
  36 + public Message control(@PathVariable String clienid, @RequestBody String map) throws MqttException, InterruptedException {
  37 + MqttMessage mqttMessage = new MqttMessage();
  38 + mqttMessage.setPayload(map.trim().getBytes());
  39 + Message message = clienNoticeService.sendMessage(clienid,mqttMessage, DateUtils.getNowTimeMilly()+"");
  40 + return message;
  41 + }
  42 +
33 /** 43 /**
34 * 把16进制字符串转换成字节数组 44 * 把16进制字符串转换成字节数组
35 * 45 *
1 package com.zhonglai.luhui.mqtt.dto.topic; 1 package com.zhonglai.luhui.mqtt.dto.topic;
2 2
  3 +import com.alibaba.fastjson.JSONObject;
3 import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent; 4 import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
4 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto; 5 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
5 import lombok.Data; 6 import lombok.Data;
@@ -11,7 +12,7 @@ import lombok.experimental.Accessors; @@ -11,7 +12,7 @@ import lombok.experimental.Accessors;
11 @Data 12 @Data
12 @Accessors(chain = true) 13 @Accessors(chain = true)
13 public class PutDto implements ServerDto { 14 public class PutDto implements ServerDto {
14 - private String data; 15 + private Object data;
15 16
16 @Override 17 @Override
17 public ServerAgreementContent getServerAgreementContent() { 18 public ServerAgreementContent getServerAgreementContent() {
@@ -12,6 +12,6 @@ public class TopicsServiceImpl implements TopicsService { @@ -12,6 +12,6 @@ public class TopicsServiceImpl implements TopicsService {
12 12
13 @Override 13 @Override
14 public String getTopicFromImei(String imei, String messageid) { 14 public String getTopicFromImei(String imei, String messageid) {
15 - return "/2/2/X6/"+imei+"/PUT"; 15 + return "/2/6_W/"+imei+"/Json/PUT";
16 } 16 }
17 } 17 }
  1 +package com.zhonglai.luhui.mqtt.service.db;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.zhonglai.luhui.mqtt.comm.config.RedisConfig;
  5 +import com.zhonglai.luhui.mqtt.comm.dao.BaseDao;
  6 +import com.zhonglai.luhui.mqtt.comm.dto.iot.IotDevice;
  7 +import com.zhonglai.luhui.mqtt.comm.dto.iot.IotTerminal;
  8 +import com.zhonglai.luhui.mqtt.comm.dto.iot.IotThingsModel;
  9 +import com.zhonglai.luhui.mqtt.comm.service.RedisService;
  10 +import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
  11 +import org.apache.commons.lang3.StringUtils;
  12 +import org.springframework.beans.factory.annotation.Autowired;
  13 +import org.springframework.stereotype.Service;
  14 +
  15 +import java.util.List;
  16 +
  17 +@Service
  18 +public class DeviceService {
  19 + @Autowired
  20 + private RedisService redisService ;
  21 + private BaseDao baseDao = new BaseDao();
  22 +
  23 + /**
  24 + * 根据id获取终端
  25 + * @param id
  26 + * @return
  27 + */
  28 + public IotTerminal getTerminalById(String id)
  29 + {
  30 + return (IotTerminal) baseDao.get(IotTerminal.class,"id='"+id+"'","`mqtt_broker`.`iot_terminal`");
  31 + }
  32 + /**
  33 + * 根据id获取终端lieb
  34 + * @param ids
  35 + * @return
  36 + */
  37 + public List<IotTerminal> getTerminalListByIds(String ids)
  38 + {
  39 + List<IotTerminal> list = baseDao.findBysql("select * from `mqtt_broker`.`iot_terminal` where id in("+ids+")", IotTerminal.class);
  40 + return list;
  41 + }
  42 +
  43 + /**
  44 + * 根据id获取网关
  45 + * @param id
  46 + * @return
  47 + */
  48 + public IotDevice getDeviceById(String id)
  49 + {
  50 + return (IotDevice) baseDao.get(IotDevice.class,"client_id='"+id+"'","`mqtt_broker`.`iot_device`");
  51 + }
  52 +
  53 + /**
  54 + * 全量更新终端模型数据
  55 + * @param JSONObject
  56 + * @param id
  57 + */
  58 + public void fullUpdateTerminal(JSONObject JSONObject,String id)
  59 + {
  60 + IotTerminal terminal = new IotTerminal();
  61 + terminal.setThings_model_value(JSONObject.toString());
  62 + terminal.setId(id);
  63 + terminal.setUpdate_time(DateUtils.getNowTimeMilly());
  64 + baseDao.update(terminal,"id");
  65 + }
  66 +
  67 + /**
  68 + * 全量更新网关的模型数据
  69 + * @param JSONObject
  70 + * @param id
  71 + */
  72 + public void fullUpdateDevice(JSONObject JSONObject,String id)
  73 + {
  74 + IotTerminal terminal = new IotTerminal();
  75 + terminal.setThings_model_value(JSONObject.toString());
  76 + terminal.setId(id);
  77 + terminal.setUpdate_time(DateUtils.getNowTimeMilly());
  78 + baseDao.update(terminal,"id");
  79 + }
  80 +
  81 + /**
  82 + * 获取缓存网关信息
  83 + * @param id
  84 + * @return
  85 + */
  86 + public IotDevice getRedicDevice(String id)
  87 + {
  88 + Object object = redisService.get(RedisConfig.FIELD+RedisConfig.DEVICE+id);
  89 + if(null != object)
  90 + {
  91 + return (IotDevice)object;
  92 + }
  93 + return null;
  94 + }
  95 +
  96 + public void updataDevice(IotDevice device)
  97 + {
  98 + setRedicDevice(device);
  99 + baseDao.saveOrUpdateObject(device);
  100 + }
  101 +
  102 + /**
  103 + * 设置缓存网关信息
  104 + * @param device
  105 + * @return
  106 + */
  107 + private boolean setRedicDevice(IotDevice device)
  108 + {
  109 + return redisService.setexDevice(RedisConfig.FIELD+RedisConfig.DEVICE+device.getClient_id(),device);
  110 + }
  111 +
  112 + /**
  113 + * 获取缓存终端信息
  114 + * @param id
  115 + * @return
  116 + */
  117 + public IotTerminal getRedicTerminal(String id)
  118 + {
  119 + Object object = redisService.get(RedisConfig.FIELD+RedisConfig.TERMINAL+id);
  120 + if(null == object)
  121 + {
  122 + IotTerminal terminal = getTerminalById(id);
  123 +
  124 + IotTerminal saveTerminal = new IotTerminal();
  125 + saveTerminal.setId(id);
  126 + if(null == terminal)
  127 + {
  128 + baseDao.saveOrUpdateObject(saveTerminal);
  129 + }else{
  130 + saveTerminal.setThings_model_value(terminal.getThings_model_value());
  131 + }
  132 + return saveTerminal;
  133 + }
  134 + return (IotTerminal)object;
  135 + }
  136 + /**
  137 + * 设置缓存终端信息
  138 + * @param terminal
  139 + * @return
  140 + */
  141 + private boolean setRedicTerminal(IotTerminal terminal)
  142 + {
  143 + return redisService.setexDevice(RedisConfig.FIELD+RedisConfig.TERMINAL+terminal.getId(),terminal);
  144 + }
  145 + public void updataTerminal(IotTerminal terminal)
  146 + {
  147 + setRedicTerminal(terminal);
  148 + baseDao.saveOrUpdateObject(terminal);
  149 + }
  150 + /**
  151 + * 获取
  152 + * @param userId
  153 + * @return
  154 + */
  155 + public IotThingsModel getThingsModelsByUserIdAndIdentifier(Integer userId, String identifier)
  156 + {
  157 + Object object = redisService.hget(RedisConfig.FIELD+RedisConfig.THINGS_MODEL+userId,identifier);
  158 + return null != object?(IotThingsModel)object:null;
  159 + }
  160 +
  161 +
  162 + public JSONObject getNewdate(String oldstr, JSONObject saveJson)
  163 + {
  164 + JSONObject oldjs = new JSONObject();
  165 + if(StringUtils.isNoneBlank(oldstr))
  166 + {
  167 + oldjs = JSONObject.parseObject(oldstr);
  168 + }
  169 + for (String sk:saveJson.keySet())
  170 + {
  171 + oldjs.put(sk,saveJson.get(sk));
  172 + }
  173 + return oldjs;
  174 + }
  175 +}
@@ -13,14 +13,10 @@ import org.springframework.stereotype.Service; @@ -13,14 +13,10 @@ import org.springframework.stereotype.Service;
13 @Service("ADD_POST") 13 @Service("ADD_POST")
14 public class AddPostTopic implements BusinessAgreement<AddPostDto> { 14 public class AddPostTopic implements BusinessAgreement<AddPostDto> {
15 15
16 -  
17 @Override 16 @Override
18 public ServerDto analysis(Topic topic, AddPostDto data) throws Exception { 17 public ServerDto analysis(Topic topic, AddPostDto data) throws Exception {
19 - return null;  
20 - }  
21 18
22 - @Override  
23 - public AddPostDto toData(byte[] data) {  
24 return null; 19 return null;
25 } 20 }
  21 +
26 } 22 }
1 package com.zhonglai.luhui.mqtt.service.topic; 1 package com.zhonglai.luhui.mqtt.service.topic;
2 2
3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto; 3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
  4 +import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
4 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement; 5 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
5 import com.zhonglai.luhui.mqtt.comm.factory.Topic; 6 import com.zhonglai.luhui.mqtt.comm.factory.Topic;
6 import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto; 7 import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
@@ -17,7 +18,7 @@ public class AllPostTopic implements BusinessAgreement<AllPostDto> { @@ -17,7 +18,7 @@ public class AllPostTopic implements BusinessAgreement<AllPostDto> {
17 } 18 }
18 19
19 @Override 20 @Override
20 - public AllPostDto toData(byte[] data) { 21 + public AllPostDto toData(BusinessDto data) {
21 return null; 22 return null;
22 } 23 }
23 } 24 }
1 package com.zhonglai.luhui.mqtt.service.topic; 1 package com.zhonglai.luhui.mqtt.service.topic;
2 2
3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto; 3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
  4 +import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
4 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement; 5 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
5 import com.zhonglai.luhui.mqtt.comm.factory.Topic; 6 import com.zhonglai.luhui.mqtt.comm.factory.Topic;
6 import com.zhonglai.luhui.mqtt.dto.topic.DbDistributeDto; 7 import com.zhonglai.luhui.mqtt.dto.topic.DbDistributeDto;
@@ -17,7 +18,7 @@ public class DbDistributeTopic implements BusinessAgreement<DbDistributeDto> { @@ -17,7 +18,7 @@ public class DbDistributeTopic implements BusinessAgreement<DbDistributeDto> {
17 } 18 }
18 19
19 @Override 20 @Override
20 - public DbDistributeDto toData(byte[] data) { 21 + public DbDistributeDto toData(BusinessDto data) {
21 return null; 22 return null;
22 } 23 }
23 } 24 }
1 -package com.zhonglai.luhui.mqtt.service.topic;  
2 -  
3 -import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;  
4 -import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;  
5 -import com.zhonglai.luhui.mqtt.comm.factory.Topic;  
6 -import com.zhonglai.luhui.mqtt.dto.topic.GetReqDto;  
7 -import org.springframework.stereotype.Service;  
8 -  
9 -/**  
10 - * 获取数据的返回结果  
11 - */  
12 -@Service("GET_REQ")  
13 -public class GetReqTopic implements BusinessAgreement<GetReqDto> {  
14 - @Override  
15 - public ServerDto analysis(Topic topic, GetReqDto data) throws Exception {  
16 - return null;  
17 - }  
18 -  
19 - @Override  
20 - public GetReqDto toData(byte[] data) {  
21 - return null;  
22 - }  
23 -}  
1 package com.zhonglai.luhui.mqtt.service.topic; 1 package com.zhonglai.luhui.mqtt.service.topic;
2 2
3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto; 3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
  4 +import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
4 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement; 5 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
5 import com.zhonglai.luhui.mqtt.comm.factory.Topic; 6 import com.zhonglai.luhui.mqtt.comm.factory.Topic;
6 import com.zhonglai.luhui.mqtt.dto.topic.GetDto; 7 import com.zhonglai.luhui.mqtt.dto.topic.GetDto;
@@ -17,7 +18,7 @@ public class GetTopic implements BusinessAgreement<GetDto> { @@ -17,7 +18,7 @@ public class GetTopic implements BusinessAgreement<GetDto> {
17 } 18 }
18 19
19 @Override 20 @Override
20 - public GetDto toData(byte[] data) { 21 + public GetDto toData(BusinessDto data) {
21 return null; 22 return null;
22 } 23 }
23 } 24 }
1 package com.zhonglai.luhui.mqtt.service.topic; 1 package com.zhonglai.luhui.mqtt.service.topic;
2 2
3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto; 3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
  4 +import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
4 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement; 5 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
5 import com.zhonglai.luhui.mqtt.comm.factory.Topic; 6 import com.zhonglai.luhui.mqtt.comm.factory.Topic;
6 import com.zhonglai.luhui.mqtt.dto.topic.OnlineDto; 7 import com.zhonglai.luhui.mqtt.dto.topic.OnlineDto;
@@ -15,7 +16,7 @@ public class OnlineTopic implements BusinessAgreement<OnlineDto> { @@ -15,7 +16,7 @@ public class OnlineTopic implements BusinessAgreement<OnlineDto> {
15 } 16 }
16 17
17 @Override 18 @Override
18 - public OnlineDto toData(byte[] data) { 19 + public OnlineDto toData(BusinessDto data) {
19 return null; 20 return null;
20 } 21 }
21 } 22 }
1 package com.zhonglai.luhui.mqtt.service.topic; 1 package com.zhonglai.luhui.mqtt.service.topic;
2 2
3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto; 3 import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
  4 +import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
4 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement; 5 import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
5 import com.zhonglai.luhui.mqtt.comm.factory.Topic; 6 import com.zhonglai.luhui.mqtt.comm.factory.Topic;
6 import com.zhonglai.luhui.mqtt.dto.topic.PutReqDto; 7 import com.zhonglai.luhui.mqtt.dto.topic.PutReqDto;
@@ -17,7 +18,7 @@ public class PutReqTopic implements BusinessAgreement<PutReqDto> { @@ -17,7 +18,7 @@ public class PutReqTopic implements BusinessAgreement<PutReqDto> {
17 } 18 }
18 19
19 @Override 20 @Override
20 - public PutReqDto toData(byte[] data) { 21 + public PutReqDto toData(BusinessDto data) {
21 return null; 22 return null;
22 } 23 }
23 } 24 }
1 -package com.zhonglai.luhui.mqtt.service.topic;  
2 -  
3 -import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;  
4 -import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;  
5 -import com.zhonglai.luhui.mqtt.comm.factory.Topic;  
6 -import com.zhonglai.luhui.mqtt.dto.topic.PutDto;  
7 -import org.springframework.stereotype.Service;  
8 -  
9 -/**  
10 - * 更新数据,需要返回执行结果  
11 - */  
12 -@Service("PUT")  
13 -public class PutTopic implements BusinessAgreement<PutDto> {  
14 -  
15 - @Override  
16 - public ServerDto analysis(Topic topic, PutDto data) throws Exception {  
17 - return null;  
18 - }  
19 -  
20 - @Override  
21 - public PutDto toData(byte[] data) {  
22 - return null;  
23 - }  
24 -}  
@@ -44,7 +44,7 @@ mqtt: @@ -44,7 +44,7 @@ mqtt:
44 #唯一标识 44 #唯一标识
45 clientId: lh-mqtt-service-001 45 clientId: lh-mqtt-service-001
46 #订阅的topic 46 #订阅的topic
47 - topics: "/2/#" 47 + topics: "/+/+/+/+/ADD_POST,/+/+/+/+/ALL_POST,/+/+/+/+/DB_TOPIC_DISTRIBUTE,/+/+/+/+/GET,/+/+/+/online,/+/+/+/+/PUT_REQ"
48 topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}" 48 topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}"
49 username: sysuser 49 username: sysuser
50 password: "!@#1qaz" 50 password: "!@#1qaz"