作者 crossoverJie

reconnect optimize.

... ... @@ -6,6 +6,7 @@ import com.crossoverjie.cim.client.service.EchoService;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.service.impl.ClientInfo;
import com.crossoverjie.cim.client.thread.ContextHolder;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
... ... @@ -90,7 +91,7 @@ public class CIMClient {
* 启动客户端
*
* @param cimServer
* @throws InterruptedException
* @throws Exception
*/
private void startClient(CIMServerResVO.ServerInfo cimServer) {
Bootstrap bootstrap = new Bootstrap();
... ... @@ -102,7 +103,7 @@ public class CIMClient {
ChannelFuture future = null;
try {
future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
} catch (InterruptedException e) {
} catch (Exception e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
... ... @@ -204,9 +205,10 @@ public class CIMClient {
//首先清除路由信息,下线
routeRequest.offLine();
LOGGER.info("reconnect....");
echoService.echo("cim server shutdown, reconnecting....");
start();
LOGGER.info("reconnect success");
echoService.echo("Great! reconnect success!!!");
ContextHolder.clear();
}
/**
... ...
... ... @@ -87,7 +87,7 @@ public class BeanConfig {
@Bean("scheduledTask")
public ScheduledExecutorService buildSchedule(){
ThreadFactory sche = new ThreadFactoryBuilder()
.setNameFormat("scheduled-%d")
.setNameFormat("reconnect-job-%d")
.setDaemon(true)
.build();
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,sche) ;
... ...
package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.thread.ContextHolder;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
... ... @@ -28,6 +29,7 @@ public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
public void process(ChannelHandlerContext ctx) throws Exception {
//重连
ContextHolder.setReconnect(true);
cimClient.reconnect();
}
... ...
... ... @@ -5,12 +5,14 @@ import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.EchoService;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.thread.ContextHolder;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.res.BaseResponse;
import okhttp3.*;
import org.slf4j.Logger;
... ... @@ -140,6 +142,13 @@ public class RouteRequestImpl implements RouteRequest {
//重复失败
if (!cimServerResVO.getCode().equals(StatusEnum.SUCCESS.getCode())){
echoService.echo(cimServerResVO.getMessage());
// when client in reconnect state, could not exit.
if (ContextHolder.getReconnect()){
echoService.echo("###{}###", StatusEnum.RECONNECT_FAIL.getMessage());
throw new CIMException(StatusEnum.RECONNECT_FAIL);
}
System.exit(-1);
}
... ...
package com.crossoverjie.cim.client.thread;
/**
* Function: Something about of client runtime sign.
*
* @author crossoverJie
* Date: 2020-04-13 02:10
* @since JDK 1.8
*/
public class ContextHolder {
private static final ThreadLocal<Boolean> IS_RECONNECT = new ThreadLocal<>() ;
public static void setReconnect(boolean reconnect){
IS_RECONNECT.set(reconnect);
}
public static Boolean getReconnect(){
return IS_RECONNECT.get() ;
}
public static void clear(){
IS_RECONNECT.remove();
}
}
... ...
... ... @@ -27,7 +27,9 @@ public enum StatusEnum {
/** 账号不在线 */
OFF_LINE("7000", "你选择的账号不在线,请重新选择!"),
SERVER_NOT_AVAILABLE("7100", "CIM server is not available, please try again later!"),
SERVER_NOT_AVAILABLE("7100", "cim server is not available, please try again later!"),
RECONNECT_FAIL("7200", "reconnect fail, continue to retry!"),
/** 登录信息不匹配 */
ACCOUNT_NOT_MATCH("9100", "The User information you have used is incorrect!"),
... ...
... ... @@ -2,6 +2,8 @@ package com.crossoverjie.cim.route.cache;
import com.crossoverjie.cim.route.kit.ZKit;
import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
... ... @@ -20,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
@Component
public class ServerCache {
private static Logger logger = LoggerFactory.getLogger(ServerCache.class) ;
@Autowired
private LoadingCache<String, String> cache;
... ... @@ -43,7 +46,13 @@ public class ServerCache {
public void updateCache(List<String> currentChilds) {
cache.invalidateAll();
for (String currentChild : currentChilds) {
String key = currentChild.split("-")[1];
// currentChild=ip-127.0.0.1:11212:9082 or 127.0.0.1:11212:9082
String key ;
if (currentChild.split("-").length == 2){
key = currentChild.split("-")[1];
}else {
key = currentChild ;
}
addCache(key);
}
}
... ... @@ -54,7 +63,7 @@ public class ServerCache {
*
* @return
*/
public List<String> getAll() {
public List<String> getServerList() {
List<String> list = new ArrayList<>();
... ... @@ -72,4 +81,11 @@ public class ServerCache {
}
/**
* rebuild cache list
*/
public void rebuildCacheList(){
updateCache(getServerList()) ;
}
}
... ...
... ... @@ -153,7 +153,7 @@ public class RouteController {
BaseResponse<CIMServerResVO> res = new BaseResponse();
// check server available
String server = routeHandle.routeServer(serverCache.getAll(),String.valueOf(loginReqVO.getUserId()));
String server = routeHandle.routeServer(serverCache.getServerList(),String.valueOf(loginReqVO.getUserId()));
RouteInfo routeInfo = RouteInfoParseUtil.parse(server);
commonBizService.checkServerAvailable(routeInfo);
... ...
... ... @@ -3,7 +3,11 @@ package com.crossoverjie.cim.route.service;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.RouteInfo;
import com.crossoverjie.cim.route.cache.ServerCache;
import com.crossoverjie.cim.route.kit.NetAddressIsReachable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
... ... @@ -15,6 +19,11 @@ import org.springframework.stereotype.Component;
*/
@Component
public class CommonBizService {
private static Logger logger = LoggerFactory.getLogger(CommonBizService.class) ;
@Autowired
private ServerCache serverCache ;
/**
* check ip and port
... ... @@ -23,6 +32,11 @@ public class CommonBizService {
public void checkServerAvailable(RouteInfo routeInfo){
boolean reachable = NetAddressIsReachable.checkAddressReachable(routeInfo.getIp(), routeInfo.getCimServerPort(), 1000);
if (!reachable) {
logger.error("ip={}, port={} are not available", routeInfo.getIp(), routeInfo.getCimServerPort());
// rebuild cache
serverCache.rebuildCacheList();
throw new CIMException(StatusEnum.SERVER_NOT_AVAILABLE) ;
}
... ...
... ... @@ -22,19 +22,19 @@ app.zk.connect.timeout=15000
app.zk.root=/route
#路由策略,轮询
#app.route.way=com.crossoverjie.cim.common.route.algorithm.loop.LoopHandle
app.route.way=com.crossoverjie.cim.common.route.algorithm.loop.LoopHandle
#路由策略,随机
#app.route.way=com.crossoverjie.cim.common.route.algorithm.random.RandomHandle
#路由策略,一致性 hash
app.route.way=com.crossoverjie.cim.common.route.algorithm.consistenthash.ConsistentHashHandle
#app.route.way=com.crossoverjie.cim.common.route.algorithm.consistenthash.ConsistentHashHandle
#一致性 hash 算法具体实现--自定义有序 map
#app.route.way.consitenthash=com.crossoverjie.cim.common.route.algorithm.consistenthash.SortArrayMapConsistentHash
#一致性 hash 算法具体实现--TreeMap
app.route.way.consitenthash=com.crossoverjie.cim.common.route.algorithm.consistenthash.TreeMapConsistentHash
#app.route.way.consitenthash=com.crossoverjie.cim.common.route.algorithm.consistenthash.TreeMapConsistentHash
# Redis 配置
spring.redis.host=xx
... ...