异度部落格

学习是一种生活态度。

0%

Zookeeper源码分析-Zookeeper Server启动分析

Zookeeper Server 的启动入口为 org.apache.zookeeper.server.quorum.QuorumPeerMain。Zookeeper 的启动模式分为两种:一种为 standalone mode;另一种为 cluster mode。

  • Standalone 模式:当配置文件中仅配置了一台 server 时,Zookeeper 将以 standalone 模式启动,启动类为 ZooKeeperServerMain,此处不作详细分析。
  • Cluster 模式:当配置文件中配置了多台 server,构建 cluster,启动类为 QuorumPeer#start()。
1
2
3
4
5
6
public synchronized void start() {
loadDataBase();
cnxnFactory.start();
startLeaderElection();
super.start();
}

清理 DataDir

在 Server 启动时,根据启动的配置参数启动一个 TimeTask 用于清理 DataDir 中的 snapshot 及对应的 transactionlog。由于 Zookeeper 的任何一个写操作都将在 transaction log 中留下记录,当写操作达到一定量或者一定时间间隔,Zookeeper 将 transaction log 合并为 snapshot。所以随着运行时间的增长生成的 transaction log 和 snapshot 将越来越多,所以定期清理是必要的。在 DatadirCleanupManager 中有两个参数:

  • snapRetainCount:清理后保留的 snapshot 的个数,该参数至少要大于 3。
  • purgeInterval:定期清理的时间间隔,以小时为单位。
1
2
3
4
5
// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

加载 ZKDatabase

1.从 snapshot 和 transaction log 中恢复 ZKDatabase,并将其载入内存。

1
zkDb.loadDataBase();

2.载入 currentEpoch 和 acceptedEpoch

首先要明白什么是 epoch,官方给出的解释是:

The zxid has two parts: the epoch and a counter. In our implementation the zxid is a 64-bit number. We use the high order 32-bits for the epoch and the low order 32-bits for the counter. Because it has two parts represent the zxid both as a number and as a pair of integers, (epoch, count). The epoch number represents a change in leadership. Each time a new leader comes into power it will have its own epoch number.

从 currentEpoch 文件中读取 current epoch;若 currentEpoch 文件不存在,Zookeeper 将从 lastProcessedZxid 中获取 epoch 作为 current epoch,写入 currentEpoch 文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
if (epochOfZxid > currentEpoch && updating.exists()) {
LOG.info("{} found. The server was terminated after " +
"taking a snapshot but before updating current " +
"epoch. Setting current epoch to {}.",
UPDATING_EPOCH_FILENAME, epochOfZxid);
setCurrentEpoch(epochOfZxid);
if (!updating.delete()) {
throw new IOException("Failed to delete " +
updating.toString());
}
}
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(CURRENT_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}

同理,获取 accepted epoch。

1
2
3
4
5
6
7
8
9
10
11
12
try {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
acceptedEpoch = epochOfZxid;
LOG.info(ACCEPTED_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
acceptedEpoch);
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}

启动 ServerCnxnFactory 线程

ServerCnxnFacotry 是管理 ServerCnxn 处理类的工厂,它负责对 connection 上数据处理的调度,以及 server 级别的一些处理,例如关闭指定 session 等。有关 ServerCnxnFactory 的实现类,可分为三种情况:

  • NIO 模式。这是 Zookeeper 默认的 ServerCnxnFactory 实现,其实现类为 NIOServerCnxnFactory。
  • Netty 模式。在 Zookeeper 3.4 以后引入了 Netty 作为 Server 端连接处理的可选实现。Netty 是一套非常高效的异步通信框架。可以通过 JVM 参数 zookeeper.serverCnxnFactory 进行配置。
  • 自定义模型。Zookeeper 还支持自定类来实现通信,同样可以通过 JVM 参数 zookeeper.serverCnxnFactory 进行配置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Static public ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName =
System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}

ServerCnxnFactory 的主要职责:

  1. 在单机模式下,引导完成 Zookeeper Server 的实例化。
  2. 异步接收 Client 的 IO 连接,并维护连接的 IO 操作,这是 ServerCnxnFactory 的核心功能。ServerCnxnFacotry 本身被设计成一个 Thread,在完成初始化工作之后,就开始启动自身线程,在线程 run 方法中,采用 NIO 的方式 Accept 客户端连接,创建一个 NIOServerCnxn 实例,此实例和普通的 NIO 设计思路一样,它持有当前连接的 Channel 句柄和 Buffer 队列,最终将此 NIOServerCnxn 放入 Factory 内部的一个 set 中,以便此后对链接信息进行查询和操作(比如关闭操作,IO 中 read 和 write 操作等)。

Leader 选举

Leader 的选举算法有三种:

  • LeaderElection:LeaderElection 采用的是 fast paxos 算法的一种简单实现,目前该算法在 3.4 以后已经建议弃用。
  • FastLeaderElection:FastLeaderElection 是标准的 fast paxos 的实现,这是目前 Zookeeper 默认的 Leader 选举算法。
  • AuthFastLeaderElection:AuthFastLeaderElection 算法同 FastLeaderElection 算法基本一致,只是在消息中加入了认证信息,该算法在最新的 Zookeeper 中也建议弃用。

Leader 选举算法分析详见:Zookeeper 源码分析-Zookeeper Leader 选举算法

QuorumPeer 启动

完成 DataDir,ZKDatabase 加载以及 Leader 选举动作后,QuorumPeer 完成初始化及启动工作。

参考资料

http://zookeeper.apache.org/doc/r3.4.6/zookeeperInternals.html

http://shift-alt-ctrl.iteye.com/blog/1846507