作者 crossoverJie
提交者 GitHub

Merge pull request #26 from crossoverJie/cim-1.0.2

cim 1.0.2
正在显示 33 个修改的文件 包含 599 行增加212 行删除
... ... @@ -73,12 +73,6 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
... ...
package com.crossoverjie.cim.client;
import com.crossoverjie.cim.client.scanner.Scan;
import com.crossoverjie.cim.client.service.impl.ClientInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
... ... @@ -15,6 +17,8 @@ public class CIMClientApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);
@Autowired
private ClientInfo clientInfo ;
public static void main(String[] args) {
SpringApplication.run(CIMClientApplication.class, args);
LOGGER.info("启动 Client 服务成功");
... ... @@ -26,5 +30,6 @@ public class CIMClientApplication implements CommandLineRunner{
Thread thread = new Thread(scan);
thread.setName("scan-thread");
thread.start();
clientInfo.saveStartDate();
}
}
\ No newline at end of file
... ...
package com.crossoverjie.cim.client.client;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.init.CIMClientHandleInitializer;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.service.impl.ClientInfo;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
... ... @@ -16,6 +19,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -28,7 +32,7 @@ import javax.annotation.PostConstruct;
* Function:
*
* @author crossoverJie
* Date: 22/05/2018 14:19
* Date: 22/05/2018 14:19
* @since JDK 1.8
*/
@Component
... ... @@ -36,7 +40,7 @@ public class CIMClient {
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClient.class);
private EventLoopGroup group = new NioEventLoopGroup();
private EventLoopGroup group = new NioEventLoopGroup(0, new DefaultThreadFactory("cim-work"));
@Value("${cim.user.id}")
private long userId;
... ... @@ -49,6 +53,20 @@ public class CIMClient {
@Autowired
private RouteRequest routeRequest;
@Autowired
private AppConfiguration configuration;
@Autowired
private MsgHandle msgHandle;
@Autowired
private ClientInfo clientInfo;
/**
* 重试次数
*/
private int errorCount;
@PostConstruct
public void start() throws Exception {
... ... @@ -70,14 +88,25 @@ public class CIMClient {
* @param cimServer
* @throws InterruptedException
*/
private void startClient(CIMServerResVO.ServerInfo cimServer) throws InterruptedException {
private void startClient(CIMServerResVO.ServerInfo cimServer) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CIMClientHandleInitializer())
;
ChannelFuture future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
ChannelFuture future = null;
try {
future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
} catch (InterruptedException e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("链接失败次数达到上限[{}]次", errorCount);
msgHandle.shutdown();
}
LOGGER.error("连接失败", e);
}
if (future.isSuccess()) {
LOGGER.info("启动 cim client 成功");
}
... ... @@ -90,10 +119,26 @@ public class CIMClient {
* @return 路由服务器信息
* @throws Exception
*/
private CIMServerResVO.ServerInfo userLogin() throws Exception {
private CIMServerResVO.ServerInfo userLogin() {
LoginReqVO loginReqVO = new LoginReqVO(userId, userName);
CIMServerResVO.ServerInfo cimServer = routeRequest.getCIMServer(loginReqVO);
LOGGER.info("cimServer=[{}]", cimServer.toString());
CIMServerResVO.ServerInfo cimServer = null;
try {
cimServer = routeRequest.getCIMServer(loginReqVO);
//保存系统信息
clientInfo.saveServiceInfo(cimServer.getIp() + ":" + cimServer.getCimServerPort())
.saveUserInfo(userId, userName);
LOGGER.info("cimServer=[{}]", cimServer.toString());
} catch (Exception e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("重连次数达到上限[{}]次", errorCount);
msgHandle.shutdown();
}
LOGGER.error("登录失败", e);
}
return cimServer;
}
... ... @@ -145,11 +190,25 @@ public class CIMClient {
}
public void reconnect() throws Exception {
if (channel != null && channel.isActive()) {
return;
}
//首先清除路由信息,下线
routeRequest.offLine();
LOGGER.info("开始重连。。");
start();
LOGGER.info("重连成功!!");
}
/**
* 关闭
*
* @throws InterruptedException
*/
public void close() throws InterruptedException {
channel.close() ;
channel.close();
}
}
... ...
... ... @@ -22,6 +22,15 @@ public class AppConfiguration {
@Value("${cim.msg.logger.path}")
private String msgLoggerPath ;
@Value("${cim.clear.route.request.url}")
private String clearRouteUrl ;
@Value("${cim.heartbeat.time}")
private long heartBeatTime ;
@Value("${cim.reconnect.count}")
private int errorCount ;
public Long getUserId() {
return userId;
}
... ... @@ -45,4 +54,30 @@ public class AppConfiguration {
public void setMsgLoggerPath(String msgLoggerPath) {
this.msgLoggerPath = msgLoggerPath;
}
public long getHeartBeatTime() {
return heartBeatTime;
}
public void setHeartBeatTime(long heartBeatTime) {
this.heartBeatTime = heartBeatTime;
}
public String getClearRouteUrl() {
return clearRouteUrl;
}
public void setClearRouteUrl(String clearRouteUrl) {
this.clearRouteUrl = clearRouteUrl;
}
public int getErrorCount() {
return errorCount;
}
public void setErrorCount(int errorCount) {
this.errorCount = errorCount;
}
}
... ...
... ... @@ -82,6 +82,17 @@ public class BeanConfig {
return productExecutor ;
}
@Bean("scheduledTask")
public ScheduledExecutorService buildSchedule(){
ThreadFactory sche = new ThreadFactoryBuilder()
.setNameFormat("scheduled-%d")
.setDaemon(true)
.build();
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,sche) ;
return scheduledExecutorService ;
}
/**
* 回调 bean
* @return
... ...
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.thread.ReConnectJob;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.common.protocol.CIMResponseProto;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
... ... @@ -13,7 +15,9 @@ import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Function:
... ... @@ -31,6 +35,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
private ThreadPoolExecutor threadPoolExecutor ;
private ScheduledExecutorService scheduledExecutorService ;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
... ... @@ -38,13 +44,19 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
//LOGGER.info("定时检测服务端是否存活");
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE) ;
ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
LOGGER.error("IO error,close Channel");
future.channel().close();
}
}) ;
}
}
super.userEventTriggered(ctx, evt);
... ... @@ -58,10 +70,24 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResponseProto.CIMResProtocol msg) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("客户端断开了,重新连接!");
//从服务端收到消息时被调用
//LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
if (scheduledExecutorService == null){
scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
}
// TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。
scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception {
//心跳更新时间
if (msg.getType() == Constants.CommandType.PING){
//LOGGER.info("收到服务端心跳!!!");
NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
}
if (msg.getType() != Constants.CommandType.PING) {
//回调消息
... ... @@ -70,6 +96,10 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
LOGGER.info(msg.getResMsg());
}
}
/**
... ...
... ... @@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//30 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 30, 0))
//10 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 10, 0))
//心跳解码
//.addLast(new HeartbeatEncode())
... ...
... ... @@ -48,4 +48,10 @@ public interface MsgHandle {
* @return 是否应当跳过当前消息(包含了":" 就需要跳过)
*/
boolean innerCommand(String msg) ;
/**
* 关闭系统
*/
void shutdown() ;
}
... ...
... ... @@ -41,10 +41,13 @@ public interface RouteRequest {
CIMServerResVO.ServerInfo getCIMServer(LoginReqVO loginReqVO) throws Exception;
/**
*
* @return 获取所有在线用户
* 获取所有在线用户
* @return
* @throws Exception
*/
List<OnlineUsersResVO.DataBodyBean> onlineUsers()throws Exception ;
void offLine() ;
}
... ...
package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 17:16
* @since JDK 1.8
*/
@Service
public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(ClientHeartBeatHandlerImpl.class);
@Autowired
private CIMClient cimClient;
@Override
public void process(ChannelHandlerContext ctx) throws Exception {
//重连
cimClient.reconnect();
}
}
... ...
package com.crossoverjie.cim.client.service.impl;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-21 23:35
* @since JDK 1.8
*/
@Component
public class ClientInfo {
private Info info = new Info() ;
public Info get(){
return info ;
}
public ClientInfo saveUserInfo(long userId,String userName){
info.setUserId(userId);
info.setUserName(userName);
return this;
}
public ClientInfo saveServiceInfo(String serviceInfo){
info.setServiceInfo(serviceInfo);
return this;
}
public ClientInfo saveStartDate(){
info.setStartDate(new Date());
return this;
}
private class Info{
private String userName;
private long userId ;
private String serviceInfo ;
private Date startDate ;
public Info() {
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
public String getServiceInfo() {
return serviceInfo;
}
public void setServiceInfo(String serviceInfo) {
this.serviceInfo = serviceInfo;
}
public Date getStartDate() {
return startDate;
}
public void setStartDate(Date startDate) {
this.startDate = startDate;
}
}
}
... ...
package com.crossoverjie.cim.client.service.impl;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.MsgHandle;
... ... @@ -16,6 +17,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
... ... @@ -25,40 +27,44 @@ import java.util.concurrent.TimeUnit;
* Function:
*
* @author crossoverJie
* Date: 2018/12/26 11:15
* Date: 2018/12/26 11:15
* @since JDK 1.8
*/
@Service
public class MsgHandler implements MsgHandle {
private final static Logger LOGGER = LoggerFactory.getLogger(MsgHandler.class);
@Autowired
private RouteRequest routeRequest ;
private RouteRequest routeRequest;
@Autowired
private AppConfiguration configuration;
@Resource(name = "callBackThreadPool")
private ThreadPoolExecutor executor;
@Autowired
private ThreadPoolExecutor executor ;
private CIMClient cimClient;
@Autowired
private CIMClient cimClient ;
private MsgLogger msgLogger;
@Autowired
private MsgLogger msgLogger ;
private ClientInfo clientInfo ;
private boolean aiModel = false ;
private boolean aiModel = false;
@Override
public void sendMsg(String msg) {
if (aiModel){
if (aiModel) {
aiChat(msg);
}else {
} else {
normalChat(msg);
}
}
/**
* 正常聊天
*
* @param msg
*/
private void normalChat(String msg) {
... ... @@ -72,7 +78,7 @@ public class MsgHandler implements MsgHandle {
try {
p2pChat(p2PReqVO);
} catch (Exception e) {
LOGGER.error("Exception",e);
LOGGER.error("Exception", e);
}
} else {
... ... @@ -81,21 +87,22 @@ public class MsgHandler implements MsgHandle {
try {
groupChat(groupReqVO);
} catch (Exception e) {
LOGGER.error("Exception",e);
LOGGER.error("Exception", e);
}
}
}
/**
* AI model
*
* @param msg
*/
private void aiChat(String msg) {
msg = msg.replace("吗","") ;
msg = msg.replace("嘛","") ;
msg = msg.replace("?","!");
msg = msg.replace("?","!");
msg = msg.replace("你","我");
msg = msg.replace("吗", "");
msg = msg.replace("嘛", "");
msg = msg.replace("?", "!");
msg = msg.replace("?", "!");
msg = msg.replace("你", "我");
System.out.println("AI:\033[31;4m" + msg + "\033[0m");
}
... ... @@ -113,7 +120,7 @@ public class MsgHandler implements MsgHandle {
@Override
public boolean checkMsg(String msg) {
if (StringUtil.isEmpty(msg)){
if (StringUtil.isEmpty(msg)) {
LOGGER.warn("不能发送空消息!");
return true;
}
... ... @@ -123,41 +130,47 @@ public class MsgHandler implements MsgHandle {
@Override
public boolean innerCommand(String msg) {
if (msg.startsWith(":")){
// TODO: 2019-01-22 判断逻辑过多,需要重构。
if (msg.startsWith(":")) {
Map<String, String> allStatusCode = SystemCommandEnumType.getAllStatusCode();
if (SystemCommandEnumType.QUIT.getCommandType().trim().equals(msg)){
if (SystemCommandEnumType.QUIT.getCommandType().trim().equals(msg)) {
//关闭系统
shutdown();
} else if (SystemCommandEnumType.ALL.getCommandType().trim().equals(msg)){
} else if (SystemCommandEnumType.ALL.getCommandType().trim().equals(msg)) {
printAllCommand(allStatusCode);
} else if (SystemCommandEnumType.ONLINE_USER.getCommandType().toLowerCase().trim().equals(msg.toLowerCase())){
} else if (SystemCommandEnumType.ONLINE_USER.getCommandType().toLowerCase().trim().equals(msg.toLowerCase())) {
//打印在线用户
printOnlineUsers();
} else if (msg.startsWith(SystemCommandEnumType.QUERY.getCommandType().trim() + " ")){
} else if (msg.startsWith(SystemCommandEnumType.QUERY.getCommandType().trim() + " ")) {
//查询聊天记录
queryChatHistory(msg);
}else if (SystemCommandEnumType.AI.getCommandType().trim().equals(msg.toLowerCase())){
} else if (SystemCommandEnumType.AI.getCommandType().trim().equals(msg.toLowerCase())) {
//开启 AI 模式
aiModel = true ;
System.out.println("\033[31;4m" + "Hello,我是估值两亿的 AI 机器人!" + "\033[0m");
}else if (SystemCommandEnumType.QAI.getCommandType().trim().equals(msg.toLowerCase())){
aiModel = true;
System.out.println("\033[31;4m" + "Hello,我是估值两亿的 AI 机器人!" + "\033[0m");
} else if (SystemCommandEnumType.QAI.getCommandType().trim().equals(msg.toLowerCase())) {
//关闭 AI 模式
aiModel = false ;
System.out.println("\033[31;4m" + "。゚(゚´ω`゚)゚。 AI 下线了!" + "\033[0m");
}else if (msg.startsWith(SystemCommandEnumType.PREFIX.getCommandType().trim() + " ")){
aiModel = false;
System.out.println("\033[31;4m" + "。゚(゚´ω`゚)゚。 AI 下线了!" + "\033[0m");
} else if (msg.startsWith(SystemCommandEnumType.PREFIX.getCommandType().trim() + " ")) {
//模糊匹配
prefixSearch(msg);
}else {
} else if (SystemCommandEnumType.INFO.getCommandType().trim().equals(msg.toLowerCase())) {
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
LOGGER.info("client info=[{}]", JSON.toJSONString(clientInfo.get()));
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
} else {
printAllCommand(allStatusCode);
}
return true ;
return true;
}else {
return false ;
} else {
return false;
}
... ... @@ -166,12 +179,13 @@ public class MsgHandler implements MsgHandle {
/**
* 模糊匹配
*
* @param msg
*/
private void prefixSearch(String msg) {
try {
List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();
TrieTree trieTree = new TrieTree() ;
TrieTree trieTree = new TrieTree();
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
trieTree.insert(onlineUser.getUserName());
}
... ... @@ -186,16 +200,17 @@ public class MsgHandler implements MsgHandle {
}
} catch (Exception e) {
LOGGER.error("Exception" ,e);
LOGGER.error("Exception", e);
}
}
/**
* 查询聊天记录
*
* @param msg
*/
private void queryChatHistory(String msg) {
String[] split = msg.split(" ") ;
String[] split = msg.split(" ");
String res = msgLogger.query(split[1]);
System.out.println(res);
}
... ... @@ -209,20 +224,22 @@ public class MsgHandler implements MsgHandle {
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
LOGGER.info("userId={}=====userName={}",onlineUser.getUserId(),onlineUser.getUserName());
LOGGER.info("userId={}=====userName={}", onlineUser.getUserId(), onlineUser.getUserName());
}
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
} catch (Exception e) {
LOGGER.error("Exception" ,e);
LOGGER.error("Exception", e);
}
}
/**
* 关闭系统
*/
private void shutdown() {
@Override
public void shutdown() {
LOGGER.info("系统关闭中。。。。");
routeRequest.offLine();
msgLogger.stop();
executor.shutdown();
try {
... ... @@ -231,7 +248,7 @@ public class MsgHandler implements MsgHandle {
}
cimClient.close();
} catch (InterruptedException e) {
LOGGER.error("InterruptedException",e);
LOGGER.error("InterruptedException", e);
}
System.exit(0);
}
... ...
... ... @@ -45,7 +45,7 @@ public class RouteRequestImpl implements RouteRequest {
private String p2pRouteRequestUrl;
@Value("${cim.server.route.request.url}")
private String serverRouteRequestUrl;
private String serverRouteLoginUrl;
@Value("${cim.server.online.user.url}")
private String onlineUserUrl;
... ... @@ -120,7 +120,7 @@ public class RouteRequestImpl implements RouteRequest {
RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
Request request = new Request.Builder()
.url(serverRouteRequestUrl)
.url(serverRouteLoginUrl)
.post(requestBody)
.build();
... ... @@ -178,4 +178,26 @@ public class RouteRequestImpl implements RouteRequest {
return onlineUsersResVO.getDataBody();
}
@Override
public void offLine() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userId", appConfiguration.getUserId());
jsonObject.put("msg", "offLine");
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
Request request = new Request.Builder()
.url(appConfiguration.getClearRouteUrl())
.post(requestBody)
.build();
Response response = null;
try {
response = okHttpClient.newCall(request).execute();
} catch (IOException e) {
LOGGER.error("exception",e);
} finally {
response.body().close();
}
}
}
... ...
package com.crossoverjie.cim.client.thread;
import com.crossoverjie.cim.client.service.impl.ClientHeartBeatHandlerImpl;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 21:35
* @since JDK 1.8
*/
public class ReConnectJob implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(ReConnectJob.class);
private ChannelHandlerContext context ;
private HeartBeatHandler heartBeatHandler ;
public ReConnectJob(ChannelHandlerContext context) {
this.context = context;
this.heartBeatHandler = SpringBeanFactory.getBean(ClientHeartBeatHandlerImpl.class) ;
}
@Override
public void run() {
try {
heartBeatHandler.process(context);
} catch (Exception e) {
LOGGER.error("Exception",e);
}
}
}
... ...
... ... @@ -25,6 +25,8 @@ cim.server.route.request.url=http://45.78.28.220:8083/login
# 在线用户
cim.server.online.user.url=http://45.78.28.220:8083/onlineUser
# 清除路由信息
cim.clear.route.request.url=http://45.78.28.220:8083/offLine
###=======本地模拟======###
## 群发消息
... ... @@ -39,6 +41,9 @@ cim.server.online.user.url=http://45.78.28.220:8083/onlineUser
## 在线用户
#cim.server.online.user.url=http://localhost:8083/onlineUser
# 清除路由信息
#cim.clear.route.request.url=http://localhost:8083/offLine
# 客户端唯一ID
cim.user.id=1545574841528
cim.user.userName=zhangsan
... ... @@ -51,4 +56,10 @@ cim.callback.thread.pool.size = 2
# 关闭健康检查权限
management.security.enabled=false
# SpringAdmin 地址
spring.boot.admin.url=http://127.0.0.1:8888
\ No newline at end of file
spring.boot.admin.url=http://127.0.0.1:8888
# 检测多少秒没有收到服务端端心跳后重新登录获取连接
cim.heartbeat.time = 60
# 客户端连接失败重连次数
cim.reconnect.count =3
\ No newline at end of file
... ...
... ... @@ -38,5 +38,11 @@
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
... ...
... ... @@ -20,7 +20,8 @@ public enum SystemCommandEnumType {
QUERY(":q ","【:q 关键字】查询聊天记录"),
AI(":ai ","开启 AI 模式"),
QAI(":qai ","关闭 AI 模式"),
PREFIX(":pu ","模糊匹配用户")
PREFIX(":pu ","模糊匹配用户"),
INFO(":info ","获取客户端信息")
;
... ...
package com.crossoverjie.cim.common.kit;
import io.netty.channel.ChannelHandlerContext;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 17:15
* @since JDK 1.8
*/
public interface HeartBeatHandler {
/**
* 处理心跳
* @param ctx
* @throws Exception
*/
void process(ChannelHandlerContext ctx) throws Exception ;
}
... ...
package com.crossoverjie.cim.server.util;
package com.crossoverjie.cim.common.util;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
... ...
... ... @@ -174,6 +174,9 @@ public class AccountServiceRedisImpl implements AccountService {
@Override
public void offLine(Long userId) throws Exception {
// TODO: 2019-01-21 改为一个原子命令,以防数据一致性
//删除路由
redisTemplate.delete(ROUTE_PREFIX + userId) ;
... ...
... ... @@ -64,13 +64,6 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
... ...
... ... @@ -51,7 +51,7 @@ public class BeanConfig {
public CIMRequestProto.CIMReqProtocol heartBeat() {
CIMRequestProto.CIMReqProtocol heart = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(0L)
.setReqMsg("ping")
.setReqMsg("pong")
.setType(Constants.CommandType.PING)
.build();
return heart;
... ...
... ... @@ -3,13 +3,18 @@ package com.crossoverjie.cim.server.handle;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import com.crossoverjie.cim.server.config.AppConfiguration;
import com.crossoverjie.cim.server.util.NettyAttrUtil;
import com.crossoverjie.cim.server.kit.ServerHeartBeatHandlerImpl;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import com.crossoverjie.cim.server.util.SpringBeanFactory;
import io.netty.channel.*;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
... ... @@ -55,23 +60,11 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.READER_IDLE) {
AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);
long heartBeatTime = configuration.getHeartBeatTime() * 1000;
//向客户端发送消息
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE);
Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
long now = System.currentTimeMillis();
if (lastReadTime != null && now - lastReadTime > heartBeatTime){
CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
LOGGER.warn("客户端[{}]心跳超时[{}]ms,需要关闭连接!",userInfo.getUserName(),now - lastReadTime);
userOffLine(userInfo, (NioSocketChannel) ctx.channel());
ctx.channel().close();
}
LOGGER.info("定时检测客户端端是否存活");
HeartBeatHandler heartBeatHandler = SpringBeanFactory.getBean(ServerHeartBeatHandlerImpl.class) ;
heartBeatHandler.process(ctx) ;
}
}
super.userEventTriggered(ctx, evt);
... ... @@ -93,7 +86,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
}
/**
* 清除路由关系
* 下线,清除路由关系
*
* @param userInfo
* @throws IOException
... ... @@ -137,6 +130,15 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
//心跳更新时间
if (msg.getType() == Constants.CommandType.PING){
NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
//向客户端响应 pong 消息
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
LOGGER.error("IO error,close Channel");
future.channel().close();
}
}) ;
}
}
... ...
... ... @@ -25,8 +25,8 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//30 秒没有向客户端发送消息就发生心跳
.addLast(new IdleStateHandler(30, 0, 0))
//11 秒没有向客户端发送消息就发生心跳
.addLast(new IdleStateHandler(11, 0, 0))
// google Protobuf 编解码
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(CIMRequestProto.CIMReqProtocol.getDefaultInstance()))
... ...
package com.crossoverjie.cim.server.kit;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.server.config.AppConfiguration;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import com.crossoverjie.cim.server.util.SpringBeanFactory;
import io.netty.channel.socket.nio.NioSocketChannel;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 17:20
* @since JDK 1.8
*/
@Component
public class RouteHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(RouteHandler.class);
private final MediaType mediaType = MediaType.parse("application/json");
/**
* 用户下线
* @param userInfo
* @param channel
* @throws IOException
*/
public void userOffLine(CIMUserInfo userInfo, NioSocketChannel channel) throws IOException {
if (userInfo != null){
LOGGER.info("用户[{}]下线", userInfo.getUserName());
SessionSocketHolder.removeSession(userInfo.getUserId());
//清除路由关系
clearRouteInfo(userInfo);
}
SessionSocketHolder.remove(channel);
}
/**
* 清除路由关系
*
* @param userInfo
* @throws IOException
*/
private void clearRouteInfo(CIMUserInfo userInfo) throws IOException {
OkHttpClient okHttpClient = SpringBeanFactory.getBean(OkHttpClient.class);
AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);
JSONObject jsonObject = new JSONObject();
jsonObject.put("userId", userInfo.getUserId());
jsonObject.put("msg", "offLine");
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
Request request = new Request.Builder()
.url(configuration.getClearRouteUrl())
.post(requestBody)
.build();
Response response = null;
try {
response = okHttpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
} finally {
response.body().close();
}
}
}
... ...
package com.crossoverjie.cim.server.kit;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import com.crossoverjie.cim.server.config.AppConfiguration;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 17:16
* @since JDK 1.8
*/
@Service
public class ServerHeartBeatHandlerImpl implements HeartBeatHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(ServerHeartBeatHandlerImpl.class);
@Autowired
private RouteHandler routeHandler ;
@Autowired
private AppConfiguration appConfiguration ;
@Override
public void process(ChannelHandlerContext ctx) throws Exception {
long heartBeatTime = appConfiguration.getHeartBeatTime() * 1000;
Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
long now = System.currentTimeMillis();
if (lastReadTime != null && now - lastReadTime > heartBeatTime){
CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
if (userInfo != null){
LOGGER.warn("客户端[{}]心跳超时[{}]ms,需要关闭连接!",userInfo.getUserName(),now - lastReadTime);
}
routeHandler.userOffLine(userInfo, (NioSocketChannel) ctx.channel());
ctx.channel().close();
}
}
}
... ...
... ... @@ -34,4 +34,4 @@ app.zk.root=/route
cim.clear.route.request.url=http://localhost:8083/offLine
# 检测多少秒没有收到客户端心跳后服务端关闭连接
cim.heartbeat.time = 40
\ No newline at end of file
cim.heartbeat.time = 30
\ No newline at end of file
... ...
... ... @@ -30,7 +30,6 @@
<module>cim-server</module>
<module>cim-client</module>
<module>cim-common</module>
<module>springboot-admin</module>
<module>cim-zk</module>
<module>cim-forward-route</module>
</modules>
... ...
# 监控 admin
[SpringBoot admin](https://github.com/codecentric/spring-boot-admin)
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.crossoverjie.netty</groupId>
<artifactId>cim</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>springboot-admin</artifactId>
<packaging>jar</packaging>
<name>admin</name>
<description>springBoot admin</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<logback.version>1.2.3</logback.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-server</artifactId>
<version>1.5.7</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-server-ui</artifactId>
<version>1.5.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.ai.obc.springboot.admin;
import de.codecentric.boot.admin.config.EnableAdminServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
@EnableAutoConfiguration
@EnableAdminServer
public class AdminApplication {
public static void main(String[] args) {
SpringApplication.run(AdminApplication.class, args);
}
}
spring.application.name=spring-boot-admin
server.cimServerPort = 8888
logging.level.root=info
\ No newline at end of file
package com.ai.obc.springboot.admin;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class AdminApplicationTests {
@Test
public void contextLoads() {
}
}