作者 钟来

初始提交

正在显示 78 个修改的文件 包含 4745 行增加0 行删除

要显示太多修改。

为保证性能只显示 78 of 78+ 个文件。

.idea
**/*.iml
**/target
**/*.pyc
\ No newline at end of file
... ...
# 抖音爬虫
----
<p>用于爬取指定用户的'作品', 以及指定用户的'喜欢'。
<p>用手机挂代理刷抖音, 当你访问其他用户的个人空间时, 就自动把这些用户信息、头像、视频封面、视频mp4、mp3等都爬取下来了。
---
本项目使用语言及版本:
* jdk: 14-preview (idea请使用2020.1尝鲜版, 否则不支持jdk14新语法)
* python: 3.8
----
模块介绍:
* douyin-scanner \[python\] 本模块用于将抖音信息以mitmdump代理形式拦截, 然后以宽表形式写入到数据库中, 方便douyin-downloader模块做后续的处理。
* douyin-downloader \[java\] 使用vertx框架。本模块用于将爬取下来的信息做后续的分析、重组、下载。
----
本项目使用技术:
* mitmdump + python做代理拦截
* vertx作为整个项目的主要框架
* 裸写sql不方便, 自己实现了一个sqlBuilder, 方便拼接sql (最开始是使用的第三方依赖[sqlBuilder](https://github.com/jkrasnay/sqlbuilder)但是这个用起来有很多不足之处, 比如不支持limit, 不支持prepare等等
* 自己用反射实现了一个对象关系应该工具类com/aries/crawler/tools/Orm.java , 弥补了vertx没有orm的不便利之处。美其名曰:几十行代码实现了一个orm。
----
为什么不用spring和mybatis
1. 我不喜欢自己的项目里有一大堆眼花缭乱的第三方依赖(你可以看一下本项目的pom.xml, 目前只有vertx-core、vertx-mysql, 还有一个用于单元测试的junit)
1. 不喜欢无脑使用spring和mybatis的行为. 经常见到一些java工程师打算新建个项目写点东西时, 第一件事情就是想都不想就直接引入一套spring(醒醒啊喂, 你是java工程师, 不是spring工程师)。 并不是认为这些不好, 只是认为这并不是解决问题的通用方案, 更不是完美方案。
1. 这是我第一次使用vertx, 但不是第一次不使用spring。(netty/jFinal/play/akka都是很不错的框架呀)
----
本项目仅供学习研究, 不提供任何反爬虫等功能, 请不要恶意爬取。 恶意使用本代码者, 后果自负!
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aries.crawler</groupId>
<artifactId>douyin-downloader</artifactId>
<version>1.0-SNAPSHOT</version>
<name>douyin-downloader</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>14</maven.compiler.source>
<maven.compiler.target>14</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.8.5</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>3.8.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>14</release> <!-- <release>13</release> -->
<compilerArgs>--enable-preview</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
... ...
package com.aries.crawler;
import com.aries.crawler.trans.codec.CommonMessageCodec;
import com.aries.crawler.trans.message.*;
import com.aries.crawler.verticles.*;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author arowana
*/
public class Starter {
private static final Logger logger = LoggerFactory.getLogger(Starter.class);
public static void main(String[] args) {
// Force to use slf4j. 参考自dgate:https://github.com/DTeam-Top/dgate/blob/master/src/main/java/top/dteam/dgate/Launcher.java
System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory");
var vertx = Vertx.vertx();
logger.info("register codec");
vertx.eventBus().registerCodec(new CommonMessageCodec<>());
vertx.eventBus().registerDefaultCodec(SimpleInt64Message.class, new CommonMessageCodec<>());
vertx.eventBus().registerDefaultCodec(DouyinUserInfoMessage.class, new CommonMessageCodec<>());
vertx.eventBus().registerDefaultCodec(DouyinVideoInfoMessage.class, new CommonMessageCodec<>());
vertx.eventBus().registerDefaultCodec(DouyinWideDataMessage.class, new CommonMessageCodec<>());
vertx.eventBus().registerDefaultCodec(CommonResponseMessage.class, new CommonMessageCodec<>());
logger.info("deploying verticles");
List<Future> futures = new ArrayList<>() {
{
add(optionalDeploy(vertx, UserInsertVerticle.class, 1));
add(optionalDeploy(vertx, VideoInsertVerticle.class, 1));
add(optionalDeploy(vertx, UpdateDataVerticle.class, 1));
add(optionalDeploy(vertx, WideDataDispatchVerticle.class, 1));
add(optionalDeploy(vertx, WideDataPickUpVerticle.class, 1));
add(optionalDeploy(vertx, VideoDataPickUpVerticle.class, 1));
add(optionalDeploy(vertx, VideoDownloadVerticle.class, 1));
}
};
CompositeFuture.all(futures).setHandler(ar -> {
if (ar.succeeded()) {
logger.info("all verticle start");
} else {
logger.error("verticle(s) failed: " + ar.cause());
}
});
}
/**
* 简单部署
*
* @param vertx 全局vertx
* @param verticleType 部署的verticel类
* @return 部署后的Future回调
*/
public static Future<Void> simpleDeploy(Vertx vertx, Class<?> verticleType) {
return Future.future(res -> {
vertx.deployVerticle(verticleType.getTypeName(), deployRes -> {
if (deployRes.succeeded()) {
res.complete();
} else {
res.fail(deployRes.cause());
}
});
});
}
/**
* 配置化部署
*
* @param vertx 全局vertx
* @param verticleType 部署的verticel类
* @param workerSize 实例个数
* @return 部署后的Future回调
*/
public static Future<Void> optionalDeploy(Vertx vertx, Class<?> verticleType, int workerSize) {
var options = new DeploymentOptions()
.setWorker(true)
.setWorkerPoolName(verticleType.getTypeName() + "-pool")
.setWorkerPoolSize(workerSize)
.setMaxWorkerExecuteTime(TimeUnit.SECONDS.toNanos(10))
.setInstances(workerSize);
return Future.future(res -> {
vertx.deployVerticle(verticleType.getTypeName(), options, deployRes -> {
if (deployRes.succeeded()) {
res.complete();
} else {
res.fail(deployRes.cause());
}
});
});
}
}
... ...
package com.aries.crawler.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author arowana
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(value = {ElementType.FIELD})
public @interface MysqlField {
/**
* 对应的数据库字段名
*/
String alias();
/**
* 对应的类型
*/
Class<?> type();
}
... ...
package com.aries.crawler.model;
/**
* 数据模型的抽象类。
*
* @author arowana
*/
public interface DataModelable {
}
... ...
package com.aries.crawler.model.douyincrawler;
import com.aries.crawler.annotation.MysqlField;
import com.aries.crawler.model.DataModelable;
import java.math.BigInteger;
/**
* 抖音数据宽表
*
* @author arowana
*/
public class DouyinCrawlerLogModel implements DataModelable {
/**
* 提取用户数据完成
*/
public static final Integer STATUS_USER_DONE = 1;
/**
* 提取视频信息完成
*/
public static final Integer STATUS_VIDEO_DONE = 2;
/**
* 用户信息、视频信息 都提取完成
*/
public static final Integer STATUS_ALL_DONE = 3;
public static final String TABLE = "douyin_crawler_log";
@MysqlField(alias = "aweme_id", type = Long.class)
public Long awemeId;
@MysqlField(alias = "aweme_desc", type = String.class)
public String awemeDesc;
@MysqlField(alias = "aweme_create_time", type = Long.class)
public Long awemeCreateTime;
@MysqlField(alias = "author_uid", type = Long.class)
public Long authorUid;
@MysqlField(alias = "author_short_id", type = Long.class)
public Long authorShortId;
@MysqlField(alias = "author_nickname", type = String.class)
public String authorNickname;
@MysqlField(alias = "author_signature", type = String.class)
public String authorSignature;
@MysqlField(alias = "avatar_larger_url", type = String.class)
public String avatarLargerUrl;
@MysqlField(alias = "author_share_info_qrcode_url", type = String.class)
public String authorShareInfoQrcodeUrl;
@MysqlField(alias = "video_cover_url", type = String.class)
public String videoCoverUrl;
@MysqlField(alias = "video_dynamic_cover_url", type = String.class)
public String videoDynamicCoverUrl;
@MysqlField(alias = "video_download_addr_url", type = String.class)
public String videoDownloadAddrUrl;
@MysqlField(alias = "video_share_url", type = String.class)
public String videoShareUrl;
@MysqlField(alias = "video_tag", type = String.class)
public String videoVideoTag;
@MysqlField(alias = "video_duration", type = BigInteger.class)
public Long videoDuration;
@MysqlField(alias = "type", type = Integer.class)
public Integer type;
@MysqlField(alias = "status", type = Integer.class)
public Integer status;
@MysqlField(alias = "ct", type = String.class)
public String ct;
@MysqlField(alias = "ut", type = String.class)
public String ut;
@MysqlField(alias = "id", type = BigInteger.class)
private BigInteger id;
public static Integer getStatusUserDone() {
return STATUS_USER_DONE;
}
public static Integer getStatusVideoDone() {
return STATUS_VIDEO_DONE;
}
public static Integer getStatusAllDone() {
return STATUS_ALL_DONE;
}
public static String getTABLE() {
return TABLE;
}
public BigInteger getId() {
return id;
}
public void setId(BigInteger id) {
this.id = id;
}
public Long getAwemeId() {
return awemeId;
}
public void setAwemeId(Long awemeId) {
this.awemeId = awemeId;
}
public String getAwemeDesc() {
return awemeDesc;
}
public void setAwemeDesc(String awemeDesc) {
this.awemeDesc = awemeDesc;
}
public Long getAwemeCreateTime() {
return awemeCreateTime;
}
public void setAwemeCreateTime(Long awemeCreateTime) {
this.awemeCreateTime = awemeCreateTime;
}
public Long getAuthorUid() {
return authorUid;
}
public void setAuthorUid(Long authorUid) {
this.authorUid = authorUid;
}
public Long getAuthorShortId() {
return authorShortId;
}
public void setAuthorShortId(Long authorShortId) {
this.authorShortId = authorShortId;
}
public String getAuthorNickname() {
return authorNickname;
}
public void setAuthorNickname(String authorNickname) {
this.authorNickname = authorNickname;
}
public String getAuthorSignature() {
return authorSignature;
}
public void setAuthorSignature(String authorSignature) {
this.authorSignature = authorSignature;
}
public String getAvatarLargerUrl() {
return avatarLargerUrl;
}
public void setAvatarLargerUrl(String avatarLargerUrl) {
this.avatarLargerUrl = avatarLargerUrl;
}
public String getAuthorShareInfoQrcodeUrl() {
return authorShareInfoQrcodeUrl;
}
public void setAuthorShareInfoQrcodeUrl(String authorShareInfoQrcodeUrl) {
this.authorShareInfoQrcodeUrl = authorShareInfoQrcodeUrl;
}
public String getVideoCoverUrl() {
return videoCoverUrl;
}
public void setVideoCoverUrl(String videoCoverUrl) {
this.videoCoverUrl = videoCoverUrl;
}
public String getVideoDynamicCoverUrl() {
return videoDynamicCoverUrl;
}
public void setVideoDynamicCoverUrl(String videoDynamicCoverUrl) {
this.videoDynamicCoverUrl = videoDynamicCoverUrl;
}
public String getVideoDownloadAddrUrl() {
return videoDownloadAddrUrl;
}
public void setVideoDownloadAddrUrl(String videoDownloadAddrUrl) {
this.videoDownloadAddrUrl = videoDownloadAddrUrl;
}
public String getVideoShareUrl() {
return videoShareUrl;
}
public void setVideoShareUrl(String videoShareUrl) {
this.videoShareUrl = videoShareUrl;
}
public String getVideoVideoTag() {
return videoVideoTag;
}
public void setVideoVideoTag(String videoVideoTag) {
this.videoVideoTag = videoVideoTag;
}
public Long getVideoDuration() {
return videoDuration;
}
public void setVideoDuration(Long videoDuration) {
this.videoDuration = videoDuration;
}
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getCt() {
return ct;
}
public void setCt(String ct) {
this.ct = ct;
}
public String getUt() {
return ut;
}
public void setUt(String ut) {
this.ut = ut;
}
@Override
public String toString() {
return "DouyinCrawlerLogModel{" +
"id=" + id +
", awemeId=" + awemeId +
", awemeDesc='" + awemeDesc + '\'' +
", awemeCreateTime=" + awemeCreateTime +
", authorUid=" + authorUid +
", authorShortId=" + authorShortId +
", authorNickname='" + authorNickname + '\'' +
", authorSignature='" + authorSignature + '\'' +
", avatarLargerUrl='" + avatarLargerUrl + '\'' +
", authorShareInfoQrcodeUrl='" + authorShareInfoQrcodeUrl + '\'' +
", videoCoverUrl='" + videoCoverUrl + '\'' +
", videoDynamicCoverUrl='" + videoDynamicCoverUrl + '\'' +
", videoDownloadAddrUrl='" + videoDownloadAddrUrl + '\'' +
", videoShareUrl='" + videoShareUrl + '\'' +
", videoVideoTag='" + videoVideoTag + '\'' +
", videoDuration=" + videoDuration +
", type=" + type +
", status=" + status +
", ct=" + ct +
", ut=" + ut +
'}';
}
}
... ...
package com.aries.crawler.model.douyincrawler;
import com.aries.crawler.annotation.MysqlField;
import com.aries.crawler.model.DataModelable;
import java.time.LocalDateTime;
/**
* 抖音数据宽表
*
* @author arowana
*/
public class DouyinUserModel implements DataModelable {
@MysqlField(alias = "uid", type = Long.class)
public Long uid;
@MysqlField(alias = "short_id", type = Long.class)
public Long shortId;
@MysqlField(alias = "nickname", type = String.class)
public String nickname;
@MysqlField(alias = "signature", type = String.class)
public String signature;
@MysqlField(alias = "avatar_larger_url", type = String.class)
public String avatarLargerUrl;
@MysqlField(alias = "share_url", type = String.class)
public String shareUrl;
@MysqlField(alias = "share_info_qrcode_url", type = String.class)
public String shareInfoQrCodeUrl;
@MysqlField(alias = "ct", type = LocalDateTime.class)
public String ct;
@MysqlField(alias = "ut", type = LocalDateTime.class)
public String ut;
public Long getUid() {
return uid;
}
public void setUid(Long uid) {
this.uid = uid;
}
public Long getShortId() {
return shortId;
}
public void setShortId(Long shortId) {
this.shortId = shortId;
}
public String getNickname() {
return nickname;
}
public void setNickname(String nickname) {
this.nickname = nickname;
}
public String getSignature() {
return signature;
}
public void setSignature(String signature) {
this.signature = signature;
}
public String getAvatarLargerUrl() {
return avatarLargerUrl;
}
public void setAvatarLargerUrl(String avatarLargerUrl) {
this.avatarLargerUrl = avatarLargerUrl;
}
public String getShareUrl() {
return shareUrl;
}
public void setShareUrl(String shareUrl) {
this.shareUrl = shareUrl;
}
public String getShareInfoQrCodeUrl() {
return shareInfoQrCodeUrl;
}
public void setShareInfoQrCodeUrl(String shareInfoQrCodeUrl) {
this.shareInfoQrCodeUrl = shareInfoQrCodeUrl;
}
public String getCt() {
return ct;
}
public void setCt(String ct) {
this.ct = ct;
}
public String getUt() {
return ut;
}
public void setUt(String ut) {
this.ut = ut;
}
@Override
public String toString() {
return "DouyinUserModel{" +
"uid=" + uid +
", shortId=" + shortId +
", nickname='" + nickname + '\'' +
", signature='" + signature + '\'' +
", avatarLargerUrl='" + avatarLargerUrl + '\'' +
", shareUrl='" + shareUrl + '\'' +
", shareInfoQrCodeUrl='" + shareInfoQrCodeUrl + '\'' +
", ct=" + ct +
", ut=" + ut +
'}';
}
}
... ...
package com.aries.crawler.model.douyincrawler;
import com.aries.crawler.annotation.MysqlField;
import com.aries.crawler.model.DataModelable;
import java.time.LocalDateTime;
/**
* 抖音数据宽表
*
* @author arowana
*/
public class DouyinVideoModel implements DataModelable {
/**
* 默认值。未处理。
*/
public static final Integer STATUS_VIDEO_DOWNLOAD_DEFAULT = 0;
/**
* 下载成功
*/
public static final Integer STATUS_VIDEO_DOWNLOAD_SUCCESS = 1;
/**
* 下载失败
*/
public static final Integer STATUS_VIDEO_DOWNLOAD_FAILED = 2;
public static final String TABLE = "douyin_video_info";
@MysqlField(alias = "id", type = Long.class)
public Long id;
@MysqlField(alias = "comments", type = String.class)
public String comments;
@MysqlField(alias = "create_time", type = Long.class)
public Long createTime;
@MysqlField(alias = "uid", type = Long.class)
public Long uid;
@MysqlField(alias = "cover_url", type = String.class)
public String coverUrl;
@MysqlField(alias = "dynamic_cover_url", type = String.class)
public String dynamicCoverUrl;
@MysqlField(alias = "download_addr_url", type = String.class)
public String downloadAddrUrl;
@MysqlField(alias = "share_url", type = String.class)
public String shareUrl;
@MysqlField(alias = "tag", type = String.class)
public String tag;
@MysqlField(alias = "duration", type = Long.class)
public Long duration;
@MysqlField(alias = "type", type = Integer.class)
public Integer type;
@MysqlField(alias = "status", type = Integer.class)
public Integer status;
@MysqlField(alias = "ct", type = LocalDateTime.class)
public String ct;
@MysqlField(alias = "ut", type = LocalDateTime.class)
public String ut;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getComments() {
return comments;
}
public void setComments(String comments) {
this.comments = comments;
}
public Long getCreateTime() {
return createTime;
}
public void setCreateTime(Long createTime) {
this.createTime = createTime;
}
public Long getUid() {
return uid;
}
public void setUid(Long uid) {
this.uid = uid;
}
public String getCoverUrl() {
return coverUrl;
}
public void setCoverUrl(String coverUrl) {
this.coverUrl = coverUrl;
}
public String getDynamicCoverUrl() {
return dynamicCoverUrl;
}
public void setDynamicCoverUrl(String dynamicCoverUrl) {
this.dynamicCoverUrl = dynamicCoverUrl;
}
public String getDownloadAddrUrl() {
return downloadAddrUrl;
}
public void setDownloadAddrUrl(String downloadAddrUrl) {
this.downloadAddrUrl = downloadAddrUrl;
}
public String getShareUrl() {
return shareUrl;
}
public void setShareUrl(String shareUrl) {
this.shareUrl = shareUrl;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public Long getDuration() {
return duration;
}
public void setDuration(Long duration) {
this.duration = duration;
}
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getCt() {
return ct;
}
public void setCt(String ct) {
this.ct = ct;
}
public String getUt() {
return ut;
}
public void setUt(String ut) {
this.ut = ut;
}
@Override
public String toString() {
return "DouyinVideoModel{" +
"id=" + id +
", comments='" + comments + '\'' +
", createTime=" + createTime +
", uid=" + uid +
", coverUrl='" + coverUrl + '\'' +
", dynamicCoverUrl='" + dynamicCoverUrl + '\'' +
", downloadAddrUrl='" + downloadAddrUrl + '\'' +
", shareUrl='" + shareUrl + '\'' +
", tag='" + tag + '\'' +
", duration=" + duration +
", type=" + type +
", status=" + status +
", ct=" + ct +
", ut=" + ut +
'}';
}
}
... ...
/**
* 本包下存放的都是douyin_crawler数据库对应的orm映射关系
* <p>
* 一个子package对应一个数据库, 例如 douyincrawler包 对应 douyin_crawler数据库
* 一个子类对应一个数据表, 例如DouYinCrawlerLogModel类 对应 douyin_crawler_log数据表
*/
package com.aries.crawler.model.douyincrawler;
... ...
/**
* 数据模型层的包
* 一个子package对应一个数据库, 例如 douyincrawler包 对应 douyin_crawler数据库
* 一个子类对应一个数据表, 例如DouYinCrawlerLogModel类 对应 douyin_crawler_log数据表
*/
package com.aries.crawler.model;
... ...
package com.aries.crawler.sqlbuilder;
import java.util.List;
/**
* @author arowana
*/
public abstract class AbstractSqlBuilder {
protected void appendList(StringBuilder sql, List<?> list, String init, String sep) {
boolean first = true;
for (Object s : list) {
if (first) {
sql.append(init);
} else {
sql.append(sep);
}
sql.append(s);
first = false;
}
}
protected String repeat(String ch, Integer times, String sep) {
StringBuilder result = new StringBuilder(ch);
if (times > 1) {
result.append((sep + ch).repeat(times - 1));
}
return result.toString();
}
}
... ...
package com.aries.crawler.sqlbuilder;
import java.util.ArrayList;
import java.util.List;
/**
* @author arowana
*/
public class DeleteBuilder extends AbstractSqlBuilder {
private String table;
private List<String> wheres = new ArrayList<>();
public DeleteBuilder(String table) {
this.table = table;
}
@Override
public String toString() {
StringBuilder sql = new StringBuilder("delete from ").append(table);
appendList(sql, wheres, " where ", " and ");
return sql.toString();
}
public DeleteBuilder where(String expr) {
wheres.add(expr);
return this;
}
}
... ...
package com.aries.crawler.sqlbuilder;
import java.util.ArrayList;
import java.util.List;
/**
* @author arowana
*/
public class InsertBuilder extends AbstractSqlBuilder {
private String table;
private List<String> columns = new ArrayList<>();
private List<Object> values = new ArrayList<>();
private List<String> upColumns = new ArrayList<>();
private List<Object> upValues = new ArrayList<>();
public InsertBuilder(String table) {
this.table = table;
}
public InsertBuilder set(String column, Object value) {
columns.add(column);
values.add(value);
return this;
}
public InsertBuilder onDuplicateKeyUpdate(String column, Object value) {
upColumns.add(column);
upValues.add(value);
return this;
}
public List<Object> getValues() {
var result = new ArrayList<>();
result.addAll(values);
result.addAll(upValues);
return result;
}
public String getSql() {
var sql = new StringBuilder("insert into ")
.append(table).append(" (");
appendList(sql, columns, "", ", ");
sql.append(") values (");
sql.append(repeat("?", columns.size(), ","));
sql.append(")");
if (!upColumns.isEmpty()) {
sql.append(" on duplicate key update ");
for (int i = 0; i < upColumns.size(); i++) {
if (i == 0) {
sql.append(upColumns.get(i) + " = ?");
} else {
sql.append(", " + upColumns.get(i) + " = ?");
}
}
}
return sql.toString();
}
}
... ...
package com.aries.crawler.sqlbuilder;
import java.util.ArrayList;
import java.util.List;
/**
* @author arowana
*/
public class SelectBuilder extends AbstractSqlBuilder implements Cloneable {
private boolean distinct;
private List<Object> columns = new ArrayList<>();
private List<String> tables = new ArrayList<>();
private List<String> joins = new ArrayList<>();
private List<String> leftJoins = new ArrayList<>();
private List<String> wheres = new ArrayList<>();
private List<String> groupBys = new ArrayList<>();
private List<String> havings = new ArrayList<>();
private List<SelectBuilder> unions = new ArrayList<>();
private List<String> orderBys = new ArrayList<>();
private Long offset = 0L;
private Long limit = 0L;
private boolean forUpdate;
private boolean noWait;
public SelectBuilder() {
}
public SelectBuilder(String table) {
tables.add(table);
}
/**
* Copy constructor. Used by {@link #clone()}.
*
* @param other SelectBuilder being cloned.
*/
protected SelectBuilder(SelectBuilder other) {
this.distinct = other.distinct;
this.forUpdate = other.forUpdate;
this.noWait = other.noWait;
for (Object column : other.columns) {
if (column instanceof SubSelectBuilder) {
this.columns.add(((SubSelectBuilder) column).clone());
} else {
this.columns.add(column);
}
}
this.tables.addAll(other.tables);
this.joins.addAll(other.joins);
this.leftJoins.addAll(other.leftJoins);
this.wheres.addAll(other.wheres);
this.groupBys.addAll(other.groupBys);
this.havings.addAll(other.havings);
for (SelectBuilder sb : other.unions) {
this.unions.add(sb.clone());
}
this.orderBys.addAll(other.orderBys);
}
/**
* Alias for {@link #where(String)}.
*/
public SelectBuilder and(String expr) {
return where(expr);
}
public SelectBuilder column(String name) {
columns.add(name);
return this;
}
public SelectBuilder column(SubSelectBuilder subSelect) {
columns.add(subSelect);
return this;
}
public SelectBuilder column(String name, boolean groupBy) {
columns.add(name);
if (groupBy) {
groupBys.add(name);
}
return this;
}
@Override
public SelectBuilder clone() {
return new SelectBuilder(this);
}
public SelectBuilder distinct() {
this.distinct = true;
return this;
}
public SelectBuilder forUpdate() {
forUpdate = true;
return this;
}
public SelectBuilder from(String table) {
tables.add(table);
return this;
}
public List<SelectBuilder> getUnions() {
return unions;
}
public SelectBuilder groupBy(String expr) {
groupBys.add(expr);
return this;
}
public SelectBuilder having(String expr) {
havings.add(expr);
return this;
}
public SelectBuilder join(String join) {
joins.add(join);
return this;
}
public SelectBuilder leftJoin(String join) {
leftJoins.add(join);
return this;
}
public SelectBuilder noWait() {
if (!forUpdate) {
throw new RuntimeException("noWait without forUpdate cannot be called");
}
noWait = true;
return this;
}
public SelectBuilder orderBy(String name) {
orderBys.add(name);
return this;
}
/**
* Adds an ORDER BY item with a direction indicator.
*
* @param name Name of the column by which to sort.
* @param ascending If true, specifies the direction "asc", otherwise, specifies
* the direction "desc".
*/
public SelectBuilder orderBy(String name, boolean ascending) {
if (ascending) {
orderBys.add(name + " asc");
} else {
orderBys.add(name + " desc");
}
return this;
}
@Override
public String toString() {
StringBuilder sql = new StringBuilder("select ");
if (distinct) {
sql.append("distinct ");
}
if (columns.size() == 0) {
sql.append("*");
} else {
appendList(sql, columns, "", ", ");
}
appendList(sql, tables, " from ", ", ");
appendList(sql, joins, " join ", " join ");
appendList(sql, leftJoins, " left join ", " left join ");
appendList(sql, wheres, " where ", " and ");
appendList(sql, groupBys, " group by ", ", ");
appendList(sql, havings, " having ", " and ");
appendList(sql, unions, " union ", " union ");
appendList(sql, orderBys, " order by ", ", ");
if (limit > 0) {
sql.append(" limit ");
sql.append(limit);
sql.append(" offset ");
sql.append(offset);
}
if (forUpdate) {
sql.append(" for update");
if (noWait) {
sql.append(" nowait");
}
}
return sql.toString();
}
/**
* Adds a "union" select builder. The generated SQL will union this query
* with the result of the main query. The provided builder must have the
* same columns as the parent select builder and must not use "order by" or
* "for update".
*/
public SelectBuilder union(SelectBuilder unionBuilder) {
unions.add(unionBuilder);
return this;
}
public SelectBuilder where(String expr) {
wheres.add(expr);
return this;
}
public SelectBuilder offset(Long offset) {
this.offset = offset;
return this;
}
public SelectBuilder limit(Long limit) {
this.limit = limit;
return this;
}
}
... ...
package com.aries.crawler.sqlbuilder;
/**
* @author arowana
*/
public class SubSelectBuilder extends SelectBuilder {
private String alias;
public SubSelectBuilder(String alias) {
this.alias = alias;
}
protected SubSelectBuilder(SubSelectBuilder other) {
super(other);
this.alias = other.alias;
}
@Override
public SubSelectBuilder clone() {
return new SubSelectBuilder(this);
}
@Override
public String toString() {
return new StringBuilder()
.append("(")
.append(super.toString())
.append(") as ")
.append(alias)
.toString();
}
}
\ No newline at end of file
... ...
package com.aries.crawler.sqlbuilder;
import java.util.ArrayList;
import java.util.List;
/**
* @author arowana
*/
public class UpdateBuilder extends AbstractSqlBuilder {
private String table;
private List<String> sets = new ArrayList<String>();
private List<String> wheres = new ArrayList<String>();
public UpdateBuilder(String table) {
this.table = table;
}
public UpdateBuilder set(String expr) {
sets.add(expr);
return this;
}
@Override
public String toString() {
var sql = new StringBuilder("update ")
.append(table);
appendList(sql, sets, " set ", ", ");
appendList(sql, wheres, " where ", " and ");
return sql.toString();
}
public UpdateBuilder where(String expr) {
wheres.add(expr);
return this;
}
}
... ...
package com.aries.crawler.tools;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
/**
* 用于获取mysql客户端、连接池
*
* @author arowana
*/
public class MySqlClientHelper {
private static volatile JDBCClient jdbcClient;
/**
* 防止实例化
*/
private MySqlClientHelper() {
}
/**
* 双重校验, 单例模式, 创建mysql连接池
*
* @param vertx 全局vertx
* @return 数据库连接池
*/
public static JDBCClient getJDBcClient(Vertx vertx) {
if (jdbcClient == null) {
synchronized (MySqlClientHelper.class) {
if (jdbcClient == null) {
JsonObject dbConfig = new JsonObject();
dbConfig.put("url", "jdbc:mysql://localhost:3306/douyin_crawler");
dbConfig.put("driver_class", "com.mysql.jdbc.Driver");
dbConfig.put("user", "root");
dbConfig.put("password", "1qaz2wsx"); // 反正是localhost, 密码随便看
// dbConfig.put("provider_class", "io.vertx.ext.jdbc.spi.impl.HikariCPDataSourceProvider");
dbConfig.put("maximumPoolSize", 200);
dbConfig.put("cachePrepStmts", true);
dbConfig.put("prepStmtCacheSize", 250);
dbConfig.put("prepStmtCacheSqlLimit", 2048);
jdbcClient = JDBCClient.createShared(vertx, dbConfig, "my-data-pool¬");
}
}
}
return jdbcClient;
}
}
... ...
package com.aries.crawler.tools;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.sql.ResultSet;
import java.util.List;
/**
* @author arowana
*/
public class MySqlExecuteHelper {
private static final Logger logger = LoggerFactory.getLogger(MySqlExecuteHelper.class);
/**
* 防止实例化
*/
private MySqlExecuteHelper() {
}
/**
* @param vertx 全局vertx
* @param sql 要执行的sql语句
* @param arguments sql参数
* @param handler 回调
*/
public static void prepareExecute(Vertx vertx, String sql, List<Object> arguments, Handler<AsyncResult<ResultSet>> handler) {
var jdbcClient = MySqlClientHelper.getJDBcClient(vertx);
// 构造参数
JsonArray params = new JsonArray();
for (Object argument : arguments) {
params.add(argument);
}
// 执行查询
jdbcClient.queryWithParams(sql, params, handler);
}
public static void execute(Vertx vertx, String sql, Handler<AsyncResult<ResultSet>> handler) {
logger.debug("准备执行sql: " + sql);
MySqlClientHelper.getJDBcClient(vertx).getConnection(connectionHandlerRes -> {
if (connectionHandlerRes.succeeded()) {
var connection = connectionHandlerRes.result();
connection.query(sql, handler);
connection.close();
} else {
logger.error("Could not connect: " + connectionHandlerRes.cause().getMessage());
}
});
}
}
... ...
package com.aries.crawler.tools;
import com.aries.crawler.annotation.MysqlField;
import com.aries.crawler.model.DataModelable;
import com.aries.crawler.verticles.WideDataPickUpVerticle;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
/**
* vert.x没有orm, 使用mysql获取到的数据很难转化为对象。
* 平时只能这样使用:
* <code>
* DouYinCrawlerLogModel model = new DouYinCrawlerLogModel();
* model.setFirstName(row.getString("first_name"));
* model.setMale(row.getBoolean("male"));
* model.setAge(row.getInteger("age"));
* ...
* ...
* <code/>
* 可见, 想要把数据库对象row转化为model需要花费数行代码, 该表中的字段越多, 代码函数耗费的越多。
* 所以, 我实现了本方法, 只需一行代码即可将数据库对象Row转化为Model对象, 弥补了vert.x没有orm的不便之处:
* <code>
* DouYinCrawlerLogModel model = MySQLHelper.getModel(row, DouYinCrawlerLogModel.class);
* </code>
*
* @author arowana
*/
public class Orm {
private static final Logger logger = LoggerFactory.getLogger(WideDataPickUpVerticle.class);
/**
* 防止实例化
*/
private Orm() {
}
/**
* @param row 从vert.x-mysql获取到的数据数据行对象
* @param clazz 要转化的model类
* @param <T> 继承DataModel的某个类
* @return 返回clazz类型的一个实例
*/
public static <T extends DataModelable> T getModel(JsonObject row, Class<T> clazz) {
try {
// 反射获取clazz的构造器
Constructor<? extends T> constructor = clazz.getConstructor();
// 利用反射的构造器实例化
T dataModel = constructor.newInstance();
// 获取所有字段
Field[] dataModelFields = clazz.getDeclaredFields();
// 如果存在字段
if (dataModelFields.length != 0) {
for (Field dataModelField : dataModelFields) {
dataModelField.setAccessible(true);
MysqlField annotation = dataModelField.getAnnotation(MysqlField.class);
if (annotation != null) {
Object columnValue = row.getValue(annotation.alias());
// 将值反射到dataModel里
dataModelField.set(dataModel, columnValue);
}
}
}
return dataModel;
} catch (Exception e) {
logger.error("exception in MysqlHelper.getModel(Row, Class): ", e);
}
return null;
}
}
... ...
package com.aries.crawler.tools;
/**
* @author arowana
*/
public final class Urls {
private static final String CM = "://";
private static final String USER_SHARE_PAGE_TEMPLATE = "https://www.iesdouyin.com/share/user/%d";
/**
* 防止实例化
*/
private Urls() {
}
/**
* 获取用户分享页的url
*/
public static String getUserSharePage(final Long uid) {
return String.format(USER_SHARE_PAGE_TEMPLATE, uid);
}
/**
* @param url http(s)://www.kkk.com/xxx/yyy
* @return
*/
public static RequestInfo getInfo(String url) {
var cmIndex = url.indexOf(CM);
var hostStart = cmIndex + CM.length();
var hostEnd = url.indexOf("/", hostStart);
var host = url.substring(hostStart, hostEnd);//前缀https
var path = url.substring(hostEnd);
return new RequestInfo(host, path);
}
public static record RequestInfo(String host, String path) {
}
}
... ...
package com.aries.crawler.trans;
/**
* @author arowana
*/
public enum EventBusTopic {
/**
* 抖音用户数据插入
*/
LOGIC_DOUYIN_WIDEDATA_DISPATCH("logic.douyin.widedata.dispatch"),
MYSQL_DOUYIN_USER_INSERT("mysql.douyin.user.insert"),
MYSQL_DOUYIN_VIDEO_INSERT("mysql.douyin.video.insert"),
MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_VIDEO("mysql.douyin.widedata.update.status.video"),
MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_USER("mysql.douyin.widedata.update.status.user"),
LOGIC_DOUYIN_VIDEO_DOWNLOAD("logic.douyin.video.url.parse"),
MYSQL_DOUYIN_VIDEO__UPDATE_STATUS_DOWNLOADED("mysql.douyin.video.update.status.downloaded"),
MYSQL_DOUYIN_VIDEO__UPDATE_STATUS_FAILED("mysql.douyin.video.update.status.failed");
EventBusTopic(String topic) {
this.topic = topic;
}
private String topic;
public String getTopic() {
return topic;
}
}
... ...
package com.aries.crawler.trans;
/**
* @author arowana
*/
public interface Messagable {
}
... ...
package com.aries.crawler.trans.codec;
import com.aries.crawler.trans.Messagable;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.json.JsonObject;
import java.lang.reflect.ParameterizedType;
import java.util.UUID;
/**
* common message codec(通用的codec)
*
* @author arowana
*/
public class CommonMessageCodec<T extends Messagable> implements MessageCodec<T, T> {
@Override
public void encodeToWire(Buffer buffer, T message) {
// Easiest ways is using JSON object
JsonObject jsonToEncode = JsonObject.mapFrom(message);
// Encode object to string
String jsonToStr = jsonToEncode.encode();
// Length of JSON: is NOT characters count
int length = jsonToStr.getBytes().length;
// Write data into given buffer
buffer.appendInt(length);
buffer.appendString(jsonToStr);
}
@Override
public T decodeFromWire(int position, Buffer buffer) {
// My custom message starting from this *position* of buffer
int _pos = position;
// Length of JSON
int length = buffer.getInt(_pos);
// Get JSON string by it`s length
// Jump 4 because getInt() == 4 bytes
String jsonStr = buffer.getString(_pos += 4, _pos += length);
JsonObject jsonObject = new JsonObject(jsonStr);
// Get fields
@SuppressWarnings("unchecked")
Class<T> entityClass = (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
return jsonObject.mapTo(entityClass);
}
@Override
public T transform(T commonMessage) {
return commonMessage;
}
@Override
public String name() {
return this.getClass().getSimpleName() + UUID.randomUUID();
}
@Override
public byte systemCodecID() {
return -1;
}
}
\ No newline at end of file
... ...
package com.aries.crawler.trans.message;
import com.aries.crawler.trans.Messagable;
/**
* immutable,通用message
*
* @author arowana
*/
public record CommonResponseMessage<T>(Integer code, T message) implements Messagable {
public static final CommonResponseMessage<Object> COMMON_SUCCESS_MESSAGE = CommonResponseMessage.CommonResponseMessageBuilder
.aCommonResponseMessage()
.code(100)
.message("success")
.build();
public static final CommonResponseMessage<Object> COMMON_FAILED_MESSAGE = CommonResponseMessage.CommonResponseMessageBuilder
.aCommonResponseMessage()
.code(1000)
.message("failed")
.build();
public static final class CommonResponseMessageBuilder<T> {
private Integer code;
private T message;
private CommonResponseMessageBuilder() {
}
public static <T> CommonResponseMessageBuilder<T> aCommonResponseMessage() {
return new CommonResponseMessageBuilder<T>();
}
public CommonResponseMessageBuilder<T> code(Integer code) {
this.code = code;
return this;
}
public CommonResponseMessageBuilder<T> message(T message) {
this.message = message;
return this;
}
public CommonResponseMessage<T> build() {
return new CommonResponseMessage<>(code, message);
}
}
}
... ...
package com.aries.crawler.trans.message;
import com.aries.crawler.tools.Urls;
import com.aries.crawler.trans.Messagable;
/**
* immutable message, 用户数据message
*
* @author arowana
*/
public record DouyinUserInfoMessage(Long uid, Long shortId, String nickname, String signature,
String avatarLargerUrl, String shareUrl,
String shareInfoQrCodeUrl) implements Messagable {
public static DouyinUserInfoMessage of(DouyinWideDataMessage wideDataMessage) {
return new DouyinUserInfoMessageBuilder()
.uid(wideDataMessage.authorUid())
.shortId(wideDataMessage.authorShortId())
.nickname(wideDataMessage.authorNickname())
.signature(wideDataMessage.authorSignature())
.avatarLargerUrl(wideDataMessage.avatarLargerUrl())
.shareUrl(Urls.getUserSharePage(wideDataMessage.authorUid()))
.shareInfoQrCodeUrl(wideDataMessage.authorShareInfoQrcodeUrl())
.build();
}
public static final class DouyinUserInfoMessageBuilder {
private Long uid;
private Long shortId;
private String nickname;
private String signature;
private String avatarLargerUrl;
private String shareUrl;
private String shareInfoQrCodeUrl;
private DouyinUserInfoMessageBuilder() {
}
public static DouyinUserInfoMessageBuilder aDouyinUserInfoMessage() {
return new DouyinUserInfoMessageBuilder();
}
public DouyinUserInfoMessageBuilder uid(Long uid) {
this.uid = uid;
return this;
}
public DouyinUserInfoMessageBuilder shortId(Long shortId) {
this.shortId = shortId;
return this;
}
public DouyinUserInfoMessageBuilder nickname(String nickname) {
this.nickname = nickname;
return this;
}
public DouyinUserInfoMessageBuilder signature(String signature) {
this.signature = signature;
return this;
}
public DouyinUserInfoMessageBuilder avatarLargerUrl(String avatarLargerUrl) {
this.avatarLargerUrl = avatarLargerUrl;
return this;
}
public DouyinUserInfoMessageBuilder shareUrl(String shareUrl) {
this.shareUrl = shareUrl;
return this;
}
public DouyinUserInfoMessageBuilder shareInfoQrCodeUrl(String shareInfoQrCodeUrl) {
this.shareInfoQrCodeUrl = shareInfoQrCodeUrl;
return this;
}
public DouyinUserInfoMessage build() {
return new DouyinUserInfoMessage(uid, shortId, nickname, signature, avatarLargerUrl, shareUrl, shareInfoQrCodeUrl);
}
}
}
... ...
package com.aries.crawler.trans.message;
import com.aries.crawler.model.douyincrawler.DouyinVideoModel;
import com.aries.crawler.trans.Messagable;
/**
* immutable message, 视频数据message
*
* @author arowana
*/
public record DouyinVideoInfoMessage(Long awemeId, Long authorUid, String awemeDesc, Long awemeCreateTime,
String videoCoverUrl, String videoDynamicCoverUrl, String videoDownloadAddrUrl,
String videoShareUrl, String videoVideoTag, Long videoDuration,
Integer type) implements Messagable {
public static DouyinVideoInfoMessage of(DouyinWideDataMessage wideDataMessage) {
return new DouyinVideoInfoMessageBuilder()
.awemeId(wideDataMessage.awemeId())
.awemeDesc(wideDataMessage.awemeDesc())
.awemeCreateTime(wideDataMessage.awemeCreateTime())
.authorUid(wideDataMessage.authorUid())
.videoCoverUrl(wideDataMessage.videoCoverUrl())
.videoDynamicCoverUrl(wideDataMessage.videoDynamicCoverUrl())
.videoDownloadAddrUrl(wideDataMessage.videoDownloadAddrUrl())
.videoShareUrl(wideDataMessage.videoShareUrl())
.videoVideoTag(wideDataMessage.videoVideoTag())
.videoDuration(wideDataMessage.videoDuration())
.type(wideDataMessage.type())
.build();
}
public static DouyinVideoInfoMessage of(DouyinVideoModel douyinVideoModel) {
return new DouyinVideoInfoMessageBuilder()
.awemeId(douyinVideoModel.getId())
.awemeDesc(douyinVideoModel.getComments())
.awemeCreateTime(douyinVideoModel.getCreateTime())
.authorUid(douyinVideoModel.getUid())
.videoCoverUrl(douyinVideoModel.getCoverUrl())
.videoDynamicCoverUrl(douyinVideoModel.getDynamicCoverUrl())
.videoDownloadAddrUrl(douyinVideoModel.getDownloadAddrUrl())
.videoShareUrl(douyinVideoModel.getShareUrl())
.videoVideoTag(douyinVideoModel.getTag())
.videoDuration(douyinVideoModel.getDuration())
.type(douyinVideoModel.getType())
.build();
}
public static final class DouyinVideoInfoMessageBuilder {
private Long awemeId;
private Long authorUid;
private String awemeDesc;
private Long awemeCreateTime;
private String videoCoverUrl;
private String videoDynamicCoverUrl;
private String videoDownloadAddrUrl;
private String videoShareUrl;
private String videoVideoTag;
private Long videoDuration;
private Integer type;
private DouyinVideoInfoMessageBuilder() {
}
public static DouyinVideoInfoMessageBuilder aDouyinVideoInfoMessage() {
return new DouyinVideoInfoMessageBuilder();
}
public DouyinVideoInfoMessageBuilder awemeId(Long awemeId) {
this.awemeId = awemeId;
return this;
}
public DouyinVideoInfoMessageBuilder authorUid(Long authorUid) {
this.authorUid = authorUid;
return this;
}
public DouyinVideoInfoMessageBuilder awemeDesc(String awemeDesc) {
this.awemeDesc = awemeDesc;
return this;
}
public DouyinVideoInfoMessageBuilder awemeCreateTime(Long awemeCreateTime) {
this.awemeCreateTime = awemeCreateTime;
return this;
}
public DouyinVideoInfoMessageBuilder videoCoverUrl(String videoCoverUrl) {
this.videoCoverUrl = videoCoverUrl;
return this;
}
public DouyinVideoInfoMessageBuilder videoDynamicCoverUrl(String videoDynamicCoverUrl) {
this.videoDynamicCoverUrl = videoDynamicCoverUrl;
return this;
}
public DouyinVideoInfoMessageBuilder videoDownloadAddrUrl(String videoDownloadAddrUrl) {
this.videoDownloadAddrUrl = videoDownloadAddrUrl;
return this;
}
public DouyinVideoInfoMessageBuilder videoShareUrl(String videoShareUrl) {
this.videoShareUrl = videoShareUrl;
return this;
}
public DouyinVideoInfoMessageBuilder videoVideoTag(String videoVideoTag) {
this.videoVideoTag = videoVideoTag;
return this;
}
public DouyinVideoInfoMessageBuilder videoDuration(Long videoDuration) {
this.videoDuration = videoDuration;
return this;
}
public DouyinVideoInfoMessageBuilder type(Integer type) {
this.type = type;
return this;
}
public DouyinVideoInfoMessage build() {
return new DouyinVideoInfoMessage(awemeId, authorUid, awemeDesc, awemeCreateTime, videoCoverUrl, videoDynamicCoverUrl, videoDownloadAddrUrl, videoShareUrl, videoVideoTag, videoDuration, type);
}
}
}
... ...
package com.aries.crawler.trans.message;
import com.aries.crawler.model.douyincrawler.DouyinCrawlerLogModel;
import com.aries.crawler.trans.Messagable;
import java.math.BigInteger;
/**
* immutable message, 宽表数据message
*
* @author arowana
*/
public record DouyinWideDataMessage(BigInteger id, Long awemeId, String awemeDesc, Long awemeCreateTime,
Long authorUid, Long authorShortId, String authorNickname, String authorSignature,
String avatarLargerUrl, String authorShareInfoQrcodeUrl,
String videoCoverUrl, String videoDynamicCoverUrl, String videoDownloadAddrUrl,
String videoShareUrl, String videoVideoTag, Long videoDuration,
Integer type, Integer status, String ct, String ut) implements Messagable {
public static DouyinWideDataMessage of(DouyinCrawlerLogModel douyinCrawlerLogModel) {
return new DouyinWideDataMessageBuilder()
.id(douyinCrawlerLogModel.getId())
.awemeId(douyinCrawlerLogModel.getAwemeId())
.awemeDesc(douyinCrawlerLogModel.getAwemeDesc())
.awemeCreateTime(douyinCrawlerLogModel.getAwemeCreateTime())
.authorUid(douyinCrawlerLogModel.getAuthorUid())
.authorShortId(douyinCrawlerLogModel.getAuthorShortId())
.authorNickname(douyinCrawlerLogModel.getAuthorNickname())
.authorSignature(douyinCrawlerLogModel.getAuthorSignature())
.avatarLargerUrl(douyinCrawlerLogModel.getAvatarLargerUrl())
.authorShareInfoQrcodeUrl(douyinCrawlerLogModel.getAuthorShareInfoQrcodeUrl())
.videoCoverUrl(douyinCrawlerLogModel.getVideoCoverUrl())
.videoDynamicCoverUrl(douyinCrawlerLogModel.getVideoDynamicCoverUrl())
.videoDownloadAddrUrl(douyinCrawlerLogModel.getVideoDownloadAddrUrl())
.videoShareUrl(douyinCrawlerLogModel.getVideoShareUrl())
.videoVideoTag(douyinCrawlerLogModel.getVideoVideoTag())
.videoDuration(douyinCrawlerLogModel.getVideoDuration())
.type(douyinCrawlerLogModel.getType())
.status(douyinCrawlerLogModel.getStatus())
.ct(douyinCrawlerLogModel.getCt())
.ut(douyinCrawlerLogModel.getUt())
.build();
}
public static final class DouyinWideDataMessageBuilder {
private BigInteger id;
private Long awemeId;
private String awemeDesc;
private Long awemeCreateTime;
private Long authorUid;
private Long authorShortId;
private String authorNickname;
private String authorSignature;
private String avatarLargerUrl;
private String authorShareInfoQrcodeUrl;
private String videoCoverUrl;
private String videoDynamicCoverUrl;
private String videoDownloadAddrUrl;
private String videoShareUrl;
private String videoVideoTag;
private Long videoDuration;
private Integer type;
private Integer status;
private String ct;
private String ut;
private DouyinWideDataMessageBuilder() {
}
public static DouyinWideDataMessageBuilder aDouyinWideDataMessage() {
return new DouyinWideDataMessageBuilder();
}
public DouyinWideDataMessageBuilder id(BigInteger id) {
this.id = id;
return this;
}
public DouyinWideDataMessageBuilder awemeId(Long awemeId) {
this.awemeId = awemeId;
return this;
}
public DouyinWideDataMessageBuilder awemeDesc(String awemeDesc) {
this.awemeDesc = awemeDesc;
return this;
}
public DouyinWideDataMessageBuilder awemeCreateTime(Long awemeCreateTime) {
this.awemeCreateTime = awemeCreateTime;
return this;
}
public DouyinWideDataMessageBuilder authorUid(Long authorUid) {
this.authorUid = authorUid;
return this;
}
public DouyinWideDataMessageBuilder authorShortId(Long authorShortId) {
this.authorShortId = authorShortId;
return this;
}
public DouyinWideDataMessageBuilder authorNickname(String authorNickname) {
this.authorNickname = authorNickname;
return this;
}
public DouyinWideDataMessageBuilder authorSignature(String authorSignature) {
this.authorSignature = authorSignature;
return this;
}
public DouyinWideDataMessageBuilder avatarLargerUrl(String avatarLargerUrl) {
this.avatarLargerUrl = avatarLargerUrl;
return this;
}
public DouyinWideDataMessageBuilder authorShareInfoQrcodeUrl(String authorShareInfoQrcodeUrl) {
this.authorShareInfoQrcodeUrl = authorShareInfoQrcodeUrl;
return this;
}
public DouyinWideDataMessageBuilder videoCoverUrl(String videoCoverUrl) {
this.videoCoverUrl = videoCoverUrl;
return this;
}
public DouyinWideDataMessageBuilder videoDynamicCoverUrl(String videoDynamicCoverUrl) {
this.videoDynamicCoverUrl = videoDynamicCoverUrl;
return this;
}
public DouyinWideDataMessageBuilder videoDownloadAddrUrl(String videoDownloadAddrUrl) {
this.videoDownloadAddrUrl = videoDownloadAddrUrl;
return this;
}
public DouyinWideDataMessageBuilder videoShareUrl(String videoShareUrl) {
this.videoShareUrl = videoShareUrl;
return this;
}
public DouyinWideDataMessageBuilder videoVideoTag(String videoVideoTag) {
this.videoVideoTag = videoVideoTag;
return this;
}
public DouyinWideDataMessageBuilder videoDuration(Long videoDuration) {
this.videoDuration = videoDuration;
return this;
}
public DouyinWideDataMessageBuilder type(Integer type) {
this.type = type;
return this;
}
public DouyinWideDataMessageBuilder status(Integer status) {
this.status = status;
return this;
}
public DouyinWideDataMessageBuilder ct(String ct) {
this.ct = ct;
return this;
}
public DouyinWideDataMessageBuilder ut(String ut) {
this.ut = ut;
return this;
}
public DouyinWideDataMessage build() {
return new DouyinWideDataMessage(id, awemeId, awemeDesc, awemeCreateTime, authorUid, authorShortId, authorNickname, authorSignature, avatarLargerUrl, authorShareInfoQrcodeUrl, videoCoverUrl, videoDynamicCoverUrl, videoDownloadAddrUrl, videoShareUrl, videoVideoTag, videoDuration, type, status, ct, ut);
}
}
}
... ...
package com.aries.crawler.trans.message;
import com.aries.crawler.trans.Messagable;
import java.math.BigInteger;
/**
* immutable message, 用户数据message
*
* @author arowana
*/
public record SimpleInt64Message(BigInteger id) implements Messagable {
}
... ...
package com.aries.crawler.verticles;
import com.aries.crawler.sqlbuilder.UpdateBuilder;
import com.aries.crawler.tools.MySqlExecuteHelper;
import com.aries.crawler.trans.message.SimpleInt64Message;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import static com.aries.crawler.trans.EventBusTopic.*;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_FAILED_MESSAGE;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_SUCCESS_MESSAGE;
/**
* @author arowana
*/
public class UpdateDataVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(UpdateDataVerticle.class);
public static String getDateTimeAsString(LocalDateTime localDateTime, String format) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
return localDateTime.format(formatter);
}
@Override
public void start() {
// 更新款表的status状态为'已处理用户数据'状态
vertx.eventBus().consumer(MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_USER.getTopic(), this::mysqlDouyinWideDataUpdateStatusUser).setMaxBufferedMessages(4000);
// 更新款表的status状态为'已处理视频数据'状态
vertx.eventBus().consumer(MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_VIDEO.getTopic(), this::mysqlDouyinWideDataUpdateStatusVideo).setMaxBufferedMessages(4000);
// 更新完成下载的视频
vertx.eventBus().consumer(MYSQL_DOUYIN_VIDEO__UPDATE_STATUS_DOWNLOADED.getTopic(), this::mysqlDouyinVideoDataUpdateStatusDownloaded).setMaxBufferedMessages(4000);
// 更新下载失败的视频
vertx.eventBus().consumer(MYSQL_DOUYIN_VIDEO__UPDATE_STATUS_FAILED.getTopic(), this::mysqlDouyinVideoDataUpdateStatusFailed).setMaxBufferedMessages(4000);
}
private void mysqlDouyinWideDataUpdateStatusUser(Message<Object> message) {
var idMessage = (SimpleInt64Message) message.body();
// 构建sql数据, 插入用户信息。
var insertBuilder = new UpdateBuilder("douyin_crawler_log")
.set("status = status | 1")
.where("id=" + idMessage.id())
.toString();
MySqlExecuteHelper.execute(vertx, insertBuilder, mysqlExecutorRes -> {
if (mysqlExecutorRes.succeeded()) {
message.reply(COMMON_SUCCESS_MESSAGE);
} else {
message.reply(COMMON_FAILED_MESSAGE);
}
});
}
private void mysqlDouyinWideDataUpdateStatusVideo(Message<Object> message) {
var idMessage = (SimpleInt64Message) message.body();
// 构建sql数据, 插入用户信息。
var insertBuilder = new UpdateBuilder("douyin_crawler_log")
.set("status = status | 2")
.where("id=" + idMessage.id())
.toString();
MySqlExecuteHelper.execute(vertx, insertBuilder, mysqlExecutorRes -> {
if (mysqlExecutorRes.succeeded()) {
message.reply(COMMON_SUCCESS_MESSAGE);
} else {
message.reply(COMMON_FAILED_MESSAGE);
}
});
}
private void mysqlDouyinVideoDataUpdateStatusDownloaded(Message<Object> message) {
var idMessage = (SimpleInt64Message) message.body();
// 构建sql数据, 插入用户信息。
var insertBuilder = new UpdateBuilder("douyin_video_info")
.set("status = status | 1")
.where("id=" + idMessage.id())
.toString();
MySqlExecuteHelper.execute(vertx, insertBuilder, mysqlExecutorRes -> {
if (mysqlExecutorRes.succeeded()) {
message.reply(COMMON_SUCCESS_MESSAGE);
} else {
message.reply(COMMON_FAILED_MESSAGE);
}
});
}
private void mysqlDouyinVideoDataUpdateStatusFailed(Message<Object> message) {
var idMessage = (SimpleInt64Message) message.body();
// 构建sql数据, 插入用户信息。
var insertBuilder = new UpdateBuilder("douyin_video_info")
.set("status = status | 2")
.where("id=" + idMessage.id())
.toString();
MySqlExecuteHelper.execute(vertx, insertBuilder, mysqlExecutorRes -> {
if (mysqlExecutorRes.succeeded()) {
message.reply(COMMON_SUCCESS_MESSAGE);
} else {
message.reply(COMMON_FAILED_MESSAGE);
}
});
}
}
\ No newline at end of file
... ...
package com.aries.crawler.verticles;
import com.aries.crawler.sqlbuilder.InsertBuilder;
import com.aries.crawler.tools.MySqlExecuteHelper;
import com.aries.crawler.trans.message.DouyinUserInfoMessage;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import static com.aries.crawler.trans.EventBusTopic.MYSQL_DOUYIN_USER_INSERT;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_FAILED_MESSAGE;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_SUCCESS_MESSAGE;
/**
* @author arowana
*/
public class UserInsertVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(UserInsertVerticle.class);
@Override
public void start() {
// 用于插入用户数据
vertx.eventBus().consumer(MYSQL_DOUYIN_USER_INSERT.getTopic(), this::mysqlDouyinUserInsertHandler).setMaxBufferedMessages(4000);
}
public static String getDateTimeAsString(LocalDateTime localDateTime, String format) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
return localDateTime.format(formatter);
}
private void mysqlDouyinUserInsertHandler(Message<Object> message) {
var userInfoMessage = (DouyinUserInfoMessage) message.body();
// 构建sql数据, 插入用户信息。
var insertBuilder = new InsertBuilder("douyin_user_info")
.set("uid", userInfoMessage.uid())
.set("short_id", userInfoMessage.shortId())
.set("nickname", userInfoMessage.nickname())
.set("signature", userInfoMessage.signature())
.set("avatar_larger_url", userInfoMessage.avatarLargerUrl())
.set("share_url", userInfoMessage.shareUrl())
.set("share_info_qrcode_url", userInfoMessage.shareInfoQrCodeUrl())
.onDuplicateKeyUpdate("ut", getDateTimeAsString(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
logger.info("user sql:" + insertBuilder.getSql() + "---values:" + insertBuilder.getValues());
MySqlExecuteHelper.prepareExecute(vertx, insertBuilder.getSql(), insertBuilder.getValues(), mysqlExecutorRes -> {
if (mysqlExecutorRes.succeeded()) {
logger.info("insert user succ, uid:" + userInfoMessage.uid());
message.reply(COMMON_SUCCESS_MESSAGE);
} else {
logger.info("insert user. failed" + mysqlExecutorRes.cause());
message.reply(COMMON_FAILED_MESSAGE);
}
});
}
}
... ...
package com.aries.crawler.verticles;
import com.aries.crawler.model.douyincrawler.DouyinVideoModel;
import com.aries.crawler.sqlbuilder.SelectBuilder;
import com.aries.crawler.tools.MySqlExecuteHelper;
import com.aries.crawler.tools.Orm;
import com.aries.crawler.trans.message.DouyinVideoInfoMessage;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import static com.aries.crawler.trans.EventBusTopic.LOGIC_DOUYIN_VIDEO_DOWNLOAD;
/**
* @author arowana
*/
public class VideoDataPickUpVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WideDataPickUpVerticle.class);
private Consumer<Long> consumer = (offset) -> {
var sql = new SelectBuilder()
.column("*")
.from(DouyinVideoModel.TABLE)
.where(" status = " + DouyinVideoModel.STATUS_VIDEO_DOWNLOAD_DEFAULT)
.limit(1L)
.offset(offset)
.orderBy("ct", false)
.toString();
logger.info("构建pick up sql: " + sql);
MySqlExecuteHelper.prepareExecute(vertx, sql, new ArrayList<>(), mysqlExecutorRes -> {
if (mysqlExecutorRes.succeeded()) {
List<JsonObject> rows = mysqlExecutorRes.result().getRows();
for (JsonObject row : rows) {
var model = Orm.getModel(row, DouyinVideoModel.class);
DouyinVideoInfoMessage douyinVideoInfoMessage = DouyinVideoInfoMessage.of(model);
vertx.eventBus().request(LOGIC_DOUYIN_VIDEO_DOWNLOAD.getTopic(), douyinVideoInfoMessage, updateReply -> {
if (updateReply.succeeded()) {
logger.info("download video succ ...");
} else {
logger.info("download video fail ...");
}
});
}
} else {
logger.error("execute download video failed: " + mysqlExecutorRes.cause());
}
});
};
@Override
public void start() {
vertx.setPeriodic(5000, id -> {
consumer.accept(0L);
});
// vertx.setPeriodic(2000, id -> vertx.executeBlocking(future -> {
// consumer.accept(0L);
// consumer.accept(5L);
// consumer.accept(10L);
// }, res -> {
// nothing
// }));
}
}
... ...
package com.aries.crawler.verticles;
import com.aries.crawler.tools.Urls;
import com.aries.crawler.trans.message.DouyinVideoInfoMessage;
import com.aries.crawler.trans.message.SimpleInt64Message;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.math.BigInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.aries.crawler.trans.EventBusTopic.*;
public class VideoDownloadVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(VideoDownloadVerticle.class);
private static final String MY_UA = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.148 Safari/537.36";
@Override
public void start() {
// 下载视频
vertx.eventBus().consumer(LOGIC_DOUYIN_VIDEO_DOWNLOAD.getTopic(), this::mysqlDouyinVideoInsertHandler).setMaxBufferedMessages(4000);
}
private void mysqlDouyinVideoInsertHandler(Message<Object> message) {
var videoInfoMessage = (DouyinVideoInfoMessage) message.body();
HttpClient client = vertx.createHttpClient();
var videoPageUrlInfo = Urls.getInfo(videoInfoMessage.videoShareUrl());
System.out.println(videoInfoMessage.videoShareUrl());
client.get(new RequestOptions().setHost(videoPageUrlInfo.host()).setURI(videoPageUrlInfo.path()).addHeader("user-agent", MY_UA), response -> {
response.bodyHandler(body -> {
var bodyStr = new String(body.getBytes());
var addrUrl = getPlayAddrUrl(bodyStr);
System.out.println(addrUrl);
downloadMp4(addrUrl, "/Users/arowana/Movies/douyin/", videoInfoMessage.awemeId());
});
}).setFollowRedirects(true).end();
}
private void downloadMp4(String addrUrl, String filePath, Long awemeId) {
vertx.executeBlocking(future -> {
if (addrUrl == null || addrUrl.equals("")) {
future.complete(200);
} else {
HttpClient client = vertx.createHttpClient();
var videoDownloadUrlInfo = Urls.getInfo(addrUrl);
client.get(new RequestOptions().setHost(videoDownloadUrlInfo.host()).setURI(videoDownloadUrlInfo.path()).addHeader("user-agent", MY_UA), httpResponse -> {
httpResponse.bodyHandler(httpBody -> {
final String mp4PathName = filePath + awemeId + ".mp4";
vertx.fileSystem().exists(mp4PathName, fileRes -> {
if (!fileRes.result()) {
vertx.fileSystem().writeFile(mp4PathName, Buffer.buffer(httpBody.getBytes()), fileStoreRes -> {
if (fileStoreRes.succeeded()) {
logger.info("file written. pathname:" + mp4PathName);
} else {
logger.info("file written failed. pathname: " + mp4PathName + ", cause: " + fileStoreRes.cause());
}
});
}
});
});
}).setFollowRedirects(true).end();
future.complete(100);
}
}, res -> {
if (res.succeeded() && res.result() instanceof Integer s) {
switch (s) {
case 100 -> vertx.eventBus().request(MYSQL_DOUYIN_VIDEO__UPDATE_STATUS_DOWNLOADED.getTopic(), new SimpleInt64Message(BigInteger.valueOf(awemeId)), updateReply -> {
if (updateReply.succeeded()) {
logger.info("update status video downloaded succ ...");
} else {
logger.info("update status video downloaded fail ...");
}
});
case 200 -> vertx.eventBus().request(MYSQL_DOUYIN_VIDEO__UPDATE_STATUS_FAILED.getTopic(), new SimpleInt64Message(BigInteger.valueOf(awemeId)), updateReply -> {
if (updateReply.succeeded()) {
logger.info("update status video failed-status succ ...");
} else {
logger.info("update status video failed-status fail ...");
}
});
}
} else {
logger.error(res);
}
});
}
public String getPlayAddrUrl(String body) {
int indexOfPlayAddr = body.indexOf("playAddr: ");
int indexOfCover = body.indexOf("cover: ");
String subBody = body.substring(indexOfPlayAddr, indexOfCover);
Pattern r = Pattern.compile("playAddr: \"(.*)\"");
Matcher m = r.matcher(subBody);
if (m.find()) {
return m.group(1);
} else {
return "";
}
}
}
... ...
package com.aries.crawler.verticles;
import com.aries.crawler.sqlbuilder.InsertBuilder;
import com.aries.crawler.tools.MySqlExecuteHelper;
import com.aries.crawler.trans.message.DouyinVideoInfoMessage;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import static com.aries.crawler.trans.EventBusTopic.MYSQL_DOUYIN_VIDEO_INSERT;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_FAILED_MESSAGE;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_SUCCESS_MESSAGE;
/**
* @author arowana
*/
public class VideoInsertVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(VideoInsertVerticle.class);
@Override
public void start() {
// 用于插入视频数据
vertx.eventBus().consumer(MYSQL_DOUYIN_VIDEO_INSERT.getTopic(), this::mysqlDouyinVideoInsertHandler).setMaxBufferedMessages(4000);
}
public static String getDateTimeAsString(LocalDateTime localDateTime, String format) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
return localDateTime.format(formatter);
}
private void mysqlDouyinVideoInsertHandler(Message<Object> message) {
var videoInfoMessage = (DouyinVideoInfoMessage) message.body();
// 构建sql数据, 插入视频信息。
var insertBuilder = new InsertBuilder("douyin_video_info")
.set("id", videoInfoMessage.awemeId())
.set("comments", videoInfoMessage.awemeDesc())
.set("create_time", videoInfoMessage.awemeCreateTime())
.set("uid", videoInfoMessage.authorUid())
.set("cover_url", videoInfoMessage.videoCoverUrl())
.set("dynamic_cover_url", videoInfoMessage.videoDynamicCoverUrl())
.set("download_addr_url", videoInfoMessage.videoDownloadAddrUrl())
.set("share_url", videoInfoMessage.videoShareUrl())
.set("tag", videoInfoMessage.videoVideoTag())
.set("duration", videoInfoMessage.videoDuration())
.set("type", videoInfoMessage.type())
.onDuplicateKeyUpdate("ut", getDateTimeAsString(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
logger.info("video sql:" + insertBuilder.getSql() + "---values:" + insertBuilder.getValues());
MySqlExecuteHelper.prepareExecute(vertx, insertBuilder.getSql(), insertBuilder.getValues(), mysqlExecutorRes -> {
if (mysqlExecutorRes.succeeded()) {
logger.info("insert video succ, awemeid:" + videoInfoMessage.awemeId());
message.reply(COMMON_SUCCESS_MESSAGE);
} else {
logger.info("insert video. failed:" + mysqlExecutorRes.cause());
message.reply(COMMON_FAILED_MESSAGE);
}
});
}
}
... ...
package com.aries.crawler.verticles;
import com.aries.crawler.model.douyincrawler.DouyinCrawlerLogModel;
import com.aries.crawler.trans.message.DouyinUserInfoMessage;
import com.aries.crawler.trans.message.DouyinVideoInfoMessage;
import com.aries.crawler.trans.message.DouyinWideDataMessage;
import com.aries.crawler.trans.message.SimpleInt64Message;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.math.BigInteger;
import java.util.concurrent.TimeUnit;
import static com.aries.crawler.trans.EventBusTopic.*;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_SUCCESS_MESSAGE;
/**
* 这个verticle的职责是:
* 将受到的数据, 根据状态做不同的处理。
* <p> status = 0 表示这个数据没被处理过, 要把宽表中的数据拆成两部分, 分别交给用户数据插入器和视频数据插入器来处理。
* <p> status = 1 表示这个款数据中的用户数据部分已经处理过, 但是视频数据的部分还没处理。需要发给视频数据插入器来处理。
* <p> status = 2 表示这个款数据中的视频数据部分已经处理过, 但是用户数据的部分还没处理。需要发给用户数据插入器来处理。
* <p> status = 3 表示这个数据已经处理过, 没有剩余价值了, 不必处理, 忽略就可以了。
*
* @author arowana
*/
public class WideDataDispatchVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WideDataDispatchVerticle.class);
@Override
public void start() {
vertx.eventBus().consumer(LOGIC_DOUYIN_WIDEDATA_DISPATCH.getTopic(), this::dispatch).setMaxBufferedMessages(4000);
}
private void dispatch(Message<Object> message) {
var wideDataMessage = (DouyinWideDataMessage) message.body();
// 如果 用户部分的数据 未处于已处理状态
if ((wideDataMessage.status() & DouyinCrawlerLogModel.STATUS_USER_DONE) == 0) {
logger.info("user data need to be parsed, uid:" + wideDataMessage.authorUid());
processUser(DouyinUserInfoMessage.of(wideDataMessage), wideDataMessage.id());
}
// 如果 视频部分的数据 未处于已处理状态
if ((wideDataMessage.status() & DouyinCrawlerLogModel.STATUS_VIDEO_DONE) == 0) {
logger.info("video data need to be parsed, awemeid:" + wideDataMessage.awemeId());
processVideo(DouyinVideoInfoMessage.of(wideDataMessage), wideDataMessage.id());
}
message.reply(COMMON_SUCCESS_MESSAGE);
}
private void processVideo(DouyinVideoInfoMessage douyinVideoInfoMessage, BigInteger wideDataId) {
logger.info("prepare to insert video, awemeid: " + douyinVideoInfoMessage.awemeId());
vertx.eventBus().request(MYSQL_DOUYIN_VIDEO_INSERT.getTopic(), douyinVideoInfoMessage, new DeliveryOptions().setSendTimeout(TimeUnit.SECONDS.toMillis(20)), insertReply -> {
vertx.executeBlocking(future -> {
if (insertReply.succeeded()) {
logger.info("insert video reply succeeded, awemeid: " + douyinVideoInfoMessage.awemeId());
future.complete(100);
} else {
logger.error("insert video reply failed, awemeid: " + douyinVideoInfoMessage.awemeId() + ",cause:" + insertReply.cause());
}
}, res -> {
if (res.result() instanceof Integer s) {
if (s.equals(100)) {
vertx.eventBus().request(MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_VIDEO.getTopic(), new SimpleInt64Message(wideDataId), updateReply -> {
if (updateReply.succeeded()) {
logger.info("update status video succ ...");
} else {
logger.info("update status video fail ...");
}
});
}
}
});
});
}
private void processUser(DouyinUserInfoMessage douyinUserInfoMessage, BigInteger wideDataId) {
logger.info("prepare to insert user, uid:" + douyinUserInfoMessage.uid());
vertx.eventBus().request(MYSQL_DOUYIN_USER_INSERT.getTopic(), douyinUserInfoMessage, new DeliveryOptions().setSendTimeout(TimeUnit.SECONDS.toMillis(20)), insertReply -> {
vertx.executeBlocking(future -> {
if (insertReply.succeeded()) {
logger.info("insert user reply succeeded, uid: " + douyinUserInfoMessage.uid());
future.complete(100);
} else {
logger.error("insert user reply failed, uid: " + douyinUserInfoMessage.uid() + ",cause:" + insertReply.cause());
}
}, res -> {
if (res.result() instanceof Integer s) {
if (s.equals(100)) {
vertx.eventBus().request(MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_USER.getTopic(), new SimpleInt64Message(wideDataId), updateReply -> {
if (updateReply.succeeded()) {
logger.info("update status user succ ...");
} else {
logger.info("update status user fail ...");
}
});
}
}
});
});
}
}
... ...
package com.aries.crawler.verticles;
import com.aries.crawler.model.douyincrawler.DouyinCrawlerLogModel;
import com.aries.crawler.sqlbuilder.SelectBuilder;
import com.aries.crawler.tools.MySqlExecuteHelper;
import com.aries.crawler.tools.Orm;
import com.aries.crawler.trans.message.DouyinWideDataMessage;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import static com.aries.crawler.trans.EventBusTopic.LOGIC_DOUYIN_WIDEDATA_DISPATCH;
/**
* 这个verticle的职责是:
* 从宽表douyin_crawler_log中读取数据, 然后将数据派发给WideDataDispatchVerticle来做分派处理
*
* @author arowana
*/
public class WideDataPickUpVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WideDataPickUpVerticle.class);
private final Supplier<Void> pickUpWideDataSupplier = () -> {
var sql = new SelectBuilder()
.column("*")
.from(DouyinCrawlerLogModel.TABLE)
.where(" status != " + DouyinCrawlerLogModel.STATUS_ALL_DONE)
.limit(1000L)
.orderBy("ct", false)
.toString();
MySqlExecuteHelper.prepareExecute(vertx, sql, new ArrayList<>(), mysqlExecutorRes -> {
logger.info("prepare to pick up wide data. sql: " + sql);
if (mysqlExecutorRes.succeeded()) {
vertx.executeBlocking(future -> {
List<JsonObject> rows = mysqlExecutorRes.result().getRows();
for (JsonObject row : rows) {
var model = Orm.getModel(row, DouyinCrawlerLogModel.class);
processWideData(model);
}
}, res -> {
// ignore
});
} else {
logger.error("pick wide data failed, sql:" + sql + ", cause: " + mysqlExecutorRes.cause());
}
});
return null;
};
@Override
public void start() {
vertx.setPeriodic(10000, id -> {
pickUpWideDataSupplier.get();
});
}
public void processWideData(DouyinCrawlerLogModel model) {
if (model == null) {
logger.info("model is null, do nothing");
return;
}
var douyinWideDataMessage = DouyinWideDataMessage.of(model);
vertx.eventBus().request(LOGIC_DOUYIN_WIDEDATA_DISPATCH.getTopic(), douyinWideDataMessage, reply -> {
if (reply.succeeded()) {
logger.info("reply success from topic: " + LOGIC_DOUYIN_WIDEDATA_DISPATCH.getTopic() +
", wide data id: " + douyinWideDataMessage.id() +
", authorUid:" + douyinWideDataMessage.authorUid() +
", awemeid: " + douyinWideDataMessage.awemeId());
} else {
logger.info("reply success from topic: " + LOGIC_DOUYIN_WIDEDATA_DISPATCH.getTopic() +
", wide data id: " + douyinWideDataMessage.id() +
", authorUid:" + douyinWideDataMessage.authorUid() +
", awemeid: " + douyinWideDataMessage.awemeId() +
". cause:" + reply.cause());
}
});
}
}
... ...
package com.aries.crawler;
import com.aries.crawler.sqlbuilder.InsertBuilder;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.RequestOptions;
import org.junit.Test;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import static com.aries.crawler.verticles.UpdateDataVerticle.getDateTimeAsString;
public class AppTest {
@Test
public void shouldAnswerWithTrue() {
Vertx vertx = Vertx.vertx();
HttpClient client = vertx.createHttpClient();
// client.getNow("aweme.snssdk.com", "/aweme/v1/playwm/?s_vid=93f1b41336a8b7a442dbf1c29c6bbc560cdca46fc197329a17cb02eef09b72493338e49045f75c3a6cd886d97de228c6e6a1f93d3b9a63a26a63e40654c6655e&line=0", response -> {
// System.out.println("Received response with status code " + response.statusCode());
// response.bodyHandler(x -> {
// vertx.fileSystem().writeFile("target/classes/a.mp4", Buffer.buffer(x.getBytes()), result -> {
// if (result.succeeded()) {
// System.out.println("File written");
// } else {
// System.err.println("Oh oh ..." + result.cause());
// }
// });
// });
// });
client.get(new RequestOptions().setHost("aweme.snssdk.com").setURI("/aweme/v1/playwm/?s_vid=93f1b41336a8b7a442dbf1c29c6bbc560cdca46fc197329a17cb02eef09b72493338e49045f75c3a6cd886d97de228c6e6a1f93d3b9a63a26a63e40654c6655e&line=0").addHeader("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36"), response -> {
for (Map.Entry<String, String> header : response.headers()) {
System.out.println(header);
}
System.out.println("Received response with status code " + response.statusCode());
response.bodyHandler(x -> {
vertx.fileSystem().writeFile("target/classes/a.mp4", Buffer.buffer(x.getBytes()), result -> {
if (result.succeeded()) {
System.out.println("File written");
} else {
System.err.println("Oh oh ..." + result.cause());
}
});
});
})
.setFollowRedirects(true)
.end();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void TestInsertSql() {
InsertBuilder ib = new InsertBuilder("douyin_user_info")
.set("uid", 123)
.set("short_id", 234234)
.set("nickname", "jinlong")
.onDuplicateKeyUpdate("ut", getDateTimeAsString(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
String sql = ib.getSql();
List<Object> values = ib.getValues();
System.out.println(sql);
System.out.println(values);
}
}
... ...
package com.aries.crawler;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.RequestOptions;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class HttpParserTestVerticle extends AbstractVerticle {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new HttpParserTestVerticle());
}
@Override
public void start() throws Exception {
HttpClient client = vertx.createHttpClient();
client.get(new RequestOptions()
.setHost("www.iesdouyin.com")
.setURI("/share/video/6772821096413580548/?region=CN&mid=6772787703437069070&u_code=19h7agc1k&titleType=title")
.addHeader("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36")
, response -> {
response.bodyHandler(body -> {
var bodyStr = new String(body.getBytes());
var addrUrl = getPlayAddrUrl(bodyStr);
System.out.println(addrUrl);
});
}).setFollowRedirects(true).end();
}
public String getPlayAddrUrl(String body) {
int indexOfPlayAddr = body.indexOf("playAddr: ");
int indexOfCover = body.indexOf("cover: ");
String subBody = body.substring(indexOfPlayAddr, indexOfCover);
Pattern r = Pattern.compile("playAddr: \"(.*)\"");
Matcher m = r.matcher(subBody);
if (m.find()) {
return m.group(1);
} else {
return "";
}
}
}
... ...
package com.aries.crawler;
import com.aries.crawler.tools.MySqlClientHelper;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.ResultSet;
import java.util.List;
public class JdbcTestVerticle extends AbstractVerticle {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new JdbcTestVerticle());
}
@Override
public void start() throws Exception {
// 获取到数据库连接的客户端
JDBCClient jdbcClient = MySqlClientHelper.getJDBcClient(vertx);
String sql = "SELECT * from douyin_crawler_log WHERE id=?";
// 构造参数
JsonArray params = new JsonArray().add(139719);
// 执行查询
jdbcClient.queryWithParams(sql, params, qryRes -> {
if (qryRes.succeeded()) {
// 获取到查询的结果,Vert.x对ResultSet进行了封装
ResultSet resultSet = qryRes.result();
// 把ResultSet转为List<JsonObject>形式
List<JsonObject> rows = resultSet.getRows();
// 输出结果
System.out.println(rows);
} else {
System.out.println("查询数据库出错!");
}
});
}
}
... ...
package com.aries.crawler.sqlbuilder;
import com.aries.crawler.model.douyincrawler.DouyinCrawlerLogModel;
import org.junit.Assert;
import org.junit.Test;
public class TestSqlBuilder {
@Test
public void sqlSelectTest() {
String sql = new SelectBuilder()
.column("*")
.from("douyin_crawler_log")
.where("status != " + DouyinCrawlerLogModel.STATUS_ALL_DONE)
.limit(10L)
.toString();
Assert.assertEquals(sql, "select * from douyin_crawler_log where status != 3 limit 10 offset 0");
}
}
... ...
import json
# 这个地方必须这么写 函数名:response
import sys
sys.path.append('/usr/local/lib/python3.8/site-packages')
sys.path.append('/usr/local/lib/python3.7/site-packages')
import pymysql as pymysql
db = pymysql.connect("localhost", "root", "1qaz2wsx", "douyin_crawler")
def response(flow):
# 通过抓包软包软件获取请求的接口
if '/aweme/favorite' in flow.request.url or '/aweme/post' in flow.request.url:
# print("-------"+flow.response.text)
for aweme in json.loads(flow.response.text)['aweme_list']:
aweme_map = {}
aweme_map['aweme_id'] = aweme['aweme_id']
aweme_map['aweme_desc'] = aweme['desc']
aweme_map['aweme_create_time'] = aweme['create_time']
aweme_map['author_uid'] = aweme['author']['uid']
aweme_map['author_short_id'] = aweme['author']['short_id']
aweme_map['author_nickname'] = aweme['author']['nickname']
aweme_map['author_signature'] = aweme['author']['signature']
aweme_map['avatar_larger_url'] = aweme['author']['avatar_larger']['url_list'][0]
aweme_map['author_share_info_qrcode'] = aweme['author']['share_info']['share_qrcode_url']['url_list'][0]
aweme_map['video_cover'] = aweme['video']['cover']['url_list'][0]
aweme_map['video_dynamic_cover'] = aweme['video']['dynamic_cover']['url_list'][0]
aweme_map['video_download_addr'] = aweme['video']['download_addr']['url_list'][0]
aweme_map['video_share_url'] = aweme['share_info']['share_url']
if len(aweme['text_extra']) > 0:
aweme_map['video_tag'] = aweme['text_extra']
aweme_map['video_duration'] = aweme['duration']
sql = """INSERT INTO douyin_crawler_log(aweme_id,aweme_desc,aweme_create_time,author_uid,author_short_id,author_nickname,author_signature,avatar_larger_url,
author_share_info_qrcode_url,video_cover_url,video_dynamic_cover_url,video_download_addr_url ,video_share_url ,
video_tag ,video_duration)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
on duplicate key update video_download_addr_url = %s"""
cursor = db.cursor()
if 'video_tag' in aweme_map:
t_tag = json.dumps(aweme_map['video_tag'], ensure_ascii=False)
else:
t_tag = "{}"
values = (aweme_map['aweme_id'], aweme_map['aweme_desc'], aweme_map['aweme_create_time'],
aweme_map['author_uid'], aweme_map['author_short_id'],
aweme_map['author_nickname'], aweme_map['author_signature'],
aweme_map['avatar_larger_url'], aweme_map['author_share_info_qrcode'],
aweme_map['video_cover'], aweme_map['video_dynamic_cover'],
aweme_map['video_download_addr'], aweme_map['video_share_url'],
t_tag,
aweme_map['video_duration'], aweme_map['video_download_addr'])
try:
# 执行sql语句
cursor.execute(sql, values)
# 提交到数据库执行
db.commit()
print("here is succ")
except:
# 如果发生错误则回滚
db.rollback()
print("here is err:", sys.exc_info())
# 关闭数据库连接
# print("crawler res: ", aweme_map)
# db.close()
... ...
import pymysql
import sys
def main():
print("start-----")
db = pymysql.connect("localhost", "root", "1qaz2wsx", "douyin_crawler")
sql = "INSERT INTO douyin_crawler_log(aweme_id) values(123)"
cursor = db.cursor()
try:
cursor.execute(sql)
db.commit()
print("commit-----")
except:
# 如果发生错误则回滚
db.rollback()
print("rollback-----")
print(sys.exc_info())
# 关闭数据库连接
db.close()
if __name__ == '__main__':
main()
... ...
.idea/
.idea/*
*.iml
target/
*.class
.project
.settings/
.settings/*
... ...
# netty-proxy-server
基于Netty实现的代理服务器,Web Proxy Server(普通Web代理和SSL隧道代理),Socks5 Proxy Server和混合模式(同时支持以上两种,自动选择)
虽然是个玩具,但麻雀虽小五脏俱全,基本的都有。不管是用来做 netty 的学习,还是代理协议的学习都是不错的参考资料
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>netty-proxy-server</artifactId>
<groupId>cc.leevi.common</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>http-proxy</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
</dependencies>
</project>
... ...
package cc.leevi.common.httpproxy;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Promise;
public final class DirectClientHandler extends ChannelInboundHandlerAdapter {
private final Promise<Channel> promise;
public DirectClientHandler(Promise<Channel> promise) {
this.promise = promise;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.pipeline().remove(this);
promise.setSuccess(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
promise.setFailure(throwable);
}
}
\ No newline at end of file
... ...
package cc.leevi.common.httpproxy;
import io.netty.buffer.ByteBuf;
public class HttpProxyRequestHead {
private String host;
private int port;
private String proxyType;//tunnel or web
private String protocolVersion;
private ByteBuf byteBuf;
public HttpProxyRequestHead(String host, int port, String proxyType, String protocolVersion, ByteBuf byteBuf) {
this.host = host;
this.port = port;
this.proxyType = proxyType;
this.protocolVersion = protocolVersion;
this.byteBuf = byteBuf;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getProxyType() {
return proxyType;
}
public void setProxyType(String proxyType) {
this.proxyType = proxyType;
}
public ByteBuf getByteBuf() {
return byteBuf;
}
public void setByteBuf(ByteBuf byteBuf) {
this.byteBuf = byteBuf;
}
public String getProtocolVersion() {
return protocolVersion;
}
public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}
}
... ...
package cc.leevi.common.httpproxy;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LoggingHandler;
public class HttpProxyServerInitializer extends ChannelInitializer {
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new LoggingHandler());
p.addLast(new HttpServerHeadDecoder());
}
}
... ...
package cc.leevi.common.httpproxy;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpServer {
private Logger logger = LoggerFactory.getLogger(HttpServer.class);
private ServerBootstrap serverBootstrap;
private EventLoopGroup serverEventLoopGroup;
private Channel acceptorChannel;
public void startServer(){
logger.info("Proxy Server starting...");
serverEventLoopGroup = new NioEventLoopGroup(4);
serverBootstrap = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.childHandler(new HttpProxyServerInitializer())
.group(serverEventLoopGroup);
acceptorChannel = serverBootstrap.bind(17891).syncUninterruptibly().channel();
}
public void shutdown(){
logger.info("Proxy Server shutting down...");
acceptorChannel.close().syncUninterruptibly();
serverEventLoopGroup.shutdownGracefully().syncUninterruptibly();
logger.info("shutdown completed!");
}
}
... ...
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cc.leevi.common.httpproxy;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ChannelHandler.Sharable
public final class HttpServerConnectHandler extends SimpleChannelInboundHandler<HttpProxyRequestHead> {
private final Bootstrap b = new Bootstrap();
@Override
public void channelRead0(final ChannelHandlerContext ctx, final HttpProxyRequestHead requestHead) throws Exception {
Promise<Channel> promise = ctx.executor().newPromise();
final Channel inboundChannel = ctx.channel();
promise.addListener(
new FutureListener<Channel>() {
@Override
public void operationComplete(final Future<Channel> future) throws Exception {
final Channel outboundChannel = future.getNow();
if (future.isSuccess()) {
ChannelFuture responseFuture;
if("TUNNEL".equals(requestHead.getProxyType())){
responseFuture = inboundChannel.writeAndFlush(Unpooled.wrappedBuffer((requestHead.getProtocolVersion() + " 200 Connection Established\r\n\r\n").getBytes()));
}else if("WEB".equals(requestHead.getProxyType())){
responseFuture = outboundChannel.writeAndFlush(requestHead.getByteBuf());
}else{
HttpServerUtils.closeOnFlush(inboundChannel);
return;
}
responseFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) {
ctx.pipeline().remove(HttpServerConnectHandler.this);
outboundChannel.pipeline().addLast(new RelayHandler(inboundChannel));
ctx.pipeline().addLast(new RelayHandler(outboundChannel));
}
});
} else {
HttpServerUtils.closeOnFlush(inboundChannel);
}
}
});
b.group(inboundChannel.eventLoop())
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DirectClientHandler(promise));
b.connect(requestHead.getHost(), requestHead.getPort()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection established use handler provided results
} else {
// Close the connection if the connection attempt has failed.
HttpServerUtils.closeOnFlush(inboundChannel);
}
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
HttpServerUtils.closeOnFlush(ctx.channel());
}
}
... ...
package cc.leevi.common.httpproxy;
import com.google.common.net.HostAndPort;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpConstants;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.util.ByteProcessor;
import io.netty.util.internal.AppendableCharSequence;
import java.net.URL;
public class HttpServerHeadDecoder extends SimpleChannelInboundHandler<ByteBuf> {
private HttpServerHeadDecoder.HeadLineByteProcessor headLineByteProcessor = new HttpServerHeadDecoder.HeadLineByteProcessor();
private
class HeadLineByteProcessor implements ByteProcessor{
private AppendableCharSequence seq;
public HeadLineByteProcessor() {
this.seq = new AppendableCharSequence(4096);
}
public AppendableCharSequence parse(ByteBuf buffer) {
seq.reset();
int i = buffer.forEachByte(this);
if (i == -1) {
return null;
}
buffer.readerIndex(i + 1);
return seq;
}
@Override
public boolean process(byte value) throws Exception {
char nextByte = (char) (value & 0xFF);
if (nextByte == HttpConstants.LF) {
int len = seq.length();
if (len >= 1 && seq.charAtUnsafe(len - 1) == HttpConstants.CR) {
seq.append(nextByte);
}
return false;
}
//continue loop byte
seq.append(nextByte);
return true;
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
AppendableCharSequence seq = headLineByteProcessor.parse(in);
if(seq.charAt(seq.length()-1) == HttpConstants.LF){
HttpProxyRequestHead httpProxyRequestHead;
String[] splitInitialLine = splitInitialLine(seq);
String method = splitInitialLine[0];
String uri = splitInitialLine[1];
String protocolVersion = splitInitialLine[2];
String host;
int port;
if(HttpMethod.CONNECT.name().equals(method)){
//https tunnel proxy
HostAndPort hostAndPort = HostAndPort.fromString(uri);
host = hostAndPort.getHost();
port = hostAndPort.getPort();
httpProxyRequestHead = new HttpProxyRequestHead(host, port, "TUNNEL",protocolVersion,null);
}else{
//http proxy
URL url = new URL(uri);
host = url.getHost();
port = url.getPort();
if(port == -1){
port = 80;
}
httpProxyRequestHead = new HttpProxyRequestHead(host, port,"WEB",protocolVersion,in.retain().resetReaderIndex());
}
ctx.pipeline().addLast(new HttpServerConnectHandler()).remove(this);
ctx.fireChannelRead(httpProxyRequestHead);
}
}
private static String[] splitInitialLine(AppendableCharSequence sb) {
int aStart;
int aEnd;
int bStart;
int bEnd;
int cStart;
int cEnd;
aStart = findNonSPLenient(sb, 0);
aEnd = findSPLenient(sb, aStart);
bStart = findNonSPLenient(sb, aEnd);
bEnd = findSPLenient(sb, bStart);
cStart = findNonSPLenient(sb, bEnd);
cEnd = findEndOfString(sb);
return new String[] {
sb.subStringUnsafe(aStart, aEnd),
sb.subStringUnsafe(bStart, bEnd),
cStart < cEnd? sb.subStringUnsafe(cStart, cEnd) : "" };
}
private static int findNonSPLenient(AppendableCharSequence sb, int offset) {
for (int result = offset; result < sb.length(); ++result) {
char c = sb.charAtUnsafe(result);
// See https://tools.ietf.org/html/rfc7230#section-3.5
if (isSPLenient(c)) {
continue;
}
if (Character.isWhitespace(c)) {
// Any other whitespace delimiter is invalid
throw new IllegalArgumentException("Invalid separator");
}
return result;
}
return sb.length();
}
private static int findSPLenient(AppendableCharSequence sb, int offset) {
for (int result = offset; result < sb.length(); ++result) {
if (isSPLenient(sb.charAtUnsafe(result))) {
return result;
}
}
return sb.length();
}
private static boolean isSPLenient(char c) {
// See https://tools.ietf.org/html/rfc7230#section-3.5
return c == ' ' || c == (char) 0x09 || c == (char) 0x0B || c == (char) 0x0C || c == (char) 0x0D;
}
private static int findNonWhitespace(AppendableCharSequence sb, int offset, boolean validateOWS) {
for (int result = offset; result < sb.length(); ++result) {
char c = sb.charAtUnsafe(result);
if (!Character.isWhitespace(c)) {
return result;
} else if (validateOWS && !isOWS(c)) {
// Only OWS is supported for whitespace
throw new IllegalArgumentException("Invalid separator, only a single space or horizontal tab allowed," +
" but received a '" + c + "'");
}
}
return sb.length();
}
private static int findEndOfString(AppendableCharSequence sb) {
for (int result = sb.length() - 1; result > 0; --result) {
if (!Character.isWhitespace(sb.charAtUnsafe(result))) {
return result + 1;
}
}
return 0;
}
private static boolean isOWS(char ch) {
return ch == ' ' || ch == (char) 0x09;
}
}
... ...
package cc.leevi.common.httpproxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
public final class HttpServerUtils {
/**
* Closes the specified channel after all queued write requests are flushed.
*/
public static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
private HttpServerUtils() { }
}
\ No newline at end of file
... ...
package cc.leevi.common.httpproxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
public class RelayHandler extends ChannelInboundHandlerAdapter {
private final Channel relayChannel;
public RelayHandler(Channel relayChannel) {
this.relayChannel = relayChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (relayChannel.isActive()) {
relayChannel.writeAndFlush(msg);
} else {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
HttpServerUtils.closeOnFlush(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
... ...
package cc.leevi.common.httpproxy;
import com.google.common.net.HostAndPort;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class HttpProxyClientTest {
@Before
public void setUp() throws Exception {
}
@Test
public void startServer() throws IOException {
HttpServer httpServer = new HttpServer();
httpServer.startServer();
System.in.read();
}
@Test
public void parseURI(){
HostAndPort hostAndPort = HostAndPort.fromString("cdn.segmentfault.com:443");
System.out.println(hostAndPort.getHost());
System.out.println(hostAndPort.getPort());
}
}
... ...
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="TRACE">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>netty-proxy-server</artifactId>
<groupId>cc.leevi.common</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>mixin-proxy</artifactId>
<name>socks5-proxy</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
</dependencies>
</project>
... ...
package cc.leevi.common.socks5proxy;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Promise;
public final class DirectClientHandler extends ChannelInboundHandlerAdapter {
private final Promise<Channel> promise;
public DirectClientHandler(Promise<Channel> promise) {
this.promise = promise;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.pipeline().remove(this);
promise.setSuccess(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
promise.setFailure(throwable);
}
}
\ No newline at end of file
... ...
package cc.leevi.common.socks5proxy;
import io.netty.buffer.ByteBuf;
public class HttpProxyRequestHead {
private String host;
private int port;
private String proxyType;//tunnel or web
private String protocolVersion;
private ByteBuf byteBuf;
public HttpProxyRequestHead(String host, int port, String proxyType, String protocolVersion, ByteBuf byteBuf) {
this.host = host;
this.port = port;
this.proxyType = proxyType;
this.protocolVersion = protocolVersion;
this.byteBuf = byteBuf;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getProxyType() {
return proxyType;
}
public void setProxyType(String proxyType) {
this.proxyType = proxyType;
}
public ByteBuf getByteBuf() {
return byteBuf;
}
public void setByteBuf(ByteBuf byteBuf) {
this.byteBuf = byteBuf;
}
public String getProtocolVersion() {
return protocolVersion;
}
public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}
}
... ...
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cc.leevi.common.socks5proxy;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ChannelHandler.Sharable
public final class HttpServerConnectHandler extends SimpleChannelInboundHandler<HttpProxyRequestHead> {
private final Bootstrap b = new Bootstrap();
@Override
public void channelRead0(final ChannelHandlerContext ctx, final HttpProxyRequestHead requestHead) throws Exception {
Promise<Channel> promise = ctx.executor().newPromise();
final Channel inboundChannel = ctx.channel();
promise.addListener(
new FutureListener<Channel>() {
@Override
public void operationComplete(final Future<Channel> future) throws Exception {
final Channel outboundChannel = future.getNow();
if (future.isSuccess()) {
ChannelFuture responseFuture;
if("TUNNEL".equals(requestHead.getProxyType())){
responseFuture = inboundChannel.writeAndFlush(Unpooled.wrappedBuffer((requestHead.getProtocolVersion() + " 200 Connection Established\r\n\r\n").getBytes()));
}else if("WEB".equals(requestHead.getProxyType())){
responseFuture = outboundChannel.writeAndFlush(requestHead.getByteBuf());
}else{
MixinServerUtils.closeOnFlush(inboundChannel);
return;
}
responseFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) {
ctx.pipeline().remove(HttpServerConnectHandler.this);
outboundChannel.pipeline().addLast(new RelayHandler(inboundChannel));
ctx.pipeline().addLast(new RelayHandler(outboundChannel));
}
});
} else {
MixinServerUtils.closeOnFlush(inboundChannel);
}
}
});
b.group(inboundChannel.eventLoop())
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DirectClientHandler(promise));
b.connect(requestHead.getHost(), requestHead.getPort()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection established use handler provided results
} else {
// Close the connection if the connection attempt has failed.
MixinServerUtils.closeOnFlush(inboundChannel);
}
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
MixinServerUtils.closeOnFlush(ctx.channel());
}
}
... ...
package cc.leevi.common.socks5proxy;
import com.google.common.net.HostAndPort;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpConstants;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.util.ByteProcessor;
import io.netty.util.internal.AppendableCharSequence;
import java.net.URL;
public class HttpServerHeadDecoder extends SimpleChannelInboundHandler<ByteBuf> {
private HeadLineByteProcessor headLineByteProcessor = new HeadLineByteProcessor();
private
class HeadLineByteProcessor implements ByteProcessor{
private AppendableCharSequence seq;
public HeadLineByteProcessor() {
this.seq = new AppendableCharSequence(4096);
}
public AppendableCharSequence parse(ByteBuf buffer) {
seq.reset();
int i = buffer.forEachByte(this);
if (i == -1) {
return null;
}
buffer.readerIndex(i + 1);
return seq;
}
@Override
public boolean process(byte value) throws Exception {
char nextByte = (char) (value & 0xFF);
if (nextByte == HttpConstants.LF) {
int len = seq.length();
if (len >= 1 && seq.charAtUnsafe(len - 1) == HttpConstants.CR) {
seq.append(nextByte);
}
return false;
}
//continue loop byte
seq.append(nextByte);
return true;
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
AppendableCharSequence seq = headLineByteProcessor.parse(in);
if(seq.charAt(seq.length()-1) == HttpConstants.LF){
HttpProxyRequestHead httpProxyRequestHead;
String[] splitInitialLine = splitInitialLine(seq);
String method = splitInitialLine[0];
String uri = splitInitialLine[1];
String protocolVersion = splitInitialLine[2];
String host;
int port;
if(HttpMethod.CONNECT.name().equals(method)){
//https tunnel proxy
HostAndPort hostAndPort = HostAndPort.fromString(uri);
host = hostAndPort.getHost();
port = hostAndPort.getPort();
httpProxyRequestHead = new HttpProxyRequestHead(host, port, "TUNNEL",protocolVersion,null);
}else{
//http proxy
URL url = new URL(uri);
host = url.getHost();
port = url.getPort();
if(port == -1){
port = 80;
}
httpProxyRequestHead = new HttpProxyRequestHead(host, port, protocolVersion,"WEB",in.resetReaderIndex());
}
ctx.pipeline().addLast(new HttpServerConnectHandler()).remove(this);
ctx.fireChannelRead(httpProxyRequestHead);
}
}
private static String[] splitInitialLine(AppendableCharSequence sb) {
int aStart;
int aEnd;
int bStart;
int bEnd;
int cStart;
int cEnd;
aStart = findNonSPLenient(sb, 0);
aEnd = findSPLenient(sb, aStart);
bStart = findNonSPLenient(sb, aEnd);
bEnd = findSPLenient(sb, bStart);
cStart = findNonSPLenient(sb, bEnd);
cEnd = findEndOfString(sb);
return new String[] {
sb.subStringUnsafe(aStart, aEnd),
sb.subStringUnsafe(bStart, bEnd),
cStart < cEnd? sb.subStringUnsafe(cStart, cEnd) : "" };
}
private static int findNonSPLenient(AppendableCharSequence sb, int offset) {
for (int result = offset; result < sb.length(); ++result) {
char c = sb.charAtUnsafe(result);
// See https://tools.ietf.org/html/rfc7230#section-3.5
if (isSPLenient(c)) {
continue;
}
if (Character.isWhitespace(c)) {
// Any other whitespace delimiter is invalid
throw new IllegalArgumentException("Invalid separator");
}
return result;
}
return sb.length();
}
private static int findSPLenient(AppendableCharSequence sb, int offset) {
for (int result = offset; result < sb.length(); ++result) {
if (isSPLenient(sb.charAtUnsafe(result))) {
return result;
}
}
return sb.length();
}
private static boolean isSPLenient(char c) {
// See https://tools.ietf.org/html/rfc7230#section-3.5
return c == ' ' || c == (char) 0x09 || c == (char) 0x0B || c == (char) 0x0C || c == (char) 0x0D;
}
private static int findNonWhitespace(AppendableCharSequence sb, int offset, boolean validateOWS) {
for (int result = offset; result < sb.length(); ++result) {
char c = sb.charAtUnsafe(result);
if (!Character.isWhitespace(c)) {
return result;
} else if (validateOWS && !isOWS(c)) {
// Only OWS is supported for whitespace
throw new IllegalArgumentException("Invalid separator, only a single space or horizontal tab allowed," +
" but received a '" + c + "'");
}
}
return sb.length();
}
private static int findEndOfString(AppendableCharSequence sb) {
for (int result = sb.length() - 1; result > 0; --result) {
if (!Character.isWhitespace(sb.charAtUnsafe(result))) {
return result + 1;
}
}
return 0;
}
private static boolean isOWS(char ch) {
return ch == ' ' || ch == (char) 0x09;
}
}
... ...
package cc.leevi.common.socks5proxy;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MixinProxyServer {
private Logger logger = LoggerFactory.getLogger(MixinProxyServer.class);
private ServerBootstrap serverBootstrap;
private EventLoopGroup serverEventLoopGroup;
private Channel acceptorChannel;
public void startServer(){
logger.info("Proxy Server starting...");
serverEventLoopGroup = new NioEventLoopGroup(4);
serverBootstrap = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.childHandler(new MixinServerInitializer())
.group(serverEventLoopGroup);
acceptorChannel = serverBootstrap.bind(8065).syncUninterruptibly().channel();
}
public void shutdown(){
logger.info("Proxy Server shutting down...");
acceptorChannel.close().syncUninterruptibly();
serverEventLoopGroup.shutdownGracefully().syncUninterruptibly();
logger.info("shutdown completed!");
}
}
... ...
package cc.leevi.common.socks5proxy;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.socksx.SocksPortUnificationServerHandler;
import io.netty.handler.codec.socksx.SocksVersion;
public class MixinSelectHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
final int readerIndex = msg.readerIndex();
if (msg.writerIndex() == readerIndex) {
return;
}
ChannelPipeline p = ctx.pipeline();
final byte versionVal = msg.getByte(readerIndex);
SocksVersion version = SocksVersion.valueOf(versionVal);
if(version.equals(SocksVersion.SOCKS4a) || version.equals(SocksVersion.SOCKS5)){
//socks proxy
p.addLast(new SocksPortUnificationServerHandler(),
SocksServerHandler.INSTANCE).remove(this);
}else{
//http/tunnel proxy
p.addLast(new HttpServerHeadDecoder()).remove(this);
}
msg.retain();
ctx.fireChannelRead(msg);
}
}
... ...
package cc.leevi.common.socks5proxy;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.socksx.SocksPortUnificationServerHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public final class MixinServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.DEBUG),
new MixinSelectHandler());
}
}
\ No newline at end of file
... ...
package cc.leevi.common.socks5proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
public final class MixinServerUtils {
/**
* Closes the specified channel after all queued write requests are flushed.
*/
public static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
private MixinServerUtils() { }
}
\ No newline at end of file
... ...
package cc.leevi.common.socks5proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
public class RelayHandler extends ChannelInboundHandlerAdapter {
private final Channel relayChannel;
public RelayHandler(Channel relayChannel) {
this.relayChannel = relayChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (relayChannel.isActive()) {
relayChannel.writeAndFlush(msg);
} else {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (relayChannel.isActive()) {
relayChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
... ...
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cc.leevi.common.socks5proxy;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.socksx.SocksMessage;
import io.netty.handler.codec.socksx.v5.DefaultSocks5CommandResponse;
import io.netty.handler.codec.socksx.v5.Socks5CommandRequest;
import io.netty.handler.codec.socksx.v5.Socks5CommandStatus;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ChannelHandler.Sharable
public final class SocksServerConnectHandler extends SimpleChannelInboundHandler<SocksMessage> {
private final Bootstrap b = new Bootstrap();
@Override
public void channelRead0(final ChannelHandlerContext ctx, final SocksMessage message) throws Exception {
final Socks5CommandRequest request = (Socks5CommandRequest) message;
Promise<Channel> promise = ctx.executor().newPromise();
promise.addListener(
new FutureListener<Channel>() {
@Override
public void operationComplete(final Future<Channel> future) throws Exception {
final Channel outboundChannel = future.getNow();
if (future.isSuccess()) {
ChannelFuture responseFuture =
ctx.channel().writeAndFlush(new DefaultSocks5CommandResponse(
Socks5CommandStatus.SUCCESS,
request.dstAddrType(),
request.dstAddr(),
request.dstPort()));
responseFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) {
ctx.pipeline().remove(SocksServerConnectHandler.this);
outboundChannel.pipeline().addLast(new RelayHandler(ctx.channel()));
ctx.pipeline().addLast(new RelayHandler(outboundChannel));
}
});
} else {
ctx.channel().writeAndFlush(new DefaultSocks5CommandResponse(
Socks5CommandStatus.FAILURE, request.dstAddrType()));
MixinServerUtils.closeOnFlush(ctx.channel());
}
}
});
final Channel inboundChannel = ctx.channel();
b.group(inboundChannel.eventLoop())
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DirectClientHandler(promise));
b.connect(request.dstAddr(), request.dstPort()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection established use handler provided results
} else {
// Close the connection if the connection attempt has failed.
ctx.channel().writeAndFlush(
new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, request.dstAddrType()));
MixinServerUtils.closeOnFlush(ctx.channel());
}
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
MixinServerUtils.closeOnFlush(ctx.channel());
}
}
... ...
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cc.leevi.common.socks5proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.socksx.SocksMessage;
import io.netty.handler.codec.socksx.SocksVersion;
import io.netty.handler.codec.socksx.v5.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ChannelHandler.Sharable
public final class SocksServerHandler extends SimpleChannelInboundHandler<SocksMessage> {
private Logger logger = LoggerFactory.getLogger(SocksServerHandler.class);
public static final SocksServerHandler INSTANCE = new SocksServerHandler();
private SocksServerHandler() { }
@Override
public void channelRead0(ChannelHandlerContext ctx, SocksMessage socksRequest) throws Exception {
if(!socksRequest.version().equals(SocksVersion.SOCKS5)){
logger.error("only supports socks5 protocol!");
ctx.writeAndFlush(Unpooled.wrappedBuffer("protocol version illegal!".getBytes()));
return ;
}
if (socksRequest instanceof Socks5InitialRequest) {
ctx.pipeline().addFirst(new Socks5CommandRequestDecoder());
ctx.write(new DefaultSocks5InitialResponse(Socks5AuthMethod.NO_AUTH));
//如果需要密码,这里可以换成
// ctx.write(new DefaultSocks5InitialResponse(Socks5AuthMethod.PASSWORD));
} else if (socksRequest instanceof Socks5PasswordAuthRequest) {
//如果需要密码,这里需要验证密码
ctx.pipeline().addFirst(new Socks5CommandRequestDecoder());
ctx.write(new DefaultSocks5PasswordAuthResponse(Socks5PasswordAuthStatus.SUCCESS));
} else if (socksRequest instanceof Socks5CommandRequest) {
Socks5CommandRequest socks5CmdRequest = (Socks5CommandRequest) socksRequest;
if (socks5CmdRequest.type() == Socks5CommandType.CONNECT) {
ctx.pipeline().addLast(new SocksServerConnectHandler());
ctx.pipeline().remove(this);
ctx.fireChannelRead(socksRequest);
} else {
ctx.close();
}
} else {
ctx.close();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
logger.error("exceptionCaught",throwable);
MixinServerUtils.closeOnFlush(ctx.channel());
}
}
... ...
package cc.leevi.common.socks5proxy;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class MixinProxyServerTest {
@Before
public void setUp() throws Exception {
}
@Test
public void startServer() throws IOException {
MixinProxyServer mixinProxyServer = new MixinProxyServer();
mixinProxyServer.startServer();
System.in.read();
}
}
\ No newline at end of file
... ...
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="TRACE">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cc.leevi.common</groupId>
<artifactId>netty-proxy-server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>http-proxy</module>
<module>socks5-proxy</module>
<module>mixin-proxy</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.54.Final</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>netty-proxy-server</artifactId>
<groupId>cc.leevi.common</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>socks5-proxy</artifactId>
<name>socks5-proxy</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
</dependencies>
</project>
... ...
package cc.leevi.common.socks5proxy;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Promise;
public final class DirectClientHandler extends ChannelInboundHandlerAdapter {
private final Promise<Channel> promise;
public DirectClientHandler(Promise<Channel> promise) {
this.promise = promise;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.pipeline().remove(this);
promise.setSuccess(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
promise.setFailure(throwable);
}
}
\ No newline at end of file
... ...
package cc.leevi.common.socks5proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
public class RelayHandler extends ChannelInboundHandlerAdapter {
private final Channel relayChannel;
public RelayHandler(Channel relayChannel) {
this.relayChannel = relayChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (relayChannel.isActive()) {
relayChannel.writeAndFlush(msg);
} else {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (relayChannel.isActive()) {
relayChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
... ...
package cc.leevi.common.socks5proxy;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Socks5ProxyServer {
private Logger logger = LoggerFactory.getLogger(Socks5ProxyServer.class);
private ServerBootstrap serverBootstrap;
private EventLoopGroup serverEventLoopGroup;
private Channel acceptorChannel;
public void startServer(){
logger.info("Proxy Server starting...");
serverEventLoopGroup = new NioEventLoopGroup(4);
serverBootstrap = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.childHandler(new SocksServerInitializer())
.group(serverEventLoopGroup);
acceptorChannel = serverBootstrap.bind(1080).syncUninterruptibly().channel();
}
public void shutdown(){
logger.info("Proxy Server shutting down...");
acceptorChannel.close().syncUninterruptibly();
serverEventLoopGroup.shutdownGracefully().syncUninterruptibly();
logger.info("shutdown completed!");
}
}
... ...
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cc.leevi.common.socks5proxy;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.socksx.SocksMessage;
import io.netty.handler.codec.socksx.v4.DefaultSocks4CommandResponse;
import io.netty.handler.codec.socksx.v4.Socks4CommandRequest;
import io.netty.handler.codec.socksx.v4.Socks4CommandStatus;
import io.netty.handler.codec.socksx.v5.DefaultSocks5CommandResponse;
import io.netty.handler.codec.socksx.v5.Socks5CommandRequest;
import io.netty.handler.codec.socksx.v5.Socks5CommandStatus;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ChannelHandler.Sharable
public final class SocksServerConnectHandler extends SimpleChannelInboundHandler<SocksMessage> {
private final Bootstrap b = new Bootstrap();
@Override
public void channelRead0(final ChannelHandlerContext ctx, final SocksMessage message) throws Exception {
final Socks5CommandRequest request = (Socks5CommandRequest) message;
Promise<Channel> promise = ctx.executor().newPromise();
promise.addListener(
new FutureListener<Channel>() {
@Override
public void operationComplete(final Future<Channel> future) throws Exception {
final Channel outboundChannel = future.getNow();
if (future.isSuccess()) {
ChannelFuture responseFuture =
ctx.channel().writeAndFlush(new DefaultSocks5CommandResponse(
Socks5CommandStatus.SUCCESS,
request.dstAddrType(),
request.dstAddr(),
request.dstPort()));
responseFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) {
ctx.pipeline().remove(SocksServerConnectHandler.this);
outboundChannel.pipeline().addLast(new RelayHandler(ctx.channel()));
ctx.pipeline().addLast(new RelayHandler(outboundChannel));
}
});
} else {
ctx.channel().writeAndFlush(new DefaultSocks5CommandResponse(
Socks5CommandStatus.FAILURE, request.dstAddrType()));
SocksServerUtils.closeOnFlush(ctx.channel());
}
}
});
final Channel inboundChannel = ctx.channel();
b.group(inboundChannel.eventLoop())
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DirectClientHandler(promise));
b.connect(request.dstAddr(), request.dstPort()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection established use handler provided results
} else {
// Close the connection if the connection attempt has failed.
ctx.channel().writeAndFlush(
new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, request.dstAddrType()));
SocksServerUtils.closeOnFlush(ctx.channel());
}
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SocksServerUtils.closeOnFlush(ctx.channel());
}
}
... ...
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cc.leevi.common.socks5proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.socksx.SocksMessage;
import io.netty.handler.codec.socksx.SocksVersion;
import io.netty.handler.codec.socksx.v4.Socks4CommandRequest;
import io.netty.handler.codec.socksx.v4.Socks4CommandType;
import io.netty.handler.codec.socksx.v5.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ChannelHandler.Sharable
public final class SocksServerHandler extends SimpleChannelInboundHandler<SocksMessage> {
private Logger logger = LoggerFactory.getLogger(SocksServerHandler.class);
public static final SocksServerHandler INSTANCE = new SocksServerHandler();
private SocksServerHandler() { }
@Override
public void channelRead0(ChannelHandlerContext ctx, SocksMessage socksRequest) throws Exception {
if(!socksRequest.version().equals(SocksVersion.SOCKS5)){
logger.error("only supports socks5 protocol!");
ctx.writeAndFlush(Unpooled.wrappedBuffer("protocol version illegal!".getBytes()));
return ;
}
if (socksRequest instanceof Socks5InitialRequest) {
ctx.pipeline().addFirst(new Socks5CommandRequestDecoder());
ctx.write(new DefaultSocks5InitialResponse(Socks5AuthMethod.NO_AUTH));
//如果需要密码,这里可以换成
// ctx.write(new DefaultSocks5InitialResponse(Socks5AuthMethod.PASSWORD));
} else if (socksRequest instanceof Socks5PasswordAuthRequest) {
//如果需要密码,这里需要验证密码
ctx.pipeline().addFirst(new Socks5CommandRequestDecoder());
ctx.write(new DefaultSocks5PasswordAuthResponse(Socks5PasswordAuthStatus.SUCCESS));
} else if (socksRequest instanceof Socks5CommandRequest) {
Socks5CommandRequest socks5CmdRequest = (Socks5CommandRequest) socksRequest;
if (socks5CmdRequest.type() == Socks5CommandType.CONNECT) {
ctx.pipeline().addLast(new SocksServerConnectHandler());
ctx.pipeline().remove(this);
ctx.fireChannelRead(socksRequest);
} else {
ctx.close();
}
} else {
ctx.close();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
logger.error("exceptionCaught",throwable);
SocksServerUtils.closeOnFlush(ctx.channel());
}
}
... ...
package cc.leevi.common.socks5proxy;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.socksx.SocksPortUnificationServerHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public final class SocksServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.DEBUG),
new SocksPortUnificationServerHandler(),
SocksServerHandler.INSTANCE);
}
}
\ No newline at end of file
... ...
package cc.leevi.common.socks5proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
public final class SocksServerUtils {
/**
* Closes the specified channel after all queued write requests are flushed.
*/
public static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
private SocksServerUtils() { }
}
\ No newline at end of file
... ...