作者 crossoverJie

:bug: fix add repeat delay job exception

@@ -45,6 +45,13 @@ public final class RingBufferWheel { @@ -45,6 +45,13 @@ public final class RingBufferWheel {
45 */ 45 */
46 private volatile boolean stop = false; 46 private volatile boolean stop = false;
47 47
  48 + private volatile boolean start = false ;
  49 +
  50 + /**
  51 + * total tick times
  52 + */
  53 + private AtomicInteger tick = new AtomicInteger() ;
  54 +
48 private Lock lock = new ReentrantLock(); 55 private Lock lock = new ReentrantLock();
49 private Condition condition = lock.newCondition(); 56 private Condition condition = lock.newCondition();
50 57
@@ -112,10 +119,15 @@ public final class RingBufferWheel { @@ -112,10 +119,15 @@ public final class RingBufferWheel {
112 * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop} 119 * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
113 */ 120 */
114 public void start() { 121 public void start() {
  122 +
  123 + if (!start){
115 logger.info("delay task is starting"); 124 logger.info("delay task is starting");
116 Thread job = new Thread(new TriggerJob()); 125 Thread job = new Thread(new TriggerJob());
117 job.setName("consumer RingBuffer thread"); 126 job.setName("consumer RingBuffer thread");
118 job.start(); 127 job.start();
  128 + start = true ;
  129 + }
  130 +
119 } 131 }
120 132
121 /** 133 /**
@@ -211,6 +223,7 @@ public final class RingBufferWheel { @@ -211,6 +223,7 @@ public final class RingBufferWheel {
211 223
212 private int mod(int target, int mod) { 224 private int mod(int target, int mod) {
213 // equals target % mod 225 // equals target % mod
  226 + target = target + tick.get() ;
214 return target & (mod - 1); 227 return target & (mod - 1);
215 } 228 }
216 229
@@ -267,6 +280,8 @@ public final class RingBufferWheel { @@ -267,6 +280,8 @@ public final class RingBufferWheel {
267 index = 0; 280 index = 0;
268 } 281 }
269 282
  283 + //Total tick number of records
  284 + tick.incrementAndGet();
270 try { 285 try {
271 TimeUnit.SECONDS.sleep(1); 286 TimeUnit.SECONDS.sleep(1);
272 } catch (InterruptedException e) { 287 } catch (InterruptedException e) {
@@ -12,7 +12,7 @@ public class RingBufferWheelTest { @@ -12,7 +12,7 @@ public class RingBufferWheelTest {
12 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ; 12 private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ;
13 13
14 public static void main(String[] args) throws InterruptedException { 14 public static void main(String[] args) throws InterruptedException {
15 - test5(); 15 + test6();
16 16
17 return; 17 return;
18 } 18 }
@@ -124,6 +124,31 @@ public class RingBufferWheelTest { @@ -124,6 +124,31 @@ public class RingBufferWheelTest {
124 124
125 125
126 } 126 }
  127 + private static void test6() throws InterruptedException {
  128 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
  129 +
  130 + RingBufferWheel wheel = new RingBufferWheel(executorService,512) ;
  131 +
  132 + for (int i = 0; i < 10; i++) {
  133 + RingBufferWheel.Task task = new Job(i) ;
  134 + task.setKey(i);
  135 + wheel.addTask(task);
  136 + }
  137 +
  138 + wheel.start();
  139 +
  140 + TimeUnit.SECONDS.sleep(10);
  141 + RingBufferWheel.Task task = new Job(15) ;
  142 + task.setKey(15);
  143 + wheel.addTask(task);
  144 + wheel.start();
  145 +
  146 + logger.info("task size={}",wheel.taskSize());
  147 +
  148 + wheel.stop(false);
  149 +
  150 +
  151 + }
127 152
128 private static class Job extends RingBufferWheel.Task{ 153 private static class Job extends RingBufferWheel.Task{
129 154