作者 crossoverJie

:art: Improving structure / format of the code.

@@ -73,12 +73,6 @@ @@ -73,12 +73,6 @@
73 </dependency> 73 </dependency>
74 74
75 <dependency> 75 <dependency>
76 - <groupId>io.netty</groupId>  
77 - <artifactId>netty-all</artifactId>  
78 - <version>${netty.version}</version>  
79 - </dependency>  
80 -  
81 - <dependency>  
82 <groupId>junit</groupId> 76 <groupId>junit</groupId>
83 <artifactId>junit</artifactId> 77 <artifactId>junit</artifactId>
84 </dependency> 78 </dependency>
@@ -4,6 +4,7 @@ import com.crossoverjie.cim.client.util.SpringBeanFactory; @@ -4,6 +4,7 @@ import com.crossoverjie.cim.client.util.SpringBeanFactory;
4 import com.crossoverjie.cim.common.constant.Constants; 4 import com.crossoverjie.cim.common.constant.Constants;
5 import com.crossoverjie.cim.common.protocol.CIMRequestProto; 5 import com.crossoverjie.cim.common.protocol.CIMRequestProto;
6 import com.crossoverjie.cim.common.protocol.CIMResponseProto; 6 import com.crossoverjie.cim.common.protocol.CIMResponseProto;
  7 +import com.crossoverjie.cim.common.util.NettyAttrUtil;
7 import io.netty.channel.ChannelFutureListener; 8 import io.netty.channel.ChannelFutureListener;
8 import io.netty.channel.ChannelHandler; 9 import io.netty.channel.ChannelHandler;
9 import io.netty.channel.ChannelHandlerContext; 10 import io.netty.channel.ChannelHandlerContext;
@@ -41,7 +42,12 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -41,7 +42,12 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
41 if (idleStateEvent.state() == IdleState.WRITER_IDLE){ 42 if (idleStateEvent.state() == IdleState.WRITER_IDLE){
42 CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", 43 CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
43 CIMRequestProto.CIMReqProtocol.class); 44 CIMRequestProto.CIMReqProtocol.class);
44 - ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE) ; 45 + ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
  46 + if (!future.isSuccess()) {
  47 + LOGGER.error("IO error,close Channel");
  48 + future.channel().close();
  49 + }
  50 + }) ;
45 } 51 }
46 52
47 53
@@ -58,10 +64,12 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -58,10 +64,12 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
58 } 64 }
59 65
60 @Override 66 @Override
61 - protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResponseProto.CIMResProtocol msg) throws Exception { 67 + protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception {
62 68
63 - //从服务端收到消息时被调用  
64 - //LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ; 69 + //心跳更新时间
  70 + if (msg.getType() == Constants.CommandType.PING){
  71 + NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
  72 + }
65 73
66 if (msg.getType() != Constants.CommandType.PING) { 74 if (msg.getType() != Constants.CommandType.PING) {
67 //回调消息 75 //回调消息
  1 +package com.crossoverjie.cim.client.service.impl;
  2 +
  3 +import com.crossoverjie.cim.common.kit.HeartBeatHandler;
  4 +import io.netty.channel.ChannelHandlerContext;
  5 +
  6 +/**
  7 + * Function:
  8 + *
  9 + * @author crossoverJie
  10 + * Date: 2019-01-20 17:16
  11 + * @since JDK 1.8
  12 + */
  13 +public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
  14 +
  15 + @Override
  16 + public void process(ChannelHandlerContext ctx) throws Exception {
  17 +
  18 + }
  19 +}
@@ -51,4 +51,7 @@ cim.callback.thread.pool.size = 2 @@ -51,4 +51,7 @@ cim.callback.thread.pool.size = 2
51 # 关闭健康检查权限 51 # 关闭健康检查权限
52 management.security.enabled=false 52 management.security.enabled=false
53 # SpringAdmin 地址 53 # SpringAdmin 地址
54 -spring.boot.admin.url=http://127.0.0.1:8888  
  54 +spring.boot.admin.url=http://127.0.0.1:8888
  55 +
  56 +# 检测多少秒没有收到服务端端心跳后重新登录获取连接
  57 +cim.heartbeat.time = 40
@@ -38,5 +38,11 @@ @@ -38,5 +38,11 @@
38 <groupId>com.github.sgroschupf</groupId> 38 <groupId>com.github.sgroschupf</groupId>
39 <artifactId>zkclient</artifactId> 39 <artifactId>zkclient</artifactId>
40 </dependency> 40 </dependency>
  41 +
  42 + <dependency>
  43 + <groupId>io.netty</groupId>
  44 + <artifactId>netty-all</artifactId>
  45 + <version>${netty.version}</version>
  46 + </dependency>
41 </dependencies> 47 </dependencies>
42 </project> 48 </project>
  1 +package com.crossoverjie.cim.common.kit;
  2 +
  3 +import io.netty.channel.ChannelHandlerContext;
  4 +
  5 +/**
  6 + * Function:
  7 + *
  8 + * @author crossoverJie
  9 + * Date: 2019-01-20 17:15
  10 + * @since JDK 1.8
  11 + */
  12 +public interface HeartBeatHandler {
  13 +
  14 + /**
  15 + * 处理
  16 + */
  17 + void process(ChannelHandlerContext ctx) throws Exception ;
  18 +}
1 -package com.crossoverjie.cim.server.util; 1 +package com.crossoverjie.cim.common.util;
2 2
3 import io.netty.channel.Channel; 3 import io.netty.channel.Channel;
4 import io.netty.util.Attribute; 4 import io.netty.util.Attribute;
@@ -64,13 +64,6 @@ @@ -64,13 +64,6 @@
64 <artifactId>spring-boot-starter-actuator</artifactId> 64 <artifactId>spring-boot-starter-actuator</artifactId>
65 </dependency> 65 </dependency>
66 66
67 -  
68 - <dependency>  
69 - <groupId>io.netty</groupId>  
70 - <artifactId>netty-all</artifactId>  
71 - <version>${netty.version}</version>  
72 - </dependency>  
73 -  
74 <dependency> 67 <dependency>
75 <groupId>junit</groupId> 68 <groupId>junit</groupId>
76 <artifactId>junit</artifactId> 69 <artifactId>junit</artifactId>
@@ -3,13 +3,18 @@ package com.crossoverjie.cim.server.handle; @@ -3,13 +3,18 @@ package com.crossoverjie.cim.server.handle;
3 import com.alibaba.fastjson.JSONObject; 3 import com.alibaba.fastjson.JSONObject;
4 import com.crossoverjie.cim.common.constant.Constants; 4 import com.crossoverjie.cim.common.constant.Constants;
5 import com.crossoverjie.cim.common.exception.CIMException; 5 import com.crossoverjie.cim.common.exception.CIMException;
  6 +import com.crossoverjie.cim.common.kit.HeartBeatHandler;
6 import com.crossoverjie.cim.common.pojo.CIMUserInfo; 7 import com.crossoverjie.cim.common.pojo.CIMUserInfo;
7 import com.crossoverjie.cim.common.protocol.CIMRequestProto; 8 import com.crossoverjie.cim.common.protocol.CIMRequestProto;
  9 +import com.crossoverjie.cim.common.util.NettyAttrUtil;
8 import com.crossoverjie.cim.server.config.AppConfiguration; 10 import com.crossoverjie.cim.server.config.AppConfiguration;
9 -import com.crossoverjie.cim.server.util.NettyAttrUtil; 11 +import com.crossoverjie.cim.server.kit.ServerHeartBeatHandlerImpl;
10 import com.crossoverjie.cim.server.util.SessionSocketHolder; 12 import com.crossoverjie.cim.server.util.SessionSocketHolder;
11 import com.crossoverjie.cim.server.util.SpringBeanFactory; 13 import com.crossoverjie.cim.server.util.SpringBeanFactory;
12 -import io.netty.channel.*; 14 +import io.netty.channel.ChannelFutureListener;
  15 +import io.netty.channel.ChannelHandler;
  16 +import io.netty.channel.ChannelHandlerContext;
  17 +import io.netty.channel.SimpleChannelInboundHandler;
13 import io.netty.channel.socket.nio.NioSocketChannel; 18 import io.netty.channel.socket.nio.NioSocketChannel;
14 import io.netty.handler.timeout.IdleState; 19 import io.netty.handler.timeout.IdleState;
15 import io.netty.handler.timeout.IdleStateEvent; 20 import io.netty.handler.timeout.IdleStateEvent;
@@ -55,23 +60,19 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -55,23 +60,19 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
55 if (evt instanceof IdleStateEvent) { 60 if (evt instanceof IdleStateEvent) {
56 IdleStateEvent idleStateEvent = (IdleStateEvent) evt; 61 IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
57 if (idleStateEvent.state() == IdleState.READER_IDLE) { 62 if (idleStateEvent.state() == IdleState.READER_IDLE) {
58 - AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);  
59 - long heartBeatTime = configuration.getHeartBeatTime() * 1000;  
60 -  
61 63
62 //向客户端发送消息 64 //向客户端发送消息
63 CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", 65 CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
64 CIMRequestProto.CIMReqProtocol.class); 66 CIMRequestProto.CIMReqProtocol.class);
65 - ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE);  
66 -  
67 - Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());  
68 - long now = System.currentTimeMillis();  
69 - if (lastReadTime != null && now - lastReadTime > heartBeatTime){  
70 - CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());  
71 - LOGGER.warn("客户端[{}]心跳超时[{}]ms,需要关闭连接!",userInfo.getUserName(),now - lastReadTime);  
72 - userOffLine(userInfo, (NioSocketChannel) ctx.channel());  
73 - ctx.channel().close();  
74 - } 67 + ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
  68 + if (!future.isSuccess()) {
  69 + LOGGER.error("IO error,close Channel");
  70 + future.channel().close();
  71 + }
  72 + }) ;
  73 +
  74 + HeartBeatHandler heartBeatHandler = SpringBeanFactory.getBean(ServerHeartBeatHandlerImpl.class) ;
  75 + heartBeatHandler.process(ctx) ;
75 } 76 }
76 } 77 }
77 super.userEventTriggered(ctx, evt); 78 super.userEventTriggered(ctx, evt);
  1 +package com.crossoverjie.cim.server.kit;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.crossoverjie.cim.common.pojo.CIMUserInfo;
  5 +import com.crossoverjie.cim.server.config.AppConfiguration;
  6 +import com.crossoverjie.cim.server.util.SessionSocketHolder;
  7 +import com.crossoverjie.cim.server.util.SpringBeanFactory;
  8 +import io.netty.channel.socket.nio.NioSocketChannel;
  9 +import okhttp3.*;
  10 +import org.slf4j.Logger;
  11 +import org.slf4j.LoggerFactory;
  12 +import org.springframework.stereotype.Component;
  13 +
  14 +import java.io.IOException;
  15 +
  16 +/**
  17 + * Function:
  18 + *
  19 + * @author crossoverJie
  20 + * Date: 2019-01-20 17:20
  21 + * @since JDK 1.8
  22 + */
  23 +@Component
  24 +public class RouteHandler {
  25 + private final static Logger LOGGER = LoggerFactory.getLogger(RouteHandler.class);
  26 +
  27 +
  28 + private final MediaType mediaType = MediaType.parse("application/json");
  29 +
  30 + /**
  31 + * 用户下线
  32 + * @param userInfo
  33 + * @param channel
  34 + * @throws IOException
  35 + */
  36 + public void userOffLine(CIMUserInfo userInfo, NioSocketChannel channel) throws IOException {
  37 + LOGGER.info("用户[{}]下线", userInfo.getUserName());
  38 + SessionSocketHolder.remove(channel);
  39 + SessionSocketHolder.removeSession(userInfo.getUserId());
  40 +
  41 + //清除路由关系
  42 + clearRouteInfo(userInfo);
  43 + }
  44 +
  45 +
  46 + /**
  47 + * 清除路由关系
  48 + *
  49 + * @param userInfo
  50 + * @throws IOException
  51 + */
  52 + private void clearRouteInfo(CIMUserInfo userInfo) throws IOException {
  53 + OkHttpClient okHttpClient = SpringBeanFactory.getBean(OkHttpClient.class);
  54 + AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);
  55 + JSONObject jsonObject = new JSONObject();
  56 + jsonObject.put("userId", userInfo.getUserId());
  57 + jsonObject.put("msg", "offLine");
  58 + RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
  59 +
  60 + Request request = new Request.Builder()
  61 + .url(configuration.getClearRouteUrl())
  62 + .post(requestBody)
  63 + .build();
  64 +
  65 + Response response = null;
  66 + try {
  67 + response = okHttpClient.newCall(request).execute();
  68 + if (!response.isSuccessful()) {
  69 + throw new IOException("Unexpected code " + response);
  70 + }
  71 + } finally {
  72 + response.body().close();
  73 + }
  74 + }
  75 +
  76 +}
  1 +package com.crossoverjie.cim.server.kit;
  2 +
  3 +import com.crossoverjie.cim.common.kit.HeartBeatHandler;
  4 +import com.crossoverjie.cim.common.pojo.CIMUserInfo;
  5 +import com.crossoverjie.cim.common.util.NettyAttrUtil;
  6 +import com.crossoverjie.cim.server.config.AppConfiguration;
  7 +import com.crossoverjie.cim.server.util.SessionSocketHolder;
  8 +import io.netty.channel.ChannelHandlerContext;
  9 +import io.netty.channel.socket.nio.NioSocketChannel;
  10 +import org.slf4j.Logger;
  11 +import org.slf4j.LoggerFactory;
  12 +import org.springframework.beans.factory.annotation.Autowired;
  13 +import org.springframework.stereotype.Service;
  14 +
  15 +/**
  16 + * Function:
  17 + *
  18 + * @author crossoverJie
  19 + * Date: 2019-01-20 17:16
  20 + * @since JDK 1.8
  21 + */
  22 +@Service
  23 +public class ServerHeartBeatHandlerImpl implements HeartBeatHandler {
  24 +
  25 + private final static Logger LOGGER = LoggerFactory.getLogger(ServerHeartBeatHandlerImpl.class);
  26 +
  27 + @Autowired
  28 + private RouteHandler routeHandler ;
  29 +
  30 + @Autowired
  31 + private AppConfiguration appConfiguration ;
  32 +
  33 + @Override
  34 + public void process(ChannelHandlerContext ctx) throws Exception {
  35 +
  36 + long heartBeatTime = appConfiguration.getHeartBeatTime() * 1000;
  37 +
  38 + Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
  39 + long now = System.currentTimeMillis();
  40 + if (lastReadTime != null && now - lastReadTime > heartBeatTime){
  41 + CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
  42 + LOGGER.warn("客户端[{}]心跳超时[{}]ms,需要关闭连接!",userInfo.getUserName(),now - lastReadTime);
  43 + routeHandler.userOffLine(userInfo, (NioSocketChannel) ctx.channel());
  44 + ctx.channel().close();
  45 + }
  46 + }
  47 +}