作者 钟来

新版x6开发

正在显示 17 个修改的文件 包含 199 行增加128 行删除
package com.zhonglai.luhui.mqtt.comm.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class RedisKeyMqttUser {
public static String ROLEID;
public static String USERNAME;
@Value("${mqtt.redis.key.roleid}")
public void setROLEID(String roleid) {
ROLEID = roleid;
}
@Value("${mqtt.redis.key.username}")
public void setUSERNAME(String username) {
USERNAME = username;
}
}
... ... @@ -49,9 +49,15 @@ public class BaseDao {
List<Object> valueList = new ArrayList<Object>();
for(Field field:fields)
{//
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null !=publicSQLConfig && !publicSQLConfig.isSelect())
{
continue;
}
Method method;
try {
method = object.getClass().getMethod("get"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.getName(field.getName()));
Object value = method.invoke(object);
if(null != value)
{
... ... @@ -164,6 +170,11 @@ public class BaseDao {
String values = "(";
for(Field field:fields)
{//
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null !=publicSQLConfig && !publicSQLConfig.isSelect())
{
continue;
}
Method method;
try {
method = object.getClass().getMethod("get"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.getName(field.getName()));
... ... @@ -232,9 +243,14 @@ public class BaseDao {
int j = 0;
for(int i=0;i<fields.length;i++)
{
Field field = fields[i];
try {
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null !=publicSQLConfig && !publicSQLConfig.isSelect())
{
continue;
}
Method method = null;
try {
method = object.getClass().getMethod("get"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.getName(field.getName()));
... ... @@ -307,6 +323,11 @@ public class BaseDao {
String idName = "id";
for(Field field:fields)
{
PublicSQLConfig publicSQLConfig1 = field.getAnnotation(PublicSQLConfig.class);
if(null !=publicSQLConfig1 && !publicSQLConfig1.isSelect())
{
continue;
}
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null != publicSQLConfig && publicSQLConfig.isPrimarykey())
{
... ... @@ -401,6 +422,11 @@ public class BaseDao {
Field[] fields = clas.getDeclaredFields();
for(Field field:fields)
{
PublicSQLConfig publicSQLConfig1 = field.getAnnotation(PublicSQLConfig.class);
if(null !=publicSQLConfig1 && !publicSQLConfig1.isSelect())
{
continue;
}
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null != publicSQLConfig && publicSQLConfig.isPrimarykey())
{
... ... @@ -712,6 +738,11 @@ public class BaseDao {
for(int i=0;i<fields.length;i++)
{
Field field = fields[i];
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null !=publicSQLConfig && !publicSQLConfig.isSelect())
{
continue;
}
try {
Method method;
method = object.getClass().getMethod("get"+ com.zhonglai.luhui.mqtt.comm.util.StringUtils.getName(field.getName()));
... ... @@ -783,6 +814,11 @@ public class BaseDao {
{//
Method method;
try {
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null !=publicSQLConfig && !publicSQLConfig.isSelect())
{
continue;
}
method = object.getClass().getMethod("get"+com.zhonglai.luhui.mqtt.comm.util.StringUtils.getName(field.getName()));
Object value = method.invoke(object);
if(null != value)
... ... @@ -845,6 +881,11 @@ public class BaseDao {
sb.append("(");
for(Field field:fields)
{
PublicSQLConfig publicSQLConfig = field.getAnnotation(PublicSQLConfig.class);
if(null !=publicSQLConfig && !publicSQLConfig.isSelect())
{
continue;
}
if(!"".equals(update) )
{
sb.append(",");
... ... @@ -960,7 +1001,6 @@ public class BaseDao {
tableNmae = (String) tObject;
}
} catch (NoSuchMethodException e) {
System.out.println("未找到getTableName");
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
... ...
... ... @@ -8,7 +8,7 @@ import com.ruoyi.system.domain.IotThingsModel;
*/
public interface ThingsModelBase<T> {
void conversionThingsModel(IotThingsModel thingsModel);
void addValue(T t);
void addValue(Object t);
// @JSONField(serialize=false)
String getView();
String getSaveView();
... ...
... ... @@ -11,8 +11,8 @@ public class ArrayModelOutput extends ThingsModelItemBase<JSONArray>
private String arrayType;
@Override
public void addValue(JSONArray jsonArray) {
setValue(jsonArray);
public void addValue(Object jsonArray) {
setValue((JSONArray) jsonArray);
}
@Override
... ...
... ... @@ -10,8 +10,8 @@ public class BoolModelOutput extends ThingsModelItemBase<Boolean>
private String trueText;
@Override
public void addValue(Boolean bl) {
setValue( bl);
public void addValue(Object bl) {
setValue((Boolean) bl);
}
@Override
... ...
... ... @@ -14,8 +14,11 @@ public class DecimalModelOutput extends ThingsModelItemBase<BigDecimal>
private String unit;
@Override
public void addValue(BigDecimal object) {
setValue(object);
public void addValue(Object object) {
if(null != object)
{
setValue(new BigDecimal(object.toString()));
}
}
@Override
... ...
... ... @@ -17,8 +17,8 @@ public class IntegerModelOutput extends ThingsModelItemBase<Integer>
private Integer acy; //精度
@Override
public void addValue(Integer object) {
setValue(object);
public void addValue(Object object) {
setValue((Integer) object);
}
@Override
... ... @@ -36,6 +36,10 @@ public class IntegerModelOutput extends ThingsModelItemBase<Integer>
{
return null;
}
if(null == acy)
{
acy = 1;
}
BigDecimal bigDecimal = new BigDecimal(getValue().toString());
return bigDecimal.divide(new BigDecimal(acy),acy.toString().length()-1, RoundingMode.HALF_UP).toString();
}
... ...
... ... @@ -8,6 +8,7 @@ import com.zhonglai.luhui.mqtt.dto.MessageCode;
import com.zhonglai.luhui.mqtt.service.db.DeviceService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -16,7 +17,7 @@ import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(consumerGroup = "deviceCommand", topic = "lh-mqtt-service-deviceCommand")
@RocketMQMessageListener(consumerGroup = "deviceCommand", topic = "${rocketmq.send-topic:}",selectorType = SelectorType.TAG,selectorExpression = "${rocketmq.send-tag:}")
public class RocketMqService implements RocketMQReplyListener<MessageExt, Message> {
private static final Logger log = LoggerFactory.getLogger(MqttCallback.class);
... ... @@ -29,6 +30,7 @@ public class RocketMqService implements RocketMQReplyListener<MessageExt, Messag
// String clint = MessageUtil.getReplyToClient(messageExt);
String str = new String(messageExt.getBody());
log.info("消息body{}",str);
DeviceCommandApi deviceCommandApi = JSON.parseObject(str, DeviceCommandApi.class);
try {
return deviceCommandApi.invokeApi(deviceService);
... ...
... ... @@ -88,7 +88,6 @@ public class BusinessDataUpdateService {
}else{
iotDevice.setListen_service_ip(SysParameter.service_ip+":"+port+contextPath);
}
logger.info("更新网关数据{}",iotDevice);
if(null== iotDevice.getStatus() || 1 == iotDevice.getStatus() || 4==iotDevice.getStatus())
{
iotDevice.setStatus(3);
... ... @@ -99,8 +98,11 @@ public class BusinessDataUpdateService {
serverDto.setIotDevice(iotDevice);
}else{ //终端
IotTerminal iotTerminal = translateTerminal(type,key,olddevice,jsData,isOperLog,serverDto);
logger.info("更新终端数据{}",iotTerminal);
iotTerminal.setData_update_time(DateUtils.getNowTimeMilly());
if(null== iotTerminal.getOnline() || 1 == iotTerminal.getOnline() || 4==iotTerminal.getOnline())
{
iotTerminal.setOnline(3);
}
serverDto.getIotTerminalList().add(iotTerminal);
}
}
... ...
... ... @@ -18,7 +18,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.TimeUnit;
... ... @@ -31,42 +30,43 @@ public class ClienNoticeService {
@Autowired
private TerminalService terminalService;
private ExpiringMap<String, ClienConnection> clienConnectionMap;
@Value("${mqtt.client.operationTime}")
private long operationTime; //客户端操作时间
// maxSize: 设置最大值,添加第11个entry时,会导致第1个立马过期(即使没到过期时间)
// expiration:设置每个key有效时间10s, 如果key不设置过期时间,key永久有效。
// variableExpiration: 允许更新过期时间值,如果不设置variableExpiration,不允许后面更改过期时间,一旦执行更改过期时间操作会抛异常UnsupportedOperationException
// policy:
// CREATED: 只在put和replace方法清零过期时间
// ACCESSED: 在CREATED策略基础上增加, 在还没过期时get方法清零过期时间。
// 清零过期时间也就是重置过期时间,重新计算过期时间.
private static ExpiringMap<String, ClienConnection> clienConnectionMap = ExpiringMap.builder().maxSize(20000).expiration(5, TimeUnit.SECONDS)
.asyncExpirationListener(new ExpirationListener<String, ClienConnection>() {
@Override
public void expired(String s, ClienConnection clienConnection) {
log.info("{} 通道消失了>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>",s);
clienConnection.close();
}
})
.expirationPolicy(ExpirationPolicy.CREATED).build();
@Value("#{${mqtt.top_return_map}}")
private Map<String,String> top_return_map; //topic返回的对应关系
@PostConstruct
public void init()
{
// maxSize: 设置最大值,添加第11个entry时,会导致第1个立马过期(即使没到过期时间)
// expiration:设置每个key有效时间10s, 如果key不设置过期时间,key永久有效。
// variableExpiration: 允许更新过期时间值,如果不设置variableExpiration,不允许后面更改过期时间,一旦执行更改过期时间操作会抛异常UnsupportedOperationException
// policy:
// CREATED: 只在put和replace方法清零过期时间
// ACCESSED: 在CREATED策略基础上增加, 在还没过期时get方法清零过期时间。
// 清零过期时间也就是重置过期时间,重新计算过期时间.
clienConnectionMap = ExpiringMap.builder().maxSize(20000).expiration(operationTime, TimeUnit.SECONDS)
.asyncExpirationListener((ExpirationListener<String, ClienConnection>) (s, clienConnection) -> clienConnection.close())
.expirationPolicy(ExpirationPolicy.CREATED).build();
}
public Message sendMessage(Topic topic, MqttMessage mqttMessage) throws MqttException, InterruptedException {
//设置通知渠道
ClienConnection clienConnection = new ClienConnectionImpl();
log.info("{} {} {} {}",topic.generateClienKey(),topic.getTopicType(),top_return_map,clienConnection);
clienConnectionMap.put(topic.generateClienKey().replace(topic.getTopicType(),top_return_map.get(topic.getTopicType())),clienConnection);
String key = topic.generateClienKey().replace(topic.getTopicType(),top_return_map.get(topic.getTopicType()));
log.info("设置通知渠道 {} {}",key,clienConnection);
clienConnectionMap.put(key,clienConnection);
sendMessage(topic.generateSendMessageTopic(),mqttMessage);
synchronized(clienConnection)
{
log.info("{}等待通知",topic.getClientid());
clienConnection.wait(operationTime*1000+3000l);
}
//清楚通道
clienConnectionMap.remove(key);
log.info("{}收到通知{}",topic.getClientid(),clienConnection.getReplyMessage().getMessage());
Message message = clienConnection.getReplyMessage();
log.info("{}返回通知{}",topic.getClientid(),message);
... ...
... ... @@ -119,19 +119,21 @@ public class DataModeAnalysisService {
serverDto.getLogDeviceOperationList().add(dviceLogService.newLogDeviceOperation(id,thingsModelBase.getSaveView(),null,controlModel+thingsModelItemBase.getName()+"为"+thingsModelBase.getView(),jsData.toString()));
}
if(null != thingsModel && null != thingsModel.getType() && 2==thingsModel.getType())
{
config.put(key,thingsModelBase);
data.put(key,thingsModelBase);
}else
switch (thingsModel.getIs_config())
{
if(null != thingsModel && null !=thingsModel.getIs_config() && 1==thingsModel.getIs_config())
{
case 0:
data.put(key,thingsModelBase);
break;
case 1:
config.put(key,thingsModelBase);
}else{
break;
case 2:
config.put(key,thingsModelBase);
data.put(key,thingsModelBase);
break;
default:
data.put(key,thingsModelBase);
}
break;
}
}
SaveDataDto saveDataDto = new SaveDataDto();
... ...
... ... @@ -49,8 +49,12 @@ public class TerminalService {
private String broker;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${mqtt.topics}")
private String topics;
@Value("${mqtt.roleid}")
private String roleid;
@Value("#{'${mqtt.mqtt_usernames}'.split(',')}")
private List<String> mqtt_usernames;
@Value("#{'${mqtt.topics}'.split(',')}")
private List<String> topics;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
... ... @@ -80,7 +84,34 @@ public class TerminalService {
}
public void subscribe() throws MqttException {
mqttclient.subscribe(topics.split(","));
List<String> ts = getCompletionTopics();
mqttclient.subscribe(ts.toArray(new String[ts.size()]));
}
public List<String> getCompletionTopics()
{
List<String> ts = new ArrayList<>();
for(String mqtt_username:mqtt_usernames)
{
StringBuffer topic = new StringBuffer();
topic.append("/");
topic.append(roleid);
topic.append("/");
topic.append(mqtt_username);
topic.append("/");
topic.append("+");
topic.append("/");
topic.append("+");
for(String tc:topics)
{
StringBuffer t = new StringBuffer(topic);
t.append("/");
t.append(tc);
ts.add(t.toString());
}
}
return ts;
}
@PostConstruct
... ... @@ -103,7 +134,8 @@ public class TerminalService {
{
List<String> roleids=new ArrayList<>();
List<String> usernames= new ArrayList<>();
for(String topicstr:topics.split(","))
List<String> ts = getCompletionTopics();
for(String topicstr:ts)
{
Topic topic = new Topic(topicstr);
if(null != topic)
... ...
... ... @@ -41,10 +41,12 @@ public class CacheServiceImpl implements CacheService {
IotDevice iotDevice = serverDto.getIotDevice();
if(null == iotDevice)
{
return false;
iotDevice = new IotDevice();
iotDevice.setClient_id(topic.getClientid());
iotDevice.setStatus(3);
}
setIotDeviceToRedis(iotDevice);
log.info("{} 缓存更新 {}",topic,iotDevice);
log.info("缓存更新 {} ",topic);
List<IotTerminal> list = serverDto.getIotTerminalList();
if(null != list && list.size() != 0 )
... ... @@ -54,7 +56,7 @@ public class CacheServiceImpl implements CacheService {
setIotTerminalToRedis(iotTerminal);
}
}
log.info("{} 缓存更新 {}",topic,list);
log.info("缓存更新 {} ",topic);
return true;
}
... ... @@ -81,7 +83,7 @@ public class CacheServiceImpl implements CacheService {
if(null == iotTerminal)
{
String imei = id.split("_")[0];
IotDevice iotDevice = getIotDevice(id);
IotDevice iotDevice = getIotDevice(imei);
if(null == iotDevice)
{
return null;
... ...
... ... @@ -25,12 +25,15 @@ public class DataPersistenceServiceImpl extends DataPersistenceService {
private DeviceLogService dviceLogService;
@Override
public void persistence(Topic topic, ServerDto serverDto) {
log.info("更新数据{}",topic);
IotDevice iotDevice = serverDto.getIotDevice();
if(null == iotDevice)
{
return ;
iotDevice = new IotDevice();
iotDevice.setClient_id(topic.getClientid());
iotDevice.setStatus(3);
}
log.info("更新网关数据{}",iotDevice);
baseDao.update(iotDevice);
List<IotTerminal> list = serverDto.getIotTerminalList();
... ... @@ -38,7 +41,6 @@ public class DataPersistenceServiceImpl extends DataPersistenceService {
{
for (IotTerminal iotTerminal:list)
{
log.info("更新终端数据{}",iotTerminal);
baseDao.update(iotTerminal);
}
}
... ...
... ... @@ -21,6 +21,8 @@ import com.zhonglai.luhui.mqtt.service.db.mode.TerminalDataThingsModeService;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
... ... @@ -31,6 +33,7 @@ import java.util.Set;
@Service
public class DeviceService {
private static final Logger log = LoggerFactory.getLogger(DeviceService.class);
@Autowired
private ClienNoticeService clienNoticeService;
... ... @@ -152,7 +155,7 @@ public class DeviceService {
* @throws InterruptedException
*/
public Message read(String clienid,Map<String,Object> map) throws MqttException, InterruptedException {
log.info("api请求读取 {} 参数 {}",map);
if(null == map || map.size() ==0)
{
return new Message(MessageCode.DEFAULT_FAIL_CODE,"参数验证失败");
... ...
... ... @@ -27,53 +27,53 @@ public class ReadReqTopic implements BusinessAgreement<ReadReqDto> {
@Autowired
private ClienNoticeService clienNoticeService; //客户端通知服务
@Autowired
private BusinessDataUpdateService businessDataUpdateService ;
private TerminalDataThingsModeService terminalDataThingsModeService;
@Override
public ServerDto analysis(Topic topic, ReadReqDto data) {
if(1==data.getCode())
{
// JSONObject vjsonObject = data.getData().clone();
businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,true,data);
JSONObject vjsonObject = data.getData().clone();
// businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,true,data);
// if(null != vjsonObject && vjsonObject.size() !=0 )
// {
// for(String vkey:vjsonObject.keySet())
// {
// JSONObject jsData = vjsonObject.getJSONObject(vkey);
// for(String key:jsData.keySet())
// {
// IotThingsModel thingsModel = terminalDataThingsModeService.getIotThingsModel(topic.getUsername(),key);
// if(null == thingsModel) //没有配置的 都按字符串处理
// {
// thingsModel = new IotThingsModel();
// thingsModel.setData_type(ThingsModelDataTypeEnum.STRING.name());
// thingsModel.setIdentifier(key);
// thingsModel.setModel_name(key);
// thingsModel.setIs_top(0);
// thingsModel.setIs_monitor(0);
// thingsModel.setIs_save_log(0);
// thingsModel.setIs_config(0);
// JSONObject jsonObject = new JSONObject();
// jsonObject.put("maxLength",255);
// thingsModel.setSpecs(jsonObject.toString());
// }
// String data_type = thingsModel.getData_type().toUpperCase();
// if(!EnumUtils.isValidEnum(ThingsModelDataTypeEnum.class,data_type))
// {
// data_type = ThingsModelDataTypeEnum.STRING.name();
// }
// Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,data_type).getaClass();
// ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
// thingsModelBase.conversionThingsModel(thingsModel);
//
// thingsModelBase.addValue(jsData.get(key));
// jsData.put(key,thingsModelBase);
// }
// vjsonObject.put(vkey,jsData);
// }
// data.setData(vjsonObject);
// }
if(null != vjsonObject && vjsonObject.size() !=0 )
{
for(String vkey:vjsonObject.keySet())
{
JSONObject jsData = vjsonObject.getJSONObject(vkey);
for(String key:jsData.keySet())
{
IotThingsModel thingsModel = terminalDataThingsModeService.getIotThingsModel(topic.getUsername(),key);
if(null == thingsModel) //没有配置的 都按字符串处理
{
thingsModel = new IotThingsModel();
thingsModel.setData_type(ThingsModelDataTypeEnum.STRING.name());
thingsModel.setIdentifier(key);
thingsModel.setModel_name(key);
thingsModel.setIs_top(0);
thingsModel.setIs_monitor(0);
thingsModel.setIs_save_log(0);
thingsModel.setIs_config(0);
JSONObject jsonObject = new JSONObject();
jsonObject.put("maxLength",255);
thingsModel.setSpecs(jsonObject.toString());
}
String data_type = thingsModel.getData_type().toUpperCase();
if(!EnumUtils.isValidEnum(ThingsModelDataTypeEnum.class,data_type))
{
data_type = ThingsModelDataTypeEnum.STRING.name();
}
Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,data_type).getaClass();
ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
thingsModelBase.conversionThingsModel(thingsModel);
thingsModelBase.addValue(jsData.get(key));
jsData.put(key,thingsModelBase);
}
vjsonObject.put(vkey,jsData);
}
data.setData(vjsonObject);
}
clienNoticeService.replySendMessage(topic, message -> {
if(null != data.getData())
{
... ...
... ... @@ -43,21 +43,18 @@ mqtt:
broker: tcp://175.24.61.68:1883
#唯一标识
clientId: ${random.uuid}
#公司id
roleid: 2
mqtt_usernames: 6_WP
#订阅的topic
topics: "/2/6_WP/+/+/ADD_POST,/2/6_WP/+/+/ALL_POST,/2/6_WP/+/+/DB_TOPIC_DISTRIBUTE,/2/6_WP/+/+/GET/+,/2/6_WP/+/+/online,/2/6_WP/+/+/PUT_REQ/+,/2/6_WP/+/+/READ_REQ/+"
topics: ADD_POST,ALL_POST,DB_TOPIC_DISTRIBUTE,GET/+,online,PUT_REQ/+,READ_REQ/+
topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/{{messageid}}"
top_return_map: '{"PUT":"PUT_REQ","READ":"READ_REQ"}'
username: sysuser
password: "!@#1qaz"
client:
#客户端操作时间
operationTime: 5
redis:
key:
#角色
roleid:
#用户
username:
operationTime: 10
sys:
redis:
... ... @@ -67,4 +64,6 @@ sys:
#rocketmq配置信息
rocketmq:
#nameservice服务器地址(多个以英文逗号隔开)
name-server: 47.115.144.179:9876
\ No newline at end of file
name-server: 47.115.144.179:9876
send-topic: lh-mqtt-service-deviceCommand-test
send-tag: 1
\ No newline at end of file
... ...