作者 钟来

历史数据存储

正在显示 14 个修改的文件 包含 352 行增加49 行删除
... ... @@ -2,7 +2,9 @@ package com.zhonglai.luhui.smart.feeder;
import com.zhonglai.luhui.smart.feeder.config.OpenCVConfig;
import com.zhonglai.luhui.smart.feeder.config.OperatingData;
import com.zhonglai.luhui.smart.feeder.service.InitService;
import com.zhonglai.luhui.smart.spare.feeder.service.SerialPortService;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -17,6 +19,14 @@ public class Main {
logger.info("配置参数");
InitService.initConfig();
logger.info("开始加载FFmpeg");
try {
FFmpegFrameGrabber.tryLoad();
} catch (FFmpegFrameGrabber.Exception e) {
throw new RuntimeException(e);
}
logger.info("FFmpeg加载完成");
logger.info("启动服务");
InitService.startService();
... ...
... ... @@ -14,9 +14,9 @@ import com.zhonglai.luhui.smart.feeder.dto.mqtt.CmdDto;
import com.zhonglai.luhui.smart.feeder.dto.mqtt.Condata;
import com.zhonglai.luhui.smart.feeder.dto.mqtt.DevicedatRequest;
import com.zhonglai.luhui.smart.feeder.dto.mqtt.Info;
import com.zhonglai.luhui.smart.feeder.service.device.SerialPortService;
import com.zhonglai.luhui.smart.feeder.util.FeederCommdUtil;
import com.zhonglai.luhui.smart.feeder.util.MessageUtil;
import com.zhonglai.luhui.smart.spare.feeder.service.SerialPortService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -42,6 +42,7 @@ public class DateListenService {
public void run()
{
logger.info("数据上报");
//更新投料机数据
ScheduledConfig.scheduler.scheduleWithFixedDelay(() -> {
ModbusDto modbusDto = serialPortService.sendHexData(FeederCommdUtil.readAll());
... ...
... ... @@ -67,6 +67,7 @@ public class FishGroupImageRecognitionService {
public void run()
{
logger.info("图像识别");
scheduledFuture = ScheduledConfig.scheduler.scheduleWithFixedDelay(() -> {
if (!OperatingData.cameraData.getFishGroupImageRecognIsRun())
{
... ... @@ -85,10 +86,11 @@ public class FishGroupImageRecognitionService {
if(!cameraHandle.isOpen())
{
if (!cameraHandle.init())
{
return;
}
return;
// if (!cameraHandle.init())
// {
// return;
// }
}
OperatingData.cameraData.setFishGroupImageRecognIsRun(true);
brightnessIdentifyFishRegion();
... ...
package com.zhonglai.luhui.smart.feeder.service;
import com.zhonglai.luhui.smart.feeder.config.OperatingData;
import com.zhonglai.luhui.smart.feeder.service.device.CameraHandle;
import com.zhonglai.luhui.smart.feeder.service.device.SerialPortService;
import com.zhonglai.luhui.smart.feeder.service.device.handle.CameraRtspHandle;
import com.zhonglai.luhui.smart.feeder.service.netty.NettyClient;
import com.zhonglai.luhui.smart.spare.feeder.service.NettyClient;
import com.zhonglai.luhui.smart.spare.feeder.service.SerialPortService;
import java.io.BufferedReader;
import java.io.InputStreamReader;
... ... @@ -36,13 +37,17 @@ public class InitService {
/**
* 串口服务器启动
*/
String portName = OperatingData.feederConfig.getSerialPortConfig().getPortName();
serialPortService = new SerialPortService();
serialPortService.connect(portName);
// 启动重连线程
new Thread(() -> serialPortService.reconnect(portName, 5000)).start(); // 每5秒尝试重连一次
/**
* mq远程登录
*/
nettyClient = new NettyClient();
nettyClient.run();
nettyClient = new NettyClient(OperatingData.sysConfig.getNettyConfig().getHost(), OperatingData.sysConfig.getNettyConfig().getPort());
nettyClient.start();
/**
* 初始化海康的摄像头
... ... @@ -67,6 +72,5 @@ public class InitService {
mqttService = new MqttService();
mqttService.start();
}
}
... ...
... ... @@ -55,6 +55,7 @@ public class MqttService {
public void start()
{
log.info("数据转发服务");
if(null == mqttclient)
{
init();
... ...
... ... @@ -4,9 +4,11 @@ import com.ruoyi.common.utils.DateUtils;
import com.zhonglai.luhui.smart.feeder.config.OperatingData;
import com.zhonglai.luhui.smart.feeder.config.ScheduledConfig;
import com.zhonglai.luhui.smart.feeder.dto.mqtt.Condata;
import com.zhonglai.luhui.smart.feeder.service.device.SerialPortService;
import com.zhonglai.luhui.smart.feeder.util.FeederCommd06ResponseType;
import com.zhonglai.luhui.smart.feeder.util.FeederCommdUtil;
import com.zhonglai.luhui.smart.spare.feeder.service.SerialPortService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;
... ... @@ -16,6 +18,8 @@ import java.util.concurrent.TimeUnit;
* 尺度步长法
*/
public class ScaleStepMethodService {
private static final Logger logger = LoggerFactory.getLogger(ScaleStepMethodService.class);
private Integer logTady = Integer.parseInt(DateUtils.parseDateToStr("yyyyMMdd",new Date()));
private ScheduledFuture scheduledFuture;
... ... @@ -28,6 +32,7 @@ public class ScaleStepMethodService {
public void run()
{
logger.info("鱼群图像识别控制投料控制");
scheduledFuture = ScheduledConfig.scheduler.scheduleWithFixedDelay(() -> {
Integer nowTady = Integer.parseInt(DateUtils.parseDateToStr("yyyyMMdd",new Date()));
if( OperatingData.cameraData.getScaleAreaList().size()==0)
... ... @@ -69,7 +74,7 @@ public class ScaleStepMethodService {
if (OperatingData.cameraConfig.getFeedingControl())
{
//发送停止投料指令
if (serialPortService.isOpen())
if (serialPortService.isConnected())
{
if(OperatingData.feederConfig.getCondata().getRunmode()==1)
{
... ...
... ... @@ -62,6 +62,7 @@ public class SerialPortService {
}
OperatingData.feederData.setInfo(info);
}
logger.info("端口启动情况:{}",serialPort.isOpen());
}
private SerialPort findSerialPort()
... ... @@ -70,10 +71,8 @@ public class SerialPortService {
SerialPort[] serialPorts = SerialPort.getCommPorts();//查找所有串口
for(SerialPort port:serialPorts){
System.out.println("Port:"+port.getSystemPortName());//打印串口名称,如COM4
System.out.println("PortDesc:"+port.getPortDescription());//打印串口类型,如USB Serial
System.out.println("PortDesc:"+port.getDescriptivePortName());//打印串口的完整类型,如USB-SERIAL CH340(COM4)
if(port.getPortDescription().indexOf("USB")>=0)
logger.info("Port:{},PortDesc:{},PortDesc:{}",port.getSystemPortName(),port.getPortDescription(),port.getDescriptivePortName());//打印串口名称,如COM4;打印串口类型,如USB Serial;打印串口的完整类型,如USB-SERIAL CH340(COM4)
if(port.getSystemPortName().indexOf("ttyS0")>=0)
{
serialPort = port;
break;
... ...
... ... @@ -35,6 +35,7 @@ public class CameraRtspHandle implements CameraHandle {
public CameraRtspHandle()
{
logger.info("初始化海康的摄像头");
init();
}
... ... @@ -46,10 +47,8 @@ public class CameraRtspHandle implements CameraHandle {
{
return true;
}
try {
FFmpegFrameGrabber.tryLoad();
logger.info("配置的视频源类型"+OperatingData.cameraConfig.getCameraInterfaceType().toLowerCase());
switch (OperatingData.cameraConfig.getCameraInterfaceType().toLowerCase())
{
case "rtsp":
... ... @@ -138,6 +137,7 @@ public class CameraRtspHandle implements CameraHandle {
}
private boolean initRtsp() throws FFmpegFrameGrabber.Exception, InterruptedException {
logger.error("加载Rtsp");
String ip = findCameraIp();
if(StringUtils.isEmpty(ip))
{
... ... @@ -327,7 +327,7 @@ public class CameraRtspHandle implements CameraHandle {
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
if (addr.isSiteLocalAddress()) { // Checks if this address is a "site local" address.
logger.info("主机的IP:{}",addr.getHostAddress());
if(addr.getHostAddress().contains("192.168"))
{
return addr.getHostAddress();
... ... @@ -342,6 +342,7 @@ public class CameraRtspHandle implements CameraHandle {
public static String findCameraIp() {
String localIP = getLocalIp();
logger.info("找到的主机ip:"+localIP);
if(null == localIP)
{
return null;
... ... @@ -366,7 +367,7 @@ public class CameraRtspHandle implements CameraHandle {
while (StringUtils.isEmpty(ip[0]))
{
findCamera(localIP);
Thread.sleep(1000);
Thread.sleep(10000);
}
} catch (InterruptedException e) {
e.printStackTrace();
... ... @@ -381,6 +382,7 @@ public class CameraRtspHandle implements CameraHandle {
}
return ip[0];
} catch (InterruptedException e) {
logger.error("查找摄像头失败",e);
}
return null;
}
... ... @@ -412,24 +414,32 @@ public class CameraRtspHandle implements CameraHandle {
private static String getCameraIp(String ip) throws Exception {
logger.info("开始查找摄像头的ip");
// 1.创建组播socket,加入指定的组播地址和端口
InetAddress group = InetAddress.getByName("239.255.255.250");
MulticastSocket multicastSocket = new MulticastSocket(37020);
multicastSocket.joinGroup(new InetSocketAddress(group,37020),NetworkInterface.getByInetAddress(InetAddress.getByName(ip)));
multicastSocket.setSoTimeout(100000);
multicastSocket.setSoTimeout(10000);
// 2.创建接收的数据包
byte[] buf = new byte[1024];
// 4.解析数据包,并打印出来
HCCameraRepose probe = null;
while (ObjectUtil.isEmpty(probe))
{
DatagramPacket dpReceive = new DatagramPacket(buf, buf.length);
logger.info("发送组播查找信息");
// 3.调用socket对象的接收方法接收数据包
multicastSocket.receive(dpReceive);
try {
multicastSocket.receive(dpReceive);
}catch (Exception e)
{
logger.error("组播查找异常",e);
}
String receivedXml = new String(dpReceive.getData(), 0, dpReceive.getLength());
logger.info("查找海康摄像头的结果:{}",receivedXml);
String startTag = "<IPv4Address>";
String endTag = "</IPv4Address>";
... ...
... ... @@ -5,8 +5,8 @@ import com.ruoyi.common.utils.GsonConstructor;
import com.zhonglai.luhui.smart.feeder.config.OperatingData;
import com.zhonglai.luhui.smart.feeder.dto.mqtt.CmdDto;
import com.zhonglai.luhui.smart.feeder.dto.mqtt.HeadDto;
import com.zhonglai.luhui.smart.feeder.service.netty.NettyClient;
import com.zhonglai.luhui.smart.feeder.util.MessageUtil;
import com.zhonglai.luhui.smart.spare.feeder.service.NettyClient;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.apache.commons.lang3.StringUtils;
... ... @@ -14,6 +14,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
... ... @@ -29,28 +30,7 @@ public class AgreementHandler extends MessageToMessageDecoder<String> {
private CameracontrolService cameracontrolService = new CameracontrolService();
private NettyClient nettyClient;
public AgreementHandler(NettyClient nettyClient)
{
this.nettyClient = nettyClient;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("设备上线");
nettyClient.setCtx(ctx);
// 连接建立时的处理,发送请求注册消息给服务器
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("cmd","manualcontrol");
jsonObject.addProperty("type","4G.hs");
MessageUtil.sendMessage(ctx, new CmdDto().setImei(OperatingData.sysConfig.getNettyConfig().getClientId()).setJsonObject(jsonObject).generateCmd(),true);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
logger.info("设备离线");
}
private static boolean checkAgreement(String data)
{
... ... @@ -65,6 +45,7 @@ public class AgreementHandler extends MessageToMessageDecoder<String> {
return rs;
}
@Override
protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
try {
... ... @@ -81,6 +62,7 @@ public class AgreementHandler extends MessageToMessageDecoder<String> {
break;
case "cfgdata":
cfgdataService.noticeFeeder(ctx,cmdDto);
logger.info("cfgdata数据通知结束");
break;
case "manualcontrol":
manualcontrolService.noticeFeeder(ctx,cmdDto);
... ...
... ... @@ -31,7 +31,7 @@ public class CfgdataService {
private static final Logger logger = LoggerFactory.getLogger(CfgdataService.class);
public void noticeFeeder(ChannelHandlerContext ctx,CmdDto cmdDto)
{
if(!InitService.serialPortService.open())
if(!InitService.serialPortService.isConnected())
{
MessageUtil.sendFeederResponseMessage(ctx,"cfgdataOK", FeederBackstateTtpe.serialPortErr,0);
return;
... ...
... ... @@ -25,7 +25,7 @@ public class ManualcontrolService {
public void noticeFeeder(ChannelHandlerContext ctx,CmdDto cmdDto)
{
if(!InitService.serialPortService.open())
if(!InitService.serialPortService.isConnected())
{
MessageUtil.sendFeederResponseMessage(ctx,"manualcontrolOK", FeederBackstateTtpe.serialPortErr,0);
return;
... ...
... ... @@ -104,7 +104,7 @@ public class NettyClient {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("StringEncoder", new StringEncoder(Charset.forName("gb2312")));
pipeline.addLast("StringDecoder", new StringDecoder(Charset.forName("gb2312")));
pipeline.addLast("AgreementHandler", new AgreementHandler(nettyClient));
pipeline.addLast("AgreementHandler", new AgreementHandler());
}
}
... ...
package com.zhonglai.luhui.smart.spare.feeder.service;
import com.google.gson.JsonObject;
import com.zhonglai.luhui.smart.feeder.config.OperatingData;
import com.zhonglai.luhui.smart.feeder.dto.mqtt.CmdDto;
import com.zhonglai.luhui.smart.feeder.service.feeder.AgreementHandler;
import com.zhonglai.luhui.smart.feeder.util.MessageUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class NettyClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private final String host;
private final int port;
private Bootstrap bootstrap;
private EventLoopGroup group;
private Channel channel;
private ChannelHandlerContext ctx;
private ChannelFuture future;
private final AtomicInteger retryCount = new AtomicInteger();
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() {
group = new NioEventLoopGroup();
try {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("StringEncoder", new StringEncoder(Charset.forName("gb2312")));
ch.pipeline().addLast("StringDecoder", new StringDecoder(Charset.forName("gb2312")));
ch.pipeline().addLast(new AgreementHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("设备上线");
setCtx(ctx);
// 连接建立时的处理,发送请求注册消息给服务器
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("cmd","manualcontrol");
jsonObject.addProperty("type","4G.hs");
MessageUtil.sendMessage(ctx, new CmdDto().setImei(OperatingData.sysConfig.getNettyConfig().getClientId()).setJsonObject(jsonObject).generateCmd(),true);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
logger.info("设备离线");
future.channel().eventLoop().schedule(NettyClient.this::connect, 5, TimeUnit.SECONDS);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
});
}
});
connect();
} catch (Exception e) {
e.printStackTrace();
}
}
public void shutdown() {
group.shutdownGracefully();
}
private void connect() {
if (channel != null && channel.isActive()) {
return;
}
try {
future = bootstrap.connect(host, port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
channel = future.channel();
logger.info("运程连接成功");
} else {
logger.info("运程连接失败,尝试重连...");
retryCount.incrementAndGet();
future.channel().eventLoop().schedule(NettyClient.this::connect, 5, TimeUnit.SECONDS);
}
}
}).sync();
} catch (InterruptedException e) {
logger.error("连接失败",e);
future.channel().eventLoop().schedule(NettyClient.this::connect, 5, TimeUnit.SECONDS);
}
}
public ChannelHandlerContext getCtx() {
return ctx;
}
public void setCtx(ChannelHandlerContext ctx)
{
this.ctx = ctx;
}
}
... ...
package com.zhonglai.luhui.smart.spare.feeder.service;
import com.fazecast.jSerialComm.SerialPort;
import com.fazecast.jSerialComm.SerialPortEvent;
import com.ruoyi.common.utils.ByteUtil;
import com.zhonglai.luhui.smart.feeder.config.OperatingData;
import com.zhonglai.luhui.smart.feeder.dto.ModbusDto;
import com.zhonglai.luhui.smart.feeder.dto.SerialPortConfig;
import com.zhonglai.luhui.smart.feeder.dto.commd.FeederCommdDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 端口服务
*/
public class SerialPortService implements com.fazecast.jSerialComm.SerialPortDataListener {
private static final Logger logger = LoggerFactory.getLogger(SerialPortService.class);
private SerialPort serialPort;
private boolean isConnected = false;
private final Object lock = new Object();
private BlockingQueue<ModbusDto> dataQueue = new LinkedBlockingQueue<>();
public void connect(String portName) {
serialPort = findSerialPort(portName);
// 尝试打开串口
logger.info("尝试打开串口:"+portName);
// serialPort = SerialPort.getCommPort(portName);
if (serialPort == null) {
logger.error("无法找到串口: " + portName);
return;
}
logger.info("串口尝试打开成功,准备连接:"+portName);
try {
SerialPortConfig serialPortConfig = OperatingData.feederConfig.getSerialPortConfig();
// 配置串口参数
serialPort.setBaudRate(serialPortConfig.getBaudrate());
serialPort.setNumDataBits(serialPortConfig.getDataBits());
serialPort.setNumStopBits(serialPortConfig.getStopBits());
serialPort.setParity(serialPortConfig.getParity());
// 打开串口
serialPort.openPort();
serialPort.addDataListener(this);
// 设置串口为非阻塞模式,以便可以异步接收数据
serialPort.setComPortTimeouts(SerialPort.TIMEOUT_READ_SEMI_BLOCKING, 0, 0);
isConnected = true;
logger.info("串口已连接: " + portName);
} catch (Exception e) {
logger.error("串口连接失败: " , e);
}
}
@Override
public int getListeningEvents() {
// 监听数据到达事件
return SerialPort.LISTENING_EVENT_DATA_AVAILABLE;//返回要监听的事件类型,以供回调函数使用。可发回的事件包括:SerialPort.LISTENING_EVENT_DATA_AVAILABLE,SerialPort.LISTENING_EVENT_DATA_WRITTEN,SerialPort.LISTENING_EVENT_DATA_RECEIVED。分别对应有数据在串口(不论是读的还是写的),有数据写入串口,从串口读取数据。如果AVAILABLE和RECEIVED同时被监听,优先触发RECEIVED
}
@Override
public void serialEvent(SerialPortEvent event) {
if (event.getEventType() == SerialPort.LISTENING_EVENT_DATA_AVAILABLE) {
try {
// 读取串口数据
byte[] newData = new byte[serialPort.bytesAvailable()];
int numRead = serialPort.readBytes(newData, newData.length);
logger.info("串口返回{}字节数据:{}",numRead, ByteUtil.toHexString(newData));
FeederCommdDto commdDto = new FeederCommdDto(newData);
dataQueue.offer(commdDto); // 将数据添加到队列中// 处理串口返回的数据
} catch (Exception e) {
logger.error("读取串口数据时出错: " , e);
}
}
}
public void disconnect() {
if (isConnected) {
try {
serialPort.removeDataListener();
serialPort.closePort();
isConnected = false;
logger.info("串口已断开连接");
} catch (Exception e) {
logger.error("串口断开失败: " , e);
}
}
}
public boolean isConnected() {
return isConnected;
}
// 实现掉线重连逻辑
public void reconnect(String portName, int reconnectIntervalMillis) {
while (!isConnected) {
try {
Thread.sleep(reconnectIntervalMillis);
connect(portName);
} catch (InterruptedException e) {
logger.error("重连线程被中断: " , e);
Thread.currentThread().interrupt();
return;
}
}
}
public ModbusDto sendByte(byte[] bytes)
{
synchronized (lock)
{
if(isConnected)
{
serialPort.writeBytes(bytes,bytes.length);
try {
ModbusDto reStr = dataQueue.poll(15, TimeUnit.SECONDS);
logger.info("接串口通知数据:{}",reStr);
return reStr;
} catch (InterruptedException e) {
logger.error("等待串口返回数据异常!" + e);
disconnect();
}
}else{
logger.error("串口未打开!" );
return null;
}
}
return null;
}
private SerialPort findSerialPort(String portName)
{
SerialPort[] serialPorts = SerialPort.getCommPorts();//查找所有串口
logger.info("总串口数"+serialPorts.length);
for(SerialPort port:serialPorts){
logger.info("Port:{},PortDesc:{},PortDesc:{}",port.getSystemPortName(),port.getPortDescription(),port.getDescriptivePortName());//打印串口名称,如COM4;打印串口类型,如USB Serial;打印串口的完整类型,如USB-SERIAL CH340(COM4)
if(port.getSystemPortName().equals(portName))
{
return port;
}
}
return null;
}
public ModbusDto sendHexData(String hexStr) {
logger.info("串口写入:{}",hexStr);
byte[] bytes = ByteUtil.hexStringToByte(hexStr.replace(" ","").trim().toUpperCase());
return sendByte(bytes);
}
}
... ...