作者 crossoverJie

:sparkles: Introducing new features.心跳完善

@@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> { @@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
24 @Override 24 @Override
25 protected void initChannel(Channel ch) throws Exception { 25 protected void initChannel(Channel ch) throws Exception {
26 ch.pipeline() 26 ch.pipeline()
27 - //45 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中  
28 - .addLast(new IdleStateHandler(0, 45, 0)) 27 + //30 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
  28 + .addLast(new IdleStateHandler(0, 30, 0))
29 29
30 //心跳解码 30 //心跳解码
31 //.addLast(new HeartbeatEncode()) 31 //.addLast(new HeartbeatEncode())
@@ -28,6 +28,9 @@ public class AppConfiguration { @@ -28,6 +28,9 @@ public class AppConfiguration {
28 @Value("${cim.clear.route.request.url}") 28 @Value("${cim.clear.route.request.url}")
29 private String clearRouteUrl ; 29 private String clearRouteUrl ;
30 30
  31 + @Value("${cim.heartbeat.time}")
  32 + private long heartBeatTime ;
  33 +
31 public String getClearRouteUrl() { 34 public String getClearRouteUrl() {
32 return clearRouteUrl; 35 return clearRouteUrl;
33 } 36 }
@@ -67,4 +70,12 @@ public class AppConfiguration { @@ -67,4 +70,12 @@ public class AppConfiguration {
67 public void setCimServerPort(int cimServerPort) { 70 public void setCimServerPort(int cimServerPort) {
68 this.cimServerPort = cimServerPort; 71 this.cimServerPort = cimServerPort;
69 } 72 }
  73 +
  74 + public long getHeartBeatTime() {
  75 + return heartBeatTime;
  76 + }
  77 +
  78 + public void setHeartBeatTime(long heartBeatTime) {
  79 + this.heartBeatTime = heartBeatTime;
  80 + }
70 } 81 }
@@ -6,6 +6,7 @@ import com.crossoverjie.cim.common.exception.CIMException; @@ -6,6 +6,7 @@ import com.crossoverjie.cim.common.exception.CIMException;
6 import com.crossoverjie.cim.common.pojo.CIMUserInfo; 6 import com.crossoverjie.cim.common.pojo.CIMUserInfo;
7 import com.crossoverjie.cim.common.protocol.CIMRequestProto; 7 import com.crossoverjie.cim.common.protocol.CIMRequestProto;
8 import com.crossoverjie.cim.server.config.AppConfiguration; 8 import com.crossoverjie.cim.server.config.AppConfiguration;
  9 +import com.crossoverjie.cim.server.util.NettyAttrUtil;
9 import com.crossoverjie.cim.server.util.SessionSocketHolder; 10 import com.crossoverjie.cim.server.util.SessionSocketHolder;
10 import com.crossoverjie.cim.server.util.SpringBeanFactory; 11 import com.crossoverjie.cim.server.util.SpringBeanFactory;
11 import io.netty.channel.*; 12 import io.netty.channel.*;
@@ -50,26 +51,23 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -50,26 +51,23 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
50 if (evt instanceof IdleStateEvent) { 51 if (evt instanceof IdleStateEvent) {
51 IdleStateEvent idleStateEvent = (IdleStateEvent) evt; 52 IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
52 if (idleStateEvent.state() == IdleState.READER_IDLE) { 53 if (idleStateEvent.state() == IdleState.READER_IDLE) {
  54 + AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);
  55 + long heartBeatTime = configuration.getHeartBeatTime() * 1000;
  56 +
53 57
54 //向客户端发送消息 58 //向客户端发送消息
55 CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", 59 CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
56 CIMRequestProto.CIMReqProtocol.class); 60 CIMRequestProto.CIMReqProtocol.class);
57 - ctx.writeAndFlush(heartBeat).addListeners(new ChannelFutureListener() {  
58 - @Override  
59 - public void operationComplete(ChannelFuture future) throws Exception {  
60 - //下线客户端  
61 - CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) future.channel());  
62 - if (!future.isSuccess()) { 61 + ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE);
63 62
64 - LOGGER.info("向客户端[{}]下发心跳失败",userInfo.getUserName());  
65 -  
66 - userOffLine(userInfo, (NioSocketChannel) future.channel());  
67 - future.channel().close();  
68 - }else {  
69 - LOGGER.info("向客户端[{}]下发心跳成功",userInfo.getUserName());  
70 - } 63 + Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
  64 + long now = System.currentTimeMillis();
  65 + if (lastReadTime != null && now - lastReadTime > heartBeatTime){
  66 + CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
  67 + LOGGER.warn("客户端[{}]心跳超时,需要关闭连接",userInfo.getUserName());
  68 + userOffLine(userInfo, (NioSocketChannel) ctx.channel());
  69 + ctx.channel().close();
71 } 70 }
72 - });  
73 } 71 }
74 } 72 }
75 super.userEventTriggered(ctx, evt); 73 super.userEventTriggered(ctx, evt);
@@ -132,6 +130,11 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -132,6 +130,11 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
132 LOGGER.info("客户端[{}]上线成功", msg.getReqMsg()); 130 LOGGER.info("客户端[{}]上线成功", msg.getReqMsg());
133 } 131 }
134 132
  133 + //心跳更新时间
  134 + if (msg.getType() == Constants.CommandType.PING){
  135 + NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
  136 + }
  137 +
135 } 138 }
136 139
137 140
@@ -25,7 +25,7 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> { @@ -25,7 +25,7 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> {
25 protected void initChannel(Channel ch) throws Exception { 25 protected void initChannel(Channel ch) throws Exception {
26 26
27 ch.pipeline() 27 ch.pipeline()
28 - //45 秒没有向客户端发送消息就发生心跳 28 + //30 秒没有向客户端发送消息就发生心跳
29 .addLast(new IdleStateHandler(30, 0, 0)) 29 .addLast(new IdleStateHandler(30, 0, 0))
30 // google Protobuf 编解码 30 // google Protobuf 编解码
31 .addLast(new ProtobufVarint32FrameDecoder()) 31 .addLast(new ProtobufVarint32FrameDecoder())
  1 +package com.crossoverjie.cim.server.util;
  2 +
  3 +import io.netty.channel.Channel;
  4 +import io.netty.util.Attribute;
  5 +import io.netty.util.AttributeKey;
  6 +
  7 +/**
  8 + * Function:
  9 + *
  10 + * @author crossoverJie
  11 + * Date: 2019/1/9 00:57
  12 + * @since JDK 1.8
  13 + */
  14 +public class NettyAttrUtil {
  15 +
  16 + private static final AttributeKey<String> ATTR_KEY_READER_TIME = AttributeKey.valueOf("readerTime");
  17 +
  18 +
  19 + public static void updateReaderTime(Channel channel,Long time){
  20 + channel.attr(ATTR_KEY_READER_TIME).set(time.toString());
  21 + }
  22 +
  23 + public static Long getReaderTime(Channel channel){
  24 + String value = getAttribute(channel, ATTR_KEY_READER_TIME);
  25 + return Long.valueOf(value) ;
  26 + }
  27 +
  28 +
  29 + private static String getAttribute(Channel channel, AttributeKey<String> key) {
  30 + Attribute<String> attr = channel.attr(key);
  31 + return attr.get();
  32 + }
  33 +}
@@ -32,3 +32,6 @@ app.zk.root=/route @@ -32,3 +32,6 @@ app.zk.root=/route
32 32
33 # 清除路由信息 33 # 清除路由信息
34 cim.clear.route.request.url=http://localhost:8083/offLine 34 cim.clear.route.request.url=http://localhost:8083/offLine
  35 +
  36 +# 检测多少秒没有收到客户端心跳后服务端关闭连接
  37 +cim.heartbeat.time = 40
  1 +package com.crossoverjie.cim.server.util;
  2 +
  3 +
  4 +import org.junit.Test;
  5 +
  6 +import java.util.concurrent.TimeUnit;
  7 +
  8 +public class NettyAttrUtilTest {
  9 +
  10 + @Test
  11 + public void test() throws InterruptedException {
  12 + long heartbeat = 2 * 1000 ;
  13 +
  14 + long now = System.currentTimeMillis();
  15 + TimeUnit.SECONDS.sleep(1);
  16 +
  17 + long end = System.currentTimeMillis();
  18 +
  19 + if ((end - now) > heartbeat){
  20 + System.out.println("超时");
  21 + }else {
  22 + System.out.println("没有超时");
  23 + }
  24 + }
  25 +
  26 +}