作者 crossoverJie

:white_check_mark: Adding tests.

@@ -186,6 +186,10 @@ public class CIMClient { @@ -186,6 +186,10 @@ public class CIMClient {
186 if (channel != null && channel.isActive()) { 186 if (channel != null && channel.isActive()) {
187 return; 187 return;
188 } 188 }
  189 + //首先清除路由信息,下线
  190 + routeRequest.offLine();
  191 +
  192 + LOGGER.info("开始重连。。");
189 start(); 193 start();
190 } 194 }
191 195
@@ -69,7 +69,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -69,7 +69,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
69 69
70 @Override 70 @Override
71 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 71 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
72 - LOGGER.info("客户端断开了!"); 72 + LOGGER.info("客户端断开了,重新连接!");
  73 +
73 if (scheduledExecutorService == null){ 74 if (scheduledExecutorService == null){
74 scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ; 75 scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
75 } 76 }
1 package com.crossoverjie.cim.client.service.impl; 1 package com.crossoverjie.cim.client.service.impl;
2 2
3 import com.crossoverjie.cim.client.client.CIMClient; 3 import com.crossoverjie.cim.client.client.CIMClient;
4 -import com.crossoverjie.cim.client.config.AppConfiguration;  
5 import com.crossoverjie.cim.client.service.RouteRequest; 4 import com.crossoverjie.cim.client.service.RouteRequest;
6 import com.crossoverjie.cim.common.kit.HeartBeatHandler; 5 import com.crossoverjie.cim.common.kit.HeartBeatHandler;
7 import io.netty.channel.ChannelHandlerContext; 6 import io.netty.channel.ChannelHandlerContext;
8 -import okhttp3.MediaType;  
9 import org.slf4j.Logger; 7 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory; 8 import org.slf4j.LoggerFactory;
11 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.beans.factory.annotation.Autowired;
@@ -22,10 +20,6 @@ import org.springframework.stereotype.Service; @@ -22,10 +20,6 @@ import org.springframework.stereotype.Service;
22 public class ClientHeartBeatHandlerImpl implements HeartBeatHandler { 20 public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
23 21
24 private final static Logger LOGGER = LoggerFactory.getLogger(ClientHeartBeatHandlerImpl.class); 22 private final static Logger LOGGER = LoggerFactory.getLogger(ClientHeartBeatHandlerImpl.class);
25 - private final MediaType mediaType = MediaType.parse("application/json");  
26 -  
27 - @Autowired  
28 - private AppConfiguration appConfiguration;  
29 23
30 @Autowired 24 @Autowired
31 private CIMClient cimClient; 25 private CIMClient cimClient;
@@ -36,9 +30,6 @@ public class ClientHeartBeatHandlerImpl implements HeartBeatHandler { @@ -36,9 +30,6 @@ public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
36 @Override 30 @Override
37 public void process(ChannelHandlerContext ctx) throws Exception { 31 public void process(ChannelHandlerContext ctx) throws Exception {
38 32
39 - //首先清除路由信息,下线  
40 - routeRequest.offLine();  
41 -  
42 //重连 33 //重连
43 cimClient.reconnect(); 34 cimClient.reconnect();
44 35
@@ -174,6 +174,9 @@ public class AccountServiceRedisImpl implements AccountService { @@ -174,6 +174,9 @@ public class AccountServiceRedisImpl implements AccountService {
174 174
175 @Override 175 @Override
176 public void offLine(Long userId) throws Exception { 176 public void offLine(Long userId) throws Exception {
  177 +
  178 + // TODO: 2019-01-21 改为一个原子命令,以防数据一致性
  179 +
177 //删除路由 180 //删除路由
178 redisTemplate.delete(ROUTE_PREFIX + userId) ; 181 redisTemplate.delete(ROUTE_PREFIX + userId) ;
179 182
@@ -51,7 +51,7 @@ public class BeanConfig { @@ -51,7 +51,7 @@ public class BeanConfig {
51 public CIMRequestProto.CIMReqProtocol heartBeat() { 51 public CIMRequestProto.CIMReqProtocol heartBeat() {
52 CIMRequestProto.CIMReqProtocol heart = CIMRequestProto.CIMReqProtocol.newBuilder() 52 CIMRequestProto.CIMReqProtocol heart = CIMRequestProto.CIMReqProtocol.newBuilder()
53 .setRequestId(0L) 53 .setRequestId(0L)
54 - .setReqMsg("ping") 54 + .setReqMsg("pong")
55 .setType(Constants.CommandType.PING) 55 .setType(Constants.CommandType.PING)
56 .build(); 56 .build();
57 return heart; 57 return heart;
@@ -61,18 +61,6 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -61,18 +61,6 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
61 IdleStateEvent idleStateEvent = (IdleStateEvent) evt; 61 IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
62 if (idleStateEvent.state() == IdleState.READER_IDLE) { 62 if (idleStateEvent.state() == IdleState.READER_IDLE) {
63 63
64 - LOGGER.info("服务端没有收到消息,向客户端发送心跳!");  
65 -  
66 - //向客户端发送消息  
67 - CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",  
68 - CIMRequestProto.CIMReqProtocol.class);  
69 - ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {  
70 - if (!future.isSuccess()) {  
71 - LOGGER.error("IO error,close Channel");  
72 - future.channel().close();  
73 - }  
74 - }) ;  
75 -  
76 HeartBeatHandler heartBeatHandler = SpringBeanFactory.getBean(ServerHeartBeatHandlerImpl.class) ; 64 HeartBeatHandler heartBeatHandler = SpringBeanFactory.getBean(ServerHeartBeatHandlerImpl.class) ;
77 heartBeatHandler.process(ctx) ; 65 heartBeatHandler.process(ctx) ;
78 } 66 }
@@ -140,6 +128,15 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -140,6 +128,15 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
140 //心跳更新时间 128 //心跳更新时间
141 if (msg.getType() == Constants.CommandType.PING){ 129 if (msg.getType() == Constants.CommandType.PING){
142 NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis()); 130 NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
  131 + //向客户端响应 pong 消息
  132 + CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
  133 + CIMRequestProto.CIMReqProtocol.class);
  134 + ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
  135 + if (!future.isSuccess()) {
  136 + LOGGER.error("IO error,close Channel");
  137 + future.channel().close();
  138 + }
  139 + }) ;
143 } 140 }
144 141
145 } 142 }