// Starter.java
package com.aries.crawler;
import com.aries.crawler.trans.codec.CommonMessageCodec;
import com.aries.crawler.trans.message.*;
import com.aries.crawler.verticles.*;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Application entry point: registers the event-bus message codecs and deploys
 * the crawler verticles on a newly created {@link Vertx} instance.
 *
 * @author arowana
 */
public class Starter {
    static {
        // Force Vert.x logging onto SLF4J. Adapted from dgate:
        // https://github.com/DTeam-Top/dgate/blob/master/src/main/java/top/dteam/dgate/Launcher.java
        //
        // This has to run before the first LoggerFactory call (the logger field
        // declared right below), because the delegate factory is resolved when
        // LoggerFactory is initialised; setting the property later, e.g. inside
        // main(), would come after the static logger has already been created.
        System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory");
    }

    private static final Logger logger = LoggerFactory.getLogger(Starter.class);

    /**
     * Creates the Vert.x instance, registers the message codecs on its event bus,
     * deploys every verticle with a single worker instance, and logs whether the
     * combined deployment succeeded or failed.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        var vertx = Vertx.vertx();

        logger.info("register codec");
        var eventBus = vertx.eventBus();
        eventBus.registerCodec(new CommonMessageCodec<>());
        eventBus.registerDefaultCodec(SimpleInt64Message.class, new CommonMessageCodec<>());
        eventBus.registerDefaultCodec(DouyinUserInfoMessage.class, new CommonMessageCodec<>());
        eventBus.registerDefaultCodec(DouyinVideoInfoMessage.class, new CommonMessageCodec<>());
        eventBus.registerDefaultCodec(DouyinWideDataMessage.class, new CommonMessageCodec<>());
        eventBus.registerDefaultCodec(CommonResponseMessage.class, new CommonMessageCodec<>());

        logger.info("deploying verticles");
        // Verticles are deployed in exactly this order.
        List<Class<?>> verticleTypes = List.of(
                UserInsertVerticle.class,
                VideoInsertVerticle.class,
                UpdateDataVerticle.class,
                WideDataDispatchVerticle.class,
                WideDataPickUpVerticle.class,
                VideoDataPickUpVerticle.class,
                VideoDownloadVerticle.class);

        // Kept as a raw Future list because that is the list type handed to
        // CompositeFuture.all below.
        @SuppressWarnings("rawtypes")
        List<Future> futures = new ArrayList<>(verticleTypes.size());
        for (Class<?> verticleType : verticleTypes) {
            futures.add(optionalDeploy(vertx, verticleType, 1));
        }

        CompositeFuture.all(futures).setHandler(ar -> {
            if (ar.succeeded()) {
                logger.info("all verticle start");
            } else {
                // Pass the cause as the throwable as well so the stack trace is logged.
                logger.error("verticle(s) failed: " + ar.cause(), ar.cause());
            }
        });
    }

    /**
     * Deploys a verticle with the default deployment options.
     *
     * @param vertx        the shared Vert.x instance
     * @param verticleType the verticle class to deploy; it is deployed by its type name
     * @return a future that completes when the deployment succeeds and fails with
     *         the deployment's cause otherwise (the deployment ID is discarded)
     */
    public static Future<Void> simpleDeploy(Vertx vertx, Class<?> verticleType) {
        return Future.future(res -> vertx.deployVerticle(verticleType.getTypeName(), deployRes -> {
            if (deployRes.succeeded()) {
                res.complete();
            } else {
                res.fail(deployRes.cause());
            }
        }));
    }

    /**
     * Deploys a verticle as a worker with its own named worker pool.
     *
     * <p>The pool is named {@code <type name>-pool}, and the max worker execute
     * time is set to 10 seconds (expressed in nanoseconds).
     *
     * @param vertx        the shared Vert.x instance
     * @param verticleType the verticle class to deploy; it is deployed by its type name
     * @param workerSize   used both as the worker pool size and as the number of
     *                     verticle instances
     * @return a future that completes when the deployment succeeds and fails with
     *         the deployment's cause otherwise (the deployment ID is discarded)
     */
    public static Future<Void> optionalDeploy(Vertx vertx, Class<?> verticleType, int workerSize) {
        var options = new DeploymentOptions()
                .setWorker(true)
                .setWorkerPoolName(verticleType.getTypeName() + "-pool")
                .setWorkerPoolSize(workerSize)
                .setMaxWorkerExecuteTime(TimeUnit.SECONDS.toNanos(10))
                .setInstances(workerSize);

        return Future.future(res -> vertx.deployVerticle(verticleType.getTypeName(), options, deployRes -> {
            if (deployRes.succeeded()) {
                res.complete();
            } else {
                res.fail(deployRes.cause());
            }
        }));
    }
}