作者 钟来

初始提交

正在显示 37 个修改的文件 包含 452 行增加82 行删除
... ... @@ -35,7 +35,11 @@
<groupId>com.zhonglai.luhui</groupId>
<artifactId>ruoyi-generator</artifactId>
</dependency>
<!-- 代码生成模块-->
<dependency>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-mqtt-service</artifactId>
</dependency>
<!-- 文档 -->
<dependency >
<groupId>io.springfox</groupId>
... ...
... ... @@ -3,7 +3,9 @@ package com.zhonglai.luhui.admin.controller.iot;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import com.ruoyi.common.utils.http.HttpUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -82,7 +84,7 @@ public class IotDeviceController extends BaseController
@ApiOperation("新增主机/网关")
@PreAuthorize("@ss.hasPermi('iot:IotDevice:add')")
@Log(title = "主机/网关", businessType = BusinessType.INSERT)
@PostMapping
@PostMapping("add")
public AjaxResult add(@RequestBody IotDevice iotDevice)
{
return toAjax(iotDeviceService.insertIotDevice(iotDevice));
... ... @@ -94,7 +96,7 @@ public class IotDeviceController extends BaseController
@ApiOperation("修改主机/网关")
@PreAuthorize("@ss.hasPermi('iot:IotDevice:edit')")
@Log(title = "主机/网关", businessType = BusinessType.UPDATE)
@PutMapping
@PutMapping("edit")
public AjaxResult edit(@RequestBody IotDevice iotDevice)
{
return toAjax(iotDeviceService.updateIotDevice(iotDevice));
... ... @@ -111,4 +113,27 @@ public class IotDeviceController extends BaseController
{
return toAjax(iotDeviceService.deleteIotDeviceByClient_ids(client_ids));
}
/**
* 修改主机/网关
*/
@ApiOperation("修改主机/网关")
@PreAuthorize("@ss.hasPermi('iot:IotDevice:restart')")
@Log(title = "主机/网关", businessType = BusinessType.UPDATE)
@PutMapping("restart")
public AjaxResult restart(@RequestBody IotDevice iotDevice)
{
return toAjax(iotDeviceService.updateIotDevice(iotDevice));
}
// @ApiOperation("固件版本更新")
// @ApiImplicitParam(value = "版本号",name = "firmwareVersion")
// @PreAuthorize("@ss.hasPermi('iot:IotDevice:firmwareUp')")
// @Log(title = "主机/网关", businessType = BusinessType.UPDATE)
// @PutMapping
// public AjaxResult firmwareUp(String firmwareVersion)
// {
// return redirect("/device/control");
// }
}
... ...
package com.zhonglai.luhui.admin.controller.iot;
import java.util.Date;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.utils.DateUtils;
import com.zhonglai.luhui.admin.dto.IotThingsModelAddApi;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.specs.*;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.security.access.prepost.PreAuthorize;
... ... @@ -83,8 +90,39 @@ public class IotThingsModelController extends BaseController
@PreAuthorize("@ss.hasPermi('iot:IotThingsModel:add')")
@Log(title = "物模型模板", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@RequestBody IotThingsModel iotThingsModel)
public AjaxResult add(@RequestBody IotThingsModelAddApi iotThingsModelAddApi)
{
IotThingsModel iotThingsModel = iotThingsModelAddApi.getIotThingsModel();
iotThingsModel.setCreate_by(getUsername());
ThingsModelItemBase thingsModelItemBase = null;
switch (iotThingsModel.getData_type())
{
case "integer":
thingsModelItemBase = JSONObject.parseObject(JSON.toJSONString(iotThingsModelAddApi.getThingsModelBase()), IntegerModelOutput.class);
break;
case "decimal":
thingsModelItemBase = JSONObject.parseObject(JSON.toJSONString(iotThingsModelAddApi.getThingsModelBase()), DecimalModelOutput.class);
break;
case "string":
thingsModelItemBase = JSONObject.parseObject(JSON.toJSONString(iotThingsModelAddApi.getThingsModelBase()), StringModelOutput.class);
break;
case "bool":
thingsModelItemBase = JSONObject.parseObject(JSON.toJSONString(iotThingsModelAddApi.getThingsModelBase()), BoolModelOutput.class);
break;
case "array":
thingsModelItemBase = JSONObject.parseObject(JSON.toJSONString(iotThingsModelAddApi.getThingsModelBase()), ArrayModelOutput.class);
break;
case "enum":
thingsModelItemBase = JSONObject.parseObject(JSON.toJSONString(iotThingsModelAddApi.getThingsModelBase()), EnumModelOutput.class);
break;
}
if(null == thingsModelItemBase)
{
return AjaxResult.error("请输入数模型");
}
return toAjax(iotThingsModelService.insertIotThingsModel(iotThingsModel));
}
... ...
package com.zhonglai.luhui.admin.dto;
import com.ruoyi.system.domain.IotThingsModel;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.Map;
@ApiModel("物模型添加对象")
public class IotThingsModelAddApi {
@ApiModelProperty("物模型信息")
private IotThingsModel iotThingsModel;
@ApiModelProperty("specs数据定义对象")
private Map<String,Object> thingsModelBase;
public IotThingsModel getIotThingsModel() {
return iotThingsModel;
}
public void setIotThingsModel(IotThingsModel iotThingsModel) {
this.iotThingsModel = iotThingsModel;
}
public Map<String,Object> getThingsModelBase() {
return thingsModelBase;
}
public void setThingsModelBase(Map<String,Object> thingsModelBase) {
this.thingsModelBase = thingsModelBase;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.config;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotThingsModel;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.DataModeAnalysisService;
import com.zhonglai.luhui.mqtt.comm.util.http.HttpUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class SysParameter {
... ... @@ -18,10 +25,15 @@ public class SysParameter {
@Value("${mqtt.topicconfig:/{{roleid}}/{{username}}/{{clientid}}/{{topicType}}/{{messageid}}}")
public String tempTopicconfig ; //topic 配置
@Value("${mqtt.topics")
public String topics ; //topic
public static String topicconfig ; //topic 配置
private static Map<String, IotThingsModel> terminalDataThingsMode = new HashMap<>(); //topic 终端数据模型
@PostConstruct
public static void init() {
public void init() {
String service_ip_url = "http://ly.userlogin.yu2le.com/ip";
JSONObject jsonObject = JSONObject.parseObject(HttpUtils.sendGet(service_ip_url));
service_ip = jsonObject.getString("data");
... ... @@ -32,4 +44,14 @@ public class SysParameter {
{
topicconfig = tempTopicconfig;
}
public static void setTerminalDataThingsMode(String username,IotThingsModel thingsModel)
{
terminalDataThingsMode.put(username+":"+thingsModel.getIdentifier(),thingsModel);
}
public static IotThingsModel getTerminalDataThingsMode(String username,String identifier)
{
return terminalDataThingsMode.get(username+":"+identifier);
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto;
import java.util.List;
public interface ServerDto {
ServerAgreementContent getServerAgreementContent();
boolean isReplyMessage();
List<DeviceSensorData> getDeviceSensorData();
List<LogDeviceOperation> getOperationLog();
}
... ...
... ... @@ -78,13 +78,13 @@ public class IotThingsModel
/** 用户id */
@ApiModelProperty("用户名称")
private Integer user_name;
private String user_name;
public Integer getUser_name() {
public String getUser_name() {
return user_name;
}
public void setUser_name(Integer user_name) {
public void setUser_name(String user_name) {
this.user_name = user_name;
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels;
import com.alibaba.fastjson.annotation.JSONField;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotThingsModel;
/**
... ... @@ -8,5 +9,7 @@ import com.zhonglai.luhui.mqtt.comm.dto.iot.IotThingsModel;
public interface ThingsModelBase<T> {
void conversionThingsModel(IotThingsModel thingsModel);
void addValue(T t);
// @JSONField(serialize=false)
String getView();
String getSaveView();
}
... ...
... ... @@ -37,4 +37,5 @@ public abstract class ThingsModelItemBase<T> implements ThingsModelBase<T>
is_save_log = thingsModel.getIs_save_log();
mode_type = thingsModel.getType();
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.specs;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.annotation.JSONField;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import lombok.Data;
... ... @@ -22,4 +23,9 @@ public class ArrayModelOutput extends ThingsModelItemBase<JSONArray>
}
return "";
}
@Override
public String getSaveView() {
return JSONArray.toJSONString(getValue());
}
}
... ...
... ... @@ -24,4 +24,9 @@ public class BoolModelOutput extends ThingsModelItemBase<Boolean>
}
}
@Override
public String getSaveView() {
return getValue().toString();
}
}
... ...
... ... @@ -20,6 +20,11 @@ public class DecimalModelOutput extends ThingsModelItemBase<BigDecimal>
@Override
public String getView() {
return getView()+unit;
return getValue().doubleValue()+unit;
}
@Override
public String getSaveView() {
return getValue().toString();
}
}
... ...
... ... @@ -27,4 +27,9 @@ public class EnumModelOutput extends ThingsModelItemBase<String>
}
return getValue();
}
@Override
public String getSaveView() {
return getValue();
}
}
... ...
... ... @@ -4,14 +4,17 @@ import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import lombok.Data;
import java.math.BigDecimal;
import java.math.RoundingMode;
@Data
public class IntegerModelOutput extends ThingsModelItemBase<Integer>
{
private BigDecimal min;
private BigDecimal max;
private BigDecimal step;
private String unit;
private BigDecimal min; //最大值
private BigDecimal max; //最小值
private BigDecimal step; //步长
private String unit; //单位
private Integer acy; //精度
@Override
public void addValue(Integer object) {
... ... @@ -20,6 +23,12 @@ public class IntegerModelOutput extends ThingsModelItemBase<Integer>
@Override
public String getView() {
return getView()+unit;
return getSaveView()+unit;
}
@Override
public String getSaveView() {
BigDecimal bigDecimal = new BigDecimal(getValue().toString());
return bigDecimal.divide(new BigDecimal(acy),acy.toString().length()-1, RoundingMode.HALF_UP).toString();
}
}
... ...
... ... @@ -20,4 +20,9 @@ public class StringModelOutput extends ThingsModelItemBase<Object>
public String getView() {
return null!=getValue()?getValue()+"":null;
}
@Override
public String getSaveView() {
return getView();
}
}
... ...
... ... @@ -27,7 +27,12 @@ public class Topic {
topic = Optional.ofNullable(topic).orElseThrow(()->new MyException("topic为空"));
String[] sts = topic.split("/");
String[] config = SysParameter.topicconfig.split("/");
for(int i=1;i<config.length;i++)
int number = sts.length;
if(number>config.length)
{
number = config.length;
}
for(int i=1;i<number;i++)
{
String cf = config[i].replace("{{","").replace("}}","");
try {
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotDevice;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotTerminal;
... ... @@ -32,9 +33,8 @@ public class BusinessDataUpdateService {
* @param topic
* @param data
*/
public void updataDta(Type type,Topic topic, JSONObject data)
public void updataDta(Type type,Topic topic, JSONObject data,boolean isOperLog,List<LogDeviceOperation> operateHisList, List<DeviceSensorData> list)
{
List<LogDeviceOperation> operateHisList = new ArrayList<>();
for(String key:data.keySet())
{
Object o = data.get(key);
... ... @@ -43,20 +43,15 @@ public class BusinessDataUpdateService {
JSONObject jsData = data.getJSONObject(key);
if("0".equals(key)) //主机
{
IotDevice iotDevice = translateDevice(type,topic,jsData,operateHisList);
IotDevice iotDevice = translateDevice(type,topic,jsData,isOperLog,operateHisList,list);
deviceService.updataDevice(iotDevice);
}else{ //终端
IotTerminal iotTerminal = translateTerminal(type,key,topic,jsData,operateHisList);
IotTerminal iotTerminal = translateTerminal(type,key,topic,jsData,isOperLog,operateHisList,list);
deviceService.updataTerminal(iotTerminal);
}
}
}
if(null != operateHisList && operateHisList.size() !=0 )
{
deviceLogService.saveOperationLog(operateHisList);
}
}
/**
... ... @@ -67,7 +62,7 @@ public class BusinessDataUpdateService {
* @param operateHisList
* @return
*/
private IotDevice translateDevice(Type type,Topic topic , JSONObject jsData, List<LogDeviceOperation> operateHisList)
private IotDevice translateDevice(Type type,Topic topic , JSONObject jsData,boolean isOperLog, List<LogDeviceOperation> operateHisList, List<DeviceSensorData> list)
{
IotDevice olddevice = deviceService.getRedicDevice(topic.getClientid());
JSONObject summaryObjec = null;
... ... @@ -86,7 +81,7 @@ public class BusinessDataUpdateService {
device.setSummary(summaryObjec.toString());
}
JSONObject saveJson = dataModeAnalysisService.analysisThingsModelValue( topic.getClientid(),topic.getUsername(),jsData,true,"主机本地");
JSONObject saveJson = dataModeAnalysisService.analysisThingsModelValue( topic.getClientid(),topic.getUsername(),jsData,"主机本地",isOperLog,operateHisList,list);
//更新数据
if(null != olddevice && "ADD".equals(type.name()))
{
... ... @@ -109,10 +104,10 @@ public class BusinessDataUpdateService {
* @param operateHisList
* @return
*/
private IotTerminal translateTerminal(Type type,String key, Topic topic , JSONObject jsData, List<LogDeviceOperation> operateHisList)
private IotTerminal translateTerminal(Type type,String key, Topic topic , JSONObject jsData,boolean isOperLog, List<LogDeviceOperation> operateHisList, List<DeviceSensorData> list)
{
String id = topic.getClientid()+"_"+key;
JSONObject saveJson = dataModeAnalysisService.analysisThingsModelValue( id,topic.getUsername(),jsData,true,"终端本地");
JSONObject saveJson = dataModeAnalysisService.analysisThingsModelValue( id,topic.getUsername(),jsData,"终端本地",isOperLog,operateHisList,list);
IotTerminal terminal = new IotTerminal();
terminal.setId(id);
terminal.setUpdate_time(DateUtils.getNowTimeMilly());
... ...
... ... @@ -3,6 +3,7 @@ package com.zhonglai.luhui.mqtt.comm.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.config.RedisConfig;
import com.zhonglai.luhui.mqtt.comm.config.SysParameter;
import com.zhonglai.luhui.mqtt.comm.dao.BaseDao;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
... ... @@ -11,11 +12,13 @@ import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelBase;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelDataTypeEnum;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import com.zhonglai.luhui.mqtt.comm.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
... ... @@ -28,8 +31,6 @@ public class DataModeAnalysisService {
private static final Logger log = LoggerFactory.getLogger(DataModeAnalysisService.class);
private BaseDao baseDao = new BaseDao();
@Autowired
private RedisService redisService ;
@Autowired
private DeviceLogService dviceLogService;
... ... @@ -37,43 +38,39 @@ public class DataModeAnalysisService {
/**
* 初始化物模型数据
*/
public void initDataThingsMode()
public void initDataThingsMode(String roleIds,String usernames)
{
List<IotThingsModel> list = baseDao.findBysql("select * from `mqtt_broker`.`iot_things_model` where del_flag=0", IotThingsModel.class);
String sql = "SELECT a.*,b.username user_name FROM `mqtt_broker`.`iot_things_model` a LEFT JOIN `mqtt_broker`.`iot_user` b ON a.`user_id`=b.`id` WHERE a.del_flag=0 AND b.`role_id` IN("+roleIds+")";
if(StringUtils.isNotEmpty(usernames))
{
sql += " AND b.`username` IN("+usernames+")";
}
List<IotThingsModel> list = baseDao.findBysql(sql, IotThingsModel.class);
if(null != list && list.size() != 0)
{
for(IotThingsModel thingsModel:list)
{
redisService.hset(RedisConfig.FIELD+RedisConfig.THINGS_MODEL+thingsModel.getUser_name(),thingsModel.getIdentifier(),thingsModel);
SysParameter.setTerminalDataThingsMode(thingsModel.getUser_name(),thingsModel);
}
}
}
public JSONObject analysisThingsModelValue(String id, String userName , JSONObject jsData)
{
return analysisThingsModelValue(id,userName,jsData,false,null);
}
/**
* 解析物模型数据
*/
public JSONObject analysisThingsModelValue(String id,String userName ,JSONObject jsData,boolean isSaveLog,String controlModel)
public JSONObject analysisThingsModelValue(String id,String userName ,JSONObject jsData,String controlModel,boolean isOperLog, List<LogDeviceOperation> operateHisList, List<DeviceSensorData> list)
{
if(null != jsData && jsData.size() != 0 )
{
Map<Object, Object> thingsModelMap = redisService.hmget(RedisConfig.FIELD+RedisConfig.THINGS_MODEL+userName);
JSONObject rObjec = new JSONObject();
List<DeviceSensorData> list = new ArrayList<>();
List<LogDeviceOperation> oplist = new ArrayList<>();
for(String key:jsData.keySet())
{
Object object = thingsModelMap.get(key);
IotThingsModel thingsModel = null;
if(object instanceof IotThingsModel)
IotThingsModel thingsModel = SysParameter.getTerminalDataThingsMode(userName,key);
if(null == thingsModel) //没有配置的 都按字符串处理
{
thingsModel = (IotThingsModel)object;
}else{ //没有配置的 都按字符串处理
thingsModel = new IotThingsModel();
thingsModel.setData_type(ThingsModelDataTypeEnum.STRING.name());
thingsModel.setIdentifier(key);
... ... @@ -85,9 +82,15 @@ public class DataModeAnalysisService {
jsonObject.put("maxLength",255);
thingsModel.setSpecs(jsonObject.toString());
}
Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,thingsModel.getData_type()).getaClass();
String data_type = thingsModel.getData_type().toUpperCase();
if(!jsData.get(key).getClass().getSimpleName().toUpperCase().equals(data_type))
{
data_type = ThingsModelDataTypeEnum.STRING.name();
}
Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,data_type).getaClass();
ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
thingsModelBase.conversionThingsModel(thingsModel);
thingsModelBase.addValue(jsData.get(key));
ThingsModelItemBase thingsModelItemBase = (ThingsModelItemBase) thingsModelBase;
... ... @@ -96,7 +99,7 @@ public class DataModeAnalysisService {
{
DeviceSensorData sensorData = new DeviceSensorData();
sensorData.setDataType(key);
sensorData.setDataValue(jsData.getString(key));
sensorData.setDataValue(thingsModelBase.getSaveView());
sensorData.setCreatTime(DateUtils.getNowTimeMilly());
sensorData.setDeviceModel(userName);
sensorData.setDeviceInfoId(id);
... ... @@ -104,14 +107,14 @@ public class DataModeAnalysisService {
}
//记录操作日志
oplist.add(dviceLogService.newLogDeviceOperation(id,jsData.getString(key),null,controlModel+thingsModelItemBase.getName()+"为"+thingsModelBase.getView(),jsData.toString()));
if(isOperLog)
{
operateHisList.add(dviceLogService.newLogDeviceOperation(id,thingsModelBase.getSaveView(),null,controlModel+thingsModelItemBase.getName()+"为"+thingsModelBase.getView(),jsData.toString()));
}
rObjec.put(key,JSONObject.toJSONString(thingsModelBase));
}
//日志入库
dviceLogService.saveDeviceSensorDataLog(list);
dviceLogService.saveOperationLog(oplist);
return rObjec;
}
return null;
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDtoClassNew;
... ... @@ -19,6 +21,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class MqttCallback implements MqttCallbackExtended {
private static final Logger log = LoggerFactory.getLogger(MqttCallback.class);
... ... @@ -79,15 +83,10 @@ public class MqttCallback implements MqttCallbackExtended {
log.info("{} 解析到的dto【{}】",dto);
//缓存数据
boolean isPersistence = cacheService.updateCache(topic,dto);
cacheService.updateCache(topic,dto);
//数据持久化
if(isPersistence)
{
log.info("【{}】数据有更新",topic);
dataPersistenceService.persistence(topic,dto);
}
dataPersistenceService.addDeviceSensorData(topic,dto);
dataPersistenceService.persistence(topic,dto);
} catch (Exception e) {
log.error(s+"消息解析异常",e);
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.zhonglai.luhui.mqtt.comm.config.SysParameter;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
... ... @@ -32,6 +34,9 @@ public class TerminalService {
@Autowired
private SysParameter sysParameter;
@Autowired
private DataModeAnalysisService dataModeAnalysisService;
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
... ... @@ -81,10 +86,47 @@ public class TerminalService {
log.info("-----------开始启动mqtt监听服务--------------------");
init();
log.info("-----------启动参数{}--------------------",options);
sysParameter.inittopicconfig();
log.info("-----------topic配置模型{}--------------------",SysParameter.topicconfig);
initDataThingsMode();
log.info("-----------终端数据模型配置成功--------------------");
connect();
log.info("-----------mqtt连接服务器成功--------------------");
subscribe();
sysParameter.inittopicconfig();
log.info("-----------mqtt监听服务启动成功--------------------");
log.info("-----------订阅{}成功--------------------",topics);
}
private void initDataThingsMode()
{
String roleids="";
String usernames="";
for(String topicstr:topics.split(","))
{
Topic topic = new Topic(topicstr);
if(null != topic)
{
String rild = topic.getRoleid();
if(StringUtils.isNoneBlank(rild))
{
if(!"".equals(roleids))
{
roleids +=",";
}
roleids +=rild;
}
String username = topic.getUsername();
if(StringUtils.isNoneBlank(username) && !"+".equals(username))
{
if(!"".equals(usernames))
{
usernames +=",";
}
usernames +="'"+username+"'";
}
}
}
dataModeAnalysisService.initDataThingsMode(roleids,usernames);
}
public void subscribe(String[] topicFilters) throws MqttException {
... ...
... ... @@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import com.zhonglai.luhui.mqtt.dto.Message;
import com.zhonglai.luhui.mqtt.dto.topic.PutDto;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.eclipse.paho.client.mqttv3.MqttException;
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.ArrayList;
import java.util.List;
@Data
@Accessors(chain = true)
public class AddPostDto implements ServerDto {
private List<LogDeviceOperation> operateHisList = new ArrayList<>();
private List<DeviceSensorData> list = new ArrayList<>();
private JSONObject data;
@Override
public ServerAgreementContent getServerAgreementContent() {
... ... @@ -19,4 +26,14 @@ public class AddPostDto implements ServerDto {
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return list;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return operateHisList;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.ArrayList;
import java.util.List;
@Data
@Accessors(chain = true)
public class AllPostDto implements ServerDto {
private List<LogDeviceOperation> operateHisList = new ArrayList<>();
private List<DeviceSensorData> list = new ArrayList<>();
private JSONObject data;
@Override
public ServerAgreementContent getServerAgreementContent() {
... ... @@ -19,4 +26,14 @@ public class AllPostDto implements ServerDto {
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return list;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return operateHisList;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class DbDistributeDto implements ServerDto {
... ... @@ -17,4 +21,14 @@ public class DbDistributeDto implements ServerDto {
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class GetDto implements ServerDto {
... ... @@ -17,4 +21,14 @@ public class GetDto implements ServerDto {
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class GetReqDto implements ServerDto {
... ... @@ -17,4 +21,14 @@ public class GetReqDto implements ServerDto {
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class OnlineDto implements ServerDto {
... ... @@ -17,4 +21,14 @@ public class OnlineDto implements ServerDto {
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
/**
* 服务器下发数据
*/
... ... @@ -23,4 +26,14 @@ public class PutDto implements ServerDto {
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
... ... @@ -9,6 +11,8 @@ import com.zhonglai.luhui.mqtt.dto.MessageCode;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class PutReqDto implements ServerDto {
... ... @@ -55,4 +59,14 @@ public class PutReqDto implements ServerDto {
public boolean isReplyMessage() {
return true;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
}
}
... ...
... ... @@ -10,7 +10,6 @@ import org.springframework.stereotype.Service;
public class CacheServiceImpl implements CacheService {
@Override
public boolean updateCache(Topic topic, ServerDto serverDto) {
PutReqDto putReqDto = (PutReqDto) serverDto;
return true;
}
}
... ...
package com.zhonglai.luhui.mqtt.service;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.TableGenerateSqlEnum;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.DataPersistenceService;
import com.zhonglai.luhui.mqtt.comm.service.DeviceLogService;
import com.zhonglai.luhui.mqtt.dto.topic.AddPostDto;
import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class DataPersistenceServiceImpl extends DataPersistenceService {
private static final Logger log = LoggerFactory.getLogger(DataPersistenceServiceImpl.class);
@Autowired
private DeviceLogService dviceLogService;
@Override
public void persistence(Topic topic, ServerDto serverDto) {
//日志入库
List<DeviceSensorData> dsdList = serverDto.getDeviceSensorData();
if(null != dsdList && dsdList.size() != 0)
{
dviceLogService.saveDeviceSensorDataLog(dsdList);
}
List<LogDeviceOperation> doList = serverDto.getOperationLog();
if(null != doList && doList.size() != 0)
{
dviceLogService.saveOperationLog(doList);
}
}
@Override
public void addDeviceSensorData(Topic topic, ServerDto serverDto) {
if(serverDto instanceof AddPostDto)
{
AddPostDto addPostDto = (AddPostDto) serverDto;
List<DeviceSensorData> list = addPostDto.getList();
if(null != list && list.size() != 0)
{
baseDao.insertList(list, TableGenerateSqlEnum.DeviceSensorData.getNowTableName());
}
}else if(serverDto instanceof AllPostDto)
{
AllPostDto allPostDto = (AllPostDto) serverDto;
List<DeviceSensorData> list = allPostDto.getList();
if(null != list && list.size() != 0)
{
baseDao.insertList(list, TableGenerateSqlEnum.DeviceSensorData.getNowTableName());
}
}
}
@Override
... ...
... ... @@ -121,16 +121,6 @@ public class DeviceService {
setRedicTerminal(terminal);
baseDao.saveOrUpdateObject(terminal);
}
/**
* 获取
* @param username
* @return
*/
public IotThingsModel getThingsModelsByUserIdAndIdentifier(String username, String identifier)
{
Object object = redisService.hget(RedisConfig.FIELD+RedisConfig.THINGS_MODEL+username,identifier);
return null != object?(IotThingsModel)object:null;
}
/**
* 增量更新数据
... ...
... ... @@ -21,8 +21,9 @@ public class AddPostTopic implements BusinessAgreement<AddPostDto> {
private BusinessDataUpdateService businessDataUpdateService ;
@Override
public ServerDto analysis(Topic topic, AddPostDto data) {
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,data.getData());
return null;
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,data.getData(),true,data.getOperateHisList(),data.getList());
return data;
}
@Override
... ...
... ... @@ -21,8 +21,8 @@ public class AllPostTopic implements BusinessAgreement<AllPostDto> {
private BusinessDataUpdateService businessDataUpdateService ;
@Override
public ServerDto analysis(Topic topic, AllPostDto data) throws Exception {
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ALL,topic,data.getData());
return null;
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ALL,topic,data.getData(),false,data.getOperateHisList(),data.getList());
return data;
}
@Override
... ...
... ... @@ -44,7 +44,7 @@ mqtt:
#唯一标识
clientId: lh-mqtt-service-001
#订阅的topic
topics: "/+/+/+/+/ADD_POST,/+/+/+/+/ALL_POST,/+/+/+/+/DB_TOPIC_DISTRIBUTE,/+/+/+/+/GET,/+/+/+/online,/+/+/+/+/PUT_REQ"
topics: "/2/+/+/+/ADD_POST,/2/+/+/+/ALL_POST,/2/+/+/+/DB_TOPIC_DISTRIBUTE,/2/+/+/+/GET,/2/+/+/+/online,/2/+/+/+/PUT_REQ"
topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}"
username: sysuser
password: "!@#1qaz"
... ...
... ... @@ -155,5 +155,23 @@
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<!-- 自定义验证注解 -->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-validation -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
<version>3.4.3.Final</version>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
... ...
... ... @@ -72,7 +72,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if test="update_by != null">#{update_by},</if>
<if test="update_time != null">#{update_time},</if>
<if test="user_id != null">#{user_id},</if>
UNIX_TIMESTAMP(NOW()),
TIMESTAMP(NOW()),
</trim>
</insert>
... ...