作者 crossoverJie

:art: Improving structure / format of the code.

package com.crossoverjie.cim.client.client;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.init.CIMClientHandleInitializer;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
... ... @@ -16,6 +18,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -28,7 +31,7 @@ import javax.annotation.PostConstruct;
* Function:
*
* @author crossoverJie
* Date: 22/05/2018 14:19
* Date: 22/05/2018 14:19
* @since JDK 1.8
*/
@Component
... ... @@ -36,7 +39,7 @@ public class CIMClient {
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClient.class);
private EventLoopGroup group = new NioEventLoopGroup();
private EventLoopGroup group = new NioEventLoopGroup(0, new DefaultThreadFactory("cim-work"));
@Value("${cim.user.id}")
private long userId;
... ... @@ -49,6 +52,17 @@ public class CIMClient {
@Autowired
private RouteRequest routeRequest;
@Autowired
private AppConfiguration configuration;
@Autowired
private MsgHandle msgHandle;
/**
* 重试次数
*/
private int errorCount;
@PostConstruct
public void start() throws Exception {
... ... @@ -70,14 +84,25 @@ public class CIMClient {
* @param cimServer
* @throws InterruptedException
*/
private void startClient(CIMServerResVO.ServerInfo cimServer) throws InterruptedException {
private void startClient(CIMServerResVO.ServerInfo cimServer) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CIMClientHandleInitializer())
;
ChannelFuture future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
ChannelFuture future = null;
try {
future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
} catch (InterruptedException e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("链接失败次数达到上限[{}]次", errorCount);
msgHandle.shutdown();
}
LOGGER.error("连接失败", e);
}
if (future.isSuccess()) {
LOGGER.info("启动 cim client 成功");
}
... ... @@ -90,10 +115,21 @@ public class CIMClient {
* @return 路由服务器信息
* @throws Exception
*/
private CIMServerResVO.ServerInfo userLogin() throws Exception {
private CIMServerResVO.ServerInfo userLogin() {
LoginReqVO loginReqVO = new LoginReqVO(userId, userName);
CIMServerResVO.ServerInfo cimServer = routeRequest.getCIMServer(loginReqVO);
LOGGER.info("cimServer=[{}]", cimServer.toString());
CIMServerResVO.ServerInfo cimServer = null;
try {
cimServer = routeRequest.getCIMServer(loginReqVO);
LOGGER.info("cimServer=[{}]", cimServer.toString());
} catch (Exception e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("重连次数达到上限[{}]次", errorCount);
msgHandle.shutdown();
}
LOGGER.error("登录失败", e);
}
return cimServer;
}
... ... @@ -145,11 +181,17 @@ public class CIMClient {
}
public void reconnect() throws Exception {
start();
}
/**
* 关闭
*
* @throws InterruptedException
*/
public void close() throws InterruptedException {
channel.close() ;
channel.close();
}
}
... ...
... ... @@ -22,6 +22,15 @@ public class AppConfiguration {
@Value("${cim.msg.logger.path}")
private String msgLoggerPath ;
@Value("${cim.clear.route.request.url}")
private String clearRouteUrl ;
@Value("${cim.heartbeat.time}")
private long heartBeatTime ;
@Value("${cim.reconnect.count}")
private int errorCount ;
public Long getUserId() {
return userId;
}
... ... @@ -45,4 +54,30 @@ public class AppConfiguration {
public void setMsgLoggerPath(String msgLoggerPath) {
this.msgLoggerPath = msgLoggerPath;
}
public long getHeartBeatTime() {
return heartBeatTime;
}
public void setHeartBeatTime(long heartBeatTime) {
this.heartBeatTime = heartBeatTime;
}
public String getClearRouteUrl() {
return clearRouteUrl;
}
public void setClearRouteUrl(String clearRouteUrl) {
this.clearRouteUrl = clearRouteUrl;
}
public int getErrorCount() {
return errorCount;
}
public void setErrorCount(int errorCount) {
this.errorCount = errorCount;
}
}
... ...
... ... @@ -82,6 +82,17 @@ public class BeanConfig {
return productExecutor ;
}
@Bean("scheduledTask")
public ScheduledExecutorService buildSchedule(){
ThreadFactory sche = new ThreadFactoryBuilder()
.setNameFormat("scheduled-%d")
.setDaemon(true)
.build();
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,sche) ;
return scheduledExecutorService ;
}
/**
* 回调 bean
* @return
... ...
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.thread.HeartBeatJob;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
... ... @@ -14,7 +15,9 @@ import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Function:
... ... @@ -32,6 +35,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
private ThreadPoolExecutor threadPoolExecutor ;
private ScheduledExecutorService scheduledExecutorService ;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
... ... @@ -50,7 +55,6 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
}) ;
}
}
super.userEventTriggered(ctx, evt);
... ... @@ -68,6 +72,7 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
//心跳更新时间
if (msg.getType() == Constants.CommandType.PING){
LOGGER.info("收到服务端心跳!!!");
NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
}
... ... @@ -78,6 +83,13 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
LOGGER.info(msg.getResMsg());
}
if (scheduledExecutorService == null){
scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
}
scheduledExecutorService.scheduleAtFixedRate(new HeartBeatJob(ctx),30,30, TimeUnit.SECONDS) ;
}
/**
... ...
... ... @@ -25,7 +25,7 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//30 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 30, 0))
.addLast(new IdleStateHandler(0, 50, 0))
//心跳解码
//.addLast(new HeartbeatEncode())
... ...
... ... @@ -48,4 +48,10 @@ public interface MsgHandle {
* @return 是否应当跳过当前消息(包含了":" 就需要跳过)
*/
boolean innerCommand(String msg) ;
/**
* 关闭系统
*/
void shutdown() ;
}
... ...
... ... @@ -41,10 +41,13 @@ public interface RouteRequest {
CIMServerResVO.ServerInfo getCIMServer(LoginReqVO loginReqVO) throws Exception;
/**
*
* @return 获取所有在线用户
* 获取所有在线用户
* @return
* @throws Exception
*/
List<OnlineUsersResVO.DataBodyBean> onlineUsers()throws Exception ;
void offLine() ;
}
... ...
package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import io.netty.channel.ChannelHandlerContext;
import okhttp3.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
... ... @@ -10,10 +19,39 @@ import io.netty.channel.ChannelHandlerContext;
* Date: 2019-01-20 17:16
* @since JDK 1.8
*/
@Service
public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(ClientHeartBeatHandlerImpl.class);
private final MediaType mediaType = MediaType.parse("application/json");
@Autowired
private AppConfiguration appConfiguration ;
@Autowired
private CIMClient cimClient ;
@Autowired
private RouteRequest routeRequest;
@Override
public void process(ChannelHandlerContext ctx) throws Exception {
long heartBeatTime = appConfiguration.getHeartBeatTime() * 1000;
Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
long now = System.currentTimeMillis();
if (lastReadTime != null && now - lastReadTime > heartBeatTime){
LOGGER.warn("服务端心跳超时[{}]ms,[{}]需要关闭重新连接!",now - lastReadTime,appConfiguration.getUserName());
//首先清除路由信息,下线
routeRequest.offLine();
//重连
cimClient.reconnect();
}
}
}
... ...
... ... @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
... ... @@ -37,7 +38,7 @@ public class MsgHandler implements MsgHandle {
@Autowired
private AppConfiguration configuration;
@Autowired
@Resource(name = "callBackThreadPool")
private ThreadPoolExecutor executor ;
@Autowired
... ... @@ -221,8 +222,10 @@ public class MsgHandler implements MsgHandle {
/**
* 关闭系统
*/
private void shutdown() {
@Override
public void shutdown() {
LOGGER.info("系统关闭中。。。。");
routeRequest.offLine();
msgLogger.stop();
executor.shutdown();
try {
... ...
... ... @@ -178,4 +178,26 @@ public class RouteRequestImpl implements RouteRequest {
return onlineUsersResVO.getDataBody();
}
@Override
public void offLine() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userId", appConfiguration.getUserId());
jsonObject.put("msg", "offLine");
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
Request request = new Request.Builder()
.url(appConfiguration.getClearRouteUrl())
.post(requestBody)
.build();
Response response = null;
try {
response = okHttpClient.newCall(request).execute();
} catch (IOException e) {
LOGGER.error("exception",e);
} finally {
response.body().close();
}
}
}
... ...
package com.crossoverjie.cim.client.thread;
import com.crossoverjie.cim.client.service.impl.ClientHeartBeatHandlerImpl;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 21:35
* @since JDK 1.8
*/
public class HeartBeatJob implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatJob.class);
private ChannelHandlerContext context ;
private HeartBeatHandler heartBeatHandler ;
public HeartBeatJob(ChannelHandlerContext context) {
this.context = context;
this.heartBeatHandler = SpringBeanFactory.getBean(ClientHeartBeatHandlerImpl.class) ;
}
@Override
public void run() {
try {
heartBeatHandler.process(context);
} catch (Exception e) {
e.printStackTrace();
}
}
}
... ...
... ... @@ -25,6 +25,8 @@ cim.server.route.request.url=http://45.78.28.220:8083/login
# 在线用户
cim.server.online.user.url=http://45.78.28.220:8083/onlineUser
# 清除路由信息
cim.clear.route.request.url=http://45.78.28.220:8083/offLine
###=======本地模拟======###
## 群发消息
... ... @@ -39,6 +41,9 @@ cim.server.online.user.url=http://45.78.28.220:8083/onlineUser
## 在线用户
#cim.server.online.user.url=http://localhost:8083/onlineUser
# 清除路由信息
#cim.clear.route.request.url=http://localhost:8083/offLine
# 客户端唯一ID
cim.user.id=1545574841528
cim.user.userName=zhangsan
... ... @@ -54,4 +59,7 @@ management.security.enabled=false
spring.boot.admin.url=http://127.0.0.1:8888
# 检测多少秒没有收到服务端端心跳后重新登录获取连接
cim.heartbeat.time = 40
\ No newline at end of file
cim.heartbeat.time = 60
# 客户端连接失败重连次数
cim.reconnect.count =3
\ No newline at end of file
... ...
... ... @@ -12,7 +12,9 @@ import io.netty.channel.ChannelHandlerContext;
public interface HeartBeatHandler {
/**
* 处理
* 处理心跳
* @param ctx
* @throws Exception
*/
void process(ChannelHandlerContext ctx) throws Exception ;
}
... ...
... ... @@ -94,7 +94,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
}
/**
* 清除路由关系
* 下线,清除路由关系
*
* @param userInfo
* @throws IOException
... ...