作者 crossoverJie

:sparkles: Introducing new features.

... ... @@ -9,7 +9,7 @@ Netty 实战相关
* [x] [Netty(一) SpringBoot 整合长连接心跳机制](https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/)
* [x] [Netty(二) 从线程模型的角度看 Netty 为什么是高性能的?](https://crossoverjie.top/2018/07/04/netty/Netty(2)Thread-model/)
* [ ] 如何解决 TCP 的拆包、粘包?
* [x] [Netty(三) 什么是 TCP 拆、粘包?如何解决?](https://crossoverjie.top/2018/08/03/netty/Netty(3)TCP-Sticky/)
## 安装
... ...
package com.crossoverjie.netty.action;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import com.crossoverjie.netty.action.init.CustomerHandleInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* Function:
*
* @author crossoverJie
* Date: 22/05/2018 14:19
* @since JDK 1.8
*/
@Component
public class HeartbeatClient {
private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClient.class);
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.server.port}")
private int nettyPort;
@Value("${netty.server.host}")
private String host;
private SocketChannel channel ;
@PostConstruct
public void start() throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CustomerHandleInitializer())
;
ChannelFuture future = bootstrap.connect(host, nettyPort).sync();
if (future.isSuccess()) {
LOGGER.info("启动 Netty 成功");
}
channel = (SocketChannel) future.channel();
}
/**
* 发送消息
* @param customProtocol
*/
public void sendMsg(CustomProtocol customProtocol){
channel.writeAndFlush(customProtocol) ;
}
}
... ...
package com.crossoverjie.netty.action;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author crossoverJie
*/
@SpringBootApplication
public class HeartbeatClientApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClientApplication.class);
public static void main(String[] args) {
SpringApplication.run(HeartbeatClientApplication.class, args);
LOGGER.info("启动 Client 成功");
}
@Override
public void run(String... args) throws Exception {
}
}
\ No newline at end of file
... ...
package com.crossoverjie.netty.action.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
/** 是否打开swagger **/
@ConditionalOnExpression("'${swagger.enable}' == 'true'")
public class SwaggerConfig {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.crossoverjie.netty.action.client.controller"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("sbc order api")
.description("sbc order api")
.termsOfServiceUrl("http://crossoverJie.top")
.contact("crossoverJie")
.version("1.0.0")
.build();
}
}
\ No newline at end of file
... ...
package com.crossoverjie.netty.action.controller;
import com.crossoverjie.netty.action.HeartbeatClient;
import com.crossoverjie.netty.action.vo.req.SendMsgReqVO;
import com.crossoverjie.netty.action.vo.res.SendMsgResVO;
import com.crossoverjie.netty.action.common.enums.StatusEnum;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import com.crossoverjie.netty.action.common.res.BaseResponse;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* Function:
*
* @author crossoverJie
* Date: 22/05/2018 14:46
* @since JDK 1.8
*/
@Controller
@RequestMapping("/")
public class IndexController {
@Autowired
private HeartbeatClient heartbeatClient ;
/**
* 向服务端发消息
* @param sendMsgReqVO
* @return
*/
@ApiOperation("发送消息")
@RequestMapping("sendMsg")
@ResponseBody
public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
BaseResponse<SendMsgResVO> res = new BaseResponse();
heartbeatClient.sendMsg(new CustomProtocol(122,sendMsgReqVO.getMsg())) ;
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
res.setDataBody(sendMsgResVO) ;
return res ;
}
}
... ...
package com.crossoverjie.netty.action.encode;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* Function:编码
*
* @author crossoverJie
* Date: 17/05/2018 19:07
* @since JDK 1.8
*/
public class HeartbeatEncode extends MessageToByteEncoder<CustomProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {
out.writeLong(msg.getHeader()) ;
out.writeBytes(msg.getContent().getBytes()) ;
}
}
... ...
package com.crossoverjie.netty.action.handle;
import com.crossoverjie.netty.action.common.pojo.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Function:
*
* @author crossoverJie
* Date: 16/02/2018 18:09
* @since JDK 1.8
*/
public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
LOGGER.info("已经 10 秒没有发送信息!");
//向客户端发送消息
CustomProtocol customProtocol = new CustomProtocol(45678L,"ping") ;
ctx.writeAndFlush(Unpooled.copiedBuffer(customProtocol.toString(), CharsetUtil.UTF_8)) ;
}
}
super.userEventTriggered(ctx, evt);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//客户端和服务端建立连接时调用
LOGGER.info("已经建立了联系。。");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception {
//从服务端收到消息时被调用
LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常时断开连接
cause.printStackTrace() ;
ctx.close() ;
}
}
... ...
package com.crossoverjie.netty.action.init;
import com.crossoverjie.netty.action.encode.HeartbeatEncode;
import com.crossoverjie.netty.action.handle.EchoClientHandle;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.timeout.IdleStateHandler;
/**
* Function:
*
* @author crossoverJie
* Date: 23/02/2018 22:47
* @since JDK 1.8
*/
public class CustomerHandleInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//10 秒没发送消息
.addLast(new IdleStateHandler(0, 10, 0))
.addLast(new HeartbeatEncode())
.addLast(new EchoClientHandle())
;
}
}
... ...
package com.crossoverjie.netty.action.vo.req;
import com.crossoverjie.netty.action.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/05/21 15:56
* @since JDK 1.8
*/
public class SendMsgReqVO extends BaseRequest {
@NotNull(message = "msg 不能为空")
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
... ...
package com.crossoverjie.netty.action.vo.res;
/**
* Function:
*
* @author crossoverJie
* Date: 2017/6/26 15:43
* @since JDK 1.8
*/
public class SendMsgResVO {
private String msg ;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
... ...