作者 crossoverJie

Merge branch 'master' into cim-1.0.6

# Conflicts:
#	cim-common/src/main/java/com/crossoverjie/cim/common/data/construct/RingBufferWheel.java
#	cim-common/src/test/java/com/crossoverjie/cim/common/data/construct/RingBufferWheelTest.java
@@ -26,7 +26,7 @@ @@ -26,7 +26,7 @@
26 - 适用于 `APP` 的消息推送中间件。 26 - 适用于 `APP` 的消息推送中间件。
27 - `IOT` 海量连接场景中的消息透传中间件。 27 - `IOT` 海量连接场景中的消息透传中间件。
28 28
29 -> 我有在公网部署了一套演示环境,想要体验的可以[联系我](#联系作者)加入内测群获取账号 29 +> 在使用或开发过程中有任何疑问都可[联系我](#联系作者)
30 30
31 ## 视频演示 31 ## 视频演示
32 32
@@ -50,7 +50,7 @@ @@ -50,7 +50,7 @@
50 * [x] 路由(`cim-forward-route`)服务自身是无状态,可用 `Nginx` 代理支持高可用。 50 * [x] 路由(`cim-forward-route`)服务自身是无状态,可用 `Nginx` 代理支持高可用。
51 * [x] 服务端自动剔除离线客户端。 51 * [x] 服务端自动剔除离线客户端。
52 * [x] 客户端自动重连。 52 * [x] 客户端自动重连。
53 -* [x] 延时消息 53 +* [x] [延时消息](#延时消息)
54 * [ ] 分组群聊。 54 * [ ] 分组群聊。
55 * [ ] SDK 开发包。 55 * [ ] SDK 开发包。
56 * [ ] 离线消息。 56 * [ ] 离线消息。
@@ -183,6 +183,7 @@ java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=上方 @@ -183,6 +183,7 @@ java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=上方
183 | `:pu` | 模糊匹配用户 | 183 | `:pu` | 模糊匹配用户 |
184 | `:info` | 获取客户端信息 | 184 | `:info` | 获取客户端信息 |
185 | `:emoji [option]` | 查询表情包 [option:页码] | 185 | `:emoji [option]` | 查询表情包 [option:页码] |
  186 +| `:delay [msg] [delayTime]` | 发送延时消息 |
186 | `:` | 更多命令正在开发中。。 | 187 | `:` | 更多命令正在开发中。。 |
187 188
188 ![](https://ws3.sinaimg.cn/large/006tNbRwly1fylh7bdlo6g30go01shdt.gif) 189 ![](https://ws3.sinaimg.cn/large/006tNbRwly1fylh7bdlo6g30go01shdt.gif)
@@ -250,7 +251,15 @@ java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=上方 @@ -250,7 +251,15 @@ java -jar cim-client-1.0.0-SNAPSHOT.jar --server.port=8084 --cim.user.id=上方
250 ![](https://tva1.sinaimg.cn/large/006y8mN6ly1g6j910cqrzj30dn05qjw9.jpg) 251 ![](https://tva1.sinaimg.cn/large/006y8mN6ly1g6j910cqrzj30dn05qjw9.jpg)
251 ![](https://tva1.sinaimg.cn/large/006y8mN6ly1g6j99hazg6j30ax03hq35.jpg) 252 ![](https://tva1.sinaimg.cn/large/006y8mN6ly1g6j99hazg6j30ax03hq35.jpg)
252 253
  254 +### 延时消息
253 255
  256 +发送 10s 的延时消息:
  257 +
  258 +```shell
  259 +:delay delayMsg 10
  260 +```
  261 +
  262 +![](https://tva1.sinaimg.cn/large/006y8mN6ly1g7brppmokqg30gn07gafj.gif)
254 263
255 ## 联系作者 264 ## 联系作者
256 - [crossoverJie@gmail.com](mailto:crossoverJie@gmail.com) 265 - [crossoverJie@gmail.com](mailto:crossoverJie@gmail.com)
@@ -50,6 +50,11 @@ public final class RingBufferWheel { @@ -50,6 +50,11 @@ public final class RingBufferWheel {
50 */ 50 */
51 private volatile boolean start = false ; 51 private volatile boolean start = false ;
52 52
  53 + /**
  54 + * total tick times
  55 + */
  56 + private AtomicInteger tick = new AtomicInteger() ;
  57 +
53 private Lock lock = new ReentrantLock(); 58 private Lock lock = new ReentrantLock();
54 private Condition condition = lock.newCondition(); 59 private Condition condition = lock.newCondition();
55 60
@@ -115,7 +120,7 @@ public final class RingBufferWheel { @@ -115,7 +120,7 @@ public final class RingBufferWheel {
115 } 120 }
116 121
117 /** 122 /**
118 - * Start background thread to consumer wheel timer, it will run until you call method {@link #stop} 123 + * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
119 */ 124 */
120 public void start() { 125 public void start() {
121 if (!start){ 126 if (!start){
@@ -195,12 +200,15 @@ public final class RingBufferWheel { @@ -195,12 +200,15 @@ public final class RingBufferWheel {
195 } 200 }
196 201
197 private void size2Notify() { 202 private void size2Notify() {
198 - lock.lock();  
199 - int size = taskSize.decrementAndGet();  
200 - if (size == 0) {  
201 - condition.signal(); 203 + try {
  204 + lock.lock();
  205 + int size = taskSize.decrementAndGet();
  206 + if (size == 0) {
  207 + condition.signal();
  208 + }
  209 + }finally {
  210 + lock.unlock();
202 } 211 }
203 - lock.unlock();  
204 } 212 }
205 213
206 private boolean powerOf2(int target) { 214 private boolean powerOf2(int target) {
@@ -217,6 +225,7 @@ public final class RingBufferWheel { @@ -217,6 +225,7 @@ public final class RingBufferWheel {
217 225
218 private int mod(int target, int mod) { 226 private int mod(int target, int mod) {
219 // equals target % mod 227 // equals target % mod
  228 + target = target + tick.get() ;
220 return target & (mod - 1); 229 return target & (mod - 1);
221 } 230 }
222 231
@@ -273,6 +282,8 @@ public final class RingBufferWheel { @@ -273,6 +282,8 @@ public final class RingBufferWheel {
273 index = 0; 282 index = 0;
274 } 283 }
275 284
  285 + //Total tick number of records
  286 + tick.incrementAndGet();
276 try { 287 try {
277 TimeUnit.SECONDS.sleep(1); 288 TimeUnit.SECONDS.sleep(1);
278 } catch (InterruptedException e) { 289 } catch (InterruptedException e) {
1 package com.crossoverjie.cim.common.data.construct; 1 package com.crossoverjie.cim.common.data.construct;
2 2
  3 +
3 import com.google.common.util.concurrent.ThreadFactoryBuilder; 4 import com.google.common.util.concurrent.ThreadFactoryBuilder;
  5 +
4 import org.slf4j.Logger; 6 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory; 7 import org.slf4j.LoggerFactory;
6 8
@@ -17,7 +19,7 @@ public class RingBufferWheelTest { @@ -17,7 +19,7 @@ public class RingBufferWheelTest {
17 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ; 19 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ;
18 20
19 public static void main(String[] args) throws InterruptedException { 21 public static void main(String[] args) throws InterruptedException {
20 - test1(); 22 + test5();
21 23
22 return; 24 return;
23 } 25 }
@@ -25,7 +27,7 @@ public class RingBufferWheelTest { @@ -25,7 +27,7 @@ public class RingBufferWheelTest {
25 private static void test1() throws InterruptedException { 27 private static void test1() throws InterruptedException {
26 ExecutorService executorService = Executors.newFixedThreadPool(2) ; 28 ExecutorService executorService = Executors.newFixedThreadPool(2) ;
27 29
28 - Task task = new Task() ; 30 + RingBufferWheel.Task task = new Task() ;
29 task.setKey(10); 31 task.setKey(10);
30 RingBufferWheel wheel = new RingBufferWheel(executorService) ; 32 RingBufferWheel wheel = new RingBufferWheel(executorService) ;
31 wheel.addTask(task) ; 33 wheel.addTask(task) ;
@@ -42,7 +44,7 @@ public class RingBufferWheelTest { @@ -42,7 +44,7 @@ public class RingBufferWheelTest {
42 private static void test2() throws InterruptedException { 44 private static void test2() throws InterruptedException {
43 ExecutorService executorService = Executors.newFixedThreadPool(2) ; 45 ExecutorService executorService = Executors.newFixedThreadPool(2) ;
44 46
45 - Task task = new Task() ; 47 + RingBufferWheel.Task task = new Task() ;
46 task.setKey(10); 48 task.setKey(10);
47 RingBufferWheel wheel = new RingBufferWheel(executorService) ; 49 RingBufferWheel wheel = new RingBufferWheel(executorService) ;
48 wheel.addTask(task) ; 50 wheel.addTask(task) ;
@@ -72,7 +74,7 @@ public class RingBufferWheelTest { @@ -72,7 +74,7 @@ public class RingBufferWheelTest {
72 private static void test3() throws InterruptedException { 74 private static void test3() throws InterruptedException {
73 ExecutorService executorService = Executors.newFixedThreadPool(2) ; 75 ExecutorService executorService = Executors.newFixedThreadPool(2) ;
74 76
75 - Task task = new Task() ; 77 + RingBufferWheel.Task task = new Task() ;
76 task.setKey(10); 78 task.setKey(10);
77 RingBufferWheel wheel = new RingBufferWheel(executorService) ; 79 RingBufferWheel wheel = new RingBufferWheel(executorService) ;
78 wheel.addTask(task) ; 80 wheel.addTask(task) ;
@@ -95,7 +97,7 @@ public class RingBufferWheelTest { @@ -95,7 +97,7 @@ public class RingBufferWheelTest {
95 RingBufferWheel wheel = new RingBufferWheel(executorService) ; 97 RingBufferWheel wheel = new RingBufferWheel(executorService) ;
96 98
97 for (int i = 0; i < 65; i++) { 99 for (int i = 0; i < 65; i++) {
98 - Job task = new Job(i) ; 100 + RingBufferWheel.Task task = new Job(i) ;
99 task.setKey(i); 101 task.setKey(i);
100 wheel.addTask(task); 102 wheel.addTask(task);
101 } 103 }
@@ -114,7 +116,7 @@ public class RingBufferWheelTest { @@ -114,7 +116,7 @@ public class RingBufferWheelTest {
114 RingBufferWheel wheel = new RingBufferWheel(executorService,512) ; 116 RingBufferWheel wheel = new RingBufferWheel(executorService,512) ;
115 117
116 for (int i = 0; i < 65; i++) { 118 for (int i = 0; i < 65; i++) {
117 - Job task = new Job(i) ; 119 + RingBufferWheel.Task task = new Job(i) ;
118 task.setKey(i); 120 task.setKey(i);
119 wheel.addTask(task); 121 wheel.addTask(task);
120 } 122 }
@@ -125,14 +127,28 @@ public class RingBufferWheelTest { @@ -125,14 +127,28 @@ public class RingBufferWheelTest {
125 127
126 128
127 } 129 }
  130 + private static void test6() throws InterruptedException {
  131 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
128 132
  133 + RingBufferWheel wheel = new RingBufferWheel(executorService,512) ;
129 134
130 - private static class Task extends RingBufferWheel.Task{  
131 -  
132 - @Override  
133 - public void run() {  
134 - logger.info("================"); 135 + for (int i = 0; i < 10; i++) {
  136 + RingBufferWheel.Task task = new Job(i) ;
  137 + task.setKey(i);
  138 + wheel.addTask(task);
135 } 139 }
  140 +
  141 +
  142 + TimeUnit.SECONDS.sleep(10);
  143 + RingBufferWheel.Task task = new Job(15) ;
  144 + task.setKey(15);
  145 + wheel.addTask(task);
  146 +
  147 + logger.info("task size={}",wheel.taskSize());
  148 +
  149 + wheel.stop(false);
  150 +
  151 +
136 } 152 }
137 private static void cuncrrentTest6() throws InterruptedException { 153 private static void cuncrrentTest6() throws InterruptedException {
138 BlockingQueue<Runnable> queue = new LinkedBlockingQueue(10); 154 BlockingQueue<Runnable> queue = new LinkedBlockingQueue(10);
@@ -184,4 +200,13 @@ public class RingBufferWheelTest { @@ -184,4 +200,13 @@ public class RingBufferWheelTest {
184 logger.info("number={}" , num); 200 logger.info("number={}" , num);
185 } 201 }
186 } 202 }
  203 +
  204 + private static class Task extends RingBufferWheel.Task{
  205 +
  206 + @Override
  207 + public void run() {
  208 + logger.info("================");
  209 + }
  210 +
  211 + }
187 } 212 }