作者 crossoverJie

:white_check_mark: Adding tests.

... ... @@ -183,6 +183,9 @@ public class CIMClient {
public void reconnect() throws Exception {
if (channel != null && channel.isActive()) {
return;
}
start();
}
... ...
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.thread.HeartBeatJob;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
... ... @@ -16,6 +17,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Function:
... ... @@ -66,6 +68,15 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("客户端断开了!");
if (scheduledExecutorService == null){
scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
}
scheduledExecutorService.scheduleAtFixedRate(new HeartBeatJob(ctx),0,10, TimeUnit.SECONDS) ;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception {
//心跳更新时间
... ... @@ -81,12 +92,9 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
LOGGER.info(msg.getResMsg());
}
if (scheduledExecutorService == null){
scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
}
//scheduledExecutorService.scheduleAtFixedRate(new HeartBeatJob(ctx),60,60, TimeUnit.SECONDS) ;
}
... ...
... ... @@ -4,7 +4,6 @@ import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import io.netty.channel.ChannelHandlerContext;
import okhttp3.MediaType;
import org.slf4j.Logger;
... ... @@ -26,10 +25,10 @@ public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
private final MediaType mediaType = MediaType.parse("application/json");
@Autowired
private AppConfiguration appConfiguration ;
private AppConfiguration appConfiguration;
@Autowired
private CIMClient cimClient ;
private CIMClient cimClient;
@Autowired
private RouteRequest routeRequest;
... ... @@ -37,13 +36,6 @@ public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
@Override
public void process(ChannelHandlerContext ctx) throws Exception {
long heartBeatTime = appConfiguration.getHeartBeatTime() * 1000;
Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
long now = System.currentTimeMillis();
if (lastReadTime != null && now - lastReadTime > heartBeatTime){
LOGGER.warn("服务端心跳超时[{}]ms,[{}]需要关闭重新连接!",now - lastReadTime,appConfiguration.getUserName());
//首先清除路由信息,下线
routeRequest.offLine();
... ... @@ -51,7 +43,6 @@ public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
cimClient.reconnect();
}
}
}
... ...
... ... @@ -32,7 +32,7 @@ public class HeartBeatJob implements Runnable {
try {
heartBeatHandler.process(context);
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("Exception",e);
}
}
}
... ...
... ... @@ -26,7 +26,7 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> {
ch.pipeline()
//30 秒没有向客户端发送消息就发生心跳
.addLast(new IdleStateHandler(15, 0, 0))
.addLast(new IdleStateHandler(11, 0, 0))
// google Protobuf 编解码
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(CIMRequestProto.CIMReqProtocol.getDefaultInstance()))
... ...