作者 crossoverJie
提交者 GitHub

Merge pull request #88 from crossoverJie/fix-issue-70

Fix issue 70
... ... @@ -4,8 +4,10 @@ import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.init.CIMClientHandleInitializer;
import com.crossoverjie.cim.client.service.EchoService;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.ReConnectManager;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.service.impl.ClientInfo;
import com.crossoverjie.cim.client.thread.ContextHolder;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
... ... @@ -66,6 +68,9 @@ public class CIMClient {
@Autowired
private ClientInfo clientInfo;
@Autowired
private ReConnectManager reConnectManager ;
/**
* 重试次数
*/
... ... @@ -90,7 +95,7 @@ public class CIMClient {
* 启动客户端
*
* @param cimServer
* @throws InterruptedException
* @throws Exception
*/
private void startClient(CIMServerResVO.ServerInfo cimServer) {
Bootstrap bootstrap = new Bootstrap();
... ... @@ -102,14 +107,14 @@ public class CIMClient {
ChannelFuture future = null;
try {
future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
} catch (InterruptedException e) {
} catch (Exception e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("连接失败次数达到上限[{}]次", errorCount);
msgHandle.shutdown();
}
LOGGER.error("连接失败", e);
LOGGER.error("Connect fail!", e);
}
if (future.isSuccess()) {
echoService.echo("Start cim client success!");
... ... @@ -139,7 +144,7 @@ public class CIMClient {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("重连次数达到上限[{}]次", errorCount);
echoService.echo("The maximum number of reconnections has been reached[{}]times, close cim client!", errorCount);
msgHandle.shutdown();
}
LOGGER.error("login fail", e);
... ... @@ -197,6 +202,13 @@ public class CIMClient {
}
/**
* 1. clear route information.
* 2. reconnect.
* 3. shutdown reconnect job.
* 4. reset reconnect state.
* @throws Exception
*/
public void reconnect() throws Exception {
if (channel != null && channel.isActive()) {
return;
... ... @@ -204,9 +216,11 @@ public class CIMClient {
//首先清除路由信息,下线
routeRequest.offLine();
LOGGER.info("reconnect....");
echoService.echo("cim server shutdown, reconnecting....");
start();
LOGGER.info("reconnect success");
echoService.echo("Great! reConnect success!!!");
reConnectManager.reConnectSuccess();
ContextHolder.clear();
}
/**
... ...
... ... @@ -87,7 +87,7 @@ public class BeanConfig {
@Bean("scheduledTask")
public ScheduledExecutorService buildSchedule(){
ThreadFactory sche = new ThreadFactoryBuilder()
.setNameFormat("scheduled-%d")
.setNameFormat("reConnect-job-%d")
.setDaemon(true)
.build();
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,sche) ;
... ...
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.service.EchoService;
import com.crossoverjie.cim.client.service.ReConnectManager;
import com.crossoverjie.cim.client.service.ShutDownMsg;
import com.crossoverjie.cim.client.service.impl.EchoServiceImpl;
import com.crossoverjie.cim.client.thread.ReConnectJob;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
... ... @@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Function:
... ... @@ -41,6 +40,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
private ScheduledExecutorService scheduledExecutorService ;
private ReConnectManager reConnectManager ;
private ShutDownMsg shutDownMsg ;
private EchoService echoService ;
... ... @@ -52,8 +53,6 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
//LOGGER.info("定时检测服务端是否存活");
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
... ... @@ -90,10 +89,11 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
if (scheduledExecutorService == null){
scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
reConnectManager = SpringBeanFactory.getBean(ReConnectManager.class) ;
}
LOGGER.info("客户端断开了,重新连接!");
// TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。
scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;
reConnectManager.reConnect(ctx);
}
@Override
... ...
package com.crossoverjie.cim.client.service;
import com.crossoverjie.cim.client.thread.ReConnectJob;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.stereotype.Component;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* Function:
*
* @author crossoverJie
* Date: 2020-04-15 00:26
* @since JDK 1.8
*/
@Component
public final class ReConnectManager {
private ScheduledExecutorService scheduledExecutorService;
/**
* Trigger reconnect job
* @param ctx
*/
public void reConnect(ChannelHandlerContext ctx) {
buildExecutor() ;
scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;
}
/**
* Close reconnect job if reconnect success.
*/
public void reConnectSuccess(){
scheduledExecutorService.shutdown();
}
/***
* build an thread executor
* @return
*/
private ScheduledExecutorService buildExecutor() {
if (scheduledExecutorService == null || scheduledExecutorService.isShutdown()) {
ThreadFactory sche = new ThreadFactoryBuilder()
.setNameFormat("reConnect-job-%d")
.setDaemon(true)
.build();
scheduledExecutorService = new ScheduledThreadPoolExecutor(1, sche);
return scheduledExecutorService;
} else {
return scheduledExecutorService;
}
}
}
... ...
package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.thread.ContextHolder;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
... ... @@ -28,6 +29,7 @@ public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
public void process(ChannelHandlerContext ctx) throws Exception {
//重连
ContextHolder.setReconnect(true);
cimClient.reconnect();
}
... ...
... ... @@ -5,12 +5,14 @@ import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.EchoService;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.thread.ContextHolder;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.res.BaseResponse;
import okhttp3.*;
import org.slf4j.Logger;
... ... @@ -140,6 +142,13 @@ public class RouteRequestImpl implements RouteRequest {
//重复失败
if (!cimServerResVO.getCode().equals(StatusEnum.SUCCESS.getCode())){
echoService.echo(cimServerResVO.getMessage());
// when client in reConnect state, could not exit.
if (ContextHolder.getReconnect()){
echoService.echo("###{}###", StatusEnum.RECONNECT_FAIL.getMessage());
throw new CIMException(StatusEnum.RECONNECT_FAIL);
}
System.exit(-1);
}
... ...
... ... @@ -37,7 +37,7 @@ public class ShutDownCommand implements InnerCommand {
private MsgLogger msgLogger;
@Resource(name = "callBackThreadPool")
private ThreadPoolExecutor executor;
private ThreadPoolExecutor callBackExecutor;
@Autowired
private EchoService echoService ;
... ... @@ -55,10 +55,10 @@ public class ShutDownCommand implements InnerCommand {
shutDownMsg.shutdown();
routeRequest.offLine();
msgLogger.stop();
executor.shutdown();
callBackExecutor.shutdown();
ringBufferWheel.stop(false);
try {
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
while (!callBackExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
echoService.echo("thread pool closing");
}
cimClient.close();
... ...
package com.crossoverjie.cim.client.thread;
/**
* Function: Something about of client runtime sign.
*
* @author crossoverJie
* Date: 2020-04-13 02:10
* @since JDK 1.8
*/
public class ContextHolder {
private static final ThreadLocal<Boolean> IS_RECONNECT = new ThreadLocal<>() ;
public static void setReconnect(boolean reconnect){
IS_RECONNECT.set(reconnect);
}
public static Boolean getReconnect(){
return IS_RECONNECT.get() ;
}
public static void clear(){
IS_RECONNECT.remove();
}
}
... ...
... ... @@ -27,7 +27,9 @@ public enum StatusEnum {
/** 账号不在线 */
OFF_LINE("7000", "你选择的账号不在线,请重新选择!"),
SERVER_NOT_AVAILABLE("7100", "CIM server is not available, please try again later!"),
SERVER_NOT_AVAILABLE("7100", "cim server is not available, please try again later!"),
RECONNECT_FAIL("7200", "reconnect fail, continue to retry!"),
/** 登录信息不匹配 */
ACCOUNT_NOT_MATCH("9100", "The User information you have used is incorrect!"),
... ...
... ... @@ -2,6 +2,8 @@ package com.crossoverjie.cim.route.cache;
import com.crossoverjie.cim.route.kit.ZKit;
import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
... ... @@ -20,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
@Component
public class ServerCache {
private static Logger logger = LoggerFactory.getLogger(ServerCache.class) ;
@Autowired
private LoadingCache<String, String> cache;
... ... @@ -43,7 +46,13 @@ public class ServerCache {
public void updateCache(List<String> currentChilds) {
cache.invalidateAll();
for (String currentChild : currentChilds) {
String key = currentChild.split("-")[1];
// currentChild=ip-127.0.0.1:11212:9082 or 127.0.0.1:11212:9082
String key ;
if (currentChild.split("-").length == 2){
key = currentChild.split("-")[1];
}else {
key = currentChild ;
}
addCache(key);
}
}
... ... @@ -54,7 +63,7 @@ public class ServerCache {
*
* @return
*/
public List<String> getAll() {
public List<String> getServerList() {
List<String> list = new ArrayList<>();
... ... @@ -72,4 +81,11 @@ public class ServerCache {
}
/**
* rebuild cache list
*/
public void rebuildCacheList(){
updateCache(getServerList()) ;
}
}
... ...
... ... @@ -153,7 +153,7 @@ public class RouteController {
BaseResponse<CIMServerResVO> res = new BaseResponse();
// check server available
String server = routeHandle.routeServer(serverCache.getAll(),String.valueOf(loginReqVO.getUserId()));
String server = routeHandle.routeServer(serverCache.getServerList(),String.valueOf(loginReqVO.getUserId()));
RouteInfo routeInfo = RouteInfoParseUtil.parse(server);
commonBizService.checkServerAvailable(routeInfo);
... ...
... ... @@ -3,7 +3,11 @@ package com.crossoverjie.cim.route.service;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.RouteInfo;
import com.crossoverjie.cim.route.cache.ServerCache;
import com.crossoverjie.cim.route.kit.NetAddressIsReachable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
... ... @@ -15,6 +19,11 @@ import org.springframework.stereotype.Component;
*/
@Component
public class CommonBizService {
private static Logger logger = LoggerFactory.getLogger(CommonBizService.class) ;
@Autowired
private ServerCache serverCache ;
/**
* check ip and port
... ... @@ -23,6 +32,11 @@ public class CommonBizService {
public void checkServerAvailable(RouteInfo routeInfo){
boolean reachable = NetAddressIsReachable.checkAddressReachable(routeInfo.getIp(), routeInfo.getCimServerPort(), 1000);
if (!reachable) {
logger.error("ip={}, port={} are not available", routeInfo.getIp(), routeInfo.getCimServerPort());
// rebuild cache
serverCache.rebuildCacheList();
throw new CIMException(StatusEnum.SERVER_NOT_AVAILABLE) ;
}
... ...
... ... @@ -22,19 +22,19 @@ app.zk.connect.timeout=15000
app.zk.root=/route
#路由策略,轮询
#app.route.way=com.crossoverjie.cim.common.route.algorithm.loop.LoopHandle
app.route.way=com.crossoverjie.cim.common.route.algorithm.loop.LoopHandle
#路由策略,随机
#app.route.way=com.crossoverjie.cim.common.route.algorithm.random.RandomHandle
#路由策略,一致性 hash
app.route.way=com.crossoverjie.cim.common.route.algorithm.consistenthash.ConsistentHashHandle
#app.route.way=com.crossoverjie.cim.common.route.algorithm.consistenthash.ConsistentHashHandle
#一致性 hash 算法具体实现--自定义有序 map
#app.route.way.consitenthash=com.crossoverjie.cim.common.route.algorithm.consistenthash.SortArrayMapConsistentHash
#一致性 hash 算法具体实现--TreeMap
app.route.way.consitenthash=com.crossoverjie.cim.common.route.algorithm.consistenthash.TreeMapConsistentHash
#app.route.way.consitenthash=com.crossoverjie.cim.common.route.algorithm.consistenthash.TreeMapConsistentHash
# Redis 配置
spring.redis.host=xx
... ...