作者 crossoverJie
提交者 GitHub

Merge pull request #87 from crossoverJie/fix-issue-28

Fix issue 28
正在显示 20 个修改的文件 包含 258 行增加45 行删除
... ... @@ -112,7 +112,7 @@ public class CIMClient {
LOGGER.error("连接失败", e);
}
if (future.isSuccess()) {
echoService.echo("start cim client success!");
echoService.echo("Start cim client success!");
LOGGER.info("启动 cim client 成功");
}
channel = (SocketChannel) future.channel();
... ... @@ -158,7 +158,7 @@ public class CIMClient {
.build();
ChannelFuture future = channel.writeAndFlush(login);
future.addListener((ChannelFutureListener) channelFuture ->
echoService.echo("registry cim server success!")
echoService.echo("Registry cim server success!")
);
}
... ...
... ... @@ -8,7 +8,7 @@ swagger.enable = true
logging.level.root=error
#消息记录存放路径
# 消息记录存放路径
cim.msg.logger.path = /opt/logs/cim/
... ... @@ -45,7 +45,7 @@ cim.server.online.user.url=http://localhost:8083/onlineUser
cim.clear.route.request.url=http://localhost:8083/offLine
# 客户端唯一ID
cim.user.id=1566914867344
cim.user.id=1586617710861
cim.user.userName=zhangsan
# 回调线程队列大小
... ...
... ... @@ -14,21 +14,26 @@ public enum StatusEnum {
/** 成功 */
FALLBACK("8000", "FALL_BACK"),
/** 参数校验失败**/
VALIDATION_FAIL("3000", "参数校验失败"),
VALIDATION_FAIL("3000", "invalid argument"),
/** 失败 */
FAIL("4000", "失败"),
/** 重复登录 */
REPEAT_LOGIN("5000", "账号重复登录,请退出一个账号!"),
REPEAT_LOGIN("5000", "Repeat login, log out an account please!"),
/** 请求限流 */
REQUEST_LIMIT("6000", "请求限流"),
/** 账号不在线 */
OFF_LINE("7000", "你选择的账号不在线,请重新选择!"),
SERVER_NOT_AVAILABLE("7100", "CIM server is not available, please try again later!"),
/** 登录信息不匹配 */
ACCOUNT_NOT_MATCH("9100", "登录信息不匹配!"),
ACCOUNT_NOT_MATCH("9100", "The User information you have used is incorrect!"),
/** 请求限流 */
REQUEST_LIMIT("6000", "请求限流"),
;
... ...
package com.crossoverjie.cim.common.pojo;
/**
* Function:
*
* @author crossoverJie
* Date: 2020-04-12 20:48
* @since JDK 1.8
*/
public final class RouteInfo {
private String ip ;
private Integer cimServerPort;
private Integer httpPort;
public RouteInfo(String ip, Integer cimServerPort, Integer httpPort) {
this.ip = ip;
this.cimServerPort = cimServerPort;
this.httpPort = httpPort;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Integer getCimServerPort() {
return cimServerPort;
}
public void setCimServerPort(Integer cimServerPort) {
this.cimServerPort = cimServerPort;
}
public Integer getHttpPort() {
return httpPort;
}
public void setHttpPort(Integer httpPort) {
this.httpPort = httpPort;
}
}
... ...
package com.crossoverjie.cim.common.route.algorithm.consistenthash;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import java.util.SortedMap;
import java.util.TreeMap;
... ... @@ -35,6 +38,9 @@ public class TreeMapConsistentHash extends AbstractConsistentHash {
if (!last.isEmpty()) {
return last.get(last.firstKey());
}
if (treeMap.size() == 0){
throw new CIMException(StatusEnum.SERVER_NOT_AVAILABLE) ;
}
return treeMap.firstEntry().getValue();
}
}
... ...
package com.crossoverjie.cim.common.util;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.RouteInfo;
import static com.crossoverjie.cim.common.enums.StatusEnum.VALIDATION_FAIL;
/**
* Function:
*
* @author crossoverJie
* Date: 2020-04-12 20:42
* @since JDK 1.8
*/
public class RouteInfoParseUtil {
public static RouteInfo parse(String info){
try {
String[] serverInfo = info.split(":");
RouteInfo routeInfo = new RouteInfo(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2])) ;
return routeInfo ;
}catch (Exception e){
throw new CIMException(VALIDATION_FAIL) ;
}
}
}
... ...
... ... @@ -17,7 +17,7 @@ public class RouteApplication implements CommandLineRunner{
public static void main(String[] args) {
SpringApplication.run(RouteApplication.class, args);
LOGGER.info("启动 route 成功");
LOGGER.info("Start cim route success!!!");
}
@Override
... ...
... ... @@ -3,11 +3,14 @@ package com.crossoverjie.cim.route.controller;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.pojo.RouteInfo;
import com.crossoverjie.cim.common.res.BaseResponse;
import com.crossoverjie.cim.common.res.NULLBody;
import com.crossoverjie.cim.common.route.algorithm.RouteHandle;
import com.crossoverjie.cim.common.util.RouteInfoParseUtil;
import com.crossoverjie.cim.route.cache.ServerCache;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.service.CommonBizService;
import com.crossoverjie.cim.route.service.UserInfoCacheService;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.vo.req.LoginReqVO;
... ... @@ -49,6 +52,8 @@ public class RouteController {
@Autowired
private UserInfoCacheService userInfoCacheService ;
@Autowired
private CommonBizService commonBizService ;
@Autowired
private RouteHandle routeHandle ;
... ... @@ -128,7 +133,7 @@ public class RouteController {
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
LOGGER.info("下线用户[{}]", cimUserInfo.toString());
LOGGER.info("user [{}] offline!", cimUserInfo.toString());
accountService.offLine(groupReqVO.getUserId());
res.setCode(StatusEnum.SUCCESS.getCode());
... ... @@ -147,17 +152,19 @@ public class RouteController {
public BaseResponse<CIMServerResVO> login(@RequestBody LoginReqVO loginReqVO) throws Exception {
BaseResponse<CIMServerResVO> res = new BaseResponse();
// check server available
String server = routeHandle.routeServer(serverCache.getAll(),String.valueOf(loginReqVO.getUserId()));
RouteInfo routeInfo = RouteInfoParseUtil.parse(server);
commonBizService.checkServerAvailable(routeInfo);
//登录校验
StatusEnum status = accountService.login(loginReqVO);
if (status == StatusEnum.SUCCESS) {
String server = routeHandle.routeServer(serverCache.getAll(),String.valueOf(loginReqVO.getUserId()));
String[] serverInfo = server.split(":");
CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
//保存路由信息
accountService.saveRouteInfo(loginReqVO,server);
CIMServerResVO vo = new CIMServerResVO(routeInfo);
res.setDataBody(vo);
}
... ...
package com.crossoverjie.cim.route.exception;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.res.BaseResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* Function:
*
* @author crossoverJie
* Date: 2020-04-12 22:13
* @since JDK 1.8
*/
@ControllerAdvice
public class ExceptionHandlingController {
private static Logger logger = LoggerFactory.getLogger(ExceptionHandlingController.class) ;
@ExceptionHandler(CIMException.class)
@ResponseBody()
public BaseResponse handleAllExceptions(CIMException ex) {
logger.error("exception", ex);
BaseResponse baseResponse = new BaseResponse();
baseResponse.setCode(ex.getErrorCode());
baseResponse.setMessage(ex.getMessage());
return baseResponse ;
}
}
\ No newline at end of file
... ...
package com.crossoverjie.cim.route.kit;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
* Function:
*
* @author crossoverJie
* Date: 2020-04-12 20:32
* @since JDK 1.8
*/
public class NetAddressIsReachable {
/**
* check ip and port
*
* @param address
* @param port
* @param timeout
* @return True if connection successful
*/
public static boolean checkAddressReachable(String address, int port, int timeout) {
Socket socket = new Socket() ;
try {
socket.connect(new InetSocketAddress(address, port), timeout);
return true;
} catch (IOException exception) {
return false;
} finally {
try {
socket.close();
} catch (IOException e) {
return false ;
}
}
}
}
... ...
... ... @@ -40,7 +40,7 @@ public class ZKit {
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
logger.info("清除/更新本地缓存 parentPath=【{}】,currentChilds=【{}】", parentPath,currentChilds.toString());
logger.info("Clear or update local cache parentPath=[{}],currentChilds=[{}]", parentPath,currentChilds.toString());
//更新所有缓存/先删除 再新增
serverCache.updateCache(currentChilds) ;
... ...
package com.crossoverjie.cim.route.service;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.RouteInfo;
import com.crossoverjie.cim.route.kit.NetAddressIsReachable;
import org.springframework.stereotype.Component;
/**
* Function:
*
* @author crossoverJie
* Date: 2020-04-12 21:40
* @since JDK 1.8
*/
@Component
public class CommonBizService {
/**
* check ip and port
* @param routeInfo
*/
public void checkServerAvailable(RouteInfo routeInfo){
boolean reachable = NetAddressIsReachable.checkAddressReachable(routeInfo.getIp(), routeInfo.getCimServerPort(), 1000);
if (!reachable) {
throw new CIMException(StatusEnum.SERVER_NOT_AVAILABLE) ;
}
}
}
... ...
... ... @@ -38,8 +38,8 @@ public interface UserInfoCacheService {
/**
*
* @return 获取所有在线用户
* query all online user
* @return online user
*/
Set<CIMUserInfo> onlineUser() ;
}
... ...
... ... @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.util.RouteInfoParseUtil;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.service.UserInfoCacheService;
import com.crossoverjie.cim.route.vo.req.ChatReqVO;
... ... @@ -33,7 +34,7 @@ import static com.crossoverjie.cim.route.constant.Constant.ROUTE_PREFIX;
* Function:
*
* @author crossoverJie
* Date: 2018/12/23 21:58
* Date: 2018/12/23 21:58
* @since JDK 1.8
*/
@Service
... ... @@ -44,7 +45,7 @@ public class AccountServiceRedisImpl implements AccountService {
private RedisTemplate<String, String> redisTemplate;
@Autowired
private UserInfoCacheService userInfoCacheService ;
private UserInfoCacheService userInfoCacheService;
@Autowired
private OkHttpClient okHttpClient;
... ... @@ -84,9 +85,9 @@ public class AccountServiceRedisImpl implements AccountService {
//登录成功,保存登录状态
boolean status = userInfoCacheService.saveAndCheckUserLoginStatus(loginReqVO.getUserId());
if (status == false){
if (status == false) {
//重复登录
return StatusEnum.REPEAT_LOGIN ;
return StatusEnum.REPEAT_LOGIN;
}
return StatusEnum.SUCCESS;
... ... @@ -120,7 +121,7 @@ public class AccountServiceRedisImpl implements AccountService {
try {
scan.close();
} catch (IOException e) {
LOGGER.error("IOException",e);
LOGGER.error("IOException", e);
}
return routes;
... ... @@ -130,20 +131,18 @@ public class AccountServiceRedisImpl implements AccountService {
public CIMServerResVO loadRouteRelatedByUserId(Long userId) {
String value = redisTemplate.opsForValue().get(ROUTE_PREFIX + userId);
if (value == null){
throw new CIMException(OFF_LINE) ;
if (value == null) {
throw new CIMException(OFF_LINE);
}
String[] server = value.split(":");
CIMServerResVO cimServerResVO = new CIMServerResVO(server[0], Integer.parseInt(server[1]), Integer.parseInt(server[2]));
CIMServerResVO cimServerResVO = new CIMServerResVO(RouteInfoParseUtil.parse(value));
return cimServerResVO;
}
private void parseServerInfo(Map<Long, CIMServerResVO> routes, String key) {
long userId = Long.valueOf(key.split(":")[1]);
String value = redisTemplate.opsForValue().get(key);
String[] server = value.split(":");
CIMServerResVO cimServerResVO = new CIMServerResVO(server[0], Integer.parseInt(server[1]), Integer.parseInt(server[2]));
CIMServerResVO cimServerResVO = new CIMServerResVO(RouteInfoParseUtil.parse(value));
routes.put(userId, cimServerResVO);
}
... ... @@ -167,7 +166,7 @@ public class AccountServiceRedisImpl implements AccountService {
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
}finally {
} finally {
response.body().close();
}
}
... ... @@ -178,7 +177,7 @@ public class AccountServiceRedisImpl implements AccountService {
// TODO: 2019-01-21 改为一个原子命令,以防数据一致性
//删除路由
redisTemplate.delete(ROUTE_PREFIX + userId) ;
redisTemplate.delete(ROUTE_PREFIX + userId);
//删除登录状态
userInfoCacheService.removeLoginStatus(userId);
... ...
package com.crossoverjie.cim.route.vo.res;
import com.crossoverjie.cim.common.pojo.RouteInfo;
import java.io.Serializable;
/**
... ... @@ -15,10 +17,10 @@ public class CIMServerResVO implements Serializable {
private Integer cimServerPort;
private Integer httpPort;
public CIMServerResVO(String ip, Integer cimServerPort, Integer httpPort) {
this.ip = ip;
this.cimServerPort = cimServerPort;
this.httpPort = httpPort;
public CIMServerResVO(RouteInfo routeInfo) {
this.ip = routeInfo.getIp();
this.cimServerPort = routeInfo.getCimServerPort();
this.httpPort = routeInfo.getHttpPort();
}
public String getIp() {
... ...
import com.crossoverjie.cim.route.kit.NetAddressIsReachable;
import org.junit.Test;
/**
* Function:
*
* @author crossoverJie
* Date: 2020-04-12 18:38
* @since JDK 1.8
*/
public class CommonTest {
@Test
public void test() {
boolean reachable = NetAddressIsReachable.checkAddressReachable("127.0.0.1", 11211, 1000);
System.out.println(reachable);
}
}
... ...
... ... @@ -28,7 +28,7 @@ public class CIMServerApplication implements CommandLineRunner{
public static void main(String[] args) {
SpringApplication.run(CIMServerApplication.class, args);
LOGGER.info("启动 Server 成功");
LOGGER.info("Start cim server success!!!");
}
@Override
... ...
... ... @@ -49,7 +49,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
//可能出现业务判断离线后再次触发 channelInactive
CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel());
if (userInfo != null){
LOGGER.warn("[{}]触发 channelInactive 掉线!",userInfo.getUserName());
LOGGER.warn("[{}] trigger channelInactive offline!",userInfo.getUserName());
userOffLine(userInfo, (NioSocketChannel) ctx.channel());
ctx.channel().close();
}
... ... @@ -77,7 +77,7 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
* @throws IOException
*/
private void userOffLine(CIMUserInfo userInfo, NioSocketChannel channel) throws IOException {
LOGGER.info("用户[{}]下线", userInfo.getUserName());
LOGGER.info("account [{}] offline!", userInfo.getUserName());
SessionSocketHolder.remove(channel);
SessionSocketHolder.removeSession(userInfo.getUserId());
... ... @@ -118,13 +118,13 @@ public class CIMServerHandle extends SimpleChannelInboundHandler<CIMRequestProto
@Override
protected void channelRead0(ChannelHandlerContext ctx, CIMRequestProto.CIMReqProtocol msg) throws Exception {
LOGGER.info("收到msg={}", msg.toString());
LOGGER.info("received msg=[{}]", msg.toString());
if (msg.getType() == Constants.CommandType.LOGIN) {
//保存客户端与 Channel 之间的关系
SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel());
SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg());
LOGGER.info("客户端[{}]上线成功", msg.getReqMsg());
LOGGER.info("client [{}] online success!!", msg.getReqMsg());
}
//心跳更新时间
... ...
... ... @@ -42,7 +42,7 @@ public class RegistryZK implements Runnable {
if (appConfiguration.isZkSwitch()){
String path = appConfiguration.getZkRoot() + "/ip-" + ip + ":" + cimServerPort + ":" + httpPort;
zKit.createNode(path);
logger.info("注册 zookeeper 成功,msg=[{}]", path);
logger.info("Registry zookeeper success, msg=[{}]", path);
}
... ...
... ... @@ -61,7 +61,7 @@ public class CIMServer {
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
LOGGER.info("启动 cim server 成功");
LOGGER.info("Start cim server success!!!");
}
}
... ... @@ -73,7 +73,7 @@ public class CIMServer {
public void destroy() {
boss.shutdownGracefully().syncUninterruptibly();
work.shutdownGracefully().syncUninterruptibly();
LOGGER.info("关闭 cim server 成功");
LOGGER.info("Close cim server success!!!");
}
... ... @@ -85,7 +85,7 @@ public class CIMServer {
NioSocketChannel socketChannel = SessionSocketHolder.get(sendMsgReqVO.getUserId());
if (null == socketChannel) {
throw new NullPointerException("客户端[" + sendMsgReqVO.getUserId() + "]不在线!");
LOGGER.error("client {} offline!", sendMsgReqVO.getUserId());
}
CIMRequestProto.CIMReqProtocol protocol = CIMRequestProto.CIMReqProtocol.newBuilder()
.setRequestId(sendMsgReqVO.getUserId())
... ... @@ -95,6 +95,6 @@ public class CIMServer {
ChannelFuture future = socketChannel.writeAndFlush(protocol);
future.addListener((ChannelFutureListener) channelFuture ->
LOGGER.info("服务端手动发送 Google Protocol 成功={}", sendMsgReqVO.toString()));
LOGGER.info("server push msg:[{}]", sendMsgReqVO.toString()));
}
}
... ...