作者 crossoverJie

:bulb: Documenting source code.

@@ -3,7 +3,9 @@ package com.crossoverjie.cim.common.data.construct; @@ -3,7 +3,9 @@ package com.crossoverjie.cim.common.data.construct;
3 import org.slf4j.Logger; 3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory; 4 import org.slf4j.LoggerFactory;
5 5
  6 +import java.util.HashMap;
6 import java.util.HashSet; 7 import java.util.HashSet;
  8 +import java.util.Map;
7 import java.util.Set; 9 import java.util.Set;
8 import java.util.concurrent.ExecutorService; 10 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.TimeUnit; 11 import java.util.concurrent.TimeUnit;
@@ -39,7 +41,7 @@ public final class RingBufferWheel { @@ -39,7 +41,7 @@ public final class RingBufferWheel {
39 */ 41 */
40 private ExecutorService executorService; 42 private ExecutorService executorService;
41 43
42 - private volatile int size = 0 ; 44 + private volatile int size = 0;
43 45
44 /*** 46 /***
45 * task stop sign 47 * task stop sign
@@ -59,6 +61,9 @@ public final class RingBufferWheel { @@ -59,6 +61,9 @@ public final class RingBufferWheel {
59 private Lock lock = new ReentrantLock(); 61 private Lock lock = new ReentrantLock();
60 private Condition condition = lock.newCondition(); 62 private Condition condition = lock.newCondition();
61 63
  64 + private AtomicInteger taskId = new AtomicInteger();
  65 + private Map<Integer, Task> taskMap = new HashMap<>(16);
  66 +
62 /** 67 /**
63 * Create a new delay task ring buffer by default size 68 * Create a new delay task ring buffer by default size
64 * 69 *
@@ -88,23 +93,25 @@ public final class RingBufferWheel { @@ -88,23 +93,25 @@ public final class RingBufferWheel {
88 } 93 }
89 94
90 /** 95 /**
91 - * Add a task into the ring buffer 96 + * Add a task into the ring buffer(thread safe)
92 * 97 *
93 - * @param task business task extends RingBufferWheel.Task 98 + * @param task business task extends {@link Task}
94 */ 99 */
95 - public void addTask(Task task) { 100 + public int addTask(Task task) {
96 int key = task.getKey(); 101 int key = task.getKey();
  102 + int id;
97 103
98 try { 104 try {
99 lock.lock(); 105 lock.lock();
100 - Set<Task> tasks = get(key); 106 + int index = mod(key, bufferSize);
  107 + task.setIndex(index);
  108 + Set<Task> tasks = get(index);
101 109
102 if (tasks != null) { 110 if (tasks != null) {
103 int cycleNum = cycleNum(key, bufferSize); 111 int cycleNum = cycleNum(key, bufferSize);
104 task.setCycleNum(cycleNum); 112 task.setCycleNum(cycleNum);
105 tasks.add(task); 113 tasks.add(task);
106 } else { 114 } else {
107 - int index = mod(key, bufferSize);  
108 int cycleNum = cycleNum(key, bufferSize); 115 int cycleNum = cycleNum(key, bufferSize);
109 task.setCycleNum(index); 116 task.setCycleNum(index);
110 task.setCycleNum(cycleNum); 117 task.setCycleNum(cycleNum);
@@ -112,18 +119,57 @@ public final class RingBufferWheel { @@ -112,18 +119,57 @@ public final class RingBufferWheel {
112 sets.add(task); 119 sets.add(task);
113 put(key, sets); 120 put(key, sets);
114 } 121 }
115 - size ++ ;  
116 - }finally { 122 + id = taskId.incrementAndGet();
  123 + taskMap.put(id, task);
  124 + size++;
  125 + } finally {
117 lock.unlock(); 126 lock.unlock();
118 } 127 }
119 128
120 start(); 129 start();
121 130
  131 + return id;
  132 + }
  133 +
  134 +
  135 + /**
  136 + * Cancel task by taskId
  137 + * @param id unique id through {@link #addTask(Task)}
  138 + * @return
  139 + */
  140 + public boolean cancel(int id) {
  141 +
  142 + boolean flag = false;
  143 + Set<Task> tempTask = new HashSet<>();
  144 +
  145 + try {
  146 + lock.lock();
  147 + Task task = taskMap.get(id);
  148 + if (task == null) {
  149 + return false;
  150 + }
122 151
  152 + Set<Task> tasks = get(task.getIndex());
  153 + for (Task tk : tasks) {
  154 + if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) {
  155 + size--;
  156 + flag = true;
  157 + } else {
  158 + tempTask.add(tk);
  159 + }
  160 +
  161 + }
  162 + //update origin data
  163 + ringBuffer[task.getIndex()] = tempTask;
  164 + } finally {
  165 + lock.unlock();
  166 + }
  167 +
  168 + return flag;
123 } 169 }
124 170
125 /** 171 /**
126 - * thread safe 172 + * Thread safe
127 * 173 *
128 * @return the size of ring buffer 174 * @return the size of ring buffer
129 */ 175 */
@@ -179,8 +225,7 @@ public final class RingBufferWheel { @@ -179,8 +225,7 @@ public final class RingBufferWheel {
179 } 225 }
180 226
181 227
182 - private Set<Task> get(int key) {  
183 - int index = mod(key, bufferSize); 228 + private Set<Task> get(int index) {
184 return (Set<Task>) ringBuffer[index]; 229 return (Set<Task>) ringBuffer[index];
185 } 230 }
186 231
@@ -219,7 +264,7 @@ public final class RingBufferWheel { @@ -219,7 +264,7 @@ public final class RingBufferWheel {
219 private void size2Notify() { 264 private void size2Notify() {
220 try { 265 try {
221 lock.lock(); 266 lock.lock();
222 - size -- ; 267 + size--;
223 if (size == 0) { 268 if (size == 0) {
224 condition.signal(); 269 condition.signal();
225 } 270 }
@@ -256,6 +301,7 @@ public final class RingBufferWheel { @@ -256,6 +301,7 @@ public final class RingBufferWheel {
256 */ 301 */
257 public abstract static class Task extends Thread { 302 public abstract static class Task extends Thread {
258 303
  304 + private int index;
259 305
260 private int cycleNum; 306 private int cycleNum;
261 307
@@ -280,6 +326,14 @@ public final class RingBufferWheel { @@ -280,6 +326,14 @@ public final class RingBufferWheel {
280 private void setCycleNum(int cycleNum) { 326 private void setCycleNum(int cycleNum) {
281 this.cycleNum = cycleNum; 327 this.cycleNum = cycleNum;
282 } 328 }
  329 +
  330 + public int getIndex() {
  331 + return index;
  332 + }
  333 +
  334 + private void setIndex(int index) {
  335 + this.index = index;
  336 + }
283 } 337 }
284 338
285 339
@@ -2,7 +2,6 @@ package com.crossoverjie.cim.common.data.construct; @@ -2,7 +2,6 @@ package com.crossoverjie.cim.common.data.construct;
2 2
3 3
4 import com.google.common.util.concurrent.ThreadFactoryBuilder; 4 import com.google.common.util.concurrent.ThreadFactoryBuilder;
5 -  
6 import io.netty.util.HashedWheelTimer; 5 import io.netty.util.HashedWheelTimer;
7 import io.netty.util.Timeout; 6 import io.netty.util.Timeout;
8 import io.netty.util.TimerTask; 7 import io.netty.util.TimerTask;
@@ -23,7 +22,7 @@ public class RingBufferWheelTest { @@ -23,7 +22,7 @@ public class RingBufferWheelTest {
23 22
24 public static void main(String[] args) throws Exception { 23 public static void main(String[] args) throws Exception {
25 24
26 - test6(); 25 + test7();
27 26
28 } 27 }
29 28
@@ -147,9 +146,38 @@ public class RingBufferWheelTest { @@ -147,9 +146,38 @@ public class RingBufferWheelTest {
147 logger.info("task size={}",wheel.taskSize()); 146 logger.info("task size={}",wheel.taskSize());
148 147
149 wheel.stop(false); 148 wheel.stop(false);
  149 + }
  150 +
  151 + private static void test7() throws InterruptedException {
  152 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
  153 +
  154 + RingBufferWheel wheel = new RingBufferWheel(executorService,512) ;
150 155
  156 + for (int i = 0; i < 10; i++) {
  157 + RingBufferWheel.Task task = new Job(i) ;
  158 + task.setKey(i);
  159 + wheel.addTask(task);
  160 + }
151 161
  162 + RingBufferWheel.Task task = new Job(15) ;
  163 + task.setKey(15);
  164 + int cancel = wheel.addTask(task);
  165 +
  166 + new Thread(() -> {
  167 + boolean flag = wheel.cancel(cancel);
  168 + logger.info("cancel task={}",flag) ;
  169 + }).start();
  170 +
  171 + RingBufferWheel.Task task1 = new Job(20) ;
  172 + task1.setKey(20);
  173 + wheel.addTask(task1) ;
  174 +
  175 + logger.info("task size={}",wheel.taskSize());
  176 +
  177 + wheel.stop(false);
152 } 178 }
  179 +
  180 +
153 private static void concurrentTest() throws Exception { 181 private static void concurrentTest() throws Exception {
154 BlockingQueue<Runnable> queue = new LinkedBlockingQueue(10); 182 BlockingQueue<Runnable> queue = new LinkedBlockingQueue(10);
155 ThreadFactory product = new ThreadFactoryBuilder() 183 ThreadFactory product = new ThreadFactoryBuilder()