UpdateDataVerticle.java
4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.aries.crawler.verticles;
import com.aries.crawler.sqlbuilder.UpdateBuilder;
import com.aries.crawler.tools.MySqlExecuteHelper;
import com.aries.crawler.trans.message.SimpleInt64Message;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import static com.aries.crawler.trans.EventBusTopic.*;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_FAILED_MESSAGE;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_SUCCESS_MESSAGE;
/**
 * Verticle that consumes "update status" requests from the event bus and applies them to MySQL.
 *
 * <p>Each consumed message is expected to carry a {@link SimpleInt64Message} whose {@code id()}
 * identifies the row to update. The handler ORs a single bit into that row's {@code status}
 * column and replies with {@code COMMON_SUCCESS_MESSAGE} or {@code COMMON_FAILED_MESSAGE}.
 *
 * @author arowana
 */
public class UpdateDataVerticle extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger(UpdateDataVerticle.class);

    /** Value passed to {@code setMaxBufferedMessages} for every consumer registered here. */
    private static final int MAX_BUFFERED_MESSAGES = 4000;

    private static final String TABLE_CRAWLER_LOG = "douyin_crawler_log";
    private static final String TABLE_VIDEO_INFO = "douyin_video_info";

    /**
     * Formats a date-time using the given pattern.
     *
     * @param localDateTime the date-time to format
     * @param format        a {@link DateTimeFormatter} pattern, e.g. {@code "yyyy-MM-dd HH:mm:ss"}
     * @return the formatted text
     * @throws IllegalArgumentException if {@code format} is not a valid pattern
     * @throws NullPointerException     if either argument is {@code null}
     */
    public static String getDateTimeAsString(LocalDateTime localDateTime, String format) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
        return localDateTime.format(formatter);
    }

    /**
     * Registers one event-bus consumer per status-update topic.
     */
    @Override
    public void start() {
        // Mark a crawler-log row as "user data processed".
        vertx.eventBus()
                .consumer(MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_USER.getTopic(), this::mysqlDouyinWideDataUpdateStatusUser)
                .setMaxBufferedMessages(MAX_BUFFERED_MESSAGES);
        // Mark a crawler-log row as "video data processed".
        vertx.eventBus()
                .consumer(MYSQL_DOUYIN_WIDEDATA_UPDATE_STATUS_VIDEO.getTopic(), this::mysqlDouyinWideDataUpdateStatusVideo)
                .setMaxBufferedMessages(MAX_BUFFERED_MESSAGES);
        // Mark a video whose download completed.
        vertx.eventBus()
                .consumer(MYSQL_DOUYIN_VIDEO__UPDATE_STATUS_DOWNLOADED.getTopic(), this::mysqlDouyinVideoDataUpdateStatusDownloaded)
                .setMaxBufferedMessages(MAX_BUFFERED_MESSAGES);
        // Mark a video whose download failed.
        vertx.eventBus()
                .consumer(MYSQL_DOUYIN_VIDEO__UPDATE_STATUS_FAILED.getTopic(), this::mysqlDouyinVideoDataUpdateStatusFailed)
                .setMaxBufferedMessages(MAX_BUFFERED_MESSAGES);
    }

    /** Sets bit 1 of {@code status} on the addressed {@code douyin_crawler_log} row. */
    private void mysqlDouyinWideDataUpdateStatusUser(Message<Object> message) {
        updateStatusBit(message, TABLE_CRAWLER_LOG, 1);
    }

    /** Sets bit 2 of {@code status} on the addressed {@code douyin_crawler_log} row. */
    private void mysqlDouyinWideDataUpdateStatusVideo(Message<Object> message) {
        updateStatusBit(message, TABLE_CRAWLER_LOG, 2);
    }

    /** Sets bit 1 of {@code status} on the addressed {@code douyin_video_info} row. */
    private void mysqlDouyinVideoDataUpdateStatusDownloaded(Message<Object> message) {
        updateStatusBit(message, TABLE_VIDEO_INFO, 1);
    }

    /** Sets bit 2 of {@code status} on the addressed {@code douyin_video_info} row. */
    private void mysqlDouyinVideoDataUpdateStatusFailed(Message<Object> message) {
        updateStatusBit(message, TABLE_VIDEO_INFO, 2);
    }

    /**
     * Runs {@code UPDATE <table> SET status = status | <statusBit> WHERE id=<id>} for the id
     * carried by {@code message} and replies with the outcome.
     *
     * <p>A body that is {@code null} or not a {@link SimpleInt64Message} is answered with
     * {@code COMMON_FAILED_MESSAGE} instead of throwing, so the sender is not left waiting
     * for a reply that never arrives.
     *
     * @param message   the event-bus message carrying the row id; always replied to
     * @param table     the table whose row is updated
     * @param statusBit the bit value OR-ed into the {@code status} column
     */
    private void updateStatusBit(Message<Object> message, String table, int statusBit) {
        Object body = message.body();
        if (!(body instanceof SimpleInt64Message)) {
            logger.error("Cannot update " + table + ": expected SimpleInt64Message body but got "
                    + (body == null ? "null" : body.getClass().getName()));
            message.reply(COMMON_FAILED_MESSAGE);
            return;
        }
        var idMessage = (SimpleInt64Message) body;

        // NOTE(review): the id is concatenated into the SQL text; this is only safe while
        // SimpleInt64Message.id() yields a numeric value - confirm against its declaration.
        String updateSql = new UpdateBuilder(table)
                .set("status = status | " + statusBit)
                .where("id=" + idMessage.id())
                .toString();

        MySqlExecuteHelper.execute(vertx, updateSql, mysqlExecutorRes -> {
            if (mysqlExecutorRes.succeeded()) {
                message.reply(COMMON_SUCCESS_MESSAGE);
            } else {
                // Record why the update failed before telling the sender; previously the cause was dropped.
                logger.error("Status update failed: " + updateSql, mysqlExecutorRes.cause());
                message.reply(COMMON_FAILED_MESSAGE);
            }
        });
    }
}