|
|
|
package com.zhonglai.luhui.neutrino.proxy.common.mqtt;
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
import javax.annotation.PreDestroy;
|
|
|
|
import java.util.Set;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 通过mqtt实现消息推送,用来处理长链接
|
|
|
|
*/
|
|
|
|
public class MqttService {
|
|
|
|
private final Logger log = LoggerFactory.getLogger(this.getClass());
|
|
|
|
|
|
|
|
private MqttClient mqttclient;
|
|
|
|
private MqttConnectOptions options;
|
|
|
|
|
|
|
|
private OperateService operateService;
|
|
|
|
|
|
|
|
// 使用 volatile 确保多线程内存可见性和禁止指令重排
|
|
|
|
private static volatile MqttService instance;
|
|
|
|
|
|
|
|
private MqttService(OperateService operateService) {
|
|
|
|
this.operateService = operateService;
|
|
|
|
// 初始化代码
|
|
|
|
try {
|
|
|
|
sart();
|
|
|
|
} catch (MqttException e) {
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public static MqttService getInstance( OperateService operateService) {
|
|
|
|
if (instance == null) {
|
|
|
|
synchronized (MqttService.class) {
|
|
|
|
if (instance == null) {
|
|
|
|
instance = new MqttService(operateService);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return instance;
|
|
|
|
}
|
|
|
|
|
|
|
|
private void init() throws MqttException {
|
|
|
|
if(null == mqttclient)
|
|
|
|
{
|
|
|
|
mqttclient = new MqttClient(MqttConfig.mqttBroker, MqttConfig.mqttClientId, new MemoryPersistence());
|
|
|
|
}
|
|
|
|
options = new MqttConnectOptions();
|
|
|
|
options.setCleanSession(true);
|
|
|
|
options.setConnectionTimeout(15);
|
|
|
|
options.setKeepAliveInterval(60); // 添加心跳设置,单位为秒
|
|
|
|
//设置断开后重新连接
|
|
|
|
options.setAutomaticReconnect(true);
|
|
|
|
mqttclient.setCallback(new MqttCallback(this));
|
|
|
|
}
|
|
|
|
|
|
|
|
public boolean connect() {
|
|
|
|
try {
|
|
|
|
options.setUserName(MqttConfig.mqttUserName);
|
|
|
|
options.setPassword(MqttConfig.mqttPassword.toCharArray());
|
|
|
|
mqttclient.connect(options);
|
|
|
|
return true;
|
|
|
|
} catch (MqttException e) {
|
|
|
|
log.error("-----------mqtt连接服务器失败--------------------",e);
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
public void sart() throws MqttException{
|
|
|
|
MqttConfig.init();
|
|
|
|
log.info("-----------开始启动mqtt监听服务--------------------");
|
|
|
|
init();
|
|
|
|
connect();
|
|
|
|
log.info("-----------mqtt连接服务器成功--------------------");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
@PreDestroy
|
|
|
|
public void stop() throws MqttException {
|
|
|
|
if(null != mqttclient)
|
|
|
|
{
|
|
|
|
log.info("退出mqtt服务");
|
|
|
|
mqttclient.unsubscribe(MqttConfig.subTopic.toArray(new String[MqttConfig.subTopic.size()]));
|
|
|
|
mqttclient.disconnect();
|
|
|
|
mqttclient.close();
|
|
|
|
}
|
|
|
|
instance = null;
|
|
|
|
}
|
|
|
|
public void publish(String topic, String messageStr) throws MqttException {
|
|
|
|
MqttMessage message = new MqttMessage();
|
|
|
|
message.setPayload(messageStr.getBytes());
|
|
|
|
mqttclient.publish(topic,message);
|
|
|
|
}
|
|
|
|
|
|
|
|
public Set<String> subscribe()
|
|
|
|
{
|
|
|
|
try {
|
|
|
|
mqttclient.subscribe(MqttConfig.subTopic.toArray(new String[MqttConfig.subTopic.size()]));
|
|
|
|
} catch (MqttException e) {
|
|
|
|
e.printStackTrace();
|
|
|
|
}
|
|
|
|
return MqttConfig.subTopic;
|
|
|
|
}
|
|
|
|
|
|
|
|
public boolean isConnect()
|
|
|
|
{
|
|
|
|
return mqttclient != null && mqttclient.isConnected();
|
|
|
|
}
|
|
|
|
|
|
|
|
public void operate(Topic topic, JSONObject payloaddata)
|
|
|
|
{
|
|
|
|
operateService.operate(topic,payloaddata);
|
|
|
|
}
|
|
|
|
} |