// WideDataDispatchVerticle.java (5.5 KB)
package com.aries.crawler.verticles;
import com.aries.crawler.model.douyincrawler.DouyinCrawlerLogModel;
import com.aries.crawler.trans.message.DouyinUserInfoMessage;
import com.aries.crawler.trans.message.DouyinVideoInfoMessage;
import com.aries.crawler.trans.message.DouyinWideDataMessage;
import com.aries.crawler.trans.message.SimpleInt64Message;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.math.BigInteger;
import java.util.concurrent.TimeUnit;
import static com.aries.crawler.trans.EventBusTopic.*;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_SUCCESS_MESSAGE;
/**
 * Verticle that routes wide-table crawler records to the user inserter and the video
 * inserter according to the record's processing status.
 *
 * <p>The status is tested as a bitmask against
 * {@link DouyinCrawlerLogModel#STATUS_USER_DONE} and
 * {@link DouyinCrawlerLogModel#STATUS_VIDEO_DONE}. As described by the original author:
 * <ul>
 *   <li>status = 0: nothing has been processed; the wide record is split into a user part
 *       and a video part, and each part is handed to its inserter.</li>
 *   <li>status = 1: the user part is already processed; only the video part is sent to the
 *       video inserter.</li>
 *   <li>status = 2: the video part is already processed; only the user part is sent to the
 *       user inserter.</li>
 *   <li>status = 3: both parts are already processed; the record is ignored.</li>
 * </ul>
 *
 * @author arowana
 */
public class WideDataDispatchVerticle extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger(WideDataDispatchVerticle.class);

    /** Value passed to {@code setMaxBufferedMessages} on the dispatch consumer. */
    private static final int MAX_BUFFERED_MESSAGES = 4000;

    /** Send timeout applied to the user/video insert requests, in milliseconds. */
    private static final long INSERT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(20);

    /** Failure code replied to the sender when the message body is not a wide-data message. */
    private static final int UNEXPECTED_BODY_FAILURE_CODE = 400;

    /**
     * Registers {@link #dispatch(Message)} as the consumer of the wide-data dispatch topic.
     */
    @Override
    public void start() {
        vertx.eventBus()
                .consumer(LOGIC_DOUYIN_WIDEDATA_DISPATCH.getTopic(), this::dispatch)
                .setMaxBufferedMessages(MAX_BUFFERED_MESSAGES);
    }

    /**
     * Inspects the status bits of an incoming wide-data message and forwards whichever parts
     * (user, video) are not yet marked as done, then replies with the common success message.
     *
     * <p>The success reply is sent as soon as the work has been handed off; it does not wait
     * for the inserts to finish.
     *
     * @param message event-bus message whose body is expected to be a
     *                {@link DouyinWideDataMessage}; any other body (including {@code null})
     *                is rejected with a failure reply instead of throwing inside the handler
     */
    private void dispatch(Message<Object> message) {
        if (!(message.body() instanceof DouyinWideDataMessage wideDataMessage)) {
            logger.error("unexpected wide data message body: " + message.body());
            message.fail(UNEXPECTED_BODY_FAILURE_CODE, "unexpected message body type");
            return;
        }

        // User part has not been marked as processed yet.
        if ((wideDataMessage.status() & DouyinCrawlerLogModel.STATUS_USER_DONE) == 0) {
            logger.info("user data need to be parsed, uid:" + wideDataMessage.authorUid());
            processUser(DouyinUserInfoMessage.of(wideDataMessage), wideDataMessage.id());
        }

        // Video part has not been marked as processed yet.
        if ((wideDataMessage.status() & DouyinCrawlerLogModel.STATUS_VIDEO_DONE) == 0) {
            logger.info("video data need to be parsed, awemeid:" + wideDataMessage.awemeId());
            processVideo(DouyinVideoInfoMessage.of(wideDataMessage), wideDataMessage.id());
        }

        message.reply(COMMON_SUCCESS_MESSAGE);
    }

    /**
     * Requests insertion of the video part and, once the insert reply succeeds, requests that
     * the wide record's video status be updated.
     *
     * <p>The insert reply is handled directly in the reply handler: no blocking work is done
     * here, so no worker-thread hop is needed, and the failure path ends cleanly instead of
     * leaving an uncompleted promise behind.
     *
     * @param douyinVideoInfoMessage video payload sent to the video insert topic
     * @param wideDataId             id of the wide record whose video status is updated on success
     */
    private void processVideo(DouyinVideoInfoMessage douyinVideoInfoMessage, BigInteger wideDataId) {
        logger.info("prepare to insert video, awemeid: " + douyinVideoInfoMessage.awemeId());
        var insertOptions = new DeliveryOptions().setSendTimeout(INSERT_TIMEOUT_MILLIS);
        vertx.eventBus().request(MYSQL_DOUYIN_VIDEO_INSERT.getTopic(), douyinVideoInfoMessage, insertOptions, insertReply -> {
            if (insertReply.failed()) {
                logger.error("insert video reply failed, awemeid: " + douyinVideoInfoMessage.awemeId() + ",cause:" + insertReply.cause());
                return;
            }
            logger.info("insert video reply succeeded, awemeid: " + douyinVideoInfoMessage.awemeId());
            vertx.eventBus().request(MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_VIDEO.getTopic(), new SimpleInt64Message(wideDataId), updateReply -> {
                if (updateReply.succeeded()) {
                    logger.info("update status video succ ...");
                } else {
                    // A failed status update is an error: keep the cause for diagnosis.
                    logger.error("update status video fail ...", updateReply.cause());
                }
            });
        });
    }

    /**
     * Requests insertion of the user part and, once the insert reply succeeds, requests that
     * the wide record's user status be updated.
     *
     * <p>The insert reply is handled directly in the reply handler: no blocking work is done
     * here, so no worker-thread hop is needed, and the failure path ends cleanly instead of
     * leaving an uncompleted promise behind.
     *
     * @param douyinUserInfoMessage user payload sent to the user insert topic
     * @param wideDataId            id of the wide record whose user status is updated on success
     */
    private void processUser(DouyinUserInfoMessage douyinUserInfoMessage, BigInteger wideDataId) {
        logger.info("prepare to insert user, uid:" + douyinUserInfoMessage.uid());
        var insertOptions = new DeliveryOptions().setSendTimeout(INSERT_TIMEOUT_MILLIS);
        vertx.eventBus().request(MYSQL_DOUYIN_USER_INSERT.getTopic(), douyinUserInfoMessage, insertOptions, insertReply -> {
            if (insertReply.failed()) {
                logger.error("insert user reply failed, uid: " + douyinUserInfoMessage.uid() + ",cause:" + insertReply.cause());
                return;
            }
            logger.info("insert user reply succeeded, uid: " + douyinUserInfoMessage.uid());
            vertx.eventBus().request(MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_USER.getTopic(), new SimpleInt64Message(wideDataId), updateReply -> {
                if (updateReply.succeeded()) {
                    logger.info("update status user succ ...");
                } else {
                    // A failed status update is an error: keep the cause for diagnosis.
                    logger.error("update status user fail ...", updateReply.cause());
                }
            });
        });
    }
}