作者 crossoverJie

:sparkles: Introducing new features.cim support delay msg

@@ -3,6 +3,7 @@ package com.crossoverjie.cim.client.config; @@ -3,6 +3,7 @@ package com.crossoverjie.cim.client.config;
3 import com.crossoverjie.cim.client.handle.MsgHandleCaller; 3 import com.crossoverjie.cim.client.handle.MsgHandleCaller;
4 import com.crossoverjie.cim.client.service.impl.MsgCallBackListener; 4 import com.crossoverjie.cim.client.service.impl.MsgCallBackListener;
5 import com.crossoverjie.cim.common.constant.Constants; 5 import com.crossoverjie.cim.common.constant.Constants;
  6 +import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
6 import com.crossoverjie.cim.common.protocol.CIMRequestProto; 7 import com.crossoverjie.cim.common.protocol.CIMRequestProto;
7 import com.google.common.util.concurrent.ThreadFactoryBuilder; 8 import com.google.common.util.concurrent.ThreadFactoryBuilder;
8 import okhttp3.OkHttpClient; 9 import okhttp3.OkHttpClient;
@@ -104,4 +105,11 @@ public class BeanConfig { @@ -104,4 +105,11 @@ public class BeanConfig {
104 return caller ; 105 return caller ;
105 } 106 }
106 107
  108 +
  109 + @Bean
  110 + public RingBufferWheel bufferWheel(){
  111 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
  112 + return new RingBufferWheel(executorService) ;
  113 + }
  114 +
107 } 115 }
1 package com.crossoverjie.cim.client.handle; 1 package com.crossoverjie.cim.client.handle;
2 2
  3 +import com.crossoverjie.cim.client.service.EchoService;
3 import com.crossoverjie.cim.client.service.ShutDownMsg; 4 import com.crossoverjie.cim.client.service.ShutDownMsg;
  5 +import com.crossoverjie.cim.client.service.impl.EchoServiceImpl;
4 import com.crossoverjie.cim.client.thread.ReConnectJob; 6 import com.crossoverjie.cim.client.thread.ReConnectJob;
5 import com.crossoverjie.cim.client.util.SpringBeanFactory; 7 import com.crossoverjie.cim.client.util.SpringBeanFactory;
6 import com.crossoverjie.cim.common.constant.Constants; 8 import com.crossoverjie.cim.common.constant.Constants;
@@ -41,6 +43,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -41,6 +43,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
41 43
42 private ShutDownMsg shutDownMsg ; 44 private ShutDownMsg shutDownMsg ;
43 45
  46 + private EchoService echoService ;
  47 +
44 48
45 @Override 49 @Override
46 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 50 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@@ -68,7 +72,6 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -68,7 +72,6 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
68 72
69 @Override 73 @Override
70 public void channelActive(ChannelHandlerContext ctx) throws Exception { 74 public void channelActive(ChannelHandlerContext ctx) throws Exception {
71 -  
72 //客户端和服务端建立连接时调用 75 //客户端和服务端建立连接时调用
73 LOGGER.info("cim server connect success!"); 76 LOGGER.info("cim server connect success!");
74 } 77 }
@@ -95,6 +98,10 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -95,6 +98,10 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
95 98
96 @Override 99 @Override
97 protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception { 100 protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception {
  101 + if (echoService == null){
  102 + echoService = SpringBeanFactory.getBean(EchoServiceImpl.class) ;
  103 + }
  104 +
98 105
99 //心跳更新时间 106 //心跳更新时间
100 if (msg.getType() == Constants.CommandType.PING){ 107 if (msg.getType() == Constants.CommandType.PING){
@@ -108,7 +115,7 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -108,7 +115,7 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
108 115
109 //将消息中的 emoji 表情格式化为 Unicode 编码以便在终端可以显示 116 //将消息中的 emoji 表情格式化为 Unicode 编码以便在终端可以显示
110 String response = EmojiParser.parseToUnicode(msg.getResMsg()); 117 String response = EmojiParser.parseToUnicode(msg.getResMsg());
111 - System.out.println(response); 118 + echoService.echo(response);
112 } 119 }
113 120
114 121
@@ -5,6 +5,9 @@ import com.crossoverjie.cim.client.service.EchoService; @@ -5,6 +5,9 @@ import com.crossoverjie.cim.client.service.EchoService;
5 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.stereotype.Service; 6 import org.springframework.stereotype.Service;
7 7
  8 +import java.time.LocalDate;
  9 +import java.time.LocalTime;
  10 +
8 /** 11 /**
9 * Function: 12 * Function:
10 * 13 *
@@ -21,8 +24,10 @@ public class EchoServiceImpl implements EchoService { @@ -21,8 +24,10 @@ public class EchoServiceImpl implements EchoService {
21 private AppConfiguration appConfiguration; 24 private AppConfiguration appConfiguration;
22 25
23 @Override 26 @Override
24 - public void echo(String msg,Object... replace) {  
25 - msg = "\033[31;4m" + appConfiguration.getUserName() + PREFIX + "\033[0m" + " " + msg; 27 + public void echo(String msg, Object... replace) {
  28 + String date = LocalDate.now().toString() + " " + LocalTime.now().withNano(0).toString();
  29 +
  30 + msg = "[" + date + "] \033[31;4m" + appConfiguration.getUserName() + PREFIX + "\033[0m" + " " + msg;
26 31
27 String log = print(msg, replace); 32 String log = print(msg, replace);
28 33
@@ -32,6 +37,7 @@ public class EchoServiceImpl implements EchoService { @@ -32,6 +37,7 @@ public class EchoServiceImpl implements EchoService {
32 37
33 /** 38 /**
34 * print msg 39 * print msg
  40 + *
35 * @param msg 41 * @param msg
36 * @param place 42 * @param place
37 * @return 43 * @return
@@ -42,7 +48,7 @@ public class EchoServiceImpl implements EchoService { @@ -42,7 +48,7 @@ public class EchoServiceImpl implements EchoService {
42 for (int i = 0; i < place.length; i++) { 48 for (int i = 0; i < place.length; i++) {
43 int index = msg.indexOf("{}", k); 49 int index = msg.indexOf("{}", k);
44 50
45 - if (index == -1){ 51 + if (index == -1) {
46 return msg; 52 return msg;
47 } 53 }
48 54
@@ -63,9 +69,9 @@ public class EchoServiceImpl implements EchoService { @@ -63,9 +69,9 @@ public class EchoServiceImpl implements EchoService {
63 69
64 k = index + 2; 70 k = index + 2;
65 } 71 }
66 - if (sb.toString().equals("")){  
67 - return msg ;  
68 - }else { 72 + if (sb.toString().equals("")) {
  73 + return msg;
  74 + } else {
69 return sb.toString(); 75 return sb.toString();
70 } 76 }
71 } 77 }
  1 +package com.crossoverjie.cim.client.service.impl.command;
  2 +
  3 +import com.crossoverjie.cim.client.service.EchoService;
  4 +import com.crossoverjie.cim.client.service.InnerCommand;
  5 +import com.crossoverjie.cim.client.service.MsgHandle;
  6 +import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
  7 +import com.vdurmont.emoji.EmojiParser;
  8 +import org.springframework.beans.factory.annotation.Autowired;
  9 +import org.springframework.stereotype.Service;
  10 +
  11 +/**
  12 + * Function:
  13 + *
  14 + * @author crossoverJie
  15 + * Date: 2019-09-25 00:37
  16 + * @since JDK 1.8
  17 + */
  18 +@Service
  19 +public class DelayMsgCommand implements InnerCommand {
  20 +
  21 + @Autowired
  22 + private EchoService echoService ;
  23 +
  24 + @Autowired
  25 + private MsgHandle msgHandle ;
  26 +
  27 + @Autowired
  28 + private RingBufferWheel ringBufferWheel ;
  29 +
  30 + @Override
  31 + public void process(String msg) {
  32 + if (msg.split(" ").length <=2){
  33 + echoService.echo("incorrect commond, :delay [msg] [delayTime]") ;
  34 + return ;
  35 + }
  36 +
  37 + String message = msg.split(" ")[1] ;
  38 + Integer delayTime = Integer.valueOf(msg.split(" ")[2]);
  39 +
  40 + RingBufferWheel.Task task = new DelayMsgJob(message) ;
  41 + task.setKey(delayTime);
  42 + ringBufferWheel.addTask(task);
  43 + ringBufferWheel.start();
  44 + echoService.echo(EmojiParser.parseToUnicode(msg));
  45 + }
  46 +
  47 +
  48 +
  49 + private class DelayMsgJob extends RingBufferWheel.Task{
  50 +
  51 + private String msg ;
  52 +
  53 + public DelayMsgJob(String msg) {
  54 + this.msg = msg;
  55 + }
  56 +
  57 + @Override
  58 + public void run() {
  59 + msgHandle.sendMsg(msg);
  60 + }
  61 + }
  62 +}
@@ -6,6 +6,7 @@ import com.crossoverjie.cim.client.service.InnerCommand; @@ -6,6 +6,7 @@ import com.crossoverjie.cim.client.service.InnerCommand;
6 import com.crossoverjie.cim.client.service.MsgLogger; 6 import com.crossoverjie.cim.client.service.MsgLogger;
7 import com.crossoverjie.cim.client.service.RouteRequest; 7 import com.crossoverjie.cim.client.service.RouteRequest;
8 import com.crossoverjie.cim.client.service.ShutDownMsg; 8 import com.crossoverjie.cim.client.service.ShutDownMsg;
  9 +import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
9 import org.slf4j.Logger; 10 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory; 11 import org.slf4j.LoggerFactory;
11 import org.springframework.beans.factory.annotation.Autowired; 12 import org.springframework.beans.factory.annotation.Autowired;
@@ -45,6 +46,9 @@ public class ShutDownCommand implements InnerCommand { @@ -45,6 +46,9 @@ public class ShutDownCommand implements InnerCommand {
45 @Autowired 46 @Autowired
46 private ShutDownMsg shutDownMsg ; 47 private ShutDownMsg shutDownMsg ;
47 48
  49 + @Autowired
  50 + private RingBufferWheel ringBufferWheel ;
  51 +
48 @Override 52 @Override
49 public void process(String msg) { 53 public void process(String msg) {
50 echoService.echo("cim client closing..."); 54 echoService.echo("cim client closing...");
@@ -52,6 +56,7 @@ public class ShutDownCommand implements InnerCommand { @@ -52,6 +56,7 @@ public class ShutDownCommand implements InnerCommand {
52 routeRequest.offLine(); 56 routeRequest.offLine();
53 msgLogger.stop(); 57 msgLogger.stop();
54 executor.shutdown(); 58 executor.shutdown();
  59 + ringBufferWheel.stop(false);
55 try { 60 try {
56 while (!executor.awaitTermination(1, TimeUnit.SECONDS)) { 61 while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
57 echoService.echo("thread pool closing"); 62 echoService.echo("thread pool closing");
@@ -130,14 +130,16 @@ public final class RingBufferWheel { @@ -130,14 +130,16 @@ public final class RingBufferWheel {
130 executorService.shutdownNow(); 130 executorService.shutdownNow();
131 } else { 131 } else {
132 logger.info("delay task is stopping"); 132 logger.info("delay task is stopping");
133 - try {  
134 - lock.lock();  
135 - condition.await();  
136 - stop = true;  
137 - } catch (InterruptedException e) {  
138 - logger.error("InterruptedException", e);  
139 - } finally {  
140 - lock.unlock(); 133 + if (taskSize() > 0){
  134 + try {
  135 + lock.lock();
  136 + condition.await();
  137 + stop = true;
  138 + } catch (InterruptedException e) {
  139 + logger.error("InterruptedException", e);
  140 + } finally {
  141 + lock.unlock();
  142 + }
141 } 143 }
142 executorService.shutdown(); 144 executorService.shutdown();
143 } 145 }
@@ -20,7 +20,8 @@ public enum SystemCommandEnum { @@ -20,7 +20,8 @@ public enum SystemCommandEnum {
20 QAI(":qai ","关闭 AI 模式","CloseAIModelCommand"), 20 QAI(":qai ","关闭 AI 模式","CloseAIModelCommand"),
21 PREFIX(":pu ","模糊匹配用户","PrefixSearchCommand"), 21 PREFIX(":pu ","模糊匹配用户","PrefixSearchCommand"),
22 EMOJI(":emoji ","emoji 表情列表","EmojiCommand"), 22 EMOJI(":emoji ","emoji 表情列表","EmojiCommand"),
23 - INFO(":info ","获取客户端信息","EchoInfoCommand") 23 + INFO(":info ","获取客户端信息","EchoInfoCommand"),
  24 + DELAY_MSG(":delay ","delay message, :delay [msg] [delayTime]","DelayMsgCommand")
24 25
25 ; 26 ;
26 27
@@ -2,6 +2,8 @@ package com.crossoverjie.cim.common; @@ -2,6 +2,8 @@ package com.crossoverjie.cim.common;
2 2
3 import org.junit.Test; 3 import org.junit.Test;
4 4
  5 +import java.time.LocalDate;
  6 +import java.time.LocalTime;
5 import java.util.concurrent.TimeUnit; 7 import java.util.concurrent.TimeUnit;
6 8
7 /** 9 /**
@@ -13,6 +15,13 @@ import java.util.concurrent.TimeUnit; @@ -13,6 +15,13 @@ import java.util.concurrent.TimeUnit;
13 */ 15 */
14 public class CommonTest { 16 public class CommonTest {
15 17
  18 +
  19 + @Test
  20 + public void test2(){
  21 + System.out.println(LocalDate.now().toString());
  22 + System.out.println(LocalTime.now().withNano(0).toString());
  23 + }
  24 +
16 @Test 25 @Test
17 public void test() throws InterruptedException { 26 public void test() throws InterruptedException {
18 27