作者 钟来

设备数据监听服务开发

@@ -186,4 +186,9 @@ public class DateUtils extends org.apache.commons.lang3.time.DateUtils @@ -186,4 +186,9 @@ public class DateUtils extends org.apache.commons.lang3.time.DateUtils
186 String time = System.currentTimeMillis() / 1000L + ""; 186 String time = System.currentTimeMillis() / 1000L + "";
187 return Integer.parseInt(time); 187 return Integer.parseInt(time);
188 } 188 }
  189 +
  190 + public static int getBeforeDawnTimeMilly(int now) {
  191 + int daySecond = 86400;
  192 + return now - (now + 28800) % daySecond;
  193 + }
189 } 194 }
@@ -3,6 +3,7 @@ package com.zhonglai.luhui.alarm.dto; @@ -3,6 +3,7 @@ package com.zhonglai.luhui.alarm.dto;
3 import com.alibaba.otter.canal.protocol.CanalEntry; 3 import com.alibaba.otter.canal.protocol.CanalEntry;
4 import com.zhonglai.luhui.alarm.clas.UpAlarmFactory; 4 import com.zhonglai.luhui.alarm.clas.UpAlarmFactory;
5 5
  6 +import java.lang.reflect.Method;
6 import java.util.List; 7 import java.util.List;
7 8
8 public class IotDevice { 9 public class IotDevice {
@@ -136,10 +136,6 @@ public class IotTerminal { @@ -136,10 +136,6 @@ public class IotTerminal {
136 { 136 {
137 return null; 137 return null;
138 } 138 }
139 - if(iotTerminal.getId().startsWith("864814074929612"))  
140 - {  
141 - System.out.println(iotTerminal.getUser_info_id());  
142 - }  
143 if(null != iotTerminal.getUser_info_id()) 139 if(null != iotTerminal.getUser_info_id())
144 { 140 {
145 CachAlarmConfig.addDeviceUser(iotTerminal.getId(),iotTerminal.getUser_info_id()); 141 CachAlarmConfig.addDeviceUser(iotTerminal.getId(),iotTerminal.getUser_info_id());
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<project xmlns="http://maven.apache.org/POM/4.0.0"
  3 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5 + <modelVersion>4.0.0</modelVersion>
  6 + <parent>
  7 + <groupId>com.zhonglai.luhui</groupId>
  8 + <artifactId>lh-modules</artifactId>
  9 + <version>1.0-SNAPSHOT</version>
  10 + </parent>
  11 +
  12 + <artifactId>lh-deviceInfo-sync</artifactId>
  13 +
  14 + <properties>
  15 + <maven.compiler.source>8</maven.compiler.source>
  16 + <maven.compiler.target>8</maven.compiler.target>
  17 + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  18 + </properties>
  19 +
  20 + <dependencies>
  21 +
  22 + <dependency>
  23 + <groupId>cn.hutool</groupId>
  24 + <artifactId>hutool-all</artifactId>
  25 + </dependency>
  26 + <dependency>
  27 + <groupId>redis.clients</groupId>
  28 + <artifactId>jedis</artifactId>
  29 + </dependency>
  30 + <dependency>
  31 + <groupId>org.projectlombok</groupId>
  32 + <artifactId>lombok</artifactId>
  33 + <scope>provided</scope>
  34 + </dependency>
  35 +
  36 + <dependency>
  37 + <groupId>com.zhonglai.luhui</groupId>
  38 + <artifactId>lh-service-dao</artifactId>
  39 + </dependency>
  40 +
  41 + <dependency>
  42 + <groupId>ch.qos.logback</groupId>
  43 + <artifactId>logback-classic</artifactId>
  44 + </dependency>
  45 + </dependencies>
  46 +</project>
  1 +package com.luhui.deviceinfo.sync;
  2 +
  3 +import com.luhui.deviceinfo.sync.service.CleanupTask;
  4 +import com.luhui.deviceinfo.sync.service.MySqlDeviceInfoLisenService;
  5 +
  6 +public class DeviceInfoSyncMain {
  7 + public static void main(String[] args) {
  8 + // 注册Shutdown Hook
  9 + Runtime.getRuntime().addShutdownHook(new CleanupTask());
  10 +
  11 + MySqlDeviceInfoLisenService.start();
  12 + }
  13 +}
  1 +package com.luhui.deviceinfo.sync.service;
  2 +
  3 +import org.slf4j.Logger;
  4 +import org.slf4j.LoggerFactory;
  5 +
  6 +public class CleanupTask extends Thread{
  7 + private static final Logger logger = LoggerFactory.getLogger(CleanupTask.class);
  8 + @Override
  9 + public void run() {
  10 + logger.info("程序关闭");
  11 +
  12 + logger.info("关闭触发告警");
  13 + MySqlDeviceInfoLisenService.close();
  14 +
  15 + }
  16 +}
  1 +package com.luhui.deviceinfo.sync.service;
  2 +
  3 +import cn.hutool.db.nosql.redis.RedisDS;
  4 +import org.slf4j.Logger;
  5 +import org.slf4j.LoggerFactory;
  6 +import redis.clients.jedis.Jedis;
  7 +
  8 +import java.util.HashMap;
  9 +import java.util.Map;
  10 +
  11 +public class JedisService {
  12 +
  13 + private static final Logger logger = LoggerFactory.getLogger("redis-log");
  14 +
  15 + private static final RedisDS deviceInfoRedis = RedisDS.create();
  16 + private static final RedisDS deviceHostRedis = RedisDS.create("devicehost");
  17 +
  18 + private static void log(String action, Map<String, String> kvMap) {
  19 + StringBuilder sb = new StringBuilder();
  20 + sb.append("{\"tag\":\"jedis\",\"action\":\"").append(action).append("\"");
  21 + for (Map.Entry<String, String> entry : kvMap.entrySet()) {
  22 + sb.append(",\"").append(entry.getKey()).append("\":\"").append(entry.getValue()).append("\"");
  23 + }
  24 + sb.append("}");
  25 + logger.info(sb.toString());
  26 + }
  27 +
  28 + public static Map<String, String> getDevicHost(String imei) {
  29 + Map<String, String> result = null;
  30 + try (Jedis jedis = deviceHostRedis.getJedis()) {
  31 + result = jedis.hgetAll(imei);
  32 + } catch (Exception e) {
  33 + Map<String, String> error = new HashMap<String, String>();
  34 + error.put("imei", imei);
  35 + error.put("error", e.getMessage());
  36 + log("getDevicHost_error", error);
  37 + }
  38 + return result;
  39 + }
  40 +
  41 + public static Map<String, String> getDevicInfo(String deviceInfoId) {
  42 + Map<String, String> result = null;
  43 + try (Jedis jedis = deviceInfoRedis.getJedis()) {
  44 + result = jedis.hgetAll(deviceInfoId);
  45 + } catch (Exception e) {
  46 + Map<String, String> error = new HashMap<String, String>();
  47 + error.put("deviceInfoId", deviceInfoId);
  48 + error.put("error", e.getMessage());
  49 + log("getDevicInfo_error", error);
  50 + }
  51 + return result;
  52 + }
  53 +
  54 + public static void setDevicInfo(String deviceInfoId, String attribute, String value) {
  55 + if (value == null) {
  56 + throw new IllegalArgumentException("Value cannot be null");
  57 + }
  58 + try (Jedis jedis = deviceInfoRedis.getJedis()) {
  59 + jedis.hset(deviceInfoId, attribute, value);
  60 + Map<String, String> info = new HashMap<String, String>();
  61 + info.put("deviceInfoId", deviceInfoId);
  62 + info.put("attribute", attribute);
  63 + info.put("value", value);
  64 + log("setDevicInfo", info);
  65 + } catch (Exception e) {
  66 + Map<String, String> error = new HashMap<String, String>();
  67 + error.put("deviceInfoId", deviceInfoId);
  68 + error.put("attribute", attribute);
  69 + error.put("value", value);
  70 + error.put("error", e.getMessage());
  71 + log("setDevicInfo_error", error);
  72 + }
  73 + }
  74 +
  75 + public static void setDevicHost(String imei, String attribute, String value) {
  76 + if (value == null) {
  77 + throw new IllegalArgumentException("Value cannot be null");
  78 + }
  79 + try (Jedis jedis = deviceHostRedis.getJedis()) {
  80 + jedis.hset(imei, attribute, value);
  81 + Map<String, String> info = new HashMap<String, String>();
  82 + info.put("imei", imei);
  83 + info.put("attribute", attribute);
  84 + info.put("value", value);
  85 + log("setDevicHost", info);
  86 + } catch (Exception e) {
  87 + Map<String, String> error = new HashMap<String, String>();
  88 + error.put("imei", imei);
  89 + error.put("attribute", attribute);
  90 + error.put("value", value);
  91 + error.put("error", e.getMessage());
  92 + log("setDevicHost_error", error);
  93 + }
  94 + }
  95 +
  96 + public static void setDevicHostBatch(String imei, Map<String, String> fields) {
  97 + if (fields == null || fields.containsValue(null)) {
  98 + throw new IllegalArgumentException("Fields cannot be null or contain null values");
  99 + }
  100 + try (Jedis jedis = deviceHostRedis.getJedis()) {
  101 + jedis.hmset(imei, fields);
  102 + Map<String, String> info = new HashMap<String, String>();
  103 + info.put("imei", imei);
  104 + info.put("fieldsCount", String.valueOf(fields.size()));
  105 + log("setDevicHostBatch", info);
  106 + } catch (Exception e) {
  107 + Map<String, String> error = new HashMap<String, String>();
  108 + error.put("imei", imei);
  109 + error.put("error", e.getMessage());
  110 + log("setDevicHostBatch_error", error);
  111 + }
  112 + }
  113 +
  114 + public static void setDevicInfoBatchWithTTL(String imei, Map<String, String> fields, int ttlSeconds) {
  115 + if (fields == null ) {
  116 + throw new IllegalArgumentException("Fields cannot be null or contain null values");
  117 + }
  118 + if(fields.containsValue(null))
  119 + {
  120 + // 删除值为空的属性
  121 + fields.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty());
  122 + }
  123 +
  124 + try (Jedis jedis = deviceInfoRedis.getJedis()) {
  125 + jedis.hmset(imei, fields);
  126 + if (ttlSeconds > 0) {
  127 + jedis.expire(imei, ttlSeconds);
  128 + }
  129 +
  130 +// Map<String, String> info = new HashMap<String, String>();
  131 +// info.put("imei", imei);
  132 +// info.put("fieldsCount", String.valueOf(fields.size()));
  133 +// info.put("ttl", String.valueOf(ttlSeconds));
  134 +// log("setDevicHostBatchWithTTL", info);
  135 +
  136 + } catch (Exception e) {
  137 + Map<String, String> error = new HashMap<String, String>();
  138 + error.put("imei", imei);
  139 + error.put("fieldsCount", String.valueOf(fields.size()));
  140 + error.put("ttl", String.valueOf(ttlSeconds));
  141 + error.put("error", e.getMessage());
  142 + log("setDevicHostBatchWithTTL_error", error);
  143 + }
  144 + }
  145 +
  146 + public static void setDevicHostBatchWithTTL(String imei, Map<String, String> fields, int ttlSeconds) {
  147 + if (fields == null ) {
  148 + throw new IllegalArgumentException("Fields cannot be null or contain null values");
  149 + }
  150 + if(fields.containsValue(null))
  151 + {
  152 + // 删除值为空的属性
  153 + fields.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty());
  154 + }
  155 +
  156 + try (Jedis jedis = deviceHostRedis.getJedis()) {
  157 + jedis.hmset(imei, fields);
  158 + if (ttlSeconds > 0) {
  159 + jedis.expire(imei, ttlSeconds);
  160 + }
  161 +
  162 +// Map<String, String> info = new HashMap<String, String>();
  163 +// info.put("imei", imei);
  164 +// info.put("fieldsCount", String.valueOf(fields.size()));
  165 +// info.put("ttl", String.valueOf(ttlSeconds));
  166 +// log("setDevicHostBatchWithTTL", info);
  167 +
  168 + } catch (Exception e) {
  169 + Map<String, String> error = new HashMap<String, String>();
  170 + error.put("imei", imei);
  171 + error.put("fieldsCount", String.valueOf(fields.size()));
  172 + error.put("ttl", String.valueOf(ttlSeconds));
  173 + error.put("error", e.getMessage());
  174 + log("setDevicHostBatchWithTTL_error", error);
  175 + }
  176 + }
  177 +
  178 + public static void setDevicHostWithTTL(String imei, String attribute, String value, int ttlSeconds) {
  179 + if (value == null) {
  180 + throw new IllegalArgumentException("Value cannot be null");
  181 + }
  182 + try (Jedis jedis = deviceHostRedis.getJedis()) {
  183 + jedis.hset(imei, attribute, value);
  184 + jedis.expire(imei, ttlSeconds);
  185 + Map<String, String> info = new HashMap<String, String>();
  186 + info.put("imei", imei);
  187 + info.put("attribute", attribute);
  188 + info.put("value", value);
  189 + info.put("ttl", String.valueOf(ttlSeconds));
  190 + log("setDevicHostWithTTL", info);
  191 + } catch (Exception e) {
  192 + Map<String, String> error = new HashMap<String, String>();
  193 + error.put("imei", imei);
  194 + error.put("attribute", attribute);
  195 + error.put("value", value);
  196 + error.put("ttl", String.valueOf(ttlSeconds));
  197 + error.put("error", e.getMessage());
  198 + log("setDevicHostWithTTL_error", error);
  199 + }
  200 + }
  201 +
  202 + public static boolean devicHostExists(String imei) {
  203 + boolean exists = false;
  204 + try (Jedis jedis = deviceHostRedis.getJedis()) {
  205 + exists = jedis.exists(imei);
  206 + Map<String, String> info = new HashMap<String, String>();
  207 + info.put("imei", imei);
  208 + info.put("exists", String.valueOf(exists));
  209 + log("devicHostExists", info);
  210 + } catch (Exception e) {
  211 + Map<String, String> error = new HashMap<String, String>();
  212 + error.put("imei", imei);
  213 + error.put("error", e.getMessage());
  214 + log("devicHostExists_error", error);
  215 + }
  216 + return exists;
  217 + }
  218 +
  219 + public static void main(String[] args) {
  220 + Map<String, String> fields = new HashMap<String, String>();
  221 + fields.put("type", "MODEL-X");
  222 + fields.put("status", "online");
  223 + fields.put("alarm", "000");
  224 + setDevicHostBatch("test:123", fields);
  225 + }
  226 +}
  1 +package com.luhui.deviceinfo.sync.service;
  2 +
  3 +import java.util.HashMap;
  4 +import java.util.List;
  5 +import java.util.Map;
  6 +import java.util.concurrent.ScheduledFuture;
  7 +import java.util.concurrent.TimeUnit;
  8 +
  9 +import com.luhui.deviceinfo.sync.util.ThreadPoolUtil;
  10 +import com.zhonglai.luhui.service.dao.BaseDao;
  11 +import org.slf4j.Logger;
  12 +import org.slf4j.LoggerFactory;
  13 +/**
  14 + * 监听mysql数据库的设备信息表
  15 + */
  16 +public class MySqlDeviceInfoLisenService {
  17 + private static final Logger logger = LoggerFactory.getLogger(MySqlDeviceInfoLisenService.class);
  18 +
  19 + private static BaseDao baseDao = new BaseDao();
  20 + private static ScheduledFuture scheduledDeviceHost;
  21 + private static ScheduledFuture scheduledDeviceInfo;
  22 + private static ScheduledFuture scheduledWdbTerminal;
  23 + private static ScheduledFuture scheduledIotTerminal;
  24 + private static ScheduledFuture scheduledIotDevice;
  25 + public static void start() {
  26 +
  27 + // 异步获取数据
  28 + scheduledDeviceHost = ThreadPoolUtil.executor.scheduleWithFixedDelay(() -> {
  29 + try {
  30 + upDeviceHost();
  31 + }catch (Exception e)
  32 + {
  33 + logger.info("触发告警业务异常",e);
  34 + }
  35 +
  36 + },0,30, TimeUnit.SECONDS);
  37 +
  38 + scheduledDeviceInfo = ThreadPoolUtil.executor.scheduleWithFixedDelay(() -> {
  39 + try {
  40 + upDeviceInfo();
  41 + }catch (Exception e)
  42 + {
  43 + logger.info("触发告警业务异常",e);
  44 + }
  45 +
  46 + },0,30, TimeUnit.SECONDS);
  47 +
  48 + scheduledWdbTerminal = ThreadPoolUtil.executor.scheduleWithFixedDelay(() -> {
  49 + try {
  50 + upWdbTerminal();
  51 + }catch (Exception e)
  52 + {
  53 + logger.info("触发告警业务异常",e);
  54 + }
  55 +
  56 + },0,30, TimeUnit.SECONDS);
  57 + scheduledIotTerminal = ThreadPoolUtil.executor.scheduleWithFixedDelay(() -> {
  58 + try {
  59 + upIotTerminal();
  60 + }catch (Exception e)
  61 + {
  62 + logger.info("触发告警业务异常",e);
  63 + }
  64 +
  65 + },0,30, TimeUnit.SECONDS);
  66 +
  67 + scheduledIotDevice = ThreadPoolUtil.executor.scheduleWithFixedDelay(() -> {
  68 + try {
  69 + upIotDevice();
  70 + }catch (Exception e)
  71 + {
  72 + logger.info("触发告警业务异常",e);
  73 + }
  74 +
  75 + },0,30, TimeUnit.SECONDS);
  76 +
  77 + }
  78 +
  79 +
  80 + public static void close() {
  81 + // 取消任务并关闭资源
  82 + cancelTask(scheduledDeviceHost);
  83 + cancelTask(scheduledDeviceInfo);
  84 + cancelTask(scheduledWdbTerminal);
  85 + cancelTask(scheduledIotTerminal);
  86 + cancelTask(scheduledIotDevice);
  87 + ThreadPoolUtil.close();
  88 + }
  89 +
  90 + private static void cancelTask(ScheduledFuture<?> task) {
  91 + if (task != null) {
  92 + task.cancel(false);
  93 + }
  94 + }
  95 +
  96 + private static void upDeviceHost()
  97 + {
  98 + try {
  99 + List<Map<String,Object>> mapList = baseDao.findListBysql("SELECT id,device_model,`data`,data_update_time,alarm_code,interval_time FROM liu_yu_le.device_host WHERE data_update_time >= UNIX_TIMESTAMP(NOW()) - 60");
  100 + if(null != mapList && mapList.size()>0)
  101 + {
  102 + long time = System.currentTimeMillis();
  103 + for (Map<String,Object> map : mapList)
  104 + {
  105 + String id = (String) map.get("id");
  106 + map.remove("id");
  107 + Integer interval_time = (Integer) map.get("interval_time");
  108 + if (null == interval_time)
  109 + {
  110 + interval_time = 60;
  111 + }
  112 + Map<String,String> savemap = new HashMap<>();
  113 + savemap.put("type", (String) map.get("device_model"));
  114 + savemap.put("dataState", (String) map.get("data_state"));
  115 + savemap.put("dataUpdateTime", map.get("data_update_time")+"");
  116 + savemap.put("deviceConfig", (String) map.get("device_config"));
  117 + savemap.put("alarm", (String) map.get("alarm_code"));
  118 +
  119 + JedisService.setDevicHostBatchWithTTL(id, savemap, interval_time);
  120 + }
  121 + logger.info("鱼儿乐主机更新条数:{},耗时 {} ms",mapList.size(),time-System.currentTimeMillis());
  122 + }
  123 + }catch (Exception e)
  124 + {
  125 + logger.info("数据处理异常",e);
  126 + }
  127 + }
  128 +
  129 + private static void upDeviceInfo()
  130 + {
  131 + try {
  132 + List<Map<String,Object>> mapList = baseDao.findBysql("SELECT a.id,a.device_model,a.data_state,a.data_update_time,a.device_config,a.alarm_code,b.`interval_time` FROM liu_yu_le.device_info a LEFT JOIN liu_yu_le.`device_host` b ON a.`device_id`=b.`id` WHERE a.data_update_time >= UNIX_TIMESTAMP(NOW()) - 60");
  133 + if(null != mapList && mapList.size()>0)
  134 + {
  135 + long time = System.currentTimeMillis();
  136 + for (Map<String,Object> map : mapList)
  137 + {
  138 + String id = (String) map.get("id");
  139 + Map<String,String> savemap = new HashMap<>();
  140 + savemap.put("type", (String) map.get("device_model"));
  141 + savemap.put("dataState", (String) map.get("data_state"));
  142 + savemap.put("dataUpdateTime", map.get("data_update_time")+"");
  143 + savemap.put("deviceConfig", (String) map.get("device_config"));
  144 + savemap.put("alarm", (String) map.get("alarm_code"));
  145 + Integer interval_time = (Integer) map.get("interval_time");
  146 + if (null == interval_time)
  147 + {
  148 + interval_time = 60;
  149 + }
  150 + JedisService.setDevicInfoBatchWithTTL(id, savemap, interval_time);
  151 + }
  152 + logger.info("设备表更新条数:{},耗时 {} ms",mapList.size(),time-System.currentTimeMillis());
  153 + }
  154 + }catch (Exception e)
  155 + {
  156 + logger.info("数据处理异常",e);
  157 + }
  158 + }
  159 +
  160 + private static void upWdbTerminal()
  161 + {
  162 + try {
  163 + List<Map<String,Object>> mapList = baseDao.findBysql("SELECT a.id,a.`data`,a.data_update_time,a.alarm_code,b.`interval_time` FROM liu_yu_le.wdb_terminal a LEFT JOIN liu_yu_le.`device_host` b ON a.`base_station_id`=b.`id` WHERE a.data_update_time >= UNIX_TIMESTAMP(NOW()) - 60");
  164 + if(null != mapList && mapList.size()>0)
  165 + {
  166 + long time = System.currentTimeMillis();
  167 + for (Map<String,Object> map : mapList)
  168 + {
  169 + String id = (String) map.get("id");
  170 +
  171 + Map<String,String> savemap = new HashMap<>();
  172 + savemap.put("type", "WDB");
  173 + savemap.put("dataState", (String) map.get("data"));
  174 + savemap.put("dataUpdateTime", map.get("data_update_time")+"");
  175 + savemap.put("alarm", (String) map.get("alarm_code"));
  176 + Integer interval_time = (Integer) map.get("interval_time");
  177 + if (null == interval_time)
  178 + {
  179 + interval_time = 60;
  180 + }
  181 + JedisService.setDevicInfoBatchWithTTL(id, savemap, interval_time);
  182 + }
  183 + logger.info("温度宝表更新条数:{},耗时 {} ms",mapList.size(),time-System.currentTimeMillis());
  184 + }
  185 + }catch (Exception e)
  186 + {
  187 + logger.info("数据处理异常",e);
  188 + }
  189 + }
  190 +
  191 + private static void upIotTerminal()
  192 + {
  193 + try {
  194 + List<Map<String,Object>> mapList = baseDao.findBysql("SELECT a.id,a.`mqtt_username`,a.`things_model_value`,a.`things_model_config`,a.`update_time`,b.device_life FROM `mqtt_broker`.`iot_terminal` a LEFT JOIN `mqtt_broker`.`iot_device` b ON a.`device_id`=b.`client_id` WHERE a.data_update_time >= UNIX_TIMESTAMP(NOW()) - 60");
  195 + if(null != mapList && mapList.size()>0)
  196 + {
  197 + long time = System.currentTimeMillis();
  198 + for (Map<String,Object> map : mapList)
  199 + {
  200 + String id = (String) map.get("id");
  201 +
  202 + Map<String,String> savemap = new HashMap<>();
  203 + savemap.put("type", (String) map.get("mqtt_username"));
  204 + savemap.put("things_model_value", (String) map.get("things_model_value"));
  205 + savemap.put("things_model_config", (String) map.get("things_model_config"));
  206 + savemap.put("dataUpdateTime", null != map.get("update_time")? map.get("update_time")+"":null);
  207 + Integer interval_time = null != map.get("device_life")? Integer.parseInt(map.get("device_life")+""):60;
  208 + JedisService.setDevicInfoBatchWithTTL(id, savemap, interval_time);
  209 +
  210 + }
  211 + logger.info("终端表更新条数:{},耗时 {} ms",mapList.size(),time-System.currentTimeMillis());
  212 +
  213 + }
  214 + }catch (Exception e)
  215 + {
  216 + logger.info("数据处理异常",e);
  217 + }
  218 + }
  219 +
  220 + private static void upIotDevice()
  221 + {
  222 + try {
  223 + List<Map<String,Object>> mapList = baseDao.findBysql("SELECT `client_id`,`mqtt_username`,`things_model_value`,`things_model_config`,`update_time`,device_life FROM `mqtt_broker`.`iot_device` WHERE data_update_time >= UNIX_TIMESTAMP(NOW()) - 60");
  224 + if(null != mapList && mapList.size()>0)
  225 + {
  226 + long time = System.currentTimeMillis();
  227 + for (Map<String,Object> map : mapList)
  228 + {
  229 + String id = (String) map.get("client_id");
  230 +
  231 + Map<String,String> savemap = new HashMap<>();
  232 + savemap.put("type", (String) map.get("mqtt_username"));
  233 + savemap.put("things_model_value", (String) map.get("things_model_value"));
  234 + savemap.put("things_model_config", map.get("things_model_config")+"");
  235 + savemap.put("dataUpdateTime", null != map.get("update_time")? map.get("update_time")+"":null);
  236 + Integer interval_time = null != map.get("device_life")? Integer.parseInt(map.get("device_life")+""):60;
  237 +
  238 + JedisService.setDevicHostBatchWithTTL(id, savemap, interval_time);
  239 + }
  240 + logger.info("主机表更新条数:{},耗时 {} ms",mapList.size(),time-System.currentTimeMillis());
  241 + }
  242 + }catch (Exception e)
  243 + {
  244 + logger.info("数据处理异常",e);
  245 + }
  246 + }
  247 +}
  1 +package com.luhui.deviceinfo.sync.util;
  2 +
  3 +import java.util.concurrent.Executors;
  4 +import java.util.concurrent.ScheduledThreadPoolExecutor;
  5 +import java.util.concurrent.ThreadPoolExecutor;
  6 +import java.util.concurrent.TimeUnit;
  7 +
  8 +public class ThreadPoolUtil {
  9 + private static int poolSize = 10; // 线程池大小
  10 + public static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor (poolSize, Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
  11 +
  12 + public static void close()
  13 + {
  14 + // 关闭调度器
  15 + executor.shutdown();
  16 +
  17 + try {
  18 + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
  19 + executor.shutdownNow();
  20 + }
  21 + } catch (InterruptedException e) {
  22 + executor.shutdownNow();
  23 + Thread.currentThread().interrupt();
  24 + }
  25 + }
  26 +}
  1 +#-------------------------------------------------------------------------------
  2 +# Redis客户端配置样例
  3 +# 每一个分组代表一个Redis实例
  4 +# 无分组的Pool配置为所有分组的共用配置,如果分组自己定义Pool配置,则覆盖共用配置
  5 +# 池配置来自于:https://www.cnblogs.com/jklk/p/7095067.html
  6 +#-------------------------------------------------------------------------------
  7 +
  8 +#----- 默认(公有)配置
  9 +# 地址,默认localhost
  10 +host = 119.23.218.181
  11 +# 端口,默认6379
  12 +port = 6379
  13 +# 超时,默认2000
  14 +timeout = 2000
  15 +# 连接超时,默认timeout
  16 +connectionTimeout = 2000
  17 +# 读取超时,默认timeout
  18 +soTimeout = 2000
  19 +# 密码,默认无
  20 +# password =
  21 +# 数据库序号,默认0
  22 +database = 2
  23 +# 客户端名,默认"Hutool"
  24 +clientName = Hutool
  25 +# SSL连接,默认false
  26 +ssl = false;
  27 +
  28 +#----- 自定义分组的连接
  29 +[devicehost]
  30 +# 地址,默认localhost
  31 +host = 119.23.218.181
  32 +port = 6379
  33 +# password =
  34 +database = 3
  35 +# 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
  36 +BlockWhenExhausted = true;
  37 +# 设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数)
  38 +evictionPolicyClassName = org.apache.commons.pool2.impl.DefaultEvictionPolicy
  39 +# 是否启用pool的jmx管理功能, 默认true
  40 +jmxEnabled = true;
  41 +# 是否启用后进先出, 默认true
  42 +lifo = true;
  43 +# 最大空闲连接数, 默认8个
  44 +maxIdle = 8
  45 +# 最小空闲连接数, 默认0
  46 +minIdle = 0
  47 +# 最大连接数, 默认8个
  48 +maxTotal = 8
  49 +# 获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1
  50 +maxWaitMillis = -1
  51 +# 逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
  52 +minEvictableIdleTimeMillis = 1800000
  53 +# 每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
  54 +numTestsPerEvictionRun = 3;
  55 +# 对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
  56 +SoftMinEvictableIdleTimeMillis = 1800000
  57 +# 在获取连接的时候检查有效性, 默认false
  58 +testOnBorrow = false
  59 +# 在空闲时检查有效性, 默认false
  60 +testWhileIdle = false
  61 +# 逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
  62 +timeBetweenEvictionRunsMillis = -1
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<configuration>
  3 + <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
  4 + <file>logs/output.log</file>
  5 + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  6 + <fileNamePattern>logs/output.%d{yyyy-MM-dd}.log</fileNamePattern>
  7 + <maxHistory>5</maxHistory>
  8 + </rollingPolicy>
  9 + <encoder>
  10 + <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
  11 + </encoder>
  12 + </appender>
  13 +
  14 + <!-- 控制台输出 -->
  15 + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
  16 + <encoder>
  17 + <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
  18 + </encoder>
  19 + </appender>
  20 +
  21 + <!--系统操作日志-->
  22 + <root level="info">
  23 + <appender-ref ref="FILE" />
  24 + <appender-ref ref="CONSOLE" />
  25 + </root>
  26 +
  27 +</configuration>
@@ -36,6 +36,7 @@ @@ -36,6 +36,7 @@
36 <module>lh-superweb</module> 36 <module>lh-superweb</module>
37 <module>lh-superweb-jar</module> 37 <module>lh-superweb-jar</module>
38 <module>lh-ssh-service-lesten</module> 38 <module>lh-ssh-service-lesten</module>
  39 + <module>lh-deviceInfo-sync</module>
39 </modules> 40 </modules>
40 41
41 <properties> 42 <properties>