作者 钟来

Merge remote-tracking branch 'origin/master'

正在显示 26 个修改的文件 包含 549 行增加99 行删除
## /{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/ 由代理服务器做权限控制,终端不需要传递
## 终端订阅
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/PUT/+ 写数据,需要返回执行结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/GET_REQ/+ 获取数据的返回结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/READ/+ 读数据,需要返回执行结果
## 终端发布
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/ALL_POST 全量上报数据,不需要返回
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/ADD_POST 增量上报数据,不需要返回
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/PUT_REQ/+ 写数据的执行结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/GET/+ 获取数据
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/READ_REQ/+ 读数据的执行结果
##代理通知
/${{roleid}}/${{username}}/${{clientid}}/online 上下线通知
##下位机发布
1、发布设备信息,对应主题:ALL_POST 或者 ADD_POST
# 描述:设备上电后发布主机信息
# rssi 设备强度(信号极好[-55— 0],信号好[-70— -55],信号一般[-85— -70],信号差[-100— -85])
# status 设备状态,固定为3,表示在线
# userId 用户的ID,1=admin
# firmwareVersion 固件版本
# longitude 经度,可选,使用设备定位时需要上传
# latitude 纬度,可选,使用设备定位时需要上传
# summary 摘要,可选,设备的配置信息等,json格式,对象可自定义
{
"0":{
"rssi": -43,
"firmwareVersion": 1.2,
"longitude": 0,
"latitude": 0,
"summary": {
"name": "wumei-smart",
"chip": "esp8266",
"author": "kerwincui",
"version": 1.2,
"createTime": "2022-06-06"
}
}
}
2、发布实时监测,对应主题:ALL_POST 或者 ADD_POST
# 描述:定时提交数据,分全量提交和增量提交,全量提交会替换传输的指标同时删除没有传输的指标,增量提交只会替换传输的指标
# id 标识符,物模型中的属性,产品详情中查看标识符,对应id值
# value 属性对应的值
{
"1":{
"id1":"value1",
"id2":"value2",
"id3":"value3"
},
"3":{
"id1":"value1",
"id2":"value2",
"id3":"value3"
},
"4":{
"id1":"value1",
"id2":"value2",
"id3":"value3"
}
}
3、发布获取设备信息,对应主题: GET/+
# 描述: 0表示主机,1 其它接入设备(开关"10_XXX"或者传感器(1_XXX))的编号,id1 指定属性或者配置
{
"1":"id1,id2,id3",
"0":"id1,id2,id3"
}
4、发布主题PUT/+的回复主题PUT_REQ/+
# 描述:上位机发布PUT/+主题远程操作下位机,下位机要应答回复,+ 代表messageid必须唯一,推荐使用时间戳单位s,下位机回复messageid和PUT/+主题里的messageid一致不用改变
# code 操作结果(1成功,0失败)
# mgs 失败原因
{
"code":1,
"mgs":"DEVICE_NOT_FIND"
}
5、发布主题 READ/+ 的回复主题 READ_REQ/+
# 描述:上位机发布 READ/+ 主题远程读取下位机,下位机要应答回复,+ 代表messageid必须唯一,推荐使用时间戳单位s,下位机回复 messageid 和 READ/+ 主题里的 messageid 一致不用改变
# code 操作结果(1成功,0失败)
# mgs 失败原因
# data 如果需要返回设备数据,data不为空,数据格式遵循ALL_POST和ADD_POST主题payload的规则
{
"code":1,
"mgs":"DEVICE_NOT_FIND"
"data":{{POST_PAYLOAD}}
}
##下位机订阅
# 描述:firmwareVersion版本号如果不一致表示升级
# restart 1重启,2复位,3恢复出厂值
# longitude,latitude 经纬度
# summary 自定义内容
# 第一层"0"代表主机值固定,"1"代表其它接入设备(开关"10_XXX"或者传感器(1_XXX)),编号可以自定义
1、订阅主题 PUT/+,上位机写操作
{
"0":{
"restart": 1,
"firmwareVersion": 1.2,
"longitude": 0,
"latitude": 0,
"summary": {
"name": "wumei-smart",
"chip": "esp8266",
"author": "kerwincui",
"version": 1.2,
"createTime": "2022-06-06"
}
},
"1":{
"id1":"value1",
"id2":"value2",
"id3":"value3"
},
"3":{
"id1":"value1",
"id2":"value2",
"id3":"value3"
},
"4":{
"id1":"value1",
"id2":"value2",
"id3":"value3"
}
}
2、订阅主题 GET_REQ/+,下位机读取上位机信息时候,上位机返回的消息
# 描述:规则和 PUT/+ 主题一致,返回的数据根据 GET/+ 主题请求的 sensor_number 来生成
{
"0":{
"restart": 1,
"firmwareVersion": 1.2,
"longitude": 0,
"latitude": 0,
"summary": {
"name": "wumei-smart",
"chip": "esp8266",
"author": "kerwincui",
"version": 1.2,
"createTime": "2022-06-06"
}
},
"1":{
"id1":"value1",
"id2":"value2",
"id3":"value3"
},
"3":{
"id1":"value1",
"id2":"value2",
"id3":"value3"
},
"4":{
"id1":"value1",
"id2":"value2",
"id3":"value3"
}
}
3、订阅主题 READ/+,上位机读操作
# 描述: 0表示主机,1 其它接入设备(开关"10_XXX"或者传感器(1_XXX))的编号,id1 指定属性或者配置
{
"1":"id1,id2,id3",
"0":"id1,id2,id3"
}
\ No newline at end of file
... ...
package com.zhonglai.luhui.mqtt.agreement;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.PutReqDto;
import org.springframework.stereotype.Service;
@Service("PUT_REQ")
public class PutReqAgreement implements BusinessAgreement<PutReqDto> {
@Override
public ServerDto analysis(Topic topic, PutReqDto putDto) throws Exception {
putDto.setMessageid(topic.getMessageid());
putDto.setCode(1);
return putDto;
}
@Override
public PutReqDto toData(byte[] bytes) {
String str = new String(bytes);
return new PutReqDto().setData(str);
}
}
package com.zhonglai.luhui.mqtt.comm.agreement;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreementFactory;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.util.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
... ... @@ -16,12 +20,21 @@ public class BusinessAgreementFactoryImpl implements BusinessAgreementFactory {
private Map<String, BusinessAgreement> businessAgreementMap;
@Override
public BusinessAgreement createBusinessAgreement(String topicType) {
BusinessAgreement businessAgreement = businessAgreementMap.get(topicType.toUpperCase());
public BusinessAgreement createBusinessAgreement(Topic topic, BusinessDto businessDto) {
BusinessAgreement businessAgreement = businessAgreementMap.get(topic.getTopicType().toUpperCase());
if(null == businessAgreement) //没有找到就用默认的
{
businessAgreementMap.get("DEFAULT");
}
return businessAgreement;
}
public void toBusinessDto(String payloadtype,byte[] data)
{
if(StringUtils.isBlank(payloadtype))
{
payloadtype = "String";
}
}
}
... ...
... ... @@ -18,6 +18,10 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
public class RedisConfig {
@Value("${sys.redis.field:#{'luhui:mqttservice:device:'}")
public static String FIELD ; //域
public static String DEVICE = "device:"; //存放网关数据
public static String THINGS_MODEL = "things_model:"; //存放数据模型
public static String TERMINAL = "terminal:"; //存放终端数据
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
... ...
package com.zhonglai.luhui.mqtt.comm.dto.business;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
public interface BusinessDto {
BusinessDto analyticalModel(String modeData);
ServerDto toServerDto();
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.business;
public class BusinessDtoClassNew {
public static BusinessDto newBean(String payloadtype,byte[] data )
{
switch (payloadtype)
{
case "String":
return new StringBusinessDto(data);
case "Json":
return new JsonBusinessDto(data);
case "Byte":
return new ByteBusinessDto(data);
default:
return new StringBusinessDto(data);
}
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.business;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.apache.commons.lang3.ArrayUtils;
/**
* 字节payload内容协议
*/
@Data
public class ByteBusinessDto implements BusinessDto{
private int functionCode; //功能码
private int verification; //校验码
private byte[] data; //数据
private byte[] srcData; // 原始数据
public ByteBusinessDto(byte[] data )
{
srcData = data;
}
@Override
public BusinessDto analyticalModel(String modeData) {
Mode mode = JSONObject.parseObject(modeData,Mode.class);
this.functionCode = srcData[mode.functionCodeAdr];
this.verification = srcData[mode.verificationAdr];
this.data = ArrayUtils.subarray(srcData,mode.dataLeftAdr,srcData.length-mode.datarRightAdr);
return this;
}
@Data
class Mode
{
int dataLeftAdr; //数据左起地址
int datarRightAdr; //数据右起地址
int functionCodeAdr; //功能码地址
int verificationAdr; //校验码地址
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.business;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import java.io.UnsupportedEncodingException;
/**
* json payload内容协议
*/
@Data
public class JsonBusinessDto implements BusinessDto{
private byte[] srcData; // 原始数据
private JSONObject data;
public JsonBusinessDto(byte[] data )
{
srcData = data;
}
@Override
public BusinessDto analyticalModel(String modeData) {
try {
this.data = JSONObject.parseObject(new String(srcData,"UTF-8"),JSONObject.class);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return this;
}
@Override
public ServerDto toServerDto() {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.business;
import lombok.Data;
/**
* 字符串payload内容协议
*/
@Data
public class StringBusinessDto implements BusinessDto{
private byte[] srcData; // 原始数据
public StringBusinessDto(byte[] data )
{
srcData = data;
}
@Override
public BusinessDto analyticalModel(String modeData) {
return this;
}
}
... ...
... ... @@ -106,6 +106,17 @@ public class IotDevice
@ApiModelProperty("负载类型(String,Json,Bite16,Bite32)")
private String payload_type;
@ApiModelProperty("payload 协议模型")
private String business_model; //payload 协议模型
public String getBusiness_model() {
return business_model;
}
public void setBusiness_model(String business_model) {
this.business_model = business_model;
}
public String getPayload_type() {
return payload_type;
... ...
... ... @@ -8,5 +8,4 @@ import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
*/
public interface BusinessAgreement<T> {
ServerDto analysis(Topic topic, T data) throws Exception; //解析协议
T toData(byte[] data);
}
... ...
... ... @@ -4,5 +4,5 @@ package com.zhonglai.luhui.mqtt.comm.factory;
* mqtt业务协议工厂
*/
public interface BusinessAgreementFactory {
BusinessAgreement createBusinessAgreement(String topicType);
BusinessAgreement createBusinessAgreement(Topic topic );
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDtoClassNew;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotDevice;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreementFactory;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.util.ByteUtil;
import com.zhonglai.luhui.mqtt.service.db.DeviceService;
import lombok.SneakyThrows;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
... ... @@ -30,6 +34,9 @@ public class MqttCallback implements MqttCallbackExtended {
@Autowired
private CacheService cacheService; //数据缓存
@Autowired
private DeviceService deviceService ;
@SneakyThrows
@Override
public void connectComplete(boolean b, String s) {
... ... @@ -41,7 +48,7 @@ public class MqttCallback implements MqttCallbackExtended {
@Override
public void connectionLost(Throwable throwable) {
//连接丢失
log.info("连接丢失");
log.error("连接丢失",throwable);
}
... ... @@ -50,12 +57,18 @@ public class MqttCallback implements MqttCallbackExtended {
//接收到消息
log.info("接收到消息topc:{}, mqttMessage {},payload 十六进制 {}",s,mqttMessage, ByteUtil.hexStringToSpace(ByteUtil.toHexString(mqttMessage.getPayload())));
Topic topic = new Topic(s);
BusinessAgreement businessAgreement = businessAgreementFactory.createBusinessAgreement(topic.getTopicType());
if(null == topic)
{
log.error("消息{},topic为空,不做解析");
return;
}
//解析协议
byte[] data = mqttMessage.getPayload();
BusinessAgreement businessAgreement = businessAgreementFactory.createBusinessAgreement(topic);
try {
//解析协议
byte[] data = mqttMessage.getPayload();
ServerDto dto = businessAgreement.analysis(topic,businessAgreement.toData(data));
IotDevice iotDevice = deviceService.getDeviceById(topic.getClientid());
BusinessDto businessDto = BusinessDtoClassNew.newBean(topic.getPayloadtype(),data).analyticalModel(iotDevice.getBusiness_model());
ServerDto dto = businessAgreement.analysis(topic,businessDto.toServerDto());
if(null == dto)
{
return;
... ...
package com.zhonglai.luhui.mqtt.controller;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import com.zhonglai.luhui.mqtt.dto.Message;
... ... @@ -11,6 +12,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@Api(tags = "设备操作")
@RestController
@RequestMapping("/device")
... ... @@ -18,18 +21,25 @@ public class DeviceController {
@Autowired
private ClienNoticeService clienNoticeService;
@ApiOperation("控制")
@RequestMapping(value = "control/{clienid}",method = RequestMethod.POST)
public Message control(@PathVariable String clienid, @RequestBody PutDto putDto) throws MqttException, InterruptedException {
@ApiOperation("控制发16进制指令")
@RequestMapping(value = "controlHex/{clienid}",method = RequestMethod.POST)
public Message controlHex(@PathVariable String clienid, String data) throws MqttException, InterruptedException {
MqttMessage mqttMessage = new MqttMessage();
byte[] bs = hexStringToByte(putDto.getData().trim().toUpperCase());
// ByteBuf message = Unpooled.buffer(bs.length);
// message.writeBytes(bs);
byte[] bs = hexStringToByte(data.trim().toUpperCase());
mqttMessage.setPayload(bs);
Message message = clienNoticeService.sendMessage(clienid,mqttMessage, DateUtils.getNowTimeMilly()+"");
return message;
}
@ApiOperation("控制发json")
@RequestMapping(value = "control/{clienid}",method = RequestMethod.POST)
public Message control(@PathVariable String clienid, @RequestBody String map) throws MqttException, InterruptedException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(map.trim().getBytes());
Message message = clienNoticeService.sendMessage(clienid,mqttMessage, DateUtils.getNowTimeMilly()+"");
return message;
}
/**
* 把16进制字符串转换成字节数组
*
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
... ... @@ -11,7 +12,7 @@ import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class PutDto implements ServerDto {
private String data;
private Object data;
@Override
public ServerAgreementContent getServerAgreementContent() {
... ...
... ... @@ -12,6 +12,6 @@ public class TopicsServiceImpl implements TopicsService {
@Override
public String getTopicFromImei(String imei, String messageid) {
return "/2/2/X6/"+imei+"/PUT";
return "/2/6_W/"+imei+"/Json/PUT";
}
}
... ...
package com.zhonglai.luhui.mqtt.service.db;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.config.RedisConfig;
import com.zhonglai.luhui.mqtt.comm.dao.BaseDao;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotDevice;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotTerminal;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotThingsModel;
import com.zhonglai.luhui.mqtt.comm.service.RedisService;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class DeviceService {
@Autowired
private RedisService redisService ;
private BaseDao baseDao = new BaseDao();
/**
* 根据id获取终端
* @param id
* @return
*/
public IotTerminal getTerminalById(String id)
{
return (IotTerminal) baseDao.get(IotTerminal.class,"id='"+id+"'","`mqtt_broker`.`iot_terminal`");
}
/**
* 根据id获取终端lieb
* @param ids
* @return
*/
public List<IotTerminal> getTerminalListByIds(String ids)
{
List<IotTerminal> list = baseDao.findBysql("select * from `mqtt_broker`.`iot_terminal` where id in("+ids+")", IotTerminal.class);
return list;
}
/**
* 根据id获取网关
* @param id
* @return
*/
public IotDevice getDeviceById(String id)
{
return (IotDevice) baseDao.get(IotDevice.class,"client_id='"+id+"'","`mqtt_broker`.`iot_device`");
}
/**
* 全量更新终端模型数据
* @param JSONObject
* @param id
*/
public void fullUpdateTerminal(JSONObject JSONObject,String id)
{
IotTerminal terminal = new IotTerminal();
terminal.setThings_model_value(JSONObject.toString());
terminal.setId(id);
terminal.setUpdate_time(DateUtils.getNowTimeMilly());
baseDao.update(terminal,"id");
}
/**
* 全量更新网关的模型数据
* @param JSONObject
* @param id
*/
public void fullUpdateDevice(JSONObject JSONObject,String id)
{
IotTerminal terminal = new IotTerminal();
terminal.setThings_model_value(JSONObject.toString());
terminal.setId(id);
terminal.setUpdate_time(DateUtils.getNowTimeMilly());
baseDao.update(terminal,"id");
}
/**
* 获取缓存网关信息
* @param id
* @return
*/
public IotDevice getRedicDevice(String id)
{
Object object = redisService.get(RedisConfig.FIELD+RedisConfig.DEVICE+id);
if(null != object)
{
return (IotDevice)object;
}
return null;
}
public void updataDevice(IotDevice device)
{
setRedicDevice(device);
baseDao.saveOrUpdateObject(device);
}
/**
* 设置缓存网关信息
* @param device
* @return
*/
private boolean setRedicDevice(IotDevice device)
{
return redisService.setexDevice(RedisConfig.FIELD+RedisConfig.DEVICE+device.getClient_id(),device);
}
/**
* 获取缓存终端信息
* @param id
* @return
*/
public IotTerminal getRedicTerminal(String id)
{
Object object = redisService.get(RedisConfig.FIELD+RedisConfig.TERMINAL+id);
if(null == object)
{
IotTerminal terminal = getTerminalById(id);
IotTerminal saveTerminal = new IotTerminal();
saveTerminal.setId(id);
if(null == terminal)
{
baseDao.saveOrUpdateObject(saveTerminal);
}else{
saveTerminal.setThings_model_value(terminal.getThings_model_value());
}
return saveTerminal;
}
return (IotTerminal)object;
}
/**
* 设置缓存终端信息
* @param terminal
* @return
*/
private boolean setRedicTerminal(IotTerminal terminal)
{
return redisService.setexDevice(RedisConfig.FIELD+RedisConfig.TERMINAL+terminal.getId(),terminal);
}
public void updataTerminal(IotTerminal terminal)
{
setRedicTerminal(terminal);
baseDao.saveOrUpdateObject(terminal);
}
/**
* 获取
* @param userId
* @return
*/
public IotThingsModel getThingsModelsByUserIdAndIdentifier(Integer userId, String identifier)
{
Object object = redisService.hget(RedisConfig.FIELD+RedisConfig.THINGS_MODEL+userId,identifier);
return null != object?(IotThingsModel)object:null;
}
public JSONObject getNewdate(String oldstr, JSONObject saveJson)
{
JSONObject oldjs = new JSONObject();
if(StringUtils.isNoneBlank(oldstr))
{
oldjs = JSONObject.parseObject(oldstr);
}
for (String sk:saveJson.keySet())
{
oldjs.put(sk,saveJson.get(sk));
}
return oldjs;
}
}
... ...
... ... @@ -13,14 +13,10 @@ import org.springframework.stereotype.Service;
@Service("ADD_POST")
public class AddPostTopic implements BusinessAgreement<AddPostDto> {
@Override
public ServerDto analysis(Topic topic, AddPostDto data) throws Exception {
return null;
}
@Override
public AddPostDto toData(byte[] data) {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
... ... @@ -17,7 +18,7 @@ public class AllPostTopic implements BusinessAgreement<AllPostDto> {
}
@Override
public AllPostDto toData(byte[] data) {
public AllPostDto toData(BusinessDto data) {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.DbDistributeDto;
... ... @@ -17,7 +18,7 @@ public class DbDistributeTopic implements BusinessAgreement<DbDistributeDto> {
}
@Override
public DbDistributeDto toData(byte[] data) {
public DbDistributeDto toData(BusinessDto data) {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.GetReqDto;
import org.springframework.stereotype.Service;
/**
* 获取数据的返回结果
*/
@Service("GET_REQ")
public class GetReqTopic implements BusinessAgreement<GetReqDto> {
@Override
public ServerDto analysis(Topic topic, GetReqDto data) throws Exception {
return null;
}
@Override
public GetReqDto toData(byte[] data) {
return null;
}
}
package com.zhonglai.luhui.mqtt.service.topic;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.GetDto;
... ... @@ -17,7 +18,7 @@ public class GetTopic implements BusinessAgreement<GetDto> {
}
@Override
public GetDto toData(byte[] data) {
public GetDto toData(BusinessDto data) {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.OnlineDto;
... ... @@ -15,7 +16,7 @@ public class OnlineTopic implements BusinessAgreement<OnlineDto> {
}
@Override
public OnlineDto toData(byte[] data) {
public OnlineDto toData(BusinessDto data) {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.PutReqDto;
... ... @@ -17,7 +18,7 @@ public class PutReqTopic implements BusinessAgreement<PutReqDto> {
}
@Override
public PutReqDto toData(byte[] data) {
public PutReqDto toData(BusinessDto data) {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.PutDto;
import org.springframework.stereotype.Service;
/**
* 更新数据,需要返回执行结果
*/
@Service("PUT")
public class PutTopic implements BusinessAgreement<PutDto> {
@Override
public ServerDto analysis(Topic topic, PutDto data) throws Exception {
return null;
}
@Override
public PutDto toData(byte[] data) {
return null;
}
}
... ... @@ -44,7 +44,7 @@ mqtt:
#唯一标识
clientId: lh-mqtt-service-001
#订阅的topic
topics: "/2/#"
topics: "/+/+/+/+/ADD_POST,/+/+/+/+/ALL_POST,/+/+/+/+/DB_TOPIC_DISTRIBUTE,/+/+/+/+/GET,/+/+/+/online,/+/+/+/+/PUT_REQ"
topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}"
username: sysuser
password: "!@#1qaz"
... ...