作者 钟来

开发透传服务

正在显示 31 个修改的文件 包含 341 行增加784 行删除
driverClassName=com.mysql.cj.jdbc.Driver
#url=jdbc:mysql://rm-wz9740un21f09iokuao.mysql.rds.aliyuncs.com:3306/mqtt_broker?useUnicode=true&characterEncoding=utf8&autoReconnect=true
#username=luhui
#password=Luhui586
url=jdbc:mysql://rm-wz9446bn79p0r80ew0o.mysql.rds.aliyuncs.com:3306/runing_fish?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
url=jdbc:mysql://rm-wz9740un21f09iokuao.mysql.rds.aliyuncs.com:3306/mqtt_broker?useUnicode=true&characterEncoding=utf8&autoReconnect=true
username=luhui
password=Luhui586
#url=jdbc:mysql://rm-wz9446bn79p0r80ew0o.mysql.rds.aliyuncs.com:3306/runing_fish?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
#username=luhui
#password=Luhui586
#\u6700\u5927\u8FDE\u63A5\u6570\u91CF
maxActive=100
#\u6700\u5927\u7A7A\u95F2\u8FDE\u63A5
... ...
package com.zhonglai.luhui.neutrino.proxy.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhonglai.luhui.neutrino.proxy.common.RegisterMessage;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
public class ClientMain {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 9000);
PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
ObjectMapper mapper = new ObjectMapper();
RegisterMessage register = new RegisterMessage("clientA", "abc123");
writer.println(mapper.writeValueAsString(register));
String response = reader.readLine();
System.out.println("服务端响应:" + response);
new Thread(new HeartbeatTask(socket)).start();
}
}
package com.zhonglai.luhui.neutrino.proxy.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
/**
* 心跳线程(每30秒发送)
*/
public class HeartbeatTask implements Runnable {
private final Socket socket;
public HeartbeatTask(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
ObjectMapper mapper = new ObjectMapper();
while (!socket.isClosed()) {
Map<String, String> heartbeat = new HashMap<>();
heartbeat.put("type", "heartbeat");
heartbeat.put("clientId", "clientA");
writer.println(mapper.writeValueAsString(heartbeat));
Thread.sleep(30000); // 30秒心跳
}
} catch (Exception e) {
System.out.println("心跳发送失败:" + e.getMessage());
}
}
}
\ No newline at end of file
... ... @@ -19,8 +19,26 @@
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- mqtt -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
</dependencies>
... ...
package com.zhonglai.luhui.neutrino.proxy.common;
/**
* 注册协议类
*/
public class RegisterMessage {
public String type = "register";
public String clientId;
public String token;
public RegisterMessage() {}
public RegisterMessage(String clientId, String token) {
this.clientId = clientId;
this.token = token;
}
}
package com.zhonglai.luhui.neutrino.proxy.common;
/**
* 隧道请求协议
*/
public class TunnelMessage {
public String type = "tunnel";
public String tunnelId; // UUID
public String clientId;
public int remotePort; // 公网端口
public String targetHost;
public int targetPort;
}
package com.zhonglai.luhui.neutrino.proxy.common.mqtt;
import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
/**
* mqtt回调
*/
public class MqttCallback implements MqttCallbackExtended {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private MqttService mqttclient;
public MqttCallback(MqttService mqttclient)
{
this.mqttclient = mqttclient;
}
@Override
public void connectComplete(boolean b, String s) {
log.info("连接成功");
Set<String> topics = mqttclient.subscribe();
log.info("-----------订阅成功:{}--------------------",topics);
}
@Override
public void connectionLost(Throwable cause) {
log.error("连接丢失重新链接",cause);
while (!mqttclient.connect())
{
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("mqtt发来消息>>>>>>>:{} 《===============》 字符串:【{}】",topic,message.toString());
String str = new String(message.getPayload());
mqttclient.operate(new Topic(topic),JSONObject.parseObject(str));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("消息发送完成");
}
}
... ...
package com.zhonglai.luhui.neutrino.proxy.common.mqtt;
import java.io.FileReader;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class MqttConfig {
public static String mqttBroker;
public static String mqttUserName;
public static String mqttPassword;
public static String mqttClientId;
public static Set<String> subTopic;
public static void init()
{
Properties properties = loadProperties();
mqttBroker = properties.getProperty("broker");
mqttUserName = properties.getProperty("username");
mqttPassword = properties.getProperty("password");
mqttClientId = properties.getProperty("clientId");
subTopic = new HashSet<>();
String topicsStr = properties.getProperty("subTopic");
if(null != topicsStr && !"".equals(topicsStr))
{
if (topicsStr != null && !topicsStr.trim().isEmpty()) {
String[] topics = topicsStr.split(",");
for (String topic : topics) {
subTopic.add(topic.trim());
}
}
}
}
public static Properties loadProperties()
{
Properties properties = new Properties();
try {
if(null != System.getProperty("configPath") && !"".equals(System.getProperty("configPath")))
{
properties.load(new FileReader(System.getProperty("configPath")+"/mqtt.properties"));
}else{
properties.load(MqttConfig.class.getClassLoader().getResourceAsStream("mqtt.properties"));
}
} catch (Exception e) {
e.printStackTrace();
}
return properties;
}
}
... ...
package com.zhonglai.luhui.neutrino.proxy.common.mqtt;
import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PreDestroy;
import java.util.Set;
/**
* 通过mqtt实现消息推送,用来处理长链接
*/
public class MqttService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private MqttClient mqttclient;
private MqttConnectOptions options;
private OperateService operateService;
// 使用 volatile 确保多线程内存可见性和禁止指令重排
private static volatile MqttService instance;
private MqttService(OperateService operateService) {
this.operateService = operateService;
// 初始化代码
try {
sart();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
public static MqttService getInstance( OperateService operateService) {
if (instance == null) {
synchronized (MqttService.class) {
if (instance == null) {
instance = new MqttService(operateService);
}
}
}
return instance;
}
private void init() throws MqttException {
if(null == mqttclient)
{
mqttclient = new MqttClient(MqttConfig.mqttBroker, MqttConfig.mqttClientId, new MemoryPersistence());
}
options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(15);
options.setKeepAliveInterval(60); // 添加心跳设置,单位为秒
//设置断开后重新连接
options.setAutomaticReconnect(true);
mqttclient.setCallback(new MqttCallback(this));
}
public boolean connect() {
try {
options.setUserName(MqttConfig.mqttUserName);
options.setPassword(MqttConfig.mqttPassword.toCharArray());
mqttclient.connect(options);
return true;
} catch (MqttException e) {
log.error("-----------mqtt连接服务器失败--------------------",e);
}
return false;
}
public void sart() throws MqttException{
MqttConfig.init();
log.info("-----------开始启动mqtt监听服务--------------------");
init();
connect();
log.info("-----------mqtt连接服务器成功--------------------");
}
@PreDestroy
public void stop() throws MqttException {
if(null != mqttclient)
{
log.info("退出mqtt服务");
mqttclient.unsubscribe(MqttConfig.subTopic.toArray(new String[MqttConfig.subTopic.size()]));
mqttclient.disconnect();
mqttclient.close();
}
instance = null;
}
public void publish(String topic, String messageStr) throws MqttException {
MqttMessage message = new MqttMessage();
message.setPayload(messageStr.getBytes());
mqttclient.publish(topic,message);
}
public Set<String> subscribe()
{
try {
mqttclient.subscribe(MqttConfig.subTopic.toArray(new String[MqttConfig.subTopic.size()]));
} catch (MqttException e) {
e.printStackTrace();
}
return MqttConfig.subTopic;
}
public boolean isConnect()
{
return mqttclient != null && mqttclient.isConnected();
}
public void operate(Topic topic, JSONObject payloaddata)
{
operateService.operate(topic,payloaddata);
}
}
... ...
package com.zhonglai.luhui.neutrino.proxy.common.mqtt;
import com.alibaba.fastjson.JSONObject;
public interface OperateService {
/**
* 指令操作
* @param topic
* @param data
*/
void operate(Topic topic, JSONObject data);
}
... ...
package com.zhonglai.luhui.neutrino.proxy.common.mqtt;
public class Topic {
public static final String TOPIC_PUT = "PUT";
public static final String TOPIC_READ = "READ";
private String topicType;
private String time;
public Topic()
{
}
public Topic(String stopicStr)
{
String[] strs = stopicStr.split("/");
topicType = strs[0];
time = strs[1];
}
public String getTopicType() {
return topicType;
}
public void setTopicType(String topicType) {
this.topicType = topicType;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
}
... ...
package com.zhonglai.luhui.neutrino.proxy.server;
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 服务端维护客户端连接
*/
public class ClientSessionManager {
private static final Map<String, Channel> clientMap = new ConcurrentHashMap<>();
public static void register(String clientId, Channel channel) {
clientMap.put(clientId, channel);
System.out.println("注册客户端:" + clientId);
}
public static void remove(Channel channel) {
clientMap.entrySet().removeIf(entry -> entry.getValue().equals(channel));
}
public static void heartbeat(String clientId) {
// 更新最后心跳时间(后续可扩展)
System.out.println("收到心跳:" + clientId);
}
public static boolean isClientConnected(String clientId) {
return clientMap.containsKey(clientId);
}
public static Channel getChannel(String clientId) {
return clientMap.get(clientId);
}
}
package com.zhonglai.luhui.neutrino.proxy.server;
import com.zhonglai.luhui.neutrino.proxy.server.httpservice.SimpleHttpServer;
import com.zhonglai.luhui.neutrino.proxy.server.proxy.ProxyServer;
import com.zhonglai.luhui.neutrino.proxy.common.mqtt.MqttConfig;
import com.zhonglai.luhui.neutrino.proxy.common.mqtt.MqttService;
import com.zhonglai.luhui.neutrino.proxy.server.operate.OperateServiceImpl;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ControlServer {
private static Logger logger = LoggerFactory.getLogger(ControlServer.class);
public static void main(String[] args) throws Exception {
//启动代理服务
ProxyServer.start(9000);
MqttService mqttService = MqttService.getInstance(new OperateServiceImpl());
mqttService.sart();
//启动接口服务
SimpleHttpServer.startHttpServer(8080);
// 使用Lambda表达式添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("正在关闭应用程序...");
try {
mqttService.stop();
} catch (MqttException e) {
logger.error("MQTT服务停止异常", e);
}
logger.info("应用程序关闭成功");
}));
}
}
... ...
package com.zhonglai.luhui.neutrino.proxy.server.function;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.zhonglai.luhui.neutrino.proxy.common.TunnelMessage;
import com.zhonglai.luhui.neutrino.proxy.server.ClientSessionManager;
import io.netty.channel.Channel;
import java.io.*;
import java.net.*;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
public class PortForwardManager {
public static final Map<Integer, ServerSocket> activeServers = new ConcurrentHashMap<>();
public static final Map<String, Integer> clientBandwidthMap = new ConcurrentHashMap<>();
public void startPortForward(int remotePort, String clientId, int targetPort) throws IOException {
if (activeServers.containsKey(remotePort)) return;
ServerSocket serverSocket = new ServerSocket(remotePort);
activeServers.put(remotePort, serverSocket);
new Thread(() -> {
System.out.println("端口转发监听启动: " + remotePort + " => " + clientId + ":" + targetPort);
while (true) {
try {
Socket externalSocket = serverSocket.accept(); // 有外部连接过来了
String tunnelId = UUID.randomUUID().toString();
// 1. 注册外部连接 socket(暂时只 createBridge)
TunnelBridgePool.createBridge(tunnelId, externalSocket);
// 2. 通过 control 通道通知客户端建立内网连接
Channel channel = ClientSessionManager.getChannel(clientId);
if (channel != null) {
ObjectMapper mapper = new ObjectMapper();
ObjectNode msg = mapper.createObjectNode();
msg.put("type", "create_tunnel");
msg.put("tunnelId", tunnelId);
msg.put("targetPort", targetPort);
channel.writeAndFlush(msg.toString() + "\n");
} else {
System.err.println("客户端未连接,无法建立隧道: " + clientId);
externalSocket.close();
}
// ✅ 注意:此时 **不要 bindAndStart**,等客户端连接进来后才执行。
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
package com.zhonglai.luhui.neutrino.proxy.server.function;
import com.zhonglai.luhui.neutrino.proxy.server.function.ratelimit.RateLimitedInputStream;
import com.zhonglai.luhui.neutrino.proxy.server.function.ratelimit.RateLimitedOutputStream;
import com.zhonglai.luhui.neutrino.proxy.server.function.ratelimit.TokenBucket;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 服务端建立转发通道并绑定 tunnelId
*/
public class TunnelBridgePool {
private static final Map<String, Socket> bridgeMap = new ConcurrentHashMap<>();
private static final Map<String, Integer> bandwidthMap = new ConcurrentHashMap<>();
public static void createBridge(String tunnelId, Socket externalSocket) {
bridgeMap.put(tunnelId, externalSocket);
}
public static void bindAndStart(String tunnelId, Socket tunnelSocket, int rate) {
bandwidthMap.put(tunnelId, rate);
bindAndStart(tunnelId, tunnelSocket);
}
public static void bindAndStart(String tunnelId, Socket tunnelSocket) {
Socket externalSocket = bridgeMap.remove(tunnelId);
int rate = bandwidthMap.remove(tunnelId);
if (externalSocket != null) {
try {
TokenBucket upload = new TokenBucket(rate, rate);
TokenBucket download = new TokenBucket(rate, rate);
InputStream fromClient = new RateLimitedInputStream(tunnelSocket.getInputStream(), upload);
OutputStream toExternal = new RateLimitedOutputStream(externalSocket.getOutputStream(), upload);
InputStream fromExternal = new RateLimitedInputStream(externalSocket.getInputStream(), download);
OutputStream toClient = new RateLimitedOutputStream(tunnelSocket.getOutputStream(), download);
new Thread(() -> copy(fromClient, toExternal)).start();
new Thread(() -> copy(fromExternal, toClient)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static void copy(InputStream in, OutputStream out) {
try {
byte[] buffer = new byte[8192];
int len;
while ((len = in.read(buffer)) != -1) {
out.write(buffer, 0, len);
out.flush();
}
} catch (IOException ignored) {
}
}
}
package com.zhonglai.luhui.neutrino.proxy.server.function;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 服务端独立启动数据转发端口(如 9100)
*/
public class TunnelSocketListener {
public void start() throws Exception {
ServerSocket serverSocket = new ServerSocket(9100);
System.out.println("数据转发端口监听中: 9100");
while (true) {
Socket socket = serverSocket.accept();
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String tunnelId = reader.readLine();
TunnelBridgePool.bindAndStart(tunnelId, socket);
}
}
}
package com.zhonglai.luhui.neutrino.proxy.server.function.ratelimit;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* 限速 InputStream 和 OutputStream 封装类
*/
public class RateLimitedInputStream extends FilterInputStream {
private final TokenBucket bucket;
public RateLimitedInputStream(InputStream in, TokenBucket bucket) {
super(in);
this.bucket = bucket;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
bucket.consume(len);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
}
return super.read(b, off, len);
}
}
\ No newline at end of file
package com.zhonglai.luhui.neutrino.proxy.server.function.ratelimit;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
public class RateLimitedOutputStream extends FilterOutputStream {
private final TokenBucket bucket;
public RateLimitedOutputStream(OutputStream out, TokenBucket bucket) {
super(out);
this.bucket = bucket;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
try {
bucket.consume(len);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
super.write(b, off, len);
}
}
package com.zhonglai.luhui.neutrino.proxy.server.function.ratelimit;
/**
* 核心限速器
*/
public class TokenBucket {
private final long capacity;
private final long refillRate;
private long tokens;
private long lastRefillTime;
public TokenBucket(long capacity, long refillRate) {
this.capacity = capacity;
this.refillRate = refillRate;
this.tokens = capacity;
this.lastRefillTime = System.nanoTime();
}
public synchronized void consume(long bytes) throws InterruptedException {
refill();
while (tokens < bytes) {
Thread.sleep(5);
refill();
}
tokens -= bytes;
}
private void refill() {
long now = System.nanoTime();
long elapsed = now - lastRefillTime;
long refillTokens = (elapsed * refillRate) / 1_000_000_000L;
if (refillTokens > 0) {
tokens = Math.min(capacity, tokens + refillTokens);
lastRefillTime = now;
}
}
}
package com.zhonglai.luhui.neutrino.proxy.server.httpservice;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.zhonglai.luhui.neutrino.proxy.server.function.PortForwardManager;
import com.zhonglai.luhui.neutrino.proxy.server.httpservice.dto.ResponseMessage;
import java.io.*;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* 端口映射操作
*/
public class PortForwardHandler implements HttpHandler {
@Override
public void handle(HttpExchange httpExchange) throws IOException {
try {
Map<String,String> params = getParamsMap(httpExchange);
String operate = params.get("operate");
switch (operate)
{
case "map": //端口映射
map(httpExchange,params);
return;
case "updateBandwidth": //限制带宽
updateBandwidth(httpExchange,params);
return;
default:
response(httpExchange,new ResponseMessage("不支持的操作", 0));
}
}catch (Exception e)
{
response(httpExchange,new ResponseMessage("接口请求失败", 0));
}
}
private void map(HttpExchange httpExchange, Map<String, String> params)throws IOException
{
int serverPort = Integer.parseInt(params.get("serverPort"));
String clientId =params.get("clientId");
int targetPort = Integer.parseInt(params.get("targetPort"));
int bandwidth = null != params.get("bandwidth") ? Integer.parseInt(params.get("bandwidth")) : 100 * 1024;
PortForwardManager.clientBandwidthMap.put(clientId, bandwidth);
new PortForwardManager().startPortForward(serverPort, clientId, targetPort);
response(httpExchange,new ResponseMessage("映射成功", 1));
}
private void updateBandwidth(HttpExchange httpExchange, Map<String, String> params)throws IOException
{
String clientId = params.get("clientId");
int bandwidth = Integer.parseInt(params.get("bandwidth"));
PortForwardManager.clientBandwidthMap.put(clientId, bandwidth);
response(httpExchange,new ResponseMessage("带宽更新成功", 1));
}
private void response(HttpExchange httpExchange, ResponseMessage responseMessage) throws IOException
{
String response = JSON.toJSONString(responseMessage);
OutputStream os = httpExchange.getResponseBody();
httpExchange.getResponseHeaders().set("Content-Type", "application/json; charset=UTF-8");
byte[] bytes = response.getBytes(StandardCharsets.UTF_8);
httpExchange.sendResponseHeaders(200, bytes.length);
os.write(bytes);
os.close();
}
private Map<String, String> getQueryParams(HttpExchange exchange) throws UnsupportedEncodingException {
String query = exchange.getRequestURI().getQuery(); // 获取 name=张三&age=18
return parseQuery(query);
}
private Map<String, String> parseQuery(String query) throws UnsupportedEncodingException {
Map<String, String> result = new HashMap<>();
if (query == null || query.isEmpty()) return result;
for (String param : query.split("&")) {
String[] pair = param.split("=", 2);
if (pair.length == 2) {
result.put(URLDecoder.decode(pair[0], StandardCharsets.UTF_8.toString()),URLDecoder.decode(pair[1], StandardCharsets.UTF_8.toString()));
} else if (pair.length == 1) {
result.put(URLDecoder.decode(pair[0], StandardCharsets.UTF_8.toString()), "");
}
}
return result;
}
private Map<String, String> getParamsMap(HttpExchange httpExchange) throws IOException {
String method = httpExchange.getRequestMethod();
Map<String, String> params = null;
if ("POST".equalsIgnoreCase(method))
{
params = getPostParams(httpExchange);
} else if ("GET".equalsIgnoreCase(method))
{
params = getQueryParams(httpExchange);
}
return params;
}
private Map<String, String> getPostParams(HttpExchange exchange) throws IOException {
String body = readRequestBody(exchange);
return parseQuery(body); // 和 GET 参数一样解析
}
private String readRequestBody(HttpExchange exchange) throws IOException {
BufferedReader reader = new BufferedReader(
new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
sb.append(line);
}
return sb.toString();
}
}
package com.zhonglai.luhui.neutrino.proxy.server.httpservice;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
public class SimpleHttpServer {
public static void startHttpServer(int port) throws IOException
{
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
// 映射 /static/ 到 static/ 目录
server.createContext("/static/", new StaticFileHandler("static"));
server.createContext("/portForward", new PortForwardHandler());
server.setExecutor(null); // 默认线程池
server.start();
System.out.println("Server started at http://localhost:" + port);
}
}
package com.zhonglai.luhui.neutrino.proxy.server.httpservice;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
/**
* 静态文件处理器
*/
public class StaticFileHandler implements HttpHandler {
private final Path basePath;
public StaticFileHandler(String rootDir) {
this.basePath = Paths.get(rootDir).toAbsolutePath();
}
@Override
public void handle(HttpExchange exchange) throws IOException {
String uriPath = exchange.getRequestURI().getPath();
String other = uriPath.replaceFirst("^/[^/]+/", "");
Path filePath = basePath.resolve(other).normalize();
//生成
if (!filePath.startsWith(basePath) || !Files.exists(filePath)) {
exchange.sendResponseHeaders(404, -1);
return;
}
String contentType = guessContentType(filePath);
byte[] fileBytes = Files.readAllBytes(filePath);
exchange.getResponseHeaders().add("Content-Type", contentType);
exchange.sendResponseHeaders(200, fileBytes.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(fileBytes);
}
}
private String guessContentType(Path path) {
String name = path.getFileName().toString().toLowerCase();
if (name.endsWith(".html")) return "text/html";
if (name.endsWith(".js")) return "application/javascript";
if (name.endsWith(".css")) return "text/css";
if (name.endsWith(".m3u8")) return "application/vnd.apple.mpegurl";
if (name.endsWith(".ts")) return "video/MP2T";
return "application/octet-stream";
}
}
package com.zhonglai.luhui.neutrino.proxy.server.httpservice.dto;
public class MappingPort {
private Integer clientPort; //客服端端口
private Integer servicePort; //服务器端口
public Integer getClientPort() {
return clientPort;
}
public void setClientPort(Integer clientPort) {
this.clientPort = clientPort;
}
public Integer getServicePort() {
return servicePort;
}
public void setServicePort(Integer servicePort) {
this.servicePort = servicePort;
}
}
package com.zhonglai.luhui.neutrino.proxy.server.httpservice.dto;
import java.util.List;
import java.util.Map;
/**
* 端口映射内容
*/
public class PortForwardMapBody {
private String clientId;
private List<MappingPort> mappingPortList;
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public List<MappingPort> getMappingPortList() {
return mappingPortList;
}
public void setMappingPortList(List<MappingPort> mappingPortList) {
this.mappingPortList = mappingPortList;
}
}
package com.zhonglai.luhui.neutrino.proxy.server.httpservice.dto;
public class ResponseMessage {
private String msg;
private Integer code;
private Object data;
public ResponseMessage(String msg, Integer code) {
this.msg = msg;
this.code = code;
}
public ResponseMessage(String msg, Integer code, Object data) {
this.msg = msg;
this.code = code;
this.data = data;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
}
package com.zhonglai.luhui.neutrino.proxy.server.operate;
import com.alibaba.fastjson.JSONObject;
import com.zhonglai.luhui.neutrino.proxy.common.mqtt.OperateService;
import com.zhonglai.luhui.neutrino.proxy.common.mqtt.Topic;
public class OperateServiceImpl implements OperateService {
@Override
public void operate(Topic topic, JSONObject data) {
}
}
... ...
package com.zhonglai.luhui.neutrino.proxy.server.proxy;
import com.zhonglai.luhui.neutrino.proxy.server.proxy.handler.ControlHandler;
import com.zhonglai.luhui.neutrino.proxy.server.proxy.handler.HeartbeatTimeoutHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
/**
* 代理服务
*/
public class ProxyServer {
public static void start(int proxyPort) throws Exception
{
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new IdleStateHandler(60, 0, 0));
ch.pipeline().addLast(new HeartbeatTimeoutHandler());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ControlHandler());
}
});
ChannelFuture future = bootstrap.bind(proxyPort).sync(); // bind 完成阻塞等待
future.channel().closeFuture().addListener((ChannelFutureListener) closeFuture -> {
System.out.println("Server channel closed.");
});
System.out.println("Netty server started on port: " + proxyPort);
// ✅ 注册优雅关闭钩子,不阻塞主线程
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutdown signal received. Closing Netty gracefully...");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}));
}
}
package com.zhonglai.luhui.neutrino.proxy.server.proxy.handler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhonglai.luhui.neutrino.proxy.common.RegisterMessage;
import com.zhonglai.luhui.neutrino.proxy.server.ClientSessionManager;
import com.zhonglai.luhui.neutrino.proxy.server.function.PortForwardManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
public class ControlHandler extends SimpleChannelInboundHandler<String> {
private static final ObjectMapper mapper = new ObjectMapper();
private String clientId;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
JsonNode json = mapper.readTree(msg);
String type = json.get("type").asText();
switch (type) {
case "register":
clientId = json.get("clientId").asText();
ClientSessionManager.register(clientId, ctx.channel());
ctx.writeAndFlush("{\"type\":\"register_ack\",\"status\":\"ok\"}\n");
break;
case "heartbeat":
ClientSessionManager.heartbeat(json.get("clientId").asText());
break;
default:
System.out.println("未知消息类型: " + type);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ClientSessionManager.remove(ctx.channel());
System.out.println("客户端断开连接:" + clientId);
}
}
\ No newline at end of file
package com.zhonglai.luhui.neutrino.proxy.server.proxy.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
public class HeartbeatTimeoutHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent && ((IdleStateEvent)evt).state() == IdleState.READER_IDLE) {
System.out.println("心跳超时,断开客户端:" + ctx.channel().remoteAddress());
ctx.close();
} else {
super.userEventTriggered(ctx, evt);
}
}
}
\ No newline at end of file
broker=
username=
password=
clientId=
subTopic=
\ No newline at end of file
... ...
... ... @@ -615,6 +615,11 @@
<version>1.18</version>
</dependency>
<!--solon aot start(用于 aot 时注册 native 元信息)-->
<dependency>
<groupId>org.noear</groupId>
<artifactId>solon.aot</artifactId>
</dependency>
</dependencies>
... ...