作者 crossoverJie

:recycle: Refactoring code.

正在显示 31 个修改的文件 包含 663 行增加67 行删除
1 package com.crossoverjie.cim.client; 1 package com.crossoverjie.cim.client;
2 2
  3 +import com.crossoverjie.cim.client.client.CIMClient;
3 import com.crossoverjie.cim.client.scanner.Scan; 4 import com.crossoverjie.cim.client.scanner.Scan;
4 import org.slf4j.Logger; 5 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory; 6 import org.slf4j.LoggerFactory;
@@ -17,7 +18,7 @@ public class CIMClientApplication implements CommandLineRunner{ @@ -17,7 +18,7 @@ public class CIMClientApplication implements CommandLineRunner{
17 private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class); 18 private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);
18 19
19 @Autowired 20 @Autowired
20 - private HeartbeatClient heartbeatClient ; 21 + private CIMClient heartbeatClient ;
21 22
22 23
23 public static void main(String[] args) { 24 public static void main(String[] args) {
1 -package com.crossoverjie.cim.client; 1 +package com.crossoverjie.cim.client.client;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.crossoverjie.cim.client.init.CustomerHandleInitializer; 4 import com.crossoverjie.cim.client.init.CustomerHandleInitializer;
@@ -29,9 +29,9 @@ import javax.annotation.PostConstruct; @@ -29,9 +29,9 @@ import javax.annotation.PostConstruct;
29 * @since JDK 1.8 29 * @since JDK 1.8
30 */ 30 */
31 @Component 31 @Component
32 -public class HeartbeatClient { 32 +public class CIMClient {
33 33
34 - private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClient.class); 34 + private final static Logger LOGGER = LoggerFactory.getLogger(CIMClient.class);
35 35
36 private EventLoopGroup group = new NioEventLoopGroup(); 36 private EventLoopGroup group = new NioEventLoopGroup();
37 37
1 package com.crossoverjie.cim.client.controller; 1 package com.crossoverjie.cim.client.controller;
2 2
3 -import com.crossoverjie.cim.client.HeartbeatClient; 3 +import com.crossoverjie.cim.client.client.CIMClient;
4 import com.crossoverjie.cim.client.vo.req.SendMsgReqVO; 4 import com.crossoverjie.cim.client.vo.req.SendMsgReqVO;
5 import com.crossoverjie.cim.client.vo.res.SendMsgResVO; 5 import com.crossoverjie.cim.client.vo.res.SendMsgResVO;
6 import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO; 6 import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
@@ -38,7 +38,7 @@ public class IndexController { @@ -38,7 +38,7 @@ public class IndexController {
38 private CounterService counterService; 38 private CounterService counterService;
39 39
40 @Autowired 40 @Autowired
41 - private HeartbeatClient heartbeatClient ; 41 + private CIMClient heartbeatClient ;
42 42
43 43
44 44
1 package com.crossoverjie.cim.client.scanner; 1 package com.crossoverjie.cim.client.scanner;
2 2
3 -import com.crossoverjie.cim.client.HeartbeatClient; 3 +import com.crossoverjie.cim.client.client.CIMClient;
4 import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO; 4 import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
5 import org.slf4j.Logger; 5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory; 6 import org.slf4j.LoggerFactory;
@@ -18,9 +18,9 @@ public class Scan implements Runnable{ @@ -18,9 +18,9 @@ public class Scan implements Runnable{
18 18
19 private final static Logger LOGGER = LoggerFactory.getLogger(Scan.class); 19 private final static Logger LOGGER = LoggerFactory.getLogger(Scan.class);
20 20
21 - private HeartbeatClient heartbeatClient ; 21 + private CIMClient heartbeatClient ;
22 22
23 - public Scan(HeartbeatClient heartbeatClient) { 23 + public Scan(CIMClient heartbeatClient) {
24 this.heartbeatClient = heartbeatClient; 24 this.heartbeatClient = heartbeatClient;
25 } 25 }
26 26
@@ -27,6 +27,12 @@ @@ -27,6 +27,12 @@
27 <dependency> 27 <dependency>
28 <groupId>com.crossoverjie.netty</groupId> 28 <groupId>com.crossoverjie.netty</groupId>
29 <artifactId>cim-common</artifactId> 29 <artifactId>cim-common</artifactId>
  30 + <exclusions>
  31 + <exclusion>
  32 + <artifactId>log4j</artifactId>
  33 + <groupId>log4j</groupId>
  34 + </exclusion>
  35 + </exclusions>
30 </dependency> 36 </dependency>
31 37
32 38
1 package com.crossoverjie.cim.route; 1 package com.crossoverjie.cim.route;
2 2
  3 +import com.crossoverjie.cim.route.kit.ServerListListener;
3 import org.slf4j.Logger; 4 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory; 5 import org.slf4j.LoggerFactory;
  6 +import org.springframework.boot.CommandLineRunner;
5 import org.springframework.boot.SpringApplication; 7 import org.springframework.boot.SpringApplication;
6 import org.springframework.boot.autoconfigure.SpringBootApplication; 8 import org.springframework.boot.autoconfigure.SpringBootApplication;
7 9
@@ -9,7 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -9,7 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
9 * @author crossoverJie 11 * @author crossoverJie
10 */ 12 */
11 @SpringBootApplication 13 @SpringBootApplication
12 -public class RouteApplication { 14 +public class RouteApplication implements CommandLineRunner{
13 15
14 private final static Logger LOGGER = LoggerFactory.getLogger(RouteApplication.class); 16 private final static Logger LOGGER = LoggerFactory.getLogger(RouteApplication.class);
15 17
@@ -18,4 +20,12 @@ public class RouteApplication { @@ -18,4 +20,12 @@ public class RouteApplication {
18 LOGGER.info("启动 route 成功"); 20 LOGGER.info("启动 route 成功");
19 } 21 }
20 22
  23 + @Override
  24 + public void run(String... args) throws Exception {
  25 +
  26 + //监听服务
  27 + Thread thread = new Thread(new ServerListListener());
  28 + thread.setName("zk-listener");
  29 + thread.start() ;
  30 + }
21 } 31 }
  1 +package com.crossoverjie.cim.route.cache;
  2 +
  3 +import com.crossoverjie.cim.route.kit.ZKit;
  4 +import com.google.common.cache.LoadingCache;
  5 +import org.springframework.beans.factory.annotation.Autowired;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +import java.util.ArrayList;
  9 +import java.util.List;
  10 +import java.util.Map;
  11 +import java.util.concurrent.atomic.AtomicLong;
  12 +
  13 +/**
  14 + * Function: 服务器节点缓存
  15 + *
  16 + * @author crossoverJie
  17 + * Date: 2018/8/19 01:31
  18 + * @since JDK 1.8
  19 + */
  20 +@Component
  21 +public class ServerCache {
  22 +
  23 +
  24 + @Autowired
  25 + private LoadingCache<String, String> cache;
  26 +
  27 + @Autowired
  28 + private ZKit zkUtil;
  29 +
  30 + private AtomicLong index = new AtomicLong();
  31 +
  32 +
  33 + public void addCache(String key) {
  34 + cache.put(key, key);
  35 + }
  36 +
  37 +
  38 + /**
  39 + * 更新所有缓存/先删除 再新增
  40 + *
  41 + * @param currentChilds
  42 + */
  43 + public void updateCache(List<String> currentChilds) {
  44 + cache.invalidateAll();
  45 + for (String currentChild : currentChilds) {
  46 + String key = currentChild.split("-")[1];
  47 + addCache(key);
  48 + }
  49 + }
  50 +
  51 +
  52 + /**
  53 + * 获取所有的服务列表
  54 + *
  55 + * @return
  56 + */
  57 + public List<String> getAll() {
  58 +
  59 + List<String> list = new ArrayList<>();
  60 +
  61 + if (cache.size() == 0) {
  62 + List<String> allNode = zkUtil.getAllNode();
  63 + for (String node : allNode) {
  64 + String key = node.split("-")[1];
  65 + addCache(key);
  66 + }
  67 + }
  68 + for (Map.Entry<String, String> entry : cache.asMap().entrySet()) {
  69 + list.add(entry.getKey());
  70 + }
  71 + return list;
  72 +
  73 + }
  74 +
  75 + /**
  76 + * 选取服务器
  77 + *
  78 + * @return
  79 + */
  80 + public String selectServer() {
  81 + List<String> all = getAll();
  82 + if (all.size() == 0) {
  83 + throw new RuntimeException("路由列表为空");
  84 + }
  85 + Long position = index.incrementAndGet() % all.size();
  86 + if (position < 0) {
  87 + position = 0L;
  88 + }
  89 +
  90 + return all.get(position.intValue());
  91 + }
  92 +}
  1 +package com.crossoverjie.cim.route.config;
  2 +
  3 +import org.springframework.beans.factory.annotation.Value;
  4 +import org.springframework.stereotype.Component;
  5 +
  6 +/**
  7 + * Function:
  8 + *
  9 + * @author crossoverJie
  10 + * Date: 2018/8/24 01:43
  11 + * @since JDK 1.8
  12 + */
  13 +@Component
  14 +public class AppConfiguration {
  15 +
  16 + @Value("${app.zk.root}")
  17 + private String zkRoot;
  18 +
  19 + @Value("${app.zk.addr}")
  20 + private String zkAddr;
  21 +
  22 +
  23 + @Value("${server.port}")
  24 + private int port;
  25 +
  26 + public int getPort() {
  27 + return port;
  28 + }
  29 +
  30 + public void setPort(int port) {
  31 + this.port = port;
  32 + }
  33 +
  34 + public String getZkRoot() {
  35 + return zkRoot;
  36 + }
  37 +
  38 + public void setZkRoot(String zkRoot) {
  39 + this.zkRoot = zkRoot;
  40 + }
  41 +
  42 + public String getZkAddr() {
  43 + return zkAddr;
  44 + }
  45 +
  46 + public void setZkAddr(String zkAddr) {
  47 + this.zkAddr = zkAddr;
  48 + }
  49 +
  50 +}
  1 +package com.crossoverjie.cim.route.config;
  2 +
  3 +import com.google.common.cache.CacheBuilder;
  4 +import com.google.common.cache.CacheLoader;
  5 +import com.google.common.cache.LoadingCache;
  6 +import org.I0Itec.zkclient.ZkClient;
  7 +import org.springframework.beans.factory.annotation.Autowired;
  8 +import org.springframework.context.annotation.Bean;
  9 +import org.springframework.context.annotation.Configuration;
  10 +
  11 +/**
  12 + * Function:
  13 + *
  14 + * @author crossoverJie
  15 + * Date: 2018/12/23 00:25
  16 + * @since JDK 1.8
  17 + */
  18 +@Configuration
  19 +public class BeanConfig {
  20 +
  21 + @Autowired
  22 + private AppConfiguration appConfiguration ;
  23 +
  24 + @Bean
  25 + public ZkClient buildZKClient(){
  26 + return new ZkClient(appConfiguration.getZkAddr(), 5000);
  27 + }
  28 +
  29 + @Bean
  30 + public LoadingCache<String,String> buildCache(){
  31 + return CacheBuilder.newBuilder()
  32 + .build(new CacheLoader<String, String>() {
  33 + @Override
  34 + public String load(String s) throws Exception {
  35 + return null;
  36 + }
  37 + });
  38 + }
  39 +}
1 package com.crossoverjie.cim.route.controller; 1 package com.crossoverjie.cim.route.controller;
2 2
3 -import com.crossoverjie.cim.route.vo.req.P2PRequest;  
4 -import com.crossoverjie.cim.route.vo.req.GroupRequest;  
5 import com.crossoverjie.cim.common.enums.StatusEnum; 3 import com.crossoverjie.cim.common.enums.StatusEnum;
6 import com.crossoverjie.cim.common.res.BaseResponse; 4 import com.crossoverjie.cim.common.res.BaseResponse;
  5 +import com.crossoverjie.cim.common.res.NULLBody;
  6 +import com.crossoverjie.cim.route.cache.ServerCache;
  7 +import com.crossoverjie.cim.route.vo.req.GroupRequest;
  8 +import com.crossoverjie.cim.route.vo.req.P2PRequest;
  9 +import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
7 import io.swagger.annotations.ApiOperation; 10 import io.swagger.annotations.ApiOperation;
8 import org.slf4j.Logger; 11 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory; 12 import org.slf4j.LoggerFactory;
  13 +import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.stereotype.Controller; 14 import org.springframework.stereotype.Controller;
11 import org.springframework.web.bind.annotation.RequestBody; 15 import org.springframework.web.bind.annotation.RequestBody;
12 import org.springframework.web.bind.annotation.RequestMapping; 16 import org.springframework.web.bind.annotation.RequestMapping;
@@ -25,11 +29,14 @@ import org.springframework.web.bind.annotation.ResponseBody; @@ -25,11 +29,14 @@ import org.springframework.web.bind.annotation.ResponseBody;
25 public class RouteController { 29 public class RouteController {
26 private final static Logger LOGGER = LoggerFactory.getLogger(RouteController.class); 30 private final static Logger LOGGER = LoggerFactory.getLogger(RouteController.class);
27 31
  32 + @Autowired
  33 + private ServerCache serverCache ;
  34 +
28 @ApiOperation("群聊 API") 35 @ApiOperation("群聊 API")
29 @RequestMapping(value = "groupRoute",method = RequestMethod.POST) 36 @RequestMapping(value = "groupRoute",method = RequestMethod.POST)
30 @ResponseBody() 37 @ResponseBody()
31 - public BaseResponse groupRoute(@RequestBody GroupRequest groupRequest){  
32 - BaseResponse res = new BaseResponse(); 38 + public BaseResponse<NULLBody> groupRoute(@RequestBody GroupRequest groupRequest){
  39 + BaseResponse<NULLBody> res = new BaseResponse();
33 40
34 LOGGER.info("msg=[{}]",groupRequest.toString()); 41 LOGGER.info("msg=[{}]",groupRequest.toString());
35 42
@@ -47,13 +54,35 @@ public class RouteController { @@ -47,13 +54,35 @@ public class RouteController {
47 @ApiOperation("私聊 API") 54 @ApiOperation("私聊 API")
48 @RequestMapping(value = "p2pRoute",method = RequestMethod.POST) 55 @RequestMapping(value = "p2pRoute",method = RequestMethod.POST)
49 @ResponseBody() 56 @ResponseBody()
50 - public BaseResponse p2pRoute(@RequestBody P2PRequest p2pRequest){  
51 - BaseResponse res = new BaseResponse(); 57 + public BaseResponse<NULLBody> p2pRoute(@RequestBody P2PRequest p2pRequest){
  58 + BaseResponse<NULLBody> res = new BaseResponse();
  59 +
  60 + res.setCode(StatusEnum.SUCCESS.getCode()) ;
  61 + res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
  62 + return res ;
  63 + }
  64 +
  65 + /**
  66 + * 获取一台 CIM server
  67 + * @return
  68 + */
  69 + @ApiOperation("获取服务器")
  70 + @RequestMapping(value = "getCIMServer",method = RequestMethod.POST)
  71 + @ResponseBody()
  72 + public BaseResponse<CIMServerResVO> getCIMServer(){
  73 + BaseResponse<CIMServerResVO> res = new BaseResponse();
  74 +
  75 + String server = serverCache.selectServer();
  76 + String[] serverInfo = server.split(":");
  77 + CIMServerResVO vo = new CIMServerResVO(serverInfo[0],Integer.parseInt(serverInfo[1])) ;
52 78
  79 + res.setDataBody(vo);
53 res.setCode(StatusEnum.SUCCESS.getCode()) ; 80 res.setCode(StatusEnum.SUCCESS.getCode()) ;
54 res.setMessage(StatusEnum.SUCCESS.getMessage()) ; 81 res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
55 return res ; 82 return res ;
56 } 83 }
57 84
58 85
  86 +
  87 +
59 } 88 }
  1 +package com.crossoverjie.cim.route.kit;
  2 +
  3 +import com.crossoverjie.cim.route.config.AppConfiguration;
  4 +import com.crossoverjie.cim.route.util.SpringBeanFactory;
  5 +import org.slf4j.Logger;
  6 +import org.slf4j.LoggerFactory;
  7 +
  8 +/**
  9 + * Function:
  10 + *
  11 + * @author crossoverJie
  12 + * Date: 2018/12/23 00:35
  13 + * @since JDK 1.8
  14 + */
  15 +public class ServerListListener implements Runnable{
  16 +
  17 + private static Logger logger = LoggerFactory.getLogger(ServerListListener.class);
  18 +
  19 + private ZKit zkUtil;
  20 +
  21 + private AppConfiguration appConfiguration ;
  22 +
  23 +
  24 + public ServerListListener() {
  25 + zkUtil = SpringBeanFactory.getBean(ZKit.class) ;
  26 + appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
  27 + }
  28 +
  29 + @Override
  30 + public void run() {
  31 + //注册监听服务
  32 + zkUtil.subscribeEvent(appConfiguration.getZkRoot());
  33 +
  34 + }
  35 +}
  1 +package com.crossoverjie.cim.route.kit;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.crossoverjie.cim.route.cache.ServerCache;
  5 +import org.I0Itec.zkclient.IZkChildListener;
  6 +import org.I0Itec.zkclient.ZkClient;
  7 +import org.slf4j.Logger;
  8 +import org.slf4j.LoggerFactory;
  9 +import org.springframework.beans.factory.annotation.Autowired;
  10 +import org.springframework.stereotype.Component;
  11 +
  12 +import java.util.List;
  13 +
  14 +/**
  15 + * Function: Zookeeper 工具
  16 + *
  17 + * @author crossoverJie
  18 + * Date: 2018/8/19 00:33
  19 + * @since JDK 1.8
  20 + */
  21 +@Component
  22 +public class ZKit {
  23 +
  24 + private static Logger logger = LoggerFactory.getLogger(ZKit.class);
  25 +
  26 +
  27 + @Autowired
  28 + private ZkClient zkClient;
  29 +
  30 + @Autowired
  31 + private ServerCache serverCache ;
  32 +
  33 +
  34 + /**
  35 + * 监听事件
  36 + *
  37 + * @param path
  38 + */
  39 + public void subscribeEvent(String path) {
  40 + zkClient.subscribeChildChanges(path, new IZkChildListener() {
  41 + @Override
  42 + public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
  43 + logger.info("清除/更新本地缓存 parentPath=【{}】,currentChilds=【{}】", parentPath,currentChilds.toString());
  44 +
  45 + //更新所有缓存/先删除 再新增
  46 + serverCache.updateCache(currentChilds) ;
  47 + }
  48 + });
  49 +
  50 +
  51 + }
  52 +
  53 +
  54 + /**
  55 + * 获取所有注册节点
  56 + * @return
  57 + */
  58 + public List<String> getAllNode(){
  59 + List<String> children = zkClient.getChildren("/route");
  60 + logger.info("查询所有节点成功=【{}】", JSON.toJSONString(children));
  61 + return children;
  62 + }
  63 +
  64 +
  65 +}
  1 +package com.crossoverjie.cim.route.util;
  2 +
  3 +import org.springframework.beans.BeansException;
  4 +import org.springframework.context.ApplicationContext;
  5 +import org.springframework.context.ApplicationContextAware;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +@Component
  9 +public final class SpringBeanFactory implements ApplicationContextAware{
  10 + private static ApplicationContext context;
  11 +
  12 + public static <T> T getBean(Class<T> c){
  13 + return context.getBean(c);
  14 + }
  15 +
  16 +
  17 + public static <T> T getBean(String name,Class<T> clazz){
  18 + return context.getBean(name,clazz);
  19 + }
  20 +
  21 + @Override
  22 + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  23 + context = applicationContext;
  24 + }
  25 +
  26 +
  27 +}
  1 +package com.crossoverjie.cim.route.vo.res;
  2 +
  3 +import java.io.Serializable;
  4 +
  5 +/**
  6 + * Function:
  7 + *
  8 + * @author crossoverJie
  9 + * Date: 2018/12/23 00:43
  10 + * @since JDK 1.8
  11 + */
  12 +public class CIMServerResVO implements Serializable {
  13 +
  14 + private String ip ;
  15 + private Integer port;
  16 +
  17 + public CIMServerResVO(String ip, Integer port) {
  18 + this.ip = ip;
  19 + this.port = port;
  20 + }
  21 +
  22 + public String getIp() {
  23 + return ip;
  24 + }
  25 +
  26 + public void setIp(String ip) {
  27 + this.ip = ip;
  28 + }
  29 +
  30 + public Integer getPort() {
  31 + return port;
  32 + }
  33 +
  34 + public void setPort(Integer port) {
  35 + this.port = port;
  36 + }
  37 +
  38 + @Override
  39 + public String toString() {
  40 + return "CIMServerResVO{" +
  41 + "ip='" + ip + '\'' +
  42 + ", port=" + port +
  43 + '}';
  44 + }
  45 +}
@@ -12,3 +12,9 @@ logging.level.root=info @@ -12,3 +12,9 @@ logging.level.root=info
12 # 关闭健康检查权限 12 # 关闭健康检查权限
13 management.security.enabled=false 13 management.security.enabled=false
14 14
  15 +# zk 地址
  16 +app.zk.addr=47.98.194.60:2181
  17 +
  18 +# zk 注册根节点
  19 +app.zk.root=/route
  20 +
@@ -29,6 +29,12 @@ @@ -29,6 +29,12 @@
29 <dependency> 29 <dependency>
30 <groupId>com.crossoverjie.netty</groupId> 30 <groupId>com.crossoverjie.netty</groupId>
31 <artifactId>cim-common</artifactId> 31 <artifactId>cim-common</artifactId>
  32 + <exclusions>
  33 + <exclusion>
  34 + <artifactId>log4j</artifactId>
  35 + <groupId>log4j</groupId>
  36 + </exclusion>
  37 + </exclusions>
32 </dependency> 38 </dependency>
33 39
34 <dependency> 40 <dependency>
1 package com.crossoverjie.cim.server; 1 package com.crossoverjie.cim.server;
2 2
  3 +import com.crossoverjie.cim.server.config.AppConfiguration;
  4 +import com.crossoverjie.cim.server.kit.RegistryZK;
3 import org.slf4j.Logger; 5 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory; 6 import org.slf4j.LoggerFactory;
  7 +import org.springframework.beans.factory.annotation.Autowired;
  8 +import org.springframework.boot.CommandLineRunner;
5 import org.springframework.boot.SpringApplication; 9 import org.springframework.boot.SpringApplication;
6 import org.springframework.boot.autoconfigure.SpringBootApplication; 10 import org.springframework.boot.autoconfigure.SpringBootApplication;
7 11
  12 +import java.net.InetAddress;
  13 +
8 /** 14 /**
9 * @author crossoverJie 15 * @author crossoverJie
10 */ 16 */
11 @SpringBootApplication 17 @SpringBootApplication
12 -public class CIMServerApplication { 18 +public class CIMServerApplication implements CommandLineRunner{
13 19
14 private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerApplication.class); 20 private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerApplication.class);
15 21
  22 + @Autowired
  23 + private AppConfiguration appConfiguration ;
  24 +
16 public static void main(String[] args) { 25 public static void main(String[] args) {
17 SpringApplication.run(CIMServerApplication.class, args); 26 SpringApplication.run(CIMServerApplication.class, args);
18 LOGGER.info("启动 Server 成功"); 27 LOGGER.info("启动 Server 成功");
19 } 28 }
20 29
  30 + @Override
  31 + public void run(String... args) throws Exception {
  32 + //获得本机IP
  33 + String addr = InetAddress.getLocalHost().getHostAddress();
  34 + Thread thread = new Thread(new RegistryZK(addr, appConfiguration.getPort()));
  35 + thread.setName("registry-zk");
  36 + thread.start() ;
  37 + }
21 } 38 }
  1 +package com.crossoverjie.cim.server.config;
  2 +
  3 +import org.springframework.beans.factory.annotation.Value;
  4 +import org.springframework.stereotype.Component;
  5 +
  6 +/**
  7 + * Function:
  8 + *
  9 + * @author crossoverJie
  10 + * Date: 2018/8/24 01:43
  11 + * @since JDK 1.8
  12 + */
  13 +@Component
  14 +public class AppConfiguration {
  15 +
  16 + @Value("${app.zk.root}")
  17 + private String zkRoot;
  18 +
  19 + @Value("${app.zk.addr}")
  20 + private String zkAddr;
  21 +
  22 + @Value("${app.zk.switch}")
  23 + private boolean zkSwitch;
  24 +
  25 + @Value("${server.port}")
  26 + private int port;
  27 +
  28 + public int getPort() {
  29 + return port;
  30 + }
  31 +
  32 + public void setPort(int port) {
  33 + this.port = port;
  34 + }
  35 +
  36 + public String getZkRoot() {
  37 + return zkRoot;
  38 + }
  39 +
  40 + public void setZkRoot(String zkRoot) {
  41 + this.zkRoot = zkRoot;
  42 + }
  43 +
  44 + public String getZkAddr() {
  45 + return zkAddr;
  46 + }
  47 +
  48 + public void setZkAddr(String zkAddr) {
  49 + this.zkAddr = zkAddr;
  50 + }
  51 +
  52 + public boolean isZkSwitch() {
  53 + return zkSwitch;
  54 + }
  55 +
  56 + public void setZkSwitch(boolean zkSwitch) {
  57 + this.zkSwitch = zkSwitch;
  58 + }
  59 +}
  1 +package com.crossoverjie.cim.server.config;
  2 +
  3 +import org.I0Itec.zkclient.ZkClient;
  4 +import org.springframework.beans.factory.annotation.Autowired;
  5 +import org.springframework.context.annotation.Bean;
  6 +import org.springframework.context.annotation.Configuration;
  7 +
  8 +/**
  9 + * Function:
  10 + *
  11 + * @author crossoverJie
  12 + * Date: 2018/12/23 00:25
  13 + * @since JDK 1.8
  14 + */
  15 +@Configuration
  16 +public class BeanConfig {
  17 +
  18 + @Autowired
  19 + private AppConfiguration appConfiguration ;
  20 +
  21 + @Bean
  22 + public ZkClient buildZKClient(){
  23 + return new ZkClient(appConfiguration.getZkAddr(), 5000);
  24 + }
  25 +}
@@ -4,7 +4,7 @@ import com.crossoverjie.cim.common.enums.StatusEnum; @@ -4,7 +4,7 @@ import com.crossoverjie.cim.common.enums.StatusEnum;
4 import com.crossoverjie.cim.common.res.BaseResponse; 4 import com.crossoverjie.cim.common.res.BaseResponse;
5 import com.crossoverjie.cim.server.vo.req.SendMsgReqVO; 5 import com.crossoverjie.cim.server.vo.req.SendMsgReqVO;
6 import com.crossoverjie.cim.common.constant.Constants; 6 import com.crossoverjie.cim.common.constant.Constants;
7 -import com.crossoverjie.cim.server.server.HeartBeatServer; 7 +import com.crossoverjie.cim.server.server.CIMServer;
8 import com.crossoverjie.cim.server.vo.res.SendMsgResVO; 8 import com.crossoverjie.cim.server.vo.res.SendMsgResVO;
9 import io.swagger.annotations.ApiOperation; 9 import io.swagger.annotations.ApiOperation;
10 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.beans.factory.annotation.Autowired;
@@ -27,7 +27,7 @@ import org.springframework.web.bind.annotation.ResponseBody; @@ -27,7 +27,7 @@ import org.springframework.web.bind.annotation.ResponseBody;
27 public class IndexController { 27 public class IndexController {
28 28
29 @Autowired 29 @Autowired
30 - private HeartBeatServer heartbeatClient ; 30 + private CIMServer heartbeatClient ;
31 31
32 32
33 /** 33 /**
@@ -3,13 +3,9 @@ package com.crossoverjie.cim.server.handle; @@ -3,13 +3,9 @@ package com.crossoverjie.cim.server.handle;
3 import com.crossoverjie.cim.common.protocol.BaseRequestProto; 3 import com.crossoverjie.cim.common.protocol.BaseRequestProto;
4 import com.crossoverjie.cim.common.protocol.BaseResponseProto; 4 import com.crossoverjie.cim.common.protocol.BaseResponseProto;
5 import com.crossoverjie.cim.server.util.NettySocketHolder; 5 import com.crossoverjie.cim.server.util.NettySocketHolder;
6 -import com.crossoverjie.cim.common.pojo.CustomProtocol;  
7 -import io.netty.buffer.ByteBuf;  
8 -import io.netty.buffer.Unpooled;  
9 import io.netty.channel.ChannelHandlerContext; 6 import io.netty.channel.ChannelHandlerContext;
10 import io.netty.channel.SimpleChannelInboundHandler; 7 import io.netty.channel.SimpleChannelInboundHandler;
11 import io.netty.channel.socket.nio.NioSocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel;
12 -import io.netty.util.CharsetUtil;  
13 import org.slf4j.Logger; 9 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory; 10 import org.slf4j.LoggerFactory;
15 11
@@ -20,11 +16,9 @@ import org.slf4j.LoggerFactory; @@ -20,11 +16,9 @@ import org.slf4j.LoggerFactory;
20 * Date: 17/05/2018 18:52 16 * Date: 17/05/2018 18:52
21 * @since JDK 1.8 17 * @since JDK 1.8
22 */ 18 */
23 -public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<BaseRequestProto.RequestProtocol> { 19 +public class ServerHandle extends SimpleChannelInboundHandler<BaseRequestProto.RequestProtocol> {
24 20
25 - private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatSimpleHandle.class);  
26 -  
27 - private static final ByteBuf HEART_BEAT = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(new CustomProtocol(123456L,"pong").toString(),CharsetUtil.UTF_8)); 21 + private final static Logger LOGGER = LoggerFactory.getLogger(ServerHandle.class);
28 22
29 23
30 /** 24 /**
@@ -34,26 +28,12 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<BaseReque @@ -34,26 +28,12 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<BaseReque
34 */ 28 */
35 @Override 29 @Override
36 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 30 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
37 -  
38 NettySocketHolder.remove((NioSocketChannel) ctx.channel()); 31 NettySocketHolder.remove((NioSocketChannel) ctx.channel());
39 } 32 }
40 33
41 @Override 34 @Override
42 - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
43 -  
44 - /*if (evt instanceof IdleStateEvent){  
45 - IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;  
46 -  
47 - if (idleStateEvent.state() == IdleState.READER_IDLE){  
48 - LOGGER.info("已经5秒没有收到信息!");  
49 - //向客户端发送消息  
50 - ctx.writeAndFlush(HEART_BEAT).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;  
51 - }  
52 -  
53 -  
54 - }*/  
55 -  
56 - super.userEventTriggered(ctx, evt); 35 + public void channelActive(ChannelHandlerContext ctx) throws Exception {
  36 + LOGGER.info("有客户端连上来了。。");
57 } 37 }
58 38
59 @Override 39 @Override
1 package com.crossoverjie.cim.server.init; 1 package com.crossoverjie.cim.server.init;
2 2
3 import com.crossoverjie.cim.common.protocol.BaseRequestProto; 3 import com.crossoverjie.cim.common.protocol.BaseRequestProto;
4 -import com.crossoverjie.cim.server.handle.HeartBeatSimpleHandle; 4 +import com.crossoverjie.cim.server.handle.ServerHandle;
5 import io.netty.channel.Channel; 5 import io.netty.channel.Channel;
6 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelInitializer;
7 import io.netty.handler.codec.protobuf.ProtobufDecoder; 7 import io.netty.handler.codec.protobuf.ProtobufDecoder;
8 import io.netty.handler.codec.protobuf.ProtobufEncoder; 8 import io.netty.handler.codec.protobuf.ProtobufEncoder;
9 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 9 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
10 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; 10 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
11 -import io.netty.handler.timeout.IdleStateHandler;  
12 11
13 /** 12 /**
14 * Function: 13 * Function:
@@ -18,23 +17,18 @@ import io.netty.handler.timeout.IdleStateHandler; @@ -18,23 +17,18 @@ import io.netty.handler.timeout.IdleStateHandler;
18 * @since JDK 1.8 17 * @since JDK 1.8
19 */ 18 */
20 public class HeartbeatInitializer extends ChannelInitializer<Channel> { 19 public class HeartbeatInitializer extends ChannelInitializer<Channel> {
  20 +
  21 + private final ServerHandle serverHandle = new ServerHandle();
  22 +
21 @Override 23 @Override
22 protected void initChannel(Channel ch) throws Exception { 24 protected void initChannel(Channel ch) throws Exception {
23 - ch.pipeline()  
24 - //五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 中  
25 - .addLast(new IdleStateHandler(5, 0, 0))  
26 - //.addLast(new HeartbeatDecoder())  
27 -  
28 - //字符串解析,换行防拆包  
29 - //.addLast(new LineBasedFrameDecoder(1024))  
30 - //.addLast(new StringDecoder())  
31 25
  26 + ch.pipeline()
32 // google Protobuf 编解码 27 // google Protobuf 编解码
33 .addLast(new ProtobufVarint32FrameDecoder()) 28 .addLast(new ProtobufVarint32FrameDecoder())
34 .addLast(new ProtobufDecoder(BaseRequestProto.RequestProtocol.getDefaultInstance())) 29 .addLast(new ProtobufDecoder(BaseRequestProto.RequestProtocol.getDefaultInstance()))
35 .addLast(new ProtobufVarint32LengthFieldPrepender()) 30 .addLast(new ProtobufVarint32LengthFieldPrepender())
36 .addLast(new ProtobufEncoder()) 31 .addLast(new ProtobufEncoder())
37 -  
38 - .addLast(new HeartBeatSimpleHandle()); 32 + .addLast(serverHandle);
39 } 33 }
40 } 34 }
  1 +package com.crossoverjie.cim.server.kit;
  2 +
  3 +import com.crossoverjie.cim.server.config.AppConfiguration;
  4 +import com.crossoverjie.cim.server.util.SpringBeanFactory;
  5 +import org.slf4j.Logger;
  6 +import org.slf4j.LoggerFactory;
  7 +
  8 +/**
  9 + * Function:
  10 + *
  11 + * @author crossoverJie
  12 + * Date: 2018/8/24 01:37
  13 + * @since JDK 1.8
  14 + */
  15 +public class RegistryZK implements Runnable {
  16 +
  17 + private static Logger logger = LoggerFactory.getLogger(RegistryZK.class);
  18 +
  19 + private ZKit zKit;
  20 +
  21 + private AppConfiguration appConfiguration ;
  22 +
  23 + private String ip;
  24 + private int port;
  25 +
  26 + public RegistryZK(String ip, int port) {
  27 + this.ip = ip;
  28 + this.port = port;
  29 + zKit = SpringBeanFactory.getBean(ZKit.class) ;
  30 + appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
  31 + }
  32 +
  33 + @Override
  34 + public void run() {
  35 +
  36 + //创建父节点
  37 + zKit.createRootNode();
  38 +
  39 + //是否要将自己注册到 ZK
  40 + if (appConfiguration.isZkSwitch()){
  41 + String path = appConfiguration.getZkRoot() + "/ip-" + ip + ":" + port;
  42 + zKit.createNode(path, path);
  43 + logger.info("注册 zookeeper 成功,msg=[{}]", path);
  44 + }
  45 +
  46 +
  47 + }
  48 +}
  1 +package com.crossoverjie.cim.server.kit;
  2 +
  3 +import com.crossoverjie.cim.server.config.AppConfiguration;
  4 +import org.I0Itec.zkclient.ZkClient;
  5 +import org.slf4j.Logger;
  6 +import org.slf4j.LoggerFactory;
  7 +import org.springframework.beans.factory.annotation.Autowired;
  8 +import org.springframework.stereotype.Component;
  9 +
  10 +/**
  11 + * Function: Zookeeper 工具
  12 + *
  13 + * @author crossoverJie
  14 + * Date: 2018/8/19 00:33
  15 + * @since JDK 1.8
  16 + */
  17 +@Component
  18 +public class ZKit {
  19 +
  20 + private static Logger logger = LoggerFactory.getLogger(ZKit.class);
  21 +
  22 +
  23 + @Autowired
  24 + private ZkClient zkClient;
  25 +
  26 + @Autowired
  27 + private AppConfiguration appConfiguration ;
  28 +
  29 + /**
  30 + * 创建父级节点
  31 + */
  32 + public void createRootNode(){
  33 + boolean exists = zkClient.exists(appConfiguration.getZkRoot());
  34 + if (exists){
  35 + return;
  36 + }
  37 +
  38 + //创建 root
  39 + zkClient.createPersistent(appConfiguration.getZkRoot()) ;
  40 + }
  41 +
  42 + /**
  43 + * 写入指定节点 临时目录
  44 + *
  45 + * @param path
  46 + * @param value
  47 + */
  48 + public void createNode(String path, String value) {
  49 + zkClient.createEphemeral(path, value);
  50 + }
  51 +
  52 +}
@@ -33,9 +33,9 @@ import java.net.InetSocketAddress; @@ -33,9 +33,9 @@ import java.net.InetSocketAddress;
33 * @since JDK 1.8 33 * @since JDK 1.8
34 */ 34 */
35 @Component 35 @Component
36 -public class HeartBeatServer { 36 +public class CIMServer {
37 37
38 - private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatServer.class); 38 + private final static Logger LOGGER = LoggerFactory.getLogger(CIMServer.class);
39 39
40 private EventLoopGroup boss = new NioEventLoopGroup(); 40 private EventLoopGroup boss = new NioEventLoopGroup();
41 private EventLoopGroup work = new NioEventLoopGroup(); 41 private EventLoopGroup work = new NioEventLoopGroup();
@@ -18,3 +18,13 @@ spring.boot.admin.url=http://127.0.0.1:8888 @@ -18,3 +18,13 @@ spring.boot.admin.url=http://127.0.0.1:8888
18 18
19 #自定义监控端点 key 19 #自定义监控端点 key
20 monitor.channel.map.key=channelMap 20 monitor.channel.map.key=channelMap
  21 +
  22 +
  23 +# 是否注册 zk
  24 +app.zk.switch=true
  25 +
  26 +# zk 地址
  27 +app.zk.addr=47.98.194.60:2181
  28 +
  29 +# zk 注册根节点
  30 +app.zk.root=/route
@@ -2,7 +2,7 @@ package com.crossoverjie.cim.server.zk; @@ -2,7 +2,7 @@ package com.crossoverjie.cim.server.zk;
2 2
3 import com.crossoverjie.cim.server.zk.util.AppConfiguration; 3 import com.crossoverjie.cim.server.zk.util.AppConfiguration;
4 import com.crossoverjie.cim.server.zk.thread.RegistryZK; 4 import com.crossoverjie.cim.server.zk.thread.RegistryZK;
5 -import com.crossoverjie.cim.server.zk.util.ZKUtil; 5 +import com.crossoverjie.cim.server.zk.util.ZKit;
6 import org.slf4j.Logger; 6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory; 7 import org.slf4j.LoggerFactory;
8 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.beans.factory.annotation.Autowired;
@@ -24,7 +24,7 @@ public class Application implements CommandLineRunner{ @@ -24,7 +24,7 @@ public class Application implements CommandLineRunner{
24 private AppConfiguration appConfiguration ; 24 private AppConfiguration appConfiguration ;
25 25
26 @Autowired 26 @Autowired
27 - private static ZKUtil zkUtil ; 27 + private static ZKit zkUtil ;
28 28
29 public static void main(String[] args) { 29 public static void main(String[] args) {
30 SpringApplication.run(Application.class, args); 30 SpringApplication.run(Application.class, args);
1 package com.crossoverjie.cim.server.zk.cache; 1 package com.crossoverjie.cim.server.zk.cache;
2 2
3 -import com.crossoverjie.cim.server.zk.util.ZKUtil; 3 +import com.crossoverjie.cim.server.zk.util.ZKit;
4 import com.google.common.cache.LoadingCache; 4 import com.google.common.cache.LoadingCache;
5 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.stereotype.Component; 6 import org.springframework.stereotype.Component;
@@ -25,7 +25,7 @@ public class ServerCache { @@ -25,7 +25,7 @@ public class ServerCache {
25 private LoadingCache<String, String> cache; 25 private LoadingCache<String, String> cache;
26 26
27 @Autowired 27 @Autowired
28 - private ZKUtil zkUtil; 28 + private ZKit zkUtil;
29 29
30 private AtomicLong index = new AtomicLong(); 30 private AtomicLong index = new AtomicLong();
31 31
@@ -2,7 +2,7 @@ package com.crossoverjie.cim.server.zk.thread; @@ -2,7 +2,7 @@ package com.crossoverjie.cim.server.zk.thread;
2 2
3 import com.crossoverjie.cim.server.zk.util.AppConfiguration; 3 import com.crossoverjie.cim.server.zk.util.AppConfiguration;
4 import com.crossoverjie.cim.server.zk.util.SpringBeanFactory; 4 import com.crossoverjie.cim.server.zk.util.SpringBeanFactory;
5 -import com.crossoverjie.cim.server.zk.util.ZKUtil; 5 +import com.crossoverjie.cim.server.zk.util.ZKit;
6 import org.slf4j.Logger; 6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory; 7 import org.slf4j.LoggerFactory;
8 8
@@ -17,7 +17,7 @@ public class RegistryZK implements Runnable { @@ -17,7 +17,7 @@ public class RegistryZK implements Runnable {
17 17
18 private static Logger logger = LoggerFactory.getLogger(RegistryZK.class); 18 private static Logger logger = LoggerFactory.getLogger(RegistryZK.class);
19 19
20 - private ZKUtil zkUtil; 20 + private ZKit zkUtil;
21 21
22 private AppConfiguration appConfiguration ; 22 private AppConfiguration appConfiguration ;
23 23
@@ -27,7 +27,7 @@ public class RegistryZK implements Runnable { @@ -27,7 +27,7 @@ public class RegistryZK implements Runnable {
27 public RegistryZK(String ip, int port) { 27 public RegistryZK(String ip, int port) {
28 this.ip = ip; 28 this.ip = ip;
29 this.port = port; 29 this.port = port;
30 - zkUtil = SpringBeanFactory.getBean(ZKUtil.class) ; 30 + zkUtil = SpringBeanFactory.getBean(ZKit.class) ;
31 appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ; 31 appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
32 } 32 }
33 33
@@ -19,9 +19,9 @@ import java.util.List; @@ -19,9 +19,9 @@ import java.util.List;
19 * @since JDK 1.8 19 * @since JDK 1.8
20 */ 20 */
21 @Component 21 @Component
22 -public class ZKUtil { 22 +public class ZKit {
23 23
24 - private static Logger logger = LoggerFactory.getLogger(ZKUtil.class); 24 + private static Logger logger = LoggerFactory.getLogger(ZKit.class);
25 25
26 26
27 @Autowired 27 @Autowired
1 -spring.application.name=netty-action-zk 1 +spring.application.name=cim-zk
2 2
3 # web port 3 # web port
4 server.port=8083 4 server.port=8083