// RocketMqService.java 2.3 KB
package com.ruoyi.system.rocketmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.domain.Message;
import com.ruoyi.common.core.domain.MessageCode;
import com.ruoyi.common.core.domain.MessageCodeType;
import com.ruoyi.system.dto.DeviceCommandApi;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;


@Service
public class RocketMqService {
    @Value("${rocketmq.producer.send-topic}")
    private String sendTopic; //客户端操作时间
    @Value("${rocketmq.producer.send-tags}")
    private String sendTags; //客户端操作时间
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public Message send(DeviceCommandApi deviceCommandApi)
    {
        return send(sendTopic, JSONObject.toJSONBytes(deviceCommandApi));
    }

    public Message send(String topic,String payload)
    {
        return send(topic,payload.getBytes());
    }

    public Message send(String topic,byte[] payload)
    {
        org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(topic,sendTags,payload);
        try {
            org.apache.rocketmq.common.message.Message ms = rocketMQTemplate.getProducer().request(msg,30000l);
            JSONObject jsonObject = (JSONObject) JSON.parse(new String(ms.getBody()));
            return new Message(MessageCode.getMessageCode(jsonObject.getInteger("code")),jsonObject.getString("message"),jsonObject.get("data"));
        } catch (RequestTimeoutException e) {
            e.printStackTrace();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return new Message(MessageCode.DEFAULT_FAIL_CODE);
    }
}