作者 crossoverJie
提交者 GitHub

Merge pull request #58 from crossoverJie/cim-1.0.7

cim 1.0.7
@@ -40,7 +40,6 @@ public class DelayMsgCommand implements InnerCommand { @@ -40,7 +40,6 @@ public class DelayMsgCommand implements InnerCommand {
40 RingBufferWheel.Task task = new DelayMsgJob(message) ; 40 RingBufferWheel.Task task = new DelayMsgJob(message) ;
41 task.setKey(delayTime); 41 task.setKey(delayTime);
42 ringBufferWheel.addTask(task); 42 ringBufferWheel.addTask(task);
43 - ringBufferWheel.start();  
44 echoService.echo(EmojiParser.parseToUnicode(msg)); 43 echoService.echo(EmojiParser.parseToUnicode(msg));
45 } 44 }
46 45
@@ -3,10 +3,13 @@ package com.crossoverjie.cim.common.data.construct; @@ -3,10 +3,13 @@ package com.crossoverjie.cim.common.data.construct;
3 import org.slf4j.Logger; 3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory; 4 import org.slf4j.LoggerFactory;
5 5
  6 +import java.util.HashMap;
6 import java.util.HashSet; 7 import java.util.HashSet;
  8 +import java.util.Map;
7 import java.util.Set; 9 import java.util.Set;
8 import java.util.concurrent.ExecutorService; 10 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.TimeUnit; 11 import java.util.concurrent.TimeUnit;
  12 +import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicInteger; 13 import java.util.concurrent.atomic.AtomicInteger;
11 import java.util.concurrent.locks.Condition; 14 import java.util.concurrent.locks.Condition;
12 import java.util.concurrent.locks.Lock; 15 import java.util.concurrent.locks.Lock;
@@ -38,25 +41,32 @@ public final class RingBufferWheel { @@ -38,25 +41,32 @@ public final class RingBufferWheel {
38 */ 41 */
39 private ExecutorService executorService; 42 private ExecutorService executorService;
40 43
41 - private AtomicInteger taskSize = new AtomicInteger(); 44 + private volatile int size = 0;
42 45
43 /*** 46 /***
44 - * task running sign 47 + * task stop sign
45 */ 48 */
46 private volatile boolean stop = false; 49 private volatile boolean stop = false;
47 50
48 - private volatile boolean start = false ; 51 + /**
  52 + * task start sign
  53 + */
  54 + private volatile AtomicBoolean start = new AtomicBoolean(false);
49 55
50 /** 56 /**
51 * total tick times 57 * total tick times
52 */ 58 */
53 - private AtomicInteger tick = new AtomicInteger() ; 59 + private AtomicInteger tick = new AtomicInteger();
54 60
55 private Lock lock = new ReentrantLock(); 61 private Lock lock = new ReentrantLock();
56 private Condition condition = lock.newCondition(); 62 private Condition condition = lock.newCondition();
57 63
  64 + private AtomicInteger taskId = new AtomicInteger();
  65 + private Map<Integer, Task> taskMap = new HashMap<>(16);
  66 +
58 /** 67 /**
59 * Create a new delay task ring buffer by default size 68 * Create a new delay task ring buffer by default size
  69 + *
60 * @param executorService the business thread pool 70 * @param executorService the business thread pool
61 */ 71 */
62 public RingBufferWheel(ExecutorService executorService) { 72 public RingBufferWheel(ExecutorService executorService) {
@@ -68,8 +78,9 @@ public final class RingBufferWheel { @@ -68,8 +78,9 @@ public final class RingBufferWheel {
68 78
69 /** 79 /**
70 * Create a new delay task ring buffer by custom buffer size 80 * Create a new delay task ring buffer by custom buffer size
  81 + *
71 * @param executorService the business thread pool 82 * @param executorService the business thread pool
72 - * @param bufferSize custom buffer size 83 + * @param bufferSize custom buffer size
73 */ 84 */
74 public RingBufferWheel(ExecutorService executorService, int bufferSize) { 85 public RingBufferWheel(ExecutorService executorService, int bufferSize) {
75 this(executorService); 86 this(executorService);
@@ -82,56 +93,110 @@ public final class RingBufferWheel { @@ -82,56 +93,110 @@ public final class RingBufferWheel {
82 } 93 }
83 94
84 /** 95 /**
85 - * Add a task into the ring buffer  
86 - * @param task business task extends RingBufferWheel.Task 96 + * Add a task into the ring buffer(thread safe)
  97 + *
  98 + * @param task business task extends {@link Task}
87 */ 99 */
88 - public void addTask(Task task) { 100 + public int addTask(Task task) {
89 int key = task.getKey(); 101 int key = task.getKey();
90 - Set<Task> tasks = get(key); 102 + int id;
91 103
92 - if (tasks != null) {  
93 - int cycleNum = cycleNum(key, bufferSize);  
94 - task.setCycleNum(cycleNum);  
95 - tasks.add(task);  
96 - } else { 104 + try {
  105 + lock.lock();
97 int index = mod(key, bufferSize); 106 int index = mod(key, bufferSize);
98 - int cycleNum = cycleNum(key, bufferSize);  
99 - task.setCycleNum(index);  
100 - task.setCycleNum(cycleNum);  
101 - Set<Task> sets = new HashSet<>();  
102 - sets.add(task);  
103 - put(key, sets); 107 + task.setIndex(index);
  108 + Set<Task> tasks = get(index);
  109 +
  110 + if (tasks != null) {
  111 + int cycleNum = cycleNum(key, bufferSize);
  112 + task.setCycleNum(cycleNum);
  113 + tasks.add(task);
  114 + } else {
  115 + int cycleNum = cycleNum(key, bufferSize);
  116 + task.setCycleNum(index);
  117 + task.setCycleNum(cycleNum);
  118 + Set<Task> sets = new HashSet<>();
  119 + sets.add(task);
  120 + put(key, sets);
  121 + }
  122 + id = taskId.incrementAndGet();
  123 + taskMap.put(id, task);
  124 + size++;
  125 + } finally {
  126 + lock.unlock();
104 } 127 }
105 128
106 - taskSize.incrementAndGet(); 129 + start();
  130 +
  131 + return id;
  132 + }
  133 +
  134 +
  135 + /**
  136 + * Cancel task by taskId
  137 + * @param id unique id through {@link #addTask(Task)}
  138 + * @return
  139 + */
  140 + public boolean cancel(int id) {
  141 +
  142 + boolean flag = false;
  143 + Set<Task> tempTask = new HashSet<>();
  144 +
  145 + try {
  146 + lock.lock();
  147 + Task task = taskMap.get(id);
  148 + if (task == null) {
  149 + return false;
  150 + }
  151 +
  152 + Set<Task> tasks = get(task.getIndex());
  153 + for (Task tk : tasks) {
  154 + if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) {
  155 + size--;
  156 + flag = true;
  157 + } else {
  158 + tempTask.add(tk);
  159 + }
  160 +
  161 + }
  162 + //update origin data
  163 + ringBuffer[task.getIndex()] = tempTask;
  164 + } finally {
  165 + lock.unlock();
  166 + }
107 167
  168 + return flag;
108 } 169 }
109 170
110 /** 171 /**
111 - * thread safe 172 + * Thread safe
  173 + *
112 * @return the size of ring buffer 174 * @return the size of ring buffer
113 */ 175 */
114 public int taskSize() { 176 public int taskSize() {
115 - return taskSize.get(); 177 + return size;
116 } 178 }
117 179
118 /** 180 /**
119 * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop} 181 * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
120 */ 182 */
121 public void start() { 183 public void start() {
  184 + if (!start.get()) {
  185 +
  186 + if (start.compareAndSet(start.get(), true)) {
  187 + logger.info("delay task is starting");
  188 + Thread job = new Thread(new TriggerJob());
  189 + job.setName("consumer RingBuffer thread");
  190 + job.start();
  191 + start.set(true);
  192 + }
122 193
123 - if (!start){  
124 - logger.info("delay task is starting");  
125 - Thread job = new Thread(new TriggerJob());  
126 - job.setName("consumer RingBuffer thread");  
127 - job.start();  
128 - start = true ;  
129 } 194 }
130 -  
131 } 195 }
132 196
133 /** 197 /**
134 * Stop consumer ring buffer thread 198 * Stop consumer ring buffer thread
  199 + *
135 * @param force True will force close consumer thread and discard all pending tasks 200 * @param force True will force close consumer thread and discard all pending tasks
136 * otherwise the consumer thread waits for all tasks to completes before closing. 201 * otherwise the consumer thread waits for all tasks to completes before closing.
137 */ 202 */
@@ -142,7 +207,7 @@ public final class RingBufferWheel { @@ -142,7 +207,7 @@ public final class RingBufferWheel {
142 executorService.shutdownNow(); 207 executorService.shutdownNow();
143 } else { 208 } else {
144 logger.info("delay task is stopping"); 209 logger.info("delay task is stopping");
145 - if (taskSize() > 0){ 210 + if (taskSize() > 0) {
146 try { 211 try {
147 lock.lock(); 212 lock.lock();
148 condition.await(); 213 condition.await();
@@ -160,8 +225,7 @@ public final class RingBufferWheel { @@ -160,8 +225,7 @@ public final class RingBufferWheel {
160 } 225 }
161 226
162 227
163 - private Set<Task> get(int key) {  
164 - int index = mod(key, bufferSize); 228 + private Set<Task> get(int index) {
165 return (Set<Task>) ringBuffer[index]; 229 return (Set<Task>) ringBuffer[index];
166 } 230 }
167 231
@@ -200,11 +264,11 @@ public final class RingBufferWheel { @@ -200,11 +264,11 @@ public final class RingBufferWheel {
200 private void size2Notify() { 264 private void size2Notify() {
201 try { 265 try {
202 lock.lock(); 266 lock.lock();
203 - int size = taskSize.decrementAndGet(); 267 + size--;
204 if (size == 0) { 268 if (size == 0) {
205 condition.signal(); 269 condition.signal();
206 } 270 }
207 - }finally { 271 + } finally {
208 lock.unlock(); 272 lock.unlock();
209 } 273 }
210 } 274 }
@@ -223,7 +287,7 @@ public final class RingBufferWheel { @@ -223,7 +287,7 @@ public final class RingBufferWheel {
223 287
224 private int mod(int target, int mod) { 288 private int mod(int target, int mod) {
225 // equals target % mod 289 // equals target % mod
226 - target = target + tick.get() ; 290 + target = target + tick.get();
227 return target & (mod - 1); 291 return target & (mod - 1);
228 } 292 }
229 293
@@ -237,6 +301,7 @@ public final class RingBufferWheel { @@ -237,6 +301,7 @@ public final class RingBufferWheel {
237 */ 301 */
238 public abstract static class Task extends Thread { 302 public abstract static class Task extends Thread {
239 303
  304 + private int index;
240 305
241 private int cycleNum; 306 private int cycleNum;
242 307
@@ -261,6 +326,14 @@ public final class RingBufferWheel { @@ -261,6 +326,14 @@ public final class RingBufferWheel {
261 private void setCycleNum(int cycleNum) { 326 private void setCycleNum(int cycleNum) {
262 this.cycleNum = cycleNum; 327 this.cycleNum = cycleNum;
263 } 328 }
  329 +
  330 + public int getIndex() {
  331 + return index;
  332 + }
  333 +
  334 + private void setIndex(int index) {
  335 + this.index = index;
  336 + }
264 } 337 }
265 338
266 339
@@ -270,23 +343,24 @@ public final class RingBufferWheel { @@ -270,23 +343,24 @@ public final class RingBufferWheel {
270 public void run() { 343 public void run() {
271 int index = 0; 344 int index = 0;
272 while (!stop) { 345 while (!stop) {
  346 + try {
  347 + Set<Task> tasks = remove(index);
  348 + for (Task task : tasks) {
  349 + executorService.submit(task);
  350 + }
273 351
274 - Set<Task> tasks = remove(index);  
275 - for (Task task : tasks) {  
276 - executorService.submit(task);  
277 - }  
278 -  
279 - if (++index > bufferSize - 1) {  
280 - index = 0;  
281 - } 352 + if (++index > bufferSize - 1) {
  353 + index = 0;
  354 + }
282 355
283 - //Total tick number of records  
284 - tick.incrementAndGet();  
285 - try { 356 + //Total tick number of records
  357 + tick.incrementAndGet();
286 TimeUnit.SECONDS.sleep(1); 358 TimeUnit.SECONDS.sleep(1);
287 - } catch (InterruptedException e) {  
288 - logger.error("InterruptedException", e); 359 +
  360 + } catch (Exception e) {
  361 + logger.error("Exception", e);
289 } 362 }
  363 +
290 } 364 }
291 365
292 logger.info("delay task is stopped"); 366 logger.info("delay task is stopped");
1 package com.crossoverjie.cim.common.data.construct; 1 package com.crossoverjie.cim.common.data.construct;
2 2
  3 +
  4 +import com.google.common.util.concurrent.ThreadFactoryBuilder;
  5 +import io.netty.util.HashedWheelTimer;
  6 +import io.netty.util.Timeout;
  7 +import io.netty.util.TimerTask;
3 import org.slf4j.Logger; 8 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory; 9 import org.slf4j.LoggerFactory;
5 10
  11 +import java.util.concurrent.BlockingQueue;
6 import java.util.concurrent.ExecutorService; 12 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors; 13 import java.util.concurrent.Executors;
  14 +import java.util.concurrent.LinkedBlockingQueue;
  15 +import java.util.concurrent.ThreadFactory;
  16 +import java.util.concurrent.ThreadPoolExecutor;
8 import java.util.concurrent.TimeUnit; 17 import java.util.concurrent.TimeUnit;
9 18
10 public class RingBufferWheelTest { 19 public class RingBufferWheelTest {
11 20
12 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ; 21 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ;
13 22
14 - public static void main(String[] args) throws InterruptedException {  
15 - test6(); 23 + public static void main(String[] args) throws Exception {
  24 +
  25 + test7();
16 26
17 - return;  
18 } 27 }
19 28
20 private static void test1() throws InterruptedException { 29 private static void test1() throws InterruptedException {
@@ -29,8 +38,6 @@ public class RingBufferWheelTest { @@ -29,8 +38,6 @@ public class RingBufferWheelTest {
29 task.setKey(74); 38 task.setKey(74);
30 wheel.addTask(task) ; 39 wheel.addTask(task) ;
31 40
32 - wheel.start();  
33 -  
34 while (true){ 41 while (true){
35 logger.info("task size={}" , wheel.taskSize()); 42 logger.info("task size={}" , wheel.taskSize());
36 TimeUnit.SECONDS.sleep(1); 43 TimeUnit.SECONDS.sleep(1);
@@ -75,11 +82,9 @@ public class RingBufferWheelTest { @@ -75,11 +82,9 @@ public class RingBufferWheelTest {
75 wheel.addTask(task) ; 82 wheel.addTask(task) ;
76 83
77 task = new Task() ; 84 task = new Task() ;
78 - task.setKey(74); 85 + task.setKey(60);
79 wheel.addTask(task) ; 86 wheel.addTask(task) ;
80 87
81 - wheel.start();  
82 -  
83 88
84 TimeUnit.SECONDS.sleep(2); 89 TimeUnit.SECONDS.sleep(2);
85 wheel.stop(false); 90 wheel.stop(false);
@@ -116,8 +121,6 @@ public class RingBufferWheelTest { @@ -116,8 +121,6 @@ public class RingBufferWheelTest {
116 wheel.addTask(task); 121 wheel.addTask(task);
117 } 122 }
118 123
119 - wheel.start();  
120 -  
121 logger.info("task size={}",wheel.taskSize()); 124 logger.info("task size={}",wheel.taskSize());
122 125
123 wheel.stop(false); 126 wheel.stop(false);
@@ -135,13 +138,69 @@ public class RingBufferWheelTest { @@ -135,13 +138,69 @@ public class RingBufferWheelTest {
135 wheel.addTask(task); 138 wheel.addTask(task);
136 } 139 }
137 140
138 - wheel.start();  
139 -  
140 - TimeUnit.SECONDS.sleep(10); 141 + TimeUnit.SECONDS.sleep(5);
141 RingBufferWheel.Task task = new Job(15) ; 142 RingBufferWheel.Task task = new Job(15) ;
142 task.setKey(15); 143 task.setKey(15);
143 wheel.addTask(task); 144 wheel.addTask(task);
144 - wheel.start(); 145 +
  146 + logger.info("task size={}",wheel.taskSize());
  147 +
  148 + wheel.stop(false);
  149 + }
  150 +
  151 + private static void test7() throws InterruptedException {
  152 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
  153 +
  154 + RingBufferWheel wheel = new RingBufferWheel(executorService,512) ;
  155 +
  156 + for (int i = 0; i < 10; i++) {
  157 + RingBufferWheel.Task task = new Job(i) ;
  158 + task.setKey(i);
  159 + wheel.addTask(task);
  160 + }
  161 +
  162 + RingBufferWheel.Task task = new Job(15) ;
  163 + task.setKey(15);
  164 + int cancel = wheel.addTask(task);
  165 +
  166 + new Thread(() -> {
  167 + boolean flag = wheel.cancel(cancel);
  168 + logger.info("cancel task={}",flag) ;
  169 + }).start();
  170 +
  171 + RingBufferWheel.Task task1 = new Job(20) ;
  172 + task1.setKey(20);
  173 + wheel.addTask(task1) ;
  174 +
  175 + logger.info("task size={}",wheel.taskSize());
  176 +
  177 + wheel.stop(false);
  178 + }
  179 +
  180 +
  181 + private static void concurrentTest() throws Exception {
  182 + BlockingQueue<Runnable> queue = new LinkedBlockingQueue(10);
  183 + ThreadFactory product = new ThreadFactoryBuilder()
  184 + .setNameFormat("msg-callback-%d")
  185 + .setDaemon(true)
  186 + .build();
  187 + ThreadPoolExecutor business = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MILLISECONDS, queue,product);
  188 +
  189 + ExecutorService executorService = Executors.newFixedThreadPool(10) ;
  190 + RingBufferWheel wheel = new RingBufferWheel(executorService) ;
  191 +
  192 + for (int i = 0; i < 10; i++) {
  193 + business.execute(new Runnable() {
  194 + @Override
  195 + public void run() {
  196 + for (int i1 = 0; i1 < 30; i1++) {
  197 + RingBufferWheel.Task task = new Job(i1) ;
  198 + task.setKey(i1);
  199 + wheel.addTask(task);
  200 + }
  201 + }
  202 + });
  203 + }
145 204
146 logger.info("task size={}",wheel.taskSize()); 205 logger.info("task size={}",wheel.taskSize());
147 206
@@ -172,4 +231,37 @@ public class RingBufferWheelTest { @@ -172,4 +231,37 @@ public class RingBufferWheelTest {
172 } 231 }
173 232
174 } 233 }
  234 +
  235 +
  236 + public static void hashTimerTest(){
  237 +
  238 + BlockingQueue<Runnable> queue = new LinkedBlockingQueue(10);
  239 + ThreadFactory product = new ThreadFactoryBuilder()
  240 + .setNameFormat("msg-callback-%d")
  241 + .setDaemon(true)
  242 + .build();
  243 + ThreadPoolExecutor business = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MILLISECONDS, queue,product);
  244 + HashedWheelTimer hashedWheelTimer = new HashedWheelTimer() ;
  245 +
  246 + for (int i = 0; i < 10; i++) {
  247 + int finalI = i;
  248 + business.execute(new Runnable() {
  249 + @Override
  250 + public void run() {
  251 +
  252 + for (int i1 = 0; i1 < 10; i1++) {
  253 + hashedWheelTimer.newTimeout(new TimerTask() {
  254 + @Override
  255 + public void run(Timeout timeout) throws Exception {
  256 + logger.info("====" + finalI);
  257 + }
  258 + }, finalI,TimeUnit.SECONDS) ;
  259 + }
  260 + }
  261 + });
  262 + }
  263 +
  264 +
  265 +
  266 + }
175 } 267 }