作者 crossoverJie

:sparkles: 客户端可在控制台发送消息

1 package com.crossoverjie.netty.action.client; 1 package com.crossoverjie.netty.action.client;
2 2
  3 +import com.crossoverjie.netty.action.client.scanner.Scan;
3 import org.slf4j.Logger; 4 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory; 5 import org.slf4j.LoggerFactory;
  6 +import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.boot.CommandLineRunner; 7 import org.springframework.boot.CommandLineRunner;
6 import org.springframework.boot.SpringApplication; 8 import org.springframework.boot.SpringApplication;
7 import org.springframework.boot.autoconfigure.SpringBootApplication; 9 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -14,6 +16,8 @@ public class HeartbeatClientApplication implements CommandLineRunner{ @@ -14,6 +16,8 @@ public class HeartbeatClientApplication implements CommandLineRunner{
14 16
15 private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClientApplication.class); 17 private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClientApplication.class);
16 18
  19 + @Autowired
  20 + private HeartbeatClient heartbeatClient ;
17 21
18 22
19 public static void main(String[] args) { 23 public static void main(String[] args) {
@@ -23,5 +27,9 @@ public class HeartbeatClientApplication implements CommandLineRunner{ @@ -23,5 +27,9 @@ public class HeartbeatClientApplication implements CommandLineRunner{
23 27
24 @Override 28 @Override
25 public void run(String... args) throws Exception { 29 public void run(String... args) throws Exception {
  30 + Scan scan = new Scan(heartbeatClient) ;
  31 + Thread thread = new Thread(scan);
  32 + thread.setName("scan-thread");
  33 + thread.start();
26 } 34 }
27 } 35 }
1 package com.crossoverjie.netty.action.client.handle; 1 package com.crossoverjie.netty.action.client.handle;
2 2
  3 +import com.crossoverjie.netty.action.common.protocol.BaseRequestProto;
3 import com.crossoverjie.netty.action.common.protocol.BaseResponseProto; 4 import com.crossoverjie.netty.action.common.protocol.BaseResponseProto;
  5 +import io.netty.channel.ChannelFutureListener;
4 import io.netty.channel.ChannelHandlerContext; 6 import io.netty.channel.ChannelHandlerContext;
5 import io.netty.channel.SimpleChannelInboundHandler; 7 import io.netty.channel.SimpleChannelInboundHandler;
  8 +import io.netty.handler.timeout.IdleState;
  9 +import io.netty.handler.timeout.IdleStateEvent;
6 import org.slf4j.Logger; 10 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory; 11 import org.slf4j.LoggerFactory;
8 12
@@ -17,23 +21,26 @@ public class EchoClientHandle extends SimpleChannelInboundHandler<BaseResponsePr @@ -17,23 +21,26 @@ public class EchoClientHandle extends SimpleChannelInboundHandler<BaseResponsePr
17 21
18 private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class); 22 private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class);
19 23
  24 + private final BaseRequestProto.RequestProtocol heart = BaseRequestProto.RequestProtocol.newBuilder()
  25 + .setRequestId(99999999)
  26 + .setReqMsg("ping")
  27 + .build();
20 28
21 29
22 @Override 30 @Override
23 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 31 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
24 32
25 - /*if (evt instanceof IdleStateEvent){ 33 + if (evt instanceof IdleStateEvent){
26 IdleStateEvent idleStateEvent = (IdleStateEvent) evt ; 34 IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
27 35
28 if (idleStateEvent.state() == IdleState.WRITER_IDLE){ 36 if (idleStateEvent.state() == IdleState.WRITER_IDLE){
29 LOGGER.info("已经 10 秒没有发送信息!"); 37 LOGGER.info("已经 10 秒没有发送信息!");
30 //向服务端发送消息 38 //向服务端发送消息
31 - CustomProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", CustomProtocol.class);  
32 - ctx.writeAndFlush(heartBeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ; 39 + ctx.writeAndFlush(heart).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
33 } 40 }
34 41
35 42
36 - }*/ 43 + }
37 44
38 super.userEventTriggered(ctx, evt); 45 super.userEventTriggered(ctx, evt);
39 } 46 }
  1 +package com.crossoverjie.netty.action.client.scanner;
  2 +
  3 +import com.crossoverjie.netty.action.client.HeartbeatClient;
  4 +import com.crossoverjie.netty.action.client.vo.req.GoogleProtocolVO;
  5 +import org.slf4j.Logger;
  6 +import org.slf4j.LoggerFactory;
  7 +
  8 +import java.util.Scanner;
  9 +
  10 +/**
  11 + * Function:
  12 + *
  13 + * @author crossoverJie
  14 + * Date: 2018/12/21 16:44
  15 + * @since JDK 1.8
  16 + */
  17 +public class Scan implements Runnable{
  18 +
  19 + private final static Logger LOGGER = LoggerFactory.getLogger(Scan.class);
  20 +
  21 + private HeartbeatClient heartbeatClient ;
  22 +
  23 + public Scan(HeartbeatClient heartbeatClient) {
  24 + this.heartbeatClient = heartbeatClient;
  25 + }
  26 +
  27 + @Override
  28 + public void run() {
  29 + Scanner sc = new Scanner(System.in);
  30 + String[] totalMsg ;
  31 + GoogleProtocolVO vo ;
  32 + while (true){
  33 + String msg = sc.nextLine() ;
  34 +
  35 + totalMsg = msg.split(" ");
  36 +
  37 + vo = new GoogleProtocolVO() ;
  38 + vo.setRequestId(Integer.parseInt(totalMsg[0]));
  39 + vo.setMsg(totalMsg[1]);
  40 + heartbeatClient.sendGoogleProtocolMsg(vo) ;
  41 + LOGGER.info("scan =[{}]",msg);
  42 + }
  43 + }
  44 +}
@@ -2,9 +2,7 @@ package com.crossoverjie.netty.action.controller; @@ -2,9 +2,7 @@ package com.crossoverjie.netty.action.controller;
2 2
3 import com.crossoverjie.netty.action.common.constant.Constants; 3 import com.crossoverjie.netty.action.common.constant.Constants;
4 import com.crossoverjie.netty.action.common.enums.StatusEnum; 4 import com.crossoverjie.netty.action.common.enums.StatusEnum;
5 -import com.crossoverjie.netty.action.common.pojo.CustomProtocol;  
6 import com.crossoverjie.netty.action.common.res.BaseResponse; 5 import com.crossoverjie.netty.action.common.res.BaseResponse;
7 -import com.crossoverjie.netty.action.common.util.RandomUtil;  
8 import com.crossoverjie.netty.action.server.HeartBeatServer; 6 import com.crossoverjie.netty.action.server.HeartBeatServer;
9 import com.crossoverjie.netty.action.vo.req.SendMsgReqVO; 7 import com.crossoverjie.netty.action.vo.req.SendMsgReqVO;
10 import com.crossoverjie.netty.action.vo.res.SendMsgResVO; 8 import com.crossoverjie.netty.action.vo.res.SendMsgResVO;
@@ -16,8 +14,6 @@ import org.springframework.web.bind.annotation.RequestBody; @@ -16,8 +14,6 @@ import org.springframework.web.bind.annotation.RequestBody;
16 import org.springframework.web.bind.annotation.RequestMapping; 14 import org.springframework.web.bind.annotation.RequestMapping;
17 import org.springframework.web.bind.annotation.ResponseBody; 15 import org.springframework.web.bind.annotation.ResponseBody;
18 16
19 -import java.util.UUID;  
20 -  
21 /** 17 /**
22 * Function: 18 * Function:
23 * 19 *
@@ -49,7 +45,7 @@ public class IndexController { @@ -49,7 +45,7 @@ public class IndexController {
49 @ResponseBody 45 @ResponseBody
50 public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){ 46 public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
51 BaseResponse<SendMsgResVO> res = new BaseResponse(); 47 BaseResponse<SendMsgResVO> res = new BaseResponse();
52 - heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getId(),sendMsgReqVO.getMsg())) ; 48 + heartbeatClient.sendGoogleProtoMsg(sendMsgReqVO) ;
53 49
54 counterService.increment(Constants.COUNTER_SERVER_PUSH_COUNT); 50 counterService.increment(Constants.COUNTER_SERVER_PUSH_COUNT);
55 51
@@ -69,6 +69,6 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<BaseReque @@ -69,6 +69,6 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<BaseReque
69 } 69 }
70 70
71 //保存客户端与 Channel 之间的关系 71 //保存客户端与 Channel 之间的关系
72 - //NettySocketHolder.put(CustomProtocolProtocol.getId(),(NioSocketChannel)ctx.channel()) ; 72 + NettySocketHolder.put((long) msg.getRequestId(),(NioSocketChannel)ctx.channel()) ;
73 } 73 }
74 } 74 }
1 package com.crossoverjie.netty.action.server; 1 package com.crossoverjie.netty.action.server;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 -import com.crossoverjie.netty.action.init.HeartbeatInitializer;  
5 import com.crossoverjie.netty.action.common.pojo.CustomProtocol; 4 import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
  5 +import com.crossoverjie.netty.action.common.protocol.BaseRequestProto;
  6 +import com.crossoverjie.netty.action.init.HeartbeatInitializer;
6 import com.crossoverjie.netty.action.util.NettySocketHolder; 7 import com.crossoverjie.netty.action.util.NettySocketHolder;
  8 +import com.crossoverjie.netty.action.vo.req.SendMsgReqVO;
7 import io.netty.bootstrap.ServerBootstrap; 9 import io.netty.bootstrap.ServerBootstrap;
8 import io.netty.buffer.Unpooled; 10 import io.netty.buffer.Unpooled;
9 -import io.netty.channel.*; 11 +import io.netty.channel.ChannelFuture;
  12 +import io.netty.channel.ChannelFutureListener;
  13 +import io.netty.channel.ChannelOption;
  14 +import io.netty.channel.EventLoopGroup;
10 import io.netty.channel.nio.NioEventLoopGroup; 15 import io.netty.channel.nio.NioEventLoopGroup;
11 import io.netty.channel.socket.nio.NioServerSocketChannel; 16 import io.netty.channel.socket.nio.NioServerSocketChannel;
12 import io.netty.channel.socket.nio.NioSocketChannel; 17 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -91,4 +96,24 @@ public class HeartBeatServer { @@ -91,4 +96,24 @@ public class HeartBeatServer {
91 future.addListener((ChannelFutureListener) channelFuture -> 96 future.addListener((ChannelFutureListener) channelFuture ->
92 LOGGER.info("服务端手动发消息成功={}", JSON.toJSONString(customProtocol))); 97 LOGGER.info("服务端手动发消息成功={}", JSON.toJSONString(customProtocol)));
93 } 98 }
  99 +
  100 + /**
  101 + * 发送 Google Protocol 编码消息
  102 + * @param sendMsgReqVO 消息
  103 + */
  104 + public void sendGoogleProtoMsg(SendMsgReqVO sendMsgReqVO){
  105 + NioSocketChannel socketChannel = NettySocketHolder.get(sendMsgReqVO.getId());
  106 +
  107 + if (null == socketChannel) {
  108 + throw new NullPointerException("没有[" + sendMsgReqVO.getId() + "]的socketChannel");
  109 + }
  110 + BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder()
  111 + .setRequestId((int) sendMsgReqVO.getId())
  112 + .setReqMsg(sendMsgReqVO.getMsg())
  113 + .build();
  114 +
  115 + ChannelFuture future = socketChannel.writeAndFlush(protocol);
  116 + future.addListener((ChannelFutureListener) channelFuture ->
  117 + LOGGER.info("服务端手动发送 Google Protocol 成功={}", sendMsgReqVO.toString()));
  118 + }
94 } 119 }
@@ -14,7 +14,7 @@ logging.level.root=info @@ -14,7 +14,7 @@ logging.level.root=info
14 app.zk.switch=true 14 app.zk.switch=true
15 15
16 # zk 地址 16 # zk 地址
17 -app.zk.addr=127.0.0.1:2181 17 +app.zk.addr=47.98.194.60:2181
18 18
19 # zk 注册根节点 19 # zk 注册根节点
20 app.zk.root=/route 20 app.zk.root=/route