作者 crossoverJie

:sparkles: Introducing new features.新增接口回调

1 package com.crossoverjie.cim.client.config; 1 package com.crossoverjie.cim.client.config;
2 2
  3 +import com.crossoverjie.cim.client.handle.MsgHandleCaller;
3 import com.crossoverjie.cim.common.constant.Constants; 4 import com.crossoverjie.cim.common.constant.Constants;
4 import com.crossoverjie.cim.common.protocol.CIMRequestProto; 5 import com.crossoverjie.cim.common.protocol.CIMRequestProto;
  6 +import com.google.common.util.concurrent.ThreadFactoryBuilder;
5 import okhttp3.OkHttpClient; 7 import okhttp3.OkHttpClient;
  8 +import org.slf4j.Logger;
  9 +import org.slf4j.LoggerFactory;
6 import org.springframework.beans.factory.annotation.Value; 10 import org.springframework.beans.factory.annotation.Value;
7 import org.springframework.context.annotation.Bean; 11 import org.springframework.context.annotation.Bean;
8 import org.springframework.context.annotation.Configuration; 12 import org.springframework.context.annotation.Configuration;
9 13
10 -import java.util.concurrent.TimeUnit; 14 +import java.util.concurrent.*;
11 15
12 /** 16 /**
13 * Function:bean 配置 17 * Function:bean 配置
@@ -19,9 +23,18 @@ import java.util.concurrent.TimeUnit; @@ -19,9 +23,18 @@ import java.util.concurrent.TimeUnit;
19 @Configuration 23 @Configuration
20 public class BeanConfig { 24 public class BeanConfig {
21 25
  26 + private final static Logger LOGGER = LoggerFactory.getLogger(BeanConfig.class);
  27 +
  28 +
22 @Value("${cim.user.id}") 29 @Value("${cim.user.id}")
23 private long userId; 30 private long userId;
24 31
  32 + @Value("${cim.callback.thread.queue.size}")
  33 + private int queueSize;
  34 +
  35 + @Value("${cim.callback.thread.pool.size}")
  36 + private int poolSize;
  37 +
25 38
26 /** 39 /**
27 * 创建心跳单例 40 * 创建心跳单例
@@ -52,4 +65,33 @@ public class BeanConfig { @@ -52,4 +65,33 @@ public class BeanConfig {
52 return builder.build(); 65 return builder.build();
53 } 66 }
54 67
  68 +
  69 + /**
  70 + * 创建回调线程池
  71 + * @return
  72 + */
  73 + @Bean("callBackThreadPool")
  74 + public ThreadPoolExecutor buildCallerThread(){
  75 + BlockingQueue<Runnable> queue = new LinkedBlockingQueue(queueSize);
  76 + ThreadFactory product = new ThreadFactoryBuilder()
  77 + .setNameFormat("product-%d")
  78 + .setDaemon(true)
  79 + .build();
  80 + ThreadPoolExecutor productExecutor = new ThreadPoolExecutor(poolSize, poolSize, 1, TimeUnit.MILLISECONDS, queue,product);
  81 + return productExecutor ;
  82 + }
  83 +
  84 + /**
  85 + * 回调 bean
  86 + * @return
  87 + */
  88 + @Bean
  89 + public MsgHandleCaller buildCaller(){
  90 + MsgHandleCaller caller = new MsgHandleCaller(msg -> {
  91 + LOGGER.warn("caller msg=[{}]" ,msg);
  92 + }) ;
  93 +
  94 + return caller ;
  95 + }
  96 +
55 } 97 }
@@ -11,6 +11,8 @@ import io.netty.handler.timeout.IdleStateEvent; @@ -11,6 +11,8 @@ import io.netty.handler.timeout.IdleStateEvent;
11 import org.slf4j.Logger; 11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory; 12 import org.slf4j.LoggerFactory;
13 13
  14 +import java.util.concurrent.ThreadPoolExecutor;
  15 +
14 /** 16 /**
15 * Function: 17 * Function:
16 * 18 *
@@ -23,7 +25,9 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -23,7 +25,9 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
23 25
24 private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientHandle.class); 26 private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientHandle.class);
25 27
  28 + private MsgHandleCaller caller = SpringBeanFactory.getBean(MsgHandleCaller.class);
26 29
  30 + private ThreadPoolExecutor threadPoolExecutor = SpringBeanFactory.getBean("callBackThreadPool",ThreadPoolExecutor.class) ;;
27 31
28 32
29 @Override 33 @Override
@@ -58,9 +62,24 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt @@ -58,9 +62,24 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
58 //从服务端收到消息时被调用 62 //从服务端收到消息时被调用
59 //LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ; 63 //LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
60 64
  65 + //回调消息
  66 + callBackMsg(responseProtocol.getResMsg());
  67 +
61 LOGGER.info(responseProtocol.getResMsg()); 68 LOGGER.info(responseProtocol.getResMsg());
62 } 69 }
63 70
  71 + /**
  72 + * 回调消息
  73 + * @param msg
  74 + */
  75 + private void callBackMsg(String msg) {
  76 +
  77 + threadPoolExecutor.execute(() -> {
  78 + caller.getMsgHandleListener().handle(msg);
  79 + });
  80 +
  81 + }
  82 +
64 @Override 83 @Override
65 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 84 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
66 //异常时断开连接 85 //异常时断开连接
  1 +package com.crossoverjie.cim.client.handle;
  2 +
  3 +import com.crossoverjie.cim.client.service.CustomMsgHandleListener;
  4 +
  5 +/**
  6 + * Function:消息回调 bean
  7 + *
  8 + * @author crossoverJie
  9 + * Date: 2018/12/26 17:37
  10 + * @since JDK 1.8
  11 + */
  12 +public class MsgHandleCaller {
  13 +
  14 + /**
  15 + * 回调接口
  16 + */
  17 + private CustomMsgHandleListener msgHandleListener ;
  18 +
  19 + public MsgHandleCaller(CustomMsgHandleListener msgHandleListener) {
  20 + this.msgHandleListener = msgHandleListener;
  21 + }
  22 +
  23 + public CustomMsgHandleListener getMsgHandleListener() {
  24 + return msgHandleListener;
  25 + }
  26 +
  27 + public void setMsgHandleListener(CustomMsgHandleListener msgHandleListener) {
  28 + this.msgHandleListener = msgHandleListener;
  29 + }
  30 +}
  1 +package com.crossoverjie.cim.client.service;
  2 +
  3 +/**
  4 + * Function: 自定义消息回调
  5 + *
  6 + * @author crossoverJie
  7 + * Date: 2018/12/26 17:24
  8 + * @since JDK 1.8
  9 + */
  10 +public interface CustomMsgHandleListener {
  11 +
  12 + /**
  13 + * 消息回调
  14 + * @param msg
  15 + */
  16 + void handle(String msg);
  17 +}
@@ -28,6 +28,10 @@ cim.server.route.request.url=http://45.78.28.220:8083/login @@ -28,6 +28,10 @@ cim.server.route.request.url=http://45.78.28.220:8083/login
28 cim.user.id=1545574841528 28 cim.user.id=1545574841528
29 cim.user.userName=zhangsan 29 cim.user.userName=zhangsan
30 30
  31 +# 回调线程队列大小
  32 +cim.callback.thread.queue.size = 2
  33 +# 回调线程池大小
  34 +cim.callback.thread.pool.size = 2
31 35
32 # 关闭健康检查权限 36 # 关闭健康检查权限
33 management.security.enabled=false 37 management.security.enabled=false