作者 XellossRyan
提交者 GitHub

Merge pull request #1 from crossoverJie/master

merge
正在显示 51 个修改的文件 包含 1233 行增加247 行删除
... ... @@ -49,8 +49,9 @@
* [x] 根据实际情况灵活的水平扩容、缩容。
* [x] 路由(`cim-forward-route`)服务自身是无状态,可用 `Nginx` 代理支持高可用。
* [x] 服务端自动剔除离线客户端。
* [ ] 弱网环境下客户端自动重连。
* [x] 客户端自动重连。
* [ ] 分组群聊。
* [ ] Android SDK。
* [ ] 离线消息。
* [ ] 协议支持消息加密。
* [ ] 更多的客户端路由策略。
... ... @@ -179,6 +180,7 @@ java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=上方
| `:ai` | 开启 AI 模式 |
| `:qai` | 关闭 AI 模式 |
| `:pu` | 模糊匹配用户 |
| `:info` | 获取客户端信息 |
| `:` | 更多命令正在开发中。。 |
![](https://ws3.sinaimg.cn/large/006tNbRwly1fylh7bdlo6g30go01shdt.gif)
... ... @@ -205,7 +207,7 @@ java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=上方
![](https://ws4.sinaimg.cn/large/006tNc79gy1fz3vo4tgkjj31ni09s41u.jpg)
使用命令 `:qu prefix` 可以按照前缀的方式重新用户信息。
使用命令 `:qu prefix` 可以按照前缀的方式搜索用户信息。
> 该功能主要用于在移动端中的输入框中搜索用户。
... ... @@ -248,6 +250,12 @@ java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=上方
![](https://ws1.sinaimg.cn/large/006tKfTcly1ftmfdo6mhmj30760760t7.jpg)
# Code Visualization
[![Watch the video](https://img.youtube.com/vi/NhV_brPIG74/maxresdefault.jpg)](https://youtu.be/NhV_brPIG74)
### Code Visualization:
Here is a cool visualization of the code evolution
[![Watch the video](https://img.youtube.com/vi/NhV_brPIG74/0.jpg)](https://www.youtube.com/watch?v=NhV_brPIG74)
[https://www.youtube.com/watch?v=NhV_brPIG74](https://www.youtube.com/watch?v=NhV_brPIG74)
... ...
... ... @@ -73,12 +73,6 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
... ...
package com.crossoverjie.cim.client;
import com.crossoverjie.cim.client.scanner.Scan;
import com.crossoverjie.cim.client.service.impl.ClientInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
... ... @@ -15,6 +17,8 @@ public class CIMClientApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);
@Autowired
private ClientInfo clientInfo ;
public static void main(String[] args) {
SpringApplication.run(CIMClientApplication.class, args);
LOGGER.info("启动 Client 服务成功");
... ... @@ -26,5 +30,6 @@ public class CIMClientApplication implements CommandLineRunner{
Thread thread = new Thread(scan);
thread.setName("scan-thread");
thread.start();
clientInfo.saveStartDate();
}
}
\ No newline at end of file
... ...
package com.crossoverjie.cim.client.client;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.init.CIMClientHandleInitializer;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.service.impl.ClientInfo;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
... ... @@ -16,6 +19,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -36,7 +40,7 @@ public class CIMClient {
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClient.class);
private EventLoopGroup group = new NioEventLoopGroup();
private EventLoopGroup group = new NioEventLoopGroup(0, new DefaultThreadFactory("cim-work"));
@Value("${cim.user.id}")
private long userId;
... ... @@ -49,6 +53,20 @@ public class CIMClient {
@Autowired
private RouteRequest routeRequest;
@Autowired
private AppConfiguration configuration;
@Autowired
private MsgHandle msgHandle;
@Autowired
private ClientInfo clientInfo;
/**
* 重试次数
*/
private int errorCount;
@PostConstruct
public void start() throws Exception {
... ... @@ -70,14 +88,25 @@ public class CIMClient {
* @param cimServer
* @throws InterruptedException
*/
private void startClient(CIMServerResVO.ServerInfo cimServer) throws InterruptedException {
private void startClient(CIMServerResVO.ServerInfo cimServer) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CIMClientHandleInitializer())
;
ChannelFuture future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
ChannelFuture future = null;
try {
future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
} catch (InterruptedException e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("链接失败次数达到上限[{}]次", errorCount);
msgHandle.shutdown();
}
LOGGER.error("连接失败", e);
}
if (future.isSuccess()) {
LOGGER.info("启动 cim client 成功");
}
... ... @@ -90,10 +119,26 @@ public class CIMClient {
* @return 路由服务器信息
* @throws Exception
*/
private CIMServerResVO.ServerInfo userLogin() throws Exception {
private CIMServerResVO.ServerInfo userLogin() {
LoginReqVO loginReqVO = new LoginReqVO(userId, userName);
CIMServerResVO.ServerInfo cimServer = routeRequest.getCIMServer(loginReqVO);
CIMServerResVO.ServerInfo cimServer = null;
try {
cimServer = routeRequest.getCIMServer(loginReqVO);
//保存系统信息
clientInfo.saveServiceInfo(cimServer.getIp() + ":" + cimServer.getCimServerPort())
.saveUserInfo(userId, userName);
LOGGER.info("cimServer=[{}]", cimServer.toString());
} catch (Exception e) {
errorCount++;
if (errorCount >= configuration.getErrorCount()) {
LOGGER.error("重连次数达到上限[{}]次", errorCount);
msgHandle.shutdown();
}
LOGGER.error("登录失败", e);
}
return cimServer;
}
... ... @@ -145,11 +190,27 @@ public class CIMClient {
}
public void reconnect() throws Exception {
if (channel != null && channel.isActive()) {
return;
}
//首先清除路由信息,下线
routeRequest.offLine();
LOGGER.info("开始重连。。");
start();
LOGGER.info("重连成功!!");
}
/**
* 关闭
*
* @throws InterruptedException
*/
public void close() throws InterruptedException {
channel.close() ;
if (channel != null){
channel.close();
}
}
}
... ...
... ... @@ -22,6 +22,15 @@ public class AppConfiguration {
@Value("${cim.msg.logger.path}")
private String msgLoggerPath ;
@Value("${cim.clear.route.request.url}")
private String clearRouteUrl ;
@Value("${cim.heartbeat.time}")
private long heartBeatTime ;
@Value("${cim.reconnect.count}")
private int errorCount ;
public Long getUserId() {
return userId;
}
... ... @@ -45,4 +54,30 @@ public class AppConfiguration {
public void setMsgLoggerPath(String msgLoggerPath) {
this.msgLoggerPath = msgLoggerPath;
}
public long getHeartBeatTime() {
return heartBeatTime;
}
public void setHeartBeatTime(long heartBeatTime) {
this.heartBeatTime = heartBeatTime;
}
public String getClearRouteUrl() {
return clearRouteUrl;
}
public void setClearRouteUrl(String clearRouteUrl) {
this.clearRouteUrl = clearRouteUrl;
}
public int getErrorCount() {
return errorCount;
}
public void setErrorCount(int errorCount) {
this.errorCount = errorCount;
}
}
... ...
... ... @@ -82,6 +82,17 @@ public class BeanConfig {
return productExecutor ;
}
@Bean("scheduledTask")
public ScheduledExecutorService buildSchedule(){
ThreadFactory sche = new ThreadFactoryBuilder()
.setNameFormat("scheduled-%d")
.setDaemon(true)
.build();
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,sche) ;
return scheduledExecutorService ;
}
/**
* 回调 bean
* @return
... ...
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.thread.ReConnectJob;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.common.protocol.CIMResponseProto;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
... ... @@ -13,7 +15,9 @@ import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Function:
... ... @@ -31,6 +35,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
private ThreadPoolExecutor threadPoolExecutor ;
private ScheduledExecutorService scheduledExecutorService ;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
... ... @@ -38,12 +44,18 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
//LOGGER.info("定时检测服务端是否存活");
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE) ;
ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
LOGGER.error("IO error,close Channel");
future.channel().close();
}
}) ;
}
}
... ... @@ -58,10 +70,24 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResponseProto.CIMResProtocol msg) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("客户端断开了,重新连接!");
if (scheduledExecutorService == null){
scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
}
// TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。
scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception {
//从服务端收到消息时被调用
//LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
//心跳更新时间
if (msg.getType() == Constants.CommandType.PING){
//LOGGER.info("收到服务端心跳!!!");
NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
}
if (msg.getType() != Constants.CommandType.PING) {
//回调消息
... ... @@ -70,6 +96,10 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
LOGGER.info(msg.getResMsg());
}
}
/**
... ...
... ... @@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//30 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 30, 0))
//10 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 10, 0))
//心跳解码
//.addLast(new HeartbeatEncode())
... ...
package com.crossoverjie.cim.client.service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:26
* @since JDK 1.8
*/
public interface InnerCommand {
/**
* 执行
* @param msg
*/
void process(String msg) ;
}
... ...
package com.crossoverjie.cim.client.service;
import com.crossoverjie.cim.client.service.impl.command.PrintAllCommand;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.enums.SystemCommandEnum;
import com.crossoverjie.cim.common.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:39
* @since JDK 1.8
*/
@Component
public class InnerCommandContext {
private final static Logger LOGGER = LoggerFactory.getLogger(InnerCommandContext.class);
/**
* 获取执行器实例
* @param command 执行器实例
* @return
*/
public InnerCommand getInstance(String command) {
Map<String, String> allClazz = SystemCommandEnum.getAllClazz();
//兼容需要命令后接参数的数据 :q cross
String[] trim = command.trim().split(" ");
String clazz = allClazz.get(trim[0]);
InnerCommand innerCommand = null;
try {
if (StringUtil.isEmpty(clazz)){
clazz = PrintAllCommand.class.getName() ;
}
innerCommand = (InnerCommand) SpringBeanFactory.getBean(Class.forName(clazz));
} catch (Exception e) {
LOGGER.error("Exception", e);
}
return innerCommand;
}
}
... ...
... ... @@ -48,4 +48,20 @@ public interface MsgHandle {
* @return 是否应当跳过当前消息(包含了":" 就需要跳过)
*/
boolean innerCommand(String msg) ;
/**
* 关闭系统
*/
void shutdown() ;
/**
* 开启 AI 模式
*/
void openAIModel() ;
/**
* 关闭 AI 模式
*/
void closeAIModel() ;
}
... ...
... ... @@ -41,10 +41,13 @@ public interface RouteRequest {
CIMServerResVO.ServerInfo getCIMServer(LoginReqVO loginReqVO) throws Exception;
/**
*
* @return 获取所有在线用户
* 获取所有在线用户
* @return
* @throws Exception
*/
List<OnlineUsersResVO.DataBodyBean> onlineUsers()throws Exception ;
void offLine() ;
}
... ...
package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 17:16
* @since JDK 1.8
*/
@Service
public class ClientHeartBeatHandlerImpl implements HeartBeatHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(ClientHeartBeatHandlerImpl.class);
@Autowired
private CIMClient cimClient;
@Override
public void process(ChannelHandlerContext ctx) throws Exception {
//重连
cimClient.reconnect();
}
}
... ...
package com.crossoverjie.cim.client.service.impl;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-21 23:35
* @since JDK 1.8
*/
@Component
public class ClientInfo {
private Info info = new Info() ;
public Info get(){
return info ;
}
public ClientInfo saveUserInfo(long userId,String userName){
info.setUserId(userId);
info.setUserName(userName);
return this;
}
public ClientInfo saveServiceInfo(String serviceInfo){
info.setServiceInfo(serviceInfo);
return this;
}
public ClientInfo saveStartDate(){
info.setStartDate(new Date());
return this;
}
private class Info{
private String userName;
private long userId ;
private String serviceInfo ;
private Date startDate ;
public Info() {
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
public String getServiceInfo() {
return serviceInfo;
}
public void setServiceInfo(String serviceInfo) {
this.serviceInfo = serviceInfo;
}
public Date getStartDate() {
return startDate;
}
public void setStartDate(Date startDate) {
this.startDate = startDate;
}
}
}
... ...
... ... @@ -2,20 +2,19 @@ package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.service.*;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import com.crossoverjie.cim.common.data.construct.TrieTree;
import com.crossoverjie.cim.common.enums.SystemCommandEnumType;
import com.crossoverjie.cim.common.enums.SystemCommandEnum;
import com.crossoverjie.cim.common.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
... ... @@ -32,33 +31,40 @@ import java.util.concurrent.TimeUnit;
public class MsgHandler implements MsgHandle {
private final static Logger LOGGER = LoggerFactory.getLogger(MsgHandler.class);
@Autowired
private RouteRequest routeRequest ;
private RouteRequest routeRequest;
@Autowired
private AppConfiguration configuration;
@Resource(name = "callBackThreadPool")
private ThreadPoolExecutor executor;
@Autowired
private CIMClient cimClient;
@Autowired
private ThreadPoolExecutor executor ;
private MsgLogger msgLogger;
@Autowired
private CIMClient cimClient ;
private ClientInfo clientInfo;
@Autowired
private MsgLogger msgLogger ;
private InnerCommandContext innerCommandContext ;
private boolean aiModel = false ;
private boolean aiModel = false;
@Override
public void sendMsg(String msg) {
if (aiModel){
if (aiModel) {
aiChat(msg);
}else {
} else {
normalChat(msg);
}
}
/**
* 正常聊天
*
* @param msg
*/
private void normalChat(String msg) {
... ... @@ -72,7 +78,7 @@ public class MsgHandler implements MsgHandle {
try {
p2pChat(p2PReqVO);
} catch (Exception e) {
LOGGER.error("Exception",e);
LOGGER.error("Exception", e);
}
} else {
... ... @@ -81,21 +87,22 @@ public class MsgHandler implements MsgHandle {
try {
groupChat(groupReqVO);
} catch (Exception e) {
LOGGER.error("Exception",e);
LOGGER.error("Exception", e);
}
}
}
/**
* AI model
*
* @param msg
*/
private void aiChat(String msg) {
msg = msg.replace("吗","") ;
msg = msg.replace("嘛","") ;
msg = msg.replace("?","!");
msg = msg.replace("?","!");
msg = msg.replace("你","我");
msg = msg.replace("吗", "");
msg = msg.replace("嘛", "");
msg = msg.replace("?", "!");
msg = msg.replace("?", "!");
msg = msg.replace("你", "我");
System.out.println("AI:\033[31;4m" + msg + "\033[0m");
}
... ... @@ -113,7 +120,7 @@ public class MsgHandler implements MsgHandle {
@Override
public boolean checkMsg(String msg) {
if (StringUtil.isEmpty(msg)){
if (StringUtil.isEmpty(msg)) {
LOGGER.warn("不能发送空消息!");
return true;
}
... ... @@ -123,41 +130,15 @@ public class MsgHandler implements MsgHandle {
@Override
public boolean innerCommand(String msg) {
if (msg.startsWith(":")){
Map<String, String> allStatusCode = SystemCommandEnumType.getAllStatusCode();
if (SystemCommandEnumType.QUIT.getCommandType().trim().equals(msg)){
//关闭系统
shutdown();
} else if (SystemCommandEnumType.ALL.getCommandType().trim().equals(msg)){
printAllCommand(allStatusCode);
} else if (SystemCommandEnumType.ONLINE_USER.getCommandType().toLowerCase().trim().equals(msg.toLowerCase())){
//打印在线用户
printOnlineUsers();
} else if (msg.startsWith(SystemCommandEnumType.QUERY.getCommandType().trim() + " ")){
//查询聊天记录
queryChatHistory(msg);
}else if (SystemCommandEnumType.AI.getCommandType().trim().equals(msg.toLowerCase())){
//开启 AI 模式
aiModel = true ;
System.out.println("\033[31;4m" + "Hello,我是估值两亿的 AI 机器人!" + "\033[0m");
}else if (SystemCommandEnumType.QAI.getCommandType().trim().equals(msg.toLowerCase())){
//关闭 AI 模式
aiModel = false ;
System.out.println("\033[31;4m" + "。゚(゚´ω`゚)゚。 AI 下线了!" + "\033[0m");
}else if (msg.startsWith(SystemCommandEnumType.PREFIX.getCommandType().trim() + " ")){
//模糊匹配
prefixSearch(msg);
}else {
printAllCommand(allStatusCode);
}
if (msg.startsWith(":")) {
InnerCommand instance = innerCommandContext.getInstance(msg);
instance.process(msg) ;
return true ;
return true;
}else {
return false ;
} else {
return false;
}
... ... @@ -166,12 +147,13 @@ public class MsgHandler implements MsgHandle {
/**
* 模糊匹配
*
* @param msg
*/
private void prefixSearch(String msg) {
try {
List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();
TrieTree trieTree = new TrieTree() ;
TrieTree trieTree = new TrieTree();
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
trieTree.insert(onlineUser.getUserName());
}
... ... @@ -186,16 +168,17 @@ public class MsgHandler implements MsgHandle {
}
} catch (Exception e) {
LOGGER.error("Exception" ,e);
LOGGER.error("Exception", e);
}
}
/**
* 查询聊天记录
*
* @param msg
*/
private void queryChatHistory(String msg) {
String[] split = msg.split(" ") ;
String[] split = msg.split(" ");
String res = msgLogger.query(split[1]);
System.out.println(res);
}
... ... @@ -209,20 +192,22 @@ public class MsgHandler implements MsgHandle {
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
LOGGER.info("userId={}=====userName={}",onlineUser.getUserId(),onlineUser.getUserName());
LOGGER.info("userId={}=====userName={}", onlineUser.getUserId(), onlineUser.getUserName());
}
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
} catch (Exception e) {
LOGGER.error("Exception" ,e);
LOGGER.error("Exception", e);
}
}
/**
* 关闭系统
*/
private void shutdown() {
@Override
public void shutdown() {
LOGGER.info("系统关闭中。。。。");
routeRequest.offLine();
msgLogger.stop();
executor.shutdown();
try {
... ... @@ -231,11 +216,21 @@ public class MsgHandler implements MsgHandle {
}
cimClient.close();
} catch (InterruptedException e) {
LOGGER.error("InterruptedException",e);
LOGGER.error("InterruptedException", e);
}
System.exit(0);
}
@Override
public void openAIModel() {
aiModel = true;
}
@Override
public void closeAIModel() {
aiModel = false ;
}
private void printAllCommand(Map<String, String> allStatusCode) {
LOGGER.warn("====================================");
for (Map.Entry<String, String> stringStringEntry : allStatusCode.entrySet()) {
... ...
... ... @@ -45,7 +45,7 @@ public class RouteRequestImpl implements RouteRequest {
private String p2pRouteRequestUrl;
@Value("${cim.server.route.request.url}")
private String serverRouteRequestUrl;
private String serverRouteLoginUrl;
@Value("${cim.server.online.user.url}")
private String onlineUserUrl;
... ... @@ -120,7 +120,7 @@ public class RouteRequestImpl implements RouteRequest {
RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
Request request = new Request.Builder()
.url(serverRouteRequestUrl)
.url(serverRouteLoginUrl)
.post(requestBody)
.build();
... ... @@ -178,4 +178,26 @@ public class RouteRequestImpl implements RouteRequest {
return onlineUsersResVO.getDataBody();
}
@Override
public void offLine() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userId", appConfiguration.getUserId());
jsonObject.put("msg", "offLine");
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
Request request = new Request.Builder()
.url(appConfiguration.getClearRouteUrl())
.post(requestBody)
.build();
Response response = null;
try {
response = okHttpClient.newCall(request).execute();
} catch (IOException e) {
LOGGER.error("exception",e);
} finally {
response.body().close();
}
}
}
... ...
package com.crossoverjie.cim.client.service.impl.command;
import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.client.service.MsgHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:37
* @since JDK 1.8
*/
@Service
public class CloseAIModelCommand implements InnerCommand {
private final static Logger LOGGER = LoggerFactory.getLogger(CloseAIModelCommand.class);
@Autowired
private MsgHandle msgHandle ;
@Override
public void process(String msg) {
msgHandle.closeAIModel();
System.out.println("\033[31;4m" + "。゚(゚´ω`゚)゚。 AI 下线了!" + "\033[0m");
}
}
... ...
package com.crossoverjie.cim.client.service.impl.command;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.client.service.impl.ClientInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:37
* @since JDK 1.8
*/
@Service
public class EchoInfoCommand implements InnerCommand {
private final static Logger LOGGER = LoggerFactory.getLogger(EchoInfoCommand.class);
@Autowired
private ClientInfo clientInfo;
@Override
public void process(String msg) {
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
LOGGER.info("client info=[{}]", JSON.toJSONString(clientInfo.get()));
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
}
... ...
package com.crossoverjie.cim.client.service.impl.command;
import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.client.service.MsgHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:37
* @since JDK 1.8
*/
@Service
public class OpenAIModelCommand implements InnerCommand {
private final static Logger LOGGER = LoggerFactory.getLogger(OpenAIModelCommand.class);
@Autowired
private MsgHandle msgHandle ;
@Override
public void process(String msg) {
msgHandle.openAIModel();
System.out.println("\033[31;4m" + "Hello,我是估值两亿的 AI 机器人!" + "\033[0m");
}
}
... ...
package com.crossoverjie.cim.client.service.impl.command;
import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import com.crossoverjie.cim.common.data.construct.TrieTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:37
* @since JDK 1.8
*/
@Service
public class PrefixSearchCommand implements InnerCommand {
private final static Logger LOGGER = LoggerFactory.getLogger(PrefixSearchCommand.class);
@Autowired
private RouteRequest routeRequest ;
@Override
public void process(String msg) {
try {
List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();
TrieTree trieTree = new TrieTree();
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
trieTree.insert(onlineUser.getUserName());
}
String[] split = msg.split(" ");
String key = split[1];
List<String> list = trieTree.prefixSearch(key);
for (String res : list) {
res = res.replace(key, "\033[31;4m" + key + "\033[0m");
System.out.println(res);
}
} catch (Exception e) {
LOGGER.error("Exception", e);
}
}
}
... ...
package com.crossoverjie.cim.client.service.impl.command;
import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.common.enums.SystemCommandEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:37
* @since JDK 1.8
*/
@Service
public class PrintAllCommand implements InnerCommand {
private final static Logger LOGGER = LoggerFactory.getLogger(PrintAllCommand.class);
@Override
public void process(String msg) {
Map<String, String> allStatusCode = SystemCommandEnum.getAllStatusCode();
LOGGER.warn("====================================");
for (Map.Entry<String, String> stringStringEntry : allStatusCode.entrySet()) {
String key = stringStringEntry.getKey();
String value = stringStringEntry.getValue();
LOGGER.warn(key + "----->" + value);
}
LOGGER.warn("====================================");
}
}
... ...
package com.crossoverjie.cim.client.service.impl.command;
import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:37
* @since JDK 1.8
*/
@Service
public class PrintOnlineUsersCommand implements InnerCommand {
private final static Logger LOGGER = LoggerFactory.getLogger(PrintOnlineUsersCommand.class);
@Autowired
private RouteRequest routeRequest ;
@Override
public void process(String msg) {
try {
List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
LOGGER.info("userId={}=====userName={}", onlineUser.getUserId(), onlineUser.getUserName());
}
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
} catch (Exception e) {
LOGGER.error("Exception", e);
}
}
}
... ...
package com.crossoverjie.cim.client.service.impl.command;
import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.client.service.MsgLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:37
* @since JDK 1.8
*/
@Service
public class QueryHistoryCommand implements InnerCommand {
private final static Logger LOGGER = LoggerFactory.getLogger(QueryHistoryCommand.class);
@Autowired
private MsgLogger msgLogger ;
@Override
public void process(String msg) {
String[] split = msg.split(" ");
String res = msgLogger.query(split[1]);
System.out.println(res);
}
}
... ...
package com.crossoverjie.cim.client.service.impl.command;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.client.service.RouteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-27 19:28
* @since JDK 1.8
*/
@Service
public class ShutDownCommand implements InnerCommand {
private final static Logger LOGGER = LoggerFactory.getLogger(ShutDownCommand.class);
@Autowired
private RouteRequest routeRequest ;
@Autowired
private CIMClient cimClient;
@Autowired
private MsgLogger msgLogger;
@Resource(name = "callBackThreadPool")
private ThreadPoolExecutor executor;
@Override
public void process(String msg) {
LOGGER.info("系统关闭中。。。。");
routeRequest.offLine();
msgLogger.stop();
executor.shutdown();
try {
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.info("线程池关闭中。。。。");
}
cimClient.close();
} catch (InterruptedException e) {
LOGGER.error("InterruptedException", e);
}
System.exit(0);
}
}
... ...
package com.crossoverjie.cim.client.thread;
import com.crossoverjie.cim.client.service.impl.ClientHeartBeatHandlerImpl;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 21:35
* @since JDK 1.8
*/
public class ReConnectJob implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(ReConnectJob.class);
private ChannelHandlerContext context ;
private HeartBeatHandler heartBeatHandler ;
public ReConnectJob(ChannelHandlerContext context) {
this.context = context;
this.heartBeatHandler = SpringBeanFactory.getBean(ClientHeartBeatHandlerImpl.class) ;
}
@Override
public void run() {
try {
heartBeatHandler.process(context);
} catch (Exception e) {
LOGGER.error("Exception",e);
}
}
}
... ...
... ... @@ -25,6 +25,8 @@ cim.server.route.request.url=http://45.78.28.220:8083/login
# 在线用户
cim.server.online.user.url=http://45.78.28.220:8083/onlineUser
# 清除路由信息
cim.clear.route.request.url=http://45.78.28.220:8083/offLine
###=======本地模拟======###
## 群发消息
... ... @@ -39,6 +41,9 @@ cim.server.online.user.url=http://45.78.28.220:8083/onlineUser
## 在线用户
#cim.server.online.user.url=http://localhost:8083/onlineUser
# 清除路由信息
#cim.clear.route.request.url=http://localhost:8083/offLine
# 客户端唯一ID
cim.user.id=1545574841528
cim.user.userName=zhangsan
... ... @@ -52,3 +57,9 @@ cim.callback.thread.pool.size = 2
management.security.enabled=false
# SpringAdmin 地址
spring.boot.admin.url=http://127.0.0.1:8888
# 检测多少秒没有收到服务端端心跳后重新登录获取连接
cim.heartbeat.time = 60
# 客户端连接失败重连次数
cim.reconnect.count =3
\ No newline at end of file
... ...
package com.crossoverjie.cim.client.service;
import com.crossoverjie.cim.client.CIMClientApplication;
import com.crossoverjie.cim.common.enums.SystemCommandEnum;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = CIMClientApplication.class)
@RunWith(SpringRunner.class)
public class InnerCommandContextTest {
@Autowired
private InnerCommandContext context;
@Test
public void execute() {
String msg = ":all";
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
@Test
public void execute3() {
String msg = SystemCommandEnum.ONLINE_USER.getCommandType();
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
@Test
public void execute4() {
String msg = ":q 天气";
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
@Test
public void execute5() {
String msg = ":q crossoverJie";
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
@Test
public void execute6() {
String msg = SystemCommandEnum.AI.getCommandType();
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
@Test
public void execute7() {
String msg = SystemCommandEnum.QAI.getCommandType();
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
@Test
public void execute8() {
String msg = ":pu cross";
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
@Test
public void execute9() {
String msg = SystemCommandEnum.INFO.getCommandType();
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
@Test
public void execute10() {
String msg = "dsds";
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
// @Test
public void quit() {
String msg = ":q!";
InnerCommand execute = context.getInstance(msg);
execute.process(msg) ;
}
}
\ No newline at end of file
... ...
... ... @@ -38,5 +38,11 @@
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
... ...
... ... @@ -18,6 +18,10 @@ public class TrieTree {
* 大小写都可保存
*/
private static final int CHILDREN_LENGTH = 26 * 2;
/**
* 存放的最大字符串长度
*/
private static final int MAX_CHAR_LENGTH = 16;
private static final char UPPERCASE_STAR = 'A';
... ... @@ -209,7 +213,7 @@ public class TrieTree {
public boolean isEnd = false;
/**
* 如果支持查询,则不需要存储数据
* 如果只是查询,则不需要存储数据
*/
public char data;
... ...
package com.crossoverjie.cim.common.enums;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
... ... @@ -12,15 +10,16 @@ import java.util.Map;
* Date: 2018/12/26 18:38
* @since JDK 1.8
*/
public enum SystemCommandEnumType {
public enum SystemCommandEnum {
ALL(":all ","获取所有命令"),
ONLINE_USER(":olu ","获取所有在线用户"),
QUIT(":q! ","退出程序"),
QUERY(":q ","【:q 关键字】查询聊天记录"),
AI(":ai ","开启 AI 模式"),
QAI(":qai ","关闭 AI 模式"),
PREFIX(":pu ","模糊匹配用户")
ALL(":all ","获取所有命令","PrintAllCommand"),
ONLINE_USER(":olu ","获取所有在线用户","PrintOnlineUsersCommand"),
QUIT(":q! ","退出程序","ShutDownCommand"),
QUERY(":q ","【:q 关键字】查询聊天记录","QueryHistoryCommand"),
AI(":ai ","开启 AI 模式","OpenAIModelCommand"),
QAI(":qai ","关闭 AI 模式","CloseAIModelCommand"),
PREFIX(":pu ","模糊匹配用户","PrefixSearchCommand"),
INFO(":info ","获取客户端信息","EchoInfoCommand")
;
... ... @@ -30,15 +29,21 @@ public enum SystemCommandEnumType {
/** 枚举描述 */
private final String desc;
/**
* 实现类
*/
private final String clazz ;
/**
* 构建一个 。
* @param commandType 枚举值码。
* @param desc 枚举描述。
*/
private SystemCommandEnumType(String commandType, String desc) {
private SystemCommandEnum(String commandType, String desc, String clazz) {
this.commandType = commandType;
this.desc = desc;
this.clazz = clazz ;
}
/**
... ... @@ -48,6 +53,13 @@ public enum SystemCommandEnumType {
public String getCommandType() {
return commandType;
}
/**
* 获取 class。
* @return class。
*/
public String getClazz() {
return clazz;
}
/**
* 得到枚举描述。
... ... @@ -79,13 +91,21 @@ public enum SystemCommandEnumType {
* @return 全部枚举值码。
*/
public static Map<String,String> getAllStatusCode() {
List<String> list = new ArrayList<String>();
Map<String,String> map = new HashMap<String, String>(16) ;
for (SystemCommandEnumType status : values()) {
list.add(status.code());
for (SystemCommandEnum status : values()) {
map.put(status.getCommandType(),status.getDesc()) ;
}
return map;
}
public static Map<String,String> getAllClazz() {
Map<String,String> map = new HashMap<String, String>(16) ;
for (SystemCommandEnum status : values()) {
map.put(status.getCommandType().trim(),"com.crossoverjie.cim.client.service.impl.command." + status.getClazz()) ;
}
return map;
}
}
\ No newline at end of file
... ...
package com.crossoverjie.cim.common.kit;
import io.netty.channel.ChannelHandlerContext;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 17:15
* @since JDK 1.8
*/
public interface HeartBeatHandler {
/**
* 处理心跳
* @param ctx
* @throws Exception
*/
void process(ChannelHandlerContext ctx) throws Exception ;
}
... ...
package com.crossoverjie.cim.server.util;
package com.crossoverjie.cim.common.util;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
... ...
... ... @@ -46,6 +46,102 @@ public class TrieTreeTest {
}
@Test
public void prefixSea() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("jav");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("java,javascript,"));
}
@Test
public void prefixSea2() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("j");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("java,javascript,jsf,jsp,"));
}
@Test
public void prefixSea3() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("js");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("jsf,jsp,"));
}
@Test
public void prefixSea4() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("jav");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("java,javascript,"));
}
@Test
public void prefixSea5() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("js");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("jsf,jsp,"));
}
@Test
public void prefixSearch() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("abc");
... ... @@ -112,6 +208,19 @@ public class TrieTreeTest {
}
Assert.assertTrue(result.equals("Cde,"));
}
@Test
public void prefixSearch44() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("a");
trieTree.insert("b");
trieTree.insert("c");
trieTree.insert("d");
trieTree.insert("e");
trieTree.insert("f");
trieTree.insert("g");
trieTree.insert("h");
}
@Test
public void prefixSearch5() throws Exception {
... ...
... ... @@ -9,7 +9,7 @@ public class SystemCommandEnumTypeTest {
@Test
public void getAllStatusCode() throws Exception {
Map<String, String> allStatusCode = SystemCommandEnumType.getAllStatusCode();
Map<String, String> allStatusCode = SystemCommandEnum.getAllStatusCode();
for (Map.Entry<String, String> stringStringEntry : allStatusCode.entrySet()) {
String key = stringStringEntry.getKey();
String value = stringStringEntry.getValue();
... ...
... ... @@ -81,7 +81,6 @@ public class RouteController {
return res;
}
// TODO: 2018/12/26 这些基于 HTTP 接口的远程通信都可以换为 SpringCloud
/**
* 私聊路由
... ...
... ... @@ -174,6 +174,9 @@ public class AccountServiceRedisImpl implements AccountService {
@Override
public void offLine(Long userId) throws Exception {
// TODO: 2019-01-21 改为一个原子命令,以防数据一致性
//删除路由
redisTemplate.delete(ROUTE_PREFIX + userId) ;
... ...
... ... @@ -64,13 +64,6 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
... ...
... ... @@ -51,7 +51,7 @@ public class BeanConfig {
public CIMRequestProto.CIMReqProtocol heartBeat() {
CIMRequestProto.CIMReqProtocol heart = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(0L)
.setReqMsg("ping")
.setReqMsg("pong")
.setType(Constants.CommandType.PING)
.build();
return heart;
... ...
... ... @@ -3,13 +3,18 @@ package com.crossoverjie.cim.server.handle;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import com.crossoverjie.cim.server.config.AppConfiguration;
import com.crossoverjie.cim.server.util.NettyAttrUtil;
import com.crossoverjie.cim.server.kit.ServerHeartBeatHandlerImpl;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import com.crossoverjie.cim.server.util.SpringBeanFactory;
import io.netty.channel.*;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
... ... @@ -55,23 +60,11 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.READER_IDLE) {
AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);
long heartBeatTime = configuration.getHeartBeatTime() * 1000;
//向客户端发送消息
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE);
LOGGER.info("定时检测客户端端是否存活");
Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
long now = System.currentTimeMillis();
if (lastReadTime != null && now - lastReadTime > heartBeatTime){
CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
LOGGER.warn("客户端[{}]心跳超时[{}]ms,需要关闭连接!",userInfo.getUserName(),now - lastReadTime);
userOffLine(userInfo, (NioSocketChannel) ctx.channel());
ctx.channel().close();
}
HeartBeatHandler heartBeatHandler = SpringBeanFactory.getBean(ServerHeartBeatHandlerImpl.class) ;
heartBeatHandler.process(ctx) ;
}
}
super.userEventTriggered(ctx, evt);
... ... @@ -93,7 +86,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
}
/**
* 清除路由关系
* 下线,清除路由关系
*
* @param userInfo
* @throws IOException
... ... @@ -137,6 +130,15 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
//心跳更新时间
if (msg.getType() == Constants.CommandType.PING){
NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
//向客户端响应 pong 消息
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
LOGGER.error("IO error,close Channel");
future.channel().close();
}
}) ;
}
}
... ...
... ... @@ -25,8 +25,8 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//30 秒没有向客户端发送消息就发生心跳
.addLast(new IdleStateHandler(30, 0, 0))
//11 秒没有向客户端发送消息就发生心跳
.addLast(new IdleStateHandler(11, 0, 0))
// google Protobuf 编解码
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(CIMRequestProto.CIMReqProtocol.getDefaultInstance()))
... ...
package com.crossoverjie.cim.server.kit;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.server.config.AppConfiguration;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import com.crossoverjie.cim.server.util.SpringBeanFactory;
import io.netty.channel.socket.nio.NioSocketChannel;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 17:20
* @since JDK 1.8
*/
@Component
public class RouteHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(RouteHandler.class);
private final MediaType mediaType = MediaType.parse("application/json");
/**
* 用户下线
* @param userInfo
* @param channel
* @throws IOException
*/
public void userOffLine(CIMUserInfo userInfo, NioSocketChannel channel) throws IOException {
if (userInfo != null){
LOGGER.info("用户[{}]下线", userInfo.getUserName());
SessionSocketHolder.removeSession(userInfo.getUserId());
//清除路由关系
clearRouteInfo(userInfo);
}
SessionSocketHolder.remove(channel);
}
/**
* 清除路由关系
*
* @param userInfo
* @throws IOException
*/
private void clearRouteInfo(CIMUserInfo userInfo) throws IOException {
OkHttpClient okHttpClient = SpringBeanFactory.getBean(OkHttpClient.class);
AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);
JSONObject jsonObject = new JSONObject();
jsonObject.put("userId", userInfo.getUserId());
jsonObject.put("msg", "offLine");
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
Request request = new Request.Builder()
.url(configuration.getClearRouteUrl())
.post(requestBody)
.build();
Response response = null;
try {
response = okHttpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
} finally {
response.body().close();
}
}
}
... ...
package com.crossoverjie.cim.server.kit;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import com.crossoverjie.cim.server.config.AppConfiguration;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-01-20 17:16
* @since JDK 1.8
*/
@Service
public class ServerHeartBeatHandlerImpl implements HeartBeatHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(ServerHeartBeatHandlerImpl.class);
@Autowired
private RouteHandler routeHandler ;
@Autowired
private AppConfiguration appConfiguration ;
@Override
public void process(ChannelHandlerContext ctx) throws Exception {
long heartBeatTime = appConfiguration.getHeartBeatTime() * 1000;
Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
long now = System.currentTimeMillis();
if (lastReadTime != null && now - lastReadTime > heartBeatTime){
CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
if (userInfo != null){
LOGGER.warn("客户端[{}]心跳超时[{}]ms,需要关闭连接!",userInfo.getUserName(),now - lastReadTime);
}
routeHandler.userOffLine(userInfo, (NioSocketChannel) ctx.channel());
ctx.channel().close();
}
}
}
... ...
... ... @@ -34,4 +34,4 @@ app.zk.root=/route
cim.clear.route.request.url=http://localhost:8083/offLine
# 检测多少秒没有收到客户端心跳后服务端关闭连接
cim.heartbeat.time = 40
\ No newline at end of file
cim.heartbeat.time = 30
\ No newline at end of file
... ...
... ... @@ -34,3 +34,13 @@ spring.redis.host=47.98.194.60
spring.redis.port=6379
![](https://ws2.sinaimg.cn/large/006tNbRwly1fymbjn98f6j31bn0u0aff.jpg)
账号信息会存放在 `Redis`
## 本地如何模拟调试?
至少需要启动以下服务:
1. 服务端
2. 路由
3. 至少两个客户端
4. `redis`、`zk` 基础组件
\ No newline at end of file
... ...
... ... @@ -30,7 +30,6 @@
<module>cim-server</module>
<module>cim-client</module>
<module>cim-common</module>
<module>springboot-admin</module>
<module>cim-zk</module>
<module>cim-forward-route</module>
</modules>
... ...
# 监控 admin
[SpringBoot admin](https://github.com/codecentric/spring-boot-admin)
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.crossoverjie.netty</groupId>
<artifactId>cim</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>springboot-admin</artifactId>
<packaging>jar</packaging>
<name>admin</name>
<description>springBoot admin</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<logback.version>1.2.3</logback.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-server</artifactId>
<version>1.5.7</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-server-ui</artifactId>
<version>1.5.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.ai.obc.springboot.admin;
import de.codecentric.boot.admin.config.EnableAdminServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
@EnableAutoConfiguration
@EnableAdminServer
public class AdminApplication {
public static void main(String[] args) {
SpringApplication.run(AdminApplication.class, args);
}
}
spring.application.name=spring-boot-admin
server.cimServerPort = 8888
logging.level.root=info
\ No newline at end of file
package com.ai.obc.springboot.admin;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class AdminApplicationTests {
@Test
public void contextLoads() {
}
}