作者 crossoverJie

:recycle: Refactoring code.联调成功 群发,完善

正在显示 22 个修改的文件 包含 154 行增加193 行删除
... ... @@ -3,7 +3,6 @@ package com.crossoverjie.cim.client;
import com.crossoverjie.cim.client.scanner.Scan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
... ... @@ -16,17 +15,14 @@ public class CIMClientApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);
@Value("${cim.user.id}")
private long userId;
public static void main(String[] args) {
SpringApplication.run(CIMClientApplication.class, args);
LOGGER.info("启动 Client 成功");
LOGGER.info("启动 Client 服务成功");
}
@Override
public void run(String... args) throws Exception {
Scan scan = new Scan(userId) ;
Scan scan = new Scan() ;
Thread thread = new Thread(scan);
thread.setName("scan-thread");
thread.start();
... ...
package com.crossoverjie.cim.client.client;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.cim.client.init.CIMClientHandleInitializer;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.pojo.CustomProtocol;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
... ... @@ -99,17 +97,6 @@ public class CIMClient {
}
/**
* 发送消息
*
* @param customProtocol
*/
public void sendMsg(CustomProtocol customProtocol) {
ChannelFuture future = channel.writeAndFlush(customProtocol);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("客户端手动发消息成功={}", JSON.toJSONString(customProtocol)));
}
/**
* 发送消息字符串
*
* @param msg
... ...
package com.crossoverjie.cim.client.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/8/24 01:43
* @since JDK 1.8
*/
@Component
public class AppConfiguration {
@Value("${cim.user.id}")
private Long userId;
@Value("${cim.user.userName}")
private String userName;
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
... ...
... ... @@ -9,7 +9,6 @@ import com.crossoverjie.cim.client.vo.req.StringReqVO;
import com.crossoverjie.cim.client.vo.res.SendMsgResVO;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.pojo.CustomProtocol;
import com.crossoverjie.cim.common.res.BaseResponse;
import com.crossoverjie.cim.common.res.NULLBody;
import io.swagger.annotations.ApiOperation;
... ... @@ -46,28 +45,6 @@ public class IndexController {
@Autowired
private RouteRequest routeRequest ;
/**
* 向服务端发消息
* @param sendMsgReqVO
* @return
*/
@ApiOperation("客户端发送消息")
@RequestMapping(value = "sendMsg",method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
BaseResponse<SendMsgResVO> res = new BaseResponse();
heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getUserId(),sendMsgReqVO.getMsg())) ;
// 利用 actuator 来自增
counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);
SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
sendMsgResVO.setMsg("OK") ;
res.setCode(StatusEnum.SUCCESS.getCode()) ;
res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
res.setDataBody(sendMsgResVO) ;
return res ;
}
/**
* 向服务端发消息 字符串
... ...
package com.crossoverjie.cim.client.encode;
import com.crossoverjie.cim.common.pojo.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* Function:编码
*
* @author crossoverJie
* Date: 17/05/2018 19:07
* @since JDK 1.8
*/
public class HeartbeatEncode extends MessageToByteEncoder<CustomProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {
out.writeLong(msg.getId()) ;
out.writeBytes(msg.getContent().getBytes()) ;
}
}
... ... @@ -33,7 +33,7 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
LOGGER.info("已经 10 秒没有发送信息!");
//LOGGER.info("已经 10 秒没有发送信息!");
//向服务端发送消息
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", CIMRequestProto.CIMReqProtocol.class);
ctx.writeAndFlush(heartBeat) ;
... ... @@ -58,7 +58,7 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
//从服务端收到消息时被调用
//LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
LOGGER.info("客户端收到消息={}" ,responseProtocol.getResMsg());
LOGGER.info(responseProtocol.getResMsg());
}
@Override
... ...
... ... @@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//10 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 10, 0))
//60 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 60, 0))
//心跳解码
//.addLast(new HeartbeatEncode())
... ...
package com.crossoverjie.cim.client.scanner;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.config.AppConfiguration;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
... ... @@ -25,10 +26,10 @@ public class Scan implements Runnable {
private RouteRequest routeRequest;
private Long userId ;
private AppConfiguration configuration;
public Scan(Long userId) {
this.userId = userId ;
public Scan() {
this.configuration = SpringBeanFactory.getBean(AppConfiguration.class);
this.heartbeatClient = SpringBeanFactory.getBean(CIMClient.class);
this.routeRequest = SpringBeanFactory.getBean(RouteRequest.class);
}
... ... @@ -42,7 +43,7 @@ public class Scan implements Runnable {
String msg = sc.nextLine();
//单聊
totalMsg = msg.split(" ");
totalMsg = msg.split("><");
if (totalMsg.length > 1) {
vo = new GoogleProtocolVO();
vo.setRequestId(Integer.parseInt(totalMsg[0]));
... ... @@ -51,15 +52,15 @@ public class Scan implements Runnable {
} else {
//群聊
try {
GroupReqVO groupReqVO = new GroupReqVO(userId,msg) ;
routeRequest.sendGroupMsg(groupReqVO) ;
GroupReqVO groupReqVO = new GroupReqVO(configuration.getUserId(), msg);
routeRequest.sendGroupMsg(groupReqVO);
} catch (Exception e) {
LOGGER.error("Exception", e);
}
}
LOGGER.info("scan =[{}]", msg);
LOGGER.info("{}:【{}】", configuration.getUserName(), msg);
}
}
}
... ...
package com.crossoverjie.cim.common.pojo;
import java.io.Serializable;
/**
* Function:
*
* @author crossoverJie
* Date: 17/05/2018 17:50
* @since JDK 1.8
*/
public class CustomProtocol implements Serializable{
private static final long serialVersionUID = 4671171056588401542L;
private long id ;
private String content ;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public CustomProtocol() {
}
public CustomProtocol(long id, String content) {
this.id = id;
this.content = content;
}
@Override
public String toString() {
return "CustomProtocol{" +
"id=" + id +
", content='" + content + '\'' +
'}';
}
}
package com.crossoverjie.cim.route.controller;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.res.BaseResponse;
import com.crossoverjie.cim.common.res.NULLBody;
import com.crossoverjie.cim.route.cache.ServerCache;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.service.UserInfoCacheService;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.vo.req.P2PReqVO;
... ... @@ -41,6 +43,9 @@ public class RouteController {
@Autowired
private AccountService accountService;
@Autowired
private UserInfoCacheService userInfoCacheService ;
@ApiOperation("群聊 API")
@RequestMapping(value = "groupRoute", method = RequestMethod.POST)
@ResponseBody()
... ... @@ -56,7 +61,8 @@ public class RouteController {
CIMServerResVO value = cimServerResVOEntry.getValue();
if (userId.equals(groupReqVO.getUserId())){
//过滤掉自己
LOGGER.info("过滤掉了发送者 userId={}",groupReqVO.getUserId());
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfo(groupReqVO.getUserId());
LOGGER.warn("过滤掉了发送者 userId={}",cimUserInfo.toString());
continue;
}
... ... @@ -79,7 +85,9 @@ public class RouteController {
public BaseResponse<NULLBody> offLine(@RequestBody ChatReqVO groupReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
LOGGER.info("下线用户[{}]", groupReqVO.toString());
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfo(groupReqVO.getUserId());
LOGGER.info("下线用户[{}]", cimUserInfo.toString());
accountService.offLine(groupReqVO.getUserId());
res.setCode(StatusEnum.SUCCESS.getCode());
... ...
package com.crossoverjie.cim.route.service;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/24 11:06
* @since JDK 1.8
*/
public interface UserInfoCacheService {
/**
* 通过 userID 获取用户信息
* @param userId
* @return
* @throws Exception
*/
CIMUserInfo loadUserInfo(long userId) throws Exception ;
}
... ...
package com.crossoverjie.cim.route.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.service.UserInfoCacheService;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
... ... @@ -39,6 +41,9 @@ public class AccountServiceRedisImpl implements AccountService {
private RedisTemplate<String, String> redisTemplate;
@Autowired
private UserInfoCacheService userInfoCacheService ;
@Autowired
private OkHttpClient okHttpClient;
private MediaType mediaType = MediaType.parse("application/json");
... ... @@ -114,11 +119,10 @@ public class AccountServiceRedisImpl implements AccountService {
@Override
public void pushMsg(String url, long sendUserId, ChatReqVO groupReqVO) throws Exception {
//可考虑本地缓存
String sendUserName = redisTemplate.opsForValue().get(ACCOUNT_PREFIX + sendUserId);
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfo(sendUserId);
JSONObject jsonObject = new JSONObject();
jsonObject.put("msg", sendUserName + ":【" + groupReqVO.getMsg() + "】");
jsonObject.put("msg", cimUserInfo.getUserName() + ":【" + groupReqVO.getMsg() + "】");
jsonObject.put("userId", groupReqVO.getUserId());
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
... ...
package com.crossoverjie.cim.route.service.impl;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.service.UserInfoCacheService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static com.crossoverjie.cim.route.constant.Constant.ACCOUNT_PREFIX;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/24 11:06
* @since JDK 1.8
*/
@Service
public class UserInfoCacheServiceImpl implements UserInfoCacheService {
/**
* 本地缓存,后期可换为 LRU
*/
private final static Map<Long,CIMUserInfo> USER_INFO_MAP = new ConcurrentHashMap<>(64) ;
@Autowired
private RedisTemplate<String,String> redisTemplate ;
@Override
public CIMUserInfo loadUserInfo(long userId) throws Exception {
//优先从本地缓存获取
CIMUserInfo cimUserInfo = USER_INFO_MAP.get(userId);
if (cimUserInfo != null){
return cimUserInfo ;
}
//load redis
String sendUserName = redisTemplate.opsForValue().get(ACCOUNT_PREFIX + userId);
if (sendUserName != null){
cimUserInfo = new CIMUserInfo(userId,sendUserName) ;
USER_INFO_MAP.put(userId,cimUserInfo) ;
}
return cimUserInfo;
}
}
... ...
... ... @@ -13,7 +13,7 @@ logging.level.root=info
management.security.enabled=false
# zk 地址
app.zk.addr=47.98.194.60:2181
app.zk.addr=47.98.194.60:2182
# zk 注册根节点
app.zk.root=/route
... ...
package com.crossoverjie.cim.server.decoder;
import com.crossoverjie.cim.common.pojo.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* Function: 解码信息
*
* @author crossoverJie
* Date: 17/05/2018 18:34
* @since JDK 1.8
*/
public class HeartbeatDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
long id = in.readLong() ;
byte[] bytes = new byte[in.readableBytes()] ;
in.readBytes(bytes) ;
String content = new String(bytes) ;
CustomProtocol customProtocol = new CustomProtocol() ;
customProtocol.setId(id);
customProtocol.setContent(content) ;
out.add(customProtocol) ;
}
}
... ... @@ -2,9 +2,9 @@ package com.crossoverjie.cim.server.handle;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.server.config.AppConfiguration;
import com.crossoverjie.cim.server.kit.CIMUserInfo;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import com.crossoverjie.cim.server.util.SpringBeanFactory;
import io.netty.channel.ChannelHandler;
... ... @@ -63,10 +63,15 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
.post(requestBody)
.build();
Response response = okHttpClient.newCall(request).execute();
Response response = null;
try {
response = okHttpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
}finally {
response.body().close();
}
}
... ... @@ -78,7 +83,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
//保存客户端与 Channel 之间的关系
SessionSocketHolder.put(msg.getRequestId(),(NioSocketChannel)ctx.channel()) ;
SessionSocketHolder.saveSession(msg.getRequestId(),msg.getReqMsg());
LOGGER.info("客户端[{}]注册成功",msg.getReqMsg());
LOGGER.info("客户端[{}]上线成功",msg.getReqMsg());
}
}
... ...
package com.crossoverjie.cim.server.server;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.pojo.CustomProtocol;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.server.init.CIMServerInitializer;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import com.crossoverjie.cim.server.vo.req.SendMsgReqVO;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
... ... @@ -16,7 +13,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
... ... @@ -82,23 +78,6 @@ public class CIMServer {
/**
* 发送消息
*
* @param customProtocol
*/
public void sendMsg(CustomProtocol customProtocol) {
NioSocketChannel socketChannel = SessionSocketHolder.get(customProtocol.getId());
if (null == socketChannel) {
throw new NullPointerException("没有[" + customProtocol.getId() + "]的socketChannel");
}
ChannelFuture future = socketChannel.writeAndFlush(Unpooled.copiedBuffer(customProtocol.toString(), CharsetUtil.UTF_8));
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("服务端手动发消息成功={}", JSON.toJSONString(customProtocol)));
}
/**
* 发送 Google Protocol 编码消息
* @param sendMsgReqVO 消息
*/
... ...
package com.crossoverjie.cim.server.util;
import com.crossoverjie.cim.server.kit.CIMUserInfo;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Map;
... ...
... ... @@ -25,7 +25,7 @@ monitor.channel.map.key=channelMap
app.zk.switch=true
# zk 地址
app.zk.addr=47.98.194.60:2181
app.zk.addr=47.98.194.60:2182
# zk 注册根节点
app.zk.root=/route
... ...
... ... @@ -38,6 +38,6 @@ public class Application implements CommandLineRunner{
String addr = InetAddress.getLocalHost().getHostAddress();
Thread thread = new Thread(new RegistryZK(addr, appConfiguration.getPort()));
thread.setName("registry-zk");
thread.start() ;
//thread.start() ;
}
}
\ No newline at end of file
... ...
spring.application.name=cim-zk
# web cimServerPort
server.cimServerPort=8083
# web port
server.port=9083
# 是否打开swagger
swagger.enable = true
... ...