RocketMqService.java
2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.ruoyi.system.rocketmq;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.domain.Message;
import com.ruoyi.common.core.domain.MessageCode;
import com.ruoyi.common.core.domain.MessageCodeType;
import com.ruoyi.system.dto.DeviceCommandApi;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class RocketMqService {
@Value("${rocketmq.producer.send-topic}")
private String sendTopic; //客户端操作时间
@Value("${rocketmq.producer.send-tags}")
private String sendTags; //客户端操作时间
@Autowired
private RocketMQTemplate rocketMQTemplate;
public Message send(DeviceCommandApi deviceCommandApi)
{
return send(sendTopic, JSONObject.toJSONBytes(deviceCommandApi));
}
public Message send(String topic,String payload)
{
return send(topic,payload.getBytes());
}
public Message send(String topic,byte[] payload)
{
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(topic,sendTags,payload);
try {
org.apache.rocketmq.common.message.Message ms = rocketMQTemplate.getProducer().request(msg,30000l);
JSONObject jsonObject = (JSONObject) JSON.parse(new String(ms.getBody()));
return new Message(MessageCode.getMessageCode(jsonObject.getInteger("code")),jsonObject.getString("message"),jsonObject.get("data"));
} catch (RequestTimeoutException e) {
e.printStackTrace();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Message(MessageCode.DEFAULT_FAIL_CODE);
}
}