-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
ZMQ producer-consumer state table design.md
88 lines (81 loc) · 4.13 KB
/
ZMQ producer-consumer state table design.md
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# ZMQ producer/consumer state table design
# Table of Contents
- [Table of Contents](#table-of-contents)
- [1 Functional Requirements](#1-functional-requirement)
* [1.1 Supported operations](#11-supported-operations)
* [1.2 ZMQ producer state table support async operation](#12-zmq-producer-state-table-support-async-operation)
* [1.3 ZMQ consumer state table support async operation](#13-zmq-consumer-state-table-support-async-operation)(#2-configuration-and-management-requirements)
- [2 Design](#1-design)
* [2.1 diagram](#21-diagram)
* [2.1 flow chart](#21-flow-chart)
- [3 References](#references)
* [ZMQ](#zmq)
# 1 Functional Requirement
## 1.1 ZMQ client supported operations
- ZmqClient will send message to ZMQ.
- ZmqClient can be reused by multiple ZmqProducerStateTable instance.
- ZmqClient sendMsg() method is thread safe async method, will return immediately, ZMQ lib support async operation.
- ZmqClient will retry when send not success:
- When ZMQ socket connection broken, send API will failed and need re-connect and send again.
- When ZMQ send queue is full, send API will failed and need retry later.
- When a signal come, ZMQ send API will failed, need retry again.
- ZmqClient will throw exception after retry failed.
- ZmqClient will throw exception when ZMQ connection break.
## 1.2 ZMQ server supported operations
- ZmqServer will start a receive thread and receive message from ZMQ.
- ZmqServer can be reused by multiple ZmqConsumerStateTable instance.
- When ZmqServer receive message from ZMQ, ZmqServer will:
- De-serialize received message.
- Find ZmqMessageHandler by message content.
- Dispatch message to ZmqMessageHandler.
## 1.3 ZMQ producer state table supported operations
- Producer table will use ZmqClient to send message.
- Should support following operations.
- Set:
void set(const std::string &key,
const std::vector<FieldValueTuple> &values,
const std::string &op = SET_COMMAND,
const std::string &prefix = EMPTY_PREFIX)
- Delete:
void del(const std::string &key,
const std::string &op = DEL_COMMAND,
const std::string &prefix = EMPTY_PREFIX)
- Batch Set:
void set(const std::vector<KeyOpFieldsValuesTuple>& values)
- Batch Delete:
void del(const std::vector<std::string>& keys)
## 1.3 ZMQ consumer state table supported operations
- Consumer implement ZmqMessageHandler interface.
- When consumer table receive message from ZmqServer, consumer table will:
- Send notification to select to handle received operation.
- Send notification to DB update thread for write received operation to database.
This is a configurable feature, could turn on/off this feature in use cases requiring less memory consumption or higher performance.
- After send notification, continue receive next message from ZMQ.
# 2 Design
- Diagram:
<img src="./zmq-diagram.png" style="zoom:100%;" />
- Sequence:
<img src="./zmq-sequence.png" style="zoom:100%;" />
- Call ZmqProducerStateTable API.
- ZmqProducerStateTable will send message with ZmqClient.
- ZmqClient will serialize operation and send to ZMQ.
- ZmqClient will add database name and table name to message for ZmqServer side message dispatch.
- Return when send success.
- Retry when send failed.
- Throw exception when retry failed.
- ZmqServer Side:
- m_mqPollThread:
- Receive message from ZMQ.
- De-serialize received message then dispatch message to ZmqConsumerStateTable.
- ZmqServer manage a database name & table name to ZmqConsumerStateTable mapping. ZmqConsumerStateTable will register itself to this mapping.
- ZmqServer will find ZmqConsumerStateTable by database name and table name.
- Continue receive next message.
- ZmqConsumerStateTable Side:
- Receive message from ZmqServer then:
- Notify select event
- Notify m_dbUpdateThread
- m_dbUpdateThread:
- Update data from m_DbUpdateDataQueue to Redis database.
- Select will pop operations from m_receivedQueue with ZmqConsumerStateTable::pops().
# 3 References
- ZMQ: https://zguide.zeromq.org/docs/