|
...
|
...
|
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; |
|
|
|
public class CollectPlcDataTask {
|
|
|
|
protected static final Logger logger = LoggerFactory.getLogger(CollectPlcDataTask.class);
|
|
|
|
|
|
|
|
private static final int maxDataLenth = 30;
|
|
|
|
private static final int maxDataLenth = 100;
|
|
|
|
public void collect(MqttService mqttService) {
|
|
|
|
ScheduledThreadPool.scheduler.scheduleAtFixedRate(() -> {
|
|
|
|
try {
|
|
...
|
...
|
@@ -51,43 +51,48 @@ public class CollectPlcDataTask { |
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private void pubMqttData(MqttService mqttService, String plcId,CachPlcConfig cachPlcConfig) throws Exception
|
|
|
|
{
|
|
|
|
private void pubMqttData(MqttService mqttService, String plcId, CachPlcConfig cachPlcConfig) throws Exception {
|
|
|
|
Map<String, PlcPoint> map = cachPlcConfig.getPlcMap();
|
|
|
|
|
|
|
|
List<PlcPoint> plcPoints = new ArrayList<>();
|
|
|
|
|
|
|
|
int dataLenth = 1 ;
|
|
|
|
for (String system : map.keySet())
|
|
|
|
{
|
|
|
|
PlcPoint plcPoint = map.get(system);
|
|
|
|
int count = 0;
|
|
|
|
for (PlcPoint plcPoint : map.values()) {
|
|
|
|
plcPoints.add(plcPoint);
|
|
|
|
dataLenth++;
|
|
|
|
if (dataLenth == maxDataLenth)
|
|
|
|
{
|
|
|
|
if(!subMqttData(mqttService, plcId, plcPoints))
|
|
|
|
{
|
|
|
|
plcPoints.clear();
|
|
|
|
break;
|
|
|
|
count++;
|
|
|
|
|
|
|
|
if (count == maxDataLenth) {
|
|
|
|
boolean success = subMqttData(mqttService, plcId, plcPoints);
|
|
|
|
System.out.println("plc " + plcId + " 发送数据 " + plcPoints.size() + " 条");
|
|
|
|
|
|
|
|
if (!success) {
|
|
|
|
// 如果失败,可以选择重试或者丢弃,但不应该 break 掉后续数据
|
|
|
|
System.err.println("plc " + plcId + " 数据发送失败,本批次丢弃");
|
|
|
|
}
|
|
|
|
System.out.println("plc "+plcId+" 发送数据 "+plcPoints.size()+" 条,等待1s接着发送");
|
|
|
|
dataLenth = 1;
|
|
|
|
plcPoints.clear();
|
|
|
|
|
|
|
|
// 重置计数器和列表
|
|
|
|
count = 0;
|
|
|
|
plcPoints = new ArrayList<>();
|
|
|
|
|
|
|
|
try {
|
|
|
|
Thread.sleep(1000l);
|
|
|
|
Thread.sleep(1000L); // 控制发送速率
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
Thread.currentThread().interrupt(); // 正确的中断处理
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (plcPoints.size() > 0)
|
|
|
|
{
|
|
|
|
subMqttData(mqttService, plcId, plcPoints);
|
|
|
|
System.out.println("plc "+plcId+" 发送数据 "+plcPoints.size()+" 条");
|
|
|
|
// 处理剩余未满一批的数据
|
|
|
|
if (!plcPoints.isEmpty()) {
|
|
|
|
boolean success = subMqttData(mqttService, plcId, plcPoints);
|
|
|
|
logger.info("plc " + plcId + " 发送数据 " + plcPoints.size() + " 条");
|
|
|
|
if (!success) {
|
|
|
|
logger.info("plc " + plcId + " 数据发送失败,本批次丢弃");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private boolean subMqttData(MqttService mqttService, String plcId, List<PlcPoint> plcPoints)
|
|
|
|
{
|
...
|
...
|
|