CIMClientHandle.java
4.0 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.thread.ReConnectJob;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.common.protocol.CIMResponseProto;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Function:
*
* @author crossoverJie
* Date: 16/02/2018 18:09
* @since JDK 1.8
*/
@ChannelHandler.Sharable
public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProto.CIMResProtocol> {
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientHandle.class);
private MsgHandleCaller caller ;
private ThreadPoolExecutor threadPoolExecutor ;
private ScheduledExecutorService scheduledExecutorService ;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
//LOGGER.info("定时检测服务端是否存活");
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
LOGGER.error("IO error,close Channel");
future.channel().close();
}
}) ;
}
}
super.userEventTriggered(ctx, evt);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//客户端和服务端建立连接时调用
LOGGER.info("cim server connect success!");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("客户端断开了,重新连接!");
if (scheduledExecutorService == null){
scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
}
// TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。
scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception {
//心跳更新时间
if (msg.getType() == Constants.CommandType.PING){
//LOGGER.info("收到服务端心跳!!!");
NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
}
if (msg.getType() != Constants.CommandType.PING) {
//回调消息
callBackMsg(msg.getResMsg());
LOGGER.info(msg.getResMsg());
}
}
/**
* 回调消息
* @param msg
*/
private void callBackMsg(String msg) {
threadPoolExecutor = SpringBeanFactory.getBean("callBackThreadPool",ThreadPoolExecutor.class) ;
threadPoolExecutor.execute(() -> {
caller = SpringBeanFactory.getBean(MsgHandleCaller.class) ;
caller.getMsgHandleListener().handle(msg);
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常时断开连接
cause.printStackTrace() ;
ctx.close() ;
}
}