作者 crossoverJie

:sparkles: Introducing new features.zk

... ... @@ -16,6 +16,12 @@
<dependency>
<groupId>com.crossoverjie.netty</groupId>
<artifactId>netty-action-common</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
... ... @@ -60,6 +66,15 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
... ...
package com.crossoverjie.netty.action.zk;
import com.crossoverjie.netty.action.zk.thread.RegistryZK;
import com.crossoverjie.netty.action.zk.util.AppConfiguration;
import com.crossoverjie.netty.action.zk.util.ZKUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.net.InetAddress;
/**
* @author crossoverJie
*/
... ... @@ -14,14 +20,24 @@ public class Application implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(Application.class);
@Autowired
private AppConfiguration appConfiguration ;
@Autowired
private static ZKUtil zkUtil ;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
LOGGER.info("启动应用成功");
}
@Override
public void run(String... args) throws Exception {
//获得本机IP
String addr = InetAddress.getLocalHost().getHostAddress();
Thread thread = new Thread(new RegistryZK(addr, appConfiguration.getPort()));
thread.setName("registry-zk");
thread.start() ;
}
}
\ No newline at end of file
... ...
package com.crossoverjie.netty.action.zk.cache;
import com.crossoverjie.netty.action.zk.util.ZKUtil;
import com.google.common.cache.LoadingCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* Function: 服务器节点缓存
*
* @author crossoverJie
* Date: 2018/8/19 01:31
* @since JDK 1.8
*/
@Component
public class ServerCache {
@Autowired
private LoadingCache<String,String> cache ;
@Autowired
private ZKUtil zkUtil ;
private AtomicLong index = new AtomicLong() ;
public void addCache(String key){
cache.put(key, key);
}
/**
* 更新所有缓存/先删除 再新增
* @param currentChilds
*/
public void updateCache(List<String> currentChilds){
cache.invalidateAll() ;
for (String currentChild : currentChilds) {
String key = currentChild.split("-")[1] ;
addCache(key) ;
}
}
/**
* 获取所有的服务列表
* @return
*/
public List<String> getAll(){
List<String> list = new ArrayList<>() ;
if (cache.size() == 0){
List<String> allNode = zkUtil.getAllNode();
for (String node : allNode) {
String key = node.split("-")[1] ;
addCache(key) ;
}
}
for (Map.Entry<String, String> entry : cache.asMap().entrySet()) {
list.add(entry.getKey());
}
return list ;
}
/**
* 选取服务器
* @return
*/
public String selectServer(){
List<String> all = getAll();
if (all.size() == 0){
throw new RuntimeException("路由列表为空") ;
}
Long position = index.incrementAndGet() % all.size();
if (position < 0){
position = 0L ;
}
return all.get(position.intValue()) ;
}
}
... ...
package com.crossoverjie.netty.action.zk.config;
import com.crossoverjie.netty.action.zk.util.AppConfiguration;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/8/24 01:28
* @since JDK 1.8
*/
@Configuration
public class AppConfig {
@Autowired
private AppConfiguration appConfiguration ;
@Bean
public ZkClient buildZKClient(){
return new ZkClient(appConfiguration.getZkAddr(), 5000);
}
@Bean
public LoadingCache<String,String> buildCache(){
return CacheBuilder.newBuilder()
.build(new CacheLoader<String, String>() {
@Override
public String load(String s) throws Exception {
return null;
}
});
}
}
... ...
package com.crossoverjie.netty.action.zk.thread;
import com.crossoverjie.netty.action.zk.util.AppConfiguration;
import com.crossoverjie.netty.action.zk.util.SpringBeanFactory;
import com.crossoverjie.netty.action.zk.util.ZKUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/8/24 01:37
* @since JDK 1.8
*/
public class RegistryZK implements Runnable {
private static Logger logger = LoggerFactory.getLogger(RegistryZK.class);
private ZKUtil zkUtil;
private AppConfiguration appConfiguration ;
private String ip;
private int port;
public RegistryZK(String ip, int port) {
this.ip = ip;
this.port = port;
zkUtil = SpringBeanFactory.getBean(ZKUtil.class) ;
appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
}
@Override
public void run() {
//创建父节点
zkUtil.createRootNode();
//是否要将自己注册到 ZK
if (appConfiguration.isZkSwitch()){
String path = appConfiguration.getZkRoot() + "/ip-" + ip + ":" + port;
zkUtil.createNode(path, path);
logger.info("注册 zookeeper 成功,msg=[{}]", path);
}
//注册监听服务
zkUtil.subscribeEvent(appConfiguration.getZkRoot());
}
}
\ No newline at end of file
... ...
package com.crossoverjie.netty.action.zk.util;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/8/24 01:43
* @since JDK 1.8
*/
@Component
public class AppConfiguration {
@Value("${app.zk.root}")
private String zkRoot;
@Value("${app.zk.addr}")
private String zkAddr;
@Value("${app.zk.switch}")
private boolean zkSwitch;
@Value("${server.port}")
private int port;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getZkRoot() {
return zkRoot;
}
public void setZkRoot(String zkRoot) {
this.zkRoot = zkRoot;
}
public String getZkAddr() {
return zkAddr;
}
public void setZkAddr(String zkAddr) {
this.zkAddr = zkAddr;
}
public boolean isZkSwitch() {
return zkSwitch;
}
public void setZkSwitch(boolean zkSwitch) {
this.zkSwitch = zkSwitch;
}
}
... ...
package com.crossoverjie.netty.action.zk.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public final class SpringBeanFactory implements ApplicationContextAware{
private static ApplicationContext context;
public static <T> T getBean(Class<T> c){
return context.getBean(c);
}
public static <T> T getBean(String name,Class<T> clazz){
return context.getBean(name,clazz);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
}
... ...
package com.crossoverjie.netty.action.zk.util;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.netty.action.zk.cache.ServerCache;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Function: Zookeeper 工具
*
* @author crossoverJie
* Date: 2018/8/19 00:33
* @since JDK 1.8
*/
@Component
public class ZKUtil {
private static Logger logger = LoggerFactory.getLogger(ZKUtil.class);
@Autowired
private ZkClient zkClient;
@Autowired
private AppConfiguration appConfiguration ;
@Autowired
private ServerCache serverCache ;
/**
* 创建父级节点
*/
public void createRootNode(){
boolean exists = zkClient.exists(appConfiguration.getZkRoot());
if (exists){
return;
}
//创建 root
zkClient.createPersistent(appConfiguration.getZkRoot()) ;
}
/**
* 写入指定节点 临时目录
*
* @param path
* @param value
*/
public void createNode(String path, String value) {
zkClient.createEphemeral(path, value);
}
/**
* 监听事件
*
* @param path
*/
public void subscribeEvent(String path) {
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
logger.info("清除/更新本地缓存 parentPath=【{}】,currentChilds=【{}】", parentPath,currentChilds.toString());
//更新所有缓存/先删除 再新增
serverCache.updateCache(currentChilds) ;
}
});
}
/**
* 获取所有注册节点
* @return
*/
public List<String> getAllNode(){
List<String> children = zkClient.getChildren("/route");
logger.info("查询所有节点成功=【{}】", JSON.toJSONString(children));
return children;
}
/**
* 关闭 ZK
*/
public void closeZK() {
logger.info("正在关闭 ZK");
zkClient.close();
logger.info("关闭 ZK 成功");
}
}
... ...
... ... @@ -6,4 +6,15 @@ server.port=8083
# 是否打开swagger
swagger.enable = true
logging.level.root=info
\ No newline at end of file
logging.level.root=info
# 是否注册 zk
app.zk.switch=true
# zk 地址
app.zk.addr=10.1.241.103:2181
# zk 注册根节点
app.zk.root=/route
\ No newline at end of file
... ...