作者 crossoverJie
提交者 GitHub

Merge pull request #33 from crossoverJie/cim-1.0.4

cim 1.0.4
正在显示 19 个修改的文件 包含 845 行增加8 行删除
1 package com.crossoverjie.cim.client.handle; 1 package com.crossoverjie.cim.client.handle;
2 2
  3 +import com.crossoverjie.cim.client.service.ShutDownMsg;
3 import com.crossoverjie.cim.client.thread.ReConnectJob; 4 import com.crossoverjie.cim.client.thread.ReConnectJob;
4 import com.crossoverjie.cim.client.util.SpringBeanFactory; 5 import com.crossoverjie.cim.client.util.SpringBeanFactory;
5 import com.crossoverjie.cim.common.constant.Constants; 6 import com.crossoverjie.cim.common.constant.Constants;
@@ -37,6 +38,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -37,6 +38,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
37 38
38 private ScheduledExecutorService scheduledExecutorService ; 39 private ScheduledExecutorService scheduledExecutorService ;
39 40
  41 + private ShutDownMsg shutDownMsg ;
  42 +
40 43
41 @Override 44 @Override
42 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 45 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@@ -71,11 +74,20 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -71,11 +74,20 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
71 74
72 @Override 75 @Override
73 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 76 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
74 - LOGGER.info("客户端断开了,重新连接!"); 77 +
  78 + if (shutDownMsg == null){
  79 + shutDownMsg = SpringBeanFactory.getBean(ShutDownMsg.class) ;
  80 + }
  81 +
  82 + //用户主动退出,不执行重连逻辑
  83 + if (shutDownMsg.checkStatus()){
  84 + return;
  85 + }
75 86
76 if (scheduledExecutorService == null){ 87 if (scheduledExecutorService == null){
77 scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ; 88 scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
78 } 89 }
  90 + LOGGER.info("客户端断开了,重新连接!");
79 // TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。 91 // TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。
80 scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ; 92 scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;
81 } 93 }
  1 +package com.crossoverjie.cim.client.service;
  2 +
  3 +import org.springframework.stereotype.Component;
  4 +
  5 +/**
  6 + * Function:
  7 + *
  8 + * @author crossoverJie
  9 + * Date: 2019-02-27 16:17
  10 + * @since JDK 1.8
  11 + */
  12 +@Component
  13 +public class ShutDownMsg {
  14 + private boolean isCommand ;
  15 +
  16 + /**
  17 + * 置为用户主动退出状态
  18 + */
  19 + public void shutdown(){
  20 + isCommand = true ;
  21 + }
  22 +
  23 + public boolean checkStatus(){
  24 + return isCommand ;
  25 + }
  26 +}
@@ -25,6 +25,9 @@ public class QueryHistoryCommand implements InnerCommand { @@ -25,6 +25,9 @@ public class QueryHistoryCommand implements InnerCommand {
25 @Override 25 @Override
26 public void process(String msg) { 26 public void process(String msg) {
27 String[] split = msg.split(" "); 27 String[] split = msg.split(" ");
  28 + if (split.length < 2){
  29 + return;
  30 + }
28 String res = msgLogger.query(split[1]); 31 String res = msgLogger.query(split[1]);
29 System.out.println(res); 32 System.out.println(res);
30 } 33 }
@@ -4,6 +4,7 @@ import com.crossoverjie.cim.client.client.CIMClient; @@ -4,6 +4,7 @@ import com.crossoverjie.cim.client.client.CIMClient;
4 import com.crossoverjie.cim.client.service.InnerCommand; 4 import com.crossoverjie.cim.client.service.InnerCommand;
5 import com.crossoverjie.cim.client.service.MsgLogger; 5 import com.crossoverjie.cim.client.service.MsgLogger;
6 import com.crossoverjie.cim.client.service.RouteRequest; 6 import com.crossoverjie.cim.client.service.RouteRequest;
  7 +import com.crossoverjie.cim.client.service.ShutDownMsg;
7 import org.slf4j.Logger; 8 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory; 9 import org.slf4j.LoggerFactory;
9 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.beans.factory.annotation.Autowired;
@@ -36,9 +37,14 @@ public class ShutDownCommand implements InnerCommand { @@ -36,9 +37,14 @@ public class ShutDownCommand implements InnerCommand {
36 @Resource(name = "callBackThreadPool") 37 @Resource(name = "callBackThreadPool")
37 private ThreadPoolExecutor executor; 38 private ThreadPoolExecutor executor;
38 39
  40 +
  41 + @Autowired
  42 + private ShutDownMsg shutDownMsg ;
  43 +
39 @Override 44 @Override
40 public void process(String msg) { 45 public void process(String msg) {
41 LOGGER.info("系统关闭中。。。。"); 46 LOGGER.info("系统关闭中。。。。");
  47 + shutDownMsg.shutdown();
42 routeRequest.offLine(); 48 routeRequest.offLine();
43 msgLogger.stop(); 49 msgLogger.stop();
44 executor.shutdown(); 50 executor.shutdown();
@@ -45,8 +45,8 @@ cim.clear.route.request.url=http://45.78.28.220:8083/offLine @@ -45,8 +45,8 @@ cim.clear.route.request.url=http://45.78.28.220:8083/offLine
45 #cim.clear.route.request.url=http://localhost:8083/offLine 45 #cim.clear.route.request.url=http://localhost:8083/offLine
46 46
47 # 客户端唯一ID 47 # 客户端唯一ID
48 -cim.user.id=1545574841528  
49 -cim.user.userName=zhangsan 48 +cim.user.id=1551267098213
  49 +cim.user.userName=test3
50 50
51 # 回调线程队列大小 51 # 回调线程队列大小
52 cim.callback.thread.queue.size = 2 52 cim.callback.thread.queue.size = 2
  1 +package com.crossoverjie.cim.common.data.construct;
  2 +
  3 +import java.util.Arrays;
  4 +import java.util.Comparator;
  5 +
  6 +/**
  7 + * Function:根据 key 排序的 Map
  8 + *
  9 + * @author crossoverJie
  10 + * Date: 2019-02-25 18:17
  11 + * @since JDK 1.8
  12 + */
  13 +public class SortArrayMap {
  14 +
  15 + /**
  16 + * 核心数组
  17 + */
  18 + private Node[] buckets;
  19 +
  20 + private static final int DEFAULT_SIZE = 10;
  21 +
  22 + /**
  23 + * 数组大小
  24 + */
  25 + private int size = 0;
  26 +
  27 + public SortArrayMap() {
  28 + buckets = new Node[DEFAULT_SIZE];
  29 + }
  30 +
  31 + /**
  32 + * 写入数据
  33 + * @param key
  34 + * @param value
  35 + */
  36 + public void add(Long key, String value) {
  37 + checkSize(size + 1);
  38 + Node node = new Node(key, value);
  39 + buckets[size++] = node;
  40 + }
  41 +
  42 + /**
  43 + * 校验是否需要扩容
  44 + * @param size
  45 + */
  46 + private void checkSize(int size) {
  47 + if (size >= buckets.length) {
  48 + //扩容自身的 3/2
  49 + int oldLen = buckets.length;
  50 + int newLen = oldLen + (oldLen >> 1);
  51 + buckets = Arrays.copyOf(buckets, newLen);
  52 + }
  53 + }
  54 +
  55 + /**
  56 + * 顺时针取出数据
  57 + * @param key
  58 + * @return
  59 + */
  60 + public String firstNodeValue(long key) {
  61 + if (size == 0){
  62 + return null ;
  63 + }
  64 + for (Node bucket : buckets) {
  65 + if (bucket == null){
  66 + continue;
  67 + }
  68 + if (bucket.key >= key) {
  69 + return bucket.value;
  70 + }
  71 + }
  72 +
  73 + return buckets[0].value;
  74 +
  75 + }
  76 +
  77 + /**
  78 + * 排序
  79 + */
  80 + public void sort() {
  81 + Arrays.sort(buckets, 0, size, new Comparator<Node>() {
  82 + @Override
  83 + public int compare(Node o1, Node o2) {
  84 + if (o1.key > o2.key) {
  85 + return 1;
  86 + } else {
  87 + return -1;
  88 + }
  89 + }
  90 + });
  91 + }
  92 +
  93 + public void print() {
  94 + for (Node bucket : buckets) {
  95 + if (bucket == null) {
  96 + continue;
  97 + }
  98 + System.out.println(bucket.toString());
  99 + }
  100 + }
  101 +
  102 + public int size() {
  103 + return size;
  104 + }
  105 +
  106 + /**
  107 + * 数据节点
  108 + */
  109 + private class Node {
  110 + public Long key;
  111 + public String value;
  112 +
  113 + public Node(Long key, String value) {
  114 + this.key = key;
  115 + this.value = value;
  116 + }
  117 +
  118 + @Override
  119 + public String toString() {
  120 + return "Node{" +
  121 + "key=" + key +
  122 + ", value='" + value + '\'' +
  123 + '}';
  124 + }
  125 +
  126 + }
  127 +
  128 +}
  1 +package com.crossoverjie.cim.common.route.algorithm;
  2 +
  3 +import java.util.List;
  4 +
  5 +/**
  6 + * Function:
  7 + *
  8 + * @author crossoverJie
  9 + * Date: 2019-02-27 00:31
  10 + * @since JDK 1.8
  11 + */
  12 +public interface RouteHandle {
  13 +
  14 + /**
  15 + * 再一批服务器里进行路由
  16 + * @param values
  17 + * @param key
  18 + * @return
  19 + */
  20 + String routeServer(List<String> values,String key) ;
  21 +}
  1 +package com.crossoverjie.cim.common.route.algorithm.consistenthash;
  2 +
  3 +import java.io.UnsupportedEncodingException;
  4 +import java.security.MessageDigest;
  5 +import java.security.NoSuchAlgorithmException;
  6 +import java.util.List;
  7 +
  8 +/**
  9 + * Function:一致性 hash 算法抽象类
  10 + *
  11 + * @author crossoverJie
  12 + * Date: 2019-02-27 00:35
  13 + * @since JDK 1.8
  14 + */
  15 +public abstract class AbstractConsistentHash {
  16 +
  17 + /**
  18 + * 新增节点
  19 + * @param key
  20 + * @param value
  21 + */
  22 + protected abstract void add(long key,String value);
  23 +
  24 + /**
  25 + * 排序节点,数据结构自身支持排序可以不用重写
  26 + */
  27 + protected void sort(){}
  28 +
  29 + /**
  30 + * 根据当前的 key 通过一致性 hash 算法的规则取出一个节点
  31 + * @param value
  32 + * @return
  33 + */
  34 + protected abstract String getFirstNodeValue(String value);
  35 +
  36 + /**
  37 + * 传入节点列表以及客户端信息获取一个服务节点
  38 + * @param values
  39 + * @param key
  40 + * @return
  41 + */
  42 + public String process(List<String> values,String key){
  43 +
  44 + for (String value : values) {
  45 + add(hash(value), value);
  46 + }
  47 + sort();
  48 +
  49 + return getFirstNodeValue(key) ;
  50 + }
  51 +
  52 + /**
  53 + * hash 运算
  54 + * @param value
  55 + * @return
  56 + */
  57 + public Long hash(String value){
  58 + MessageDigest md5;
  59 + try {
  60 + md5 = MessageDigest.getInstance("MD5");
  61 + } catch (NoSuchAlgorithmException e) {
  62 + throw new RuntimeException("MD5 not supported", e);
  63 + }
  64 + md5.reset();
  65 + byte[] keyBytes = null;
  66 + try {
  67 + keyBytes = value.getBytes("UTF-8");
  68 + } catch (UnsupportedEncodingException e) {
  69 + throw new RuntimeException("Unknown string :" + value, e);
  70 + }
  71 +
  72 + md5.update(keyBytes);
  73 + byte[] digest = md5.digest();
  74 +
  75 + // hash code, Truncate to 32-bits
  76 + long hashCode = ((long) (digest[3] & 0xFF) << 24)
  77 + | ((long) (digest[2] & 0xFF) << 16)
  78 + | ((long) (digest[1] & 0xFF) << 8)
  79 + | (digest[0] & 0xFF);
  80 +
  81 + long truncateHashCode = hashCode & 0xffffffffL;
  82 + return truncateHashCode;
  83 + }
  84 +}
  1 +package com.crossoverjie.cim.common.route.algorithm.consistenthash;
  2 +
  3 +import com.crossoverjie.cim.common.route.algorithm.RouteHandle;
  4 +
  5 +import java.util.List;
  6 +
  7 +/**
  8 + * Function:
  9 + *
  10 + * @author crossoverJie
  11 + * Date: 2019-02-27 00:33
  12 + * @since JDK 1.8
  13 + */
  14 +public class ConsistentHashHandle implements RouteHandle {
  15 + private AbstractConsistentHash hash ;
  16 +
  17 + public void setHash(AbstractConsistentHash hash) {
  18 + this.hash = hash;
  19 + }
  20 +
  21 + @Override
  22 + public String routeServer(List<String> values, String key) {
  23 + return hash.process(values, key);
  24 + }
  25 +}
  1 +package com.crossoverjie.cim.common.route.algorithm.consistenthash;
  2 +
  3 +import com.crossoverjie.cim.common.data.construct.SortArrayMap;
  4 +
  5 +/**
  6 + * Function:自定义排序 Map 实现
  7 + *
  8 + * @author crossoverJie
  9 + * Date: 2019-02-27 00:38
  10 + * @since JDK 1.8
  11 + */
  12 +public class SortArrayMapConsistentHash extends AbstractConsistentHash {
  13 +
  14 + private SortArrayMap sortArrayMap = new SortArrayMap();
  15 +
  16 + /**
  17 + * 虚拟节点数量
  18 + */
  19 + private static final int VIRTUAL_NODE_SIZE = 2 ;
  20 +
  21 + @Override
  22 + public void add(long key, String value) {
  23 + for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
  24 + Long hash = super.hash("vir" + key + i);
  25 + sortArrayMap.add(hash,value);
  26 + }
  27 + sortArrayMap.add(key, value);
  28 + }
  29 +
  30 + @Override
  31 + public void sort() {
  32 + sortArrayMap.sort();
  33 + }
  34 +
  35 + @Override
  36 + public String getFirstNodeValue(String value) {
  37 + long hash = super.hash(value);
  38 + System.out.println("value=" + value + " hash = " + hash);
  39 + return sortArrayMap.firstNodeValue(hash);
  40 + }
  41 +
  42 +}
  1 +package com.crossoverjie.cim.common.route.algorithm.consistenthash;
  2 +
  3 +import java.util.SortedMap;
  4 +import java.util.TreeMap;
  5 +
  6 +/**
  7 + * Function:TreeMap 实现
  8 + *
  9 + * @author crossoverJie
  10 + * Date: 2019-02-27 01:16
  11 + * @since JDK 1.8
  12 + */
  13 +public class TreeMapConsistentHash extends AbstractConsistentHash {
  14 + private TreeMap<Long,String> treeMap = new TreeMap<Long, String>() ;
  15 +
  16 + /**
  17 + * 虚拟节点数量
  18 + */
  19 + private static final int VIRTUAL_NODE_SIZE = 2 ;
  20 +
  21 + @Override
  22 + public void add(long key, String value) {
  23 + for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
  24 + Long hash = super.hash("vir" + key + i);
  25 + treeMap.put(hash,value);
  26 + }
  27 + treeMap.put(key, value);
  28 + }
  29 +
  30 + @Override
  31 + public String getFirstNodeValue(String value) {
  32 + long hash = super.hash(value);
  33 + System.out.println("value=" + value + " hash = " + hash);
  34 + SortedMap<Long, String> last = treeMap.tailMap(hash);
  35 + if (!last.isEmpty()) {
  36 + return last.get(last.firstKey());
  37 + }
  38 + return treeMap.firstEntry().getValue();
  39 + }
  40 +}
  1 +package com.crossoverjie.cim.common.route.algorithm.loop;
  2 +
  3 +import com.crossoverjie.cim.common.route.algorithm.RouteHandle;
  4 +
  5 +import java.util.List;
  6 +import java.util.concurrent.atomic.AtomicLong;
  7 +
  8 +/**
  9 + * Function:
  10 + *
  11 + * @author crossoverJie
  12 + * Date: 2019-02-27 15:13
  13 + * @since JDK 1.8
  14 + */
  15 +public class LoopHandle implements RouteHandle {
  16 + private AtomicLong index = new AtomicLong();
  17 +
  18 + @Override
  19 + public String routeServer(List<String> values,String key) {
  20 + if (values.size() == 0) {
  21 + throw new RuntimeException("CIM 服务器可用服务列表为空");
  22 + }
  23 + Long position = index.incrementAndGet() % values.size();
  24 + if (position < 0) {
  25 + position = 0L;
  26 + }
  27 +
  28 + return values.get(position.intValue());
  29 + }
  30 +}
  1 +package com.crossoverjie.cim.common.data.construct;
  2 +
  3 +import org.junit.Test;
  4 +
  5 +import java.util.SortedMap;
  6 +import java.util.TreeMap;
  7 +
  8 +public class SortArrayMapTest {
  9 +
  10 + @Test
  11 + public void ad() {
  12 + SortArrayMap map = new SortArrayMap() ;
  13 + for (int i = 0; i < 9; i++) {
  14 + map.add(Long.valueOf(i) ,"127.0.0." + i);
  15 + }
  16 + map.print();
  17 + System.out.println(map.size());
  18 + }
  19 +
  20 + @Test
  21 + public void add() {
  22 + SortArrayMap map = new SortArrayMap() ;
  23 + for (int i = 0; i < 10; i++) {
  24 + map.add(Long.valueOf(i) ,"127.0.0." + i);
  25 + }
  26 + map.print();
  27 + System.out.println(map.size());
  28 + }
  29 +
  30 + @Test
  31 + public void add2() {
  32 + SortArrayMap map = new SortArrayMap() ;
  33 + for (int i = 0; i < 20; i++) {
  34 + map.add(Long.valueOf(i) ,"127.0.0." + i);
  35 + }
  36 + map.sort();
  37 + map.print();
  38 + System.out.println(map.size());
  39 + }
  40 +
  41 + @Test
  42 + public void add3() {
  43 + SortArrayMap map = new SortArrayMap() ;
  44 +
  45 + map.add(100L,"127.0.0.100");
  46 + map.add(10L,"127.0.0.10");
  47 + map.add(8L,"127.0.0.8");
  48 + map.add(1000L,"127.0.0.1000");
  49 +
  50 + map.print();
  51 + System.out.println(map.size());
  52 + }
  53 +
  54 + @Test
  55 + public void firstNode() {
  56 + SortArrayMap map = new SortArrayMap() ;
  57 +
  58 + map.add(100L,"127.0.0.100");
  59 + map.add(10L,"127.0.0.10");
  60 + map.add(8L,"127.0.0.8");
  61 + map.add(1000L,"127.0.0.1000");
  62 +
  63 + map.sort();
  64 + map.print();
  65 + String value = map.firstNodeValue(101);
  66 + System.out.println(value);
  67 + }
  68 + @Test
  69 + public void firstNode2() {
  70 + SortArrayMap map = new SortArrayMap() ;
  71 +
  72 + map.add(100L,"127.0.0.100");
  73 + map.add(10L,"127.0.0.10");
  74 + map.add(8L,"127.0.0.8");
  75 + map.add(1000L,"127.0.0.1000");
  76 +
  77 + map.sort();
  78 + map.print();
  79 + String value = map.firstNodeValue(1);
  80 + System.out.println(value);
  81 + }
  82 + @Test
  83 + public void firstNode3() {
  84 + SortArrayMap map = new SortArrayMap() ;
  85 +
  86 + map.add(100L,"127.0.0.100");
  87 + map.add(10L,"127.0.0.10");
  88 + map.add(8L,"127.0.0.8");
  89 + map.add(1000L,"127.0.0.1000");
  90 +
  91 + map.sort();
  92 + map.print();
  93 + String value = map.firstNodeValue(1001);
  94 + System.out.println(value);
  95 + }
  96 + @Test
  97 + public void firstNode4() {
  98 + SortArrayMap map = new SortArrayMap() ;
  99 +
  100 + map.add(100L,"127.0.0.100");
  101 + map.add(10L,"127.0.0.10");
  102 + map.add(8L,"127.0.0.8");
  103 + map.add(1000L,"127.0.0.1000");
  104 +
  105 + map.sort();
  106 + map.print();
  107 + String value = map.firstNodeValue(9);
  108 + System.out.println(value);
  109 + }
  110 +
  111 + @Test
  112 + public void add4() {
  113 + SortArrayMap map = new SortArrayMap() ;
  114 +
  115 + map.add(100L,"127.0.0.100");
  116 + map.add(10L,"127.0.0.10");
  117 + map.add(8L,"127.0.0.8");
  118 + map.add(1000L,"127.0.0.1000");
  119 +
  120 + map.sort();
  121 + map.print();
  122 + System.out.println(map.size());
  123 + }
  124 +
  125 + int count = 1000000 ;
  126 + @Test
  127 + public void add5() {
  128 + SortArrayMap map = new SortArrayMap() ;
  129 +
  130 +
  131 + long star = System.currentTimeMillis() ;
  132 + for (int i = 0; i < count; i++) {
  133 + double d = Math.random();
  134 + int ran = (int)(d*100);
  135 + map.add(Long.valueOf(i + ran) ,"127.0.0." + i);
  136 + }
  137 + map.sort();
  138 + long end = System.currentTimeMillis() ;
  139 + System.out.println("排序耗时 " + (end -star));
  140 + System.out.println(map.size());
  141 +
  142 +
  143 +
  144 + }
  145 +
  146 + @Test
  147 + public void add6(){
  148 +
  149 + SortArrayMap map = new SortArrayMap() ;
  150 + long star = System.currentTimeMillis() ;
  151 + for (int i = 0; i < count; i++) {
  152 + double d = Math.random();
  153 + int ran = (int)(d*100);
  154 + map.add(Long.valueOf(i + ran) ,"127.0.0." + i);
  155 + }
  156 + long end = System.currentTimeMillis() ;
  157 + System.out.println("不排耗时 " + (end -star));
  158 + System.out.println(map.size());
  159 + }
  160 + @Test
  161 + public void add7(){
  162 +
  163 + TreeMap<Long,String> treeMap = new TreeMap<Long, String>() ;
  164 + long star = System.currentTimeMillis() ;
  165 + for (int i = 0; i < count; i++) {
  166 + double d = Math.random();
  167 + int ran = (int)(d*100);
  168 + treeMap.put(Long.valueOf(i + ran) ,"127.0.0." + i);
  169 + }
  170 + long end = System.currentTimeMillis() ;
  171 + System.out.println("耗时 " + (end -star));
  172 + System.out.println(treeMap.size());
  173 + }
  174 +
  175 + @Test
  176 + public void add8(){
  177 +
  178 + TreeMap<Long,String> map = new TreeMap<Long, String>() ;
  179 + map.put(100L,"127.0.0.100");
  180 + map.put(10L,"127.0.0.10");
  181 + map.put(8L,"127.0.0.8");
  182 + map.put(1000L,"127.0.0.1000");
  183 +
  184 + SortedMap<Long, String> last = map.tailMap(101L);
  185 + if (!last.isEmpty()) {
  186 + System.out.println(last.get(last.firstKey()));
  187 + }else {
  188 + System.out.println(map.firstEntry().getValue());
  189 + }
  190 + }
  191 +}
  1 +package com.crossoverjie.cim.common.route.algorithm.consistenthash;
  2 +
  3 +import org.junit.Assert;
  4 +import org.junit.Test;
  5 +
  6 +import java.util.ArrayList;
  7 +import java.util.List;
  8 +
  9 +public class SortArrayMapConsistentHashTest {
  10 +
  11 + @Test
  12 + public void getFirstNodeValue() {
  13 + AbstractConsistentHash map = new SortArrayMapConsistentHash() ;
  14 +
  15 + List<String> strings = new ArrayList<String>();
  16 + for (int i = 0; i < 10; i++) {
  17 + strings.add("127.0.0." + i) ;
  18 + }
  19 + String process = map.process(strings,"zhangsan");
  20 + System.out.println(process);
  21 + Assert.assertEquals("127.0.0.2",process);
  22 +
  23 + }
  24 +
  25 + @Test
  26 + public void getFirstNodeValue2() {
  27 + AbstractConsistentHash map = new SortArrayMapConsistentHash() ;
  28 +
  29 + List<String> strings = new ArrayList<String>();
  30 + for (int i = 0; i < 10; i++) {
  31 + strings.add("127.0.0." + i) ;
  32 + }
  33 + String process = map.process(strings,"zhangsan2");
  34 + System.out.println(process);
  35 +
  36 + Assert.assertEquals("127.0.0.3",process);
  37 + }
  38 +
  39 + @Test
  40 + public void getFirstNodeValue3() {
  41 + AbstractConsistentHash map = new SortArrayMapConsistentHash() ;
  42 +
  43 + List<String> strings = new ArrayList<String>();
  44 + for (int i = 0; i < 10; i++) {
  45 + strings.add("127.0.0." + i) ;
  46 + }
  47 + String process = map.process(strings,"1551253899106");
  48 +
  49 + System.out.println(process);
  50 + Assert.assertEquals("127.0.0.6",process);
  51 + }
  52 +
  53 +
  54 + @Test
  55 + public void getFirstNodeValue4() {
  56 + AbstractConsistentHash map = new SortArrayMapConsistentHash() ;
  57 +
  58 + List<String> strings = new ArrayList<String>();
  59 + strings.add("45.78.28.220:9000:8081") ;
  60 + strings.add("45.78.28.220:9100:9081") ;
  61 +
  62 + String process = map.process(strings,"1551253899106");
  63 +
  64 + System.out.println(process);
  65 + Assert.assertEquals("45.78.28.220:9000:8081",process);
  66 + }
  67 + @Test
  68 + public void getFirstNodeValue5() {
  69 + AbstractConsistentHash map = new SortArrayMapConsistentHash() ;
  70 +
  71 + List<String> strings = new ArrayList<String>();
  72 + strings.add("45.78.28.220:9000:8081") ;
  73 + strings.add("45.78.28.220:9100:9081") ;
  74 + strings.add("45.78.28.220:9100:10081") ;
  75 +
  76 + String process = map.process(strings,"1551253899106");
  77 +
  78 + System.out.println(process);
  79 + Assert.assertEquals("45.78.28.220:9000:8081",process);
  80 + }
  81 +
  82 + @Test
  83 + public void getFirstNodeValue6() {
  84 + AbstractConsistentHash map = new SortArrayMapConsistentHash() ;
  85 +
  86 + List<String> strings = new ArrayList<String>();
  87 + strings.add("45.78.28.220:9000:8081") ;
  88 + strings.add("45.78.28.220:9100:9081") ;
  89 + strings.add("45.78.28.220:9100:10081") ;
  90 +
  91 + String process = map.process(strings,"1551253899106");
  92 +
  93 + System.out.println(process);
  94 + Assert.assertEquals("45.78.28.220:9000:8081",process);
  95 + }
  96 + @Test
  97 + public void getFirstNodeValue7() {
  98 + AbstractConsistentHash map = new SortArrayMapConsistentHash() ;
  99 +
  100 + List<String> strings = new ArrayList<String>();
  101 + strings.add("45.78.28.220:9000:8081") ;
  102 + strings.add("45.78.28.220:9100:9081") ;
  103 + strings.add("45.78.28.220:9100:10081") ;
  104 + strings.add("45.78.28.220:9100:00081") ;
  105 +
  106 + String process = map.process(strings,"1551253899106");
  107 +
  108 + System.out.println(process);
  109 + Assert.assertEquals("45.78.28.220:9000:8081",process);
  110 + }
  111 +
  112 +
  113 +
  114 +}
  1 +package com.crossoverjie.cim.common.route.algorithm.consistenthash;
  2 +
  3 +import org.junit.Assert;
  4 +import org.junit.Test;
  5 +
  6 +import java.util.ArrayList;
  7 +import java.util.List;
  8 +
  9 +public class TreeMapConsistentHashTest {
  10 +
  11 + @Test
  12 + public void getFirstNodeValue() {
  13 + AbstractConsistentHash map = new TreeMapConsistentHash() ;
  14 +
  15 + List<String> strings = new ArrayList<String>();
  16 + for (int i = 0; i < 10; i++) {
  17 + strings.add("127.0.0." + i) ;
  18 + }
  19 + String process = map.process(strings,"zhangsan");
  20 + System.out.println(process);
  21 + Assert.assertEquals("127.0.0.2",process);
  22 + }
  23 +
  24 +
  25 +
  26 + @Test
  27 + public void getFirstNodeValue2() {
  28 + AbstractConsistentHash map = new TreeMapConsistentHash() ;
  29 +
  30 + List<String> strings = new ArrayList<String>();
  31 + for (int i = 0; i < 10; i++) {
  32 + strings.add("127.0.0." + i) ;
  33 + }
  34 + String process = map.process(strings,"zhangsan2");
  35 + System.out.println(process);
  36 +
  37 + Assert.assertEquals("127.0.0.3",process);
  38 + }
  39 +
  40 +
  41 + @Test
  42 + public void getFirstNodeValue3() {
  43 + AbstractConsistentHash map = new TreeMapConsistentHash() ;
  44 +
  45 + List<String> strings = new ArrayList<String>();
  46 + for (int i = 0; i < 10; i++) {
  47 + strings.add("127.0.0." + i) ;
  48 + }
  49 + String process = map.process(strings,"1551253899106");
  50 +
  51 + System.out.println(process);
  52 + Assert.assertEquals("127.0.0.6",process);
  53 + }
  54 +}
@@ -26,6 +26,12 @@ public class AppConfiguration { @@ -26,6 +26,12 @@ public class AppConfiguration {
26 @Value("${app.zk.connect.timeout}") 26 @Value("${app.zk.connect.timeout}")
27 private int zkConnectTimeout; 27 private int zkConnectTimeout;
28 28
  29 + @Value("${app.route.way}")
  30 + private String routeWay;
  31 +
  32 + @Value("${app.route.way.consitenthash}")
  33 + private String consistentHashWay;
  34 +
29 public int getZkConnectTimeout() { 35 public int getZkConnectTimeout() {
30 return zkConnectTimeout; 36 return zkConnectTimeout;
31 } 37 }
@@ -54,4 +60,19 @@ public class AppConfiguration { @@ -54,4 +60,19 @@ public class AppConfiguration {
54 this.zkAddr = zkAddr; 60 this.zkAddr = zkAddr;
55 } 61 }
56 62
  63 + public String getRouteWay() {
  64 + return routeWay;
  65 + }
  66 +
  67 + public void setRouteWay(String routeWay) {
  68 + this.routeWay = routeWay;
  69 + }
  70 +
  71 + public String getConsistentHashWay() {
  72 + return consistentHashWay;
  73 + }
  74 +
  75 + public void setConsistentHashWay(String consistentHashWay) {
  76 + this.consistentHashWay = consistentHashWay;
  77 + }
57 } 78 }
1 package com.crossoverjie.cim.route.config; 1 package com.crossoverjie.cim.route.config;
2 2
  3 +import com.crossoverjie.cim.common.route.algorithm.RouteHandle;
  4 +import com.crossoverjie.cim.common.route.algorithm.consistenthash.AbstractConsistentHash;
3 import com.google.common.cache.CacheBuilder; 5 import com.google.common.cache.CacheBuilder;
4 import com.google.common.cache.CacheLoader; 6 import com.google.common.cache.CacheLoader;
5 import com.google.common.cache.LoadingCache; 7 import com.google.common.cache.LoadingCache;
@@ -13,6 +15,7 @@ import org.springframework.data.redis.core.RedisTemplate; @@ -13,6 +15,7 @@ import org.springframework.data.redis.core.RedisTemplate;
13 import org.springframework.data.redis.core.StringRedisTemplate; 15 import org.springframework.data.redis.core.StringRedisTemplate;
14 import org.springframework.data.redis.serializer.StringRedisSerializer; 16 import org.springframework.data.redis.serializer.StringRedisSerializer;
15 17
  18 +import java.lang.reflect.Method;
16 import java.util.concurrent.TimeUnit; 19 import java.util.concurrent.TimeUnit;
17 20
18 /** 21 /**
@@ -26,15 +29,15 @@ import java.util.concurrent.TimeUnit; @@ -26,15 +29,15 @@ import java.util.concurrent.TimeUnit;
26 public class BeanConfig { 29 public class BeanConfig {
27 30
28 @Autowired 31 @Autowired
29 - private AppConfiguration appConfiguration ; 32 + private AppConfiguration appConfiguration;
30 33
31 @Bean 34 @Bean
32 - public ZkClient buildZKClient(){ 35 + public ZkClient buildZKClient() {
33 return new ZkClient(appConfiguration.getZkAddr(), appConfiguration.getZkConnectTimeout()); 36 return new ZkClient(appConfiguration.getZkAddr(), appConfiguration.getZkConnectTimeout());
34 } 37 }
35 38
36 @Bean 39 @Bean
37 - public LoadingCache<String,String> buildCache(){ 40 + public LoadingCache<String, String> buildCache() {
38 return CacheBuilder.newBuilder() 41 return CacheBuilder.newBuilder()
39 .build(new CacheLoader<String, String>() { 42 .build(new CacheLoader<String, String>() {
40 @Override 43 @Override
@@ -47,6 +50,7 @@ public class BeanConfig { @@ -47,6 +50,7 @@ public class BeanConfig {
47 50
48 /** 51 /**
49 * Redis bean 52 * Redis bean
  53 + *
50 * @param factory 54 * @param factory
51 * @return 55 * @return
52 */ 56 */
@@ -62,6 +66,7 @@ public class BeanConfig { @@ -62,6 +66,7 @@ public class BeanConfig {
62 66
63 /** 67 /**
64 * http client 68 * http client
  69 + *
65 * @return okHttp 70 * @return okHttp
66 */ 71 */
67 @Bean 72 @Bean
@@ -69,8 +74,27 @@ public class BeanConfig { @@ -69,8 +74,27 @@ public class BeanConfig {
69 OkHttpClient.Builder builder = new OkHttpClient.Builder(); 74 OkHttpClient.Builder builder = new OkHttpClient.Builder();
70 builder.connectTimeout(30, TimeUnit.SECONDS) 75 builder.connectTimeout(30, TimeUnit.SECONDS)
71 .readTimeout(10, TimeUnit.SECONDS) 76 .readTimeout(10, TimeUnit.SECONDS)
72 - .writeTimeout(10,TimeUnit.SECONDS) 77 + .writeTimeout(10, TimeUnit.SECONDS)
73 .retryOnConnectionFailure(true); 78 .retryOnConnectionFailure(true);
74 return builder.build(); 79 return builder.build();
75 } 80 }
  81 +
  82 + @Bean
  83 + public RouteHandle buildRouteHandle() throws Exception {
  84 + String routeWay = appConfiguration.getRouteWay();
  85 + RouteHandle routeHandle = (RouteHandle) Class.forName(routeWay).newInstance();
  86 + if (routeWay.contains("ConsistentHash")) {
  87 + //一致性 hash 算法
  88 + Method method = Class.forName(routeWay).getMethod("setHash", AbstractConsistentHash.class);
  89 + AbstractConsistentHash consistentHash = (AbstractConsistentHash)
  90 + Class.forName(appConfiguration.getConsistentHashWay()).newInstance();
  91 + method.invoke(routeHandle,consistentHash) ;
  92 + return routeHandle ;
  93 + } else {
  94 +
  95 + return routeHandle;
  96 +
  97 + }
  98 +
  99 + }
76 } 100 }
@@ -5,6 +5,7 @@ import com.crossoverjie.cim.common.exception.CIMException; @@ -5,6 +5,7 @@ import com.crossoverjie.cim.common.exception.CIMException;
5 import com.crossoverjie.cim.common.pojo.CIMUserInfo; 5 import com.crossoverjie.cim.common.pojo.CIMUserInfo;
6 import com.crossoverjie.cim.common.res.BaseResponse; 6 import com.crossoverjie.cim.common.res.BaseResponse;
7 import com.crossoverjie.cim.common.res.NULLBody; 7 import com.crossoverjie.cim.common.res.NULLBody;
  8 +import com.crossoverjie.cim.common.route.algorithm.RouteHandle;
8 import com.crossoverjie.cim.route.cache.ServerCache; 9 import com.crossoverjie.cim.route.cache.ServerCache;
9 import com.crossoverjie.cim.route.service.AccountService; 10 import com.crossoverjie.cim.route.service.AccountService;
10 import com.crossoverjie.cim.route.service.UserInfoCacheService; 11 import com.crossoverjie.cim.route.service.UserInfoCacheService;
@@ -48,6 +49,10 @@ public class RouteController { @@ -48,6 +49,10 @@ public class RouteController {
48 @Autowired 49 @Autowired
49 private UserInfoCacheService userInfoCacheService ; 50 private UserInfoCacheService userInfoCacheService ;
50 51
  52 +
  53 + @Autowired
  54 + private RouteHandle routeHandle ;
  55 +
51 @ApiOperation("群聊 API") 56 @ApiOperation("群聊 API")
52 @RequestMapping(value = "groupRoute", method = RequestMethod.POST) 57 @RequestMapping(value = "groupRoute", method = RequestMethod.POST)
53 @ResponseBody() 58 @ResponseBody()
@@ -145,7 +150,8 @@ public class RouteController { @@ -145,7 +150,8 @@ public class RouteController {
145 //登录校验 150 //登录校验
146 StatusEnum status = accountService.login(loginReqVO); 151 StatusEnum status = accountService.login(loginReqVO);
147 if (status == StatusEnum.SUCCESS) { 152 if (status == StatusEnum.SUCCESS) {
148 - String server = serverCache.selectServer(); 153 +
  154 + String server = routeHandle.routeServer(serverCache.getAll(),String.valueOf(loginReqVO.getUserId()));
149 String[] serverInfo = server.split(":"); 155 String[] serverInfo = server.split(":");
150 CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2])); 156 CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
151 157
@@ -21,7 +21,17 @@ app.zk.connect.timeout=15000 @@ -21,7 +21,17 @@ app.zk.connect.timeout=15000
21 # zk 注册根节点 21 # zk 注册根节点
22 app.zk.root=/route 22 app.zk.root=/route
23 23
  24 +#路由策略,轮询
  25 +#app.route.way=com.crossoverjie.cim.common.route.algorithm.loop.LoopHandle
24 26
  27 +#路由策略,一致性 hash
  28 +app.route.way=com.crossoverjie.cim.common.route.algorithm.consistenthash.ConsistentHashHandle
  29 +
  30 +#一致性 hash 算法具体实现--自定义有序 map
  31 +#app.route.way.consitenthash=com.crossoverjie.cim.common.route.algorithm.consistenthash.SortArrayMapConsistentHash
  32 +
  33 +#一致性 hash 算法具体实现--TreeMap
  34 +app.route.way.consitenthash=com.crossoverjie.cim.common.route.algorithm.consistenthash.TreeMapConsistentHash
25 35
26 # Redis 配置 36 # Redis 配置
27 spring.redis.host=47.98.194.60 37 spring.redis.host=47.98.194.60