作者 crossoverJie

:bulb: Documenting source code.

@@ -24,28 +24,60 @@ public final class RingBufferWheel { @@ -24,28 +24,60 @@ public final class RingBufferWheel {
24 private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class); 24 private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);
25 25
26 26
  27 + /**
  28 + * default ring buffer size
  29 + */
27 private static final int STATIC_RING_SIZE = 64; 30 private static final int STATIC_RING_SIZE = 64;
28 31
29 private Object[] ringBuffer; 32 private Object[] ringBuffer;
30 33
31 private int bufferSize; 34 private int bufferSize;
32 35
  36 + /**
  37 + * business thread pool
  38 + */
33 private ExecutorService executorService; 39 private ExecutorService executorService;
34 40
35 private AtomicInteger taskSize = new AtomicInteger(); 41 private AtomicInteger taskSize = new AtomicInteger();
36 42
  43 + /***
  44 + * task running sign
  45 + */
37 private volatile boolean stop = false; 46 private volatile boolean stop = false;
38 47
39 private Lock lock = new ReentrantLock(); 48 private Lock lock = new ReentrantLock();
40 private Condition condition = lock.newCondition(); 49 private Condition condition = lock.newCondition();
41 50
  51 + /**
  52 + * Create a new delay task ring buffer by default size
  53 + * @param executorService the business thread pool
  54 + */
42 public RingBufferWheel(ExecutorService executorService) { 55 public RingBufferWheel(ExecutorService executorService) {
43 this.executorService = executorService; 56 this.executorService = executorService;
44 - bufferSize = STATIC_RING_SIZE;  
45 - ringBuffer = new Object[bufferSize]; 57 + this.bufferSize = STATIC_RING_SIZE;
  58 + this.ringBuffer = new Object[bufferSize];
46 } 59 }
47 60
48 61
  62 + /**
  63 + * Create a new delay task ring buffer by custom buffer size
  64 + * @param executorService the business thread pool
  65 + * @param bufferSize custom buffer size
  66 + */
  67 + public RingBufferWheel(ExecutorService executorService, int bufferSize) {
  68 + this(executorService);
  69 +
  70 + if (!powerOf2(bufferSize)) {
  71 + throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
  72 + }
  73 + this.bufferSize = bufferSize;
  74 + this.ringBuffer = new Object[bufferSize];
  75 + }
  76 +
  77 + /**
  78 + * Add a task into the ring buffer
  79 + * @param task business task extends RingBufferWheel.Task
  80 + */
49 public void addTask(Task task) { 81 public void addTask(Task task) {
50 int key = task.getKey(); 82 int key = task.getKey();
51 Set<Task> tasks = get(key); 83 Set<Task> tasks = get(key);
@@ -68,10 +100,17 @@ public final class RingBufferWheel { @@ -68,10 +100,17 @@ public final class RingBufferWheel {
68 100
69 } 101 }
70 102
  103 + /**
  104 + * thread safe
  105 + * @return the size of ring buffer
  106 + */
71 public int taskSize() { 107 public int taskSize() {
72 return taskSize.get(); 108 return taskSize.get();
73 } 109 }
74 110
  111 + /**
  112 + * Start background thread to consumer wheel timer, it will run until you call method {@link #stop}
  113 + */
75 public void start() { 114 public void start() {
76 logger.info("delay task is starting"); 115 logger.info("delay task is starting");
77 Thread job = new Thread(new TriggerJob()); 116 Thread job = new Thread(new TriggerJob());
@@ -79,6 +118,11 @@ public final class RingBufferWheel { @@ -79,6 +118,11 @@ public final class RingBufferWheel {
79 job.start(); 118 job.start();
80 } 119 }
81 120
  121 + /**
  122 + * Stop consumer ring buffer thread
  123 + * @param force True will force close consumer thread and discard all pending tasks
  124 + * otherwise the consumer thread waits for all tasks to completes before closing.
  125 + */
82 public void stop(boolean force) { 126 public void stop(boolean force) {
83 if (force) { 127 if (force) {
84 logger.info("delay task is forced stop"); 128 logger.info("delay task is forced stop");
@@ -148,6 +192,18 @@ public final class RingBufferWheel { @@ -148,6 +192,18 @@ public final class RingBufferWheel {
148 lock.unlock(); 192 lock.unlock();
149 } 193 }
150 194
  195 + private boolean powerOf2(int target) {
  196 + if (target < 0) {
  197 + return false;
  198 + }
  199 + int value = target & (target - 1);
  200 + if (value != 0) {
  201 + return false;
  202 + }
  203 +
  204 + return true;
  205 + }
  206 +
151 private int mod(int target, int mod) { 207 private int mod(int target, int mod) {
152 // equals target % mod 208 // equals target % mod
153 return target & (mod - 1); 209 return target & (mod - 1);
@@ -158,6 +214,9 @@ public final class RingBufferWheel { @@ -158,6 +214,9 @@ public final class RingBufferWheel {
158 return target >> Integer.bitCount(mod - 1); 214 return target >> Integer.bitCount(mod - 1);
159 } 215 }
160 216
  217 + /**
  218 + * An abstract class used to implement business.
  219 + */
161 public abstract static class Task extends Thread { 220 public abstract static class Task extends Thread {
162 221
163 222
@@ -13,7 +13,7 @@ public class RingBufferWheelTest { @@ -13,7 +13,7 @@ public class RingBufferWheelTest {
13 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ; 13 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ;
14 14
15 public static void main(String[] args) throws InterruptedException { 15 public static void main(String[] args) throws InterruptedException {
16 - test4(); 16 + test5();
17 17
18 return; 18 return;
19 } 19 }
@@ -106,6 +106,25 @@ public class RingBufferWheelTest { @@ -106,6 +106,25 @@ public class RingBufferWheelTest {
106 106
107 107
108 } 108 }
  109 + private static void test5() throws InterruptedException {
  110 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
  111 +
  112 + RingBufferWheel wheel = new RingBufferWheel(executorService,512) ;
  113 +
  114 + for (int i = 0; i < 65; i++) {
  115 + Job task = new Job(i) ;
  116 + task.setKey(i);
  117 + wheel.addTask(task);
  118 + }
  119 +
  120 + wheel.start();
  121 +
  122 + logger.info("task size={}",wheel.taskSize());
  123 +
  124 + wheel.stop(false);
  125 +
  126 +
  127 + }
109 128
110 129
111 private static class Task extends RingBufferWheel.Task{ 130 private static class Task extends RingBufferWheel.Task{