作者 钟来

初始提交

正在显示 35 个修改的文件 包含 1842 行增加11 行删除
... ... @@ -2,6 +2,7 @@
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
... ...
此 diff 太大无法显示。
... ... @@ -12,23 +12,71 @@
<artifactId>lh-mqtt-service</artifactId>
<dependencies>
<!-- 通用工具-->
<dependency>
<groupId>com.luhui.lyl.device.service</groupId>
<artifactId>ly-device-mqtt-comm</artifactId>
<version>${project.parent.version}</version>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>ruoyi-common</artifactId>
</dependency>
<!-- 文档 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>${swagger.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>**</artifactId>
<groupId>io.swagger</groupId>
<artifactId>swagger-models</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--https://mvnrepository.com/artifact/io.swagger/swagger-models-->
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-models</artifactId>
<version>${swagger-models.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
<!--&lt;!&ndash; https://mvnrepository.com/artifact/com.github.xiaoymin/swagger-bootstrap-ui &ndash;&gt;-->
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>swagger-bootstrap-ui</artifactId>
<version>${swagger-ui.version}</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<!-- 建议使用最新版本,最新版本请从项目首页查找 -->
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
</dependency>
<!-- mqtt -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>expiringmap</artifactId>
</dependency>
</dependencies>
<build>
... ...
package com.zhonglai.luhui.mqtt.comm.agreement;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreementFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 业务协议工厂实现
*/
@Component
public class BusinessAgreementFactoryImpl implements BusinessAgreementFactory {
@Autowired
private Map<String, BusinessAgreement> businessAgreementMap;
@Override
public BusinessAgreement createBusinessAgreement(String topicType) {
BusinessAgreement businessAgreement = businessAgreementMap.get(topicType.toUpperCase());
if(null == businessAgreement) //没有找到就用默认的
{
businessAgreementMap.get("DEFAULT");
}
return businessAgreement;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.agreement;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.BusinessAgreement;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.nio.charset.Charset;
@Service("DEFAULT")
public class DefaultAgreement implements BusinessAgreement<byte[]> {
private static final Logger log = LoggerFactory.getLogger(DefaultAgreement.class);
@Override
public ServerDto analysis(Topic topic, byte[] data) throws Exception {
log.info("未知的topic{} payload{}",topic, new String(data, Charset.forName("UTF-8")));
return null;
}
@Override
public byte[] toData(byte[] data) {
return data;
}
}
\ No newline at end of file
... ...
package com.zhonglai.luhui.mqtt.comm.clien;
import com.ruoyi.common.core.domain.Message;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
/**
* 客户端链接
*/
public interface ClienConnection {
public void close();
/**
* 回复
* @param agreementContent
*/
public void reply(ServerAgreementContent agreementContent);
/**
* 回复
*/
public Message getReplyMessage();
}
... ...
package com.zhonglai.luhui.mqtt.comm.clien.impl;
import com.ruoyi.common.core.domain.Message;
import com.ruoyi.common.core.domain.MessageCode;
import com.ruoyi.common.core.domain.MessageCodeType;
import com.zhonglai.luhui.mqtt.comm.clien.ClienConnection;
import com.zhonglai.luhui.mqtt.comm.dto.ServerAgreementContent;
public class ClienConnectionImpl implements ClienConnection {
private Message message = new Message();
@Override
public void close() {
this.message.setCode(MessageCode.DEFAULT_FAIL_CODE);
this.message.setMessage("链接超时关闭");
this.notify();
}
@Override
public void reply(ServerAgreementContent agreementContent) {
agreementContent.setReplyMessage(message);
this.notify();
}
@Override
public Message getReplyMessage() {
return message;
}
public ClienConnectionImpl setCode(MessageCodeType code)
{
this.message.setCode(code);
return this;
}
public ClienConnectionImpl setData(Object data)
{
this.message.setData(data);
return this;
}
public ClienConnectionImpl setMessage(String message)
{
this.message.setMessage(message);
return this;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.config;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
@Aspect
@Component
public class ControllerLogAspect {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Pointcut("execution(public * com.zhonglai.luhui.mqtt.*.controller..*.*(..))")
public void controllerLog() {
}
@Before("controllerLog()")
public void doBefore(JoinPoint joinPoint)
{
// 接收到请求,记录请求内容
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
Enumeration<String> hs = request.getHeaderNames();
Map<String,Object> map = new HashMap<>();
while (hs.hasMoreElements())
{
String key= hs.nextElement();
map.put(key,request.getHeader(key));
}
logger.info("请求url:{},请求head:{},请求IP:{},请求方法:{},请求参数:{}",request.getRequestURL(),map,request.getRemoteAddr(),joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature().getName(),joinPoint.getArgs());
}
@AfterReturning(returning = "ret", pointcut = "controllerLog()")
public void doAfterReturning(Object ret) {
// 处理完请求,返回内容
logger.info("返回 {} " , GsonConstructor.get().toJson(ret));
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.filter.CorsFilter;
/**
* 跨域配置,需要和WebMvcConfigurerAdapter的addCorsMappings配合使用
*/
@Configuration
public class CorsConfig {
private CorsConfiguration buildConfig() {
CorsConfiguration corsConfiguration = new CorsConfiguration();
corsConfiguration.addAllowedOrigin("*"); // 1允许任何域名使用
corsConfiguration.addAllowedHeader("*"); // 2允许任何头
corsConfiguration.addAllowedMethod("*"); // 3允许任何方法(post、get等)
return corsConfiguration;
}
@Bean
public CorsFilter corsFilter() {
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", buildConfig()); // 4
return new CorsFilter(source);
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Value("${sys.redis.field:#{'luhui:mqttservice:device:'}")
public static String FIELD ; //域
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
//设置工厂链接
redisTemplate.setConnectionFactory(redisConnectionFactory);
//设置自定义序列化方式
setSerializeConfig(redisTemplate, redisConnectionFactory);
return redisTemplate;
}
private void setSerializeConfig(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory redisConnectionFactory) {
//对字符串采取普通的序列化方式 适用于key 因为我们一般采取简单字符串作为key
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//普通的string类型的key采用 普通序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
//普通hash类型的key也使用 普通序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
//解决查询缓存转换异常的问题 大家不能理解就直接用就可以了 这是springboot自带的jackson序列化类,但是会有一定问题
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
//普通的值采用jackson方式自动序列化
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
//hash类型的值也采用jackson方式序列化
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
//属性设置完成afterPropertiesSet就会被调用,可以对设置不成功的做一些默认处理
redisTemplate.afterPropertiesSet();
}
/**
* redis消息监听
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(new MessageListenerAdapter(), new PatternTopic(getRedisKeyPath()+"**"));
return container;
}
public static String getRedisKeyPath()
{
return FIELD+ RedisKeyMqttUser.ROLEID+":"+ RedisKeyMqttUser.USERNAME+":";
}
}
\ No newline at end of file
... ...
package com.zhonglai.luhui.mqtt.comm.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class RedisKeyMqttUser {
public static String ROLEID;
public static String USERNAME;
@Value("${mqtt.redis.key.roleid}")
public void setROLEID(String roleid) {
ROLEID = roleid;
}
@Value("${mqtt.redis.key.username}")
public void setUSERNAME(String username) {
USERNAME = username;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.config;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.utils.http.HttpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class SysParameter {
private static Logger log = LoggerFactory.getLogger(SysParameter.class);
public static String service_ip = ""; //服务所在地址
@Value("${mqtt.topicconfig:/{{roleid}}/{{username}}/{{clientid}}/{{topicType}}/{{messageid}}}")
public String tempTopicconfig ; //topic 配置
public static String topicconfig ; //topic 配置
@PostConstruct
public static void init() {
String service_ip_url = "http://ly.userlogin.yu2le.com/ip";
JSONObject jsonObject = JSONObject.parseObject(HttpUtils.sendGet(service_ip_url));
service_ip = jsonObject.getString("data");
log.info("服务器地址:{}",service_ip);
}
public void inittopicconfig()
{
topicconfig = tempTopicconfig;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto;
import com.zhonglai.luhui.mqtt.comm.config.SysParameter;
import lombok.Data;
@Data
public class DeviceInfoDto {
private String id; // varchar(50) NOT NULL COMMENT '设备编号,如:设备imei号_传感器编号',
private String deviceId; // varchar(50) NOT NULL COMMENT '设备IMEI号',
private String deviceName; // varchar(50) DEFAULT NULL COMMENT '设备名称',
private String sensorNumber; // varchar(10) NOT NULL DEFAULT '01' COMMENT '传感器编号,默认01',
private String deviceModel; // varchar(10) NOT NULL COMMENT '设备型号,(3,5,6)',
private String deviceType; // varchar(10) DEFAULT NULL COMMENT '设备类型(0传感器,1开关控制器)',
private String online; // varchar(10) DEFAULT NULL COMMENT '设备在线状态(00-不在线,01-在线)',
private Integer createTime; // int(11) DEFAULT NULL COMMENT '添加时间',
private String alarmCode; // varchar(10) DEFAULT NULL COMMENT '设备告警状态',
private Integer dataUpdateTime; // int(11) DEFAULT NULL COMMENT '数据更新时间',
private String dataState; // text COMMENT '数据状态(json串)',
private Integer alarmLastCreateTime; // int(11) DEFAULT NULL COMMENT '告警最后一次产生的时间',
private String deviceServiceIp; //设备链接服务器地址
private Integer alarmSqlApply; //告警sql规则申请(0-未申请,1-已申请)
private String deviceConfig; //设备配置
private String sensorType; //设备配置
private Boolean isadd; //是否添加
public static DeviceInfoDto newDefaultDeviceInfoDto(String imei, String sensor_number, String device_model, String device_type)
{
DeviceInfoDto deviceInfoDto = new DeviceInfoDto();
deviceInfoDto.setId(imei+"_"+sensor_number);
deviceInfoDto.setDeviceId(imei);
deviceInfoDto.setSensorNumber(sensor_number);
deviceInfoDto.setDeviceModel(device_model);
deviceInfoDto.setDeviceType(device_type);
deviceInfoDto.setAlarmCode("00");
deviceInfoDto.setOnline("01");
deviceInfoDto.setDeviceServiceIp(SysParameter.service_ip);
deviceInfoDto.setIsadd(true);
return deviceInfoDto;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto;
import lombok.Data;
@Data
public class MyException extends RuntimeException{
private static final long serialVersionUID = 8827598182853467258L;
private Message errmge;
public MyException(Message myMessage) {
super(myMessage.getMessage());
this.errmge = myMessage;
}
public MyException(String message, Throwable cause) {
super(message, cause);
}
public MyException(String message) {
super(message);
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
public interface ServerAgreementContent {
String getClienConnectionId();
byte[] getCommd();
String getReplyCommdTopic(Topic topic);
void setReplyMessage(Message message);
}
... ...
package com.zhonglai.luhui.mqtt.comm.dto;
public interface ServerDto {
ServerAgreementContent getServerAgreementContent();
boolean isReplyMessage();
}
... ...
package com.zhonglai.luhui.mqtt.comm.factory;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
/**
* mqtt业务协议
*/
public interface BusinessAgreement<T> {
ServerDto analysis(Topic topic, T data) throws Exception; //解析协议
T toData(byte[] data);
}
... ...
package com.zhonglai.luhui.mqtt.comm.factory;
/**
* mqtt业务协议工厂
*/
public interface BusinessAgreementFactory {
BusinessAgreement createBusinessAgreement(String topicType);
}
... ...
package com.zhonglai.luhui.mqtt.comm.factory;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttPersistable;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import java.util.Enumeration;
public class MqttClientPersistenceImpl implements MqttClientPersistence {
@Override
public void open(String clientId, String serverURI) throws MqttPersistenceException {
}
@Override
public void close() throws MqttPersistenceException {
}
@Override
public void put(String key, MqttPersistable persistable) throws MqttPersistenceException {
}
@Override
public MqttPersistable get(String key) throws MqttPersistenceException {
return null;
}
@Override
public void remove(String key) throws MqttPersistenceException {
}
@Override
public Enumeration keys() throws MqttPersistenceException {
return null;
}
@Override
public void clear() throws MqttPersistenceException {
}
@Override
public boolean containsKey(String key) throws MqttPersistenceException {
return false;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.factory;
import com.zhonglai.luhui.mqtt.comm.config.RedisConfig;
import com.zhonglai.luhui.mqtt.comm.config.SysParameter;
import com.zhonglai.luhui.mqtt.comm.dto.MyException;
import lombok.Data;
import java.util.Optional;
@Data
public class Topic {
private String roleid;
private String username;
private String clientid;
private String topicType;
private String redicKey;
private String messageid;
public Topic(String topic)
{
topic = Optional.ofNullable(topic).orElseThrow(()->new MyException("topic为空"));
String[] sts = topic.split("/");
String[] config = SysParameter.topicconfig.split("/");
for(int i=0;i<config.length;i++)
{
String cf = config[i].replace("{{","").replace("}}","");
switch (cf)
{
case "roleid":
roleid = sts[i];
break;
case "username":
username = sts[i];
break;
case "clientid":
clientid = sts[i];
break;
case "topicType":
topicType = sts[i];
break;
case "messageid":
messageid = sts[i];
break;
}
}
}
public String getRedicKey()
{
if(null == redicKey)
{
return generateRedicKey();
}
return redicKey;
}
private String generateRedicKey()
{
return RedisConfig.FIELD+roleid+":"+username+":"+clientid;
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
/**
* 缓存业务
*/
public interface CacheService {
boolean updateCache(Topic topic, ServerDto dto); //返回是否需要持久化
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.ruoyi.common.core.domain.Message;
import com.ruoyi.common.utils.ByteUtil;
import com.zhonglai.luhui.mqtt.comm.clien.ClienConnection;
import com.zhonglai.luhui.mqtt.comm.clien.impl.ClienConnectionImpl;
import com.zhonglai.luhui.mqtt.comm.dto.ServerDto;
import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import net.jodah.expiringmap.ExpirationListener;
import net.jodah.expiringmap.ExpirationPolicy;
import net.jodah.expiringmap.ExpiringMap;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
/**
* 客户端通知服务
*/
@Service
public class ClienNoticeService {
private static final Logger log = LoggerFactory.getLogger(ClienNoticeService.class);
@Autowired
private TerminalService terminalService;
@Autowired
private TopicsService topicsService;
private ExpiringMap<String, ClienConnection> clienConnectionMap;
@Value("${mqtt.client.operationTime}")
private long operationTime; //客户端操作时间
@PostConstruct
public void init()
{
// maxSize: 设置最大值,添加第11个entry时,会导致第1个立马过期(即使没到过期时间)
// expiration:设置每个key有效时间10s, 如果key不设置过期时间,key永久有效。
// variableExpiration: 允许更新过期时间值,如果不设置variableExpiration,不允许后面更改过期时间,一旦执行更改过期时间操作会抛异常UnsupportedOperationException
// policy:
// CREATED: 只在put和replace方法清零过期时间
// ACCESSED: 在CREATED策略基础上增加, 在还没过期时get方法清零过期时间。
// 清零过期时间也就是重置过期时间,重新计算过期时间.
clienConnectionMap = ExpiringMap.builder().maxSize(2000).expiration(operationTime, TimeUnit.SECONDS)
.asyncExpirationListener((ExpirationListener<String, ClienConnection>) (s, clienConnection) -> clienConnection.close())
.expirationPolicy(ExpirationPolicy.CREATED).build();
}
public Message sendMessage(String imei, MqttMessage mqttMessage, String messageid) throws MqttException, InterruptedException {
//设置通知渠道
ClienConnection clienConnection = new ClienConnectionImpl();
clienConnectionMap.put(topicsService.getClienConnectionMapKey(imei,messageid),clienConnection);
sendMessage(imei,messageid,mqttMessage);
synchronized(clienConnection)
{
log.info("{}等待通知",imei);
clienConnection.wait(operationTime*1000+3000l);
}
log.info("{}收到通知{}",imei,clienConnection.getReplyMessage().getMessage());
Message message = clienConnection.getReplyMessage();
log.info("{}返回通知{}",imei,message);
return message;
}
/**
* 发送消息
* @param imei
* @param mqttMessage
* @throws MqttException
* @throws InterruptedException
*/
public void sendMessage(String imei,String messageid, MqttMessage mqttMessage) throws MqttException, InterruptedException {
//发生指令,等待通知
String topic = topicsService.getTopicFromImei(imei,messageid);
System.out.println("发送的消息内容"+ ByteUtil.hexStringToSpace(ByteUtil.toHexString(mqttMessage.getPayload()).toUpperCase()));
terminalService.publish(topic,mqttMessage);
}
public ClienConnection getClienConnection(String imei, String messageid)
{
return clienConnectionMap.get(topicsService.getClienConnectionMapKey(imei,messageid));
}
public void replySendMessage(Topic topic, ServerDto dto)
{
log.info("开始通知{},数据:{}",topic,dto);
//判断有没有需要回复的客户端,如果有就回复
if(dto.isReplyMessage())
{
ClienConnection clienConnection = getClienConnection(topic.getClientid(),dto.getServerAgreementContent().getClienConnectionId());
if(null != clienConnection)
{
synchronized(clienConnection)
{
log.info("正在通知{},通知结果{}",topic,dto);
clienConnection.reply(dto.getServerAgreementContent());
}
}
}
log.info("结束通知{}",topic);
}
public void replyTerminalMessage(Topic topic, ServerDto dto) throws MqttException {
if(dto.isReplyMessage() && null != dto.getServerAgreementContent().getReplyCommdTopic(topic))
{
String tc = dto.getServerAgreementContent().getReplyCommdTopic(topic);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(dto.getServerAgreementContent().getCommd());
log.info("回复终端{}的消息{}",tc,new String(mqttMessage.getPayload()));
terminalService.publish(dto.getServerAgreementContent().getReplyCommdTopic(topic),mqttMessage);
}
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* 数据持久化服务
*/
public abstract class DataPersistenceService {
protected BaseDao baseDao = new BaseDao();
public abstract void persistence(Topic topic, ServerDto serverDto);
public abstract void addDeviceSensorData(Topic topic, ServerDto serverDto);
/**
* 记录操作日志
* @param deviceOperationTypeEnum
* @param deviceNewState
*/
public void logDeviceOperation(String deviceInfoId, DeviceOperationTypeEnum deviceOperationTypeEnum, String deviceNewState)
{
List<Object> operateHisList = new ArrayList<>();
//如果老的和新的不一致,记录日志
LogDeviceOperation operateHis = new LogDeviceOperation();
operateHis.setDeviceInfoId(deviceInfoId);
deviceOperationTypeEnum.setDeviceOperationLog(operateHis);
if(StringUtils.isNoneBlank(deviceNewState))
{
operateHis.setDeviceNewState(deviceNewState);
}
operateHis.setIsStateChange(1);
operateHis.setDeviceOperationTime(CommonUtil.getNowTimeMilly());
operateHisList.add(operateHis);
baseDao.insertList(operateHisList, TableGenerateSqlEnum.LogDeviceOperation.getNowTableName());
}
/**
* 离线处理
* @param imei
*/
public void offLine(String imei) {
addAlarm(upDeviceInfoOffLine(imei));
}
/**
* 更新设备表离线信息
* @param imei
* @return
*/
public abstract String[] upDeviceInfoOffLine(String imei);
/**
* 离线告警记录添加(实时离线)
* @param deviceInfoIds
*/
private void addAlarm(String... deviceInfoIds)
{
List<Object> deviceAlarmInfoList = new ArrayList<>();
for(String deviceInfoId:deviceInfoIds)
{
DeviceAlarmInfo deviceAlarmInfo = new DeviceAlarmInfo();
deviceAlarmInfo.setDeviceInfoId(deviceInfoId);
deviceAlarmInfo.setAlarmTime(CommonUtil.getNowTimeMilly());
deviceAlarmInfo.setAlarmState(1);
deviceAlarmInfo.setAlarmCode("09");
deviceAlarmInfo.setIsSendNumber(0);
deviceAlarmInfoList.add(deviceAlarmInfo);
}
if(null != deviceAlarmInfoList && deviceAlarmInfoList.size() !=0 )
{
String tableName = TableUtil.getNowTableName( "ly_device_alarm_info", "device_alarm_info",3);
baseDao.insertList(deviceAlarmInfoList,tableName);
}
}
}
... ...
package com.zhonglai.luhui.mqtt.service;
package com.zhonglai.luhui.mqtt.comm.service;
import com.luhui.ly.device.mqtt.comm.dto.ServerDto;
import com.luhui.ly.device.mqtt.comm.factory.BusinessAgreement;
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.luhui.ly.device.mqtt.comm.config.RedisConfig;
import com.luhui.ly.device.mqtt.comm.service.DataPersistenceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
* 不指定redis数据库(全局监听)
*/
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
@Autowired
private DataPersistenceService dtaPersistenceService;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
if(expiredKey.startsWith(RedisConfig.getRedisKeyPath())) //指定设备离线
{
String imei = expiredKey.replace(RedisConfig.getRedisKeyPath(),"");
dtaPersistenceService.offLine(imei);
}
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.luhui.ly.comm.util.GsonConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 缓存数据服务
*/
@Service
public class RedisService {
private static final Logger log = LoggerFactory.getLogger(com.luhui.ly.device.mqtt.comm.service.RedisService.class);
@Resource
private RedisTemplate<String,Object> redisTemplate;
@Value("${mqtt.client.device_life}")
private long device_life; //设备生命周期
/**
* @param key
* @return 获得值
* redis有五种数据类型 opsForValue表示是操作字符串类型
*/
public Object get(String key){
return key == null ? null : redisTemplate.opsForValue().get(key);
}
//本来只可以放入string类型,但是我们配置了自动序列化所以这儿可以传入object
public boolean set(String key,Object value){
try{
redisTemplate.opsForValue().set(key,value);
return true;
}catch (Exception e){
log.error("redis set value exception:{}",e);
return false;
}
}
/**
* 原子操作
* @param key
* @param value
* @return
*/
public boolean setexDevice(String key,Object value){
try{//TimeUnit.SECONDS指定类型为秒
redisTemplate.opsForValue().set(key,value,device_life, TimeUnit.SECONDS);
return true;
}catch (Exception e){
log.error("redis set value and expire exception:{}",e);
return false;
}
}
/**
* 原子操作
* @param key
* @param value
* @param expire 过期时间 秒
* @return
*/
public boolean setex(String key,Object value,long expire){
try{//TimeUnit.SECONDS指定类型为秒
redisTemplate.opsForValue().set(key,value,expire, TimeUnit.SECONDS);
return true;
}catch (Exception e){
log.error("redis set value and expire exception:{}",e);
return false;
}
}
/**
* 非原子操作
* @param key
* @param expire
* @return
*/
public boolean expire(String key,long expire){
try{//这儿没有ops什么的是因为每种数据类型都能设置过期时间
redisTemplate.expire(key,expire,TimeUnit.SECONDS);
return true;
}catch (Exception e){
log.error("redis set key expire exception:{}",e);
return false;
}
}
/**
*
* @param key
* @return 获取key的过期时间
*/
public long ttl(String key){
return redisTemplate.getExpire(key);
}
/**
*
* @param keys 删除key 可变参数
*/
public void del(String ...keys){
if(keys!=null&&keys.length>0) {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(keys));
}
}
/**
*
* @param key
* @param step 传入正数 就是加多少 传入负数就是减多少
* @return
*/
public long incrBy(String key,long step){
return redisTemplate.opsForValue().increment(key,step);
}
/**
*
* @param key
* @param value
* @return 如果该key存在就返回false 设置不成功 key不存在就返回ture设置成功
*/
public boolean setnx(String key,Object value){
return redisTemplate.opsForValue().setIfAbsent(key,value);
}
// /**
// * 原子操作
// * @param key
// * @param value
// * @param expire 在上面方法加上过期时间设置
// * @return
// */
// public boolean setnxAndExpire(String key,Object value,long expire){
// return redisTemplate.opsForValue().setIfAbsent(key,value,expire,TimeUnit.SECONDS);
// }
/**
*
* @param key
* @param value
* @return 如果该key存在就返回之前的value 不存在就返回null
*/
public Object getAndSet(String key,Object value){
return redisTemplate.opsForValue().getAndSet(key,value);
}
/**
*
* @param key
* @return 判断key是否存在
*/
public boolean hasKey(String key){
return redisTemplate.hasKey(key);
}
/***list的长度**/
public long llen(String key){
return redisTemplate.opsForList().size(key);
}
/**
* 获取key中index位置的值,负数就反过来数,-1为最后一个
* @param key
* @param index
* @return
*/
public Object lgetByIndex(String key,long index){
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
log.error("redis lgetByIndex error,key:{},index:{}exception:{}",key,index,e);
return null;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @return
*/
public boolean lrpush(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
log.error("redis lrpush error,key:{},value:{}exception:{}",key,value,e);
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lrpush(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
log.error("redis lrpush error,key:{},value:{},timeL{},exception:{}",key,value,time,e);
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @return
*/
public boolean lrpush(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
log.error("redis lrpush error,key:{},value:{},exception:{}",key,value,e);
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lrpush(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
log.error("redis lrpush error,key:{},value:{},time:{},exception:{}",key,value,time,e);
return false;
}
}
/**
* 根据索引修改list中的某条数据
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateByIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
log.error("redis lUpdateByIndex error,key:{},index:{},value:{},exception:{}",key,index,value,e);
return false;
}
}
/**
* 移除N个值为value
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lrem(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
log.error("redis lrem error,key:{},count:{},value:{},exception:{}",key,count,value,e);
return 0;
}
}
/*****hash数据类型方法 opsForHash表示是操作字符串类型*****/
/**
* @param key 健
* @param field 属性
* @param value 值
* @return
*/
public boolean hset(String key, String field, Object value) {
try {
redisTemplate.opsForHash().put(key, field, value);
return true;
}catch (Exception e){
log.error("redis hset eror,key:{},field:{},value:{}",key,field,value);
return false;
}
}
/**
*
* @param key
* @param field
* @param value
* @param seconds(秒) 过期时间
* @return
*/
public boolean hset(String key, String field, Object value,long seconds) {
try {
redisTemplate.opsForHash().put(key, field, value);
expire(key,seconds);//调用通用方法设置过期时间
return true;
}catch (Exception e){
log.error("redis hset and expire eror,key:{},field:{},value:{},exception:{}",key,field,value,e);
return false;
}
}
/**
* 获取key中field属性的值
* @param key
* @param field
* @return
*/
public Object hget(String key,String field){
return redisTemplate.opsForHash().get(key,field);
}
/**
* 获取key中多个属性的键值对,这儿使用map来接收
* @param key
* @param fields
* @return
*/
public Map<String,Object> hmget(String key, String...fields){
Map<String,Object> map = new HashMap<>();
for (String field :fields){
map.put(field,hget(key,field));
}
return map;
}
/**
* @param key 获得该key下的所有键值对
* @return
*/
public Map<Object, Object> hmget(String key){
return redisTemplate.opsForHash().entries(key);
}
/**
* @param key 获得该key下的所有键值对
* @return
*/
//map----json字符串---->对象
public <T>T hmgetObject(String key,Class<T> tClass){
Map<Object, Object> hmget = hmget(key);
if(CollectionUtils.isEmpty(hmget)) return null;
//查询到了 先把数据转成json字符串
String s = GsonConstructor.get().toJson(hmget);
//再把json字符串转回对象
return GsonConstructor.get().fromJson(s,tClass);
}
/**
* @param key 键
* @param map 对应多个键值
* @return
*/
public boolean hmset(String key,Map<Object,Object> map){
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
}catch (Exception e){
log.error("redis hmset eror,key:{},value:{},exception:{}",key,map,e);
return false;
}
}
public boolean hmset(String key,Object object){
try {
String s = GsonConstructor.get().toJson(object);
Map<String, String> map = GsonConstructor.get().fromJson(s,HashMap.class);
redisTemplate.opsForHash().putAll(key, map);
return true;
}catch (Exception e){
log.error("redis hmset eror,key:{},object:{},exception:{}",key,object,e);
return false;
}
}
/**
* @param key 键
* @param map 对应多个键值
* @param seconds 过期时间(秒)
* @return
*/
public boolean hmset(String key,Map<String,Object> map,long seconds){
try {
redisTemplate.opsForHash().putAll(key, map);
expire(key,seconds);
return true;
}catch (Exception e){
log.error("redis hmset eror,key:{},value:{},expireTime,exception:{}",key,map,seconds,e);
return false;
}
}
/**
*删除key中的属性
* @param key
* @param fields
*/
public void hdel(String key,Object...fields){
redisTemplate.opsForHash().delete(key,fields);
}
/**
* 判断key中是否存在某属性
* @param key
* @param field
* @return
*/
public boolean hHashKey(String key,String field){
return redisTemplate.opsForHash().hasKey(key,field);
}
/**
* 对key中filed的value增加多少 如果是减少就传入负数
* @param key
* @param field
* @param step 正数增加,负数减少
* @return
*/
public double hincr(String key,String field,double step){
return redisTemplate.opsForHash().increment(key,field,step);
}
/**
* key中多少个
* @param key
* @return
*/
public long hlen(String key){
return redisTemplate.opsForHash().size(key);
}
/******其他方法用到在增加********/
/***set集合***/
/**
* 获取key中所有元素
* @param key
* @return
*/
public Set<Object> sgetAll(String key){
try {
return redisTemplate.opsForSet().members(key);
}catch (Exception e){
log.error("redis sgetAll error,key:{},exception:{}",key,e);
return null;
}
}
/**
* 判断value是否在key中
* @param key
* @param value
* @return
*/
public boolean sexists(String key,Object value){
try {
return redisTemplate.opsForSet().isMember(key,value);
}catch (Exception e){
log.error("redis sexists error,key:{},value:{},exception:{}",key,value,e);
return false;
}
}
/**
* 插入多个元素
* @param key
* @param values
* @return 成功的个数
*/
public long sset(String key ,Object...values){
try {
return redisTemplate.opsForSet().add(key,values);
}catch (Exception e){
log.error("redis sset error,key:{},value:{},values:{},exception:{}",key,values,e);
return 0;
}
}
/**
* 添加元素并设置过期时间 (非原子操作)
* @param key
* @param time
* @param values
* @return
*/
public long sset(String key ,long time,Object...values){
try {
long count = redisTemplate.opsForSet().add(key,values);
expire(key,time);
return count;
}catch (Exception e){
log.error("redis sset error,key:{},value:{},values:{},exception:{}",key,values,e);
return 0;
}
}
/**
* 获取set的长度
* @param key
* @return
*/
public long sgetSize(String key){
try {
return redisTemplate.opsForSet().size(key);
}catch (Exception e){
log.error("redis sgetSize error,key:{},exception:{}",key,e);
return 0;
}
}
/**
* 移除值为value的
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long sRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
log.error("redis sRemove error,key:{},values:{},exception:{}",key,values,e);
return 0;
}
}
/**
* 随机取count个元素 count为正数就取不重复的 负数就有可能重复
* @param key
* @param count
* @return
*/
public List<Object> sRandom(String key,long count) {
try {
return redisTemplate.opsForSet().randomMembers(key,count);
} catch (Exception e) {
log.error("redis sRandom error,key:{},count:{},exception:{}",key,count,e);
return null;
}
}
/****zset工具类***/
/**
* 添加元素
* @param key
* @param member
* @param score
* @return
*/
public boolean zadd(String key,Object member,double score){
try {
return redisTemplate.opsForZSet().add(key,member,score);
} catch (Exception e) {
log.error("redis zadd error,key:{},value:{},score:{},exception:{}",key,member,score,e);
return false;
}
}
public Set<String> zrange(String key,int start,int end){
try {
Set<Object> range = redisTemplate.opsForZSet().
range(key, start, end);
if(range==null||range.size()==0) return null;
return range.stream().
map(o->(String)o).collect(Collectors.toSet());
} catch (Exception e) {
log.error("redis zrange error,key:{},start:{},end:{},exception:{}",
key,start,end,e);
return null;
}
}
/**
* 模糊匹配key
* @param match
* @return
*/
public Set<String> keys(String match){
Set<String> ks = redisTemplate.keys(match);
return ks;
}
/***这个有点多,就不一一写了 大家用到什么去补全剩下的API
* Set< V > range(K key, long start, long end); 下标范围取
* Long remove(K key, Object… values); 删除元素
*Double incrementScore(K key, V value, double delta); 增加分数
* Long rank(K key, Object o); 当前元素的位置
*Long reverseRank(K key, Object o); 反过来的位置
*Set< TypedTuple< V >> rangeWithScores(K key, long start, long end); 下标取出来带分数
* Set< V > rangeByScore(K key, double min, double max); 根据分数取
* Set< TypedTuple< V >> rangeByScoreWithScores(K key, double min, double max); 根据分数取 并携带分数
* Set< TypedTuple< V >> rangeByScoreWithScores(K key, double min, double max, long offset, long count); 翻页
*Long count(K key, double min, double max); 统计分数段的个数
*Long size(K key); 个数 底层就是card
* Long zCard(K key); 个数
* Double score(K key, Object o); 查看某个元素的分数
* Long removeRange(K key, long start, long end); 根据下标删除某个范围
* Long removeRangeByScore(K key, double min, double max); 删除某个分数段
* 等等
* **/
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
import com.luhui.ly.device.mqtt.comm.config.SysParameter;
import com.luhui.ly.device.mqtt.comm.service.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 终端服务
*/
@Service
public class TerminalService {
private static final Logger log = LoggerFactory.getLogger(com.luhui.ly.device.mqtt.comm.service.TerminalService.class);
@Autowired
private MqttCallback mqttCallback;
@Autowired
private SysParameter sysParameter;
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50000));
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${mqtt.topics}")
private String topics;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
private MqttClient mqttclient;
private MqttConnectOptions options;
private void init() throws MqttException {
if(null == mqttclient)
{
mqttclient = new MqttClient(broker, clientId, new MemoryPersistence());
}
options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(15);
//设置断开后重新连接
options.setAutomaticReconnect(true);
mqttclient.setCallback(mqttCallback);
}
private void connect() throws MqttException {
options.setUserName(username);
options.setPassword(password.toCharArray());
mqttclient.connect(options);
}
public void subscribe() throws MqttException {
mqttclient.subscribe(topics.split(","));
}
@PostConstruct
public void startMqttListenerService() throws MqttException {
log.info("-----------开始启动mqtt监听服务--------------------");
init();
log.info("-----------启动参数{}--------------------",options);
connect();
subscribe();
sysParameter.inittopicconfig();
log.info("-----------mqtt监听服务启动成功--------------------");
}
public void subscribe(String[] topicFilters) throws MqttException {
mqttclient.subscribe(topicFilters);
}
public void publish(String topic, MqttMessage message) throws MqttException {
mqttclient.publish(topic,message);
}
public void publish(String topic, String messageStr) throws MqttException {
MqttMessage message = new MqttMessage();
message.setPayload(messageStr.getBytes());
mqttclient.publish(topic,message);
}
public void closeClient (String clientId,String code,String messageStr) throws MqttException {
String topic = "SYSOPERATION/CLOSE";
MqttMessage message = new MqttMessage();
Charset charset = Charset.forName("utf-8");
ByteBuffer payload = charset.encode(clientId+","+code+","+messageStr);
message.setPayload(payload.array());
mqttclient.publish(topic,message);
}
}
... ...
package com.zhonglai.luhui.mqtt.comm.service;
public interface TopicsService {
String getClienConnectionMapKey(String imei, String messageid);
String getTopicFromImei(String imei, String messageid);
}
... ...
package com.zhonglai.luhui.mqtt.comm.util;
import com.google.gson.JsonObject;
import org.apache.commons.lang3.StringUtils;
public class ChangUtil {
public static boolean changString(JsonObject oldData, String strName, String strValue)
{
if(StringUtils.isNoneBlank(strValue))
{
if(!oldData.has(strName) || !strValue.equals(oldData.get(strName).getAsString()))
{
oldData.addProperty(strName,strValue);
return true;
}
}
return false;
}
public static boolean changInt(JsonObject oldData, String intName, Integer intValue)
{
if(null != intValue)
{
if(!oldData.has(intName) || intValue -oldData.get(intName).getAsInt() != 0 )
{
oldData.addProperty(intName,intValue);
return true;
}
}
return false;
}
public static boolean changFloat(JsonObject oldData, String intName, Float intValue)
{
if(null != intValue)
{
if(!oldData.has(intName) || intValue -oldData.get(intName).getAsFloat() != 0 )
{
oldData.addProperty(intName,intValue);
return true;
}
}
return false;
}
}
... ...
... ... @@ -289,6 +289,24 @@
<artifactId>mapper-spring-boot-starter</artifactId>
<version>${tkmapper.version}</version>
</dependency>
<!-- mqtt -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>expiringmap</artifactId>
<version>0.5.8</version>
</dependency>
</dependencies>
... ...
package com.ruoyi.system.dto;
package com.ruoyi.common.core.domain;
import lombok.Data;
... ... @@ -37,4 +37,13 @@ public class Message {
this.code = code.getCode();
this.message = code.getMessage();
}
public void setCode(MessageCode messageCode )
{
code = messageCode.code;
}
public void setCode(MessageCodeType code) {
this.code = code.getCode();
}
}
... ...
package com.ruoyi.system.dto;
package com.ruoyi.common.core.domain;
public enum MessageCode implements MessageCodeType{
DEFAULT_FAIL_CODE(0, "请求失败"),
... ...
package com.ruoyi.system.dto;
package com.ruoyi.common.core.domain;
public interface MessageCodeType {
int getCode();
... ...
package com.ruoyi.common.utils;
import java.util.Arrays;
public class ByteUtil {
/**
* byte数组中取int数值,本方法适用于(低位在前,高位在后)的顺序,和和intToBytes()配套使用
*
* @param src
* byte数组
* @param offset
* 从数组的第offset位开始
* @return int数值
*/
public static long bytesToLongASC(byte[] src, int offset,int lenth) {
int value = 0;
for(int i=0;i<lenth;i++)
{
value = value | ((src[offset+i] & 0xFF)<<(8*i));
}
return value;
}
/**
* 把16进制字符串转换成字节数组
*
* @param hex
* @return
*/
public static byte[] hexStringToByte(String hex) {
int len = (hex.length() / 2);
byte[] result = new byte[len];
char[] achar = hex.toCharArray();
for (int i = 0; i < len; i++) {
int pos = i * 2;
result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
}
return result;
}
private static byte toByte(char c) {
byte b = (byte) "0123456789ABCDEF".indexOf(c);
return b;
}
/**
* 把16进制字符串转换成字节数组
*
* @param hex
* @return
*/
public static String hexStringToSpace(String hex) {
if (null == hex) {
return null;
} else {
StringBuilder sb = new StringBuilder(hex.length() << 1);
for(int i = 0; i < hex.length(); i+=2) {
sb.append(hex.substring(i,i+2)).append(" ");
}
return sb.toString();
}
}
/**
* 把原数组加点目标数组后面
* @param dest 目标数组
* @param src 原数组
* @return
*/
public static byte[] addBytes(byte[] dest,byte[] src )
{
int dl = dest.length;
int sl = src.length;
dest = Arrays.copyOf(dest, dl+sl);//数组扩容
System.arraycopy(src,0,dest,dl,src.length);
return dest;
}
/**
* 将int数值转换为占四个字节的byte数组,本方法适用于(高位在前,低位在后)的顺序。 和bytesToInt2()配套使用
*/
public static byte[] intToBytesDESC(long value,int lenth)
{
byte[] src = new byte[lenth];
for(int i=0;i<lenth;i++)
{
src[i] = (byte) ((value>>(8*(lenth-i-1))) & 0xFF);
}
return src;
}
/**
* 将int数值转换为占四个字节的byte数组,本方法适用于(低位在前,高位在后)的顺序。 和bytesToInt()配套使用
* @param value
* 要转换的int值
* @return byte数组
*/
public static byte[] intToBytesASC( long value,int lenth)
{
byte[] src = new byte[lenth];
for(int i=lenth;i>0;i--)
{
src[i-1] = (byte) ((value>>(8*(i-1))) & 0xFF);
}
return src;
}
public static void main(String[] args) {
System.out.println(ByteUtil.toHexString( ByteUtil.intToBytesASC(2011239256,4)));
}
/**
* ip转化位4byte
* @param ip
* @return
*/
public static byte[] ipTo4Byte(String ip)
{
String[] ips = ip.split(".");
return new byte[]{(byte) Integer.parseInt(ips[0]),(byte) Integer.parseInt(ips[1]),(byte) Integer.parseInt(ips[2]),(byte) Integer.parseInt(ips[3])};
}
/**
* byte数组中取int数值,本方法适用于(低位在后,高位在前)的顺序。和intToBytes2()配套使用
*/
public static long bytesToLongDESC(byte[] src, int offset,int lenth) {
long value = 0;
for(int i=lenth;i>0;i--)
{
value = value | ((src[offset+(lenth-i)] & 0xFF)<<(8*(i-1)));
}
return value;
}
private static final char[] hex = "0123456789abcdef".toCharArray();
public static String toHexString(byte[] bytes) {
if (null == bytes) {
return null;
} else {
StringBuilder sb = new StringBuilder(bytes.length << 1);
for(int i = 0; i < bytes.length; ++i) {
sb.append(hex[(bytes[i] & 240) >> 4]).append(hex[bytes[i] & 15]);
}
return sb.toString();
}
}
/**
* 计算CRC16/Modbus校验码 低位在前,高位在后
*
* @param str 十六进制字符串
* @return
*/
public static String getCRC16(String str) {
byte[] bytes = hexStringToByte(str);
return getCRC16(bytes);
}
/**
* 计算CRC16/Modbus校验码 低位在前,高位在后
*
* @return
*/
public static String getCRC16( byte[] bytes) {
int CRC = 0x0000ffff;
int POLYNOMIAL = 0x0000a001;
int i, j;
for (i = 0; i < bytes.length; i++) {
CRC ^= ((int) bytes[i] & 0x000000ff);
for (j = 0; j < 8; j++) {
if ((CRC & 0x00000001) != 0) {
CRC >>= 1;
CRC ^= POLYNOMIAL;
} else {
CRC >>= 1;
}
}
}
String crc = Integer.toHexString(CRC);
if (crc.length() == 2) {
crc = "00" + crc;
} else if (crc.length() == 3) {
crc = "0" + crc;
}
crc = crc.substring(2, 4) + crc.substring(0, 2);
return crc.toUpperCase();
}
}
... ...
... ... @@ -47,8 +47,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
update iot_role
<trim prefix="SET" suffixOverrides=",">
<if test="create_time != null">create_time = #{create_time},</if>
<if test="describe != null">describe = #{describe},</if>
<if test="name != null">name = #{name},</if>
<if test="describe != null">`describe` = #{describe},</if>
<if test="name != null">`name` = #{name},</if>
<if test="used != null">used = #{used},</if>
</trim>
where id = #{id}
... ...