作者 crossoverJie

:sparkles: 1. thread safe ---> addTask

2. to optimize the api
@@ -7,6 +7,7 @@ import java.util.HashSet; @@ -7,6 +7,7 @@ import java.util.HashSet;
7 import java.util.Set; 7 import java.util.Set;
8 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.TimeUnit; 9 import java.util.concurrent.TimeUnit;
  10 +import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicInteger; 11 import java.util.concurrent.atomic.AtomicInteger;
11 import java.util.concurrent.locks.Condition; 12 import java.util.concurrent.locks.Condition;
12 import java.util.concurrent.locks.Lock; 13 import java.util.concurrent.locks.Lock;
@@ -38,7 +39,7 @@ public final class RingBufferWheel { @@ -38,7 +39,7 @@ public final class RingBufferWheel {
38 */ 39 */
39 private ExecutorService executorService; 40 private ExecutorService executorService;
40 41
41 - private AtomicInteger taskSize = new AtomicInteger(); 42 + private volatile int size = 0 ;
42 43
43 /*** 44 /***
44 * task stop sign 45 * task stop sign
@@ -48,18 +49,19 @@ public final class RingBufferWheel { @@ -48,18 +49,19 @@ public final class RingBufferWheel {
48 /** 49 /**
49 * task start sign 50 * task start sign
50 */ 51 */
51 - private volatile boolean start = false ; 52 + private volatile AtomicBoolean start = new AtomicBoolean(false);
52 53
53 /** 54 /**
54 * total tick times 55 * total tick times
55 */ 56 */
56 - private AtomicInteger tick = new AtomicInteger() ; 57 + private AtomicInteger tick = new AtomicInteger();
57 58
58 private Lock lock = new ReentrantLock(); 59 private Lock lock = new ReentrantLock();
59 private Condition condition = lock.newCondition(); 60 private Condition condition = lock.newCondition();
60 61
61 /** 62 /**
62 * Create a new delay task ring buffer by default size 63 * Create a new delay task ring buffer by default size
  64 + *
63 * @param executorService the business thread pool 65 * @param executorService the business thread pool
64 */ 66 */
65 public RingBufferWheel(ExecutorService executorService) { 67 public RingBufferWheel(ExecutorService executorService) {
@@ -71,6 +73,7 @@ public final class RingBufferWheel { @@ -71,6 +73,7 @@ public final class RingBufferWheel {
71 73
72 /** 74 /**
73 * Create a new delay task ring buffer by custom buffer size 75 * Create a new delay task ring buffer by custom buffer size
  76 + *
74 * @param executorService the business thread pool 77 * @param executorService the business thread pool
75 * @param bufferSize custom buffer size 78 * @param bufferSize custom buffer size
76 */ 79 */
@@ -86,10 +89,14 @@ public final class RingBufferWheel { @@ -86,10 +89,14 @@ public final class RingBufferWheel {
86 89
87 /** 90 /**
88 * Add a task into the ring buffer 91 * Add a task into the ring buffer
  92 + *
89 * @param task business task extends RingBufferWheel.Task 93 * @param task business task extends RingBufferWheel.Task
90 */ 94 */
91 public void addTask(Task task) { 95 public void addTask(Task task) {
92 int key = task.getKey(); 96 int key = task.getKey();
  97 +
  98 + try {
  99 + lock.lock();
93 Set<Task> tasks = get(key); 100 Set<Task> tasks = get(key);
94 101
95 if (tasks != null) { 102 if (tasks != null) {
@@ -105,35 +112,45 @@ public final class RingBufferWheel { @@ -105,35 +112,45 @@ public final class RingBufferWheel {
105 sets.add(task); 112 sets.add(task);
106 put(key, sets); 113 put(key, sets);
107 } 114 }
108 -  
109 - taskSize.incrementAndGet(); 115 + size ++ ;
  116 + }finally {
  117 + lock.unlock();
  118 + }
110 119
111 start(); 120 start();
  121 +
  122 +
112 } 123 }
113 124
114 /** 125 /**
115 * thread safe 126 * thread safe
  127 + *
116 * @return the size of ring buffer 128 * @return the size of ring buffer
117 */ 129 */
118 public int taskSize() { 130 public int taskSize() {
119 - return taskSize.get(); 131 + return size;
120 } 132 }
121 133
122 /** 134 /**
123 * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop} 135 * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
124 */ 136 */
125 public void start() { 137 public void start() {
126 - if (!start){ 138 + if (!start.get()) {
  139 +
  140 + if (start.compareAndSet(start.get(), true)) {
127 logger.info("delay task is starting"); 141 logger.info("delay task is starting");
128 Thread job = new Thread(new TriggerJob()); 142 Thread job = new Thread(new TriggerJob());
129 job.setName("consumer RingBuffer thread"); 143 job.setName("consumer RingBuffer thread");
130 job.start(); 144 job.start();
131 - start = true ; 145 + start.set(true);
  146 + }
  147 +
132 } 148 }
133 } 149 }
134 150
135 /** 151 /**
136 * Stop consumer ring buffer thread 152 * Stop consumer ring buffer thread
  153 + *
137 * @param force True will force close consumer thread and discard all pending tasks 154 * @param force True will force close consumer thread and discard all pending tasks
138 * otherwise the consumer thread waits for all tasks to completes before closing. 155 * otherwise the consumer thread waits for all tasks to completes before closing.
139 */ 156 */
@@ -144,7 +161,7 @@ public final class RingBufferWheel { @@ -144,7 +161,7 @@ public final class RingBufferWheel {
144 executorService.shutdownNow(); 161 executorService.shutdownNow();
145 } else { 162 } else {
146 logger.info("delay task is stopping"); 163 logger.info("delay task is stopping");
147 - if (taskSize() > 0){ 164 + if (taskSize() > 0) {
148 try { 165 try {
149 lock.lock(); 166 lock.lock();
150 condition.await(); 167 condition.await();
@@ -202,11 +219,11 @@ public final class RingBufferWheel { @@ -202,11 +219,11 @@ public final class RingBufferWheel {
202 private void size2Notify() { 219 private void size2Notify() {
203 try { 220 try {
204 lock.lock(); 221 lock.lock();
205 - int size = taskSize.decrementAndGet(); 222 + size -- ;
206 if (size == 0) { 223 if (size == 0) {
207 condition.signal(); 224 condition.signal();
208 } 225 }
209 - }finally { 226 + } finally {
210 lock.unlock(); 227 lock.unlock();
211 } 228 }
212 } 229 }
@@ -225,7 +242,7 @@ public final class RingBufferWheel { @@ -225,7 +242,7 @@ public final class RingBufferWheel {
225 242
226 private int mod(int target, int mod) { 243 private int mod(int target, int mod) {
227 // equals target % mod 244 // equals target % mod
228 - target = target + tick.get() ; 245 + target = target + tick.get();
229 return target & (mod - 1); 246 return target & (mod - 1);
230 } 247 }
231 248
@@ -272,7 +289,7 @@ public final class RingBufferWheel { @@ -272,7 +289,7 @@ public final class RingBufferWheel {
272 public void run() { 289 public void run() {
273 int index = 0; 290 int index = 0;
274 while (!stop) { 291 while (!stop) {
275 - 292 + try {
276 Set<Task> tasks = remove(index); 293 Set<Task> tasks = remove(index);
277 for (Task task : tasks) { 294 for (Task task : tasks) {
278 executorService.submit(task); 295 executorService.submit(task);
@@ -284,11 +301,12 @@ public final class RingBufferWheel { @@ -284,11 +301,12 @@ public final class RingBufferWheel {
284 301
285 //Total tick number of records 302 //Total tick number of records
286 tick.incrementAndGet(); 303 tick.incrementAndGet();
287 - try {  
288 TimeUnit.SECONDS.sleep(1); 304 TimeUnit.SECONDS.sleep(1);
289 - } catch (InterruptedException e) {  
290 - logger.error("InterruptedException", e); 305 +
  306 + } catch (Exception e) {
  307 + logger.error("Exception", e);
291 } 308 }
  309 +
292 } 310 }
293 311
294 logger.info("delay task is stopped"); 312 logger.info("delay task is stopped");
@@ -3,6 +3,9 @@ package com.crossoverjie.cim.common.data.construct; @@ -3,6 +3,9 @@ package com.crossoverjie.cim.common.data.construct;
3 3
4 import com.google.common.util.concurrent.ThreadFactoryBuilder; 4 import com.google.common.util.concurrent.ThreadFactoryBuilder;
5 5
  6 +import io.netty.util.HashedWheelTimer;
  7 +import io.netty.util.Timeout;
  8 +import io.netty.util.TimerTask;
6 import org.slf4j.Logger; 9 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory; 10 import org.slf4j.LoggerFactory;
8 11
@@ -18,10 +21,10 @@ public class RingBufferWheelTest { @@ -18,10 +21,10 @@ public class RingBufferWheelTest {
18 21
19 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ; 22 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ;
20 23
21 - public static void main(String[] args) throws InterruptedException {  
22 - test5(); 24 + public static void main(String[] args) throws Exception {
  25 +
  26 + concurrentTest();
23 27
24 - return;  
25 } 28 }
26 29
27 private static void test1() throws InterruptedException { 30 private static void test1() throws InterruptedException {
@@ -150,7 +153,7 @@ public class RingBufferWheelTest { @@ -150,7 +153,7 @@ public class RingBufferWheelTest {
150 153
151 154
152 } 155 }
153 - private static void cuncrrentTest6() throws InterruptedException { 156 + private static void concurrentTest() throws Exception {
154 BlockingQueue<Runnable> queue = new LinkedBlockingQueue(10); 157 BlockingQueue<Runnable> queue = new LinkedBlockingQueue(10);
155 ThreadFactory product = new ThreadFactoryBuilder() 158 ThreadFactory product = new ThreadFactoryBuilder()
156 .setNameFormat("msg-callback-%d") 159 .setNameFormat("msg-callback-%d")
@@ -161,25 +164,19 @@ public class RingBufferWheelTest { @@ -161,25 +164,19 @@ public class RingBufferWheelTest {
161 ExecutorService executorService = Executors.newFixedThreadPool(10) ; 164 ExecutorService executorService = Executors.newFixedThreadPool(10) ;
162 RingBufferWheel wheel = new RingBufferWheel(executorService) ; 165 RingBufferWheel wheel = new RingBufferWheel(executorService) ;
163 166
  167 + for (int i = 0; i < 10; i++) {
164 business.execute(new Runnable() { 168 business.execute(new Runnable() {
165 @Override 169 @Override
166 public void run() { 170 public void run() {
167 - 171 + for (int i1 = 0; i1 < 30; i1++) {
  172 + RingBufferWheel.Task task = new Job(i1) ;
  173 + task.setKey(i1);
  174 + wheel.addTask(task);
  175 + }
168 } 176 }
169 }); 177 });
170 -  
171 - for (int i = 0; i < 10; i++) {  
172 - RingBufferWheel.Task task = new Job(i) ;  
173 - task.setKey(i);  
174 - wheel.addTask(task);  
175 } 178 }
176 179
177 -  
178 - TimeUnit.SECONDS.sleep(10);  
179 - RingBufferWheel.Task task = new Job(15) ;  
180 - task.setKey(15);  
181 - wheel.addTask(task);  
182 -  
183 logger.info("task size={}",wheel.taskSize()); 180 logger.info("task size={}",wheel.taskSize());
184 181
185 wheel.stop(false); 182 wheel.stop(false);
@@ -209,4 +206,37 @@ public class RingBufferWheelTest { @@ -209,4 +206,37 @@ public class RingBufferWheelTest {
209 } 206 }
210 207
211 } 208 }
  209 +
  210 +
  211 + public static void hashTimerTest(){
  212 +
  213 + BlockingQueue<Runnable> queue = new LinkedBlockingQueue(10);
  214 + ThreadFactory product = new ThreadFactoryBuilder()
  215 + .setNameFormat("msg-callback-%d")
  216 + .setDaemon(true)
  217 + .build();
  218 + ThreadPoolExecutor business = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MILLISECONDS, queue,product);
  219 + HashedWheelTimer hashedWheelTimer = new HashedWheelTimer() ;
  220 +
  221 + for (int i = 0; i < 10; i++) {
  222 + int finalI = i;
  223 + business.execute(new Runnable() {
  224 + @Override
  225 + public void run() {
  226 +
  227 + for (int i1 = 0; i1 < 10; i1++) {
  228 + hashedWheelTimer.newTimeout(new TimerTask() {
  229 + @Override
  230 + public void run(Timeout timeout) throws Exception {
  231 + logger.info("====" + finalI);
  232 + }
  233 + }, finalI,TimeUnit.SECONDS) ;
  234 + }
  235 + }
  236 + });
  237 + }
  238 +
  239 +
  240 +
  241 + }
212 } 242 }