作者 crossoverJie

:sparkles: Introducing new features.后端和客户端通信

@@ -9,15 +9,15 @@ package com.crossoverjie.netty.action.common.pojo; @@ -9,15 +9,15 @@ package com.crossoverjie.netty.action.common.pojo;
9 */ 9 */
10 public class CustomProtocol { 10 public class CustomProtocol {
11 11
12 - private long header ; 12 + private long id ;
13 private String content ; 13 private String content ;
14 14
15 - public long getHeader() {  
16 - return header; 15 + public long getId() {
  16 + return id;
17 } 17 }
18 18
19 - public void setHeader(long header) {  
20 - this.header = header; 19 + public void setId(long id) {
  20 + this.id = id;
21 } 21 }
22 22
23 public String getContent() { 23 public String getContent() {
@@ -28,18 +28,18 @@ public class CustomProtocol { @@ -28,18 +28,18 @@ public class CustomProtocol {
28 this.content = content; 28 this.content = content;
29 } 29 }
30 30
31 - public CustomProtocol(long header, String content) {  
32 - this.header = header;  
33 - this.content = content; 31 + public CustomProtocol() {
34 } 32 }
35 33
36 - public CustomProtocol() { 34 + public CustomProtocol(long id, String content) {
  35 + this.id = id;
  36 + this.content = content;
37 } 37 }
38 38
39 @Override 39 @Override
40 public String toString() { 40 public String toString() {
41 return "CustomProtocol{" + 41 return "CustomProtocol{" +
42 - "header=" + header + 42 + "id=" + id +
43 ", content='" + content + '\'' + 43 ", content='" + content + '\'' +
44 '}'; 44 '}';
45 } 45 }
@@ -38,7 +38,7 @@ public class IndexController { @@ -38,7 +38,7 @@ public class IndexController {
38 @ResponseBody 38 @ResponseBody
39 public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){ 39 public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
40 BaseResponse<SendMsgResVO> res = new BaseResponse(); 40 BaseResponse<SendMsgResVO> res = new BaseResponse();
41 - heartbeatClient.sendMsg(new CustomProtocol(RandomUtil.getRandom(),sendMsgReqVO.getMsg())) ; 41 + heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getId(),sendMsgReqVO.getMsg())) ;
42 42
43 SendMsgResVO sendMsgResVO = new SendMsgResVO() ; 43 SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
44 sendMsgResVO.setMsg("OK") ; 44 sendMsgResVO.setMsg("OK") ;
@@ -16,7 +16,7 @@ public class HeartbeatEncode extends MessageToByteEncoder<CustomProtocol> { @@ -16,7 +16,7 @@ public class HeartbeatEncode extends MessageToByteEncoder<CustomProtocol> {
16 @Override 16 @Override
17 protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception { 17 protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {
18 18
19 - out.writeLong(msg.getHeader()) ; 19 + out.writeLong(msg.getId()) ;
20 out.writeBytes(msg.getContent().getBytes()) ; 20 out.writeBytes(msg.getContent().getBytes()) ;
21 21
22 } 22 }
@@ -18,6 +18,10 @@ public class SendMsgReqVO extends BaseRequest { @@ -18,6 +18,10 @@ public class SendMsgReqVO extends BaseRequest {
18 @ApiModelProperty(required = true, value = "msg", example = "hello") 18 @ApiModelProperty(required = true, value = "msg", example = "hello")
19 private String msg ; 19 private String msg ;
20 20
  21 + @NotNull(message = "id 不能为空")
  22 + @ApiModelProperty(required = true, value = "id", example = "11")
  23 + private long id ;
  24 +
21 public String getMsg() { 25 public String getMsg() {
22 return msg; 26 return msg;
23 } 27 }
@@ -26,4 +30,11 @@ public class SendMsgReqVO extends BaseRequest { @@ -26,4 +30,11 @@ public class SendMsgReqVO extends BaseRequest {
26 this.msg = msg; 30 this.msg = msg;
27 } 31 }
28 32
  33 + public long getId() {
  34 + return id;
  35 + }
  36 +
  37 + public void setId(long id) {
  38 + this.id = id;
  39 + }
29 } 40 }
@@ -40,7 +40,7 @@ public class IndexController { @@ -40,7 +40,7 @@ public class IndexController {
40 @ResponseBody 40 @ResponseBody
41 public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){ 41 public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
42 BaseResponse<SendMsgResVO> res = new BaseResponse(); 42 BaseResponse<SendMsgResVO> res = new BaseResponse();
43 - heartbeatClient.sendMsg(new CustomProtocol(RandomUtil.getRandom(),sendMsgReqVO.getMsg())) ; 43 + heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getId(),sendMsgReqVO.getMsg())) ;
44 44
45 SendMsgResVO sendMsgResVO = new SendMsgResVO() ; 45 SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
46 sendMsgResVO.setMsg("OK") ; 46 sendMsgResVO.setMsg("OK") ;
@@ -18,13 +18,13 @@ public class HeartbeatDecoder extends ByteToMessageDecoder { @@ -18,13 +18,13 @@ public class HeartbeatDecoder extends ByteToMessageDecoder {
18 @Override 18 @Override
19 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 19 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
20 20
21 - long header = in.readLong() ; 21 + long id = in.readLong() ;
22 byte[] bytes = new byte[in.readableBytes()] ; 22 byte[] bytes = new byte[in.readableBytes()] ;
23 in.readBytes(bytes) ; 23 in.readBytes(bytes) ;
24 String content = new String(bytes) ; 24 String content = new String(bytes) ;
25 25
26 CustomProtocol customProtocol = new CustomProtocol() ; 26 CustomProtocol customProtocol = new CustomProtocol() ;
27 - customProtocol.setHeader(header); 27 + customProtocol.setId(id);
28 customProtocol.setContent(content) ; 28 customProtocol.setContent(content) ;
29 out.add(customProtocol) ; 29 out.add(customProtocol) ;
30 30
1 package com.crossoverjie.netty.action.handle; 1 package com.crossoverjie.netty.action.handle;
2 2
3 import com.crossoverjie.netty.action.common.pojo.CustomProtocol; 3 import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
  4 +import com.crossoverjie.netty.action.common.util.RandomUtil;
  5 +import com.crossoverjie.netty.action.util.NettySocketHolder;
4 import io.netty.buffer.Unpooled; 6 import io.netty.buffer.Unpooled;
5 import io.netty.channel.ChannelFutureListener; 7 import io.netty.channel.ChannelFutureListener;
6 import io.netty.channel.ChannelHandlerContext; 8 import io.netty.channel.ChannelHandlerContext;
7 import io.netty.channel.SimpleChannelInboundHandler; 9 import io.netty.channel.SimpleChannelInboundHandler;
  10 +import io.netty.channel.socket.SocketChannel;
  11 +import io.netty.channel.socket.nio.NioServerSocketChannel;
  12 +import io.netty.channel.socket.nio.NioSocketChannel;
8 import io.netty.handler.timeout.IdleState; 13 import io.netty.handler.timeout.IdleState;
9 import io.netty.handler.timeout.IdleStateEvent; 14 import io.netty.handler.timeout.IdleStateEvent;
10 import io.netty.util.CharsetUtil; 15 import io.netty.util.CharsetUtil;
@@ -32,7 +37,7 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomPro @@ -32,7 +37,7 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomPro
32 if (idleStateEvent.state() == IdleState.READER_IDLE){ 37 if (idleStateEvent.state() == IdleState.READER_IDLE){
33 LOGGER.info("已经5秒没有收到信息!"); 38 LOGGER.info("已经5秒没有收到信息!");
34 //向客户端发送消息 39 //向客户端发送消息
35 - CustomProtocol customProtocol = new CustomProtocol(12345L,"pong") ; 40 + CustomProtocol customProtocol = new CustomProtocol(RandomUtil.getRandom(),"pong") ;
36 ctx.writeAndFlush(Unpooled.copiedBuffer(customProtocol.toString(), CharsetUtil.UTF_8)) 41 ctx.writeAndFlush(Unpooled.copiedBuffer(customProtocol.toString(), CharsetUtil.UTF_8))
37 .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ; 42 .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
38 } 43 }
@@ -44,16 +49,9 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomPro @@ -44,16 +49,9 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomPro
44 } 49 }
45 50
46 @Override 51 @Override
47 - public void channelActive(ChannelHandlerContext ctx) throws Exception {  
48 - }  
49 -  
50 - @Override  
51 protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol customProtocol) throws Exception { 52 protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol customProtocol) throws Exception {
52 LOGGER.info("customProtocol={}", customProtocol); 53 LOGGER.info("customProtocol={}", customProtocol);
53 54
54 - //手动处理数据并返回  
55 - customProtocol.setHeader(customProtocol.getHeader() + 1000);  
56 - customProtocol.setContent(customProtocol.getContent() + "asdfg");  
57 - ctx.writeAndFlush(Unpooled.copiedBuffer(customProtocol.toString(), CharsetUtil.UTF_8)); 55 + NettySocketHolder.put(customProtocol.getId(),(NioSocketChannel)ctx.channel()) ;
58 } 56 }
59 } 57 }
@@ -3,11 +3,15 @@ package com.crossoverjie.netty.action.server; @@ -3,11 +3,15 @@ package com.crossoverjie.netty.action.server;
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.crossoverjie.netty.action.channel.init.HeartbeatInitializer; 4 import com.crossoverjie.netty.action.channel.init.HeartbeatInitializer;
5 import com.crossoverjie.netty.action.common.pojo.CustomProtocol; 5 import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
  6 +import com.crossoverjie.netty.action.util.NettySocketHolder;
6 import io.netty.bootstrap.ServerBootstrap; 7 import io.netty.bootstrap.ServerBootstrap;
  8 +import io.netty.buffer.Unpooled;
7 import io.netty.channel.*; 9 import io.netty.channel.*;
8 import io.netty.channel.nio.NioEventLoopGroup; 10 import io.netty.channel.nio.NioEventLoopGroup;
9 import io.netty.channel.socket.SocketChannel; 11 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel; 12 import io.netty.channel.socket.nio.NioServerSocketChannel;
  13 +import io.netty.channel.socket.nio.NioSocketChannel;
  14 +import io.netty.util.CharsetUtil;
11 import org.slf4j.Logger; 15 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory; 16 import org.slf4j.LoggerFactory;
13 import org.springframework.beans.factory.annotation.Value; 17 import org.springframework.beans.factory.annotation.Value;
@@ -78,8 +82,13 @@ public class HeartBeatServer { @@ -78,8 +82,13 @@ public class HeartBeatServer {
78 * @param customProtocol 82 * @param customProtocol
79 */ 83 */
80 public void sendMsg(CustomProtocol customProtocol){ 84 public void sendMsg(CustomProtocol customProtocol){
81 - ChannelFuture future = channel.writeAndFlush(customProtocol); 85 + NioSocketChannel socketChannel = NettySocketHolder.get(customProtocol.getId());
82 86
  87 + if (null == socketChannel){
  88 + throw new NullPointerException("没有["+customProtocol.getId()+"]的socketChannel") ;
  89 + }
  90 +
  91 + ChannelFuture future = socketChannel.writeAndFlush(Unpooled.copiedBuffer(customProtocol.toString(), CharsetUtil.UTF_8));
83 future.addListener((ChannelFutureListener) channelFuture -> 92 future.addListener((ChannelFutureListener) channelFuture ->
84 LOGGER.info("服务端手动发消息成功={}", JSON.toJSONString(customProtocol))); 93 LOGGER.info("服务端手动发消息成功={}", JSON.toJSONString(customProtocol)));
85 } 94 }
  1 +package com.crossoverjie.netty.action.util;
  2 +
  3 +import io.netty.channel.socket.SocketChannel;
  4 +import io.netty.channel.socket.nio.NioServerSocketChannel;
  5 +import io.netty.channel.socket.nio.NioSocketChannel;
  6 +
  7 +import java.util.Map;
  8 +import java.util.concurrent.ConcurrentHashMap;
  9 +
  10 +/**
  11 + * Function:
  12 + *
  13 + * @author crossoverJie
  14 + * Date: 22/05/2018 18:33
  15 + * @since JDK 1.8
  16 + */
  17 +public class NettySocketHolder {
  18 + private static final Map<Long,NioSocketChannel> MAP = new ConcurrentHashMap<>(16) ;
  19 +
  20 + public static void put(Long id,NioSocketChannel socketChannel){
  21 + MAP.put(id,socketChannel) ;
  22 + }
  23 +
  24 + public static NioSocketChannel get(Long id) {
  25 + return MAP.get(id);
  26 + }
  27 +}
  1 +package com.crossoverjie.netty.action.vo.req;
  2 +
  3 +import com.crossoverjie.netty.action.common.req.BaseRequest;
  4 +import io.swagger.annotations.ApiModelProperty;
  5 +
  6 +import javax.validation.constraints.NotNull;
  7 +
  8 +/**
  9 + * Function:
  10 + *
  11 + * @author crossoverJie
  12 + * Date: 2018/05/21 15:56
  13 + * @since JDK 1.8
  14 + */
  15 +public class SendMsgReqVO extends BaseRequest {
  16 +
  17 + @NotNull(message = "msg 不能为空")
  18 + @ApiModelProperty(required = true, value = "msg", example = "hello")
  19 + private String msg ;
  20 +
  21 + @NotNull(message = "id 不能为空")
  22 + @ApiModelProperty(required = true, value = "id", example = "11")
  23 + private long id ;
  24 +
  25 + public String getMsg() {
  26 + return msg;
  27 + }
  28 +
  29 + public void setMsg(String msg) {
  30 + this.msg = msg;
  31 + }
  32 +
  33 + public long getId() {
  34 + return id;
  35 + }
  36 +
  37 + public void setId(long id) {
  38 + this.id = id;
  39 + }
  40 +}
  1 +package com.crossoverjie.netty.action.vo.res;
  2 +
  3 +/**
  4 + * Function:
  5 + *
  6 + * @author crossoverJie
  7 + * Date: 2017/6/26 15:43
  8 + * @since JDK 1.8
  9 + */
  10 +public class SendMsgResVO {
  11 + private String msg ;
  12 +
  13 + public String getMsg() {
  14 + return msg;
  15 + }
  16 +
  17 + public void setMsg(String msg) {
  18 + this.msg = msg;
  19 + }
  20 +}