作者 crossoverJie

:bug: Fixing a bug.#70

@@ -4,6 +4,7 @@ import com.crossoverjie.cim.client.config.AppConfiguration; @@ -4,6 +4,7 @@ import com.crossoverjie.cim.client.config.AppConfiguration;
4 import com.crossoverjie.cim.client.init.CIMClientHandleInitializer; 4 import com.crossoverjie.cim.client.init.CIMClientHandleInitializer;
5 import com.crossoverjie.cim.client.service.EchoService; 5 import com.crossoverjie.cim.client.service.EchoService;
6 import com.crossoverjie.cim.client.service.MsgHandle; 6 import com.crossoverjie.cim.client.service.MsgHandle;
  7 +import com.crossoverjie.cim.client.service.ReConnectManager;
7 import com.crossoverjie.cim.client.service.RouteRequest; 8 import com.crossoverjie.cim.client.service.RouteRequest;
8 import com.crossoverjie.cim.client.service.impl.ClientInfo; 9 import com.crossoverjie.cim.client.service.impl.ClientInfo;
9 import com.crossoverjie.cim.client.thread.ContextHolder; 10 import com.crossoverjie.cim.client.thread.ContextHolder;
@@ -67,6 +68,9 @@ public class CIMClient { @@ -67,6 +68,9 @@ public class CIMClient {
67 @Autowired 68 @Autowired
68 private ClientInfo clientInfo; 69 private ClientInfo clientInfo;
69 70
  71 + @Autowired
  72 + private ReConnectManager reConnectManager ;
  73 +
70 /** 74 /**
71 * 重试次数 75 * 重试次数
72 */ 76 */
@@ -110,7 +114,7 @@ public class CIMClient { @@ -110,7 +114,7 @@ public class CIMClient {
110 LOGGER.error("连接失败次数达到上限[{}]次", errorCount); 114 LOGGER.error("连接失败次数达到上限[{}]次", errorCount);
111 msgHandle.shutdown(); 115 msgHandle.shutdown();
112 } 116 }
113 - LOGGER.error("连接失败", e); 117 + LOGGER.error("Connect fail!", e);
114 } 118 }
115 if (future.isSuccess()) { 119 if (future.isSuccess()) {
116 echoService.echo("Start cim client success!"); 120 echoService.echo("Start cim client success!");
@@ -140,7 +144,7 @@ public class CIMClient { @@ -140,7 +144,7 @@ public class CIMClient {
140 errorCount++; 144 errorCount++;
141 145
142 if (errorCount >= configuration.getErrorCount()) { 146 if (errorCount >= configuration.getErrorCount()) {
143 - LOGGER.error("重连次数达到上限[{}]次", errorCount); 147 + echoService.echo("The maximum number of reconnections has been reached[{}]times, close cim client!", errorCount);
144 msgHandle.shutdown(); 148 msgHandle.shutdown();
145 } 149 }
146 LOGGER.error("login fail", e); 150 LOGGER.error("login fail", e);
@@ -198,6 +202,13 @@ public class CIMClient { @@ -198,6 +202,13 @@ public class CIMClient {
198 } 202 }
199 203
200 204
  205 + /**
  206 + * 1. clear route information.
  207 + * 2. reconnect.
  208 + * 3. shutdown reconnect job.
  209 + * 4. reset reconnect state.
  210 + * @throws Exception
  211 + */
201 public void reconnect() throws Exception { 212 public void reconnect() throws Exception {
202 if (channel != null && channel.isActive()) { 213 if (channel != null && channel.isActive()) {
203 return; 214 return;
@@ -207,7 +218,8 @@ public class CIMClient { @@ -207,7 +218,8 @@ public class CIMClient {
207 218
208 echoService.echo("cim server shutdown, reconnecting...."); 219 echoService.echo("cim server shutdown, reconnecting....");
209 start(); 220 start();
210 - echoService.echo("Great! reconnect success!!!"); 221 + echoService.echo("Great! reConnect success!!!");
  222 + reConnectManager.reConnectSuccess();
211 ContextHolder.clear(); 223 ContextHolder.clear();
212 } 224 }
213 225
@@ -87,7 +87,7 @@ public class BeanConfig { @@ -87,7 +87,7 @@ public class BeanConfig {
87 @Bean("scheduledTask") 87 @Bean("scheduledTask")
88 public ScheduledExecutorService buildSchedule(){ 88 public ScheduledExecutorService buildSchedule(){
89 ThreadFactory sche = new ThreadFactoryBuilder() 89 ThreadFactory sche = new ThreadFactoryBuilder()
90 - .setNameFormat("reconnect-job-%d") 90 + .setNameFormat("reConnect-job-%d")
91 .setDaemon(true) 91 .setDaemon(true)
92 .build(); 92 .build();
93 ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,sche) ; 93 ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,sche) ;
1 package com.crossoverjie.cim.client.handle; 1 package com.crossoverjie.cim.client.handle;
2 2
3 import com.crossoverjie.cim.client.service.EchoService; 3 import com.crossoverjie.cim.client.service.EchoService;
  4 +import com.crossoverjie.cim.client.service.ReConnectManager;
4 import com.crossoverjie.cim.client.service.ShutDownMsg; 5 import com.crossoverjie.cim.client.service.ShutDownMsg;
5 import com.crossoverjie.cim.client.service.impl.EchoServiceImpl; 6 import com.crossoverjie.cim.client.service.impl.EchoServiceImpl;
6 -import com.crossoverjie.cim.client.thread.ReConnectJob;  
7 import com.crossoverjie.cim.client.util.SpringBeanFactory; 7 import com.crossoverjie.cim.client.util.SpringBeanFactory;
8 import com.crossoverjie.cim.common.constant.Constants; 8 import com.crossoverjie.cim.common.constant.Constants;
9 import com.crossoverjie.cim.common.protocol.CIMRequestProto; 9 import com.crossoverjie.cim.common.protocol.CIMRequestProto;
@@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory; @@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory;
21 21
22 import java.util.concurrent.ScheduledExecutorService; 22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ThreadPoolExecutor; 23 import java.util.concurrent.ThreadPoolExecutor;
24 -import java.util.concurrent.TimeUnit;  
25 24
26 /** 25 /**
27 * Function: 26 * Function:
@@ -41,6 +40,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -41,6 +40,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
41 40
42 private ScheduledExecutorService scheduledExecutorService ; 41 private ScheduledExecutorService scheduledExecutorService ;
43 42
  43 + private ReConnectManager reConnectManager ;
  44 +
44 private ShutDownMsg shutDownMsg ; 45 private ShutDownMsg shutDownMsg ;
45 46
46 private EchoService echoService ; 47 private EchoService echoService ;
@@ -88,10 +89,11 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -88,10 +89,11 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
88 89
89 if (scheduledExecutorService == null){ 90 if (scheduledExecutorService == null){
90 scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ; 91 scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
  92 + reConnectManager = SpringBeanFactory.getBean(ReConnectManager.class) ;
91 } 93 }
92 LOGGER.info("客户端断开了,重新连接!"); 94 LOGGER.info("客户端断开了,重新连接!");
93 // TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。 95 // TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。
94 - scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ; 96 + reConnectManager.reConnect(ctx);
95 } 97 }
96 98
97 @Override 99 @Override
  1 +package com.crossoverjie.cim.client.service;
  2 +
  3 +import com.crossoverjie.cim.client.thread.ReConnectJob;
  4 +import com.google.common.util.concurrent.ThreadFactoryBuilder;
  5 +import io.netty.channel.ChannelHandlerContext;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +import java.util.concurrent.ScheduledExecutorService;
  9 +import java.util.concurrent.ScheduledThreadPoolExecutor;
  10 +import java.util.concurrent.ThreadFactory;
  11 +import java.util.concurrent.TimeUnit;
  12 +
  13 +/**
  14 + * Function:
  15 + *
  16 + * @author crossoverJie
  17 + * Date: 2020-04-15 00:26
  18 + * @since JDK 1.8
  19 + */
  20 +@Component
  21 +public final class ReConnectManager {
  22 +
  23 + private ScheduledExecutorService scheduledExecutorService;
  24 +
  25 + /**
  26 + * Trigger reconnect job
  27 + * @param ctx
  28 + */
  29 + public void reConnect(ChannelHandlerContext ctx) {
  30 + buildExecutor() ;
  31 + scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;
  32 + }
  33 +
  34 + /**
  35 + * Close reconnect job if reconnect success.
  36 + */
  37 + public void reConnectSuccess(){
  38 + scheduledExecutorService.shutdown();
  39 + }
  40 +
  41 +
  42 + /***
  43 + * build an thread executor
  44 + * @return
  45 + */
  46 + private ScheduledExecutorService buildExecutor() {
  47 + if (scheduledExecutorService == null || scheduledExecutorService.isShutdown()) {
  48 + ThreadFactory sche = new ThreadFactoryBuilder()
  49 + .setNameFormat("reConnect-job-%d")
  50 + .setDaemon(true)
  51 + .build();
  52 + scheduledExecutorService = new ScheduledThreadPoolExecutor(1, sche);
  53 + return scheduledExecutorService;
  54 + } else {
  55 + return scheduledExecutorService;
  56 + }
  57 + }
  58 +}
@@ -143,7 +143,7 @@ public class RouteRequestImpl implements RouteRequest { @@ -143,7 +143,7 @@ public class RouteRequestImpl implements RouteRequest {
143 if (!cimServerResVO.getCode().equals(StatusEnum.SUCCESS.getCode())){ 143 if (!cimServerResVO.getCode().equals(StatusEnum.SUCCESS.getCode())){
144 echoService.echo(cimServerResVO.getMessage()); 144 echoService.echo(cimServerResVO.getMessage());
145 145
146 - // when client in reconnect state, could not exit. 146 + // when client in reConnect state, could not exit.
147 if (ContextHolder.getReconnect()){ 147 if (ContextHolder.getReconnect()){
148 echoService.echo("###{}###", StatusEnum.RECONNECT_FAIL.getMessage()); 148 echoService.echo("###{}###", StatusEnum.RECONNECT_FAIL.getMessage());
149 throw new CIMException(StatusEnum.RECONNECT_FAIL); 149 throw new CIMException(StatusEnum.RECONNECT_FAIL);