作者 钟来

初始提交

正在显示 42 个修改的文件 包含 820 行增加84 行删除
driverClassName=com.mysql.jdbc.Driver
driverClassName=com.mysql.cj.jdbc.Driver
url=jdbc:mysql://rm-wz9740un21f09iokuao.mysql.rds.aliyuncs.com:3306/mqtt_broker?useUnicode=true&characterEncoding=utf8&autoReconnect=true
username=luhui
password=Luhui586
... ...
... ... @@ -125,6 +125,7 @@
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
</dependencies>
<build>
... ...
... ... @@ -12,7 +12,6 @@ import org.springframework.context.annotation.ComponentScan;
"com.zhonglai.luhui.mqtt.comm.agreement",
"com.zhonglai.luhui.mqtt.comm.service",
"com.zhonglai.luhui.mqtt.config",
"com.zhonglai.luhui.mqtt.agreement",
"com.zhonglai.luhui.mqtt.service",
"com.zhonglai.luhui.mqtt.controller",
})
... ...
package com.zhonglai.luhui.mqtt.comm.agreement;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreementFactory;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.util.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
... ... @@ -19,8 +17,9 @@ public class BusinessAgreementFactoryImpl implements BusinessAgreementFactory {
@Autowired
private Map<String, BusinessAgreement> businessAgreementMap;
@Override
public BusinessAgreement createBusinessAgreement(Topic topic, BusinessDto businessDto) {
public BusinessAgreement createBusinessAgreement(Topic topic) {
BusinessAgreement businessAgreement = businessAgreementMap.get(topic.getTopicType().toUpperCase());
if(null == businessAgreement) //没有找到就用默认的
{
... ... @@ -29,12 +28,4 @@ public class BusinessAgreementFactoryImpl implements BusinessAgreementFactory {
return businessAgreement;
}
public void toBusinessDto(String payloadtype,byte[] data)
{
if(StringUtils.isBlank(payloadtype))
{
payloadtype = "String";
}
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.agreement;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import org.slf4j.Logger;
... ... @@ -20,7 +21,8 @@ public class DefaultAgreement implements BusinessAgreement<byte[]> {
}
@Override
public byte[] toData(byte[] data) {
return data;
public ServerDto toData(BusinessDto data) {
return null;
}
}
\ No newline at end of file
... ...
... ... @@ -148,7 +148,7 @@ public class BaseDao {
{
attributeStr += ",";
}
attributeStr += "`"+changTableNameFromObject(field.getName())+"`";
attributeStr += "`"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`";
}
attributeStr += ")";
return attributeStr;
... ... @@ -769,13 +769,12 @@ public class BaseDao {
values += ",";
update += ",";
}
sql += "`"+changTableNameFromObject(field.getName())+"`";
sql += "`"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`";
values += "'"+ value+"'";
update += "`"+changTableNameFromObject(field.getName())+"`"+"=VALUES("+"`"+changTableNameFromObject(field.getName())+"`)";
update += "`"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`"+"=VALUES("+"`"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`)";
}
} catch (NoSuchMethodException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("未找到"+field.getName()+"的get方法");
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
... ... @@ -938,7 +937,7 @@ public class BaseDao {
tableNmae = (String) tObject;
}
} catch (NoSuchMethodException e) {
e.printStackTrace();
System.out.println("未找到getTableName");
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
... ...
package com.zhonglai.luhui.mqtt.comm.dto;
import lombok.Data;
@Data
public class DeviceSensorData {
private String deviceInfoId; //设备信息id
private String dataType; //数据类型
private String dataValue; //L数据值
private Integer creatTime; //创建时间
private String deviceModel; // VARCHAR(10) NOT NULL COMMENT '设备型号,(3,5,6)',
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.business;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
public interface BusinessDto {
BusinessDto analyticalModel(String modeData);
ServerDto toServerDto();
Object getContentData();
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.business;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import org.apache.commons.lang3.ArrayUtils;
import javax.script.*;
/**
* 字节payload内容协议
*/
... ... @@ -12,7 +15,7 @@ public class ByteBusinessDto implements BusinessDto{
private int functionCode; //功能码
private int verification; //校验码
private byte[] data; //数据
private int datalength; //数据长度
private byte[] srcData; // 原始数据
public ByteBusinessDto(byte[] data )
... ... @@ -22,6 +25,24 @@ public class ByteBusinessDto implements BusinessDto{
@Override
public BusinessDto analyticalModel(String modeData) {
ScriptEngine engine = new ScriptEngineManager().getEngineByName("javascript");
try {
engine.eval(modeData);
if (engine instanceof Invocable) {
Invocable invoke = (Invocable) engine;
functionCode = (int) invoke.invokeFunction("getFunctionCode", srcData);
verification = (int) invoke.invokeFunction("getVerification", srcData);
data = (byte[]) invoke.invokeFunction("getData", srcData);
datalength = (int) invoke.invokeFunction("getDatalength", srcData);
} else {
System.out.println("error");
}
} catch (ScriptException e) {
System.out.println("表达式runtime错误:" + e.getMessage());
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
Mode mode = JSONObject.parseObject(modeData,Mode.class);
this.functionCode = srcData[mode.functionCodeAdr];
this.verification = srcData[mode.verificationAdr];
... ... @@ -29,6 +50,11 @@ public class ByteBusinessDto implements BusinessDto{
return this;
}
@Override
public Object getContentData() {
return data;
}
@Data
class Mode
{
... ... @@ -37,4 +63,29 @@ public class ByteBusinessDto implements BusinessDto{
int functionCodeAdr; //功能码地址
int verificationAdr; //校验码地址
}
public static void main(String[] args) throws ScriptException, NoSuchMethodException {
String regular = "function hexToString(str){\n" +
"    var arr = str.split(\",\");\n" +
" \t\tvar val;\n" +
"    for(var i = 0; i < arr.length; i++){\n" +
"      val[i] = arr[i].fromCharCode(i);\n" +
"    }\n" +
"    return val;\n" +
"}";
ScriptEngine engine = new ScriptEngineManager().getEngineByName("javascript");
try {
engine.eval(regular);
if (engine instanceof Invocable) {
Invocable invoke = (Invocable) engine;
String result = invoke.invokeFunction("hexToString", "01,03,70,02,00,01,3F,0A").toString();
System.out.println(result);
} else {
System.out.println("error");
}
} catch (ScriptException e) {
System.out.println("表达式runtime错误:" + e.getMessage());
}
}
}
... ...
... ... @@ -30,8 +30,7 @@ public class JsonBusinessDto implements BusinessDto{
}
@Override
public ServerDto toServerDto() {
return null;
public Object getContentData() {
return data;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.business;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
/**
... ... @@ -17,4 +18,9 @@ public class StringBusinessDto implements BusinessDto{
public BusinessDto analyticalModel(String modeData) {
return this;
}
@Override
public Object getContentData() {
return srcData;
}
}
... ...
... ... @@ -5,6 +5,9 @@ import io.swagger.annotations.ApiModelProperty;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.List;
import java.util.Map;
/**
* 主机/网关对象 iot_device
*
... ... @@ -107,7 +110,6 @@ public class IotDevice
@ApiModelProperty("负载类型(String,Json,Bite16,Bite32)")
private String payload_type;
@ApiModelProperty("payload 协议模型")
private String business_model; //payload 协议模型
public String getBusiness_model() {
... ...
... ... @@ -76,6 +76,18 @@ public class IotThingsModel
@ApiModelProperty("用户id")
private Integer user_id;
/** 用户id */
@ApiModelProperty("用户名称")
private Integer user_name;
public Integer getUser_name() {
return user_name;
}
public void setUser_name(Integer user_name) {
this.user_name = user_name;
}
public void setCreate_by(String create_by)
{
this.create_by = create_by;
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels;
import lombok.Data;
/**
* 枚举类物模型的参数
*/
@Data
public class EnumItemOutput
{
private String text;
private String value;
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
* 事件物模型
*/
@Data
public class EventDto {
/** 物模型唯一标识符 */
private String id;
/** 物模型名称 */
private String name;
/** 数据定义 */
private JSONObject datatype;
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
* 功能物模型
*/
@Data
public class FunctionDto {
/** 物模型唯一标识符 */
private String id;
/** 物模型名称 */
private String name;
/** 是否首页显示(0-否,1-是) */
private Integer isTop;
/** 数据定义 */
private JSONObject datatype;
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
* 属性物模型
*/
@Data
public class PropertyDto {
/** 物模型唯一标识符 */
private String id;
/** 物模型名称 */
private String name;
/** 是否首页显示(0-否,1-是) */
private Integer isTop;
/** 是否实时监测(0-否,1-是) */
private Integer isMonitor;
/** 数据定义 */
private JSONObject datatype;
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotThingsModel;
/**
* 物模型工厂
*/
public interface ThingsModelBase<T> {
void conversionThingsModel(IotThingsModel thingsModel);
void addValue(T t);
String getView();
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.specs.*;
/**
* 物模型数据类型,及对应显示解析方案
*/
public enum ThingsModelDataTypeEnum {
INTEGER(IntegerModelOutput.class),
DECIMAL(DecimalModelOutput.class),
STRING(StringModelOutput.class),
BOOL(BoolModelOutput.class),
ARRAY(ArrayModelOutput.class),
ENUM(EnumModelOutput.class);
private Class aClass;
ThingsModelDataTypeEnum(Class aClass) {
this.aClass = aClass;
}
public Class getaClass() {
return aClass;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItem;
import java.math.BigDecimal;
public class ReadOnlyModelOutput
{
private BigDecimal min;
private BigDecimal max;
private BigDecimal step;
private String unit;
private String arrayType;
private String falseText;
private String trueText;
private int maxLength;
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotThingsModel;
import lombok.Data;
/**
* 物模型工厂的公用方法
* @param <T>
*/
@Data
public abstract class ThingsModelItemBase<T> implements ThingsModelBase<T>
{
/** 物模型唯一标识符 */
private String id;
/** 物模型名称 */
private String name;
/** 物模型值 */
private T value;
/** 是否首页显示(0-否,1-是) */
private Integer isTop;
/** 是否实时监测(0-否,1-是) */
private Integer isMonitor;
/** 数据类型 */
private String type;
/** 是否记录日志(0否,1是) */
private Integer is_save_log;
/** 模型类别(1-属性,2-功能,3-事件) */
private Integer mode_type;
public void conversionThingsModel(IotThingsModel thingsModel)
{
id = thingsModel.getIdentifier();
name = thingsModel.getModel_name();
isTop = thingsModel.getIs_top();
isMonitor = thingsModel.getIs_monitor();
type = thingsModel.getData_type();
is_save_log = thingsModel.getIs_save_log();
mode_type = thingsModel.getType();
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.specs;
import com.alibaba.fastjson.JSONArray;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import lombok.Data;
@Data
public class ArrayModelOutput extends ThingsModelItemBase<JSONArray>
{
private String arrayType;
@Override
public void addValue(JSONArray jsonArray) {
setValue(jsonArray);
}
@Override
public String getView() {
if(null != getValue())
{
return JSONArray.toJSONString(getValue());
}
return "";
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.specs;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import lombok.Data;
@Data
public class BoolModelOutput extends ThingsModelItemBase<Boolean>
{
private String falseText;
private String trueText;
@Override
public void addValue(Boolean bl) {
setValue( bl);
}
@Override
public String getView() {
if(getValue())
{
return trueText;
}else {
return falseText;
}
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.specs;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class DecimalModelOutput extends ThingsModelItemBase<BigDecimal>
{
private BigDecimal min;
private BigDecimal max;
private BigDecimal step;
private String unit;
@Override
public void addValue(BigDecimal object) {
setValue(object);
}
@Override
public String getView() {
return getView()+unit;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.specs;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.EnumItemOutput;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import lombok.Data;
import java.util.List;
@Data
public class EnumModelOutput extends ThingsModelItemBase<String>
{
private List<EnumItemOutput> enumList;
@Override
public void addValue(String object) {
setValue(object);
}
@Override
public String getView() {
for(EnumItemOutput enumItemOutput:enumList)
{
if(enumItemOutput.getValue().equals(getValue()))
{
return enumItemOutput.getText();
}
}
return getValue();
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.specs;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class IntegerModelOutput extends ThingsModelItemBase<Integer>
{
private BigDecimal min;
private BigDecimal max;
private BigDecimal step;
private String unit;
@Override
public void addValue(Integer object) {
setValue(object);
}
@Override
public String getView() {
return getView()+unit;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.specs;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import lombok.Data;
/**
* 字符串可以适配所有
*/
@Data
public class StringModelOutput extends ThingsModelItemBase<Object>
{
private int maxLength;
@Override
public void addValue(Object object) {
setValue(null!=getValue()?getValue()+"":null);
}
@Override
public String getView() {
return null!=getValue()?getValue()+"":null;
}
}
... ...
... ... @@ -2,10 +2,12 @@ package com.zhonglai.luhui.mqtt.comm.factory;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
/**
* mqtt业务协议
*/
public interface BusinessAgreement<T> {
ServerDto analysis(Topic topic, T data) throws Exception; //解析协议
ServerDto toData(BusinessDto data);
}
... ...
... ... @@ -27,14 +27,14 @@ public class Topic {
topic = Optional.ofNullable(topic).orElseThrow(()->new MyException("topic为空"));
String[] sts = topic.split("/");
String[] config = SysParameter.topicconfig.split("/");
for(int i=0;i<config.length;i++)
for(int i=1;i<config.length;i++)
{
String cf = config[i].replace("{{","").replace("}}","");
try {
Field field = this.getClass().getField(cf);
Field field = this.getClass().getDeclaredField(cf);
field.set(this,sts[i]);
} catch (NoSuchFieldException e) {
log.info("{}生成topic时没有属性",topic,cf);
log.info("{}生成topic时没有属性{}",topic,cf);
} catch (IllegalAccessException e) {
log.info("{}生成topic时无法给{}赋值{}",topic,cf,sts[i]);
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotDevice;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotTerminal;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import com.zhonglai.luhui.mqtt.dto.topic.AddPostDto;
import com.zhonglai.luhui.mqtt.service.db.DeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* 业务数据更新服务
*/
@Service
public class BusinessDataUpdateService {
@Autowired
private DataModeAnalysisService dataModeAnalysisService ;
@Autowired
private DeviceLogService deviceLogService ;
@Autowired
private DeviceService deviceService ;
/**
* 更新数据
* @param type
* @param topic
* @param data
*/
public void updataDta(Type type,Topic topic, JSONObject data)
{
List<LogDeviceOperation> operateHisList = new ArrayList<>();
for(String key:data.keySet())
{
Object o = data.get(key);
if(o instanceof JSONObject)
{
JSONObject jsData = data.getJSONObject(key);
if("0".equals(key)) //主机
{
IotDevice iotDevice = translateDevice(type,topic,jsData,operateHisList);
deviceService.updataDevice(iotDevice);
}else{ //终端
IotTerminal iotTerminal = translateTerminal(type,key,topic,jsData,operateHisList);
deviceService.updataTerminal(iotTerminal);
}
}
}
if(null != operateHisList && operateHisList.size() !=0 )
{
deviceLogService.saveOperationLog(operateHisList);
}
}
/**
* 更新网关
* @param type
* @param topic
* @param jsData
* @param operateHisList
* @return
*/
private IotDevice translateDevice(Type type,Topic topic , JSONObject jsData, List<LogDeviceOperation> operateHisList)
{
IotDevice olddevice = deviceService.getRedicDevice(topic.getClientid());
JSONObject summaryObjec = null;
if(jsData.containsKey("summary") && null != jsData.get("summary") && jsData.get("summary") instanceof JSONObject)
{
summaryObjec = jsData.getJSONObject("summary");
//记录summary内容变更日志
operateHisList.add(deviceLogService.newLogDeviceOperation(topic.getClientid(),summaryObjec.toString(),olddevice.getSummary(),"主机本地summary状态更新",jsData.toJSONString()));
jsData.remove("summary");
}
IotDevice device = JSONObject.parseObject(JSONObject.toJSONString(jsData),IotDevice.class);
device.setClient_id(topic.getClientid());
device.setUpdate_time(DateUtils.getNowTimeMilly());
if(null != summaryObjec)
{
device.setSummary(summaryObjec.toString());
}
JSONObject saveJson = dataModeAnalysisService.analysisThingsModelValue( topic.getClientid(),topic.getUsername(),jsData,true,"主机本地");
//更新数据
if(null != olddevice && "ADD".equals(type.name()))
{
String str = olddevice.getThings_model_value();
String newStr = deviceService.getNewAdddate(str,saveJson).toJSONString();
device.setThings_model_value(newStr);
}else{
device.setThings_model_value(saveJson.toJSONString());
}
return device;
}
/**
* 更新终端
* @param type "ADD"增量更新,"ALL"全量更新
* @param key
* @param topic
* @param jsData
* @param operateHisList
* @return
*/
private IotTerminal translateTerminal(Type type,String key, Topic topic , JSONObject jsData, List<LogDeviceOperation> operateHisList)
{
String id = topic.getClientid()+"_"+key;
JSONObject saveJson = dataModeAnalysisService.analysisThingsModelValue( id,topic.getUsername(),jsData,true,"终端本地");
IotTerminal terminal = new IotTerminal();
terminal.setId(id);
terminal.setUpdate_time(DateUtils.getNowTimeMilly());
//更新数据
IotTerminal oldterminal = deviceService.getRedicTerminal(id);
if(null != oldterminal && "ADD".equals(type.name()))
{
String str = oldterminal.getThings_model_value();
terminal.setThings_model_value(deviceService.getNewAdddate(str,saveJson).toJSONString());
}else{
terminal.setThings_model_value(saveJson.toJSONString());
}
return terminal;
}
public enum Type
{
ADD,ALL
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.config.RedisConfig;
import com.zhonglai.luhui.mqtt.comm.dao.BaseDao;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.iot.IotThingsModel;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelBase;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelDataTypeEnum;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* 数据模型解析服务
*/
@Service
public class DataModeAnalysisService {
private static final Logger log = LoggerFactory.getLogger(DataModeAnalysisService.class);
private BaseDao baseDao = new BaseDao();
@Autowired
private RedisService redisService ;
@Autowired
private DeviceLogService dviceLogService;
/**
* 初始化物模型数据
*/
public void initDataThingsMode()
{
List<IotThingsModel> list = baseDao.findBysql("select * from `mqtt_broker`.`iot_things_model` where del_flag=0", IotThingsModel.class);
if(null != list && list.size() != 0)
{
for(IotThingsModel thingsModel:list)
{
redisService.hset(RedisConfig.FIELD+RedisConfig.THINGS_MODEL+thingsModel.getUser_name(),thingsModel.getIdentifier(),thingsModel);
}
}
}
public JSONObject analysisThingsModelValue(String id, String userName , JSONObject jsData)
{
return analysisThingsModelValue(id,userName,jsData,false,null);
}
/**
* 解析物模型数据
*/
public JSONObject analysisThingsModelValue(String id,String userName ,JSONObject jsData,boolean isSaveLog,String controlModel)
{
if(null != jsData && jsData.size() != 0 )
{
Map<Object, Object> thingsModelMap = redisService.hmget(RedisConfig.FIELD+RedisConfig.THINGS_MODEL+userName);
JSONObject rObjec = new JSONObject();
List<DeviceSensorData> list = new ArrayList<>();
List<LogDeviceOperation> oplist = new ArrayList<>();
for(String key:jsData.keySet())
{
Object object = thingsModelMap.get(key);
IotThingsModel thingsModel = null;
if(object instanceof IotThingsModel)
{
thingsModel = (IotThingsModel)object;
}else{ //没有配置的 都按字符串处理
thingsModel = new IotThingsModel();
thingsModel.setData_type(ThingsModelDataTypeEnum.STRING.name());
thingsModel.setIdentifier(key);
thingsModel.setModel_name(key);
thingsModel.setIs_top(0);
thingsModel.setIs_monitor(0);
thingsModel.setIs_save_log(0);
JSONObject jsonObject = new JSONObject();
jsonObject.put("maxLength",255);
thingsModel.setSpecs(jsonObject.toString());
}
Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,thingsModel.getData_type()).getaClass();
ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
thingsModelBase.conversionThingsModel(thingsModel);
thingsModelBase.addValue(jsData.get(key));
ThingsModelItemBase thingsModelItemBase = (ThingsModelItemBase) thingsModelBase;
//记录数据日志
if(1==thingsModelItemBase.getIs_save_log())
{
DeviceSensorData sensorData = new DeviceSensorData();
sensorData.setDataType(key);
sensorData.setDataValue(jsData.getString(key));
sensorData.setCreatTime(DateUtils.getNowTimeMilly());
sensorData.setDeviceModel(userName);
sensorData.setDeviceInfoId(id);
list.add(sensorData);
}
//记录操作日志
oplist.add(dviceLogService.newLogDeviceOperation(id,jsData.getString(key),null,controlModel+thingsModelItemBase.getName()+"为"+thingsModelBase.getView(),jsData.toString()));
rObjec.put(key,JSONObject.toJSONString(thingsModelBase));
}
//日志入库
dviceLogService.saveDeviceSensorDataLog(list);
dviceLogService.saveOperationLog(oplist);
return rObjec;
}
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.zhonglai.luhui.mqtt.comm.dao.BaseDao;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.TableGenerateSqlEnum;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class DeviceLogService {
private BaseDao baseDao = new BaseDao();
public void saveOperationLog(List<LogDeviceOperation> list)
{
if(null != list && list.size() != 0 )
{
baseDao.insertList(list, TableGenerateSqlEnum.LogDeviceOperation.getNowTableName());
}
}
public void saveDeviceSensorDataLog(List<DeviceSensorData> list)
{
if(null != list && list.size() != 0 )
{
baseDao.insertList(list, TableGenerateSqlEnum.DeviceSensorData.getNowTableName());
}
}
public LogDeviceOperation newLogDeviceOperation(String id,String newStr,String oldstr,String operationDescribe,String operationInstruction)
{
LogDeviceOperation operateHis = new LogDeviceOperation();
operateHis.setDeviceInfoId(id);
operateHis.setDeviceNewState(newStr);
operateHis.setDeviceOldState(oldstr);
if(null != operateHis.getDeviceNewState() && !operateHis.getDeviceNewState().equals(operateHis.getDeviceOldState()))
{
operateHis.setIsStateChange(1);
}
operateHis.setOperationInstruction(operationInstruction);
operateHis.setDeviceOperationTime(DateUtils.getNowTimeMilly());
operateHis.setDeviceOperationType(-1);
operateHis.setOperationDescribe(operationDescribe);
return operateHis;
}
}
... ...
... ... @@ -26,8 +26,7 @@ public class MqttCallback implements MqttCallbackExtended {
private BusinessAgreementFactory businessAgreementFactory;
@Autowired
private TerminalService terminalService; //客户端服务
@Autowired
private ClienNoticeService clienNoticeService; //客户端通知服务
@Autowired
private DataPersistenceService dataPersistenceService; //数据持久化
... ... @@ -62,13 +61,17 @@ public class MqttCallback implements MqttCallbackExtended {
log.error("消息{},topic为空,不做解析");
return;
}
//解析协议
//准备数据
byte[] data = mqttMessage.getPayload();
IotDevice iotDevice = deviceService.getDeviceById(topic.getClientid());
//转化为协议对象
BusinessDto businessDto = BusinessDtoClassNew.newBean(topic.getPayloadtype(),data).analyticalModel(iotDevice.getThings_model_value());
BusinessAgreement businessAgreement = businessAgreementFactory.createBusinessAgreement(topic);
try {
IotDevice iotDevice = deviceService.getDeviceById(topic.getClientid());
BusinessDto businessDto = BusinessDtoClassNew.newBean(topic.getPayloadtype(),data).analyticalModel(iotDevice.getBusiness_model());
ServerDto dto = businessAgreement.analysis(topic,businessDto.toServerDto());
//解析为业务对象
ServerDto dto = businessAgreement.analysis(topic,businessAgreement.toData(businessDto));
if(null == dto)
{
return;
... ... @@ -86,11 +89,6 @@ public class MqttCallback implements MqttCallbackExtended {
}
dataPersistenceService.addDeviceSensorData(topic,dto);
//回复客户端消息
clienNoticeService.replySendMessage(topic,dto);
//回复终端消息
clienNoticeService.replyTerminalMessage(topic,dto);
} catch (Exception e) {
log.error(s+"消息解析异常",e);
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
... ... @@ -8,6 +9,7 @@ import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class AllPostDto implements ServerDto {
private JSONObject data;
@Override
public ServerAgreementContent getServerAgreementContent() {
return null;
... ...
... ... @@ -50,33 +50,6 @@ public class DeviceService {
return (IotDevice) baseDao.get(IotDevice.class,"client_id='"+id+"'","`mqtt_broker`.`iot_device`");
}
/**
* 全量更新终端模型数据
* @param JSONObject
* @param id
*/
public void fullUpdateTerminal(JSONObject JSONObject,String id)
{
IotTerminal terminal = new IotTerminal();
terminal.setThings_model_value(JSONObject.toString());
terminal.setId(id);
terminal.setUpdate_time(DateUtils.getNowTimeMilly());
baseDao.update(terminal,"id");
}
/**
* 全量更新网关的模型数据
* @param JSONObject
* @param id
*/
public void fullUpdateDevice(JSONObject JSONObject,String id)
{
IotTerminal terminal = new IotTerminal();
terminal.setThings_model_value(JSONObject.toString());
terminal.setId(id);
terminal.setUpdate_time(DateUtils.getNowTimeMilly());
baseDao.update(terminal,"id");
}
/**
* 获取缓存网关信息
... ... @@ -89,14 +62,15 @@ public class DeviceService {
if(null != object)
{
return (IotDevice)object;
}else{
return getDeviceById(id);
}
return null;
}
public void updataDevice(IotDevice device)
public void updataDevice(IotDevice iotDevice)
{
setRedicDevice(device);
baseDao.saveOrUpdateObject(device);
setRedicDevice(iotDevice);
baseDao.saveOrUpdateObject(iotDevice);
}
/**
... ... @@ -149,17 +123,22 @@ public class DeviceService {
}
/**
* 获取
* @param userId
* @param username
* @return
*/
public IotThingsModel getThingsModelsByUserIdAndIdentifier(Integer userId, String identifier)
public IotThingsModel getThingsModelsByUserIdAndIdentifier(String username, String identifier)
{
Object object = redisService.hget(RedisConfig.FIELD+RedisConfig.THINGS_MODEL+userId,identifier);
Object object = redisService.hget(RedisConfig.FIELD+RedisConfig.THINGS_MODEL+username,identifier);
return null != object?(IotThingsModel)object:null;
}
public JSONObject getNewdate(String oldstr, JSONObject saveJson)
/**
* 增量更新数据
* @param oldstr
* @param saveJson
* @return
*/
public JSONObject getNewAdddate(String oldstr, JSONObject saveJson)
{
JSONObject oldjs = new JSONObject();
if(StringUtils.isNoneBlank(oldstr))
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.BusinessDataUpdateService;
import com.zhonglai.luhui.mqtt.dto.topic.AddPostDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 增量上报数据,不需要返回
*/
@Service("ADD_POST")
public class AddPostTopic implements BusinessAgreement<AddPostDto> {
@Autowired
private BusinessDataUpdateService businessDataUpdateService ;
@Override
public ServerDto analysis(Topic topic, AddPostDto data) throws Exception {
public ServerDto analysis(Topic topic, AddPostDto data) {
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,data.getData());
return null;
}
@Override
public ServerDto toData(BusinessDto data) {
AddPostDto serverDto = new AddPostDto();
return serverDto.setData((JSONObject) data.getContentData());
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.BusinessDataUpdateService;
import com.zhonglai.luhui.mqtt.dto.topic.AddPostDto;
import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 全量上报数据,不需要返回
*/
@Service("ALL_POST")
public class AllPostTopic implements BusinessAgreement<AllPostDto> {
@Autowired
private BusinessDataUpdateService businessDataUpdateService ;
@Override
public ServerDto analysis(Topic topic, AllPostDto data) throws Exception {
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ALL,topic,data.getData());
return null;
}
@Override
public AllPostDto toData(BusinessDto data) {
return null;
AllPostDto serverDto = new AllPostDto();
return serverDto.setData((JSONObject) data.getContentData());
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService;
import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
import com.zhonglai.luhui.mqtt.dto.topic.GetDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
... ... @@ -12,6 +16,9 @@ import org.springframework.stereotype.Service;
*/
@Service("GET")
public class GetTopic implements BusinessAgreement<GetDto> {
@Autowired
private ClienNoticeService clienNoticeService; //客户端通知服务
@Override
public ServerDto analysis(Topic topic, GetDto data) throws Exception {
return null;
... ... @@ -19,6 +26,19 @@ public class GetTopic implements BusinessAgreement<GetDto> {
@Override
public GetDto toData(BusinessDto data) {
return null;
GetDto serverDto = new GetDto();
return serverDto;
}
/**
* 回复客户端消息
* @param topic
* @param dto
*/
private void replySendMessage(Topic topic,ServerDto dto)
{
//回复客户端消息
clienNoticeService.replySendMessage(topic,dto);
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService;
import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
import com.zhonglai.luhui.mqtt.dto.topic.PutReqDto;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
... ... @@ -12,6 +17,10 @@ import org.springframework.stereotype.Service;
*/
@Service("PUT_REQ")
public class PutReqTopic implements BusinessAgreement<PutReqDto> {
@Autowired
private ClienNoticeService clienNoticeService; //客户端通知服务
@Override
public ServerDto analysis(Topic topic, PutReqDto data) throws Exception {
return null;
... ... @@ -19,6 +28,12 @@ public class PutReqTopic implements BusinessAgreement<PutReqDto> {
@Override
public PutReqDto toData(BusinessDto data) {
return null;
PutReqDto putReqDto = new PutReqDto();
return putReqDto.setData((String) data.getContentData());
}
private void replyTerminalMessage(Topic topic,ServerDto dto) throws MqttException {
//回复终端消息
clienNoticeService.replyTerminalMessage(topic,dto);
}
}
... ...
... ... @@ -309,6 +309,7 @@
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
... ...
... ... @@ -590,4 +590,8 @@ public class StringUtils extends org.apache.commons.lang3.StringUtils
str = ch + str.substring(1);
return str;
}
public static void main(String[] args) {
System.out.println(StringUtils.toUnderScoreCase("deviceInfoId"));
}
}
\ No newline at end of file
... ...