|
|
|
package com.zhonglai.luhui.device.protocol.factory.control;
|
|
|
|
|
|
|
|
import com.google.gson.JsonObject;
|
|
|
|
import com.ruoyi.common.utils.GsonConstructor;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.clien.ClienConnection;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.clien.impl.ClienConnectionImpl;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.dto.ApiClientRePlyDto;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.dto.TerminalClientRePlyDto;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
|
|
|
|
import com.zhonglai.luhui.device.analysis.dto.Message;
|
|
|
|
import com.zhonglai.luhui.device.analysis.dto.MessageCode;
|
|
|
|
import com.zhonglai.luhui.device.analysis.util.TopicUtil;
|
|
|
|
import com.zhonglai.luhui.device.protocol.factory.analysis.ProtocolParserFactory;
|
|
|
|
import com.zhonglai.luhui.device.protocol.factory.config.DeviceCach;
|
|
|
|
import com.zhonglai.luhui.device.protocol.factory.config.PluginsClassLoader;
|
|
|
|
import com.zhonglai.luhui.device.protocol.factory.dto.DeviceCommand;
|
|
|
|
import com.zhonglai.luhui.device.protocol.factory.dto.NoticeMessageDto;
|
|
|
|
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
|
|
|
|
import com.zhonglai.luhui.device.protocol.factory.service.IotThingsModelService;
|
|
|
|
import com.zhonglai.luhui.device.protocol.factory.service.PersistenceDBService;
|
|
|
|
import net.jodah.expiringmap.ExpirationListener;
|
|
|
|
import net.jodah.expiringmap.ExpirationPolicy;
|
|
|
|
import net.jodah.expiringmap.ExpiringMap;
|
|
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
|
|
import org.apache.rocketmq.spring.annotation.MessageModel;
|
|
|
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
|
import org.apache.rocketmq.spring.annotation.SelectorType;
|
|
|
|
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 设备指令监听服务
|
|
|
|
*/
|
|
|
|
@Service
|
|
|
|
@RocketMQMessageListener(consumerGroup = "deviceCommand", topic = "deviceCommandListen",messageModel = MessageModel.BROADCASTING)
|
|
|
|
public class DeviceCommandListenService implements RocketMQReplyListener<MessageExt, Message> {
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(DeviceCommandListenService.class);
|
|
|
|
|
|
|
|
private static ExpiringMap<String, ClienConnection> clienConnectionMap = ExpiringMap.builder().maxSize(20000).expiration(15, TimeUnit.SECONDS)
|
|
|
|
.asyncExpirationListener(new ExpirationListener<String, ClienConnection>() {
|
|
|
|
@Override
|
|
|
|
public void expired(String s, ClienConnection clienConnection) {
|
|
|
|
log.info("{} 通道消失了>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>",s);
|
|
|
|
clienConnection.close();
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.expirationPolicy(ExpirationPolicy.CREATED).build();
|
|
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
private PersistenceDBService persistenceDBService;
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
private ClienNoticeServiceFactory clienNoticeServiceFactory;
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
private IotThingsModelService iotThingsModelService;
|
|
|
|
|
|
|
|
@Value("${mqtt.client.operationTime}")
|
|
|
|
private long operationTime; //客户端操作时间
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Message onMessage(MessageExt messageExt) {
|
|
|
|
log.info("监听到消息{}",messageExt);
|
|
|
|
|
|
|
|
String str = new String(messageExt.getBody());
|
|
|
|
log.info("消息body{}",str);
|
|
|
|
|
|
|
|
DeviceCommand deviceCommand = GsonConstructor.get().fromJson(str, DeviceCommand.class);
|
|
|
|
ParserDeviceHostDto parserDeviceHostDto = DeviceCach.getDeviceHost(deviceCommand.getDeviceId());
|
|
|
|
if(null == parserDeviceHostDto)
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"设备连接丢失,请稍后尝试");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (clienConnectionMap.containsKey(deviceCommand.getDeviceId()))
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"有其他人在控制设备,请稍后重试!");
|
|
|
|
}
|
|
|
|
|
|
|
|
String classname = persistenceDBService.getClassnameFromIotProtocolClassId(parserDeviceHostDto.getIotProduct().getAnalysis_clas());
|
|
|
|
try {
|
|
|
|
;
|
|
|
|
ProtocolParserFactory protocolParserFactory = PluginsClassLoader.getJarClass(ProtocolParserFactory.class,classname);
|
|
|
|
String controlCls = classname.replace("analysis."+protocolParserFactory.getClass().getSimpleName(),"control.DeviceCommandListenServiceImpl");
|
|
|
|
DeviceCommandServiceFactory deviceCommandServiceFactory = PluginsClassLoader.getJarClass(DeviceCommandServiceFactory.class,controlCls);
|
|
|
|
if(null != deviceCommandServiceFactory)
|
|
|
|
{
|
|
|
|
switch (deviceCommand.getCommandType())
|
|
|
|
{
|
|
|
|
case read:
|
|
|
|
NoticeMessageDto noticeMessageDomain = deviceCommandServiceFactory.read(deviceCommand.getDeviceId(), deviceCommand.getData());
|
|
|
|
if(null == noticeMessageDomain)
|
|
|
|
{
|
|
|
|
new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持读取功能");
|
|
|
|
}
|
|
|
|
return sendMessage(noticeMessageDomain);
|
|
|
|
case write:
|
|
|
|
noticeMessageDomain = deviceCommandServiceFactory.write(deviceCommand.getDeviceId(), deviceCommand.getData());
|
|
|
|
if(null == noticeMessageDomain)
|
|
|
|
{
|
|
|
|
new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持写入功能");
|
|
|
|
}
|
|
|
|
return sendMessage(noticeMessageDomain);
|
|
|
|
case notice:
|
|
|
|
if(deviceCommandServiceFactory.notice(deviceCommand.getDeviceId(), deviceCommand.getData()))
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
}else {
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"指令发送失败");
|
|
|
|
}
|
|
|
|
case cleanDeviceHost:
|
|
|
|
DeviceCach.cleanDeviceHost(deviceCommand.getDeviceId());
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
case cleanDeviceInfo:
|
|
|
|
DeviceCach.cleanDeviceInfo(deviceCommand.getDeviceId()+"_"+deviceCommand.getData().get("sensor_number").getAsString());
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
case upIotThingsModel:
|
|
|
|
iotThingsModelService.upIotThingsModel(deviceCommand.getData().get("product_id").getAsInt());
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
case upIotThingsModelTranslate:
|
|
|
|
iotThingsModelService.upIotThingsModelTranslate(deviceCommand.getData().get("product_id").getAsInt());
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
default:
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"指令类型不存在,请联系管理员");
|
|
|
|
}
|
|
|
|
}else {
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备的控制服务异常,请联系管理员");
|
|
|
|
}
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
log.error("消息发送失败",e);
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"等待通知错误");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public Message sendMessage( NoticeMessageDto noticeMessageDomain) throws InterruptedException {
|
|
|
|
|
|
|
|
Topic topic = noticeMessageDomain.getTopic();
|
|
|
|
|
|
|
|
//设置通知渠道
|
|
|
|
ClienConnection clienConnection = new ClienConnectionImpl();
|
|
|
|
log.info("设置通知渠道 {} {}",topic.getClientid(),clienConnection);
|
|
|
|
clienConnectionMap.put(topic.getClientid(),clienConnection);
|
|
|
|
clienNoticeServiceFactory.sendMessage(noticeMessageDomain);
|
|
|
|
synchronized(clienConnection)
|
|
|
|
{
|
|
|
|
log.info("{}等待通知",topic.getClientid());
|
|
|
|
clienConnection.wait(operationTime*1000+3000l);
|
|
|
|
}
|
|
|
|
//清楚通道
|
|
|
|
clienConnectionMap.remove(topic.getClientid());
|
|
|
|
log.info("{}收到通知{}",topic.getClientid(),clienConnection.getReplyMessage());
|
|
|
|
Message message = clienConnection.getReplyMessage();
|
|
|
|
log.info("{}返回通知{}",topic.getClientid(),message);
|
|
|
|
|
|
|
|
return message;
|
|
|
|
}
|
|
|
|
|
|
|
|
public ClienConnection getClienConnection(String clientid)
|
|
|
|
{
|
|
|
|
return clienConnectionMap.get(clientid);
|
|
|
|
}
|
|
|
|
|
|
|
|
public static boolean hasClienConnection(String clientid)
|
|
|
|
{
|
|
|
|
return clienConnectionMap.containsKey(clientid);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void log(Topic topic,JsonObject jsonObject)
|
|
|
|
{
|
|
|
|
// AddPostDto addPostDto = new AddPostDto();
|
|
|
|
// addPostDto.setData(JSON.parseObject(jsonObject.toString()));
|
|
|
|
// addPostDto.setIotTerminalList(new ArrayList<>());
|
|
|
|
// addPostDto.setDeviceSensorDataList(new ArrayList<>());
|
|
|
|
// addPostDto.setLogDeviceOperationList(new ArrayList<>());
|
|
|
|
// businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,addPostDto,"远程控制",true);
|
|
|
|
// cacheServiceImpl.updateCache(topic,addPostDto);
|
|
|
|
// addPostDto.setLogDeviceOperationList(logDeviceOperationList);
|
|
|
|
// dataPersistenceService.persistence(topic,addPostDto);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 通知给api操作端
|
|
|
|
* @param clientid
|
|
|
|
* @param apiClientRePlyDto
|
|
|
|
*/
|
|
|
|
public void replySendMessage(String clientid, ApiClientRePlyDto apiClientRePlyDto)
|
|
|
|
{
|
|
|
|
log.info("开始通知{},数据:{}",clientid,apiClientRePlyDto);
|
|
|
|
//判断有没有需要回复的客户端,如果有就回复
|
|
|
|
ClienConnection clienConnection = getClienConnection(clientid);
|
|
|
|
if(null != clienConnection)
|
|
|
|
{
|
|
|
|
synchronized(clienConnection)
|
|
|
|
{
|
|
|
|
log.info("正在通知{},通知结果{}",clientid,apiClientRePlyDto);
|
|
|
|
clienConnection.reply(apiClientRePlyDto);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
log.info("结束通知{}",clientid);
|
|
|
|
}
|
|
|
|
|
|
|
|
} |
...
|
...
|
|