Kafka系统通知

[TOC]

image-20200908222307315

Event

1
2
3
4
5
6
7
8
9
10

/**
 * An application event that is serialized to JSON and published to a Kafka topic.
 *
 * <p>Setters return {@code this} so an event can be assembled in a single chained expression,
 * which is how the trigger code in this document builds events.
 */
public class Event {

    /** Name of the Kafka topic the event is published to. */
    private String topic;

    /** Id of the user whose action triggered the event. */
    private int userId;

    /** Type of the entity the action was performed on. */
    private int entityType;

    /** Id of the entity the action was performed on. */
    private int entityId;

    /** Id of the user who owns the entity; the consumer uses it as the notification recipient. */
    private int entityUserId;

    /** Extra key/value pairs; the consumer copies them into the notification content. */
    private Map<String, Object> data = new HashMap<>();

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    /**
     * Returns the live map of extra data (never {@code null}); entries are added via
     * {@link #setData(String, Object)}.
     */
    public Map<String, Object> getData() {
        return data;
    }

    /**
     * Adds one extra key/value pair to the event.
     *
     * @param key   name of the entry
     * @param value value of the entry
     * @return this event, for chaining
     */
    public Event setData(String key, Object value) {
        this.data.put(key, value);
        return this;
    }
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Spring component that publishes {@link Event} objects to Kafka.
 */
@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * Publishes the given event.
     *
     * <p>The event is serialized to a JSON string and sent to the topic the event itself names.
     *
     * @param event the event to publish
     */
    public void fireEvent(Event event) {
        String destination = event.getTopic();
        String payload = JSONObject.toJSONString(event);
        kafkaTemplate.send(destination, payload);
    }

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// Kafka consumer for application events; its handler methods are annotated with @KafkaListener.
// NOTE(review): unlike EventProducer, no Spring stereotype annotation (e.g. @Component) is visible
// on this class in this excerpt. @KafkaListener methods are only detected on Spring-managed beans,
// so confirm the class is registered as a bean.
public class EventConsumer implements CommunityConstant {

private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

// Stores the notification messages built by handleCommentMessage.
@Autowired
private MessageService messageService;

// Loads the post to be indexed in handlePublishMessage.
@Autowired
private DiscussPostService discussPostService;

// Saves posts to / deletes posts from the search index.
@Autowired
private ElasticsearchService elasticsearchService;

// Injected from the "wk.image.command" property; not used by the handlers visible in this excerpt.
@Value("${wk.image.command}")
private String wkImageCommand;

// Injected from the "wk.image.storage" property; not used by the handlers visible in this excerpt.
@Value("${wk.image.storage}")
private String wkImageStorage;

// Injected from the "qiniu.key.access" property; not used by the handlers visible in this excerpt.
@Value("${qiniu.key.access}")
private String accessKey;

// Injected from the "qiniu.key.secret" property; not used by the handlers visible in this excerpt.
@Value("${qiniu.key.secret}")
private String secretKey;

// Injected from the "qiniu.bucket.share.name" property; not used by the handlers visible in this excerpt.
@Value("${qiniu.bucket.share.name}")
private String shareBucketName;

// Not used by the handlers visible in this excerpt.
@Autowired
private ThreadPoolTaskScheduler taskScheduler;

/**
 * Consumes comment, like and follow events and stores one system notification per event.
 *
 * <p>The notification is sent from the system user to the owner of the affected entity. Its
 * conversation id is the event topic, and its content is a JSON object holding the acting
 * user's id, the entity type and id, plus every extra entry carried by the event.
 *
 * @param record the Kafka record whose value is the JSON-serialized event
 */
@KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record) {
    Object raw = (record == null) ? null : record.value();
    if (raw == null) {
        logger.error("消息的内容为空!");
        return;
    }

    Event event = JSONObject.parseObject(raw.toString(), Event.class);
    if (event == null) {
        logger.error("消息格式错误!");
        return;
    }

    // Assemble the notification body: the fixed fields first, then the event's extra data.
    Map<String, Object> payload = new HashMap<>();
    payload.put("userId", event.getUserId());
    payload.put("entityType", event.getEntityType());
    payload.put("entityId", event.getEntityId());
    payload.putAll(event.getData());

    // Build the in-site notification and persist it.
    Message notice = new Message();
    notice.setFromId(SYSTEM_USER_ID);
    notice.setToId(event.getEntityUserId());
    notice.setConversationId(event.getTopic());
    notice.setCreateTime(new Date());
    notice.setContent(JSONObject.toJSONString(payload));

    messageService.addMessage(notice);
}

/**
 * Consumes post-published events and writes the post to the search index.
 *
 * <p>The post is looked up by the event's entity id. If the lookup returns {@code null}
 * (for example because the post was removed before the event was consumed), the event is
 * logged and skipped instead of handing {@code null} to the indexing service.
 *
 * @param record the Kafka record whose value is the JSON-serialized event
 */
@KafkaListener(topics = {TOPIC_PUBLISH})
public void handlePublishMessage(ConsumerRecord record) {
    if (record == null || record.value() == null) {
        logger.error("消息的内容为空!");
        return;
    }

    Event event = JSONObject.parseObject(record.value().toString(), Event.class);
    if (event == null) {
        logger.error("消息格式错误!");
        return;
    }

    DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());
    if (post == null) {
        // Nothing to index; presumably the post no longer exists.
        logger.error("帖子不存在, 无法写入索引: " + event.getEntityId());
        return;
    }

    elasticsearchService.saveDiscussPost(post);
}

/**
 * Consumes post-deleted events and removes the post from the search index.
 *
 * @param record the Kafka record whose value is the JSON-serialized event
 */
@KafkaListener(topics = {TOPIC_DELETE})
public void handleDeleteMessage(ConsumerRecord record) {
    Object raw = (record == null) ? null : record.value();
    if (raw == null) {
        logger.error("消息的内容为空!");
        return;
    }

    Event event = JSONObject.parseObject(raw.toString(), Event.class);
    if (event != null) {
        // The event's entity id identifies the post to remove.
        elasticsearchService.deleteDiscussPost(event.getEntityId());
    } else {
        logger.error("消息格式错误!");
    }
}

触发事件

// Build the comment event: the current user (from hostHolder) is the actor, the commented
// entity supplies the entity type and id, and the post id is attached as extra data under "postId".
// NOTE(review): this excerpt neither sets entityUserId nor fires the event, although the consumer
// uses getEntityUserId() as the notification recipient. Presumably both happen in the code that
// follows this excerpt; confirm.
Event event = new Event()
.setTopic(TOPIC_COMMENT)
.setUserId(hostHolder.getUser().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId", discussPostId);
1
2
3
4
5
6
7
8
9
10
11
// Fire a like event, but only when likeStatus is 1 (presumably "liked" as opposed to
// "like removed"; confirm against the like service).
// The current user is the actor, entityUserId is the owner of the liked entity, and the
// post id is attached as extra data under "postId".
if (likeStatus == 1) {
Event event = new Event()
.setTopic(TOPIC_LIKE)
.setUserId(hostHolder.getUser().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityUserId)
.setData("postId", postId);
eventProducer.fireEvent(event);
}
1
2
3
4
5
6
7
8
        // Fire a follow event with the current user as the actor.
        // NOTE(review): entityUserId is set to entityId, presumably because the followed
        // entity is itself a user and therefore also the notification recipient; confirm.
Event event = new Event()
.setTopic(TOPIC_FOLLOW)
.setUserId(hostHolder.getUser().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityId);
eventProducer.fireEvent(event);

显示系统通知(只展示评论、点赞和关注三类通知)

image-20200909105702853

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
 * Renders the system-notification overview page for the current user.
 *
 * <p>For each of the comment, like and follow topics, the latest notification (if any) is
 * summarized and exposed to the view as "commentNotice", "likeNotice" and "followNotice".
 * The total unread counts of private letters and of notifications are exposed as
 * "letterUnreadCount" and "noticeUnreadCount".
 *
 * @param model the model the view attributes are added to
 * @return the view name of the notification page
 */
@RequestMapping(path = "/notice/list", method = RequestMethod.GET)
public String getNoticeList(Model model) {
    User user = hostHolder.getUser();

    // Comment notifications
    Map<String, Object> commentNotice = buildNoticeSummary(user, TOPIC_COMMENT, true);
    if (commentNotice != null) {
        model.addAttribute("commentNotice", commentNotice);
    }

    // Like notifications
    Map<String, Object> likeNotice = buildNoticeSummary(user, TOPIC_LIKE, true);
    if (likeNotice != null) {
        model.addAttribute("likeNotice", likeNotice);
    }

    // Follow notifications: no "postId" entry is exposed for these.
    Map<String, Object> followNotice = buildNoticeSummary(user, TOPIC_FOLLOW, false);
    if (followNotice != null) {
        model.addAttribute("followNotice", followNotice);
    }

    // Unread totals across all conversations / topics
    int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
    model.addAttribute("letterUnreadCount", letterUnreadCount);
    int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
    model.addAttribute("noticeUnreadCount", noticeUnreadCount);

    return "/site/notice";
}

/**
 * Builds the view data for the latest notification of one topic.
 *
 * @param user          the user whose notifications are queried
 * @param topic         the notification topic
 * @param includePostId whether to copy the "postId" entry of the notification content
 * @return a map with "message", "user", "entityType", "entityId", optionally "postId",
 *         "count" and "unread"; or {@code null} when the user has no notification on the topic
 */
private Map<String, Object> buildNoticeSummary(User user, String topic, boolean includePostId) {
    Message message = messageService.findLatestNotice(user.getId(), topic);
    if (message == null) {
        return null;
    }

    Map<String, Object> messageVO = new HashMap<>();
    messageVO.put("message", message);

    // The content is un-escaped before parsing, so it is presumably stored HTML-escaped.
    String content = HtmlUtils.htmlUnescape(message.getContent());
    Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);

    messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
    messageVO.put("entityType", data.get("entityType"));
    messageVO.put("entityId", data.get("entityId"));
    if (includePostId) {
        messageVO.put("postId", data.get("postId"));
    }

    int count = messageService.findNoticeCount(user.getId(), topic);
    messageVO.put("count", count);

    int unread = messageService.findNoticeUnreadCount(user.getId(), topic);
    messageVO.put("unread", unread);

    return messageVO;
}