作者 crossoverJie

:recycle: Refactoring code.清楚路由关系

正在显示 31 个修改的文件 包含 538 行增加123 行删除
package com.crossoverjie.cim.client;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.scanner.Scan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
... ... @@ -17,9 +16,8 @@ public class CIMClientApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);
@Autowired
private CIMClient heartbeatClient ;
@Value("${cim.user.id}")
private long userId;
public static void main(String[] args) {
SpringApplication.run(CIMClientApplication.class, args);
... ... @@ -28,7 +26,7 @@ public class CIMClientApplication implements CommandLineRunner{
@Override
public void run(String... args) throws Exception {
Scan scan = new Scan(heartbeatClient) ;
Scan scan = new Scan(userId) ;
Thread thread = new Thread(scan);
thread.setName("scan-thread");
thread.start();
... ...
... ... @@ -46,13 +46,6 @@ public class CIMClient {
@Value("${cim.user.userName}")
private String userName;
@Value("${netty.server.port}")
private int nettyPort;
@Value("${netty.server.host}")
private String host;
private SocketChannel channel;
@Autowired
... ... @@ -98,7 +91,7 @@ public class CIMClient {
.handler(new CIMClientHandleInitializer())
;
ChannelFuture future = bootstrap.connect(cimServer.getIp(), cimServer.getPort()).sync();
ChannelFuture future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
if (future.isSuccess()) {
LOGGER.info("启动 cim client 成功");
}
... ...
package com.crossoverjie.cim.client.controller;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.vo.req.SendMsgReqVO;
import com.crossoverjie.cim.client.vo.res.SendMsgResVO;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.SendMsgReqVO;
import com.crossoverjie.cim.client.vo.req.StringReqVO;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.res.BaseResponse;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.res.SendMsgResVO;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.pojo.CustomProtocol;
import com.crossoverjie.cim.common.res.BaseResponse;
import com.crossoverjie.cim.common.res.NULLBody;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -55,7 +56,7 @@ public class IndexController {
@ResponseBody()
public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
BaseResponse<SendMsgResVO> res = new BaseResponse();
heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getId(),sendMsgReqVO.getMsg())) ;
heartbeatClient.sendMsg(new CustomProtocol(sendMsgReqVO.getUserId(),sendMsgReqVO.getMsg())) ;
// 利用 actuator 来自增
counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);
... ... @@ -131,7 +132,8 @@ public class IndexController {
public BaseResponse sendGroupMsg(@RequestBody SendMsgReqVO sendMsgReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
routeRequest.sendGroupMsg(sendMsgReqVO.getMsg()) ;
GroupReqVO groupReqVO = new GroupReqVO(sendMsgReqVO.getUserId(),sendMsgReqVO.getMsg()) ;
routeRequest.sendGroupMsg(groupReqVO) ;
counterService.increment(Constants.COUNTER_SERVER_PUSH_COUNT);
... ...
package com.crossoverjie.cim.client.scanner;
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -14,31 +17,49 @@ import java.util.Scanner;
* Date: 2018/12/21 16:44
* @since JDK 1.8
*/
public class Scan implements Runnable{
public class Scan implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(Scan.class);
private CIMClient heartbeatClient ;
private CIMClient heartbeatClient;
public Scan(CIMClient heartbeatClient) {
this.heartbeatClient = heartbeatClient;
private RouteRequest routeRequest;
private Long userId ;
public Scan(Long userId) {
this.userId = userId ;
this.heartbeatClient = SpringBeanFactory.getBean(CIMClient.class);
this.routeRequest = SpringBeanFactory.getBean(RouteRequest.class);
}
@Override
public void run() {
Scanner sc = new Scanner(System.in);
String[] totalMsg ;
GoogleProtocolVO vo ;
while (true){
String msg = sc.nextLine() ;
String[] totalMsg;
GoogleProtocolVO vo;
while (true) {
String msg = sc.nextLine();
//单聊
totalMsg = msg.split(" ");
vo = new GoogleProtocolVO() ;
if (totalMsg.length > 1) {
vo = new GoogleProtocolVO();
vo.setRequestId(Integer.parseInt(totalMsg[0]));
vo.setMsg(totalMsg[1]);
heartbeatClient.sendGoogleProtocolMsg(vo) ;
LOGGER.info("scan =[{}]",msg);
heartbeatClient.sendGoogleProtocolMsg(vo);
} else {
//群聊
try {
GroupReqVO groupReqVO = new GroupReqVO(userId,msg) ;
routeRequest.sendGroupMsg(groupReqVO) ;
} catch (Exception e) {
LOGGER.error("Exception", e);
}
}
LOGGER.info("scan =[{}]", msg);
}
}
}
... ...
package com.crossoverjie.cim.client.service;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
... ... @@ -14,10 +15,10 @@ public interface RouteRequest {
/**
* 群发消息
* @param msg 消息
* @param groupReqVO 消息
* @throws Exception
*/
void sendGroupMsg(String msg) throws Exception;
void sendGroupMsg(GroupReqVO groupReqVO) throws Exception;
/**
* 获取服务器
... ...
... ... @@ -3,6 +3,7 @@ package com.crossoverjie.cim.client.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.common.enums.StatusEnum;
... ... @@ -39,10 +40,11 @@ public class RouteRequestImpl implements RouteRequest {
private String serverRouteRequestUrl;
@Override
public void sendGroupMsg(String msg) throws Exception {
public void sendGroupMsg(GroupReqVO groupReqVO) throws Exception {
JSONObject jsonObject = new JSONObject();
jsonObject.put("msg",msg);
jsonObject.put("msg",groupReqVO.getMsg());
jsonObject.put("userId",groupReqVO.getUserId());
RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
Request request = new Request.Builder()
... ...
package com.crossoverjie.cim.route.vo.req;
package com.crossoverjie.cim.client.vo.req;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
... ... @@ -14,10 +14,20 @@ import javax.validation.constraints.NotNull;
*/
public class GroupReqVO extends BaseRequest {
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "消息发送者的 userId", example = "1545574049323")
private Long userId ;
@NotNull(message = "msg 不能为空")
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
public GroupReqVO(Long userId, String msg) {
this.userId = userId;
this.msg = msg;
}
public String getMsg() {
return msg;
}
... ... @@ -26,10 +36,19 @@ public class GroupReqVO extends BaseRequest {
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
@Override
public String toString() {
return "GroupRequest{" +
"msg='" + msg + '\'' +
return "GroupReqVO{" +
"userId=" + userId +
", msg='" + msg + '\'' +
"} " + super.toString();
}
}
... ...
... ... @@ -18,9 +18,9 @@ public class SendMsgReqVO extends BaseRequest {
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
@NotNull(message = "id 不能为空")
@ApiModelProperty(required = true, value = "id", example = "11")
private long id ;
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "userId", example = "11")
private Long userId ;
public String getMsg() {
return msg;
... ... @@ -30,11 +30,11 @@ public class SendMsgReqVO extends BaseRequest {
this.msg = msg;
}
public long getId() {
return id;
public long getUserId() {
return userId;
}
public void setId(long id) {
this.id = id;
public void setUserId(long userId) {
this.userId = userId;
}
}
... ...
... ... @@ -60,9 +60,9 @@ public class CIMServerResVO implements Serializable {
* ip : 127.0.0.1
* port : 8081
*/
private String ip;
private int port;
private String ip ;
private Integer cimServerPort;
private Integer httpPort;
public String getIp() {
return ip;
... ... @@ -72,19 +72,28 @@ public class CIMServerResVO implements Serializable {
this.ip = ip;
}
public int getPort() {
return port;
public Integer getCimServerPort() {
return cimServerPort;
}
public void setCimServerPort(Integer cimServerPort) {
this.cimServerPort = cimServerPort;
}
public Integer getHttpPort() {
return httpPort;
}
public void setPort(int port) {
this.port = port;
public void setHttpPort(Integer httpPort) {
this.httpPort = httpPort;
}
@Override
public String toString() {
return "DataBodyBean{" +
return "ServerInfo{" +
"ip='" + ip + '\'' +
", port=" + port +
", cimServerPort=" + cimServerPort +
", httpPort=" + httpPort +
'}';
}
}
... ...
... ... @@ -6,9 +6,6 @@ server.port=8082
# 是否打开swagger
swagger.enable = true
netty.server.host=127.0.0.1
netty.server.port=11211
logging.level.root=info
# 群发消息
... ...
... ... @@ -23,5 +23,8 @@ public class CommonTest {
System.out.println(cimServerResVO.toString());
String text = "nihaoaaa" ;
String[] split = text.split(" ");
System.out.println(split.length);
}
}
... ...
... ... @@ -3,6 +3,7 @@ package com.crossoverjie.cim.route.config;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import okhttp3.OkHttpClient;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
... ... @@ -12,6 +13,8 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.util.concurrent.TimeUnit;
/**
* Function:
*
... ... @@ -55,4 +58,19 @@ public class BeanConfig {
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* http client
* @return okHttp
*/
@Bean
public OkHttpClient okHttpClient() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
builder.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.writeTimeout(10,TimeUnit.SECONDS)
.retryOnConnectionFailure(true);
return builder.build();
}
}
... ...
... ... @@ -5,7 +5,7 @@ import com.crossoverjie.cim.common.res.BaseResponse;
import com.crossoverjie.cim.common.res.NULLBody;
import com.crossoverjie.cim.route.cache.ServerCache;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.vo.req.GroupReqVO;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.vo.req.P2PReqVO;
import com.crossoverjie.cim.route.vo.req.RegisterInfoReqVO;
... ... @@ -21,6 +21,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Map;
/**
* Function:
*
... ... @@ -42,11 +44,43 @@ public class RouteController {
@ApiOperation("群聊 API")
@RequestMapping(value = "groupRoute", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<NULLBody> groupRoute(@RequestBody GroupReqVO groupReqVO) {
public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
LOGGER.info("msg=[{}]", groupReqVO.toString());
//获取所有的推送列表
Map<Long, CIMServerResVO> serverResVOMap = accountService.loadRouteRelated();
for (Map.Entry<Long, CIMServerResVO> cimServerResVOEntry : serverResVOMap.entrySet()) {
Long userId = cimServerResVOEntry.getKey();
CIMServerResVO value = cimServerResVOEntry.getValue();
if (userId.equals(groupReqVO.getUserId())){
//过滤掉自己
LOGGER.info("过滤掉了发送者 userId={}",groupReqVO.getUserId());
continue;
}
//推送消息
String url = "http://" + value.getIp() + ":" + value.getHttpPort() + "/sendMsg" ;
ChatReqVO vo = new ChatReqVO(userId,groupReqVO.getMsg()) ;
accountService.pushMsg(url,vo);
}
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
}
@ApiOperation("客户端下线")
@RequestMapping(value = "offLine", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<NULLBody> offLine(@RequestBody ChatReqVO groupReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
LOGGER.info("下线用户[{}]", groupReqVO.toString());
accountService.offLine(groupReqVO.getUserId());
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
... ... @@ -86,7 +120,7 @@ public class RouteController {
if (login) {
String server = serverCache.selectServer();
String[] serverInfo = server.split(":");
CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]));
CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
//保存路由信息
accountService.saveRouteInfo(loginReqVO,server);
... ...
package com.crossoverjie.cim.route.service;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
import com.crossoverjie.cim.route.vo.res.RegisterInfoResVO;
import java.util.Map;
/**
* Function: 账户服务
*
... ... @@ -35,4 +39,26 @@ public interface AccountService {
* @throws Exception
*/
void saveRouteInfo(LoginReqVO loginReqVO ,String msg) throws Exception ;
/**
* 加载所有的路有关系
* @return 所有的路由关系
*/
Map<Long,CIMServerResVO> loadRouteRelated() ;
/**
* 推送消息
* @param url url
* @param groupReqVO 消息
* @throws Exception
*/
void pushMsg(String url,ChatReqVO groupReqVO) throws Exception;
/**
* 用户下线
* @param userId 下线用户ID
* @throws Exception
*/
void offLine(Long userId) throws Exception;
}
... ...
package com.crossoverjie.cim.route.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
import com.crossoverjie.cim.route.vo.res.RegisterInfoResVO;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import static com.crossoverjie.cim.route.constant.Constant.ACCOUNT_PREFIX;
import static com.crossoverjie.cim.route.constant.Constant.ROUTE_PREFIX;
... ... @@ -26,41 +38,103 @@ public class AccountServiceRedisImpl implements AccountService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private OkHttpClient okHttpClient;
private MediaType mediaType = MediaType.parse("application/json");
@Override
public RegisterInfoResVO register(RegisterInfoResVO info) {
String key = ACCOUNT_PREFIX + info.getUserId();
String name = redisTemplate.opsForValue().get(info.getUserName()) ;
if (null == name){
String name = redisTemplate.opsForValue().get(info.getUserName());
if (null == name) {
//为了方便查询,冗余一份
redisTemplate.opsForValue().set(key, info.getUserName());
redisTemplate.opsForValue().set(info.getUserName(),key);
}else {
redisTemplate.opsForValue().set(info.getUserName(), key);
} else {
long userId = Long.parseLong(name.split(":")[1]);
info.setUserId(userId);
info.setUserName(info.getUserName());
}
return info ;
return info;
}
@Override
public boolean login(LoginReqVO loginReqVO) throws Exception {
String key = ACCOUNT_PREFIX + loginReqVO.getUserId();
String userName = redisTemplate.opsForValue().get(key);
if (null == userName){
return false ;
if (null == userName) {
return false;
}
if (!userName.equals(loginReqVO.getUserName())) {
return false;
}
return true;
}
@Override
public void saveRouteInfo(LoginReqVO loginReqVO, String msg) throws Exception {
String key = ROUTE_PREFIX + loginReqVO.getUserId();
redisTemplate.opsForValue().set(key, msg);
}
@Override
public Map<Long, CIMServerResVO> loadRouteRelated() {
Map<Long, CIMServerResVO> routes = new HashMap<>(64);
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
ScanOptions options = ScanOptions.scanOptions()
.match(ROUTE_PREFIX + "*")
.build();
Cursor<byte[]> scan = connection.scan(options);
while (scan.hasNext()) {
byte[] next = scan.next();
String key = new String(next, StandardCharsets.UTF_8);
LOGGER.info("key={}", key);
parseServerInfo(routes, key);
}
if (!userName.equals(loginReqVO.getUserName())){
return false ;
return routes;
}
private void parseServerInfo(Map<Long, CIMServerResVO> routes, String key) {
long userId = Long.valueOf(key.split(":")[1]);
String value = redisTemplate.opsForValue().get(key);
String[] server = value.split(":");
CIMServerResVO cimServerResVO = new CIMServerResVO(server[0], Integer.parseInt(server[1]), Integer.parseInt(server[2]));
routes.put(userId, cimServerResVO);
}
@Override
public void pushMsg(String url, ChatReqVO groupReqVO) throws Exception {
Long userId = groupReqVO.getUserId();
String userName = redisTemplate.opsForValue().get(ACCOUNT_PREFIX + userId);
JSONObject jsonObject = new JSONObject();
jsonObject.put("msg", userName + ":【" + groupReqVO.getMsg() + "】");
jsonObject.put("userId", userId);
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.build();
Response response = okHttpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
return true ;
}
@Override
public void saveRouteInfo(LoginReqVO loginReqVO,String msg) throws Exception {
String key = ROUTE_PREFIX + loginReqVO.getUserId() ;
redisTemplate.opsForValue().set(key,msg) ;
public void offLine(Long userId) throws Exception {
redisTemplate.delete(ROUTE_PREFIX + userId) ;
}
}
... ...
package com.crossoverjie.cim.route.vo.req;
import com.crossoverjie.cim.common.req.BaseRequest;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotNull;
/**
* Function: Google Protocol 编解码发送
*
* @author crossoverJie
* Date: 2018/05/21 15:56
* @since JDK 1.8
*/
public class ChatReqVO extends BaseRequest {
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "userId", example = "1545574049323")
private Long userId ;
@NotNull(message = "msg 不能为空")
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
public ChatReqVO() {
}
public ChatReqVO(Long userId, String msg) {
this.userId = userId;
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
@Override
public String toString() {
return "GroupReqVO{" +
"userId=" + userId +
", msg='" + msg + '\'' +
"} " + super.toString();
}
}
... ...
... ... @@ -12,11 +12,13 @@ import java.io.Serializable;
public class CIMServerResVO implements Serializable {
private String ip ;
private Integer port;
private Integer cimServerPort;
private Integer httpPort;
public CIMServerResVO(String ip, Integer port) {
public CIMServerResVO(String ip, Integer cimServerPort, Integer httpPort) {
this.ip = ip;
this.port = port;
this.cimServerPort = cimServerPort;
this.httpPort = httpPort;
}
public String getIp() {
... ... @@ -27,19 +29,19 @@ public class CIMServerResVO implements Serializable {
this.ip = ip;
}
public Integer getPort() {
return port;
public Integer getCimServerPort() {
return cimServerPort;
}
public void setPort(Integer port) {
this.port = port;
public void setCimServerPort(Integer cimServerPort) {
this.cimServerPort = cimServerPort;
}
@Override
public String toString() {
return "CIMServerResVO{" +
"ip='" + ip + '\'' +
", port=" + port +
'}';
public Integer getHttpPort() {
return httpPort;
}
public void setHttpPort(Integer httpPort) {
this.httpPort = httpPort;
}
}
... ...
package com.crossoverjie.cim.route.service.impl;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.cim.route.RouteApplication;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.vo.res.CIMServerResVO;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
@SpringBootTest(classes = RouteApplication.class)
@RunWith(SpringRunner.class)
public class AccountServiceRedisImplTest {
private final static Logger LOGGER = LoggerFactory.getLogger(AccountServiceRedisImplTest.class);
@Autowired
private AccountService accountService ;
@Test
public void loadRouteRelated() throws Exception {
Map<Long, CIMServerResVO> longCIMServerResVOMap = accountService.loadRouteRelated();
LOGGER.info("longCIMServerResVOMap={}" , JSON.toJSONString(longCIMServerResVOMap));
}
}
\ No newline at end of file
... ...
... ... @@ -5,6 +5,7 @@ import com.crossoverjie.cim.server.kit.RegistryZK;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
... ... @@ -22,6 +23,9 @@ public class CIMServerApplication implements CommandLineRunner{
@Autowired
private AppConfiguration appConfiguration ;
@Value("${server.port}")
private int httpPort ;
public static void main(String[] args) {
SpringApplication.run(CIMServerApplication.class, args);
LOGGER.info("启动 Server 成功");
... ... @@ -31,7 +35,7 @@ public class CIMServerApplication implements CommandLineRunner{
public void run(String... args) throws Exception {
//获得本机IP
String addr = InetAddress.getLocalHost().getHostAddress();
Thread thread = new Thread(new RegistryZK(addr, appConfiguration.getPort()));
Thread thread = new Thread(new RegistryZK(addr, appConfiguration.getCimServerPort(),httpPort));
thread.setName("registry-zk");
thread.start() ;
}
... ...
... ... @@ -22,15 +22,18 @@ public class AppConfiguration {
@Value("${app.zk.switch}")
private boolean zkSwitch;
@Value("${netty.server.port}")
private int port;
@Value("${cim.server.port}")
private int cimServerPort;
public int getPort() {
return port;
@Value("${cim.clear.route.request.url}")
private String clearRouteUrl ;
public String getClearRouteUrl() {
return clearRouteUrl;
}
public void setPort(int port) {
this.port = port;
public void setClearRouteUrl(String clearRouteUrl) {
this.clearRouteUrl = clearRouteUrl;
}
public String getZkRoot() {
... ... @@ -56,4 +59,12 @@ public class AppConfiguration {
public void setZkSwitch(boolean zkSwitch) {
this.zkSwitch = zkSwitch;
}
public int getCimServerPort() {
return cimServerPort;
}
public void setCimServerPort(int cimServerPort) {
this.cimServerPort = cimServerPort;
}
}
... ...
package com.crossoverjie.cim.server.config;
import okhttp3.OkHttpClient;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
/**
* Function:
*
... ... @@ -22,4 +25,18 @@ public class BeanConfig {
public ZkClient buildZKClient(){
return new ZkClient(appConfiguration.getZkAddr(), 5000);
}
/**
* http client
* @return okHttp
*/
@Bean
public OkHttpClient okHttpClient() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
builder.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.writeTimeout(10,TimeUnit.SECONDS)
.retryOnConnectionFailure(true);
return builder.build();
}
}
... ...
... ... @@ -27,7 +27,7 @@ import org.springframework.web.bind.annotation.ResponseBody;
public class IndexController {
@Autowired
private CIMServer heartbeatClient ;
private CIMServer cimServer;
/**
... ... @@ -46,7 +46,7 @@ public class IndexController {
@ResponseBody
public BaseResponse<SendMsgResVO> sendMsg(@RequestBody SendMsgReqVO sendMsgReqVO){
BaseResponse<SendMsgResVO> res = new BaseResponse();
heartbeatClient.sendGoogleProtoMsg(sendMsgReqVO) ;
cimServer.sendMsg(sendMsgReqVO) ;
counterService.increment(Constants.COUNTER_SERVER_PUSH_COUNT);
... ...
package com.crossoverjie.cim.server.handle;
import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.server.config.AppConfiguration;
import com.crossoverjie.cim.server.kit.CIMUserInfo;
import com.crossoverjie.cim.server.util.SessionSocketHolder;
import com.crossoverjie.cim.server.util.SpringBeanFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Function:
*
... ... @@ -22,7 +29,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
private final static Logger LOGGER = LoggerFactory.getLogger(CIMServerHandle.class);
private final MediaType mediaType = MediaType.parse("application/json");
/**
* 取消绑定
* @param ctx
... ... @@ -30,9 +37,35 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String userName = SessionSocketHolder.getUserName((NioSocketChannel) ctx.channel());
LOGGER.info("用户[{}]断开",userName);
CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
LOGGER.info("用户[{}]下线",userInfo.getUserName());
SessionSocketHolder.remove((NioSocketChannel) ctx.channel());
//清除路由关系
clearRouteInfo(userInfo);
}
/**
* 清除路由关系
* @param userInfo
* @throws IOException
*/
private void clearRouteInfo(CIMUserInfo userInfo) throws IOException {
OkHttpClient okHttpClient = SpringBeanFactory.getBean(OkHttpClient.class);
AppConfiguration configuration = SpringBeanFactory.getBean(AppConfiguration.class);
JSONObject jsonObject = new JSONObject();
jsonObject.put("userId", userInfo.getUserId());
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
Request request = new Request.Builder()
.url(configuration.getClearRouteUrl())
.post(requestBody)
.build();
Response response = okHttpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
}
... ...
package com.crossoverjie.cim.server.kit;
/**
* Function:
*
* @author crossoverJie
* Date: 2018/12/24 02:33
* @since JDK 1.8
*/
public class CIMUserInfo {
private Long userId ;
private String userName ;
public CIMUserInfo(Long userId, String userName) {
this.userId = userId;
this.userName = userName;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "CIMUserInfo{" +
"userId=" + userId +
", userName='" + userName + '\'' +
'}';
}
}
... ...
... ... @@ -21,11 +21,13 @@ public class RegistryZK implements Runnable {
private AppConfiguration appConfiguration ;
private String ip;
private int port;
private int cimServerPort;
private int httpPort;
public RegistryZK(String ip, int port) {
public RegistryZK(String ip, int cimServerPort,int httpPort) {
this.ip = ip;
this.port = port;
this.cimServerPort = cimServerPort;
this.httpPort = httpPort ;
zKit = SpringBeanFactory.getBean(ZKit.class) ;
appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
}
... ... @@ -38,7 +40,7 @@ public class RegistryZK implements Runnable {
//是否要将自己注册到 ZK
if (appConfiguration.isZkSwitch()){
String path = appConfiguration.getZkRoot() + "/ip-" + ip + ":" + port;
String path = appConfiguration.getZkRoot() + "/ip-" + ip + ":" + cimServerPort + ":" + httpPort;
zKit.createNode(path, path);
logger.info("注册 zookeeper 成功,msg=[{}]", path);
}
... ...
package com.crossoverjie.cim.server.server;
import com.alibaba.fastjson.JSON;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.pojo.CustomProtocol;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.server.init.CIMServerInitializer;
... ... @@ -41,7 +42,7 @@ public class CIMServer {
private EventLoopGroup work = new NioEventLoopGroup();
@Value("${netty.server.port}")
@Value("${cim.server.port}")
private int nettyPort;
... ... @@ -101,15 +102,16 @@ public class CIMServer {
* 发送 Google Protocol 编码消息
* @param sendMsgReqVO 消息
*/
public void sendGoogleProtoMsg(SendMsgReqVO sendMsgReqVO){
NioSocketChannel socketChannel = SessionSocketHolder.get(sendMsgReqVO.getId());
public void sendMsg(SendMsgReqVO sendMsgReqVO){
NioSocketChannel socketChannel = SessionSocketHolder.get(sendMsgReqVO.getUserId());
if (null == socketChannel) {
throw new NullPointerException("没有[" + sendMsgReqVO.getId() + "]的socketChannel");
throw new NullPointerException("客户端[" + sendMsgReqVO.getUserId() + "]不在线!");
}
CIMRequestProto.CIMReqProtocol protocol = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId((int) sendMsgReqVO.getId())
.setRequestId(sendMsgReqVO.getUserId())
.setReqMsg(sendMsgReqVO.getMsg())
.setType(Constants.CommandType.MSG)
.build();
ChannelFuture future = socketChannel.writeAndFlush(protocol);
... ...
package com.crossoverjie.cim.server.util;
import com.crossoverjie.cim.server.kit.CIMUserInfo;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Map;
... ... @@ -41,17 +42,20 @@ public class SessionSocketHolder {
* @param nioSocketChannel
* @return
*/
public static String getUserName(NioSocketChannel nioSocketChannel){
public static CIMUserInfo getUserId(NioSocketChannel nioSocketChannel){
for (Map.Entry<Long, NioSocketChannel> entry : CHANNEL_MAP.entrySet()) {
NioSocketChannel value = entry.getValue();
if (nioSocketChannel == value){
Long key = entry.getKey();
String userName = SESSION_MAP.get(key);
return userName ;
CIMUserInfo info = new CIMUserInfo(key,userName) ;
return info ;
}
}
return null;
}
}
... ...
... ... @@ -18,9 +18,9 @@ public class SendMsgReqVO extends BaseRequest {
@ApiModelProperty(required = true, value = "msg", example = "hello")
private String msg ;
@NotNull(message = "id 不能为空")
@ApiModelProperty(required = true, value = "id", example = "11")
private long id ;
@NotNull(message = "userId 不能为空")
@ApiModelProperty(required = true, value = "userId", example = "11")
private Long userId ;
public String getMsg() {
return msg;
... ... @@ -30,11 +30,19 @@ public class SendMsgReqVO extends BaseRequest {
this.msg = msg;
}
public long getId() {
return id;
public long getUserId() {
return userId;
}
public void setId(long id) {
this.id = id;
public void setUserId(long userId) {
this.userId = userId;
}
@Override
public String toString() {
return "SendMsgReqVO{" +
"msg='" + msg + '\'' +
", userId=" + userId +
"} " + super.toString();
}
}
... ...
spring.application.name=netty-heartbeat-server
spring.application.name=cim-server
# web port
server.port=8081
... ... @@ -6,7 +6,8 @@ server.port=8081
# 是否打开swagger
swagger.enable = true
netty.server.port=11211
# cim 服务器端口
cim.server.port=11211
logging.level.root=info
... ... @@ -28,3 +29,6 @@ app.zk.addr=47.98.194.60:2181
# zk 注册根节点
app.zk.root=/route
# 清除路由信息
cim.clear.route.request.url=http://localhost:8083/offLine
\ No newline at end of file
... ...
spring.application.name=cim-zk
# web port
server.port=8083
# web cimServerPort
server.cimServerPort=8083
# 是否打开swagger
swagger.enable = true
... ...
spring.application.name=spring-boot-admin
server.port = 8888
server.cimServerPort = 8888
logging.level.root=info
\ No newline at end of file
... ...