作者 crossoverJie
提交者 GitHub

Merge pull request #57 from crossoverJie/cim-1.0.6

cim 1.0.6
... ... @@ -50,8 +50,9 @@
* [x] 路由(`cim-forward-route`)服务自身是无状态,可用 `Nginx` 代理支持高可用。
* [x] 服务端自动剔除离线客户端。
* [x] 客户端自动重连。
* [x] 延时消息
* [ ] 分组群聊。
* [ ] Android SDK
* [ ] SDK 开发包
* [ ] 离线消息。
* [ ] 协议支持消息加密。
* [ ] 更多的客户端路由策略。
... ...
... ... @@ -3,6 +3,7 @@ package com.crossoverjie.cim.client.config;
import com.crossoverjie.cim.client.handle.MsgHandleCaller;
import com.crossoverjie.cim.client.service.impl.MsgCallBackListener;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import okhttp3.OkHttpClient;
... ... @@ -104,4 +105,11 @@ public class BeanConfig {
return caller ;
}
@Bean
public RingBufferWheel bufferWheel(){
ExecutorService executorService = Executors.newFixedThreadPool(2) ;
return new RingBufferWheel(executorService) ;
}
}
... ...
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.service.EchoService;
import com.crossoverjie.cim.client.service.ShutDownMsg;
import com.crossoverjie.cim.client.service.impl.EchoServiceImpl;
import com.crossoverjie.cim.client.thread.ReConnectJob;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.constant.Constants;
... ... @@ -41,6 +43,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
private ShutDownMsg shutDownMsg ;
private EchoService echoService ;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
... ... @@ -68,7 +72,6 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//客户端和服务端建立连接时调用
LOGGER.info("cim server connect success!");
}
... ... @@ -95,6 +98,10 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
@Override
protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception {
if (echoService == null){
echoService = SpringBeanFactory.getBean(EchoServiceImpl.class) ;
}
//心跳更新时间
if (msg.getType() == Constants.CommandType.PING){
... ... @@ -108,7 +115,7 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
//将消息中的 emoji 表情格式化为 Unicode 编码以便在终端可以显示
String response = EmojiParser.parseToUnicode(msg.getResMsg());
System.out.println(response);
echoService.echo(response);
}
... ...
... ... @@ -5,6 +5,9 @@ import com.crossoverjie.cim.client.service.EchoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.time.LocalTime;
/**
* Function:
*
... ... @@ -21,8 +24,10 @@ public class EchoServiceImpl implements EchoService {
private AppConfiguration appConfiguration;
@Override
public void echo(String msg,Object... replace) {
msg = "\033[31;4m" + appConfiguration.getUserName() + PREFIX + "\033[0m" + " " + msg;
public void echo(String msg, Object... replace) {
String date = LocalDate.now().toString() + " " + LocalTime.now().withNano(0).toString();
msg = "[" + date + "] \033[31;4m" + appConfiguration.getUserName() + PREFIX + "\033[0m" + " " + msg;
String log = print(msg, replace);
... ... @@ -32,6 +37,7 @@ public class EchoServiceImpl implements EchoService {
/**
* print msg
*
* @param msg
* @param place
* @return
... ... @@ -42,7 +48,7 @@ public class EchoServiceImpl implements EchoService {
for (int i = 0; i < place.length; i++) {
int index = msg.indexOf("{}", k);
if (index == -1){
if (index == -1) {
return msg;
}
... ... @@ -63,9 +69,9 @@ public class EchoServiceImpl implements EchoService {
k = index + 2;
}
if (sb.toString().equals("")){
return msg ;
}else {
if (sb.toString().equals("")) {
return msg;
} else {
return sb.toString();
}
}
... ...
package com.crossoverjie.cim.client.service.impl.command;
import com.crossoverjie.cim.client.service.EchoService;
import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
import com.vdurmont.emoji.EmojiParser;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-09-25 00:37
* @since JDK 1.8
*/
@Service
public class DelayMsgCommand implements InnerCommand {
@Autowired
private EchoService echoService ;
@Autowired
private MsgHandle msgHandle ;
@Autowired
private RingBufferWheel ringBufferWheel ;
@Override
public void process(String msg) {
if (msg.split(" ").length <=2){
echoService.echo("incorrect commond, :delay [msg] [delayTime]") ;
return ;
}
String message = msg.split(" ")[1] ;
Integer delayTime = Integer.valueOf(msg.split(" ")[2]);
RingBufferWheel.Task task = new DelayMsgJob(message) ;
task.setKey(delayTime);
ringBufferWheel.addTask(task);
ringBufferWheel.start();
echoService.echo(EmojiParser.parseToUnicode(msg));
}
private class DelayMsgJob extends RingBufferWheel.Task{
private String msg ;
public DelayMsgJob(String msg) {
this.msg = msg;
}
@Override
public void run() {
msgHandle.sendMsg(msg);
}
}
}
... ...
... ... @@ -6,6 +6,7 @@ import com.crossoverjie.cim.client.service.InnerCommand;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.service.ShutDownMsg;
import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -45,6 +46,9 @@ public class ShutDownCommand implements InnerCommand {
@Autowired
private ShutDownMsg shutDownMsg ;
@Autowired
private RingBufferWheel ringBufferWheel ;
@Override
public void process(String msg) {
echoService.echo("cim client closing...");
... ... @@ -52,6 +56,7 @@ public class ShutDownCommand implements InnerCommand {
routeRequest.offLine();
msgLogger.stop();
executor.shutdown();
ringBufferWheel.stop(false);
try {
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
echoService.echo("thread pool closing");
... ...
... ... @@ -15,6 +15,11 @@
<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
... ...
package com.crossoverjie.cim.common.data.construct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Function:Ring Queue, it can be used to delay task.
*
* @author crossoverJie
* Date: 2019-09-20 14:46
* @since JDK 1.8
*/
public final class RingBufferWheel {
private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);
/**
* default ring buffer size
*/
private static final int STATIC_RING_SIZE = 64;
private Object[] ringBuffer;
private int bufferSize;
/**
* business thread pool
*/
private ExecutorService executorService;
private AtomicInteger taskSize = new AtomicInteger();
/***
* task running sign
*/
private volatile boolean stop = false;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
/**
* Create a new delay task ring buffer by default size
* @param executorService the business thread pool
*/
public RingBufferWheel(ExecutorService executorService) {
this.executorService = executorService;
this.bufferSize = STATIC_RING_SIZE;
this.ringBuffer = new Object[bufferSize];
}
/**
* Create a new delay task ring buffer by custom buffer size
* @param executorService the business thread pool
* @param bufferSize custom buffer size
*/
public RingBufferWheel(ExecutorService executorService, int bufferSize) {
this(executorService);
if (!powerOf2(bufferSize)) {
throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
}
this.bufferSize = bufferSize;
this.ringBuffer = new Object[bufferSize];
}
/**
* Add a task into the ring buffer
* @param task business task extends RingBufferWheel.Task
*/
public void addTask(Task task) {
int key = task.getKey();
Set<Task> tasks = get(key);
if (tasks != null) {
int cycleNum = cycleNum(key, bufferSize);
task.setCycleNum(cycleNum);
tasks.add(task);
} else {
int index = mod(key, bufferSize);
int cycleNum = cycleNum(key, bufferSize);
task.setCycleNum(index);
task.setCycleNum(cycleNum);
Set<Task> sets = new HashSet<>();
sets.add(task);
put(key, sets);
}
taskSize.incrementAndGet();
}
/**
* thread safe
* @return the size of ring buffer
*/
public int taskSize() {
return taskSize.get();
}
/**
* Start background thread to consumer wheel timer, it will run until you call method {@link #stop}
*/
public void start() {
logger.info("delay task is starting");
Thread job = new Thread(new TriggerJob());
job.setName("consumer RingBuffer thread");
job.start();
}
/**
* Stop consumer ring buffer thread
* @param force True will force close consumer thread and discard all pending tasks
* otherwise the consumer thread waits for all tasks to completes before closing.
*/
public void stop(boolean force) {
if (force) {
logger.info("delay task is forced stop");
stop = true;
executorService.shutdownNow();
} else {
logger.info("delay task is stopping");
if (taskSize() > 0){
try {
lock.lock();
condition.await();
stop = true;
} catch (InterruptedException e) {
logger.error("InterruptedException", e);
} finally {
lock.unlock();
}
}
executorService.shutdown();
}
}
private Set<Task> get(int key) {
int index = mod(key, bufferSize);
return (Set<Task>) ringBuffer[index];
}
private void put(int key, Set<Task> tasks) {
int index = mod(key, bufferSize);
ringBuffer[index] = tasks;
}
private Set<Task> remove(int key) {
Set<Task> tempTask = new HashSet<>();
Set<Task> result = new HashSet<>();
Set<Task> tasks = (Set<Task>) ringBuffer[key];
if (tasks == null) {
return result;
}
for (Task task : tasks) {
if (task.getCycleNum() == 0) {
result.add(task);
size2Notify();
} else {
// decrement 1 cycle number and update origin data
task.setCycleNum(task.getCycleNum() - 1);
tempTask.add(task);
}
}
//update origin data
ringBuffer[key] = tempTask;
return result;
}
private void size2Notify() {
lock.lock();
int size = taskSize.decrementAndGet();
if (size == 0) {
condition.signal();
}
lock.unlock();
}
private boolean powerOf2(int target) {
if (target < 0) {
return false;
}
int value = target & (target - 1);
if (value != 0) {
return false;
}
return true;
}
private int mod(int target, int mod) {
// equals target % mod
return target & (mod - 1);
}
private int cycleNum(int target, int mod) {
//equals target/mod
return target >> Integer.bitCount(mod - 1);
}
/**
* An abstract class used to implement business.
*/
public abstract static class Task extends Thread {
private int cycleNum;
private int key;
@Override
public void run() {
}
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public int getCycleNum() {
return cycleNum;
}
private void setCycleNum(int cycleNum) {
this.cycleNum = cycleNum;
}
}
private class TriggerJob implements Runnable {
@Override
public void run() {
int index = 0;
while (!stop) {
Set<Task> tasks = remove(index);
for (Task task : tasks) {
executorService.submit(task);
}
if (++index > bufferSize - 1) {
index = 0;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
logger.error("InterruptedException", e);
}
}
logger.info("delay task is stopped");
}
}
}
... ...
... ... @@ -20,7 +20,8 @@ public enum SystemCommandEnum {
QAI(":qai ","关闭 AI 模式","CloseAIModelCommand"),
PREFIX(":pu ","模糊匹配用户","PrefixSearchCommand"),
EMOJI(":emoji ","emoji 表情列表","EmojiCommand"),
INFO(":info ","获取客户端信息","EchoInfoCommand")
INFO(":info ","获取客户端信息","EchoInfoCommand"),
DELAY_MSG(":delay ","delay message, :delay [msg] [delayTime]","DelayMsgCommand")
;
... ...
package com.crossoverjie.cim.common;
import org.junit.Test;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;
/**
* Function:
*
* @author crossoverJie
* Date: 2019-09-23 14:21
* @since JDK 1.8
*/
public class CommonTest {
@Test
public void test2(){
System.out.println(LocalDate.now().toString());
System.out.println(LocalTime.now().withNano(0).toString());
}
@Test
public void test() throws InterruptedException {
System.out.println(is2(9));
System.out.println(Integer.bitCount(64-1));
int target = 1569312600 ;
int mod = 64 ;
System.out.println(target % mod);
System.out.println(mod(target,mod));
System.out.println("============");
System.out.println(cycleNum(256,64)) ;
cycle();
}
private int mod(int target, int mod){
// equals target % mod
return target & (mod -1) ;
}
private int cycleNum(int target,int mod){
//equals target/mod
return target >> Integer.bitCount(mod-1) ;
}
private boolean is2(int target){
if (target < 0){
return false ;
}
int value = target & (target - 1) ;
if (value != 0){
return false ;
}
return true ;
}
private void cycle() throws InterruptedException {
int index = 0 ;
while (true){
System.out.println("=======" + index);
if (++index > 63){
index = 0 ;
}
TimeUnit.MILLISECONDS.sleep(200);
}
}
}
... ...
package com.crossoverjie.cim.common.data.construct;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class RingBufferWheelTest {
private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ;
public static void main(String[] args) throws InterruptedException {
test5();
return;
}
private static void test1() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2) ;
Task task = new Task() ;
task.setKey(10);
RingBufferWheel wheel = new RingBufferWheel(executorService) ;
wheel.addTask(task) ;
task = new Task() ;
task.setKey(74);
wheel.addTask(task) ;
wheel.start();
while (true){
logger.info("task size={}" , wheel.taskSize());
TimeUnit.SECONDS.sleep(1);
}
}
private static void test2() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2) ;
Task task = new Task() ;
task.setKey(10);
RingBufferWheel wheel = new RingBufferWheel(executorService) ;
wheel.addTask(task) ;
task = new Task() ;
task.setKey(74);
wheel.addTask(task) ;
wheel.start();
// new Thread(() -> {
// while (true){
// logger.info("task size={}" , wheel.taskSize());
// try {
// TimeUnit.SECONDS.sleep(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// }).start();
TimeUnit.SECONDS.sleep(12);
wheel.stop(true);
}
private static void test3() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2) ;
Task task = new Task() ;
task.setKey(10);
RingBufferWheel wheel = new RingBufferWheel(executorService) ;
wheel.addTask(task) ;
task = new Task() ;
task.setKey(74);
wheel.addTask(task) ;
wheel.start();
TimeUnit.SECONDS.sleep(2);
wheel.stop(false);
}
private static void test4() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2) ;
RingBufferWheel wheel = new RingBufferWheel(executorService) ;
for (int i = 0; i < 65; i++) {
Job task = new Job(i) ;
task.setKey(i);
wheel.addTask(task);
}
wheel.start();
logger.info("task size={}",wheel.taskSize());
wheel.stop(false);
}
private static void test5() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2) ;
RingBufferWheel wheel = new RingBufferWheel(executorService,512) ;
for (int i = 0; i < 65; i++) {
Job task = new Job(i) ;
task.setKey(i);
wheel.addTask(task);
}
wheel.start();
logger.info("task size={}",wheel.taskSize());
wheel.stop(false);
}
private static class Task extends RingBufferWheel.Task{
@Override
public void run() {
logger.info("================");
}
}
private static class Job extends RingBufferWheel.Task{
private int num ;
public Job(int num) {
this.num = num;
}
@Override
public void run() {
logger.info("number={}" , num);
}
}
}
\ No newline at end of file
... ...