# ZooKeeper Leader Election

[TOC]


In previous posts we covered the ZAB protocol and the ZooKeeper fundamentals. Having come this far, let's also walk through the remaining topics: Leader election, distributed locks, and the herd effect and split-brain problems. That should be more than enough ammunition for a campus-recruiting interview.

Today we start with ZooKeeper's Leader election flow and the FastLeaderElection algorithm involved in it.

## Before We Start

ZAB is the protocol that keeps data consistent across a ZooKeeper cluster, and it includes an election phase; FastLeaderElection is one of the algorithms ZooKeeper can use to elect a Leader. Keep these two concepts straight, they are easy to conflate.

## Leader Election

An election happens at two key moments:

  • when the ZooKeeper cluster starts up
  • when the Leader crashes and the cluster performs crash recovery

Some basics you should know up front:


1. Requirements for the elected Leader:

```
The elected leader must hold the highest zxid (the most up-to-date data).
The elected leader must be approved by more than half of the nodes.
```

2. Built-in election algorithm implementations:

```
LeaderElection
FastLeaderElection (the default)
AuthFastLeaderElection
```

3. Election states:

```
LOOKING:   campaigning; looking for a Leader
FOLLOWING: follower; syncs state with the Leader, takes part in voting
OBSERVING: observer; syncs state with the Leader, does not take part in voting
LEADING:   leader
```

4. Terminology:

```
Server ID      --- myid (also written sid below; a required config item in cluster mode)
Transaction ID --- the largest zxid stored on the server
Logical clock  --- a counter of the voting rounds that have been initiated
```

## Election Flow

ZooKeeper clusters are conventionally deployed with an odd number of machines (an even count adds no extra fault tolerance, and an odd count helps avoid split-brain, covered later). Let's assume three servers and walk through their Leader election.

  • Each server casts a vote. Since this is the initial state, Server1 and Server2 each vote for themselves as Leader. A vote carries the proposed server's myid and ZXID and is written as (myid, ZXID), so Server1's vote is (1, 0) and Server2's is (2, 0). Each server then sends its vote to all other machines in the cluster.

PS: Not sure what "voting for yourself" means (what is a vote's data structure)? Don't worry, we'll read the source code later!

  • Receive votes from the other servers. Each server in the cluster checks every vote it receives for validity: is it from the current round, and was it sent by a server in the LOOKING state?
  • Process the votes. A server compares every received vote against its own, using the following rules:

```
First compare the ZXIDs: the server with the larger ZXID (transaction ID) is preferred as Leader.

If the ZXIDs are equal, compare the myids: the server with the larger myid (server ID) is preferred as Leader.
```

For Server1, its own vote is (1, 0) and the vote it receives from Server2 is (2, 0). It first compares the ZXIDs; both are 0. It then compares the myids; Server2's is larger, so Server1 updates its vote to (2, 0) and votes again. Server2 does not need to change its vote; it simply rebroadcasts its previous vote to the whole cluster.

PS: What does "updates its vote to (2, 0)" mean?

It means the server updates the vote it will send out next to (2, 0), and uses that ballot as its new proposal.

  • Tally the votes. After each round, every server counts the votes it has received and checks whether more than half of the machines have accepted the same vote. Here both Server1 and Server2 find that two machines in the cluster have accepted the vote (2, 0), so the Leader is considered elected.
  • Change server state. Once the Leader is determined, each server updates its own state: a Follower switches to FOLLOWING, and the Leader switches to LEADING.

### In Short

1. Every server instance starts by voting for itself as leader.

2. When a server receives a vote, it compares the proposer's transaction ID with its own latest transaction ID: if the proposer's is larger, it votes for the proposer; if smaller, it does not; if equal, it compares server IDs and votes for the proposer if its server ID is larger.

3. The proposer counts the votes it receives (including its own); if the count exceeds half of the cluster, it becomes leader. If no vote has a majority yet and no leader has emerged, another round of voting starts. A minimal simulation of these three steps is sketched right after this list.
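To make these three steps concrete, here is a minimal, self-contained simulation of a first election round under the comparison rule described above (higher zxid first, then higher myid). This is an illustrative sketch only, not ZooKeeper's real classes: the nested `Vote` record, `proposalWins`, and the single-process "broadcast" are all invented for demonstration (Java 16+ for records). Note that with all three servers participating and equal zxids, server 3 ends up elected:

```java
import java.util.HashMap;
import java.util.Map;

public class ElectionSketch {

    // Hypothetical stand-in for ZooKeeper's Vote: (myid, zxid) of the proposed leader.
    record Vote(long myid, long zxid) {}

    // The comparison rule from this section: higher zxid wins; on a tie, higher myid wins.
    static boolean proposalWins(Vote proposal, Vote current) {
        return proposal.zxid() > current.zxid()
                || (proposal.zxid() == current.zxid() && proposal.myid() > current.myid());
    }

    public static void main(String[] args) {
        int ensembleSize = 3;

        // Step 1: each fresh server (zxid 0) votes for itself.
        Map<Long, Vote> ballots = new HashMap<>();
        for (long id = 1; id <= ensembleSize; id++) {
            ballots.put(id, new Vote(id, 0));
        }

        // Step 2: every server sees every first-round vote and adopts any that wins
        // against its own ballot (simulating the broadcast-and-compare exchange).
        Map<Long, Vote> firstRound = Map.copyOf(ballots);
        for (long id = 1; id <= ensembleSize; id++) {
            Vote mine = ballots.get(id);
            for (Vote received : firstRound.values()) {
                if (proposalWins(received, mine)) {
                    mine = received;
                }
            }
            ballots.put(id, mine);
        }

        // Step 3: tally the ballots; a candidate endorsed by more than half wins.
        Map<Long, Integer> tally = new HashMap<>();
        ballots.values().forEach(v -> tally.merge(v.myid(), 1, Integer::sum));
        tally.forEach((candidate, count) -> {
            if (count > ensembleSize / 2) {
                System.out.println("Elected leader: server " + candidate); // server 3 here
            }
        });
    }
}
```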

## The Leader Election Algorithm

Now that the election flow is clear, let's look at how the algorithm is actually implemented in the ZooKeeper source.

Building on the algorithm description you can find all over the web, I'll go through the tricky points once more. Roughly, it works as follows.

First round of voting. Whatever triggered the election, every machine in the cluster is now trying to elect a Leader, i.e. in the LOOKING state. A LOOKING machine sends a message to all other machines; that message is a vote. A vote contains an SID (the server's unique identifier) and a ZXID (transaction ID), so a single vote is written as (SID, ZXID).

Suppose the ZooKeeper ensemble consists of 5 machines with SIDs 1 through 5 and ZXIDs 9, 9, 9, 8, 8, and the machine with SID 2 is currently the Leader. At some point, machines 1 and 2 fail, so the cluster starts a Leader election. In the first round every machine votes for itself, so the votes from machines with SID 3, 4, and 5 are (3, 9), (4, 8), and (5, 8).

```
Votes held by the five machines at this point:
Server 1: (1, 9)   assumed failed ×
Server 2: (2, 9)   assumed failed ×
Server 3: (3, 9)
Server 4: (4, 8)
Server 5: (5, 8)
```

Vote changes. After sending out its vote, each machine also receives votes from the others. It processes each received vote by a fixed set of rules and decides whether to change its own vote; these rules are the heart of the Leader election algorithm. The terms used below:

```
vote_sid:  the SID of the Leader proposed by the received vote.

vote_zxid: the ZXID of the Leader proposed by the received vote.

self_sid:  this server's own SID.

self_zxid: this server's own ZXID.
```

Processing a received vote always boils down to comparing (vote_sid, vote_zxid) against (self_sid, self_zxid).

Rule 1: if vote_zxid is greater than self_zxid, accept the received vote and send it out again. (The received transaction ID is greater than our own.)

Rule 2: if vote_zxid is less than self_zxid, keep our own vote unchanged. (The received transaction ID is less than our own.)

Rule 3: if vote_zxid equals self_zxid, compare the two SIDs; if vote_sid is greater than self_sid, accept the received vote and send it out again. (Equal transaction IDs: fall back to comparing server IDs.)

Rule 4: if vote_zxid equals self_zxid and vote_sid is less than self_sid, keep our own vote unchanged. Together, the four rules fold into a single predicate, sketched below.
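Here is a hedged sketch of that predicate: "should I adopt the received vote?" It mirrors the shape of `FastLeaderElection#totalOrderPredicate` from the source walk-through later, except that the real method also compares the election epoch before the zxid, which these four rules leave out for simplicity.

```java
// Sketch only: decide whether to adopt a received vote under rules 1-4.
static boolean adoptReceivedVote(long voteSid, long voteZxid, long selfSid, long selfZxid) {
    if (voteZxid > selfZxid) {
        return true;   // Rule 1: the received vote wins; adopt it and rebroadcast.
    }
    if (voteZxid < selfZxid) {
        return false;  // Rule 2: keep our own vote unchanged.
    }
    return voteSid > selfSid; // Rules 3 and 4: equal zxids, so the larger sid wins.
}
```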

The detailed flow is shown in the figure:

[figure: vote exchange and comparison flow; image not preserved]

Determining the Leader. After the second round of voting, every machine in the cluster receives the others' votes again and starts tallying. If a machine finds that more than half of the ensemble has cast the same vote, the SID in that vote identifies the Leader; in our example, Server3 becomes the Leader. A simplified version of that tally check is sketched below.
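As a rough sketch of the tallying step (a simplification of what `SyncedLearnerTracker.hasAllQuorums()` does in the real code, ignoring observers and dynamic reconfiguration; the map shape here is hypothetical):

```java
import java.util.Map;

// Sketch only: does candidateSid hold a strict majority of the received ballots?
// receivedVotes maps a voter's sid to the sid that voter currently endorses.
static boolean hasMajority(Map<Long, Long> receivedVotes, long candidateSid, int ensembleSize) {
    long endorsements = receivedVotes.values().stream()
            .filter(endorsed -> endorsed == candidateSid)
            .count();
    return endorsements > ensembleSize / 2;
}
```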

## The Election Flow in the Source Code

Talk is cheap. Now that the basic election flow is clear, let's dig into the source code; the source hides no secrets!

Pulling the source from the address I use may be faster: ZooKeeper source, git clone.

### The Vote Data Structure

First, let's resolve the earlier question: what exactly is a vote (a ballot) as a data structure?

```java
public class Vote {
    ...
    private final int version;            // version number

    private final long id;                // the SID of the proposed Leader

    private final long zxid;              // the transaction ID of the proposed Leader

    private final long electionEpoch;     // logical clock, used to tell whether votes belong to
                                          // the same election round; incremented once per round

    private final long peerEpoch;         // the epoch of the proposed Leader

    private final ServerState state;      // the current state of this server
    ...
}

// server states
public enum ServerState {
    LOOKING,
    FOLLOWING,
    LEADING,
    OBSERVING
}
```

Now that we know what we are voting with, let's trace the overall algorithm flow.

### Source Entry Point

The relevant classes live under `zookeeper\zookeeper-server\src\main\java\org\apache\zookeeper\server\quorum`.

I've omitted the non-core code; if you'd like to dig deeper, you can follow the same analysis path through the full source.

QuorumPeerMain.java

```java
/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args); // entry point
    } catch (IllegalArgumentException e) {
        ...
    }
    LOG.info("Exiting normally");
    ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    purgeMgr.start();

    // decide between standalone mode and cluster mode
    if (args.length == 1 && config.isDistributed()) {
        // cluster mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // the server that handles client reads and writes, on port 2181
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }
        ...
        // start the main thread
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
    ...
}
```

This calls QuorumPeer's start() method:

```java
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // loadDataBase restores data from the local files and recovers the latest zxid
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // initialize leader election
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}
...
public synchronized void startLeaderElection() {
    try {
        // if this node is currently LOOKING, vote for itself
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // pick the election algorithm from configuration (settable in zoo.cfg); the default is the fast election
    this.electionAlg = createElectionAlgorithm(electionType);
}
...
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        // the network IO manager for leader election (handles messages on the low-level send and receive queues)
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // start the bound listener thread, waiting for other machines in the cluster to connect
            listener.start();
            // the TCP-based election algorithm: FastLeaderElection
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

// the FastLeaderElection(this, qcm) call above invokes this constructor
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}
// self-explanatory
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

// calling FastLeaderElection's start() starts the Messenger built above,
// i.e. the message receiver and sender threads (WorkerSender and WorkerReceiver)
public void start() {
    this.messenger.start();
}
/**
 * Constructor of class Messenger.
 *
 * @param manager Connection manager
 */
Messenger(QuorumCnxManager manager) {
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    this.wr = new WorkerReceiver(manager);

    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
/**
 * Starts instances of WorkerSender and WorkerReceiver
 */
void start() {
    this.wsThread.start();
    this.wrThread.start();
}
...
// once the above completes, QuorumPeer's run() method is invoked
@Override
public void run() {
    ...
    try {
        /*
         * Main loop
         */
        while (running) {
            // check this node's current state
            switch (getPeerState()) {
            case LOOKING:
                // LOOKING: enter the election flow
                LOG.info("LOOKING");
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // a strategy pattern picks which election algorithm performs the leader election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // a strategy pattern picks which election algorithm performs the leader election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                ...
                break;
            case FOLLOWING:
                ...
                break;
            case LEADING:
                ...
                break;
            }
        }
    } finally {
        ...
    }
}
```

### Running the Core Election Algorithm

```java
// Entry point from before: setCurrentVote(makeLEStrategy().lookForLeader());
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        /*
         * The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
         * if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
         * of participants has voted for it.
         */
        // stores the votes received
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        /*
         * The votes from previous leader elections, as well as the votes from the current leader election are
         * stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
         * Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
         * outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
         * the electionEpoch of the received notifications) in a leader election.
         */
        // stores election results
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            // bump the logical clock (atomic +1)
            logicalclock.incrementAndGet();
            // update our own zxid and epoch
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info(
            "New election. My id = {}, proposed zxid=0x{}",
            self.getId(),
            Long.toHexString(proposedZxid));
        // send out our vote, including to ourselves (broadcast)
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        // loop until a leader is elected
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            // take a vote from the receiving IO thread; our own vote is handled here too
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            // if empty and our messages were all delivered, keep sending until a leader emerges
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    // messages not yet delivered; other servers may not be up, so retry the connections
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                // extend the timeout
                int tmpTimeOut = notTimeout * 2;
                notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                LOG.info("Notification time out: {}", notTimeout);
            // a vote arrived: check that it comes from within this cluster
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                // branch on the state of the node the message came from
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    // if the sender's epoch is greater than logicalclock, a new election round has started
                    if (n.electionEpoch > logicalclock.get()) {
                        // update the local logicalclock
                        logicalclock.set(n.electionEpoch);
                        // clear the receive set
                        recvset.clear();
                        // check whether the received vote wins: compare epoch, then zxid, then myid
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // it wins: adopt the sender's ballot
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // otherwise keep our own ballot
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        // broadcast again so the other nodes learn our current ballot
                        sendNotifications();
                    // if the received epoch is smaller than our own, ignore the message
                    } else if (n.electionEpoch < logicalclock.get()) {
                        LOG.debug(
                            "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                            Long.toHexString(n.electionEpoch),
                            Long.toHexString(logicalclock.get()));
                        break;
                    // equal epochs: compare zxid then myid; if the received vote wins, adopt it and broadcast
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    LOG.debug(
                        "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                        n.sid,
                        n.leader,
                        Long.toHexString(n.zxid),
                        Long.toHexString(n.electionEpoch));

                    // don't care about the version if it's in LOOKING state
                    // add it to this machine's vote set, used to decide when the election ends
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    // check whether the election is over; by default more than half the servers must agree
                    if (voteSet.hasAllQuorums()) {

                        // Verify if there is any change in the proposed leader
                        // keep waiting for new notifications until the timeout
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        // the leader is confirmed
                        if (n == null) {
                            // update our state
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                // OBSERVING nodes do not take part in the vote
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                // these two states do take part in the election
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    // check whether the epochs match
                    if (n.electionEpoch == logicalclock.get()) {
                        // if they match, add the vote to this machine's vote set
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        // check whether the election is over; if so, verify the leader is valid
                        if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                            // update our own state and return the voting result
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     *
                     * Note that the outofelection map also stores votes from the current leader election.
                     * See ZOOKEEPER-1732 for more information.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}


/**
 * Send notifications to all peers upon a change in our vote (broadcast).
 */
private void sendNotifications() {
    // loop over every voter and send
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        // the message to send
        ToSend notmsg = new ToSend(
            ToSend.mType.notification,
            proposedLeader,
            logicalclock.get(),
            QuorumPeer.ServerState.LOOKING,
            sid,
            proposedEpoch,
            qv.toString().getBytes());

        LOG.debug(
            "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
                + " {} (myid), 0x{} (n.peerEpoch) ",
            proposedLeader,
            Long.toHexString(proposedZxid),
            Long.toHexString(logicalclock.get()),
            sid,
            self.getId(),
            Long.toHexString(proposedEpoch));

        // enqueue onto the send queue, which the WorkerSender consumes
        sendqueue.offer(notmsg);
    }
}
```

Here is a picture from the web that lays out the data flow in the election quite clearly.

[figure: data flow between the election algorithm, the send/receive queues, and the network IO manager; image not preserved]

At its center is a network IO manager that maintains the sending and receiving threads, while the election algorithm produces and consumes vote messages through the queues. Ballots are then pitted against each other and updated or discarded by the rules above, until a Leader is elected.
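That send/receive half is a classic producer-consumer pattern. As a rough sketch of it (not the real `QuorumCnxManager` or `Messenger`, whose connection handling is far more involved; `ToSend`, `Notification`, and the two abstract network hooks below are placeholders, Java 16+ for the records):

```java
import java.util.concurrent.LinkedBlockingQueue;

// Pattern sketch: two daemon threads bridge the election algorithm and the network
// layer through blocking queues, as FastLeaderElection's Messenger does with its
// WorkerSender and WorkerReceiver threads.
abstract class MessengerSketch {
    record ToSend(long sid, byte[] payload) {}        // placeholder message types
    record Notification(long sid, byte[] payload) {}

    final LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<>();

    void start() {
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    // block until lookForLeader() enqueues a vote, then push it to the wire
                    sendOverNetwork(sendqueue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "WorkerSender");
        sender.setDaemon(true);
        sender.start();

        Thread receiver = new Thread(() -> {
            try {
                while (true) {
                    // block on the connection manager, then hand the vote to lookForLeader()
                    recvqueue.put(receiveFromNetwork());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "WorkerReceiver");
        receiver.setDaemon(true);
        receiver.start();
    }

    // Placeholders for the connection-manager calls the real worker threads make.
    abstract void sendOverNetwork(ToSend msg);
    abstract Notification receiveFromNetwork() throws InterruptedException;
}
```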

## Summary

To really understand the Leader election flow, a few key concepts and terms have to be clear:

  • What the transaction ID (zxid) means
  • The order in which the zxid and the sid are compared, and the comparison strategy
  • What it means to update one's ballot and broadcast it

OK! That's all for ZooKeeper's Leader election for now. Later posts will cover ZooKeeper-based distributed locks and the concepts of the herd effect and split-brain, and, time permitting, the modes ZooKeeper uses for data synchronization. Welcome to follow the official account: "Java编程之道"! 🌹

Author: 爱唠嗑的阿磊
Link: https://juejin.im/post/6883483460686594061
Source: 掘金 (Juejin)
Copyright belongs to the author. For commercial reuse, please contact the author for authorization; for non-commercial reuse, please credit the source.