VideoInsertVerticle.java
3.0 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.aries.crawler.verticles;
import com.aries.crawler.sqlbuilder.InsertBuilder;
import com.aries.crawler.tools.MySqlExecuteHelper;
import com.aries.crawler.trans.message.DouyinVideoInfoMessage;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import static com.aries.crawler.trans.EventBusTopic.MYSQL_DOUYIN_VIDEO_INSERT;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_FAILED_MESSAGE;
import static com.aries.crawler.trans.message.CommonResponseMessage.COMMON_SUCCESS_MESSAGE;
/**
* @author arowana
*/
public class VideoInsertVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(VideoInsertVerticle.class);
@Override
public void start() {
// 用于插入视频数据
vertx.eventBus().consumer(MYSQL_DOUYIN_VIDEO_INSERT.getTopic(), this::mysqlDouyinVideoInsertHandler).setMaxBufferedMessages(4000);
}
public static String getDateTimeAsString(LocalDateTime localDateTime, String format) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
return localDateTime.format(formatter);
}
private void mysqlDouyinVideoInsertHandler(Message<Object> message) {
var videoInfoMessage = (DouyinVideoInfoMessage) message.body();
// 构建sql数据, 插入视频信息。
var insertBuilder = new InsertBuilder("douyin_video_info")
.set("id", videoInfoMessage.awemeId())
.set("comments", videoInfoMessage.awemeDesc())
.set("create_time", videoInfoMessage.awemeCreateTime())
.set("uid", videoInfoMessage.authorUid())
.set("cover_url", videoInfoMessage.videoCoverUrl())
.set("dynamic_cover_url", videoInfoMessage.videoDynamicCoverUrl())
.set("download_addr_url", videoInfoMessage.videoDownloadAddrUrl())
.set("share_url", videoInfoMessage.videoShareUrl())
.set("tag", videoInfoMessage.videoVideoTag())
.set("duration", videoInfoMessage.videoDuration())
.set("type", videoInfoMessage.type())
.onDuplicateKeyUpdate("ut", getDateTimeAsString(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
logger.info("video sql:" + insertBuilder.getSql() + "---values:" + insertBuilder.getValues());
MySqlExecuteHelper.prepareExecute(vertx, insertBuilder.getSql(), insertBuilder.getValues(), mysqlExecutorRes -> {
if (mysqlExecutorRes.succeeded()) {
logger.info("insert video succ, awemeid:" + videoInfoMessage.awemeId());
message.reply(COMMON_SUCCESS_MESSAGE);
} else {
logger.info("insert video. failed:" + mysqlExecutorRes.cause());
message.reply(COMMON_FAILED_MESSAGE);
}
});
}
}