作者 crossoverJie

:sparkles: Introducing new features.服务端分开 push

正在显示 15 个修改的文件 包含 247 行增加24 行删除
package com.crossoverjie.netty.action.common.constant;
/**
* Function:常量
*
* @author crossoverJie
* Date: 28/03/2018 17:41
* @since JDK 1.8
*/
public class Constants {
/**
* 服务端手动 push 次数
*/
public static final String COUNTER_SERVER_PUSH_COUNT = "counter.server.push.count" ;
/**
* 客户端手动 push 次数
*/
public static final String COUNTER_CLIENT_PUSH_COUNT = "counter.client.push.count" ;
}
... ...
package com.crossoverjie.netty.action.common.pojo;
import java.io.Serializable;
/**
* Function:
*
... ... @@ -7,8 +9,9 @@ package com.crossoverjie.netty.action.common.pojo;
* Date: 17/05/2018 17:50
* @since JDK 1.8
*/
public class CustomProtocol {
public class CustomProtocol implements Serializable{
private static final long serialVersionUID = 4671171056588401542L;
private long id ;
private String content ;
... ...
package com.crossoverjie.netty.action.client.config;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Function:构建心跳使用的 bean
*
* @author crossoverJie
* Date: 24/05/2018 15:55
* @since JDK 1.8
*/
@Configuration
public class HeartBeatConfig {
@Value("${channel.id}")
private long id ;
@Bean(value = "heartBeat")
public CustomProtocol heartBeat(){
return new CustomProtocol(id,"ping") ;
}
}
... ...
... ... @@ -3,12 +3,14 @@ package com.crossoverjie.netty.action.client.controller;
import com.crossoverjie.netty.action.client.HeartbeatClient;
import com.crossoverjie.netty.action.client.vo.req.SendMsgReqVO;
import com.crossoverjie.netty.action.client.vo.res.SendMsgResVO;
import com.crossoverjie.netty.action.common.constant.Constants;
import com.crossoverjie.netty.action.common.enums.StatusEnum;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import com.crossoverjie.netty.action.common.res.BaseResponse;
import com.crossoverjie.netty.action.common.util.RandomUtil;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.metrics.CounterService;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
... ... @@ -25,6 +27,12 @@ import org.springframework.web.bind.annotation.ResponseBody;
@RequestMapping("/")
public class IndexController {
/**
* 统计 service
*/
@Autowired
private CounterService counterService;
@Autowired
private HeartbeatClient heartbeatClient ;
... ... @@ -33,13 +41,15 @@ public class IndexController {
* @param sendMsgReqVO
* @return
*/
@ApiOperation("发送消息")
@ApiOperation("客户端发送消息")
@RequestMapping("sendMsg")
@ResponseBody
public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
BaseResponse<SendMsgResVO> res = new BaseResponse();
heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getId(),sendMsgReqVO.getMsg())) ;
counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
... ...
package com.crossoverjie.netty.action.client.handle;
import com.crossoverjie.netty.action.client.util.SpringBeanFactory;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
... ... @@ -23,6 +24,7 @@ public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
... ... @@ -31,9 +33,9 @@ public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
LOGGER.info("已经 10 秒没有发送信息!");
//向客户端发送消息
CustomProtocol customProtocol = new CustomProtocol(45678L,"ping") ;
ctx.writeAndFlush(Unpooled.copiedBuffer(customProtocol.toString(), CharsetUtil.UTF_8)) ;
//向服务端发送消息
CustomProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", CustomProtocol.class);
ctx.writeAndFlush(heartBeat) ;
}
... ...
package com.crossoverjie.netty.action.client.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public final class SpringBeanFactory implements ApplicationContextAware{
private static ApplicationContext context;
public static <T> T getBean(Class<T> c){
return context.getBean(c);
}
public static <T> T getBean(String name,Class<T> clazz){
return context.getBean(name,clazz);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
}
... ...
... ... @@ -11,6 +11,8 @@ netty.server.port=11211
logging.level.root=info
channel.id=100
# 关闭健康检查权限
management.security.enabled=false
... ...
package com.crossoverjie.netty.action.config;
import com.crossoverjie.netty.action.endpoint.CustomEndpoint;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Function: 监控端点配置
*
* @author crossoverJie
* Date: 17/04/2018 15:48
* @since JDK 1.8
*/
@Configuration
public class EndPointConfig {
@Value("${monitor.channel.map.key}")
private String sortList;
@Bean
public CustomEndpoint buildEndPoint(){
CustomEndpoint customEndpoint = new CustomEndpoint(sortList) ;
return customEndpoint ;
}
}
... ...
package com.crossoverjie.netty.action.controller;
import com.crossoverjie.netty.action.common.constant.Constants;
import com.crossoverjie.netty.action.common.enums.StatusEnum;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import com.crossoverjie.netty.action.common.res.BaseResponse;
... ... @@ -9,6 +10,7 @@ import com.crossoverjie.netty.action.vo.req.SendMsgReqVO;
import com.crossoverjie.netty.action.vo.res.SendMsgResVO;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.metrics.CounterService;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
... ... @@ -30,18 +32,27 @@ public class IndexController {
@Autowired
private HeartBeatServer heartbeatClient ;
/**
* 统计 service
*/
@Autowired
private CounterService counterService;
/**
* 向服务端发消息
* @param sendMsgReqVO
* @return
*/
@ApiOperation("发送消息")
@ApiOperation("服务端发送消息")
@RequestMapping("sendMsg")
@ResponseBody
public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
BaseResponse<SendMsgResVO> res = new BaseResponse();
heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getId(),sendMsgReqVO.getMsg())) ;
counterService.increment(Constants.COUNTER_SERVER_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
... ...
package com.crossoverjie.netty.action.endpoint;
import com.crossoverjie.netty.action.util.NettySocketHolder;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Function: 自定义端点监控
*
* @author crossoverJie
* Date: 17/04/2018 14:47
* @since JDK 1.8
*/
public class CustomEndpoint extends AbstractEndpoint<Map<Long,NioSocketChannel>> {
/**
* 监控端点的 访问地址
* @param id
*/
public CustomEndpoint(String id) {
//false 表示不是敏感端点
super(id, false);
}
@Override
public Map<Long, NioSocketChannel> invoke() {
return NettySocketHolder.getMAP();
}
}
... ...
... ... @@ -3,6 +3,7 @@ package com.crossoverjie.netty.action.handle;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import com.crossoverjie.netty.action.common.util.RandomUtil;
import com.crossoverjie.netty.action.util.NettySocketHolder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
... ... @@ -27,6 +28,19 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomPro
private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatSimpleHandle.class);
private static final ByteBuf HEART_BEAT = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(new CustomProtocol(123456L,"pong").toString(),CharsetUtil.UTF_8));
/**
* 取消绑定
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettySocketHolder.remove((NioSocketChannel) ctx.channel());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
... ... @@ -37,9 +51,7 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomPro
if (idleStateEvent.state() == IdleState.READER_IDLE){
LOGGER.info("已经5秒没有收到信息!");
//向客户端发送消息
CustomProtocol customProtocol = new CustomProtocol(RandomUtil.getRandom(),"pong") ;
ctx.writeAndFlush(Unpooled.copiedBuffer(customProtocol.toString(), CharsetUtil.UTF_8))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
ctx.writeAndFlush(HEART_BEAT) ;
}
... ... @@ -50,7 +62,7 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomPro
@Override
protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol customProtocol) throws Exception {
LOGGER.info("customProtocol={}", customProtocol);
LOGGER.info("收到customProtocol={}", customProtocol);
NettySocketHolder.put(customProtocol.getId(),(NioSocketChannel)ctx.channel()) ;
}
... ...
... ... @@ -38,12 +38,13 @@ public class HeartBeatServer {
@Value("${netty.server.port}")
private int nettyPort ;
private int nettyPort;
private NioServerSocketChannel channel ;
private NioServerSocketChannel channel;
/**
* 启动 Netty
*
* @return
* @throws InterruptedException
*/
... ... @@ -55,14 +56,14 @@ public class HeartBeatServer {
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(nettyPort))
//保持长连接
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new HeartbeatInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()){
if (future.isSuccess()) {
LOGGER.info("启动 Netty 成功");
}
channel = (NioServerSocketChannel) future.channel() ;
channel = (NioServerSocketChannel) future.channel();
}
... ... @@ -70,22 +71,23 @@ public class HeartBeatServer {
* 销毁
*/
@PreDestroy
public void destroy(){
boss.shutdownGracefully().syncUninterruptibly() ;
work.shutdownGracefully().syncUninterruptibly() ;
public void destroy() {
boss.shutdownGracefully().syncUninterruptibly();
work.shutdownGracefully().syncUninterruptibly();
LOGGER.info("关闭 Netty 成功");
}
/**
* 发送消息
*
* @param customProtocol
*/
public void sendMsg(CustomProtocol customProtocol){
public void sendMsg(CustomProtocol customProtocol) {
NioSocketChannel socketChannel = NettySocketHolder.get(customProtocol.getId());
if (null == socketChannel){
throw new NullPointerException("没有["+customProtocol.getId()+"]的socketChannel") ;
if (null == socketChannel) {
throw new NullPointerException("没有[" + customProtocol.getId() + "]的socketChannel");
}
ChannelFuture future = socketChannel.writeAndFlush(Unpooled.copiedBuffer(customProtocol.toString(), CharsetUtil.UTF_8));
... ...
... ... @@ -15,13 +15,23 @@ import java.util.concurrent.ConcurrentHashMap;
* @since JDK 1.8
*/
public class NettySocketHolder {
private static final Map<Long,NioSocketChannel> MAP = new ConcurrentHashMap<>(16) ;
private static final Map<Long, NioSocketChannel> MAP = new ConcurrentHashMap<>(16);
public static void put(Long id,NioSocketChannel socketChannel){
MAP.put(id,socketChannel) ;
public static void put(Long id, NioSocketChannel socketChannel) {
MAP.put(id, socketChannel);
}
public static NioSocketChannel get(Long id) {
return MAP.get(id);
}
public static Map<Long, NioSocketChannel> getMAP() {
return MAP;
}
public static void remove(NioSocketChannel nioSocketChannel) {
MAP.entrySet().stream().filter(entry -> entry.getValue() == nioSocketChannel).forEach(entry -> {
MAP.remove(entry.getKey());
});
}
}
... ...
package com.crossoverjie.netty.action.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public final class SpringBeanFactory implements ApplicationContextAware{
private static ApplicationContext context;
public static <T> T getBean(Class<T> c){
return context.getBean(c);
}
public static <T> T getBean(String name,Class<T> clazz){
return context.getBean(name,clazz);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
}
... ...
... ... @@ -15,3 +15,6 @@ logging.level.root=info
management.security.enabled=false
# SpringAdmin 地址
spring.boot.admin.url=http://127.0.0.1:8888
#自定义监控端点 key
monitor.channel.map.key=channelMap
... ...