作者 crossoverJie

:sparkles: Introducing new features.ring buffer delay task data struct

@@ -15,6 +15,11 @@ @@ -15,6 +15,11 @@
15 15
16 <dependencies> 16 <dependencies>
17 <dependency> 17 <dependency>
  18 + <groupId>ch.qos.logback</groupId>
  19 + <artifactId>logback-classic</artifactId>
  20 + </dependency>
  21 +
  22 + <dependency>
18 <groupId>com.google.protobuf</groupId> 23 <groupId>com.google.protobuf</groupId>
19 <artifactId>protobuf-java</artifactId> 24 <artifactId>protobuf-java</artifactId>
20 </dependency> 25 </dependency>
  1 +package com.crossoverjie.cim.common.data.construct;
  2 +
  3 +import org.slf4j.Logger;
  4 +import org.slf4j.LoggerFactory;
  5 +
  6 +import java.util.HashSet;
  7 +import java.util.Set;
  8 +import java.util.concurrent.ExecutorService;
  9 +import java.util.concurrent.TimeUnit;
  10 +import java.util.concurrent.atomic.AtomicInteger;
  11 +import java.util.concurrent.locks.Condition;
  12 +import java.util.concurrent.locks.Lock;
  13 +import java.util.concurrent.locks.ReentrantLock;
  14 +
  15 +/**
  16 + * Function:Ring Queue, it can be used to delay task.
  17 + *
  18 + * @author crossoverJie
  19 + * Date: 2019-09-20 14:46
  20 + * @since JDK 1.8
  21 + */
  22 +public final class RingBufferWheel {
  23 +
  24 + private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);
  25 +
  26 +
  27 + private static final int STATIC_RING_SIZE = 64;
  28 +
  29 + private Object[] ringBuffer;
  30 +
  31 + private int bufferSize;
  32 +
  33 + private ExecutorService executorService;
  34 +
  35 + private AtomicInteger taskSize = new AtomicInteger();
  36 +
  37 + private volatile boolean stop = false;
  38 +
  39 + private Lock lock = new ReentrantLock();
  40 + private Condition condition = lock.newCondition();
  41 +
  42 + public RingBufferWheel(ExecutorService executorService) {
  43 + this.executorService = executorService;
  44 + bufferSize = STATIC_RING_SIZE;
  45 + ringBuffer = new Object[bufferSize];
  46 + }
  47 +
  48 +
  49 + public void addTask(Task task) {
  50 + int key = task.getKey();
  51 + Set<Task> tasks = get(key);
  52 +
  53 + if (tasks != null) {
  54 + int cycleNum = cycleNum(key, bufferSize);
  55 + task.setCycleNum(cycleNum);
  56 + tasks.add(task);
  57 + } else {
  58 + int index = mod(key, bufferSize);
  59 + int cycleNum = cycleNum(key, bufferSize);
  60 + task.setCycleNum(index);
  61 + task.setCycleNum(cycleNum);
  62 + Set<Task> sets = new HashSet<>();
  63 + sets.add(task);
  64 + put(key, sets);
  65 + }
  66 +
  67 + taskSize.incrementAndGet();
  68 +
  69 + }
  70 +
  71 + public int taskSize() {
  72 + return taskSize.get();
  73 + }
  74 +
  75 + public void start() {
  76 + logger.info("delay task is starting");
  77 + Thread job = new Thread(new TriggerJob());
  78 + job.setName("consumer RingBuffer thread");
  79 + job.start();
  80 + }
  81 +
  82 + public void stop(boolean force) {
  83 + if (force) {
  84 + logger.info("delay task is forced stop");
  85 + stop = true;
  86 + executorService.shutdownNow();
  87 + } else {
  88 + logger.info("delay task is stopping");
  89 + try {
  90 + lock.lock();
  91 + condition.await();
  92 + stop = true;
  93 + } catch (InterruptedException e) {
  94 + logger.error("InterruptedException", e);
  95 + } finally {
  96 + lock.unlock();
  97 + }
  98 + executorService.shutdown();
  99 + }
  100 +
  101 +
  102 + }
  103 +
  104 +
  105 + private Set<Task> get(int key) {
  106 + int index = mod(key, bufferSize);
  107 + return (Set<Task>) ringBuffer[index];
  108 + }
  109 +
  110 + private void put(int key, Set<Task> tasks) {
  111 + int index = mod(key, bufferSize);
  112 + ringBuffer[index] = tasks;
  113 + }
  114 +
  115 + private Set<Task> remove(int key) {
  116 + Set<Task> tempTask = new HashSet<>();
  117 + Set<Task> result = new HashSet<>();
  118 +
  119 + Set<Task> tasks = (Set<Task>) ringBuffer[key];
  120 + if (tasks == null) {
  121 + return result;
  122 + }
  123 +
  124 + for (Task task : tasks) {
  125 + if (task.getCycleNum() == 0) {
  126 + result.add(task);
  127 +
  128 + size2Notify();
  129 + } else {
  130 + // decrement 1 cycle number and update origin data
  131 + task.setCycleNum(task.getCycleNum() - 1);
  132 + tempTask.add(task);
  133 + }
  134 + }
  135 +
  136 + //update origin data
  137 + ringBuffer[key] = tempTask;
  138 +
  139 + return result;
  140 + }
  141 +
  142 + private void size2Notify() {
  143 + lock.lock();
  144 + int size = taskSize.decrementAndGet();
  145 + if (size == 0) {
  146 + condition.signal();
  147 + }
  148 + lock.unlock();
  149 + }
  150 +
  151 + private int mod(int target, int mod) {
  152 + // equals target % mod
  153 + return target & (mod - 1);
  154 + }
  155 +
  156 + private int cycleNum(int target, int mod) {
  157 + //equals target/mod
  158 + return target >> Integer.bitCount(mod - 1);
  159 + }
  160 +
  161 + public abstract static class Task extends Thread {
  162 +
  163 +
  164 + private int cycleNum;
  165 +
  166 + private int key;
  167 +
  168 + @Override
  169 + public void run() {
  170 + }
  171 +
  172 + public int getKey() {
  173 + return key;
  174 + }
  175 +
  176 + public void setKey(int key) {
  177 + this.key = key;
  178 + }
  179 +
  180 + public int getCycleNum() {
  181 + return cycleNum;
  182 + }
  183 +
  184 + private void setCycleNum(int cycleNum) {
  185 + this.cycleNum = cycleNum;
  186 + }
  187 + }
  188 +
  189 +
  190 + private class TriggerJob implements Runnable {
  191 +
  192 + @Override
  193 + public void run() {
  194 + int index = 0;
  195 + while (!stop) {
  196 +
  197 + Set<Task> tasks = remove(index);
  198 + for (Task task : tasks) {
  199 + executorService.submit(task);
  200 + }
  201 +
  202 + if (++index > bufferSize - 1) {
  203 + index = 0;
  204 + }
  205 +
  206 + try {
  207 + TimeUnit.SECONDS.sleep(1);
  208 + } catch (InterruptedException e) {
  209 + logger.error("InterruptedException", e);
  210 + }
  211 + }
  212 +
  213 + logger.info("delay task is stopped");
  214 + }
  215 + }
  216 +}
  1 +package com.crossoverjie.cim.common;
  2 +
  3 +import org.junit.Test;
  4 +
  5 +import java.util.concurrent.TimeUnit;
  6 +
  7 +/**
  8 + * Function:
  9 + *
  10 + * @author crossoverJie
  11 + * Date: 2019-09-23 14:21
  12 + * @since JDK 1.8
  13 + */
  14 +public class CommonTest {
  15 +
  16 + @Test
  17 + public void test() throws InterruptedException {
  18 +
  19 +
  20 + System.out.println(is2(9));
  21 +
  22 + System.out.println(Integer.bitCount(64-1));
  23 +
  24 + int target = 1569312600 ;
  25 + int mod = 64 ;
  26 + System.out.println(target % mod);
  27 +
  28 + System.out.println(mod(target,mod));
  29 + System.out.println("============");
  30 +
  31 + System.out.println(cycleNum(256,64)) ;
  32 +
  33 + cycle();
  34 + }
  35 +
  36 +
  37 + private int mod(int target, int mod){
  38 + // equals target % mod
  39 + return target & (mod -1) ;
  40 + }
  41 +
  42 + private int cycleNum(int target,int mod){
  43 + //equals target/mod
  44 + return target >> Integer.bitCount(mod-1) ;
  45 + }
  46 +
  47 + private boolean is2(int target){
  48 + if (target < 0){
  49 + return false ;
  50 + }
  51 +
  52 + int value = target & (target - 1) ;
  53 + if (value != 0){
  54 + return false ;
  55 + }
  56 +
  57 + return true ;
  58 + }
  59 +
  60 +
  61 + private void cycle() throws InterruptedException {
  62 + int index = 0 ;
  63 + while (true){
  64 + System.out.println("=======" + index);
  65 +
  66 + if (++index > 63){
  67 + index = 0 ;
  68 + }
  69 + TimeUnit.MILLISECONDS.sleep(200);
  70 + }
  71 + }
  72 +
  73 +}
  1 +package com.crossoverjie.cim.common.data.construct;
  2 +
  3 +import org.junit.Test;
  4 +import org.slf4j.Logger;
  5 +import org.slf4j.LoggerFactory;
  6 +
  7 +import java.util.concurrent.ExecutorService;
  8 +import java.util.concurrent.Executors;
  9 +import java.util.concurrent.TimeUnit;
  10 +
  11 +public class RingBufferWheelTest {
  12 +
  13 + private static Logger logger = LoggerFactory.getLogger(RingBufferWheelTest.class) ;
  14 +
  15 + public static void main(String[] args) throws InterruptedException {
  16 + test4();
  17 +
  18 + return;
  19 + }
  20 +
  21 + private static void test1() throws InterruptedException {
  22 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
  23 +
  24 + Task task = new Task() ;
  25 + task.setKey(10);
  26 + RingBufferWheel wheel = new RingBufferWheel(executorService) ;
  27 + wheel.addTask(task) ;
  28 +
  29 + task = new Task() ;
  30 + task.setKey(74);
  31 + wheel.addTask(task) ;
  32 +
  33 + wheel.start();
  34 +
  35 + while (true){
  36 + logger.info("task size={}" , wheel.taskSize());
  37 + TimeUnit.SECONDS.sleep(1);
  38 + }
  39 + }
  40 + private static void test2() throws InterruptedException {
  41 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
  42 +
  43 + Task task = new Task() ;
  44 + task.setKey(10);
  45 + RingBufferWheel wheel = new RingBufferWheel(executorService) ;
  46 + wheel.addTask(task) ;
  47 +
  48 + task = new Task() ;
  49 + task.setKey(74);
  50 + wheel.addTask(task) ;
  51 +
  52 + wheel.start();
  53 +
  54 +// new Thread(() -> {
  55 +// while (true){
  56 +// logger.info("task size={}" , wheel.taskSize());
  57 +// try {
  58 +// TimeUnit.SECONDS.sleep(1);
  59 +// } catch (InterruptedException e) {
  60 +// e.printStackTrace();
  61 +// }
  62 +// }
  63 +// }).start();
  64 +
  65 + TimeUnit.SECONDS.sleep(12);
  66 + wheel.stop(true);
  67 +
  68 +
  69 + }
  70 + private static void test3() throws InterruptedException {
  71 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
  72 +
  73 + Task task = new Task() ;
  74 + task.setKey(10);
  75 + RingBufferWheel wheel = new RingBufferWheel(executorService) ;
  76 + wheel.addTask(task) ;
  77 +
  78 + task = new Task() ;
  79 + task.setKey(74);
  80 + wheel.addTask(task) ;
  81 +
  82 + wheel.start();
  83 +
  84 +
  85 + TimeUnit.SECONDS.sleep(2);
  86 + wheel.stop(false);
  87 +
  88 +
  89 + }
  90 + private static void test4() throws InterruptedException {
  91 + ExecutorService executorService = Executors.newFixedThreadPool(2) ;
  92 +
  93 + RingBufferWheel wheel = new RingBufferWheel(executorService) ;
  94 +
  95 + for (int i = 0; i < 65; i++) {
  96 + Job task = new Job(i) ;
  97 + task.setKey(i);
  98 + wheel.addTask(task);
  99 + }
  100 +
  101 + wheel.start();
  102 +
  103 + logger.info("task size={}",wheel.taskSize());
  104 +
  105 + wheel.stop(false);
  106 +
  107 +
  108 + }
  109 +
  110 +
  111 + private static class Task extends RingBufferWheel.Task{
  112 +
  113 + @Override
  114 + public void run() {
  115 + logger.info("================");
  116 + }
  117 + }
  118 +
  119 + private static class Job extends RingBufferWheel.Task{
  120 +
  121 + private int num ;
  122 +
  123 + public Job(int num) {
  124 + this.num = num;
  125 + }
  126 +
  127 + @Override
  128 + public void run() {
  129 + logger.info("number={}" , num);
  130 + }
  131 + }
  132 +}