作者 crossoverJie

:sparkles: Introducing new features.修改为发送字符串

... ... @@ -4,6 +4,8 @@ import com.alibaba.fastjson.JSON;
import com.crossoverjie.netty.action.client.init.CustomerHandleInitializer;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
... ... @@ -66,4 +68,17 @@ public class HeartbeatClient {
LOGGER.info("客户端手动发消息成功={}", JSON.toJSONString(customProtocol)));
}
/**
* 发送消息字符串
*
* @param msg
*/
public void sendStringMsg(String msg) {
ByteBuf message = Unpooled.buffer(msg.getBytes().length) ;
message.writeBytes(msg.getBytes()) ;
ChannelFuture future = channel.writeAndFlush(message);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("客户端手动发消息成功={}", msg));
}
}
... ...
... ... @@ -7,7 +7,6 @@ import com.crossoverjie.netty.action.common.constant.Constants;
import com.crossoverjie.netty.action.common.enums.StatusEnum;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import com.crossoverjie.netty.action.common.res.BaseResponse;
import com.crossoverjie.netty.action.common.util.RandomUtil;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.metrics.CounterService;
... ... @@ -58,4 +57,27 @@ public class IndexController {
res.setDataBody(sendMsgResVO) ;
return res ;
}
/**
* 向服务端发消息 字符串
* @param msg
* @return
*/
@ApiOperation("客户端发送消息,字符串")
@RequestMapping("sendStringMsg")
@ResponseBody
public BaseResponse<SendMsgResVO> sendStringMsg(@RequestBody String msg){
BaseResponse<SendMsgResVO> res = new BaseResponse();
heartbeatClient.sendStringMsg(msg) ;
// 利用 actuator 来自增
counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
res.setDataBody(sendMsgResVO) ;
return res ;
}
}
... ...
package com.crossoverjie.netty.action.client.handle;
import com.crossoverjie.netty.action.client.util.SpringBeanFactory;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -29,7 +23,7 @@ public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
/*if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
... ... @@ -40,7 +34,7 @@ public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
}
}
}*/
super.userEventTriggered(ctx, evt);
}
... ...
... ... @@ -11,6 +11,7 @@ netty.server.port=11211
logging.level.root=info
# 通道 ID
channel.id=100
... ...
package com.crossoverjie.netty.action.handle;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import com.crossoverjie.netty.action.common.util.RandomUtil;
import com.crossoverjie.netty.action.util.NettySocketHolder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -24,7 +18,7 @@ import org.slf4j.LoggerFactory;
* Date: 17/05/2018 18:52
* @since JDK 1.8
*/
public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomProtocol> {
public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<String> {
private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatSimpleHandle.class);
... ... @@ -45,7 +39,7 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomPro
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
/*if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
if (idleStateEvent.state() == IdleState.READER_IDLE){
... ... @@ -55,16 +49,16 @@ public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomPro
}
}
}*/
super.userEventTriggered(ctx, evt);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol customProtocol) throws Exception {
LOGGER.info("收到customProtocol={}", customProtocol);
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
LOGGER.info("收到msg={}", msg);
//保存客户端与 Channel 之间的关系
NettySocketHolder.put(customProtocol.getId(),(NioSocketChannel)ctx.channel()) ;
//NettySocketHolder.put(CustomProtocolProtocol.getId(),(NioSocketChannel)ctx.channel()) ;
}
}
... ...
package com.crossoverjie.netty.action.init;
import com.crossoverjie.netty.action.handle.HeartBeatSimpleHandle;
import com.crossoverjie.netty.action.decoder.HeartbeatDecoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.timeout.IdleStateHandler;
/**
... ... @@ -19,7 +20,9 @@ public class HeartbeatInitializer extends ChannelInitializer<Channel> {
ch.pipeline()
//五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(5, 0, 0))
.addLast(new HeartbeatDecoder())
//.addLast(new HeartbeatDecoder())
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new HeartBeatSimpleHandle());
}
}
... ...