作者 钟来

换新的方式实现数据服务

正在显示 51 个修改的文件 包含 1249 行增加813 行删除
... ... @@ -61,7 +61,7 @@ public class IotTerminalController extends BaseController
@ApiOperation("根据网关获取未分配终端")
@ApiImplicitParam(value = "网关id",name = "imei")
@GetMapping(value = "/getListByDevice{imei}")
@GetMapping(value = "/getListByDevice/{imei}")
public AjaxResult getListByDevice(@PathVariable("imei") String imei)
{
List<IotTerminal> list = iotTerminalService.selectNotUserIotTerminalListByImei(imei);
... ...
... ... @@ -25,6 +25,7 @@ public class IotDevice implements Serializable
private Integer active_time;
/** 主键 */
@PublicSQLConfig(isPrimarykey = true)
@ApiModelProperty("主键")
private String client_id;
... ...
... ... @@ -51,10 +51,54 @@ public class IotTerminal implements Serializable
@ApiModelProperty("数据更新时间")
private Integer data_update_time;
@ApiModelProperty("关联用户")
private Integer user_info_id;
@ApiModelProperty("创建时间")
private Integer create_time;
@ApiModelProperty("终端编号")
private String sensor_number;
@ApiModelProperty("在线状态")
private Integer online;
private UserTerminalGroupRelation userTerminalGroupRelation;
private IotDevice iotDevice;
public Integer getOnline() {
return online;
}
public void setOnline(Integer online) {
this.online = online;
}
public Integer getUser_info_id() {
return user_info_id;
}
public void setUser_info_id(Integer user_info_id) {
this.user_info_id = user_info_id;
}
public Integer getCreate_time() {
return create_time;
}
public void setCreate_time(Integer create_time) {
this.create_time = create_time;
}
public String getSensor_number() {
return sensor_number;
}
public void setSensor_number(String sensor_number) {
this.sensor_number = sensor_number;
}
public IotDevice getIotDevice() {
return iotDevice;
}
... ...
... ... @@ -10,4 +10,5 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
public @interface PublicSQLConfig {
boolean isSelect() default true;
boolean isPrimarykey() default false;
}
... ...
... ... @@ -167,6 +167,11 @@
<artifactId>lh-domain</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
</dependencies>
<build>
... ...
... ... @@ -11,6 +11,7 @@ import org.springframework.context.annotation.ComponentScan;
"com.zhonglai.luhui.mqtt.comm.config",
"com.zhonglai.luhui.mqtt.comm.agreement",
"com.zhonglai.luhui.mqtt.comm.service",
"com.zhonglai.luhui.mqtt.comm.rocketMq",
"com.zhonglai.luhui.mqtt.config",
"com.zhonglai.luhui.mqtt.service",
"com.zhonglai.luhui.mqtt.controller",
... ...
package com.zhonglai.luhui.mqtt.comm.clien;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ApiClientRePlyDto;
import com.zhonglai.luhui.mqtt.dto.Message;
/**
... ... @@ -12,9 +12,8 @@ public interface ClienConnection {
/**
* 回复
* @param agreementContent
*/
public void reply(ServerAgreementContent agreementContent);
public void reply(ApiClientRePlyDto apiClientRePlyDto);
/**
* 回复
... ...
... ... @@ -2,7 +2,7 @@ package com.zhonglai.luhui.mqtt.comm.clien.impl;
import com.zhonglai.luhui.mqtt.comm.clien.ClienConnection;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ApiClientRePlyDto;
import com.zhonglai.luhui.mqtt.dto.Message;
import com.zhonglai.luhui.mqtt.dto.MessageCode;
import com.zhonglai.luhui.mqtt.dto.MessageCodeType;
... ... @@ -18,8 +18,8 @@ public class ClienConnectionImpl implements ClienConnection {
}
@Override
public void reply(ServerAgreementContent agreementContent) {
agreementContent.setReplyMessage(message);
public void reply(ApiClientRePlyDto apiClientRePlyDto) {
apiClientRePlyDto.setReplyMessage(message);
this.notify();
}
... ...
package com.zhonglai.luhui.mqtt.comm.config;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.util.Assert;
import java.nio.charset.Charset;
/**
* Redis使用FastJson序列化
*
* @author ruoyi
*/
public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T>
{
@SuppressWarnings("unused")
private ObjectMapper objectMapper = new ObjectMapper();
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
private Class<T> clazz;
public FastJson2JsonRedisSerializer(Class<T> clazz)
{
super();
this.clazz = clazz;
}
@Override
public byte[] serialize(T t) throws SerializationException
{
if (t == null)
{
return new byte[0];
}
//SerializerFeature.DisableCircularReferenceDetect解决重复引用的问题
return JSON.toJSONString(t, SerializerFeature.WriteClassName,SerializerFeature.DisableCircularReferenceDetect).getBytes(DEFAULT_CHARSET);
}
@Override
public T deserialize(byte[] bytes) throws SerializationException
{
if (bytes == null || bytes.length <= 0)
{
return null;
}
String str = new String(bytes, DEFAULT_CHARSET);
return JSON.parseObject(str, clazz);
}
public void setObjectMapper(ObjectMapper objectMapper)
{
Assert.notNull(objectMapper, "'objectMapper' must not be null");
this.objectMapper = objectMapper;
}
protected JavaType getJavaType(Class<?> clazz)
{
return TypeFactory.defaultInstance().constructType(clazz);
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.config;
import com.alibaba.fastjson.parser.ParserConfig;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
... ... @@ -14,8 +17,10 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.util.Properties;
@Configuration
public class RedisConfig {
... ... @@ -33,55 +38,62 @@ public class RedisConfig {
RedisConfig.FIELD = sysRedisField;
}
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
//设置工厂链接
redisTemplate.setConnectionFactory(redisConnectionFactory);
//设置自定义序列化方式
setSerializeConfig(redisTemplate, redisConnectionFactory);
return redisTemplate;
}
@SuppressWarnings(value = { "unchecked", "rawtypes" })
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory)
{
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
private void setSerializeConfig(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory redisConnectionFactory) {
//对字符串采取普通的序列化方式 适用于key 因为我们一般采取简单字符串作为key
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//普通的string类型的key采用 普通序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
//普通hash类型的key也使用 普通序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
//解决查询缓存转换异常的问题 大家不能理解就直接用就可以了 这是springboot自带的jackson序列化类,但是会有一定问题
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.activateDefaultTyping(om.getPolymorphicTypeValidator(),ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
jackson2JsonRedisSerializer.setObjectMapper(om);
//普通的值采用jackson方式自动序列化
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
//hash类型的值也采用jackson方式序列化
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
//属性设置完成afterPropertiesSet就会被调用,可以对设置不成功的做一些默认处理
redisTemplate.afterPropertiesSet();
}
//配置
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
serializer.setObjectMapper(mapper);
/**
* redis消息监听
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(new MessageListenerAdapter(), new PatternTopic(getRedisKeyPath()+"**"));
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(serializer);
return container;
}
// Hash的key也采用StringRedisSerializer的序列化方式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
public static String getRedisKeyPath()
{
return FIELD+ RedisKeyMqttUser.ROLEID+":"+ RedisKeyMqttUser.USERNAME+":";
template.afterPropertiesSet();
return template;
}
// @Bean
// public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
// RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
// //设置工厂链接
// redisTemplate.setConnectionFactory(redisConnectionFactory);
// //设置自定义序列化方式
// setSerializeConfig(redisTemplate, redisConnectionFactory);
// return redisTemplate;
// }
//
// private void setSerializeConfig(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory redisConnectionFactory) {
// //对字符串采取普通的序列化方式 适用于key 因为我们一般采取简单字符串作为key
// StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// //普通的string类型的key采用 普通序列化方式
// redisTemplate.setKeySerializer(stringRedisSerializer);
// //普通hash类型的key也使用 普通序列化方式
// redisTemplate.setHashKeySerializer(stringRedisSerializer);
// //解决查询缓存转换异常的问题 大家不能理解就直接用就可以了 这是springboot自带的jackson序列化类,但是会有一定问题
// Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
// ObjectMapper om = new ObjectMapper();
// om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// om.activateDefaultTyping(om.getPolymorphicTypeValidator(),ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
// jackson2JsonRedisSerializer.setObjectMapper(om);
// //普通的值采用jackson方式自动序列化
// redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// //hash类型的值也采用jackson方式序列化
// redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
// //属性设置完成afterPropertiesSet就会被调用,可以对设置不成功的做一些默认处理
// redisTemplate.afterPropertiesSet();
// }
}
\ No newline at end of file
... ...
package com.zhonglai.luhui.mqtt.comm.dao;
import com.alibaba.fastjson.JSONArray;
import com.ruoyi.system.domain.tool.PublicSQLConfig;
import org.apache.commons.dbutils.*;
import org.apache.commons.dbutils.handlers.BeanHandler;
import org.apache.commons.dbutils.handlers.BeanListHandler;
... ... @@ -59,7 +60,7 @@ public class BaseDao {
sql += ",";
values += ",";
}
sql += "`"+changTableNameFromObject(field.getName())+"`";
sql += "`"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`";
values += "?";
valueList.add(value);
}
... ... @@ -119,7 +120,7 @@ public class BaseDao {
String sql = "insert into ";
if(StringUtils.isBlank(tableName))
{
tableName = changTableNameFromObject(object.getClass().getSimpleName());
tableName = com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(object.getClass().getSimpleName());
}
List<Object> valueList = new ArrayList<Object>();
... ... @@ -303,10 +304,20 @@ public class BaseDao {
}else{
Method method = null;
try {
method = object.getClass().getMethod("get"+ com.zhonglai.luhui.mqtt.comm.util.StringUtils.getName("id"));
String idName = "id";
for(Field field:fields)
{
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null != publicSQLConfig && publicSQLConfig.isPrimarykey())
{
idName = field.getName();
}
}
method = object.getClass().getMethod("get"+ com.zhonglai.luhui.mqtt.comm.util.StringUtils.getName(idName));
Object value = method.invoke(object);
sql += " and ";
sql += "id=?";
sql += idName+"=?";
valueList.add(value);
} catch (NoSuchMethodException e) {
e.printStackTrace();
... ... @@ -337,7 +348,7 @@ public class BaseDao {
{
QueryRunner runner = new QueryRunner(dBFactory.getDataSource());
String tableName = changTableNameFromObject(clas.getSimpleName());
String tableName = com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(clas.getSimpleName());
String sql = "select * from "+tableName+" where 1=1 ";
try {
... ... @@ -383,12 +394,22 @@ public class BaseDao {
{
QueryRunner runner = new QueryRunner(dBFactory.getDataSource());
String tableName = changTableNameFromObject(clas.getSimpleName());
String tableName = com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(clas.getSimpleName());
String sql = "select * from "+tableName+" where 1=1 ";
String idName = "id";
Field[] fields = clas.getDeclaredFields();
for(Field field:fields)
{
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null != publicSQLConfig && publicSQLConfig.isPrimarykey())
{
idName = field.getName();
}
}
try {
sql += " and id=?";
sql += " and "+idName+"=?";
Object[] params = {id};
return runner.query(sql, new BeanHandler<T>(clas, getRowProcessor()),params);
} catch (SQLException e) {
... ... @@ -407,7 +428,7 @@ public class BaseDao {
{
QueryRunner runner = new QueryRunner(dBFactory.getDataSource());
String tableName = changTableNameFromObject(clas.getSimpleName());
String tableName = com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(clas.getSimpleName());
String sql = "DELETE FROM "+tableName+" WHERE 1=1 ";
try {
List<Object> valueList = new ArrayList<Object>();
... ... @@ -440,7 +461,7 @@ public class BaseDao {
if(StringUtils.isBlank(tableName))
{
tableName = changTableNameFromObject(clas.getSimpleName());
tableName = com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(clas.getSimpleName());
}
String sql = "DELETE FROM "+tableName+" WHERE 1=1 ";
try {
... ... @@ -705,17 +726,17 @@ public class BaseDao {
if("like".equals(s))
{
value = "%"+value+"%";
like += " or " + "`"+changTableNameFromObject(field.getName())+"`"+s+" ?"+orther ;
like += " or " + "`"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`"+s+" ?"+orther ;
valueList.add(value);
continue;
}
if("time".equals(s))
{
s = ">";
orther = " and `"+changTableNameFromObject(field.getName())+"`< '"+whereMap.get("end_"+field.getName())+"'";
orther = " and `"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`< '"+whereMap.get("end_"+field.getName())+"'";
}
}
where += " and `"+changTableNameFromObject(field.getName())+"`"+s+" ?"+orther;
where += " and `"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`"+s+" ?"+orther;
valueList.add(value);
}
} catch (NoSuchMethodException e) {
... ... @@ -829,8 +850,8 @@ public class BaseDao {
sb.append(",");
update += ",";
}
sb.append("`"+changTableNameFromObject(field.getName())+"`");
update += "`"+changTableNameFromObject(field.getName())+"`"+"=VALUES("+"`"+changTableNameFromObject(field.getName())+"`)";
sb.append("`"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`");
update += "`"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`"+"=VALUES("+"`"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(field.getName())+"`)";
}
sb.append(")");
sb.append("VALUES ");
... ... @@ -901,7 +922,7 @@ public class BaseDao {
{
Object value = mapwhere.get(key);
where += " and ";
where += changTableNameFromObject(key) + "=?";
where += com.zhonglai.luhui.mqtt.comm.util.StringUtils.toUnderScoreCase(key) + "=?";
valueList.add(value);
}
return where;
... ...
package com.zhonglai.luhui.mqtt.comm.dto;
import com.zhonglai.luhui.mqtt.dto.Message;
/**
* 回复给前端的消息
*/
public interface ApiClientRePlyDto {
void setReplyMessage(Message message);
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto;
import com.ruoyi.system.domain.IotDevice;
import com.ruoyi.system.domain.IotTerminal;
import java.util.List;
public interface ServerDto {
ServerAgreementContent getServerAgreementContent();
boolean isReplyMessage();
List<DeviceSensorData> getDeviceSensorData();
List<LogDeviceOperation> getOperationLog();
public abstract class ServerDto {
private IotDevice iotDevice;
private List<IotTerminal> iotTerminalList;
private List<DeviceSensorData> deviceSensorDataList;
private List<LogDeviceOperation> logDeviceOperationList;
public IotDevice getIotDevice() {
return iotDevice;
}
public void setIotDevice(IotDevice iotDevice) {
this.iotDevice = iotDevice;
}
public List<IotTerminal> getIotTerminalList() {
return iotTerminalList;
}
public void setIotTerminalList(List<IotTerminal> iotTerminalList) {
this.iotTerminalList = iotTerminalList;
}
public List<DeviceSensorData> getDeviceSensorDataList() {
return deviceSensorDataList;
}
public void setDeviceSensorDataList(List<DeviceSensorData> deviceSensorDataList) {
this.deviceSensorDataList = deviceSensorDataList;
}
public List<LogDeviceOperation> getLogDeviceOperationList() {
return logDeviceOperationList;
}
public void setLogDeviceOperationList(List<LogDeviceOperation> logDeviceOperationList) {
this.logDeviceOperationList = logDeviceOperationList;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.Message;
public interface ServerAgreementContent {
/**
* 回复给终端的消息
*/
public interface TerminalClientRePlyDto {
byte[] getCommd();
String getReplyCommdTopic(Topic topic);
void setReplyMessage(Message message);
}
... ...
package com.zhonglai.luhui.mqtt.comm.rocketMq;
import com.alibaba.fastjson.JSON;
import com.zhonglai.luhui.mqtt.comm.service.MqttCallback;
import com.zhonglai.luhui.mqtt.dto.DeviceCommandApi;
import com.zhonglai.luhui.mqtt.dto.Message;
import com.zhonglai.luhui.mqtt.dto.MessageCode;
import com.zhonglai.luhui.mqtt.service.db.DeviceService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(consumerGroup = "deviceCommand", topic = "lh-mqtt-service-deviceCommand")
public class RocketMqService implements RocketMQReplyListener<MessageExt, Message> {
private static final Logger log = LoggerFactory.getLogger(MqttCallback.class);
@Autowired
private DeviceService deviceService ;
@Override
public Message onMessage(MessageExt messageExt) {
log.info("监听到消息{}",messageExt);
// String clint = MessageUtil.getReplyToClient(messageExt);
String str = new String(messageExt.getBody());
DeviceCommandApi deviceCommandApi = JSON.parseObject(str, DeviceCommandApi.class);
try {
return deviceCommandApi.invokeApi(deviceService);
} catch (Exception e) {
log.error("执行异常",e);
}
return new Message(MessageCode.DEFAULT_FAIL_CODE,"服务器玩脱了");
}
}
... ...
... ... @@ -6,10 +6,13 @@ import com.ruoyi.system.domain.IotTerminal;
import com.zhonglai.luhui.mqtt.comm.config.SysParameter;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import com.zhonglai.luhui.mqtt.dto.SaveDataDto;
import com.zhonglai.luhui.mqtt.dto.topic.AddPostDto;
import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
import com.zhonglai.luhui.mqtt.service.CacheServiceImpl;
import com.zhonglai.luhui.mqtt.service.db.DeviceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -34,10 +37,14 @@ public class BusinessDataUpdateService {
@Autowired
private DataModeAnalysisService dataModeAnalysisService ;
@Autowired
private DeviceLogService deviceLogService ;
@Autowired
private DeviceService deviceService ;
@Autowired
private CacheServiceImpl cacheService ;
@Autowired
private DeviceLogService deviceLogService;
@Value("${server.port}")
private long port;
... ... @@ -50,11 +57,21 @@ public class BusinessDataUpdateService {
* 更新数据
* @param type
* @param topic
* @param data
*/
public void updataDta(Type type,Topic topic, JSONObject data,boolean isOperLog,List<LogDeviceOperation> operateHisList, List<DeviceSensorData> list)
public void updataDta(Type type, Topic topic, boolean isOperLog, ServerDto serverDto)
{
IotDevice olddevice = deviceService.getRedicDevice(topic.getClientid());
IotDevice olddevice = cacheService.getIotDevice(topic.getClientid());
JSONObject data = new JSONObject();
switch (type)
{
case ADD:
data = ((AddPostDto)serverDto).getData();
break;
case ALL:
data = ((AllPostDto)serverDto).getData();
break;
}
for(String key:data.keySet())
{
... ... @@ -64,7 +81,7 @@ public class BusinessDataUpdateService {
JSONObject jsData = data.getJSONObject(key);
if("0".equals(key)) //主机
{
IotDevice iotDevice = translateDevice(type,olddevice,jsData,isOperLog,operateHisList,list);
IotDevice iotDevice = translateDevice(type,olddevice,jsData,isOperLog,serverDto);
if(isText)
{
iotDevice.setListen_service_ip("127.0.0.1"+":"+port+contextPath);
... ... @@ -79,12 +96,12 @@ public class BusinessDataUpdateService {
iotDevice.setDevice_life(olddevice.getDevice_life());
iotDevice.setData_update_time(DateUtils.getNowTimeMilly());
iotDevice.setName(olddevice.getName());
deviceService.updataDevice(iotDevice);
serverDto.setIotDevice(iotDevice);
}else{ //终端
IotTerminal iotTerminal = translateTerminal(type,key,olddevice,jsData,isOperLog,operateHisList,list);
IotTerminal iotTerminal = translateTerminal(type,key,olddevice,jsData,isOperLog,serverDto);
logger.info("更新终端数据{}",iotTerminal);
iotTerminal.setData_update_time(DateUtils.getNowTimeMilly());
deviceService.updataTerminal(iotTerminal,olddevice.getDevice_life());
serverDto.getIotTerminalList().add(iotTerminal);
}
}
}
... ... @@ -95,17 +112,17 @@ public class BusinessDataUpdateService {
* @param type
* @param olddevice
* @param jsData
* @param operateHisList
* @param serverDto
* @return
*/
private IotDevice translateDevice(Type type, IotDevice olddevice , JSONObject jsData,boolean isOperLog, List<LogDeviceOperation> operateHisList, List<DeviceSensorData> list)
private IotDevice translateDevice(Type type, IotDevice olddevice , JSONObject jsData,boolean isOperLog, ServerDto serverDto)
{
JSONObject summaryObjec = null;
if(jsData.containsKey("summary") && null != jsData.get("summary") && jsData.get("summary") instanceof JSONObject)
{
summaryObjec = jsData.getJSONObject("summary");
//记录summary内容变更日志
operateHisList.add(deviceLogService.newLogDeviceOperation(olddevice.getClient_id(),summaryObjec.toString(),olddevice.getSummary(),"主机本地summary状态更新",jsData.toJSONString()));
serverDto.getLogDeviceOperationList().add(deviceLogService.newLogDeviceOperation(olddevice.getClient_id(),summaryObjec.toString(),olddevice.getSummary(),"主机本地summary状态更新",jsData.toJSONString()));
jsData.remove("summary");
}
IotDevice device = JSONObject.parseObject(JSONObject.toJSONString(jsData),IotDevice.class);
... ... @@ -115,7 +132,7 @@ public class BusinessDataUpdateService {
device.setSummary(summaryObjec.toString());
}
SaveDataDto saveDataDto = dataModeAnalysisService.analysisThingsModelValue( olddevice.getClient_id(),olddevice.getMqtt_username(),jsData,"主机本地",isOperLog,operateHisList,list);
SaveDataDto saveDataDto = dataModeAnalysisService.analysisThingsModelValue( olddevice.getClient_id(),olddevice.getMqtt_username(),jsData,"主机本地",isOperLog,serverDto);
//更新数据
if(null != olddevice && ("ADD".equals(type.name())|| "READ".equals(type.name())))
{
... ... @@ -139,30 +156,20 @@ public class BusinessDataUpdateService {
* @param key
* @param olddevice
* @param jsData
* @param operateHisList
* @param serverDto
* @return
*/
private IotTerminal translateTerminal(Type type,String key, IotDevice olddevice , JSONObject jsData,boolean isOperLog, List<LogDeviceOperation> operateHisList, List<DeviceSensorData> list)
private IotTerminal translateTerminal(Type type,String key, IotDevice olddevice , JSONObject jsData,boolean isOperLog,ServerDto serverDto)
{
String id = olddevice.getClient_id()+"_"+key;
SaveDataDto saveDataDto = dataModeAnalysisService.analysisThingsModelValue( id,olddevice.getMqtt_username(),jsData,"终端本地",isOperLog,operateHisList,list);
SaveDataDto saveDataDto = dataModeAnalysisService.analysisThingsModelValue( id,olddevice.getMqtt_username(),jsData,"终端本地",isOperLog,serverDto);
IotTerminal terminal = new IotTerminal();
terminal.setId(id);
terminal.setDevice_id(olddevice.getClient_id());
terminal.setProduct_id(olddevice.getProduct_id());
terminal.setMqtt_username(olddevice.getMqtt_username());
//更新数据
IotTerminal oldterminal = deviceService.getRedicTerminal(id);
if(null == oldterminal)
{
oldterminal = new IotTerminal();
oldterminal.setDevice_id(olddevice.getClient_id());
oldterminal.setId(id);
oldterminal.setMqtt_username(olddevice.getMqtt_username());
oldterminal.setName(olddevice.getMqtt_username()+"终端"+key);
oldterminal.setProduct_id(olddevice.getProduct_id());
deviceService.updataTerminal(oldterminal,olddevice.getDevice_life());
}
IotTerminal oldterminal = cacheService.getIotTerminal(id);
if(null != oldterminal && ("ADD".equals(type.name())|| "READ".equals(type.name())))
{
String str = oldterminal.getThings_model_value();
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.ruoyi.system.domain.IotDevice;
import com.ruoyi.system.domain.IotTerminal;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
... ... @@ -9,4 +11,6 @@ import com.zhonglai.luhui.mqtt.comm.factory.Topic;
*/
public interface CacheService {
boolean updateCache(Topic topic, ServerDto dto); //返回是否需要持久化
IotDevice getIotDevice(String id);
IotTerminal getIotTerminal(String id);
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.mysql.cj.x.protobuf.MysqlxDatatypes;
import com.zhonglai.luhui.mqtt.comm.clien.ClienConnection;
import com.zhonglai.luhui.mqtt.comm.clien.impl.ClienConnectionImpl;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.ApiClientRePlyDto;
import com.zhonglai.luhui.mqtt.comm.dto.TerminalClientRePlyDto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.util.ByteUtil;
import com.zhonglai.luhui.mqtt.dto.Message;
... ... @@ -91,34 +91,39 @@ public class ClienNoticeService {
return clienConnectionMap.get(topic.generateClienKey());
}
public void replySendMessage(Topic topic, ServerDto dto)
/**
* 通知给api操作端
* @param topic
* @param apiClientRePlyDto
*/
public void replySendMessage(Topic topic, ApiClientRePlyDto apiClientRePlyDto)
{
log.info("开始通知{},数据:{}",topic,dto);
log.info("开始通知{},数据:{}",topic,apiClientRePlyDto);
//判断有没有需要回复的客户端,如果有就回复
if(dto.isReplyMessage())
ClienConnection clienConnection = getClienConnection(topic);
if(null != clienConnection)
{
ClienConnection clienConnection = getClienConnection(topic);
if(null != clienConnection)
synchronized(clienConnection)
{
synchronized(clienConnection)
{
log.info("正在通知{},通知结果{}",topic,dto);
clienConnection.reply(dto.getServerAgreementContent());
}
log.info("正在通知{},通知结果{}",topic,apiClientRePlyDto);
clienConnection.reply(apiClientRePlyDto);
}
}
log.info("结束通知{}",topic);
}
public void replyTerminalMessage(Topic topic, ServerDto dto) throws MqttException {
if(dto.isReplyMessage() && null != dto.getServerAgreementContent().getReplyCommdTopic(topic))
{
String tc = dto.getServerAgreementContent().getReplyCommdTopic(topic);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(dto.getServerAgreementContent().getCommd());
log.info("回复终端{}的消息{}",tc,new String(mqttMessage.getPayload()));
terminalService.publish(dto.getServerAgreementContent().getReplyCommdTopic(topic),mqttMessage);
}
/**
* 通知给下位机终端
* @param topic
* @param terminalClientRePlyDto
* @throws MqttException
*/
public void replyTerminalMessage(Topic topic, TerminalClientRePlyDto terminalClientRePlyDto) throws MqttException {
String tc = terminalClientRePlyDto.getReplyCommdTopic(topic);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(terminalClientRePlyDto.getCommd());
log.info("回复终端{}的消息{}",tc,new String(mqttMessage.getPayload()));
terminalService.publish(terminalClientRePlyDto.getReplyCommdTopic(topic),mqttMessage);
}
}
... ...
... ... @@ -6,6 +6,7 @@ import com.ruoyi.system.domain.IotThingsModel;
import com.zhonglai.luhui.mqtt.comm.dao.BaseDao;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelBase;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelDataTypeEnum;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelItemBase;
... ... @@ -19,6 +20,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
... ... @@ -39,26 +41,30 @@ public class DataModeAnalysisService {
/**
* 初始化物模型数据
*/
public void initDataThingsMode(String roleIds,String usernames)
public void initDataThingsMode(List<String> roleIds,List<String> usernames)
{
String sql = "SELECT a.*,b.mqtt_username mqtt_username FROM `mqtt_broker`.`iot_things_model` a LEFT JOIN `mqtt_broker`.`iot_product` b ON a.`product_id`=b.`id` WHERE a.del_flag=0 ";
StringBuffer sql = new StringBuffer("SELECT a.*,b.mqtt_username mqtt_username FROM `mqtt_broker`.`iot_things_model` a LEFT JOIN `mqtt_broker`.`iot_product` b ON a.`product_id`=b.`id` WHERE a.del_flag=0 ");
if(StringUtils.isNotEmpty(roleIds))
{
sql += " AND b.`role_id` IN("+roleIds+")";
sql.append(" AND b.`role_id` IN(");
sql.append(StringUtils.join(roleIds,","));
sql.append(")");
}
if(StringUtils.isNotEmpty(usernames))
{
sql += " AND b.`username` IN("+usernames+")";
sql.append(" AND b.`mqtt_username` IN('");
sql.append(StringUtils.join(usernames,"','"));
sql.append("')");
}
List<IotThingsModel> list = baseDao.findBysql(sql, IotThingsModel.class);
List<IotThingsModel> list = baseDao.findBysql(sql.toString(), IotThingsModel.class);
terminalDataThingsModeService.saveIotThingsModelToUser(list);
}
/**
* 解析物模型数据
*/
public SaveDataDto analysisThingsModelValue(String id,String userName ,JSONObject jsData,String controlModel,boolean isOperLog, List<LogDeviceOperation> operateHisList, List<DeviceSensorData> list)
public SaveDataDto analysisThingsModelValue(String id,String userName ,JSONObject jsData,String controlModel,boolean isOperLog, ServerDto serverDto)
{
if(null != jsData && jsData.size() != 0 )
{
... ... @@ -96,7 +102,7 @@ public class DataModeAnalysisService {
ThingsModelItemBase thingsModelItemBase = (ThingsModelItemBase) thingsModelBase;
//记录数据日志
if(1==thingsModelItemBase.getIs_save_log() && null != list)
if(1==thingsModelItemBase.getIs_save_log() && null != serverDto.getDeviceSensorDataList())
{
DeviceSensorData sensorData = new DeviceSensorData();
sensorData.setDataType(key);
... ... @@ -104,13 +110,13 @@ public class DataModeAnalysisService {
sensorData.setCreatTime(DateUtils.getNowTimeMilly());
sensorData.setDeviceModel(userName);
sensorData.setDeviceInfoId(id);
list.add(sensorData);
serverDto.getDeviceSensorDataList().add(sensorData);
}
//记录操作日志
if(isOperLog && null != operateHisList)
if(isOperLog && null != serverDto.getLogDeviceOperationList())
{
operateHisList.add(dviceLogService.newLogDeviceOperation(id,thingsModelBase.getSaveView(),null,controlModel+thingsModelItemBase.getName()+"为"+thingsModelBase.getView(),jsData.toString()));
serverDto.getLogDeviceOperationList().add(dviceLogService.newLogDeviceOperation(id,thingsModelBase.getSaveView(),null,controlModel+thingsModelItemBase.getName()+"为"+thingsModelBase.getView(),jsData.toString()));
}
... ...
... ... @@ -19,30 +19,6 @@ public abstract class DataPersistenceService {
public abstract void persistence(Topic topic, ServerDto serverDto);
/**
* 记录操作日志
* @param deviceOperationTypeEnum
* @param deviceNewState
*/
public void logDeviceOperation(String deviceInfoId, DeviceOperationTypeEnum deviceOperationTypeEnum, String deviceNewState)
{
List<Object> operateHisList = new ArrayList<>();
//如果老的和新的不一致,记录日志
LogDeviceOperation operateHis = new LogDeviceOperation();
operateHis.setDeviceInfoId(deviceInfoId);
deviceOperationTypeEnum.setDeviceOperationLog(operateHis);
if(StringUtils.isNoneBlank(deviceNewState))
{
operateHis.setDeviceNewState(deviceNewState);
}
operateHis.setIsStateChange(1);
operateHis.setDeviceOperationTime(DateUtils.getNowTimeMilly());
operateHisList.add(operateHis);
baseDao.insertList(operateHisList, TableGenerateSqlEnum.LogDeviceOperation.getNowTableName());
}
/**
* 离线处理
* @param imei
*/
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.zhonglai.luhui.mqtt.comm.config.RedisConfig;
import com.zhonglai.luhui.mqtt.comm.service.redis.RedisService;
import com.zhonglai.luhui.mqtt.service.db.DeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
... ... @@ -26,20 +24,20 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
String devicePath = deviceService.getRedicDeviceKeyPath();
String terminalPath = deviceService.getRedicTerminalKeyPath();
if(expiredKey.startsWith(devicePath)) //如果是主机
{
String imei = expiredKey.replace(devicePath,"").replace(":","");
dtaPersistenceService.offLine(imei);
}
if(expiredKey.startsWith(terminalPath)) //如果是终端
{
String imei = expiredKey.replace(devicePath,"").replace(":","");
dtaPersistenceService.offLine(imei);
}
// String expiredKey = message.toString();
//
// String devicePath = deviceService.getRedicDeviceKeyPath();
// String terminalPath = deviceService.getRedicTerminalKeyPath();
// if(expiredKey.startsWith(devicePath)) //如果是主机
// {
// String imei = expiredKey.replace(devicePath,"").replace(":","");
// dtaPersistenceService.offLine(imei);
// }
//
// if(expiredKey.startsWith(terminalPath)) //如果是终端
// {
// String imei = expiredKey.replace(devicePath,"").replace(":","");
// dtaPersistenceService.offLine(imei);
// }
}
}
... ...
... ... @@ -17,6 +17,8 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
... ... @@ -99,30 +101,22 @@ public class TerminalService {
private void initDataThingsMode()
{
String roleids="";
String usernames="";
List<String> roleids=new ArrayList<>();
List<String> usernames= new ArrayList<>();
for(String topicstr:topics.split(","))
{
Topic topic = new Topic(topicstr);
if(null != topic)
{
String rild = topic.getRoleid();
if(StringUtils.isNoneBlank(rild) && !"+".equals(rild))
if(StringUtils.isNoneBlank(rild) && !"+".equals(rild) && !roleids.contains(rild))
{
if(!"".equals(roleids))
{
roleids +=",";
}
roleids +=rild;
roleids.add(rild) ;
}
String username = topic.getUsername();
if(StringUtils.isNoneBlank(username) && !"+".equals(username))
if(StringUtils.isNoneBlank(username) && !"+".equals(username) && !usernames.contains(username))
{
if(!"".equals(usernames))
{
usernames +=",";
}
usernames +="'"+username+"'";
usernames.add(username);
}
}
}
... ...
... ... @@ -30,148 +30,55 @@ import java.util.Map;
@RestController
@RequestMapping("/device")
public class DeviceController {
@Autowired
private ClienNoticeService clienNoticeService;
@Autowired
private TerminalDataThingsModeService terminalDataThingsModeService;
private BaseDao baseDao = new BaseDao();
@Autowired
private DeviceService deviceService ;
@Autowired
private TerminalDataThingsModeService terminalDataThingsModeService;
@ApiOperation("控制发16进制指令")
@RequestMapping(value = "controlHex/{clienid}",method = RequestMethod.POST)
public Message controlHex(@PathVariable String clienid, String data) throws MqttException, InterruptedException {
Topic topic = getTopicFromDb(clienid);
if(null == topic)
{
return new Message(MessageCode.DEFAULT_FAIL_CODE,"mqtt_username查询失败");
}
topic.setTopicType("PUT");
topic.setMessageid(DateUtils.getNowTimeMilly()+"");
MqttMessage mqttMessage = new MqttMessage();
byte[] bs = hexStringToByte(data.trim().toUpperCase());
mqttMessage.setPayload(bs);
Message message = clienNoticeService.sendMessage(topic,mqttMessage);
return message;
return deviceService.controlHex(clienid,data);
}
@ApiOperation("读")
@RequestMapping(value = "read/{clienid}",method = RequestMethod.POST)
public Message read(@PathVariable String clienid,@RequestBody Map<String,Object> map) throws MqttException, InterruptedException {
if(null == map || map.size() ==0)
{
return new Message(MessageCode.DEFAULT_FAIL_CODE,"参数验证失败");
}
Topic topic = getTopicFromDb(clienid);
if(null == topic)
{
return new Message(MessageCode.DEFAULT_FAIL_CODE,"mqtt_username查询失败");
}
topic.setTopicType("READ");
topic.setMessageid(DateUtils.getNowTimeMilly()+"");
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(JSON.toJSONString(map).trim().getBytes());
Message message = clienNoticeService.sendMessage(topic,mqttMessage);
return message;
return deviceService.read(clienid,map);
}
@ApiOperation("强行断开链接")
@RequestMapping(value = "closeSession/{clienid}",method = RequestMethod.POST)
public Message closeSession(@PathVariable String clienid) throws MqttException, InterruptedException {
MqttMessage mqttMessage = new MqttMessage();
byte[] bs = hexStringToByte(clienid.trim().toUpperCase());
mqttMessage.setPayload(bs);
clienNoticeService.sendMessage("CLOSE",mqttMessage);
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"端口请求已发送");
return deviceService.closeSession(clienid);
}
@ApiOperation("删除主机")
@Transactional
@RequestMapping(value = "delIotDevice/{client_id}",method = RequestMethod.POST)
public Message delIotDevice(@PathVariable String client_id) throws MqttException, InterruptedException {
closeSession(client_id); //强制下线
deviceService.deletRedisDevice(client_id);
return new Message(MessageCode.DEFAULT_SUCCESS_CODE);
return deviceService.delIotDevice(client_id);
}
@ApiOperation("删除终端")
@Transactional
@RequestMapping(value = "delIotTerminal/{client_id}/{number}",method = RequestMethod.POST)
public Message delIotTerminal(@PathVariable String client_id,@PathVariable String number) throws MqttException, InterruptedException {
closeSession(client_id); //强制下线
deviceService.deletRedisTerminal(client_id+"_"+number);
return new Message(MessageCode.DEFAULT_SUCCESS_CODE);
return deviceService.delIotTerminal(client_id,number);
}
@ApiOperation("控制发json")
@RequestMapping(value = "control/{clienid}",method = RequestMethod.POST)
public Message control(@PathVariable String clienid,@RequestBody Map<String,Object> map) throws MqttException, InterruptedException {
if(null == map || map.size() ==0)
{
return new Message(MessageCode.DEFAULT_FAIL_CODE,"参数验证失败");
}
Topic topic = getTopicFromDb(clienid);
if(null == topic)
{
return new Message(MessageCode.DEFAULT_FAIL_CODE,"mqtt_username查询失败");
}
topic.setTopicType("PUT");
topic.setMessageid(DateUtils.getNowTimeMilly()+"");
for(String key:map.keySet())
{
Object sendMap = map.get(key);
JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(sendMap));
for(String skey:jsonObject.keySet())
{
IotThingsModel thingsModel = terminalDataThingsModeService.getIotThingsModel(topic.getUsername(),skey);
if(null == thingsModel) //没有配置的 都按字符串处理
{
thingsModel = new IotThingsModel();
thingsModel.setData_type(ThingsModelDataTypeEnum.STRING.name());
thingsModel.setIdentifier(key);
thingsModel.setModel_name(key);
thingsModel.setIs_top(0);
thingsModel.setIs_monitor(0);
thingsModel.setIs_save_log(0);
thingsModel.setIs_config(0);
JSONObject spes = new JSONObject();
spes.put("maxLength",255);
thingsModel.setSpecs(spes.toString());
}
String data_type = thingsModel.getData_type().toUpperCase();
Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,data_type).getaClass();
ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
jsonObject.put(skey,thingsModelBase.getCmdView(jsonObject.get(skey)));
}
map.put(key,jsonObject);
}
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(JSON.toJSONString(map).trim().getBytes());
Message message = clienNoticeService.sendMessage(topic,mqttMessage);
return message;
return deviceService.control(clienid,map);
}
@ApiOperation("获取指定设备版本信息")
@RequestMapping(value = "getFirmwareVersion/{app_type}",method = RequestMethod.POST)
public Message getFirmwareVersion(@PathVariable String app_type)
{
List list = baseDao.findListBysql("SELECT md5str,upload_file_path uploadFilePath,version_number versionNumber,code FROM liu_yu_le.`app_file_upgrade` WHERE app_type='"+app_type+"' ORDER BY id DESC limit 5");
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,list);
return deviceService.getFirmwareVersion(app_type);
}
@ApiOperation("测试")
... ... @@ -182,42 +89,4 @@ public class DeviceController {
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,map);
}
/**
* 通过数据获取发送消息的topic
* @param clienid
* @return
*/
private Topic getTopicFromDb(String clienid)
{
JSONArray jsonArray = baseDao.findBysql("SELECT b.`role_id` roleid,b.`mqtt_username` username,a.`client_id` clientid,a.`payload_type` payloadtype FROM `iot_device` a LEFT JOIN `iot_product` b ON a.`product_id`=b.`id` WHERE client_id='"+clienid+"'");
if(null == jsonArray || jsonArray.size()==0 || null == jsonArray.getJSONObject(0).get("username"))
{
return null;
}
Topic topic = JSON.parseObject( jsonArray.getJSONObject(0).toJSONString(),Topic.class);
topic.setTopicType("PUT");
topic.setMessageid(DateUtils.getNowTimeMilly()+"");
return topic;
}
/**
* 把16进制字符串转换成字节数组
*
* @param hex
* @return
*/
public static byte[] hexStringToByte(String hex) {
int len = (hex.length() / 2);
byte[] result = new byte[len];
char[] achar = hex.toCharArray();
for (int i = 0; i < len; i++) {
int pos = i * 2;
result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
}
return result;
}
private static byte toByte(char c) {
byte b = (byte) "0123456789ABCDEF".indexOf(c);
return b;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto;
public enum ApiName {
controlHex,
read,
closeSession,
delIotDevice,
delIotTerminal,
control,
getFirmwareVersion,
}
... ...
package com.zhonglai.luhui.mqtt.dto;
import com.zhonglai.luhui.mqtt.service.db.DeviceService;
import org.eclipse.paho.client.mqttv3.MqttException;
/**
* 设备指令接口
*/
public class DeviceCommandApi {
private ApiName apiName; //指令接口名称
private DeviceCommandApiParameter deviceCommandApiParameter; //参数
public Message invokeApi(DeviceService deviceService) throws MqttException, InterruptedException {
switch (apiName)
{
case read:
return deviceService.read(deviceCommandApiParameter.getClient_id(),deviceCommandApiParameter.getMap());
case control:
return deviceService.control(deviceCommandApiParameter.getClient_id(), deviceCommandApiParameter.getMap());
case controlHex:
return deviceService.controlHex(deviceCommandApiParameter.getClient_id(),deviceCommandApiParameter.getData());
case closeSession:
return deviceService.closeSession(deviceCommandApiParameter.getClient_id());
case delIotDevice:
return deviceService.delIotDevice(deviceCommandApiParameter.getClient_id());
case delIotTerminal:
return deviceService.delIotTerminal(deviceCommandApiParameter.getClient_id(),deviceCommandApiParameter.getNumber());
case getFirmwareVersion:
return deviceService.getFirmwareVersion(deviceCommandApiParameter.getData());
default:
return new Message(MessageCode.DEFAULT_FAIL_CODE,"接口不存在");
}
}
public ApiName getApiName() {
return apiName;
}
public void setApiName(ApiName apiName) {
this.apiName = apiName;
}
public DeviceCommandApiParameter getDeviceCommandApiParameter() {
return deviceCommandApiParameter;
}
public void setDeviceCommandApiParameter(DeviceCommandApiParameter deviceCommandApiParameter) {
this.deviceCommandApiParameter = deviceCommandApiParameter;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto;
import java.util.Map;
public class DeviceCommandApiParameter {
private String client_id;
private Map<String, Object> map;
private String data;
private String number;
public String getClient_id() {
return client_id;
}
public void setClient_id(String client_id) {
this.client_id = client_id;
}
public Map<String, Object> getMap() {
return map;
}
public void setMap(Map<String, Object> map) {
this.map = map;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public String getNumber() {
return number;
}
public void setNumber(String number) {
this.number = number;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.ArrayList;
import java.util.List;
@Data
@Accessors(chain = true)
public class AddPostDto implements ServerDto {
private List<LogDeviceOperation> operateHisList = new ArrayList<>();
private List<DeviceSensorData> list = new ArrayList<>();
public class AddPostDto extends ServerDto {
private JSONObject data;
@Override
public ServerAgreementContent getServerAgreementContent() {
return null;
}
@Override
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return list;
public JSONObject getData() {
return data;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return operateHisList;
public void setData(JSONObject data) {
this.data = data;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.ArrayList;
import java.util.List;
@Data
@Accessors(chain = true)
public class AllPostDto implements ServerDto {
private List<LogDeviceOperation> operateHisList = new ArrayList<>();
private List<DeviceSensorData> list = new ArrayList<>();
public class AllPostDto extends ServerDto {
private JSONObject data;
@Override
public ServerAgreementContent getServerAgreementContent() {
return null;
}
@Override
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return list;
public JSONObject getData() {
return data;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return operateHisList;
public void setData(JSONObject data) {
this.data = data;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class DbDistributeDto implements ServerDto {
@Override
public ServerAgreementContent getServerAgreementContent() {
return null;
}
@Override
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
}
}
package com.zhonglai.luhui.mqtt.dto.topic;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class GetDto implements ServerDto {
@Override
public ServerAgreementContent getServerAgreementContent() {
return null;
}
@Override
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
}
public class GetDto extends ServerDto {
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class GetReqDto implements ServerDto {
@Override
public ServerAgreementContent getServerAgreementContent() {
return null;
}
@Override
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
}
public class GetReqDto extends ServerDto {
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class OnlineDto implements ServerDto {
public class OnlineDto extends ServerDto {
private Integer state;
@Override
public ServerAgreementContent getServerAgreementContent() {
return null;
}
@Override
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
public Integer getState() {
return state;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
public void setState(Integer state) {
this.state = state;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
/**
* 服务器下发数据
*/
@Data
@Accessors(chain = true)
public class PutDto implements ServerDto {
public class PutDto extends ServerDto {
private Object data;
@Override
public ServerAgreementContent getServerAgreementContent() {
return null;
}
@Override
public boolean isReplyMessage() {
return false;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
public Object getData() {
return data;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
public void setData(Object data) {
this.data = data;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.Message;
import com.zhonglai.luhui.mqtt.dto.MessageCode;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
@Data
@Accessors(chain = true)
public class PutReqDto implements ServerDto {
public class PutReqDto extends ServerDto {
private Integer code;
private String data;
private String messageid;
@Override
public ServerAgreementContent getServerAgreementContent() {
PutReqDto putReqDto = this;
return new ServerAgreementContent() {
@Override
public byte[] getCommd() {
JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(putReqDto));
jsonObject.remove("messageid");
return jsonObject.toJSONString().getBytes();
}
public Integer getCode() {
return code;
}
@Override
public String getReplyCommdTopic(Topic topic) {
return null;
}
public void setCode(Integer code) {
this.code = code;
}
@Override
public void setReplyMessage(Message message) {
message.setData(data);
message.setCode(MessageCode.DEFAULT_SUCCESS_CODE);
message.setMessage("成功");
switch (code)
{
case 0:
message.setCode(MessageCode.DEFAULT_FAIL_CODE);
message.setMessage("失败");
break;
}
}
};
public String getData() {
return data;
}
@Override
public boolean isReplyMessage() {
return true;
public void setData(String data) {
this.data = data;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
public String getMessageid() {
return messageid;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
public void setMessageid(String messageid) {
this.messageid = messageid;
}
}
... ...
package com.zhonglai.luhui.mqtt.dto.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.Message;
import com.zhonglai.luhui.mqtt.dto.MessageCode;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@Data
@Accessors(chain = true)
public class ReadReqDto implements ServerDto {
public class ReadReqDto extends ServerDto {
private Integer code;
private JSONObject data;
private String messageid;
private List<LogDeviceOperation> operateHisList = new ArrayList<>();
private List<DeviceSensorData> list = new ArrayList<>();
@Override
public ServerAgreementContent getServerAgreementContent() {
ReadReqDto readReqDto = this;
return new ServerAgreementContent() {
@Override
public byte[] getCommd() {
JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(readReqDto));
jsonObject.remove("messageid");
return jsonObject.toJSONString().getBytes();
}
public Integer getCode() {
return code;
}
@Override
public String getReplyCommdTopic(Topic topic) {
return null;
}
public void setCode(Integer code) {
this.code = code;
}
@Override
public void setReplyMessage(Message message) {
if(null != readReqDto.data)
{
message.setData(JSONObject.parseObject(readReqDto.data.toJSONString(), HashMap.class));
}
message.setCode(MessageCode.DEFAULT_SUCCESS_CODE);
message.setMessage("成功");
switch (code)
{
case 0:
message.setCode(MessageCode.DEFAULT_FAIL_CODE);
message.setMessage("失败");
break;
}
}
};
public JSONObject getData() {
return data;
}
@Override
public boolean isReplyMessage() {
return true;
public void setData(JSONObject data) {
this.data = data;
}
@Override
public List<DeviceSensorData> getDeviceSensorData() {
return null;
public String getMessageid() {
return messageid;
}
@Override
public List<LogDeviceOperation> getOperationLog() {
return null;
public void setMessageid(String messageid) {
this.messageid = messageid;
}
}
... ...
package com.zhonglai.luhui.mqtt.service;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.system.domain.IotDevice;
import com.ruoyi.system.domain.IotTerminal;
import com.zhonglai.luhui.mqtt.comm.config.RedisConfig;
import com.zhonglai.luhui.mqtt.comm.dao.BaseDao;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.CacheService;
import com.zhonglai.luhui.mqtt.dto.topic.PutReqDto;
import com.zhonglai.luhui.mqtt.comm.service.redis.RedisService;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import com.zhonglai.luhui.mqtt.service.redis.RedisDeleteListener;
import com.zhonglai.luhui.mqtt.service.redis.RedisExpiredListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@Service
public class CacheServiceImpl implements CacheService {
private static final Logger log = LoggerFactory.getLogger(CacheServiceImpl.class);
@Autowired
private RedisService redisService ;
private BaseDao baseDao = new BaseDao();
@Override
public boolean updateCache(Topic topic, ServerDto serverDto) {
IotDevice iotDevice = serverDto.getIotDevice();
if(null == iotDevice)
{
return false;
}
setIotDeviceToRedis(iotDevice);
log.info("{} 缓存更新 {}",topic,iotDevice);
List<IotTerminal> list = serverDto.getIotTerminalList();
if(null != list && list.size() != 0 )
{
for (IotTerminal iotTerminal:list)
{
setIotTerminalToRedis(iotTerminal);
}
}
log.info("{} 缓存更新 {}",topic,list);
return true;
}
@Override
public IotDevice getIotDevice(String id) {
IotDevice deviceHost = getIotDeviceFromRedis(id);
if(null == deviceHost)
{
deviceHost = (IotDevice) baseDao.get(IotDevice.class,id);
if(null == deviceHost)
{
return null;
}
}
return deviceHost;
}
@Override
public IotTerminal getIotTerminal(String id) {
IotTerminal iotTerminal = getIotTerminalFromRedis(id);
if(null == iotTerminal)
{
iotTerminal = (IotTerminal) baseDao.get(IotTerminal.class,id);
if(null == iotTerminal)
{
String imei = id.split("_")[0];
IotDevice iotDevice = getIotDevice(id);
if(null == iotDevice)
{
return null;
}
int time = DateUtils.getNowTimeMilly();
iotTerminal = new IotTerminal();
iotTerminal.setId(id);
iotTerminal.setDevice_id(imei);
iotTerminal.setSensor_number(id.replace(imei+"_",""));
iotTerminal.setName(iotDevice.getMqtt_username()+"设备的第"+iotTerminal.getSensor_number()+"号终端");
iotTerminal.setUpdate_time(time);
iotTerminal.setProduct_id(iotDevice.getProduct_id());
iotTerminal.setMqtt_username(iotDevice.getMqtt_username());
iotTerminal.setData_update_time(time);
iotTerminal.setOnline(3);
iotTerminal.setCreate_time(time);
baseDao.insert(iotTerminal);
}
}
return iotTerminal;
}
/**
* 获取主机
* @param id
* @return
*/
public IotDevice getIotDeviceFromRedis(String id)
{
String key =getRedicDeviceKey(id);
if(redisService.hasKey(key))
{
Map<Object, Object> map = redisService.hmget(key);
IotDevice iotDevice = JSONObject.parseObject(JSONObject.toJSONString(map),IotDevice.class);
return iotDevice;
}
return null;
}
/**
* 获取终端
* @param id
* @return
*/
public IotTerminal getIotTerminalFromRedis(String id)
{
String key =getRedicTerminalKey(id);
if(redisService.hasKey(key))
{
Map<Object, Object> map = redisService.hmget(key);
IotTerminal iotTerminal = JSONObject.parseObject(JSONObject.toJSONString(map),IotTerminal.class);
return iotTerminal;
}
return null;
}
/**
* 设置缓存主机
* @param iotDevice
*/
private void setIotDeviceToRedis(IotDevice iotDevice)
{
redisService.hmset(getRedicDeviceKey(iotDevice.getClient_id()),iotDevice);
}
/**
* 设置缓存终端
* @param iotTerminal
*/
private void setIotTerminalToRedis(IotTerminal iotTerminal)
{
redisService.hmset(getRedicTerminalKey(iotTerminal.getId()),iotTerminal);
}
/**
* 删除缓存主机
* @param client_id
*/
public void deletRedisDevice(String client_id)
{
Set<String> keys = getRedicTerminalFromClientId(client_id);
if(null != keys && keys.size() != 0)
{
redisService.del(keys.toArray(new String[keys.size()]));
}
}
/**
* 删除缓存终端
* @param client_ids
*/
public void deletRedisTerminal(String... client_ids)
{
String[] keys = new String[client_ids.length];
for (int i=0;i<client_ids.length;i++)
{
keys[i] = getRedicTerminalKey(client_ids[i]);
}
redisService.del(keys);
}
/**
* 获取缓存主机key
* @param client_id
* @return
*/
private String getRedicDeviceKey(String client_id)
{
return getRedicDeviceKeyPath()+client_id;
}
/**
* 终端key集合规则
* @param client_id
* @return
*/
public Set<String> getRedicTerminalFromClientId(String client_id)
{
Set<String> keys = redisService.keys(RedisConfig.FIELD+RedisConfig.TERMINAL+client_id+"*");
return keys;
}
/**
* 主机kry路径
* @return
*/
private String getRedicDeviceKeyPath()
{
return RedisConfig.FIELD+RedisConfig.DEVICE;
}
/**
* 终端key
* @param terminal_id
* @return
*/
private String getRedicTerminalKey(String terminal_id)
{
return getRedicTerminalKeyPath()+terminal_id;
}
/**
* 终端key路径
* @return
*/
private String getRedicTerminalKeyPath()
{
return RedisConfig.FIELD+RedisConfig.TERMINAL;
}
private String keyspaceNotificationsConfigParameter = "KEA";
@Autowired
private RedisDeleteListener redisDeleteListener ;
@Autowired
private RedisExpiredListener redisExpiredListener ;
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisConnection connection = connectionFactory.getConnection();
Properties config = connection.info("notify-keyspace-events");
if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);
}
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//监听所有key的删除事件
container.addMessageListener(redisDeleteListener,redisDeleteListener.getTopic());
//监听所有key的过期事件
container.addMessageListener(redisExpiredListener,redisExpiredListener.getTopic());
return container;
}
}
... ...
package com.zhonglai.luhui.mqtt.service;
import com.ruoyi.system.domain.IotDevice;
import com.ruoyi.system.domain.IotTerminal;
import com.zhonglai.luhui.mqtt.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.mqtt.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
... ... @@ -23,20 +25,38 @@ public class DataPersistenceServiceImpl extends DataPersistenceService {
private DeviceLogService dviceLogService;
@Override
public void persistence(Topic topic, ServerDto serverDto) {
IotDevice iotDevice = serverDto.getIotDevice();
if(null == iotDevice)
{
return ;
}
log.info("更新网关数据{}",iotDevice);
baseDao.update(iotDevice);
List<IotTerminal> list = serverDto.getIotTerminalList();
if(null != list && list.size() != 0 )
{
for (IotTerminal iotTerminal:list)
{
log.info("更新终端数据{}",iotTerminal);
baseDao.update(iotTerminal);
}
}
//曲线数据入库
List<DeviceSensorData> dsdList = serverDto.getDeviceSensorData();
List<DeviceSensorData> dsdList = serverDto.getDeviceSensorDataList();
if(null != dsdList && dsdList.size() != 0)
{
dviceLogService.saveDeviceSensorDataLog(dsdList);
}
//日志入库
List<LogDeviceOperation> doList = serverDto.getOperationLog();
List<LogDeviceOperation> doList = serverDto.getLogDeviceOperationList();
if(null != doList && doList.size() != 0)
{
dviceLogService.saveOperationLog(doList);
}
}
@Override
... ...
package com.zhonglai.luhui.mqtt.service.db;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.system.domain.IotDevice;
import com.ruoyi.system.domain.IotTerminal;
import com.ruoyi.system.domain.IotThingsModel;
import com.zhonglai.luhui.mqtt.comm.config.RedisConfig;
import com.zhonglai.luhui.mqtt.comm.dao.BaseDao;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelBase;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelDataTypeEnum;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService;
import com.zhonglai.luhui.mqtt.comm.service.redis.RedisService;
import com.zhonglai.luhui.mqtt.comm.util.DateUtils;
import com.zhonglai.luhui.mqtt.dto.Message;
import com.zhonglai.luhui.mqtt.dto.MessageCode;
import com.zhonglai.luhui.mqtt.service.CacheServiceImpl;
import com.zhonglai.luhui.mqtt.service.db.mode.TerminalDataThingsModeService;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Service
public class DeviceService {
@Autowired
private ClienNoticeService clienNoticeService;
@Autowired
private RedisService redisService ;
private TerminalDataThingsModeService terminalDataThingsModeService;
@Autowired
private CacheServiceImpl cacheServiceImpl;
private BaseDao baseDao = new BaseDao();
/**
... ... @@ -51,146 +75,251 @@ public class DeviceService {
/**
* 获取缓存网关信息
* @param id
* 增量更新数据
* @param oldstr
* @param saveJson
* @return
*/
public IotDevice getRedicDevice(String id)
public JSONObject getNewAdddate(String oldstr, JSONObject saveJson)
{
Object object = redisService.get(getRedicDeviceKey(id));
if(null != object)
JSONObject oldjs = new JSONObject();
if(StringUtils.isNoneBlank(oldstr))
{
oldjs = JSONObject.parseObject(oldstr);
}
for (String sk:saveJson.keySet())
{
return (IotDevice)object;
}else{
IotDevice iotDevice = getDeviceById(id);
setRedicDevice(iotDevice);
return iotDevice;
oldjs.put(sk,saveJson.get(sk));
}
return oldjs;
}
public void updataDevice(IotDevice iotDevice)
public void deviceOffLine(String clientId)
{
setRedicDevice(iotDevice);
cacheServiceImpl.deletRedisDevice(clientId);
IotDevice iotDevice = new IotDevice();
iotDevice.setClient_id(clientId);
iotDevice.setStatus(4);
baseDao.update(iotDevice,"client_id");
Set<String> keys = cacheServiceImpl.getRedicTerminalFromClientId(clientId);
if(null != keys && keys.size() != 0)
{
terminalOffLine(keys.toArray(new String[keys.size()]));
}
}
public void terminalOffLine(String... terminalIds)
{
baseDao.updateBySql("update iot_terminal set online=0 where id in(?)",Arrays.asList(terminalIds));
cacheServiceImpl.deletRedisTerminal(terminalIds);
}
/**
* 设置缓存网关信息
* @param device
* 控制发16进制指令
* @param clienid
* @param data
* @return
* @throws MqttException
* @throws InterruptedException
*/
private boolean setRedicDevice(IotDevice device)
{
System.out.println("更新 "+device.getClient_id()+" 缓存,生命周期为 "+device.getDevice_life());
return redisService.setexDevice(getRedicDeviceKey(device.getClient_id()),device.getDevice_life(),device);
}
public Message controlHex(String clienid, String data) throws MqttException, InterruptedException {
public String getRedicDeviceKey(String client_id)
{
return getRedicDeviceKeyPath()+client_id;
}
Topic topic = getTopicFromDb(clienid);
if(null == topic)
{
return new Message(MessageCode.DEFAULT_FAIL_CODE,"mqtt_username查询失败");
}
topic.setTopicType("PUT");
topic.setMessageid(DateUtils.getNowTimeMilly()+"");
public String getRedicTerminalFromClientId(String client_id)
{
return RedisConfig.FIELD+"*:"+client_id+"*";
}
MqttMessage mqttMessage = new MqttMessage();
byte[] bs = hexStringToByte(data.trim().toUpperCase());
mqttMessage.setPayload(bs);
public String getRedicDeviceKeyPath()
{
return RedisConfig.FIELD+RedisConfig.DEVICE;
Message message = clienNoticeService.sendMessage(topic,mqttMessage);
return message;
}
/**
* 获取缓存终端信息
* @param id
* 读
* @param clienid
* @param map
* @return
* @throws MqttException
* @throws InterruptedException
*/
public IotTerminal getRedicTerminal(String id)
{
Object object = redisService.get(getRedicTerminalKey(id));
if(null == object)
public Message read(String clienid,Map<String,Object> map) throws MqttException, InterruptedException {
if(null == map || map.size() ==0)
{
IotTerminal terminal = getTerminalById(id);
return new Message(MessageCode.DEFAULT_FAIL_CODE,"参数验证失败");
}
if(null == terminal)
{
return null;
}else{
return terminal;
}
Topic topic = getTopicFromDb(clienid);
if(null == topic)
{
return new Message(MessageCode.DEFAULT_FAIL_CODE,"mqtt_username查询失败");
}
return (IotTerminal)object;
topic.setTopicType("READ");
topic.setMessageid(DateUtils.getNowTimeMilly()+"");
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(JSON.toJSONString(map).trim().getBytes());
Message message = clienNoticeService.sendMessage(topic,mqttMessage);
return message;
}
/**
* 设置缓存终端信息
* @param terminal
* 强行断开链接
* @param clienid
* @return
* @throws MqttException
* @throws InterruptedException
*/
private boolean setRedicTerminal(IotTerminal terminal,long device_life)
{
return redisService.setexDevice(getRedicTerminalKey(terminal.getId()),device_life,terminal);
}
public Message closeSession(String clienid) throws MqttException, InterruptedException {
public String getRedicTerminalKey(String terminal_id)
{
return getRedicTerminalKeyPath()+terminal_id;
MqttMessage mqttMessage = new MqttMessage();
byte[] bs = hexStringToByte(clienid.trim().toUpperCase());
mqttMessage.setPayload(bs);
clienNoticeService.sendMessage("CLOSE",mqttMessage);
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"端口请求已发送");
}
public String getRedicTerminalKeyPath()
{
return RedisConfig.FIELD+RedisConfig.TERMINAL;
/**
* 删除主机
* @param client_id
* @return
* @throws MqttException
* @throws InterruptedException
*/
public Message delIotDevice(String client_id) throws MqttException, InterruptedException {
closeSession(client_id); //强制下线
cacheServiceImpl.deletRedisDevice(client_id);
return new Message(MessageCode.DEFAULT_SUCCESS_CODE);
}
public void updataTerminal(IotTerminal terminal,long device_life)
{
setRedicTerminal(terminal,device_life);
baseDao.saveOrUpdateObject(terminal);
/**
* 删除终端
* @param client_id
* @param number
* @return
* @throws MqttException
* @throws InterruptedException
*/
public Message delIotTerminal(String client_id,String number) throws MqttException, InterruptedException {
closeSession(client_id); //强制下线
cacheServiceImpl.deletRedisTerminal(client_id+"_"+number);
return new Message(MessageCode.DEFAULT_SUCCESS_CODE);
}
/**
* 增量更新数据
* @param oldstr
* @param saveJson
* 控制发json
* @param clienid
* @param map
* @return
* @throws MqttException
* @throws InterruptedException
*/
public JSONObject getNewAdddate(String oldstr, JSONObject saveJson)
{
JSONObject oldjs = new JSONObject();
if(StringUtils.isNoneBlank(oldstr))
public Message control(String clienid,Map<String,Object> map) throws MqttException, InterruptedException {
if(null == map || map.size() ==0)
{
oldjs = JSONObject.parseObject(oldstr);
return new Message(MessageCode.DEFAULT_FAIL_CODE,"参数验证失败");
}
for (String sk:saveJson.keySet())
Topic topic = getTopicFromDb(clienid);
if(null == topic)
{
oldjs.put(sk,saveJson.get(sk));
return new Message(MessageCode.DEFAULT_FAIL_CODE,"mqtt_username查询失败");
}
return oldjs;
topic.setTopicType("PUT");
topic.setMessageid(DateUtils.getNowTimeMilly()+"");
for(String key:map.keySet())
{
Object sendMap = map.get(key);
JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(sendMap));
for(String skey:jsonObject.keySet())
{
IotThingsModel thingsModel = terminalDataThingsModeService.getIotThingsModel(topic.getUsername(),skey);
if(null == thingsModel) //没有配置的 都按字符串处理
{
thingsModel = new IotThingsModel();
thingsModel.setData_type(ThingsModelDataTypeEnum.STRING.name());
thingsModel.setIdentifier(key);
thingsModel.setModel_name(key);
thingsModel.setIs_top(0);
thingsModel.setIs_monitor(0);
thingsModel.setIs_save_log(0);
thingsModel.setIs_config(0);
JSONObject spes = new JSONObject();
spes.put("maxLength",255);
thingsModel.setSpecs(spes.toString());
}
String data_type = thingsModel.getData_type().toUpperCase();
Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,data_type).getaClass();
ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
jsonObject.put(skey,thingsModelBase.getCmdView(jsonObject.get(skey)));
}
map.put(key,jsonObject);
}
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(JSON.toJSONString(map).trim().getBytes());
Message message = clienNoticeService.sendMessage(topic,mqttMessage);
return message;
}
/**
* 删除主机
* @param client_id
* 获取指定设备版本信息
* @param app_type
* @return
*/
public void deletRedisDevice(String client_id)
public Message getFirmwareVersion(String app_type)
{
Set<String> keys = redisService.keys(getRedicTerminalFromClientId(client_id));
if(null != keys && keys.size() != 0)
{
redisService.del(keys.toArray(new String[keys.size()]));
}
List list = baseDao.findListBysql("SELECT md5str,upload_file_path uploadFilePath,version_number versionNumber,code FROM liu_yu_le.`app_file_upgrade` WHERE app_type='"+app_type+"' ORDER BY id DESC limit 5");
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,list);
}
/**
* 删除终端
* @param client_ids
* 通过数据获取发送消息的topic
* @param clienid
* @return
*/
public void deletRedisTerminal(String... client_ids)
private Topic getTopicFromDb(String clienid)
{
String[] keys = new String[client_ids.length];
for (int i=0;i<client_ids.length;i++)
JSONArray jsonArray = baseDao.findBysql("SELECT b.`role_id` roleid,b.`mqtt_username` username,a.`client_id` clientid,a.`payload_type` payloadtype FROM `iot_device` a LEFT JOIN `iot_product` b ON a.`product_id`=b.`id` WHERE client_id='"+clienid+"'");
if(null == jsonArray || jsonArray.size()==0 || null == jsonArray.getJSONObject(0).get("username"))
{
keys[i] = getRedicTerminalKey(client_ids[i]);
return null;
}
redisService.del(keys);
Topic topic = JSON.parseObject( jsonArray.getJSONObject(0).toJSONString(),Topic.class);
topic.setTopicType("PUT");
topic.setMessageid(DateUtils.getNowTimeMilly()+"");
return topic;
}
/**
* 把16进制字符串转换成字节数组
*
* @param hex
* @return
*/
private byte[] hexStringToByte(String hex) {
int len = (hex.length() / 2);
byte[] result = new byte[len];
char[] achar = hex.toCharArray();
for (int i = 0; i < len; i++) {
int pos = i * 2;
result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
}
return result;
}
private byte toByte(char c) {
byte b = (byte) "0123456789ABCDEF".indexOf(c);
return b;
}
}
... ...
package com.zhonglai.luhui.mqtt.service.db.mode;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.system.domain.IotThingsModel;
import com.zhonglai.luhui.mqtt.comm.service.redis.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -50,10 +51,10 @@ public class TerminalDataThingsModeService {
{
if(null != iotThingsModels && iotThingsModels.size() !=0)
{
Map<String,Map<Object,Object>> all = new HashMap<>();
Map<String,Map<String,IotThingsModel>> all = new HashMap<>();
for(IotThingsModel iotThingsModel:iotThingsModels)
{
Map<Object,Object> map = all.get(iotThingsModel.getMqtt_username());
Map<String,IotThingsModel> map = all.get(iotThingsModel.getMqtt_username());
if(null == map)
{
map = new HashMap<>();
... ... @@ -81,7 +82,7 @@ public class TerminalDataThingsModeService {
{
if(redisService.hHashKey(deviceTypeKeyPath+mqtt_username,identifier))
{
return (IotThingsModel) redisService.hget(deviceTypeKeyPath+mqtt_username,identifier);
return JSONObject.parseObject(JSONObject.toJSONString(redisService.hget(deviceTypeKeyPath+mqtt_username,identifier)),IotThingsModel.class);
}
return null;
}
... ...
package com.zhonglai.luhui.mqtt.service.redis;
import com.zhonglai.luhui.mqtt.comm.service.BusinessDataUpdateService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.stereotype.Component;
@Component
public class RedisDeleteListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(RedisDeleteListener.class);
//监听的主题
private final PatternTopic topic = new PatternTopic("__keyevent@1__:del"); //#是对db2数据库,key前缀为order所有键的键空间通知
@Override
public void onMessage(Message message, byte[] pattern){
String topic = new String(pattern);
String msg = new String(message.getBody());
logger.info("收到key删除,消息主题是:"+ topic+",消息内容是:"+msg);
}
public PatternTopic getTopic() {
return topic;
}
}
... ...
package com.zhonglai.luhui.mqtt.service.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.stereotype.Component;
@Component
public class RedisExpiredListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(RedisExpiredListener.class);
//监听的主题
private final PatternTopic topic = new PatternTopic("__keyevent@1__:expired");
@Override
public void onMessage(Message message, byte[] pattern){
String topic = new String(pattern);
String msg = new String(message.getBody());
logger.info("收到key失效,消息主题是:"+ topic+",消息内容是:"+msg);
}
public PatternTopic getTopic() {
return topic;
}
}
\ No newline at end of file
... ...
... ... @@ -10,6 +10,7 @@ import com.zhonglai.luhui.mqtt.dto.topic.AddPostDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
/**
... ... @@ -21,14 +22,18 @@ public class AddPostTopic implements BusinessAgreement<AddPostDto> {
private BusinessDataUpdateService businessDataUpdateService ;
@Override
public ServerDto analysis(Topic topic, AddPostDto data) {
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,data.getData(),true,data.getOperateHisList(),data.getList());
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,true,data);
return data;
}
@Override
public ServerDto toData(BusinessDto data) {
AddPostDto serverDto = new AddPostDto();
return serverDto.setData((JSONObject) data.getContentData());
serverDto.setData((JSONObject) data.getContentData());
serverDto.setIotTerminalList(new ArrayList<>());
serverDto.setDeviceSensorDataList(new ArrayList<>());
serverDto.setLogDeviceOperationList(new ArrayList<>());
return serverDto;
}
}
... ...
... ... @@ -11,6 +11,8 @@ import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
/**
* 全量上报数据,不需要返回
... ... @@ -21,20 +23,26 @@ public class AllPostTopic implements BusinessAgreement<AllPostDto> {
private BusinessDataUpdateService businessDataUpdateService ;
@Override
public ServerDto analysis(Topic topic, AllPostDto data) throws Exception {
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ALL,topic,data.getData(),false,data.getOperateHisList(),data.getList());
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ALL,topic,false,data);
return data;
}
@Override
public AllPostDto toData(BusinessDto data) {
AllPostDto serverDto = new AllPostDto();
serverDto.setIotTerminalList(new ArrayList<>());
serverDto.setDeviceSensorDataList(new ArrayList<>());
serverDto.setLogDeviceOperationList(new ArrayList<>());
if(data.getContentData() instanceof JSONObject)
{
return serverDto.setData((JSONObject) data.getContentData());
serverDto.setData((JSONObject) data.getContentData());
return serverDto;
}else if(data.getContentData() instanceof byte[])
{
return serverDto.setData(JSONObject.parseObject(new String((byte[]) data.getContentData())));
serverDto.setData(JSONObject.parseObject(new String((byte[]) data.getContentData())));
return serverDto;
}
return null;
}
}
... ...
package com.zhonglai.luhui.mqtt.service.topic;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.DbDistributeDto;
import org.springframework.stereotype.Service;
/**
* 数据库topic分发
*/
@Service("DB_TOPIC_DISTRIBUTE")
public class DbDistributeTopic implements BusinessAgreement<DbDistributeDto> {
@Override
public ServerDto analysis(Topic topic, DbDistributeDto data) throws Exception {
return null;
}
@Override
public DbDistributeDto toData(BusinessDto data) {
return null;
}
}
package com.zhonglai.luhui.mqtt.service.topic;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.TerminalClientRePlyDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService;
import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
import com.zhonglai.luhui.mqtt.dto.topic.GetDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
... ... @@ -21,6 +20,18 @@ public class GetTopic implements BusinessAgreement<GetDto> {
@Override
public ServerDto analysis(Topic topic, GetDto data) throws Exception {
//回复客户端消息
clienNoticeService.replyTerminalMessage(topic, new TerminalClientRePlyDto() {
@Override
public byte[] getCommd() {
return new byte[0];
}
@Override
public String getReplyCommdTopic(Topic topic) {
return null;
}
});
return null;
}
... ... @@ -30,15 +41,5 @@ public class GetTopic implements BusinessAgreement<GetDto> {
return serverDto;
}
/**
* 回复客户端消息
* @param topic
* @param dto
*/
private void replySendMessage(Topic topic,ServerDto dto)
{
//回复客户端消息
clienNoticeService.replySendMessage(topic,dto);
}
}
... ...
... ... @@ -5,13 +5,23 @@ import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.dto.topic.OnlineDto;
import com.zhonglai.luhui.mqtt.service.db.DeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("ONLINE")
public class OnlineTopic implements BusinessAgreement<OnlineDto> {
@Autowired
private DeviceService deviceService ;
@Override
public ServerDto analysis(Topic topic, OnlineDto data) throws Exception {
if(1==data.getState()) //在线
{
}else{ //离线
deviceService.deviceOffLine(topic.getClientid());
}
return data;
}
... ...
... ... @@ -6,9 +6,8 @@ import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService;
import com.zhonglai.luhui.mqtt.dto.topic.AllPostDto;
import com.zhonglai.luhui.mqtt.dto.MessageCode;
import com.zhonglai.luhui.mqtt.dto.topic.PutReqDto;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
... ... @@ -17,14 +16,24 @@ import org.springframework.stereotype.Service;
*/
@Service("PUT_REQ")
public class PutReqTopic implements BusinessAgreement<PutReqDto> {
@Autowired
private ClienNoticeService clienNoticeService; //客户端通知服务
@Override
public ServerDto analysis(Topic topic, PutReqDto data) throws Exception {
data.setMessageid(topic.getMessageid());
clienNoticeService.replySendMessage(topic,data);
clienNoticeService.replySendMessage(topic, message -> {
message.setData(data.getData());
message.setCode(MessageCode.DEFAULT_SUCCESS_CODE);
message.setMessage("成功");
switch (data.getCode())
{
case 0:
message.setCode(MessageCode.DEFAULT_FAIL_CODE);
message.setMessage("失败");
break;
}
});
return null;
}
... ...
... ... @@ -3,6 +3,7 @@ package com.zhonglai.luhui.mqtt.service.topic;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.system.domain.IotThingsModel;
import com.zhonglai.luhui.mqtt.comm.dto.ApiClientRePlyDto;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.dto.business.BusinessDto;
import com.zhonglai.luhui.mqtt.comm.dto.thingsmodels.ThingsModelBase;
... ... @@ -11,12 +12,16 @@ import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.service.BusinessDataUpdateService;
import com.zhonglai.luhui.mqtt.comm.service.ClienNoticeService;
import com.zhonglai.luhui.mqtt.dto.Message;
import com.zhonglai.luhui.mqtt.dto.MessageCode;
import com.zhonglai.luhui.mqtt.dto.topic.ReadReqDto;
import com.zhonglai.luhui.mqtt.service.db.mode.TerminalDataThingsModeService;
import org.apache.commons.lang3.EnumUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
@Service("READ_REQ")
public class ReadReqTopic implements BusinessAgreement<ReadReqDto> {
@Autowired
... ... @@ -24,62 +29,73 @@ public class ReadReqTopic implements BusinessAgreement<ReadReqDto> {
@Autowired
private BusinessDataUpdateService businessDataUpdateService ;
@Autowired
private TerminalDataThingsModeService terminalDataThingsModeService;
@Override
public ServerDto analysis(Topic topic, ReadReqDto data) {
if(1==data.getCode())
{
JSONObject vjsonObject = data.getData().clone();
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,data.getData(),true,data.getOperateHisList(),data.getList());
// JSONObject vjsonObject = data.getData().clone();
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,true,data);
if(null != vjsonObject && vjsonObject.size() !=0 )
{
for(String vkey:vjsonObject.keySet())
// if(null != vjsonObject && vjsonObject.size() !=0 )
// {
// for(String vkey:vjsonObject.keySet())
// {
// JSONObject jsData = vjsonObject.getJSONObject(vkey);
// for(String key:jsData.keySet())
// {
// IotThingsModel thingsModel = terminalDataThingsModeService.getIotThingsModel(topic.getUsername(),key);
// if(null == thingsModel) //没有配置的 都按字符串处理
// {
// thingsModel = new IotThingsModel();
// thingsModel.setData_type(ThingsModelDataTypeEnum.STRING.name());
// thingsModel.setIdentifier(key);
// thingsModel.setModel_name(key);
// thingsModel.setIs_top(0);
// thingsModel.setIs_monitor(0);
// thingsModel.setIs_save_log(0);
// thingsModel.setIs_config(0);
// JSONObject jsonObject = new JSONObject();
// jsonObject.put("maxLength",255);
// thingsModel.setSpecs(jsonObject.toString());
// }
// String data_type = thingsModel.getData_type().toUpperCase();
// if(!EnumUtils.isValidEnum(ThingsModelDataTypeEnum.class,data_type))
// {
// data_type = ThingsModelDataTypeEnum.STRING.name();
// }
// Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,data_type).getaClass();
// ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
// thingsModelBase.conversionThingsModel(thingsModel);
//
// thingsModelBase.addValue(jsData.get(key));
// jsData.put(key,thingsModelBase);
// }
// vjsonObject.put(vkey,jsData);
// }
// data.setData(vjsonObject);
// }
clienNoticeService.replySendMessage(topic, message -> {
if(null != data.getData())
{
JSONObject jsData = vjsonObject.getJSONObject(vkey);
for(String key:jsData.keySet())
{
IotThingsModel thingsModel = terminalDataThingsModeService.getIotThingsModel(topic.getUsername(),key);
if(null == thingsModel) //没有配置的 都按字符串处理
{
thingsModel = new IotThingsModel();
thingsModel.setData_type(ThingsModelDataTypeEnum.STRING.name());
thingsModel.setIdentifier(key);
thingsModel.setModel_name(key);
thingsModel.setIs_top(0);
thingsModel.setIs_monitor(0);
thingsModel.setIs_save_log(0);
thingsModel.setIs_config(0);
JSONObject jsonObject = new JSONObject();
jsonObject.put("maxLength",255);
thingsModel.setSpecs(jsonObject.toString());
}
String data_type = thingsModel.getData_type().toUpperCase();
if(!EnumUtils.isValidEnum(ThingsModelDataTypeEnum.class,data_type))
{
data_type = ThingsModelDataTypeEnum.STRING.name();
}
Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,data_type).getaClass();
ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
thingsModelBase.conversionThingsModel(thingsModel);
thingsModelBase.addValue(jsData.get(key));
jsData.put(key,thingsModelBase);
}
vjsonObject.put(vkey,jsData);
message.setData(JSONObject.parseObject(data.getData().toJSONString(), HashMap.class));
}
data.setData(vjsonObject);
}
clienNoticeService.replySendMessage(topic,data);
message.setCode(MessageCode.DEFAULT_SUCCESS_CODE);
message.setMessage("成功");
});
return data;
}else if(0==data.getCode())
{
clienNoticeService.replySendMessage(topic,data);
clienNoticeService.replySendMessage(topic, message -> {
if(null != data.getData())
{
message.setData(JSONObject.parseObject(data.getData().toJSONString(), HashMap.class));
}
message.setCode(MessageCode.DEFAULT_FAIL_CODE);
message.setMessage("失败");
});
return data;
}
return null;
return data;
}
@Override
... ...
... ... @@ -44,7 +44,7 @@ mqtt:
#唯一标识
clientId: ${random.uuid}
#订阅的topic
topics: "/+/+/+/+/ADD_POST,/+/+/+/+/ALL_POST,/+/+/+/+/DB_TOPIC_DISTRIBUTE,/+/+/+/+/GET/+,/+/+/+/+/online,/+/+/+/+/PUT_REQ/+,/+/+/+/+/READ_REQ/+"
topics: "/2/6_WP/+/+/ADD_POST,/2/6_WP/+/+/ALL_POST,/2/6_WP/+/+/DB_TOPIC_DISTRIBUTE,/2/6_WP/+/+/GET/+,/2/6_WP/+/+/online,/2/6_WP/+/+/PUT_REQ/+,/2/6_WP/+/+/READ_REQ/+"
topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/{{messageid}}"
top_return_map: '{"PUT":"PUT_REQ","READ":"READ_REQ"}'
username: sysuser
... ... @@ -61,5 +61,10 @@ mqtt:
sys:
redis:
field: "luhui:mqttservice:device:"
field: "lh:mqttservice:"
isText: false
#rocketmq配置信息
rocketmq:
#nameservice服务器地址(多个以英文逗号隔开)
name-server: 47.115.144.179:9876
\ No newline at end of file
... ...
... ... @@ -330,6 +330,12 @@
<artifactId>jedis</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
... ...
... ... @@ -107,7 +107,7 @@ public class LoginService {
*/
public String yu2leTokenLogin(String token) {
Yu2leLoginToken loginToken = new Yu2leLoginToken(token);
Yu2leUserLogin userInfo = publicService.getObjectForTableName(Yu2leUserLogin.class,"id",loginToken.getUserId()+"","`liu_yu_le`.`user_login`");
Yu2leUserLogin userInfo = publicService.getObjectForTableName(Yu2leUserLogin.class,"user_id",loginToken.getUserId()+"","`liu_yu_le`.`user_login`");
return apiLoginByPass(userInfo.getLoginName(),DESUtil.decode(userInfo.getLoginPass(),userInfo.getUserLoginPassKey()));
}
... ...