作者 crossoverJie

:sparkles: Introducing new features.服务端心跳检测

@@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> { @@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
24 @Override 24 @Override
25 protected void initChannel(Channel ch) throws Exception { 25 protected void initChannel(Channel ch) throws Exception {
26 ch.pipeline() 26 ch.pipeline()
27 - //60 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中  
28 - .addLast(new IdleStateHandler(0, 60, 0)) 27 + //45 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
  28 + .addLast(new IdleStateHandler(0, 45, 0))
29 29
30 //心跳解码 30 //心跳解码
31 //.addLast(new HeartbeatEncode()) 31 //.addLast(new HeartbeatEncode())
1 package com.crossoverjie.cim.server.config; 1 package com.crossoverjie.cim.server.config;
2 2
  3 +import com.crossoverjie.cim.common.constant.Constants;
  4 +import com.crossoverjie.cim.common.protocol.CIMRequestProto;
3 import okhttp3.OkHttpClient; 5 import okhttp3.OkHttpClient;
4 import org.I0Itec.zkclient.ZkClient; 6 import org.I0Itec.zkclient.ZkClient;
5 import org.springframework.beans.factory.annotation.Autowired; 7 import org.springframework.beans.factory.annotation.Autowired;
@@ -39,4 +41,19 @@ public class BeanConfig { @@ -39,4 +41,19 @@ public class BeanConfig {
39 .retryOnConnectionFailure(true); 41 .retryOnConnectionFailure(true);
40 return builder.build(); 42 return builder.build();
41 } 43 }
  44 +
  45 +
  46 + /**
  47 + * 创建心跳单例
  48 + * @return
  49 + */
  50 + @Bean(value = "heartBeat")
  51 + public CIMRequestProto.CIMReqProtocol heartBeat() {
  52 + CIMRequestProto.CIMReqProtocol heart = CIMRequestProto.CIMReqProtocol.newBuilder()
  53 + .setRequestId(0L)
  54 + .setReqMsg("ping")
  55 + .setType(Constants.CommandType.PING)
  56 + .build();
  57 + return heart;
  58 + }
42 } 59 }
@@ -8,10 +8,10 @@ import com.crossoverjie.cim.common.protocol.CIMRequestProto; @@ -8,10 +8,10 @@ import com.crossoverjie.cim.common.protocol.CIMRequestProto;
8 import com.crossoverjie.cim.server.config.AppConfiguration; 8 import com.crossoverjie.cim.server.config.AppConfiguration;
9 import com.crossoverjie.cim.server.util.SessionSocketHolder; 9 import com.crossoverjie.cim.server.util.SessionSocketHolder;
10 import com.crossoverjie.cim.server.util.SpringBeanFactory; 10 import com.crossoverjie.cim.server.util.SpringBeanFactory;
11 -import io.netty.channel.ChannelHandler;  
12 -import io.netty.channel.ChannelHandlerContext;  
13 -import io.netty.channel.SimpleChannelInboundHandler; 11 +import io.netty.channel.*;
14 import io.netty.channel.socket.nio.NioSocketChannel; 12 import io.netty.channel.socket.nio.NioSocketChannel;
  13 +import io.netty.handler.timeout.IdleState;
  14 +import io.netty.handler.timeout.IdleStateEvent;
15 import okhttp3.*; 15 import okhttp3.*;
16 import org.slf4j.Logger; 16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory; 17 import org.slf4j.LoggerFactory;
@@ -31,16 +31,52 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -31,16 +31,52 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
31 private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerHandle.class); 31 private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerHandle.class);
32 32
33 private final MediaType mediaType = MediaType.parse("application/json"); 33 private final MediaType mediaType = MediaType.parse("application/json");
  34 +
34 /** 35 /**
35 * 取消绑定 36 * 取消绑定
  37 + *
36 * @param ctx 38 * @param ctx
37 * @throws Exception 39 * @throws Exception
38 */ 40 */
39 @Override 41 @Override
40 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 42 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
41 CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel()); 43 CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
42 - LOGGER.info("用户[{}]下线",userInfo.getUserName());  
43 - SessionSocketHolder.remove((NioSocketChannel) ctx.channel()); 44 + userOffLine(userInfo, (NioSocketChannel) ctx.channel());
  45 + }
  46 +
  47 + @Override
  48 + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  49 + if (evt instanceof IdleStateEvent) {
  50 + IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
  51 + if (idleStateEvent.state() == IdleState.READER_IDLE) {
  52 +
  53 + //向客户端发送消息
  54 + CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
  55 + CIMRequestProto.CIMReqProtocol.class);
  56 + ctx.writeAndFlush(heartBeat).addListeners(new ChannelFutureListener() {
  57 + @Override
  58 + public void operationComplete(ChannelFuture future) throws Exception {
  59 + if (!future.isSuccess()) {
  60 + //下线客户端
  61 + CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) future.channel());
  62 + userOffLine(userInfo, (NioSocketChannel) future.channel());
  63 + }
  64 + }
  65 + });
  66 + }
  67 + }
  68 + super.userEventTriggered(ctx, evt);
  69 + }
  70 +
  71 + /**
  72 + * 用户下线
  73 + * @param userInfo
  74 + * @param channel
  75 + * @throws IOException
  76 + */
  77 + private void userOffLine(CIMUserInfo userInfo, NioSocketChannel channel) throws IOException {
  78 + LOGGER.info("用户[{}]下线", userInfo.getUserName());
  79 + SessionSocketHolder.remove(channel);
44 SessionSocketHolder.removeSession(userInfo.getUserId()); 80 SessionSocketHolder.removeSession(userInfo.getUserId());
45 81
46 //清除路由关系 82 //清除路由关系
@@ -49,6 +85,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -49,6 +85,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
49 85
50 /** 86 /**
51 * 清除路由关系 87 * 清除路由关系
  88 + *
52 * @param userInfo 89 * @param userInfo
53 * @throws IOException 90 * @throws IOException
54 */ 91 */
@@ -71,7 +108,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -71,7 +108,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
71 if (!response.isSuccessful()) { 108 if (!response.isSuccessful()) {
72 throw new IOException("Unexpected code " + response); 109 throw new IOException("Unexpected code " + response);
73 } 110 }
74 - }finally { 111 + } finally {
75 response.body().close(); 112 response.body().close();
76 } 113 }
77 } 114 }
@@ -81,20 +118,19 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto @@ -81,20 +118,19 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
81 protected void channelRead0(ChannelHandlerContext ctx, CIMRequestProto.CIMReqProtocol msg) throws Exception { 118 protected void channelRead0(ChannelHandlerContext ctx, CIMRequestProto.CIMReqProtocol msg) throws Exception {
82 LOGGER.info("收到msg={}", msg.toString()); 119 LOGGER.info("收到msg={}", msg.toString());
83 120
84 - if (msg.getType() == Constants.CommandType.LOGIN){ 121 + if (msg.getType() == Constants.CommandType.LOGIN) {
85 //保存客户端与 Channel 之间的关系 122 //保存客户端与 Channel 之间的关系
86 - SessionSocketHolder.put(msg.getRequestId(),(NioSocketChannel)ctx.channel()) ;  
87 - SessionSocketHolder.saveSession(msg.getRequestId(),msg.getReqMsg());  
88 - LOGGER.info("客户端[{}]上线成功",msg.getReqMsg()); 123 + SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel());
  124 + SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg());
  125 + LOGGER.info("客户端[{}]上线成功", msg.getReqMsg());
89 } 126 }
90 127
91 } 128 }
92 129
93 130
94 -  
95 @Override 131 @Override
96 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 132 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
97 - if (CIMException.isResetByPeer(cause.getMessage())){ 133 + if (CIMException.isResetByPeer(cause.getMessage())) {
98 return; 134 return;
99 } 135 }
100 136
@@ -8,6 +8,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder; @@ -8,6 +8,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
8 import io.netty.handler.codec.protobuf.ProtobufEncoder; 8 import io.netty.handler.codec.protobuf.ProtobufEncoder;
9 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 9 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
10 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; 10 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
  11 +import io.netty.handler.timeout.IdleStateHandler;
11 12
12 /** 13 /**
13 * Function: 14 * Function:
@@ -24,6 +25,8 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> { @@ -24,6 +25,8 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> {
24 protected void initChannel(Channel ch) throws Exception { 25 protected void initChannel(Channel ch) throws Exception {
25 26
26 ch.pipeline() 27 ch.pipeline()
  28 + //45 秒没有向客户端发送消息就发生心跳
  29 + .addLast(new IdleStateHandler(45, 0, 0))
27 // google Protobuf 编解码 30 // google Protobuf 编解码
28 .addLast(new ProtobufVarint32FrameDecoder()) 31 .addLast(new ProtobufVarint32FrameDecoder())
29 .addLast(new ProtobufDecoder(CIMRequestProto.CIMReqProtocol.getDefaultInstance())) 32 .addLast(new ProtobufDecoder(CIMRequestProto.CIMReqProtocol.getDefaultInstance()))