作者 crossoverJie
提交者 GitHub

Merge pull request #16 from crossoverJie/cim-1.0.1

cim 1.0.1
正在显示 33 个修改的文件 包含 1249 行增加66 行删除
@@ -66,10 +66,6 @@ @@ -66,10 +66,6 @@
66 <artifactId>spring-boot-starter-actuator</artifactId> 66 <artifactId>spring-boot-starter-actuator</artifactId>
67 </dependency> 67 </dependency>
68 68
69 - <dependency>  
70 - <groupId>de.codecentric</groupId>  
71 - <artifactId>spring-boot-admin-starter-client</artifactId>  
72 - </dependency>  
73 69
74 <dependency> 70 <dependency>
75 <groupId>ch.qos.logback</groupId> 71 <groupId>ch.qos.logback</groupId>
@@ -19,6 +19,9 @@ public class AppConfiguration { @@ -19,6 +19,9 @@ public class AppConfiguration {
19 @Value("${cim.user.userName}") 19 @Value("${cim.user.userName}")
20 private String userName; 20 private String userName;
21 21
  22 + @Value("${cim.msg.logger.path}")
  23 + private String msgLoggerPath ;
  24 +
22 public Long getUserId() { 25 public Long getUserId() {
23 return userId; 26 return userId;
24 } 27 }
@@ -34,4 +37,12 @@ public class AppConfiguration { @@ -34,4 +37,12 @@ public class AppConfiguration {
34 public void setUserName(String userName) { 37 public void setUserName(String userName) {
35 this.userName = userName; 38 this.userName = userName;
36 } 39 }
  40 +
  41 + public String getMsgLoggerPath() {
  42 + return msgLoggerPath;
  43 + }
  44 +
  45 + public void setMsgLoggerPath(String msgLoggerPath) {
  46 + this.msgLoggerPath = msgLoggerPath;
  47 + }
37 } 48 }
1 package com.crossoverjie.cim.client.config; 1 package com.crossoverjie.cim.client.config;
2 2
3 import com.crossoverjie.cim.client.handle.MsgHandleCaller; 3 import com.crossoverjie.cim.client.handle.MsgHandleCaller;
  4 +import com.crossoverjie.cim.client.service.impl.MsgCallBackListener;
4 import com.crossoverjie.cim.common.constant.Constants; 5 import com.crossoverjie.cim.common.constant.Constants;
5 import com.crossoverjie.cim.common.protocol.CIMRequestProto; 6 import com.crossoverjie.cim.common.protocol.CIMRequestProto;
6 import com.google.common.util.concurrent.ThreadFactoryBuilder; 7 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -87,9 +88,7 @@ public class BeanConfig { @@ -87,9 +88,7 @@ public class BeanConfig {
87 */ 88 */
88 @Bean 89 @Bean
89 public MsgHandleCaller buildCaller(){ 90 public MsgHandleCaller buildCaller(){
90 - MsgHandleCaller caller = new MsgHandleCaller(msg -> {  
91 - //处理业务逻辑,或者自定义实现接口  
92 - }) ; 91 + MsgHandleCaller caller = new MsgHandleCaller(new MsgCallBackListener()) ;
93 92
94 return caller ; 93 return caller ;
95 } 94 }
1 package com.crossoverjie.cim.client.handle; 1 package com.crossoverjie.cim.client.handle;
2 2
3 import com.crossoverjie.cim.client.util.SpringBeanFactory; 3 import com.crossoverjie.cim.client.util.SpringBeanFactory;
  4 +import com.crossoverjie.cim.common.constant.Constants;
4 import com.crossoverjie.cim.common.protocol.CIMRequestProto; 5 import com.crossoverjie.cim.common.protocol.CIMRequestProto;
5 import com.crossoverjie.cim.common.protocol.CIMResponseProto; 6 import com.crossoverjie.cim.common.protocol.CIMResponseProto;
  7 +import io.netty.channel.ChannelFutureListener;
6 import io.netty.channel.ChannelHandler; 8 import io.netty.channel.ChannelHandler;
7 import io.netty.channel.ChannelHandlerContext; 9 import io.netty.channel.ChannelHandlerContext;
8 import io.netty.channel.SimpleChannelInboundHandler; 10 import io.netty.channel.SimpleChannelInboundHandler;
@@ -39,7 +41,7 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -39,7 +41,7 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
39 if (idleStateEvent.state() == IdleState.WRITER_IDLE){ 41 if (idleStateEvent.state() == IdleState.WRITER_IDLE){
40 CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", 42 CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
41 CIMRequestProto.CIMReqProtocol.class); 43 CIMRequestProto.CIMReqProtocol.class);
42 - ctx.writeAndFlush(heartBeat) ; 44 + ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE) ;
43 } 45 }
44 46
45 47
@@ -56,15 +58,18 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -56,15 +58,18 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
56 } 58 }
57 59
58 @Override 60 @Override
59 - protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResponseProto.CIMResProtocol responseProtocol) throws Exception { 61 + protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResponseProto.CIMResProtocol msg) throws Exception {
60 62
61 //从服务端收到消息时被调用 63 //从服务端收到消息时被调用
62 //LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ; 64 //LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
63 65
  66 + if (msg.getType() != Constants.CommandType.PING) {
64 //回调消息 67 //回调消息
65 - callBackMsg(responseProtocol.getResMsg()); 68 + callBackMsg(msg.getResMsg());
  69 +
  70 + LOGGER.info(msg.getResMsg());
  71 + }
66 72
67 - LOGGER.info(responseProtocol.getResMsg());  
68 } 73 }
69 74
70 /** 75 /**
@@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> { @@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
24 @Override 24 @Override
25 protected void initChannel(Channel ch) throws Exception { 25 protected void initChannel(Channel ch) throws Exception {
26 ch.pipeline() 26 ch.pipeline()
27 - //60 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中  
28 - .addLast(new IdleStateHandler(0, 60, 0)) 27 + //30 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
  28 + .addLast(new IdleStateHandler(0, 30, 0))
29 29
30 //心跳解码 30 //心跳解码
31 //.addLast(new HeartbeatEncode()) 31 //.addLast(new HeartbeatEncode())
@@ -2,6 +2,7 @@ package com.crossoverjie.cim.client.scanner; @@ -2,6 +2,7 @@ package com.crossoverjie.cim.client.scanner;
2 2
3 import com.crossoverjie.cim.client.config.AppConfiguration; 3 import com.crossoverjie.cim.client.config.AppConfiguration;
4 import com.crossoverjie.cim.client.service.MsgHandle; 4 import com.crossoverjie.cim.client.service.MsgHandle;
  5 +import com.crossoverjie.cim.client.service.MsgLogger;
5 import com.crossoverjie.cim.client.util.SpringBeanFactory; 6 import com.crossoverjie.cim.client.util.SpringBeanFactory;
6 import org.slf4j.Logger; 7 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory; 8 import org.slf4j.LoggerFactory;
@@ -26,9 +27,12 @@ public class Scan implements Runnable { @@ -26,9 +27,12 @@ public class Scan implements Runnable {
26 27
27 private MsgHandle msgHandle ; 28 private MsgHandle msgHandle ;
28 29
  30 + private MsgLogger msgLogger ;
  31 +
29 public Scan() { 32 public Scan() {
30 this.configuration = SpringBeanFactory.getBean(AppConfiguration.class); 33 this.configuration = SpringBeanFactory.getBean(AppConfiguration.class);
31 this.msgHandle = SpringBeanFactory.getBean(MsgHandle.class) ; 34 this.msgHandle = SpringBeanFactory.getBean(MsgHandle.class) ;
  35 + this.msgLogger = SpringBeanFactory.getBean(MsgLogger.class) ;
32 } 36 }
33 37
34 @Override 38 @Override
@@ -50,6 +54,8 @@ public class Scan implements Runnable { @@ -50,6 +54,8 @@ public class Scan implements Runnable {
50 //真正的发送消息 54 //真正的发送消息
51 msgHandle.sendMsg(msg) ; 55 msgHandle.sendMsg(msg) ;
52 56
  57 + //写入聊天记录
  58 + msgLogger.log(msg) ;
53 59
54 LOGGER.info("{}:【{}】", configuration.getUserName(), msg); 60 LOGGER.info("{}:【{}】", configuration.getUserName(), msg);
55 } 61 }
  1 +package com.crossoverjie.cim.client.service;
  2 +
  3 +/**
  4 + * Function:
  5 + *
  6 + * @author crossoverJie
  7 + * Date: 2019/1/6 15:23
  8 + * @since JDK 1.8
  9 + */
  10 +public interface MsgLogger {
  11 +
  12 + /**
  13 + * 异步写入消息
  14 + * @param msg
  15 + */
  16 + void log(String msg) ;
  17 +
  18 +
  19 + /**
  20 + * 停止写入
  21 + */
  22 + void stop() ;
  23 +
  24 + /**
  25 + * 查询聊天记录
  26 + * @param key 关键字
  27 + * @return
  28 + */
  29 + String query(String key) ;
  30 +}
  1 +package com.crossoverjie.cim.client.service.impl;
  2 +
  3 +import com.crossoverjie.cim.client.config.AppConfiguration;
  4 +import com.crossoverjie.cim.client.service.MsgLogger;
  5 +import org.slf4j.Logger;
  6 +import org.slf4j.LoggerFactory;
  7 +import org.springframework.beans.factory.annotation.Autowired;
  8 +import org.springframework.stereotype.Service;
  9 +
  10 +import java.io.IOException;
  11 +import java.nio.charset.Charset;
  12 +import java.nio.file.*;
  13 +import java.time.LocalDate;
  14 +import java.util.Arrays;
  15 +import java.util.List;
  16 +import java.util.concurrent.ArrayBlockingQueue;
  17 +import java.util.concurrent.BlockingQueue;
  18 +import java.util.stream.Collectors;
  19 +import java.util.stream.Stream;
  20 +
  21 +/**
  22 + * Function:
  23 + *
  24 + * @author crossoverJie
  25 + * Date: 2019/1/6 15:26
  26 + * @since JDK 1.8
  27 + */
  28 +@Service
  29 +public class AsyncMsgLogger implements MsgLogger {
  30 +
  31 + private final static Logger LOGGER = LoggerFactory.getLogger(AsyncMsgLogger.class);
  32 +
  33 + /**
  34 + * The default buffer size.
  35 + */
  36 + private static final int DEFAULT_QUEUE_SIZE = 16;
  37 + private BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(DEFAULT_QUEUE_SIZE);
  38 +
  39 + private volatile boolean started = false;
  40 + private Worker worker = new Worker();
  41 +
  42 + @Autowired
  43 + private AppConfiguration appConfiguration;
  44 +
  45 + @Override
  46 + public void log(String msg) {
  47 + //开始消费
  48 + startMsgLogger();
  49 + try {
  50 + // TODO: 2019/1/6 消息堆满是否阻塞线程?
  51 + blockingQueue.put(msg);
  52 + } catch (InterruptedException e) {
  53 + LOGGER.error("InterruptedException", e);
  54 + }
  55 + }
  56 +
  57 + private class Worker extends Thread {
  58 +
  59 +
  60 + @Override
  61 + public void run() {
  62 + while (started) {
  63 + try {
  64 + String msg = blockingQueue.take();
  65 + writeLog(msg);
  66 + } catch (InterruptedException e) {
  67 + break;
  68 + }
  69 + }
  70 + }
  71 +
  72 + }
  73 +
  74 +
  75 + private void writeLog(String msg) {
  76 +
  77 + LocalDate today = LocalDate.now();
  78 + int year = today.getYear();
  79 + int month = today.getMonthValue();
  80 + int day = today.getDayOfMonth();
  81 +
  82 + String dir = appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/";
  83 + String fileName = dir + year + month + day + ".log";
  84 +
  85 + Path file = Paths.get(fileName);
  86 + boolean exists = Files.exists(Paths.get(dir), LinkOption.NOFOLLOW_LINKS);
  87 + try {
  88 + if (!exists) {
  89 + Files.createDirectories(Paths.get(dir));
  90 + }
  91 +
  92 + List<String> lines = Arrays.asList(msg);
  93 +
  94 + Files.write(file, lines, Charset.forName("UTF-8"), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  95 + } catch (IOException e) {
  96 + LOGGER.info("IOException", e);
  97 + }
  98 +
  99 + }
  100 +
  101 + /**
  102 + * 开始工作
  103 + */
  104 + private void startMsgLogger() {
  105 + if (started) {
  106 + return;
  107 + }
  108 +
  109 + worker.setDaemon(true);
  110 + worker.setName("AsyncMsgLogger-Worker");
  111 + started = true;
  112 + worker.start();
  113 + }
  114 +
  115 +
  116 + @Override
  117 + public void stop() {
  118 + started = false;
  119 + worker.interrupt();
  120 + }
  121 +
  122 + @Override
  123 + public String query(String key) {
  124 + StringBuilder sb = new StringBuilder();
  125 +
  126 + Path path = Paths.get(appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/");
  127 +
  128 + try {
  129 + Stream<Path> list = Files.list(path);
  130 + List<Path> collect = list.collect(Collectors.toList());
  131 + for (Path file : collect) {
  132 + List<String> strings = Files.readAllLines(file);
  133 + for (String msg : strings) {
  134 + if (msg.trim().contains(key)) {
  135 + sb.append(msg).append("\n");
  136 + }
  137 + }
  138 +
  139 + }
  140 + } catch (IOException e) {
  141 + LOGGER.info("IOException", e);
  142 + }
  143 +
  144 + return sb.toString().replace(key, "\033[31;4m" + key + "\033[0m");
  145 + }
  146 +}
  1 +package com.crossoverjie.cim.client.service.impl;
  2 +
  3 +import com.crossoverjie.cim.client.service.CustomMsgHandleListener;
  4 +import com.crossoverjie.cim.client.service.MsgLogger;
  5 +import com.crossoverjie.cim.client.util.SpringBeanFactory;
  6 +
  7 +/**
  8 + * Function:自定义收到消息回调
  9 + *
  10 + * @author crossoverJie
  11 + * Date: 2019/1/6 17:49
  12 + * @since JDK 1.8
  13 + */
  14 +public class MsgCallBackListener implements CustomMsgHandleListener {
  15 +
  16 +
  17 + private MsgLogger msgLogger ;
  18 +
  19 + public MsgCallBackListener() {
  20 + this.msgLogger = SpringBeanFactory.getBean(MsgLogger.class) ;
  21 + }
  22 +
  23 + @Override
  24 + public void handle(String msg) {
  25 + msgLogger.log(msg) ;
  26 + }
  27 +}
@@ -3,10 +3,12 @@ package com.crossoverjie.cim.client.service.impl; @@ -3,10 +3,12 @@ package com.crossoverjie.cim.client.service.impl;
3 import com.crossoverjie.cim.client.client.CIMClient; 3 import com.crossoverjie.cim.client.client.CIMClient;
4 import com.crossoverjie.cim.client.config.AppConfiguration; 4 import com.crossoverjie.cim.client.config.AppConfiguration;
5 import com.crossoverjie.cim.client.service.MsgHandle; 5 import com.crossoverjie.cim.client.service.MsgHandle;
  6 +import com.crossoverjie.cim.client.service.MsgLogger;
6 import com.crossoverjie.cim.client.service.RouteRequest; 7 import com.crossoverjie.cim.client.service.RouteRequest;
7 import com.crossoverjie.cim.client.vo.req.GroupReqVO; 8 import com.crossoverjie.cim.client.vo.req.GroupReqVO;
8 import com.crossoverjie.cim.client.vo.req.P2PReqVO; 9 import com.crossoverjie.cim.client.vo.req.P2PReqVO;
9 import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO; 10 import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
  11 +import com.crossoverjie.cim.common.data.construct.TrieTree;
10 import com.crossoverjie.cim.common.enums.SystemCommandEnumType; 12 import com.crossoverjie.cim.common.enums.SystemCommandEnumType;
11 import com.crossoverjie.cim.common.util.StringUtil; 13 import com.crossoverjie.cim.common.util.StringUtil;
12 import org.slf4j.Logger; 14 import org.slf4j.Logger;
@@ -41,8 +43,25 @@ public class MsgHandler implements MsgHandle { @@ -41,8 +43,25 @@ public class MsgHandler implements MsgHandle {
41 @Autowired 43 @Autowired
42 private CIMClient cimClient ; 44 private CIMClient cimClient ;
43 45
  46 + @Autowired
  47 + private MsgLogger msgLogger ;
  48 +
  49 + private boolean aiModel = false ;
  50 +
44 @Override 51 @Override
45 public void sendMsg(String msg) { 52 public void sendMsg(String msg) {
  53 + if (aiModel){
  54 + aiChat(msg);
  55 + }else {
  56 + normalChat(msg);
  57 + }
  58 + }
  59 +
  60 + /**
  61 + * 正常聊天
  62 + * @param msg
  63 + */
  64 + private void normalChat(String msg) {
46 String[] totalMsg = msg.split(";;"); 65 String[] totalMsg = msg.split(";;");
47 if (totalMsg.length > 1) { 66 if (totalMsg.length > 1) {
48 //私聊 67 //私聊
@@ -67,6 +86,19 @@ public class MsgHandler implements MsgHandle { @@ -67,6 +86,19 @@ public class MsgHandler implements MsgHandle {
67 } 86 }
68 } 87 }
69 88
  89 + /**
  90 + * AI model
  91 + * @param msg
  92 + */
  93 + private void aiChat(String msg) {
  94 + msg = msg.replace("吗","") ;
  95 + msg = msg.replace("嘛","") ;
  96 + msg = msg.replace("?","!");
  97 + msg = msg.replace("?","!");
  98 + msg = msg.replace("你","我");
  99 + System.out.println("AI:\033[31;4m" + msg + "\033[0m");
  100 + }
  101 +
70 @Override 102 @Override
71 public void groupChat(GroupReqVO groupReqVO) throws Exception { 103 public void groupChat(GroupReqVO groupReqVO) throws Exception {
72 routeRequest.sendGroupMsg(groupReqVO); 104 routeRequest.sendGroupMsg(groupReqVO);
@@ -104,6 +136,20 @@ public class MsgHandler implements MsgHandle { @@ -104,6 +136,20 @@ public class MsgHandler implements MsgHandle {
104 //打印在线用户 136 //打印在线用户
105 printOnlineUsers(); 137 printOnlineUsers();
106 138
  139 + } else if (msg.startsWith(SystemCommandEnumType.QUERY.getCommandType().trim() + " ")){
  140 + //查询聊天记录
  141 + queryChatHistory(msg);
  142 + }else if (SystemCommandEnumType.AI.getCommandType().trim().equals(msg.toLowerCase())){
  143 + //开启 AI 模式
  144 + aiModel = true ;
  145 + System.out.println("\033[31;4m" + "Hello,我是估值两亿的 AI 机器人!" + "\033[0m");
  146 + }else if (SystemCommandEnumType.QAI.getCommandType().trim().equals(msg.toLowerCase())){
  147 + //关闭 AI 模式
  148 + aiModel = false ;
  149 + System.out.println("\033[31;4m" + "。゚(゚´ω`゚)゚。 AI 下线了!" + "\033[0m");
  150 + }else if (msg.startsWith(SystemCommandEnumType.PREFIX.getCommandType().trim() + " ")){
  151 + //模糊匹配
  152 + prefixSearch(msg);
107 }else { 153 }else {
108 printAllCommand(allStatusCode); 154 printAllCommand(allStatusCode);
109 } 155 }
@@ -117,6 +163,43 @@ public class MsgHandler implements MsgHandle { @@ -117,6 +163,43 @@ public class MsgHandler implements MsgHandle {
117 163
118 } 164 }
119 165
  166 +
  167 + /**
  168 + * 模糊匹配
  169 + * @param msg
  170 + */
  171 + private void prefixSearch(String msg) {
  172 + try {
  173 + List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();
  174 + TrieTree trieTree = new TrieTree() ;
  175 + for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
  176 + trieTree.insert(onlineUser.getUserName());
  177 + }
  178 +
  179 + String[] split = msg.split(" ");
  180 + String key = split[1];
  181 + List<String> list = trieTree.prefixSearch(key);
  182 +
  183 + for (String res : list) {
  184 + res = res.replace(key, "\033[31;4m" + key + "\033[0m");
  185 + System.out.println(res);
  186 + }
  187 +
  188 + } catch (Exception e) {
  189 + LOGGER.error("Exception" ,e);
  190 + }
  191 + }
  192 +
  193 + /**
  194 + * 查询聊天记录
  195 + * @param msg
  196 + */
  197 + private void queryChatHistory(String msg) {
  198 + String[] split = msg.split(" ") ;
  199 + String res = msgLogger.query(split[1]);
  200 + System.out.println(res);
  201 + }
  202 +
120 /** 203 /**
121 * 打印在线用户 204 * 打印在线用户
122 */ 205 */
@@ -140,6 +223,7 @@ public class MsgHandler implements MsgHandle { @@ -140,6 +223,7 @@ public class MsgHandler implements MsgHandle {
140 */ 223 */
141 private void shutdown() { 224 private void shutdown() {
142 LOGGER.info("系统关闭中。。。。"); 225 LOGGER.info("系统关闭中。。。。");
  226 + msgLogger.stop();
143 executor.shutdown(); 227 executor.shutdown();
144 try { 228 try {
145 while (!executor.awaitTermination(1, TimeUnit.SECONDS)) { 229 while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
@@ -69,9 +69,13 @@ public class RouteRequestImpl implements RouteRequest { @@ -69,9 +69,13 @@ public class RouteRequestImpl implements RouteRequest {
69 .build(); 69 .build();
70 70
71 Response response = okHttpClient.newCall(request).execute() ; 71 Response response = okHttpClient.newCall(request).execute() ;
  72 + try {
72 if (!response.isSuccessful()){ 73 if (!response.isSuccessful()){
73 throw new IOException("Unexpected code " + response); 74 throw new IOException("Unexpected code " + response);
74 } 75 }
  76 + }finally {
  77 + response.body().close();
  78 + }
75 } 79 }
76 80
77 @Override 81 @Override
@@ -92,13 +96,19 @@ public class RouteRequestImpl implements RouteRequest { @@ -92,13 +96,19 @@ public class RouteRequestImpl implements RouteRequest {
92 throw new IOException("Unexpected code " + response); 96 throw new IOException("Unexpected code " + response);
93 } 97 }
94 98
95 - String json = response.body().string() ; 99 + ResponseBody body = response.body();
  100 + try {
  101 + String json = body.string() ;
96 BaseResponse baseResponse = JSON.parseObject(json, BaseResponse.class); 102 BaseResponse baseResponse = JSON.parseObject(json, BaseResponse.class);
97 103
98 //选择的账号不存在 104 //选择的账号不存在
99 if (baseResponse.getCode().equals(StatusEnum.OFF_LINE.getCode())){ 105 if (baseResponse.getCode().equals(StatusEnum.OFF_LINE.getCode())){
100 LOGGER.error(p2PReqVO.getReceiveUserId() + ":" + StatusEnum.OFF_LINE.getMessage()); 106 LOGGER.error(p2PReqVO.getReceiveUserId() + ":" + StatusEnum.OFF_LINE.getMessage());
101 } 107 }
  108 +
  109 + }finally {
  110 + body.close();
  111 + }
102 } 112 }
103 113
104 @Override 114 @Override
@@ -118,20 +128,24 @@ public class RouteRequestImpl implements RouteRequest { @@ -118,20 +128,24 @@ public class RouteRequestImpl implements RouteRequest {
118 if (!response.isSuccessful()){ 128 if (!response.isSuccessful()){
119 throw new IOException("Unexpected code " + response); 129 throw new IOException("Unexpected code " + response);
120 } 130 }
  131 + CIMServerResVO cimServerResVO ;
  132 + ResponseBody body = response.body();
  133 + try {
  134 + String json = body.string();
  135 + cimServerResVO = JSON.parseObject(json, CIMServerResVO.class);
121 136
122 - String json = response.body().string();  
123 - CIMServerResVO cimServerResVO = JSON.parseObject(json, CIMServerResVO.class);  
124 -  
125 - //重复登录  
126 - if (cimServerResVO.getCode().equals(StatusEnum.REPEAT_LOGIN.getCode())){  
127 - LOGGER.error(appConfiguration.getUserName() + ":" + StatusEnum.REPEAT_LOGIN.getMessage()); 137 + //重复失败
  138 + if (!cimServerResVO.getCode().equals(StatusEnum.SUCCESS.getCode())){
  139 + LOGGER.error(appConfiguration.getUserName() + ":" + cimServerResVO.getMessage());
128 System.exit(-1); 140 System.exit(-1);
129 } 141 }
130 142
131 - if (!cimServerResVO.getCode().equals(StatusEnum.SUCCESS.getCode())){  
132 - throw new RuntimeException("route server exception code=" + cimServerResVO.getCode()) ; 143 + }finally {
  144 + body.close();
133 } 145 }
134 146
  147 +
  148 +
135 return cimServerResVO.getDataBody(); 149 return cimServerResVO.getDataBody();
136 } 150 }
137 151
@@ -152,8 +166,15 @@ public class RouteRequestImpl implements RouteRequest { @@ -152,8 +166,15 @@ public class RouteRequestImpl implements RouteRequest {
152 } 166 }
153 167
154 168
155 - String json = response.body().string() ;  
156 - OnlineUsersResVO onlineUsersResVO = JSON.parseObject(json, OnlineUsersResVO.class); 169 + ResponseBody body = response.body();
  170 + OnlineUsersResVO onlineUsersResVO ;
  171 + try {
  172 + String json = body.string() ;
  173 + onlineUsersResVO = JSON.parseObject(json, OnlineUsersResVO.class);
  174 +
  175 + }finally {
  176 + body.close();
  177 + }
157 178
158 return onlineUsersResVO.getDataBody(); 179 return onlineUsersResVO.getDataBody();
159 } 180 }
1 -spring.application.name=netty-heartbeat-client 1 +spring.application.name=cim-client
2 2
3 # web port 3 # web port
4 server.port=8082 4 server.port=8082
@@ -8,6 +8,9 @@ swagger.enable = true @@ -8,6 +8,9 @@ swagger.enable = true
8 8
9 logging.level.root=info 9 logging.level.root=info
10 10
  11 +#消息记录存放路径
  12 +cim.msg.logger.path = /opt/logs/cim/
  13 +
11 14
12 ###=======生产模拟======### 15 ###=======生产模拟======###
13 # 群发消息 16 # 群发消息
  1 +package com.crossoverjie.cim.client.service.impl;
  2 +
  3 +import com.crossoverjie.cim.client.CIMClientApplication;
  4 +import com.crossoverjie.cim.client.service.MsgLogger;
  5 +import org.junit.Test;
  6 +import org.junit.runner.RunWith;
  7 +import org.springframework.beans.factory.annotation.Autowired;
  8 +import org.springframework.boot.test.context.SpringBootTest;
  9 +import org.springframework.test.context.junit4.SpringRunner;
  10 +
  11 +import java.util.concurrent.TimeUnit;
  12 +
  13 +@SpringBootTest(classes = CIMClientApplication.class)
  14 +@RunWith(SpringRunner.class)
  15 +public class AsyncMsgLoggerTest {
  16 +
  17 +
  18 +
  19 + @Autowired
  20 + private MsgLogger msgLogger ;
  21 +
  22 + @Test
  23 + public void writeLog() throws Exception {
  24 + for (int i = 0; i < 10; i++) {
  25 + msgLogger.log("zhangsan:【asdsd】" + i);
  26 + }
  27 +
  28 + TimeUnit.SECONDS.sleep(2);
  29 + }
  30 +
  31 +
  32 +
  33 + @Test
  34 + public void query(){
  35 + String crossoverJie = msgLogger.query("crossoverJie");
  36 + System.out.println(crossoverJie);
  37 + }
  38 +
  39 +}
@@ -8,7 +8,12 @@ import org.junit.Test; @@ -8,7 +8,12 @@ import org.junit.Test;
8 import org.slf4j.Logger; 8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory; 9 import org.slf4j.LoggerFactory;
10 10
  11 +import java.io.IOException;
  12 +import java.nio.charset.Charset;
  13 +import java.nio.file.*;
  14 +import java.time.LocalDate;
11 import java.util.ArrayList; 15 import java.util.ArrayList;
  16 +import java.util.Arrays;
12 import java.util.List; 17 import java.util.List;
13 18
14 /** 19 /**
@@ -60,4 +65,119 @@ public class CommonTest { @@ -60,4 +65,119 @@ public class CommonTest {
60 } 65 }
61 LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); 66 LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
62 } 67 }
  68 +
  69 +
  70 + @Test
  71 + public void searchMsg(){
  72 + StringBuilder sb = new StringBuilder() ;
  73 + String allMsg = "于是在之前的基础上我完善了一些内容,先来看看这个项目的介绍吧:\n" +
  74 + "\n" +
  75 + "CIM(CROSS-IM) 一款面向开发者的 IM(即时通讯)系统;同时提供了一些组件帮助开发者构建一款属于自己可水平扩展的 IM 。\n" +
  76 + "\n" +
  77 + "借助 CIM 你可以实现以下需求:" ;
  78 +
  79 + String key = "IM" ;
  80 +
  81 + String[] split = allMsg.split("\n");
  82 + for (String msg : split) {
  83 + if (msg.trim().contains(key)){
  84 + sb.append(msg).append("\n") ;
  85 + }
  86 + }
  87 + int pos = 0;
  88 +
  89 + String result = sb.toString();
  90 +
  91 + int count = 1 ;
  92 + int multiple = 2 ;
  93 + while((pos = result.indexOf(key, pos)) >= 0) {
  94 +
  95 + LOGGER.info("{},{}",pos, pos + key.length());
  96 +
  97 + if (count == 1){
  98 + sb.insert(pos,"**");
  99 + }else {
  100 + Double pow = Math.pow(multiple, count);
  101 + sb.insert(pos +pow.intValue(),"**");
  102 + }
  103 +
  104 + pos += key.length();
  105 +
  106 + if (count == 1){
  107 + sb.insert(pos +2,"**");
  108 + }else {
  109 + Double pow = Math.pow(multiple, count);
  110 + sb.insert((pos +2) + pow.intValue(),"**");
  111 +
  112 + }
  113 +
  114 +
  115 + count ++ ;
  116 + }
  117 +
  118 + System.out.println(sb);
  119 + }
  120 + @Test
  121 + public void searchMsg2(){
  122 + StringBuilder sb = new StringBuilder() ;
  123 + String allMsg = "于是在之前的基础上我完善了一些内容,先来看看这个项目的介绍吧:\n" +
  124 + "\n" +
  125 + "CIM(CROSS-IM) 一款面向开发者的 IM(即时通讯)系统;同时提供了一些组件帮助开发者构建一款属于自己可水平扩展的 IM 。\n" +
  126 + "\n" +
  127 + "借助 CIM 你可以实现以下需求:" ;
  128 +
  129 + String key = "CIM" ;
  130 +
  131 + String[] split = allMsg.split("\n");
  132 + for (String msg : split) {
  133 + if (msg.trim().contains(key)){
  134 + sb.append(msg).append("\n") ;
  135 + }
  136 + }
  137 + int pos = 0;
  138 +
  139 + String result = sb.toString();
  140 +
  141 + int count = 1 ;
  142 + int multiple = 2 ;
  143 + while((pos = result.indexOf(key, pos)) >= 0) {
  144 +
  145 + LOGGER.info("{},{}",pos, pos + key.length());
  146 +
  147 + pos += key.length();
  148 +
  149 +
  150 + count ++ ;
  151 + }
  152 +
  153 + System.out.println(sb.toString());
  154 + System.out.println(sb.toString().replace(key,"\033[31;4m" + key+"\033[0m"));
  155 + }
  156 +
  157 + @Test
  158 + public void log(){
  159 + String msg = "hahahdsadsd" ;
  160 + LocalDate today = LocalDate.now();
  161 + int year = today.getYear();
  162 + int month = today.getMonthValue();
  163 + int day = today.getDayOfMonth();
  164 +
  165 + String dir = "/opt/logs/cim/zhangsan" + "/";
  166 + String fileName = dir + year + month + day + ".log";
  167 + LOGGER.info("fileName={}", fileName);
  168 +
  169 + Path file = Paths.get(fileName);
  170 + boolean exists = Files.exists(Paths.get(dir), LinkOption.NOFOLLOW_LINKS);
  171 + try {
  172 + if (!exists) {
  173 + Files.createDirectories(Paths.get(dir));
  174 + }
  175 +
  176 + List<String> lines = Arrays.asList(msg);
  177 +
  178 + Files.write(file, lines, Charset.forName("UTF-8"), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  179 + } catch (IOException e) {
  180 + LOGGER.info("IOException", e);
  181 + }
  182 + }
63 } 183 }
  1 +package com.crossoverjie.cim.common.data.construct;
  2 +
  3 +import com.crossoverjie.cim.common.util.StringUtil;
  4 +
  5 +import java.util.ArrayList;
  6 +import java.util.List;
  7 +
  8 +/**
  9 + * Function:字典树字符前缀模糊匹配
  10 + *
  11 + * @author crossoverJie
  12 + * Date: 2019/1/7 18:58
  13 + * @since JDK 1.8
  14 + */
  15 +public class TrieTree {
  16 +
  17 + /**
  18 + * 大小写都可保存
  19 + */
  20 + private static final int CHILDREN_LENGTH = 26 * 2;
  21 + private static final int MAX_CHAR_LENGTH = 16;
  22 +
  23 + private static final char UPPERCASE_STAR = 'A';
  24 +
  25 + /**
  26 + * 小写就要 -71
  27 + */
  28 + private static final char LOWERCASE_STAR = 'G';
  29 +
  30 + private Node root;
  31 +
  32 + public TrieTree() {
  33 + root = new Node();
  34 + }
  35 +
  36 + /**
  37 + * 写入
  38 + *
  39 + * @param data
  40 + */
  41 + public void insert(String data) {
  42 + this.insert(this.root, data);
  43 + }
  44 +
  45 + private void insert(Node root, String data) {
  46 + char[] chars = data.toCharArray();
  47 + for (int i = 0; i < chars.length; i++) {
  48 + char aChar = chars[i];
  49 + int index;
  50 + if (Character.isUpperCase(aChar)) {
  51 + index = aChar - UPPERCASE_STAR;
  52 + } else {
  53 + //小写就要 -71
  54 + index = aChar - LOWERCASE_STAR;
  55 + }
  56 +
  57 +
  58 + if (index >= 0 && index < CHILDREN_LENGTH) {
  59 + if (root.children[index] == null) {
  60 + Node node = new Node();
  61 + root.children[index] = node;
  62 + root.children[index].data = chars[i];
  63 +
  64 + }
  65 +
  66 + //最后一个字符设置标志
  67 + if (i + 1 == chars.length) {
  68 + root.children[index].isEnd = true;
  69 + }
  70 +
  71 + //指向下一节点
  72 + root = root.children[index];
  73 + }
  74 +
  75 + }
  76 + }
  77 +
  78 +
  79 + /**
  80 + * 递归深度遍历
  81 + *
  82 + * @param key
  83 + * @return
  84 + */
  85 + public List<String> prefixSearch(String key) {
  86 + List<String> value = new ArrayList<String>();
  87 + if (StringUtil.isEmpty(key)) {
  88 + return value;
  89 + }
  90 +
  91 + char k = key.charAt(0);
  92 + int index;
  93 + if (Character.isUpperCase(k)) {
  94 + index = k - UPPERCASE_STAR;
  95 + } else {
  96 + index = k - LOWERCASE_STAR;
  97 +
  98 + }
  99 + if (root.children != null && root.children[index] != null) {
  100 + return query(root.children[index], value,
  101 + key.substring(1), String.valueOf(k));
  102 + }
  103 + return value;
  104 + }
  105 +
  106 + private List<String> query(Node child, List<String> value, String key, String result) {
  107 +
  108 + if (child.isEnd && key == null) {
  109 + value.add(result);
  110 + }
  111 + if (StringUtil.isNotEmpty(key)) {
  112 + char ca = key.charAt(0);
  113 +
  114 + int index;
  115 + if (Character.isUpperCase(ca)) {
  116 + index = ca - UPPERCASE_STAR;
  117 + } else {
  118 + index = ca - LOWERCASE_STAR;
  119 +
  120 + }
  121 +
  122 + if (child.children[index] != null) {
  123 + query(child.children[index], value, key.substring(1).equals("") ? null : key.substring(1), result + ca);
  124 + }
  125 + } else {
  126 + for (int i = 0; i < CHILDREN_LENGTH; i++) {
  127 + if (child.children[i] == null) {
  128 + continue;
  129 + }
  130 +
  131 + int j;
  132 + if (Character.isUpperCase(child.children[i].data)) {
  133 + j = UPPERCASE_STAR + i;
  134 + } else {
  135 + j = LOWERCASE_STAR + i;
  136 + }
  137 +
  138 + char temp = (char) j;
  139 + query(child.children[i], value, null, result + temp);
  140 + }
  141 + }
  142 +
  143 + return value;
  144 + }
  145 +
  146 +
  147 + /**
  148 + * 查询所有
  149 + *
  150 + * @return
  151 + */
  152 + public List<String> all() {
  153 + char[] chars = new char[MAX_CHAR_LENGTH];
  154 + List<String> value = depth(this.root, new ArrayList<String>(), chars, 0);
  155 + return value;
  156 + }
  157 +
  158 +
  159 + public List<String> depth(Node node, List<String> list, char[] chars, int index) {
  160 + if (node.children == null || node.children.length == 0) {
  161 + return list;
  162 + }
  163 +
  164 + Node[] children = node.children;
  165 +
  166 + for (int i = 0; i < children.length; i++) {
  167 + Node child = children[i];
  168 +
  169 + if (child == null) {
  170 + continue;
  171 + }
  172 +
  173 + if (child.isEnd) {
  174 + chars[index] = child.data;
  175 +
  176 + char[] temp = new char[index + 1];
  177 + for (int j = 0; j < chars.length; j++) {
  178 + if (chars[j] == 0) {
  179 + continue;
  180 + }
  181 +
  182 + temp[j] = chars[j];
  183 + }
  184 + list.add(String.valueOf(temp));
  185 + return list;
  186 + } else {
  187 + chars[index] = child.data;
  188 +
  189 + index++;
  190 +
  191 + depth(child, list, chars, index);
  192 +
  193 + index = 0;
  194 + }
  195 + }
  196 +
  197 +
  198 + return list;
  199 + }
  200 +
  201 +
  202 + /**
  203 + * 字典树节点
  204 + */
  205 + private class Node {
  206 + /**
  207 + * 是否为最后一个字符
  208 + */
  209 + public boolean isEnd = false;
  210 +
  211 + /**
  212 + * 如果支持查询,则不需要存储数据
  213 + */
  214 + public char data;
  215 +
  216 + public Node[] children = new Node[CHILDREN_LENGTH];
  217 +
  218 + }
  219 +}
@@ -3,6 +3,10 @@ package com.crossoverjie.cim.common.enums; @@ -3,6 +3,10 @@ package com.crossoverjie.cim.common.enums;
3 import java.util.ArrayList; 3 import java.util.ArrayList;
4 import java.util.List; 4 import java.util.List;
5 5
  6 +/**
  7 + * @author crossoverJie
  8 + */
  9 +
6 public enum StatusEnum { 10 public enum StatusEnum {
7 11
8 /** 成功 */ 12 /** 成功 */
@@ -20,6 +24,9 @@ public enum StatusEnum { @@ -20,6 +24,9 @@ public enum StatusEnum {
20 /** 账号不在线 */ 24 /** 账号不在线 */
21 OFF_LINE("7000", "你选择的账号不在线,请重新选择!"), 25 OFF_LINE("7000", "你选择的账号不在线,请重新选择!"),
22 26
  27 + /** 登录信息不匹配 */
  28 + ACCOUNT_NOT_MATCH("9100", "登录信息不匹配!"),
  29 +
23 /** 请求限流 */ 30 /** 请求限流 */
24 REQUEST_LIMIT("6000", "请求限流"), 31 REQUEST_LIMIT("6000", "请求限流"),
25 ; 32 ;
@@ -15,8 +15,12 @@ import java.util.Map; @@ -15,8 +15,12 @@ import java.util.Map;
15 public enum SystemCommandEnumType { 15 public enum SystemCommandEnumType {
16 16
17 ALL(":all ","获取所有命令"), 17 ALL(":all ","获取所有命令"),
18 - ONLINE_USER(":olu","获取所有在线用户"),  
19 - QUIT(":q ","退出程序") 18 + ONLINE_USER(":olu ","获取所有在线用户"),
  19 + QUIT(":q! ","退出程序"),
  20 + QUERY(":q ","【:q 关键字】查询聊天记录"),
  21 + AI(":ai ","开启 AI 模式"),
  22 + QAI(":qai ","关闭 AI 模式"),
  23 + PREFIX(":pu ","模糊匹配用户")
20 24
21 ; 25 ;
22 26
@@ -40,6 +40,15 @@ public final class CIMResponseProto { @@ -40,6 +40,15 @@ public final class CIMResponseProto {
40 */ 40 */
41 com.google.protobuf.ByteString 41 com.google.protobuf.ByteString
42 getResMsgBytes(); 42 getResMsgBytes();
  43 +
  44 + /**
  45 + * <code>required int32 type = 3;</code>
  46 + */
  47 + boolean hasType();
  48 + /**
  49 + * <code>required int32 type = 3;</code>
  50 + */
  51 + int getType();
43 } 52 }
44 /** 53 /**
45 * Protobuf type {@code protocol.CIMResProtocol} 54 * Protobuf type {@code protocol.CIMResProtocol}
@@ -56,6 +65,7 @@ public final class CIMResponseProto { @@ -56,6 +65,7 @@ public final class CIMResponseProto {
56 private CIMResProtocol() { 65 private CIMResProtocol() {
57 responseId_ = 0L; 66 responseId_ = 0L;
58 resMsg_ = ""; 67 resMsg_ = "";
  68 + type_ = 0;
59 } 69 }
60 70
61 @Override 71 @Override
@@ -100,6 +110,11 @@ public final class CIMResponseProto { @@ -100,6 +110,11 @@ public final class CIMResponseProto {
100 responseId_ = input.readInt64(); 110 responseId_ = input.readInt64();
101 break; 111 break;
102 } 112 }
  113 + case 24: {
  114 + bitField0_ |= 0x00000004;
  115 + type_ = input.readInt32();
  116 + break;
  117 + }
103 } 118 }
104 } 119 }
105 } catch (com.google.protobuf.InvalidProtocolBufferException e) { 120 } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -182,6 +197,21 @@ public final class CIMResponseProto { @@ -182,6 +197,21 @@ public final class CIMResponseProto {
182 } 197 }
183 } 198 }
184 199
  200 + public static final int TYPE_FIELD_NUMBER = 3;
  201 + private int type_;
  202 + /**
  203 + * <code>required int32 type = 3;</code>
  204 + */
  205 + public boolean hasType() {
  206 + return ((bitField0_ & 0x00000004) == 0x00000004);
  207 + }
  208 + /**
  209 + * <code>required int32 type = 3;</code>
  210 + */
  211 + public int getType() {
  212 + return type_;
  213 + }
  214 +
185 private byte memoizedIsInitialized = -1; 215 private byte memoizedIsInitialized = -1;
186 public final boolean isInitialized() { 216 public final boolean isInitialized() {
187 byte isInitialized = memoizedIsInitialized; 217 byte isInitialized = memoizedIsInitialized;
@@ -196,6 +226,10 @@ public final class CIMResponseProto { @@ -196,6 +226,10 @@ public final class CIMResponseProto {
196 memoizedIsInitialized = 0; 226 memoizedIsInitialized = 0;
197 return false; 227 return false;
198 } 228 }
  229 + if (!hasType()) {
  230 + memoizedIsInitialized = 0;
  231 + return false;
  232 + }
199 memoizedIsInitialized = 1; 233 memoizedIsInitialized = 1;
200 return true; 234 return true;
201 } 235 }
@@ -208,6 +242,9 @@ public final class CIMResponseProto { @@ -208,6 +242,9 @@ public final class CIMResponseProto {
208 if (((bitField0_ & 0x00000001) == 0x00000001)) { 242 if (((bitField0_ & 0x00000001) == 0x00000001)) {
209 output.writeInt64(2, responseId_); 243 output.writeInt64(2, responseId_);
210 } 244 }
  245 + if (((bitField0_ & 0x00000004) == 0x00000004)) {
  246 + output.writeInt32(3, type_);
  247 + }
211 unknownFields.writeTo(output); 248 unknownFields.writeTo(output);
212 } 249 }
213 250
@@ -223,6 +260,10 @@ public final class CIMResponseProto { @@ -223,6 +260,10 @@ public final class CIMResponseProto {
223 size += com.google.protobuf.CodedOutputStream 260 size += com.google.protobuf.CodedOutputStream
224 .computeInt64Size(2, responseId_); 261 .computeInt64Size(2, responseId_);
225 } 262 }
  263 + if (((bitField0_ & 0x00000004) == 0x00000004)) {
  264 + size += com.google.protobuf.CodedOutputStream
  265 + .computeInt32Size(3, type_);
  266 + }
226 size += unknownFields.getSerializedSize(); 267 size += unknownFields.getSerializedSize();
227 memoizedSize = size; 268 memoizedSize = size;
228 return size; 269 return size;
@@ -249,6 +290,11 @@ public final class CIMResponseProto { @@ -249,6 +290,11 @@ public final class CIMResponseProto {
249 result = result && getResMsg() 290 result = result && getResMsg()
250 .equals(other.getResMsg()); 291 .equals(other.getResMsg());
251 } 292 }
  293 + result = result && (hasType() == other.hasType());
  294 + if (hasType()) {
  295 + result = result && (getType()
  296 + == other.getType());
  297 + }
252 result = result && unknownFields.equals(other.unknownFields); 298 result = result && unknownFields.equals(other.unknownFields);
253 return result; 299 return result;
254 } 300 }
@@ -269,6 +315,10 @@ public final class CIMResponseProto { @@ -269,6 +315,10 @@ public final class CIMResponseProto {
269 hash = (37 * hash) + RESMSG_FIELD_NUMBER; 315 hash = (37 * hash) + RESMSG_FIELD_NUMBER;
270 hash = (53 * hash) + getResMsg().hashCode(); 316 hash = (53 * hash) + getResMsg().hashCode();
271 } 317 }
  318 + if (hasType()) {
  319 + hash = (37 * hash) + TYPE_FIELD_NUMBER;
  320 + hash = (53 * hash) + getType();
  321 + }
272 hash = (29 * hash) + unknownFields.hashCode(); 322 hash = (29 * hash) + unknownFields.hashCode();
273 memoizedHashCode = hash; 323 memoizedHashCode = hash;
274 return hash; 324 return hash;
@@ -402,6 +452,8 @@ public final class CIMResponseProto { @@ -402,6 +452,8 @@ public final class CIMResponseProto {
402 bitField0_ = (bitField0_ & ~0x00000001); 452 bitField0_ = (bitField0_ & ~0x00000001);
403 resMsg_ = ""; 453 resMsg_ = "";
404 bitField0_ = (bitField0_ & ~0x00000002); 454 bitField0_ = (bitField0_ & ~0x00000002);
  455 + type_ = 0;
  456 + bitField0_ = (bitField0_ & ~0x00000004);
405 return this; 457 return this;
406 } 458 }
407 459
@@ -434,6 +486,10 @@ public final class CIMResponseProto { @@ -434,6 +486,10 @@ public final class CIMResponseProto {
434 to_bitField0_ |= 0x00000002; 486 to_bitField0_ |= 0x00000002;
435 } 487 }
436 result.resMsg_ = resMsg_; 488 result.resMsg_ = resMsg_;
  489 + if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
  490 + to_bitField0_ |= 0x00000004;
  491 + }
  492 + result.type_ = type_;
437 result.bitField0_ = to_bitField0_; 493 result.bitField0_ = to_bitField0_;
438 onBuilt(); 494 onBuilt();
439 return result; 495 return result;
@@ -484,6 +540,9 @@ public final class CIMResponseProto { @@ -484,6 +540,9 @@ public final class CIMResponseProto {
484 resMsg_ = other.resMsg_; 540 resMsg_ = other.resMsg_;
485 onChanged(); 541 onChanged();
486 } 542 }
  543 + if (other.hasType()) {
  544 + setType(other.getType());
  545 + }
487 this.mergeUnknownFields(other.unknownFields); 546 this.mergeUnknownFields(other.unknownFields);
488 onChanged(); 547 onChanged();
489 return this; 548 return this;
@@ -496,6 +555,9 @@ public final class CIMResponseProto { @@ -496,6 +555,9 @@ public final class CIMResponseProto {
496 if (!hasResMsg()) { 555 if (!hasResMsg()) {
497 return false; 556 return false;
498 } 557 }
  558 + if (!hasType()) {
  559 + return false;
  560 + }
499 return true; 561 return true;
500 } 562 }
501 563
@@ -625,6 +687,38 @@ public final class CIMResponseProto { @@ -625,6 +687,38 @@ public final class CIMResponseProto {
625 onChanged(); 687 onChanged();
626 return this; 688 return this;
627 } 689 }
  690 +
  691 + private int type_ ;
  692 + /**
  693 + * <code>required int32 type = 3;</code>
  694 + */
  695 + public boolean hasType() {
  696 + return ((bitField0_ & 0x00000004) == 0x00000004);
  697 + }
  698 + /**
  699 + * <code>required int32 type = 3;</code>
  700 + */
  701 + public int getType() {
  702 + return type_;
  703 + }
  704 + /**
  705 + * <code>required int32 type = 3;</code>
  706 + */
  707 + public Builder setType(int value) {
  708 + bitField0_ |= 0x00000004;
  709 + type_ = value;
  710 + onChanged();
  711 + return this;
  712 + }
  713 + /**
  714 + * <code>required int32 type = 3;</code>
  715 + */
  716 + public Builder clearType() {
  717 + bitField0_ = (bitField0_ & ~0x00000004);
  718 + type_ = 0;
  719 + onChanged();
  720 + return this;
  721 + }
628 public final Builder setUnknownFields( 722 public final Builder setUnknownFields(
629 final com.google.protobuf.UnknownFieldSet unknownFields) { 723 final com.google.protobuf.UnknownFieldSet unknownFields) {
630 return super.setUnknownFields(unknownFields); 724 return super.setUnknownFields(unknownFields);
@@ -688,10 +782,11 @@ public final class CIMResponseProto { @@ -688,10 +782,11 @@ public final class CIMResponseProto {
688 descriptor; 782 descriptor;
689 static { 783 static {
690 String[] descriptorData = { 784 String[] descriptorData = {
691 - "\n\027BaseResponseProto.proto\022\010protocol\"4\n\016C" + 785 + "\n\027BaseResponseProto.proto\022\010protocol\"B\n\016C" +
692 "IMResProtocol\022\022\n\nresponseId\030\002 \002(\003\022\016\n\006res" + 786 "IMResProtocol\022\022\n\nresponseId\030\002 \002(\003\022\016\n\006res" +
693 - "Msg\030\001 \002(\tB8\n$com.crossoverjie.cim.common" +  
694 - ".protocolB\020CIMResponseProto" 787 + "Msg\030\001 \002(\t\022\014\n\004type\030\003 \002(\005B8\n$com.crossover" +
  788 + "jie.cim.common.protocolB\020CIMResponseProt" +
  789 + "o"
695 }; 790 };
696 com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = 791 com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
697 new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { 792 new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
@@ -710,7 +805,7 @@ public final class CIMResponseProto { @@ -710,7 +805,7 @@ public final class CIMResponseProto {
710 internal_static_protocol_CIMResProtocol_fieldAccessorTable = new 805 internal_static_protocol_CIMResProtocol_fieldAccessorTable = new
711 com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( 806 com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
712 internal_static_protocol_CIMResProtocol_descriptor, 807 internal_static_protocol_CIMResProtocol_descriptor,
713 - new String[] { "ResponseId", "ResMsg", }); 808 + new String[] { "ResponseId", "ResMsg", "Type", });
714 } 809 }
715 810
716 // @@protoc_insertion_point(outer_class_scope) 811 // @@protoc_insertion_point(outer_class_scope)
  1 +package com.crossoverjie.cim.common.data.construct;
  2 +
  3 +import org.junit.Assert;
  4 +import org.junit.Test;
  5 +
  6 +import java.util.List;
  7 +
  8 +public class TrieTreeTest {
  9 + @Test
  10 + public void insert() throws Exception {
  11 + TrieTree trieTree = new TrieTree();
  12 + trieTree.insert("abc");
  13 + trieTree.insert("abcd");
  14 + }
  15 +
  16 +
  17 + @Test
  18 + public void all() throws Exception {
  19 + TrieTree trieTree = new TrieTree();
  20 + trieTree.insert("ABC");
  21 + trieTree.insert("abC");
  22 + List<String> all = trieTree.all();
  23 + String result = "";
  24 + for (String s : all) {
  25 + result += s + ",";
  26 + System.out.println(s);
  27 + }
  28 +
  29 + Assert.assertTrue("ABC,abC,".equals(result));
  30 +
  31 + }
  32 + @Test
  33 + public void all2() throws Exception {
  34 + TrieTree trieTree = new TrieTree();
  35 + trieTree.insert("abc");
  36 + trieTree.insert("abC");
  37 + List<String> all = trieTree.all();
  38 + String result = "";
  39 + for (String s : all) {
  40 + result += s + ",";
  41 + System.out.println(s);
  42 + }
  43 +
  44 + //Assert.assertTrue("ABC,abC,".equals(result));
  45 +
  46 + }
  47 +
  48 + @Test
  49 + public void prefixSearch() throws Exception {
  50 + TrieTree trieTree = new TrieTree();
  51 + trieTree.insert("abc");
  52 + trieTree.insert("abd");
  53 + trieTree.insert("ABe");
  54 +
  55 + List<String> ab = trieTree.prefixSearch("AB");
  56 + for (String s : ab) {
  57 + System.out.println(s);
  58 + }
  59 +
  60 + System.out.println("========");
  61 +
  62 + //char[] chars = new char[3] ;
  63 + //for (int i = 0; i < 3; i++) {
  64 + // int a = 97 + i ;
  65 + // chars[i] = (char) a ;
  66 + //}
  67 + //
  68 + //String s = String.valueOf(chars);
  69 + //System.out.println(s);
  70 + }
  71 +
  72 + @Test
  73 + public void prefixSearch2() throws Exception {
  74 + TrieTree trieTree = new TrieTree();
  75 + trieTree.insert("Cde");
  76 + trieTree.insert("CDa");
  77 + trieTree.insert("ABe");
  78 +
  79 + List<String> ab = trieTree.prefixSearch("AC");
  80 + for (String s : ab) {
  81 + System.out.println(s);
  82 + }
  83 + Assert.assertTrue(ab.size() == 0);
  84 + }
  85 +
  86 + @Test
  87 + public void prefixSearch3() throws Exception {
  88 + TrieTree trieTree = new TrieTree();
  89 + trieTree.insert("Cde");
  90 + trieTree.insert("CDa");
  91 + trieTree.insert("ABe");
  92 +
  93 + List<String> ab = trieTree.prefixSearch("CD");
  94 + for (String s : ab) {
  95 + System.out.println(s);
  96 + }
  97 + Assert.assertTrue(ab.size() == 1);
  98 + }
  99 +
  100 + @Test
  101 + public void prefixSearch4() throws Exception {
  102 + TrieTree trieTree = new TrieTree();
  103 + trieTree.insert("Cde");
  104 + trieTree.insert("CDa");
  105 + trieTree.insert("ABe");
  106 +
  107 + List<String> ab = trieTree.prefixSearch("Cd");
  108 + String result = "";
  109 + for (String s : ab) {
  110 + result += s + ",";
  111 + System.out.println(s);
  112 + }
  113 + Assert.assertTrue(result.equals("Cde,"));
  114 + }
  115 +
  116 + @Test
  117 + public void prefixSearch5() throws Exception {
  118 + TrieTree trieTree = new TrieTree();
  119 + trieTree.insert("Cde");
  120 + trieTree.insert("CDa");
  121 + trieTree.insert("ABe");
  122 + trieTree.insert("CDfff");
  123 + trieTree.insert("Cdfff");
  124 +
  125 + List<String> ab = trieTree.prefixSearch("Cd");
  126 + String result = "";
  127 + for (String s : ab) {
  128 + result += s + ",";
  129 + System.out.println(s);
  130 + }
  131 + Assert.assertTrue(result.equals("Cde,Cdfff,"));
  132 + }
  133 +
  134 + @Test
  135 + public void prefixSearch6() throws Exception {
  136 + TrieTree trieTree = new TrieTree();
  137 + trieTree.insert("Cde");
  138 + trieTree.insert("CDa");
  139 + trieTree.insert("ABe");
  140 + trieTree.insert("CDfff");
  141 + trieTree.insert("Cdfff");
  142 +
  143 + List<String> ab = trieTree.prefixSearch("CD");
  144 + String result = "";
  145 + for (String s : ab) {
  146 + result += s + ",";
  147 + System.out.println(s);
  148 + }
  149 + Assert.assertTrue(result.equals("CDa,CDfff,"));
  150 + }
  151 +
  152 + @Test
  153 + public void prefixSearch7() throws Exception {
  154 + TrieTree trieTree = new TrieTree();
  155 + trieTree.insert("Cde");
  156 + trieTree.insert("CDa");
  157 + trieTree.insert("ABe");
  158 + trieTree.insert("CDfff");
  159 + trieTree.insert("Cdfff");
  160 +
  161 + List<String> ab = trieTree.prefixSearch("");
  162 + String result = "";
  163 + for (String s : ab) {
  164 + result += s + ",";
  165 + System.out.println(s);
  166 + }
  167 + Assert.assertTrue(result.equals(""));
  168 + }
  169 +
  170 + @Test
  171 + public void prefixSearch8() throws Exception {
  172 + TrieTree trieTree = new TrieTree();
  173 +
  174 + List<String> ab = trieTree.prefixSearch("");
  175 + String result = "";
  176 + for (String s : ab) {
  177 + result += s + ",";
  178 + System.out.println(s);
  179 + }
  180 + Assert.assertTrue(result.equals(""));
  181 + }
  182 +
  183 +
  184 + @Test
  185 + public void prefixSearch9() throws Exception {
  186 + TrieTree trieTree = new TrieTree();
  187 + trieTree.insert("Cde");
  188 + trieTree.insert("CDa");
  189 + trieTree.insert("ABe");
  190 + trieTree.insert("CDfff");
  191 + trieTree.insert("Cdfff");
  192 +
  193 + List<String> ab = trieTree.prefixSearch("CDFD");
  194 + String result = "";
  195 + for (String s : ab) {
  196 + result += s + ",";
  197 + System.out.println(s);
  198 + }
  199 + Assert.assertTrue(result.equals(""));
  200 + }
  201 +
  202 +
  203 + @Test
  204 + public void prefixSearch10() throws Exception {
  205 + TrieTree trieTree = new TrieTree();
  206 + trieTree.insert("crossoverJie");
  207 + trieTree.insert("zhangsan");
  208 +
  209 + List<String> ab = trieTree.prefixSearch("c");
  210 + String result = "";
  211 + for (String s : ab) {
  212 + result += s + ",";
  213 + System.out.println(s);
  214 + }
  215 + Assert.assertTrue(result.equals("crossoverJie,"));
  216 + }
  217 +
  218 +}
@@ -74,11 +74,6 @@ @@ -74,11 +74,6 @@
74 </dependency> 74 </dependency>
75 75
76 <dependency> 76 <dependency>
77 - <groupId>de.codecentric</groupId>  
78 - <artifactId>spring-boot-admin-starter-client</artifactId>  
79 - </dependency>  
80 -  
81 - <dependency>  
82 <groupId>ch.qos.logback</groupId> 77 <groupId>ch.qos.logback</groupId>
83 <artifactId>logback-classic</artifactId> 78 <artifactId>logback-classic</artifactId>
84 </dependency> 79 </dependency>
@@ -144,8 +144,8 @@ public class RouteController { @@ -144,8 +144,8 @@ public class RouteController {
144 BaseResponse<CIMServerResVO> res = new BaseResponse(); 144 BaseResponse<CIMServerResVO> res = new BaseResponse();
145 145
146 //登录校验 146 //登录校验
147 - boolean login = accountService.login(loginReqVO);  
148 - if (login) { 147 + StatusEnum status = accountService.login(loginReqVO);
  148 + if (status == StatusEnum.SUCCESS) {
149 String server = serverCache.selectServer(); 149 String server = serverCache.selectServer();
150 String[] serverInfo = server.split(":"); 150 String[] serverInfo = server.split(":");
151 CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2])); 151 CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
@@ -154,12 +154,10 @@ public class RouteController { @@ -154,12 +154,10 @@ public class RouteController {
154 accountService.saveRouteInfo(loginReqVO,server); 154 accountService.saveRouteInfo(loginReqVO,server);
155 155
156 res.setDataBody(vo); 156 res.setDataBody(vo);
157 - res.setCode(StatusEnum.SUCCESS.getCode());  
158 - res.setMessage(StatusEnum.SUCCESS.getMessage());  
159 - } else {  
160 - res.setCode(StatusEnum.REPEAT_LOGIN.getCode());  
161 - res.setMessage(StatusEnum.REPEAT_LOGIN.getMessage()); 157 +
162 } 158 }
  159 + res.setCode(status.getCode());
  160 + res.setMessage(status.getMessage());
163 161
164 return res; 162 return res;
165 } 163 }
1 package com.crossoverjie.cim.route.service; 1 package com.crossoverjie.cim.route.service;
2 2
  3 +import com.crossoverjie.cim.common.enums.StatusEnum;
3 import com.crossoverjie.cim.route.vo.req.ChatReqVO; 4 import com.crossoverjie.cim.route.vo.req.ChatReqVO;
4 import com.crossoverjie.cim.route.vo.req.LoginReqVO; 5 import com.crossoverjie.cim.route.vo.req.LoginReqVO;
5 import com.crossoverjie.cim.route.vo.res.CIMServerResVO; 6 import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
@@ -30,7 +31,7 @@ public interface AccountService { @@ -30,7 +31,7 @@ public interface AccountService {
30 * @return true 成功 false 失败 31 * @return true 成功 false 失败
31 * @throws Exception 32 * @throws Exception
32 */ 33 */
33 - boolean login(LoginReqVO loginReqVO) throws Exception ; 34 + StatusEnum login(LoginReqVO loginReqVO) throws Exception ;
34 35
35 /** 36 /**
36 * 保存路由信息 37 * 保存路由信息
1 package com.crossoverjie.cim.route.service.impl; 1 package com.crossoverjie.cim.route.service.impl;
2 2
3 import com.alibaba.fastjson.JSONObject; 3 import com.alibaba.fastjson.JSONObject;
  4 +import com.crossoverjie.cim.common.enums.StatusEnum;
4 import com.crossoverjie.cim.common.exception.CIMException; 5 import com.crossoverjie.cim.common.exception.CIMException;
5 import com.crossoverjie.cim.common.pojo.CIMUserInfo; 6 import com.crossoverjie.cim.common.pojo.CIMUserInfo;
6 import com.crossoverjie.cim.route.service.AccountService; 7 import com.crossoverjie.cim.route.service.AccountService;
@@ -69,26 +70,26 @@ public class AccountServiceRedisImpl implements AccountService { @@ -69,26 +70,26 @@ public class AccountServiceRedisImpl implements AccountService {
69 } 70 }
70 71
71 @Override 72 @Override
72 - public boolean login(LoginReqVO loginReqVO) throws Exception { 73 + public StatusEnum login(LoginReqVO loginReqVO) throws Exception {
73 //再去Redis里查询 74 //再去Redis里查询
74 String key = ACCOUNT_PREFIX + loginReqVO.getUserId(); 75 String key = ACCOUNT_PREFIX + loginReqVO.getUserId();
75 String userName = redisTemplate.opsForValue().get(key); 76 String userName = redisTemplate.opsForValue().get(key);
76 if (null == userName) { 77 if (null == userName) {
77 - return false; 78 + return StatusEnum.ACCOUNT_NOT_MATCH;
78 } 79 }
79 80
80 if (!userName.equals(loginReqVO.getUserName())) { 81 if (!userName.equals(loginReqVO.getUserName())) {
81 - return false; 82 + return StatusEnum.ACCOUNT_NOT_MATCH;
82 } 83 }
83 84
84 //登录成功,保存登录状态 85 //登录成功,保存登录状态
85 boolean status = userInfoCacheService.saveAndCheckUserLoginStatus(loginReqVO.getUserId()); 86 boolean status = userInfoCacheService.saveAndCheckUserLoginStatus(loginReqVO.getUserId());
86 if (status == false){ 87 if (status == false){
87 //重复登录 88 //重复登录
88 - return false; 89 + return StatusEnum.REPEAT_LOGIN ;
89 } 90 }
90 91
91 - return true; 92 + return StatusEnum.SUCCESS;
92 } 93 }
93 94
94 @Override 95 @Override
@@ -102,6 +103,7 @@ public class AccountServiceRedisImpl implements AccountService { @@ -102,6 +103,7 @@ public class AccountServiceRedisImpl implements AccountService {
102 103
103 Map<Long, CIMServerResVO> routes = new HashMap<>(64); 104 Map<Long, CIMServerResVO> routes = new HashMap<>(64);
104 105
  106 +
105 RedisConnection connection = redisTemplate.getConnectionFactory().getConnection(); 107 RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
106 ScanOptions options = ScanOptions.scanOptions() 108 ScanOptions options = ScanOptions.scanOptions()
107 .match(ROUTE_PREFIX + "*") 109 .match(ROUTE_PREFIX + "*")
@@ -115,6 +117,11 @@ public class AccountServiceRedisImpl implements AccountService { @@ -115,6 +117,11 @@ public class AccountServiceRedisImpl implements AccountService {
115 parseServerInfo(routes, key); 117 parseServerInfo(routes, key);
116 118
117 } 119 }
  120 + try {
  121 + scan.close();
  122 + } catch (IOException e) {
  123 + LOGGER.error("IOException",e);
  124 + }
118 125
119 return routes; 126 return routes;
120 } 127 }
@@ -156,9 +163,13 @@ public class AccountServiceRedisImpl implements AccountService { @@ -156,9 +163,13 @@ public class AccountServiceRedisImpl implements AccountService {
156 .build(); 163 .build();
157 164
158 Response response = okHttpClient.newCall(request).execute(); 165 Response response = okHttpClient.newCall(request).execute();
  166 + try {
159 if (!response.isSuccessful()) { 167 if (!response.isSuccessful()) {
160 throw new IOException("Unexpected code " + response); 168 throw new IOException("Unexpected code " + response);
161 } 169 }
  170 + }finally {
  171 + response.body().close();
  172 + }
162 } 173 }
163 174
164 @Override 175 @Override
@@ -13,6 +13,7 @@ import org.springframework.boot.test.context.SpringBootTest; @@ -13,6 +13,7 @@ import org.springframework.boot.test.context.SpringBootTest;
13 import org.springframework.test.context.junit4.SpringRunner; 13 import org.springframework.test.context.junit4.SpringRunner;
14 14
15 import java.util.Map; 15 import java.util.Map;
  16 +import java.util.concurrent.TimeUnit;
16 17
17 @SpringBootTest(classes = RouteApplication.class) 18 @SpringBootTest(classes = RouteApplication.class)
18 @RunWith(SpringRunner.class) 19 @RunWith(SpringRunner.class)
@@ -25,8 +26,12 @@ public class AccountServiceRedisImplTest { @@ -25,8 +26,12 @@ public class AccountServiceRedisImplTest {
25 26
26 @Test 27 @Test
27 public void loadRouteRelated() throws Exception { 28 public void loadRouteRelated() throws Exception {
  29 + for (int i = 0; i < 100; i++) {
  30 +
28 Map<Long, CIMServerResVO> longCIMServerResVOMap = accountService.loadRouteRelated(); 31 Map<Long, CIMServerResVO> longCIMServerResVOMap = accountService.loadRouteRelated();
29 - LOGGER.info("longCIMServerResVOMap={}" , JSON.toJSONString(longCIMServerResVOMap)); 32 + LOGGER.info("longCIMServerResVOMap={},cun={}" , JSON.toJSONString(longCIMServerResVOMap),i);
  33 + }
  34 + TimeUnit.SECONDS.sleep(10);
30 } 35 }
31 36
32 } 37 }
@@ -64,10 +64,6 @@ @@ -64,10 +64,6 @@
64 <artifactId>spring-boot-starter-actuator</artifactId> 64 <artifactId>spring-boot-starter-actuator</artifactId>
65 </dependency> 65 </dependency>
66 66
67 - <dependency>  
68 - <groupId>de.codecentric</groupId>  
69 - <artifactId>spring-boot-admin-starter-client</artifactId>  
70 - </dependency>  
71 67
72 <dependency> 68 <dependency>
73 <groupId>io.netty</groupId> 69 <groupId>io.netty</groupId>
@@ -28,6 +28,9 @@ public class AppConfiguration { @@ -28,6 +28,9 @@ public class AppConfiguration {
28 @Value("${cim.clear.route.request.url}") 28 @Value("${cim.clear.route.request.url}")
29 private String clearRouteUrl ; 29 private String clearRouteUrl ;
30 30
  31 + @Value("${cim.heartbeat.time}")
  32 + private long heartBeatTime ;
  33 +
31 public String getClearRouteUrl() { 34 public String getClearRouteUrl() {
32 return clearRouteUrl; 35 return clearRouteUrl;
33 } 36 }
@@ -67,4 +70,12 @@ public class AppConfiguration { @@ -67,4 +70,12 @@ public class AppConfiguration {
67 public void setCimServerPort(int cimServerPort) { 70 public void setCimServerPort(int cimServerPort) {
68 this.cimServerPort = cimServerPort; 71 this.cimServerPort = cimServerPort;
69 } 72 }
  73 +
  74 + public long getHeartBeatTime() {
  75 + return heartBeatTime;
  76 + }
  77 +
  78 + public void setHeartBeatTime(long heartBeatTime) {
  79 + this.heartBeatTime = heartBeatTime;
  80 + }
70 } 81 }
1 package com.crossoverjie.cim.server.config; 1 package com.crossoverjie.cim.server.config;
2 2
  3 +import com.crossoverjie.cim.common.constant.Constants;
  4 +import com.crossoverjie.cim.common.protocol.CIMRequestProto;
3 import okhttp3.OkHttpClient; 5 import okhttp3.OkHttpClient;
4 import org.I0Itec.zkclient.ZkClient; 6 import org.I0Itec.zkclient.ZkClient;
5 import org.springframework.beans.factory.annotation.Autowired; 7 import org.springframework.beans.factory.annotation.Autowired;
@@ -39,4 +41,19 @@ public class BeanConfig { @@ -39,4 +41,19 @@ public class BeanConfig {
39 .retryOnConnectionFailure(true); 41 .retryOnConnectionFailure(true);
40 return builder.build(); 42 return builder.build();
41 } 43 }
  44 +
  45 +
  46 + /**
  47 + * 创建心跳单例
  48 + * @return
  49 + */
  50 + @Bean(value = "heartBeat")
  51 + public CIMRequestProto.CIMReqProtocol heartBeat() {
  52 + CIMRequestProto.CIMReqProtocol heart = CIMRequestProto.CIMReqProtocol.newBuilder()
  53 + .setRequestId(0L)
  54 + .setReqMsg("ping")
  55 + .setType(Constants.CommandType.PING)
  56 + .build();
  57 + return heart;
  58 + }
42 } 59 }
@@ -6,12 +6,13 @@ import com.crossoverjie.cim.common.exception.CIMException; @@ -6,12 +6,13 @@ import com.crossoverjie.cim.common.exception.CIMException;
6 import com.crossoverjie.cim.common.pojo.CIMUserInfo; 6 import com.crossoverjie.cim.common.pojo.CIMUserInfo;
7 import com.crossoverjie.cim.common.protocol.CIMRequestProto; 7 import com.crossoverjie.cim.common.protocol.CIMRequestProto;
8 import com.crossoverjie.cim.server.config.AppConfiguration; 8 import com.crossoverjie.cim.server.config.AppConfiguration;
  9 +import com.crossoverjie.cim.server.util.NettyAttrUtil;
9 import com.crossoverjie.cim.server.util.SessionSocketHolder; 10 import com.crossoverjie.cim.server.util.SessionSocketHolder;
10 import com.crossoverjie.cim.server.util.SpringBeanFactory; 11 import com.crossoverjie.cim.server.util.SpringBeanFactory;
11 -import io.netty.channel.ChannelHandler;  
12 -import io.netty.channel.ChannelHandlerContext;  
13 -import io.netty.channel.SimpleChannelInboundHandler; 12 +import io.netty.channel.*;
14 import io.netty.channel.socket.nio.NioSocketChannel; 13 import io.netty.channel.socket.nio.NioSocketChannel;
  14 +import io.netty.handler.timeout.IdleState;
  15 +import io.netty.handler.timeout.IdleStateEvent;
15 import okhttp3.*; 16 import okhttp3.*;
16 import org.slf4j.Logger; 17 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory; 18 import org.slf4j.LoggerFactory;
@@ -31,16 +32,60 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -31,16 +32,60 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
31 private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerHandle.class); 32 private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerHandle.class);
32 33
33 private final MediaType mediaType = MediaType.parse("application/json"); 34 private final MediaType mediaType = MediaType.parse("application/json");
  35 +
34 /** 36 /**
35 * 取消绑定 37 * 取消绑定
  38 + *
36 * @param ctx 39 * @param ctx
37 * @throws Exception 40 * @throws Exception
38 */ 41 */
39 @Override 42 @Override
40 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 43 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  44 + //可能出现业务判断离线后再次触发 channelInactive
  45 + CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
  46 + if (userInfo != null){
  47 + LOGGER.warn("[{}]触发 channelInactive 掉线!",userInfo.getUserName());
  48 + userOffLine(userInfo, (NioSocketChannel) ctx.channel());
  49 + ctx.channel().close();
  50 + }
  51 + }
  52 +
  53 + @Override
  54 + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  55 + if (evt instanceof IdleStateEvent) {
  56 + IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
  57 + if (idleStateEvent.state() == IdleState.READER_IDLE) {
  58 + AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);
  59 + long heartBeatTime = configuration.getHeartBeatTime() * 1000;
  60 +
  61 +
  62 + //向客户端发送消息
  63 + CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
  64 + CIMRequestProto.CIMReqProtocol.class);
  65 + ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE);
  66 +
  67 + Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
  68 + long now = System.currentTimeMillis();
  69 + if (lastReadTime != null && now - lastReadTime > heartBeatTime){
41 CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel()); 70 CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
42 - LOGGER.info("用户[{}]下线",userInfo.getUserName());  
43 - SessionSocketHolder.remove((NioSocketChannel) ctx.channel()); 71 + LOGGER.warn("客户端[{}]心跳超时[{}]ms,需要关闭连接!",userInfo.getUserName(),now - lastReadTime);
  72 + userOffLine(userInfo, (NioSocketChannel) ctx.channel());
  73 + ctx.channel().close();
  74 + }
  75 + }
  76 + }
  77 + super.userEventTriggered(ctx, evt);
  78 + }
  79 +
  80 + /**
  81 + * 用户下线
  82 + * @param userInfo
  83 + * @param channel
  84 + * @throws IOException
  85 + */
  86 + private void userOffLine(CIMUserInfo userInfo, NioSocketChannel channel) throws IOException {
  87 + LOGGER.info("用户[{}]下线", userInfo.getUserName());
  88 + SessionSocketHolder.remove(channel);
44 SessionSocketHolder.removeSession(userInfo.getUserId()); 89 SessionSocketHolder.removeSession(userInfo.getUserId());
45 90
46 //清除路由关系 91 //清除路由关系
@@ -49,6 +94,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -49,6 +94,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
49 94
50 /** 95 /**
51 * 清除路由关系 96 * 清除路由关系
  97 + *
52 * @param userInfo 98 * @param userInfo
53 * @throws IOException 99 * @throws IOException
54 */ 100 */
@@ -71,7 +117,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -71,7 +117,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
71 if (!response.isSuccessful()) { 117 if (!response.isSuccessful()) {
72 throw new IOException("Unexpected code " + response); 118 throw new IOException("Unexpected code " + response);
73 } 119 }
74 - }finally { 120 + } finally {
75 response.body().close(); 121 response.body().close();
76 } 122 }
77 } 123 }
@@ -81,20 +127,24 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -81,20 +127,24 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
81 protected void channelRead0(ChannelHandlerContext ctx, CIMRequestProto.CIMReqProtocol msg) throws Exception { 127 protected void channelRead0(ChannelHandlerContext ctx, CIMRequestProto.CIMReqProtocol msg) throws Exception {
82 LOGGER.info("收到msg={}", msg.toString()); 128 LOGGER.info("收到msg={}", msg.toString());
83 129
84 - if (msg.getType() == Constants.CommandType.LOGIN){ 130 + if (msg.getType() == Constants.CommandType.LOGIN) {
85 //保存客户端与 Channel 之间的关系 131 //保存客户端与 Channel 之间的关系
86 - SessionSocketHolder.put(msg.getRequestId(),(NioSocketChannel)ctx.channel()) ;  
87 - SessionSocketHolder.saveSession(msg.getRequestId(),msg.getReqMsg());  
88 - LOGGER.info("客户端[{}]上线成功",msg.getReqMsg()); 132 + SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel());
  133 + SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg());
  134 + LOGGER.info("客户端[{}]上线成功", msg.getReqMsg());
89 } 135 }
90 136
  137 + //心跳更新时间
  138 + if (msg.getType() == Constants.CommandType.PING){
  139 + NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
91 } 140 }
92 141
  142 + }
93 143
94 144
95 @Override 145 @Override
96 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 146 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
97 - if (CIMException.isResetByPeer(cause.getMessage())){ 147 + if (CIMException.isResetByPeer(cause.getMessage())) {
98 return; 148 return;
99 } 149 }
100 150
@@ -8,6 +8,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder; @@ -8,6 +8,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
8 import io.netty.handler.codec.protobuf.ProtobufEncoder; 8 import io.netty.handler.codec.protobuf.ProtobufEncoder;
9 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 9 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
10 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; 10 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
  11 +import io.netty.handler.timeout.IdleStateHandler;
11 12
12 /** 13 /**
13 * Function: 14 * Function:
@@ -24,6 +25,8 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> { @@ -24,6 +25,8 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> {
24 protected void initChannel(Channel ch) throws Exception { 25 protected void initChannel(Channel ch) throws Exception {
25 26
26 ch.pipeline() 27 ch.pipeline()
  28 + //30 秒没有向客户端发送消息就发生心跳
  29 + .addLast(new IdleStateHandler(30, 0, 0))
27 // google Protobuf 编解码 30 // google Protobuf 编解码
28 .addLast(new ProtobufVarint32FrameDecoder()) 31 .addLast(new ProtobufVarint32FrameDecoder())
29 .addLast(new ProtobufDecoder(CIMRequestProto.CIMReqProtocol.getDefaultInstance())) 32 .addLast(new ProtobufDecoder(CIMRequestProto.CIMReqProtocol.getDefaultInstance()))
  1 +package com.crossoverjie.cim.server.util;
  2 +
  3 +import io.netty.channel.Channel;
  4 +import io.netty.util.Attribute;
  5 +import io.netty.util.AttributeKey;
  6 +
  7 +/**
  8 + * Function:
  9 + *
  10 + * @author crossoverJie
  11 + * Date: 2019/1/9 00:57
  12 + * @since JDK 1.8
  13 + */
  14 +public class NettyAttrUtil {
  15 +
  16 + private static final AttributeKey<String> ATTR_KEY_READER_TIME = AttributeKey.valueOf("readerTime");
  17 +
  18 +
  19 + public static void updateReaderTime(Channel channel, Long time) {
  20 + channel.attr(ATTR_KEY_READER_TIME).set(time.toString());
  21 + }
  22 +
  23 + public static Long getReaderTime(Channel channel) {
  24 + String value = getAttribute(channel, ATTR_KEY_READER_TIME);
  25 +
  26 + if (value != null) {
  27 + return Long.valueOf(value);
  28 + }
  29 + return null;
  30 + }
  31 +
  32 +
  33 + private static String getAttribute(Channel channel, AttributeKey<String> key) {
  34 + Attribute<String> attr = channel.attr(key);
  35 + return attr.get();
  36 + }
  37 +}
@@ -32,3 +32,6 @@ app.zk.root=/route @@ -32,3 +32,6 @@ app.zk.root=/route
32 32
33 # 清除路由信息 33 # 清除路由信息
34 cim.clear.route.request.url=http://localhost:8083/offLine 34 cim.clear.route.request.url=http://localhost:8083/offLine
  35 +
  36 +# 检测多少秒没有收到客户端心跳后服务端关闭连接
  37 +cim.heartbeat.time = 40
  1 +package com.crossoverjie.cim.server.util;
  2 +
  3 +
  4 +import org.junit.Test;
  5 +
  6 +import java.util.concurrent.TimeUnit;
  7 +
  8 +public class NettyAttrUtilTest {
  9 +
  10 + @Test
  11 + public void test() throws InterruptedException {
  12 + long heartbeat = 2 * 1000 ;
  13 +
  14 + long now = System.currentTimeMillis();
  15 + TimeUnit.SECONDS.sleep(1);
  16 +
  17 + long end = System.currentTimeMillis();
  18 +
  19 + if ((end - now) > heartbeat){
  20 + System.out.println("超时");
  21 + }else {
  22 + System.out.println("没有超时");
  23 + }
  24 + }
  25 +
  26 +}
@@ -8,6 +8,6 @@ option java_outer_classname = "CIMResponseProto"; @@ -8,6 +8,6 @@ option java_outer_classname = "CIMResponseProto";
8 message CIMResProtocol { 8 message CIMResProtocol {
9 required int64 responseId = 2; 9 required int64 responseId = 2;
10 required string resMsg = 1; 10 required string resMsg = 1;
11 - 11 + required int32 type = 3;
12 12
13 } 13 }