作者 crossoverJie
提交者 GitHub

Merge branch 'master' into feature_add_visualisation

正在显示 35 个修改的文件 包含 1469 行增加106 行删除
... ... @@ -116,3 +116,9 @@ fabric.properties
.idea/httpRequests
.idea/
*.iml
*.project
*.settings/
*.classpath
*.factorypath
.vscode/
\ No newline at end of file
... ...
... ... @@ -38,36 +38,21 @@
| <img src="https://ws3.sinaimg.cn/large/006tNbRwly1fys8flaofrj315e0ose81.jpg" height="295px" /> | <img src="https://ws4.sinaimg.cn/large/006tNbRwly1fys8mpa6wij31240lghdt.jpg" height="295px" />
### cim-server
`IM` 服务端;用于接收 `client` 连接、消息透传、消息推送等功能。
**支持集群部署。**
### cim-forward-route
消息路由服务器;用于处理消息路由、消息转发、用户登录、用户下线以及一些运营工具(获取在线用户数等)。
### cim-client
`IM` 客户端;给用户使用的消息终端,一个命令即可启动并向其他人发起通讯(群聊、私聊)。
## TODO LIST
* [x] 群聊。
* [x] 私聊。
* [x] 内置命令。
* [x] [群聊](#群聊)。
* [x] [私聊](#私聊)。
* [x] [内置命令](#客户端内置命令)。
* [x] [聊天记录查询](#聊天记录查询)。
* [x] [一键开启价值 2 亿的 `AI` 模式](#ai-模式)。
* [x] 使用 `Google Protocol Buffer` 高效编解码。
* [x] 根据实际情况灵活的水平扩容、缩容。
* [x] 路由(`cim-forward-route`)服务自身是无状态,可用 `Nginx` 代理支持高可用。
* [ ] 聊天记录查询。
* [x] 服务端自动剔除离线客户端。
* [ ] 弱网环境下客户端自动重连。
* [ ] 分组群聊。
* [ ] 离线消息。
* [ ] 路由服务自动分配。
* [ ] 协议支持消息加密。
* [ ] 弱网情况下客户端自动上线。
* [ ] 更多的客户端路由策略。
... ... @@ -81,6 +66,21 @@
- `Redis` 存放各个客户端的路由信息、账号信息、在线状态等。
- `Zookeeper` 用于 `IM-server` 服务的注册与发现。
### cim-server
`IM` 服务端;用于接收 `client` 连接、消息透传、消息推送等功能。
**支持集群部署。**
### cim-forward-route
消息路由服务器;用于处理消息路由、消息转发、用户登录、用户下线以及一些运营工具(获取在线用户数等)。
### cim-client
`IM` 客户端;给用户使用的消息终端,一个命令即可启动并向其他人发起通讯(群聊、私聊)。
## 流程图
![](https://ws1.sinaimg.cn/large/006tNbRwly1fylfxevl2ij30it0etaau.jpg)
... ... @@ -135,19 +135,80 @@ java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=銝
如上图,启动两个客户端可以互相通信即可。
### 本地启动客户端
#### 注册账号
```shell
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
"reqNo": "1234567890",
"timeStamp": 0,
"userName": "zhangsan"
}' 'http://路由服务器:8083/registerAccount'
```
从返回结果中获取 `userId`
```json
{
"code":"9000",
"message":"成功",
"reqNo":null,
"dataBody":{
"userId":1547028929407,
"userName":"test"
}
}
```
#### 启动本地客户端
```shell
# 启动本地客户端
cp /cim/cim-client/target/cim-client-1.0.0-SNAPSHOT.jar /xx/work/route0/
cd /xx/work/route0/
java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=上方返回的userId --cim.user.userName=用户名 --cim.group.route.request.url=http://路由服务器:8083/groupRoute --cim.server.route.request.url=http://路由服务器:8083/login
```
## 客户端内置命令
| 命令 | 描述|
| ------ | ------ |
| `:q` | 退出客户端|
| `:q!` | 退出客户端|
| `:olu` | 获取所有在线用户信息 |
| `:all` | 获取所有命令 |
| `:q` | 【:q 关键字】查询聊天记录 |
| `:ai` | 开启 AI 模式 |
| `:qai` | 关闭 AI 模式 |
| `:pu` | 模糊匹配用户 |
| `:` | 更多命令正在开发中。。 |
![](https://ws3.sinaimg.cn/large/006tNbRwly1fylh7bdlo6g30go01shdt.gif)
### 聊天记录查询
![](https://ws2.sinaimg.cn/large/006tNc79gy1fz3uwmb5hsj30s8046wm3.jpg)
使用命令 `:q 关键字` 即可查询与个人相关的聊天记录。
> 客户端聊天记录默认存放在 `/opt/logs/cim/`,所以需要这个目录的写入权限。也可在启动命令中加入 `--cim.msg.logger.path = /自定义` 参数自定义目录。
### AI 模式
![](https://ws3.sinaimg.cn/large/006tNc79gy1fz3vf3nsq3j31dc0j01ky.jpg)
使用命令 `:ai` 开启 AI 模式,之后所有的消息都会由 `AI` 响应。
`:qai` 退出 AI 模式。
### 前缀匹配用户名
![](https://ws4.sinaimg.cn/large/006tNc79gy1fz3vo4tgkjj31ni09s41u.jpg)
使用命令 `:qu prefix` 可以按照前缀的方式搜索用户信息。
> 该功能主要用于在移动端中的输入框中搜索用户。
## 群聊/私聊
### 群聊
... ... @@ -173,7 +234,7 @@ java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=銝
![](https://ws3.sinaimg.cn/large/006tNbRwly1fylicmjj6cj31wg07c4qp.jpg)
![](https://ws1.sinaimg.cn/large/006tNbRwly1fylicwhe04j31ua03ejsv.jpg)
同时另一个账号是收不到消息的。
同时另一个账号收不到消息。
![](https://ws3.sinaimg.cn/large/006tNbRwly1fylie727jaj31t20dq1ky.jpg)
... ...
... ... @@ -66,10 +66,6 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
... ...
... ... @@ -19,6 +19,9 @@ public class AppConfiguration {
@Value("${cim.user.userName}")
private String userName;
@Value("${cim.msg.logger.path}")
private String msgLoggerPath ;
public Long getUserId() {
return userId;
}
... ... @@ -34,4 +37,12 @@ public class AppConfiguration {
public void setUserName(String userName) {
this.userName = userName;
}
public String getMsgLoggerPath() {
return msgLoggerPath;
}
public void setMsgLoggerPath(String msgLoggerPath) {
this.msgLoggerPath = msgLoggerPath;
}
}
... ...
package com.crossoverjie.cim.client.config;
import com.crossoverjie.cim.client.handle.MsgHandleCaller;
import com.crossoverjie.cim.client.service.impl.MsgCallBackListener;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
... ... @@ -87,9 +88,7 @@ public class BeanConfig {
*/
@Bean
public MsgHandleCaller buildCaller(){
MsgHandleCaller caller = new MsgHandleCaller(msg -> {
//处理业务逻辑,或者自定义实现接口
}) ;
MsgHandleCaller caller = new MsgHandleCaller(new MsgCallBackListener()) ;
return caller ;
}
... ...
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.common.protocol.CIMResponseProto;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
... ... @@ -39,7 +41,7 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat) ;
ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE) ;
}
... ... @@ -56,15 +58,18 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResponseProto.CIMResProtocol responseProtocol) throws Exception {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResponseProto.CIMResProtocol msg) throws Exception {
//从服务端收到消息时被调用
//LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
//回调消息
callBackMsg(responseProtocol.getResMsg());
if (msg.getType() != Constants.CommandType.PING) {
//回调消息
callBackMsg(msg.getResMsg());
LOGGER.info(msg.getResMsg());
}
LOGGER.info(responseProtocol.getResMsg());
}
/**
... ...
... ... @@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//60 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 60, 0))
//30 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 30, 0))
//心跳解码
//.addLast(new HeartbeatEncode())
... ...
... ... @@ -2,6 +2,7 @@ package com.crossoverjie.cim.client.scanner;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -26,9 +27,12 @@ public class Scan implements Runnable {
private MsgHandle msgHandle ;
private MsgLogger msgLogger ;
public Scan() {
this.configuration = SpringBeanFactory.getBean(AppConfiguration.class);
this.msgHandle = SpringBeanFactory.getBean(MsgHandle.class) ;
this.msgLogger = SpringBeanFactory.getBean(MsgLogger.class) ;
}
@Override
... ... @@ -50,6 +54,8 @@ public class Scan implements Runnable {
//真正的发送消息
msgHandle.sendMsg(msg) ;
//写入聊天记录
msgLogger.log(msg) ;
LOGGER.info("{}:【{}】", configuration.getUserName(), msg);
}
... ...
package com.crossoverjie.cim.client.service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019/1/6 15:23
* @since JDK 1.8
*/
public interface MsgLogger {
/**
* 异步写入消息
* @param msg
*/
void log(String msg) ;
/**
* 停止写入
*/
void stop() ;
/**
* 查询聊天记录
* @param key 关键字
* @return
*/
String query(String key) ;
}
... ...
package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.MsgLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.*;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Function:
*
* @author crossoverJie
* Date: 2019/1/6 15:26
* @since JDK 1.8
*/
@Service
public class AsyncMsgLogger implements MsgLogger {
private final static Logger LOGGER = LoggerFactory.getLogger(AsyncMsgLogger.class);
/**
* The default buffer size.
*/
private static final int DEFAULT_QUEUE_SIZE = 16;
private BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(DEFAULT_QUEUE_SIZE);
private volatile boolean started = false;
private Worker worker = new Worker();
@Autowired
private AppConfiguration appConfiguration;
@Override
public void log(String msg) {
//开始消费
startMsgLogger();
try {
// TODO: 2019/1/6 消息堆满是否阻塞线程?
blockingQueue.put(msg);
} catch (InterruptedException e) {
LOGGER.error("InterruptedException", e);
}
}
private class Worker extends Thread {
@Override
public void run() {
while (started) {
try {
String msg = blockingQueue.take();
writeLog(msg);
} catch (InterruptedException e) {
break;
}
}
}
}
private void writeLog(String msg) {
LocalDate today = LocalDate.now();
int year = today.getYear();
int month = today.getMonthValue();
int day = today.getDayOfMonth();
String dir = appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/";
String fileName = dir + year + month + day + ".log";
Path file = Paths.get(fileName);
boolean exists = Files.exists(Paths.get(dir), LinkOption.NOFOLLOW_LINKS);
try {
if (!exists) {
Files.createDirectories(Paths.get(dir));
}
List<String> lines = Arrays.asList(msg);
Files.write(file, lines, Charset.forName("UTF-8"), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (IOException e) {
LOGGER.info("IOException", e);
}
}
/**
* 开始工作
*/
private void startMsgLogger() {
if (started) {
return;
}
worker.setDaemon(true);
worker.setName("AsyncMsgLogger-Worker");
started = true;
worker.start();
}
@Override
public void stop() {
started = false;
worker.interrupt();
}
@Override
public String query(String key) {
StringBuilder sb = new StringBuilder();
Path path = Paths.get(appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/");
try {
Stream<Path> list = Files.list(path);
List<Path> collect = list.collect(Collectors.toList());
for (Path file : collect) {
List<String> strings = Files.readAllLines(file);
for (String msg : strings) {
if (msg.trim().contains(key)) {
sb.append(msg).append("\n");
}
}
}
} catch (IOException e) {
LOGGER.info("IOException", e);
}
return sb.toString().replace(key, "\033[31;4m" + key + "\033[0m");
}
}
... ...
package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.service.CustomMsgHandleListener;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
/**
* Function:自定义收到消息回调
*
* @author crossoverJie
* Date: 2019/1/6 17:49
* @since JDK 1.8
*/
public class MsgCallBackListener implements CustomMsgHandleListener {
private MsgLogger msgLogger ;
public MsgCallBackListener() {
this.msgLogger = SpringBeanFactory.getBean(MsgLogger.class) ;
}
@Override
public void handle(String msg) {
msgLogger.log(msg) ;
}
}
... ...
... ... @@ -3,10 +3,12 @@ package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;
import com.crossoverjie.cim.common.data.construct.TrieTree;
import com.crossoverjie.cim.common.enums.SystemCommandEnumType;
import com.crossoverjie.cim.common.util.StringUtil;
import org.slf4j.Logger;
... ... @@ -41,8 +43,25 @@ public class MsgHandler implements MsgHandle {
@Autowired
private CIMClient cimClient ;
@Autowired
private MsgLogger msgLogger ;
private boolean aiModel = false ;
@Override
public void sendMsg(String msg) {
if (aiModel){
aiChat(msg);
}else {
normalChat(msg);
}
}
/**
* 正常聊天
* @param msg
*/
private void normalChat(String msg) {
String[] totalMsg = msg.split(";;");
if (totalMsg.length > 1) {
//私聊
... ... @@ -67,6 +86,19 @@ public class MsgHandler implements MsgHandle {
}
}
/**
* AI model
* @param msg
*/
private void aiChat(String msg) {
msg = msg.replace("吗","") ;
msg = msg.replace("嘛","") ;
msg = msg.replace("?","!");
msg = msg.replace("?","!");
msg = msg.replace("你","我");
System.out.println("AI:\033[31;4m" + msg + "\033[0m");
}
@Override
public void groupChat(GroupReqVO groupReqVO) throws Exception {
routeRequest.sendGroupMsg(groupReqVO);
... ... @@ -104,6 +136,20 @@ public class MsgHandler implements MsgHandle {
//打印在线用户
printOnlineUsers();
} else if (msg.startsWith(SystemCommandEnumType.QUERY.getCommandType().trim() + " ")){
//查询聊天记录
queryChatHistory(msg);
}else if (SystemCommandEnumType.AI.getCommandType().trim().equals(msg.toLowerCase())){
//开启 AI 模式
aiModel = true ;
System.out.println("\033[31;4m" + "Hello,我是估值两亿的 AI 机器人!" + "\033[0m");
}else if (SystemCommandEnumType.QAI.getCommandType().trim().equals(msg.toLowerCase())){
//关闭 AI 模式
aiModel = false ;
System.out.println("\033[31;4m" + "。゚(゚´ω`゚)゚。 AI 下线了!" + "\033[0m");
}else if (msg.startsWith(SystemCommandEnumType.PREFIX.getCommandType().trim() + " ")){
//模糊匹配
prefixSearch(msg);
}else {
printAllCommand(allStatusCode);
}
... ... @@ -117,6 +163,43 @@ public class MsgHandler implements MsgHandle {
}
/**
* 模糊匹配
* @param msg
*/
private void prefixSearch(String msg) {
try {
List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();
TrieTree trieTree = new TrieTree() ;
for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
trieTree.insert(onlineUser.getUserName());
}
String[] split = msg.split(" ");
String key = split[1];
List<String> list = trieTree.prefixSearch(key);
for (String res : list) {
res = res.replace(key, "\033[31;4m" + key + "\033[0m");
System.out.println(res);
}
} catch (Exception e) {
LOGGER.error("Exception" ,e);
}
}
/**
* 查询聊天记录
* @param msg
*/
private void queryChatHistory(String msg) {
String[] split = msg.split(" ") ;
String res = msgLogger.query(split[1]);
System.out.println(res);
}
/**
* 打印在线用户
*/
... ... @@ -140,6 +223,7 @@ public class MsgHandler implements MsgHandle {
*/
private void shutdown() {
LOGGER.info("系统关闭中。。。。");
msgLogger.stop();
executor.shutdown();
try {
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
... ...
... ... @@ -69,8 +69,12 @@ public class RouteRequestImpl implements RouteRequest {
.build();
Response response = okHttpClient.newCall(request).execute() ;
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
try {
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
}finally {
response.body().close();
}
}
... ... @@ -92,12 +96,18 @@ public class RouteRequestImpl implements RouteRequest {
throw new IOException("Unexpected code " + response);
}
String json = response.body().string() ;
BaseResponse baseResponse = JSON.parseObject(json, BaseResponse.class);
ResponseBody body = response.body();
try {
String json = body.string() ;
BaseResponse baseResponse = JSON.parseObject(json, BaseResponse.class);
//选择的账号不存在
if (baseResponse.getCode().equals(StatusEnum.OFF_LINE.getCode())){
LOGGER.error(p2PReqVO.getReceiveUserId() + ":" + StatusEnum.OFF_LINE.getMessage());
}
//选择的账号不存在
if (baseResponse.getCode().equals(StatusEnum.OFF_LINE.getCode())){
LOGGER.error(p2PReqVO.getReceiveUserId() + ":" + StatusEnum.OFF_LINE.getMessage());
}finally {
body.close();
}
}
... ... @@ -118,19 +128,23 @@ public class RouteRequestImpl implements RouteRequest {
if (!response.isSuccessful()){
throw new IOException("Unexpected code " + response);
}
String json = response.body().string();
CIMServerResVO cimServerResVO = JSON.parseObject(json, CIMServerResVO.class);
//重复登录
if (cimServerResVO.getCode().equals(StatusEnum.REPEAT_LOGIN.getCode())){
LOGGER.error(appConfiguration.getUserName() + ":" + StatusEnum.REPEAT_LOGIN.getMessage());
System.exit(-1);
CIMServerResVO cimServerResVO ;
ResponseBody body = response.body();
try {
String json = body.string();
cimServerResVO = JSON.parseObject(json, CIMServerResVO.class);
//重复失败
if (!cimServerResVO.getCode().equals(StatusEnum.SUCCESS.getCode())){
LOGGER.error(appConfiguration.getUserName() + ":" + cimServerResVO.getMessage());
System.exit(-1);
}
}finally {
body.close();
}
if (!cimServerResVO.getCode().equals(StatusEnum.SUCCESS.getCode())){
throw new RuntimeException("route server exception code=" + cimServerResVO.getCode()) ;
}
return cimServerResVO.getDataBody();
}
... ... @@ -152,8 +166,15 @@ public class RouteRequestImpl implements RouteRequest {
}
String json = response.body().string() ;
OnlineUsersResVO onlineUsersResVO = JSON.parseObject(json, OnlineUsersResVO.class);
ResponseBody body = response.body();
OnlineUsersResVO onlineUsersResVO ;
try {
String json = body.string() ;
onlineUsersResVO = JSON.parseObject(json, OnlineUsersResVO.class);
}finally {
body.close();
}
return onlineUsersResVO.getDataBody();
}
... ...
spring.application.name=netty-heartbeat-client
spring.application.name=cim-client
# web port
server.port=8082
... ... @@ -8,6 +8,9 @@ swagger.enable = true
logging.level.root=info
#消息记录存放路径
cim.msg.logger.path = /opt/logs/cim/
###=======生产模拟======###
# 群发消息
... ... @@ -34,7 +37,7 @@ cim.server.online.user.url=http://45.78.28.220:8083/onlineUser
#cim.server.route.request.url=http://localhost:8083/login
#
## 在线用户
#cim.server.online.user=http://localhost:8083/onlineUser
#cim.server.online.user.url=http://localhost:8083/onlineUser
# 客户端唯一ID
cim.user.id=1545574841528
... ...
package com.crossoverjie.cim.client.service.impl;
import com.crossoverjie.cim.client.CIMClientApplication;
import com.crossoverjie.cim.client.service.MsgLogger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.TimeUnit;
@SpringBootTest(classes = CIMClientApplication.class)
@RunWith(SpringRunner.class)
public class AsyncMsgLoggerTest {
@Autowired
private MsgLogger msgLogger ;
@Test
public void writeLog() throws Exception {
for (int i = 0; i < 10; i++) {
msgLogger.log("zhangsan:【asdsd】" + i);
}
TimeUnit.SECONDS.sleep(2);
}
@Test
public void query(){
String crossoverJie = msgLogger.query("crossoverJie");
System.out.println(crossoverJie);
}
}
\ No newline at end of file
... ...
... ... @@ -8,7 +8,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.*;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
... ... @@ -60,4 +65,119 @@ public class CommonTest {
}
LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
@Test
public void searchMsg(){
StringBuilder sb = new StringBuilder() ;
String allMsg = "于是在之前的基础上我完善了一些内容,先来看看这个项目的介绍吧:\n" +
"\n" +
"CIM(CROSS-IM) 一款面向开发者的 IM(即时通讯)系统;同时提供了一些组件帮助开发者构建一款属于自己可水平扩展的 IM 。\n" +
"\n" +
"借助 CIM 你可以实现以下需求:" ;
String key = "IM" ;
String[] split = allMsg.split("\n");
for (String msg : split) {
if (msg.trim().contains(key)){
sb.append(msg).append("\n") ;
}
}
int pos = 0;
String result = sb.toString();
int count = 1 ;
int multiple = 2 ;
while((pos = result.indexOf(key, pos)) >= 0) {
LOGGER.info("{},{}",pos, pos + key.length());
if (count == 1){
sb.insert(pos,"**");
}else {
Double pow = Math.pow(multiple, count);
sb.insert(pos +pow.intValue(),"**");
}
pos += key.length();
if (count == 1){
sb.insert(pos +2,"**");
}else {
Double pow = Math.pow(multiple, count);
sb.insert((pos +2) + pow.intValue(),"**");
}
count ++ ;
}
System.out.println(sb);
}
@Test
public void searchMsg2(){
StringBuilder sb = new StringBuilder() ;
String allMsg = "于是在之前的基础上我完善了一些内容,先来看看这个项目的介绍吧:\n" +
"\n" +
"CIM(CROSS-IM) 一款面向开发者的 IM(即时通讯)系统;同时提供了一些组件帮助开发者构建一款属于自己可水平扩展的 IM 。\n" +
"\n" +
"借助 CIM 你可以实现以下需求:" ;
String key = "CIM" ;
String[] split = allMsg.split("\n");
for (String msg : split) {
if (msg.trim().contains(key)){
sb.append(msg).append("\n") ;
}
}
int pos = 0;
String result = sb.toString();
int count = 1 ;
int multiple = 2 ;
while((pos = result.indexOf(key, pos)) >= 0) {
LOGGER.info("{},{}",pos, pos + key.length());
pos += key.length();
count ++ ;
}
System.out.println(sb.toString());
System.out.println(sb.toString().replace(key,"\033[31;4m" + key+"\033[0m"));
}
@Test
public void log(){
String msg = "hahahdsadsd" ;
LocalDate today = LocalDate.now();
int year = today.getYear();
int month = today.getMonthValue();
int day = today.getDayOfMonth();
String dir = "/opt/logs/cim/zhangsan" + "/";
String fileName = dir + year + month + day + ".log";
LOGGER.info("fileName={}", fileName);
Path file = Paths.get(fileName);
boolean exists = Files.exists(Paths.get(dir), LinkOption.NOFOLLOW_LINKS);
try {
if (!exists) {
Files.createDirectories(Paths.get(dir));
}
List<String> lines = Arrays.asList(msg);
Files.write(file, lines, Charset.forName("UTF-8"), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (IOException e) {
LOGGER.info("IOException", e);
}
}
}
... ...
package com.crossoverjie.cim.common.data.construct;
import com.crossoverjie.cim.common.util.StringUtil;
import java.util.ArrayList;
import java.util.List;
/**
* Function:字典树字符前缀模糊匹配
*
* @author crossoverJie
* Date: 2019/1/7 18:58
* @since JDK 1.8
*/
public class TrieTree {
/**
* 大小写都可保存
*/
private static final int CHILDREN_LENGTH = 26 * 2;
/**
* 存放的最大字符串长度
*/
private static final int MAX_CHAR_LENGTH = 16;
private static final char UPPERCASE_STAR = 'A';
/**
* 小写就要 -71
*/
private static final char LOWERCASE_STAR = 'G';
private Node root;
public TrieTree() {
root = new Node();
}
/**
* 写入
*
* @param data
*/
public void insert(String data) {
this.insert(this.root, data);
}
private void insert(Node root, String data) {
char[] chars = data.toCharArray();
for (int i = 0; i < chars.length; i++) {
char aChar = chars[i];
int index;
if (Character.isUpperCase(aChar)) {
index = aChar - UPPERCASE_STAR;
} else {
//小写就要 -71
index = aChar - LOWERCASE_STAR;
}
if (index >= 0 && index < CHILDREN_LENGTH) {
if (root.children[index] == null) {
Node node = new Node();
root.children[index] = node;
root.children[index].data = chars[i];
}
//最后一个字符设置标志
if (i + 1 == chars.length) {
root.children[index].isEnd = true;
}
//指向下一节点
root = root.children[index];
}
}
}
/**
* 递归深度遍历
*
* @param key
* @return
*/
public List<String> prefixSearch(String key) {
List<String> value = new ArrayList<String>();
if (StringUtil.isEmpty(key)) {
return value;
}
char k = key.charAt(0);
int index;
if (Character.isUpperCase(k)) {
index = k - UPPERCASE_STAR;
} else {
index = k - LOWERCASE_STAR;
}
if (root.children != null && root.children[index] != null) {
return query(root.children[index], value,
key.substring(1), String.valueOf(k));
}
return value;
}
private List<String> query(Node child, List<String> value, String key, String result) {
if (child.isEnd && key == null) {
value.add(result);
}
if (StringUtil.isNotEmpty(key)) {
char ca = key.charAt(0);
int index;
if (Character.isUpperCase(ca)) {
index = ca - UPPERCASE_STAR;
} else {
index = ca - LOWERCASE_STAR;
}
if (child.children[index] != null) {
query(child.children[index], value, key.substring(1).equals("") ? null : key.substring(1), result + ca);
}
} else {
for (int i = 0; i < CHILDREN_LENGTH; i++) {
if (child.children[i] == null) {
continue;
}
int j;
if (Character.isUpperCase(child.children[i].data)) {
j = UPPERCASE_STAR + i;
} else {
j = LOWERCASE_STAR + i;
}
char temp = (char) j;
query(child.children[i], value, null, result + temp);
}
}
return value;
}
/**
* 查询所有
*
* @return
*/
public List<String> all() {
char[] chars = new char[MAX_CHAR_LENGTH];
List<String> value = depth(this.root, new ArrayList<String>(), chars, 0);
return value;
}
public List<String> depth(Node node, List<String> list, char[] chars, int index) {
if (node.children == null || node.children.length == 0) {
return list;
}
Node[] children = node.children;
for (int i = 0; i < children.length; i++) {
Node child = children[i];
if (child == null) {
continue;
}
if (child.isEnd) {
chars[index] = child.data;
char[] temp = new char[index + 1];
for (int j = 0; j < chars.length; j++) {
if (chars[j] == 0) {
continue;
}
temp[j] = chars[j];
}
list.add(String.valueOf(temp));
return list;
} else {
chars[index] = child.data;
index++;
depth(child, list, chars, index);
index = 0;
}
}
return list;
}
/**
* 字典树节点
*/
private class Node {
/**
* 是否为最后一个字符
*/
public boolean isEnd = false;
/**
* 如果只是查询,则不需要存储数据
*/
public char data;
public Node[] children = new Node[CHILDREN_LENGTH];
}
}
... ...
... ... @@ -3,6 +3,10 @@ package com.crossoverjie.cim.common.enums;
import java.util.ArrayList;
import java.util.List;
/**
* @author crossoverJie
*/
public enum StatusEnum {
/** 成功 */
... ... @@ -20,6 +24,9 @@ public enum StatusEnum {
/** 账号不在线 */
OFF_LINE("7000", "你选择的账号不在线,请重新选择!"),
/** 登录信息不匹配 */
ACCOUNT_NOT_MATCH("9100", "登录信息不匹配!"),
/** 请求限流 */
REQUEST_LIMIT("6000", "请求限流"),
;
... ...
... ... @@ -15,8 +15,12 @@ import java.util.Map;
public enum SystemCommandEnumType {
ALL(":all ","获取所有命令"),
ONLINE_USER(":olu","获取所有在线用户"),
QUIT(":q ","退出程序")
ONLINE_USER(":olu ","获取所有在线用户"),
QUIT(":q! ","退出程序"),
QUERY(":q ","【:q 关键字】查询聊天记录"),
AI(":ai ","开启 AI 模式"),
QAI(":qai ","关闭 AI 模式"),
PREFIX(":pu ","模糊匹配用户")
;
... ...
... ... @@ -40,6 +40,15 @@ public final class CIMResponseProto {
*/
com.google.protobuf.ByteString
getResMsgBytes();
/**
* <code>required int32 type = 3;</code>
*/
boolean hasType();
/**
* <code>required int32 type = 3;</code>
*/
int getType();
}
/**
* Protobuf type {@code protocol.CIMResProtocol}
... ... @@ -56,6 +65,7 @@ public final class CIMResponseProto {
private CIMResProtocol() {
responseId_ = 0L;
resMsg_ = "";
type_ = 0;
}
@Override
... ... @@ -100,6 +110,11 @@ public final class CIMResponseProto {
responseId_ = input.readInt64();
break;
}
case 24: {
bitField0_ |= 0x00000004;
type_ = input.readInt32();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
... ... @@ -182,6 +197,21 @@ public final class CIMResponseProto {
}
}
public static final int TYPE_FIELD_NUMBER = 3;
private int type_;
/**
* <code>required int32 type = 3;</code>
*/
public boolean hasType() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>required int32 type = 3;</code>
*/
public int getType() {
return type_;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
... ... @@ -196,6 +226,10 @@ public final class CIMResponseProto {
memoizedIsInitialized = 0;
return false;
}
if (!hasType()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
... ... @@ -208,6 +242,9 @@ public final class CIMResponseProto {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeInt64(2, responseId_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeInt32(3, type_);
}
unknownFields.writeTo(output);
}
... ... @@ -223,6 +260,10 @@ public final class CIMResponseProto {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(2, responseId_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(3, type_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
... ... @@ -249,6 +290,11 @@ public final class CIMResponseProto {
result = result && getResMsg()
.equals(other.getResMsg());
}
result = result && (hasType() == other.hasType());
if (hasType()) {
result = result && (getType()
== other.getType());
}
result = result && unknownFields.equals(other.unknownFields);
return result;
}
... ... @@ -269,6 +315,10 @@ public final class CIMResponseProto {
hash = (37 * hash) + RESMSG_FIELD_NUMBER;
hash = (53 * hash) + getResMsg().hashCode();
}
if (hasType()) {
hash = (37 * hash) + TYPE_FIELD_NUMBER;
hash = (53 * hash) + getType();
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
... ... @@ -402,6 +452,8 @@ public final class CIMResponseProto {
bitField0_ = (bitField0_ & ~0x00000001);
resMsg_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
type_ = 0;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
... ... @@ -434,6 +486,10 @@ public final class CIMResponseProto {
to_bitField0_ |= 0x00000002;
}
result.resMsg_ = resMsg_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.type_ = type_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
... ... @@ -484,6 +540,9 @@ public final class CIMResponseProto {
resMsg_ = other.resMsg_;
onChanged();
}
if (other.hasType()) {
setType(other.getType());
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
... ... @@ -496,6 +555,9 @@ public final class CIMResponseProto {
if (!hasResMsg()) {
return false;
}
if (!hasType()) {
return false;
}
return true;
}
... ... @@ -625,6 +687,38 @@ public final class CIMResponseProto {
onChanged();
return this;
}
private int type_ ;
/**
* <code>required int32 type = 3;</code>
*/
public boolean hasType() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>required int32 type = 3;</code>
*/
public int getType() {
return type_;
}
/**
* <code>required int32 type = 3;</code>
*/
public Builder setType(int value) {
bitField0_ |= 0x00000004;
type_ = value;
onChanged();
return this;
}
/**
* <code>required int32 type = 3;</code>
*/
public Builder clearType() {
bitField0_ = (bitField0_ & ~0x00000004);
type_ = 0;
onChanged();
return this;
}
public final Builder setUnknownFields(
final com.google.protobuf.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
... ... @@ -688,10 +782,11 @@ public final class CIMResponseProto {
descriptor;
static {
String[] descriptorData = {
"\n\027BaseResponseProto.proto\022\010protocol\"4\n\016C" +
"\n\027BaseResponseProto.proto\022\010protocol\"B\n\016C" +
"IMResProtocol\022\022\n\nresponseId\030\002 \002(\003\022\016\n\006res" +
"Msg\030\001 \002(\tB8\n$com.crossoverjie.cim.common" +
".protocolB\020CIMResponseProto"
"Msg\030\001 \002(\t\022\014\n\004type\030\003 \002(\005B8\n$com.crossover" +
"jie.cim.common.protocolB\020CIMResponseProt" +
"o"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
... ... @@ -710,7 +805,7 @@ public final class CIMResponseProto {
internal_static_protocol_CIMResProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_protocol_CIMResProtocol_descriptor,
new String[] { "ResponseId", "ResMsg", });
new String[] { "ResponseId", "ResMsg", "Type", });
}
// @@protoc_insertion_point(outer_class_scope)
... ...
package com.crossoverjie.cim.common.data.construct;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class TrieTreeTest {
@Test
public void insert() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("abc");
trieTree.insert("abcd");
}
@Test
public void all() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("ABC");
trieTree.insert("abC");
List<String> all = trieTree.all();
String result = "";
for (String s : all) {
result += s + ",";
System.out.println(s);
}
Assert.assertTrue("ABC,abC,".equals(result));
}
@Test
public void all2() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("abc");
trieTree.insert("abC");
List<String> all = trieTree.all();
String result = "";
for (String s : all) {
result += s + ",";
System.out.println(s);
}
//Assert.assertTrue("ABC,abC,".equals(result));
}
@Test
public void prefixSea() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("jav");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("java,javascript,"));
}
@Test
public void prefixSea2() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("j");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("java,javascript,jsf,jsp,"));
}
@Test
public void prefixSea3() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("js");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("jsf,jsp,"));
}
@Test
public void prefixSea4() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("jav");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("java,javascript,"));
}
@Test
public void prefixSea5() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("java");
trieTree.insert("jsf");
trieTree.insert("jsp");
trieTree.insert("javascript");
trieTree.insert("php");
String result ="";
List<String> ab = trieTree.prefixSearch("js");
for (String s : ab) {
result += s+",";
System.out.println(s);
}
Assert.assertTrue(result.equals("jsf,jsp,"));
}
@Test
public void prefixSearch() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("abc");
trieTree.insert("abd");
trieTree.insert("ABe");
List<String> ab = trieTree.prefixSearch("AB");
for (String s : ab) {
System.out.println(s);
}
System.out.println("========");
//char[] chars = new char[3] ;
//for (int i = 0; i < 3; i++) {
// int a = 97 + i ;
// chars[i] = (char) a ;
//}
//
//String s = String.valueOf(chars);
//System.out.println(s);
}
@Test
public void prefixSearch2() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("Cde");
trieTree.insert("CDa");
trieTree.insert("ABe");
List<String> ab = trieTree.prefixSearch("AC");
for (String s : ab) {
System.out.println(s);
}
Assert.assertTrue(ab.size() == 0);
}
@Test
public void prefixSearch3() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("Cde");
trieTree.insert("CDa");
trieTree.insert("ABe");
List<String> ab = trieTree.prefixSearch("CD");
for (String s : ab) {
System.out.println(s);
}
Assert.assertTrue(ab.size() == 1);
}
@Test
public void prefixSearch4() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("Cde");
trieTree.insert("CDa");
trieTree.insert("ABe");
List<String> ab = trieTree.prefixSearch("Cd");
String result = "";
for (String s : ab) {
result += s + ",";
System.out.println(s);
}
Assert.assertTrue(result.equals("Cde,"));
}
@Test
public void prefixSearch44() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("a");
trieTree.insert("b");
trieTree.insert("c");
trieTree.insert("d");
trieTree.insert("e");
trieTree.insert("f");
trieTree.insert("g");
trieTree.insert("h");
}
@Test
public void prefixSearch5() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("Cde");
trieTree.insert("CDa");
trieTree.insert("ABe");
trieTree.insert("CDfff");
trieTree.insert("Cdfff");
List<String> ab = trieTree.prefixSearch("Cd");
String result = "";
for (String s : ab) {
result += s + ",";
System.out.println(s);
}
Assert.assertTrue(result.equals("Cde,Cdfff,"));
}
@Test
public void prefixSearch6() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("Cde");
trieTree.insert("CDa");
trieTree.insert("ABe");
trieTree.insert("CDfff");
trieTree.insert("Cdfff");
List<String> ab = trieTree.prefixSearch("CD");
String result = "";
for (String s : ab) {
result += s + ",";
System.out.println(s);
}
Assert.assertTrue(result.equals("CDa,CDfff,"));
}
@Test
public void prefixSearch7() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("Cde");
trieTree.insert("CDa");
trieTree.insert("ABe");
trieTree.insert("CDfff");
trieTree.insert("Cdfff");
List<String> ab = trieTree.prefixSearch("");
String result = "";
for (String s : ab) {
result += s + ",";
System.out.println(s);
}
Assert.assertTrue(result.equals(""));
}
@Test
public void prefixSearch8() throws Exception {
TrieTree trieTree = new TrieTree();
List<String> ab = trieTree.prefixSearch("");
String result = "";
for (String s : ab) {
result += s + ",";
System.out.println(s);
}
Assert.assertTrue(result.equals(""));
}
@Test
public void prefixSearch9() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("Cde");
trieTree.insert("CDa");
trieTree.insert("ABe");
trieTree.insert("CDfff");
trieTree.insert("Cdfff");
List<String> ab = trieTree.prefixSearch("CDFD");
String result = "";
for (String s : ab) {
result += s + ",";
System.out.println(s);
}
Assert.assertTrue(result.equals(""));
}
@Test
public void prefixSearch10() throws Exception {
TrieTree trieTree = new TrieTree();
trieTree.insert("crossoverJie");
trieTree.insert("zhangsan");
List<String> ab = trieTree.prefixSearch("c");
String result = "";
for (String s : ab) {
result += s + ",";
System.out.println(s);
}
Assert.assertTrue(result.equals("crossoverJie,"));
}
}
\ No newline at end of file
... ...
... ... @@ -74,11 +74,6 @@
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
... ...
... ... @@ -144,8 +144,8 @@ public class RouteController {
BaseResponse<CIMServerResVO> res = new BaseResponse();
//登录校验
boolean login = accountService.login(loginReqVO);
if (login) {
StatusEnum status = accountService.login(loginReqVO);
if (status == StatusEnum.SUCCESS) {
String server = serverCache.selectServer();
String[] serverInfo = server.split(":");
CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
... ... @@ -154,12 +154,10 @@ public class RouteController {
accountService.saveRouteInfo(loginReqVO,server);
res.setDataBody(vo);
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
} else {
res.setCode(StatusEnum.REPEAT_LOGIN.getCode());
res.setMessage(StatusEnum.REPEAT_LOGIN.getMessage());
}
res.setCode(status.getCode());
res.setMessage(status.getMessage());
return res;
}
... ...
package com.crossoverjie.cim.route.service;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
... ... @@ -30,7 +31,7 @@ public interface AccountService {
* @return true 成功 false 失败
* @throws Exception
*/
boolean login(LoginReqVO loginReqVO) throws Exception ;
StatusEnum login(LoginReqVO loginReqVO) throws Exception ;
/**
* 保存路由信息
... ...
package com.crossoverjie.cim.route.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.service.AccountService;
... ... @@ -69,26 +70,26 @@ public class AccountServiceRedisImpl implements AccountService {
}
@Override
public boolean login(LoginReqVO loginReqVO) throws Exception {
public StatusEnum login(LoginReqVO loginReqVO) throws Exception {
//再去Redis里查询
String key = ACCOUNT_PREFIX + loginReqVO.getUserId();
String userName = redisTemplate.opsForValue().get(key);
if (null == userName) {
return false;
return StatusEnum.ACCOUNT_NOT_MATCH;
}
if (!userName.equals(loginReqVO.getUserName())) {
return false;
return StatusEnum.ACCOUNT_NOT_MATCH;
}
//登录成功,保存登录状态
boolean status = userInfoCacheService.saveAndCheckUserLoginStatus(loginReqVO.getUserId());
if (status == false){
//重复登录
return false;
return StatusEnum.REPEAT_LOGIN ;
}
return true;
return StatusEnum.SUCCESS;
}
@Override
... ... @@ -102,6 +103,7 @@ public class AccountServiceRedisImpl implements AccountService {
Map<Long, CIMServerResVO> routes = new HashMap<>(64);
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
ScanOptions options = ScanOptions.scanOptions()
.match(ROUTE_PREFIX + "*")
... ... @@ -115,6 +117,11 @@ public class AccountServiceRedisImpl implements AccountService {
parseServerInfo(routes, key);
}
try {
scan.close();
} catch (IOException e) {
LOGGER.error("IOException",e);
}
return routes;
}
... ... @@ -156,8 +163,12 @@ public class AccountServiceRedisImpl implements AccountService {
.build();
Response response = okHttpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
try {
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
}finally {
response.body().close();
}
}
... ...
... ... @@ -13,6 +13,7 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@SpringBootTest(classes = RouteApplication.class)
@RunWith(SpringRunner.class)
... ... @@ -25,8 +26,12 @@ public class AccountServiceRedisImplTest {
@Test
public void loadRouteRelated() throws Exception {
Map<Long, CIMServerResVO> longCIMServerResVOMap = accountService.loadRouteRelated();
LOGGER.info("longCIMServerResVOMap={}" , JSON.toJSONString(longCIMServerResVOMap));
for (int i = 0; i < 100; i++) {
Map<Long, CIMServerResVO> longCIMServerResVOMap = accountService.loadRouteRelated();
LOGGER.info("longCIMServerResVOMap={},cun={}" , JSON.toJSONString(longCIMServerResVOMap),i);
}
TimeUnit.SECONDS.sleep(10);
}
}
\ No newline at end of file
... ...
... ... @@ -64,10 +64,6 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
... ...
... ... @@ -28,6 +28,9 @@ public class AppConfiguration {
@Value("${cim.clear.route.request.url}")
private String clearRouteUrl ;
@Value("${cim.heartbeat.time}")
private long heartBeatTime ;
public String getClearRouteUrl() {
return clearRouteUrl;
}
... ... @@ -67,4 +70,12 @@ public class AppConfiguration {
public void setCimServerPort(int cimServerPort) {
this.cimServerPort = cimServerPort;
}
public long getHeartBeatTime() {
return heartBeatTime;
}
public void setHeartBeatTime(long heartBeatTime) {
this.heartBeatTime = heartBeatTime;
}
}
... ...
package com.crossoverjie.cim.server.config;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import okhttp3.OkHttpClient;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -39,4 +41,19 @@ public class BeanConfig {
.retryOnConnectionFailure(true);
return builder.build();
}
/**
* 创建心跳单例
* @return
*/
@Bean(value = "heartBeat")
public CIMRequestProto.CIMReqProtocol heartBeat() {
CIMRequestProto.CIMReqProtocol heart = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(0L)
.setReqMsg("ping")
.setType(Constants.CommandType.PING)
.build();
return heart;
}
}
... ...
... ... @@ -6,12 +6,13 @@ import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.server.config.AppConfiguration;
import com.crossoverjie.cim.server.util.NettyAttrUtil;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import com.crossoverjie.cim.server.util.SpringBeanFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.*;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -31,16 +32,60 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerHandle.class);
private final MediaType mediaType = MediaType.parse("application/json");
/**
* 取消绑定
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//可能出现业务判断离线后再次触发 channelInactive
CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
LOGGER.info("用户[{}]下线",userInfo.getUserName());
SessionSocketHolder.remove((NioSocketChannel) ctx.channel());
if (userInfo != null){
LOGGER.warn("[{}]触发 channelInactive 掉线!",userInfo.getUserName());
userOffLine(userInfo, (NioSocketChannel) ctx.channel());
ctx.channel().close();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.READER_IDLE) {
AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);
long heartBeatTime = configuration.getHeartBeatTime() * 1000;
//向客户端发送消息
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE);
Long lastReadTime = NettyAttrUtil.getReaderTime(ctx.channel());
long now = System.currentTimeMillis();
if (lastReadTime != null && now - lastReadTime > heartBeatTime){
CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
LOGGER.warn("客户端[{}]心跳超时[{}]ms,需要关闭连接!",userInfo.getUserName(),now - lastReadTime);
userOffLine(userInfo, (NioSocketChannel) ctx.channel());
ctx.channel().close();
}
}
}
super.userEventTriggered(ctx, evt);
}
/**
* 用户下线
* @param userInfo
* @param channel
* @throws IOException
*/
private void userOffLine(CIMUserInfo userInfo, NioSocketChannel channel) throws IOException {
LOGGER.info("用户[{}]下线", userInfo.getUserName());
SessionSocketHolder.remove(channel);
SessionSocketHolder.removeSession(userInfo.getUserId());
//清除路由关系
... ... @@ -49,6 +94,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
/**
* 清除路由关系
*
* @param userInfo
* @throws IOException
*/
... ... @@ -71,7 +117,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
}finally {
} finally {
response.body().close();
}
}
... ... @@ -81,20 +127,24 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
protected void channelRead0(ChannelHandlerContext ctx, CIMRequestProto.CIMReqProtocol msg) throws Exception {
LOGGER.info("收到msg={}", msg.toString());
if (msg.getType() == Constants.CommandType.LOGIN){
if (msg.getType() == Constants.CommandType.LOGIN) {
//保存客户端与 Channel 之间的关系
SessionSocketHolder.put(msg.getRequestId(),(NioSocketChannel)ctx.channel()) ;
SessionSocketHolder.saveSession(msg.getRequestId(),msg.getReqMsg());
LOGGER.info("客户端[{}]上线成功",msg.getReqMsg());
SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel());
SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg());
LOGGER.info("客户端[{}]上线成功", msg.getReqMsg());
}
}
//心跳更新时间
if (msg.getType() == Constants.CommandType.PING){
NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (CIMException.isResetByPeer(cause.getMessage())){
if (CIMException.isResetByPeer(cause.getMessage())) {
return;
}
... ...
... ... @@ -8,6 +8,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler;
/**
* Function:
... ... @@ -24,6 +25,8 @@ public class CIMServerInitializer extends ChannelInitializer<Channel> {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//30 秒没有向客户端发送消息就发生心跳
.addLast(new IdleStateHandler(30, 0, 0))
// google Protobuf 编解码
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(CIMRequestProto.CIMReqProtocol.getDefaultInstance()))
... ...
package com.crossoverjie.cim.server.util;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
/**
* Function:
*
* @author crossoverJie
* Date: 2019/1/9 00:57
* @since JDK 1.8
*/
public class NettyAttrUtil {
private static final AttributeKey<String> ATTR_KEY_READER_TIME = AttributeKey.valueOf("readerTime");
public static void updateReaderTime(Channel channel, Long time) {
channel.attr(ATTR_KEY_READER_TIME).set(time.toString());
}
public static Long getReaderTime(Channel channel) {
String value = getAttribute(channel, ATTR_KEY_READER_TIME);
if (value != null) {
return Long.valueOf(value);
}
return null;
}
private static String getAttribute(Channel channel, AttributeKey<String> key) {
Attribute<String> attr = channel.attr(key);
return attr.get();
}
}
... ...
... ... @@ -31,4 +31,7 @@ app.zk.addr=47.98.194.60:2182
app.zk.root=/route
# 清除路由信息
cim.clear.route.request.url=http://localhost:8083/offLine
\ No newline at end of file
cim.clear.route.request.url=http://localhost:8083/offLine
# 检测多少秒没有收到客户端心跳后服务端关闭连接
cim.heartbeat.time = 40
\ No newline at end of file
... ...
package com.crossoverjie.cim.server.util;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class NettyAttrUtilTest {
@Test
public void test() throws InterruptedException {
long heartbeat = 2 * 1000 ;
long now = System.currentTimeMillis();
TimeUnit.SECONDS.sleep(1);
long end = System.currentTimeMillis();
if ((end - now) > heartbeat){
System.out.println("超时");
}else {
System.out.println("没有超时");
}
}
}
\ No newline at end of file
... ...
... ... @@ -8,6 +8,6 @@ option java_outer_classname = "CIMResponseProto";
message CIMResProtocol {
required int64 responseId = 2;
required string resMsg = 1;
required int32 type = 3;
}
... ...