Curator 是对zookeeper进行了封装,可以直接使用,大大的提高了开发效率,容易上手,Curator leader选举是分布式中常用的一部分,也是Curator 主要功能之一,在分布式中,往往我们只需其中一个节点进行,其它节点作为备份节点。
要想使用Leader选举功能,需要添加recipes包,可以在maven中添加如下依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.9.0</version>
</dependency>
当然了,由于recipes需要使用framework,所以你肯定还要添加如下依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.9.0</version>
</dependency>
最后,为了简化测试也为了便于学习,可以添加test依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.9.0</version>
</dependency>
LeaderLatch使用流程
recipes包里面提供了Leader选举实现,Spark中的master选举使用的就是reciples包里面的LeaderLatch,使用他们可以极大的简化代码,使你将注意力更多的放在核心业务逻辑上。Leader选举的实现在org.apache.curator.framework.recipes.leader包中,这个包提供了两组Leader选举:
1.LeaderLatch,LeaderLatchListener
2.LeaderSelector,LeaderSelectorListener,LeaderSelectorListenerAdapter
这两组类都可以实现Leader选举,spark 使用的是第一种。再这篇文章里,只介绍第一种。
第一组使用起来非常简单,使用思路大致如下:假设你有3个节点,姑且叫做node0,node1,node2。你需要为每一个node创建一个CuratorFramework,LeaderLatch,LeaderLatchListener,如下:
node0:
1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();
2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()
node1:
1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();
2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()
node2:
1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();
2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()
你首先要创建CuratorFramework,然后并启动它,一个CuratorFramework就是一个ZooKeeper客户端。然后创建LeaderLatch,并制定刚才创建的CuratorFramework和一个leaderPath,leaderPath是一个ZooKeepe路径,node0,node1,node2中的leaderPath必须一致。创建好LeaderLatch之后,需要为他注册一个LeaderLatchListener回掉,如果某个node成为leader,那么会调用这个node的LeaderLatchListener的isLeader(),因此你可以在这里写自己的业务逻辑。最后,调用LeaderLatch的start(),这个LeaderLatch将参加选举了。
可以参考如下代码:
import java.util.ArrayList;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class LeaderDemo {
public static void main(String[]args) throws Exception{
List<LeaderLatch>leaders=new ArrayList<LeaderLatch>();
List<CuratorFramework>clients=new ArrayList<CuratorFramework>();
TestingServer server=new TestingServer();
try{
for(int i=0;i<10;i++){
CuratorFramework client=CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(20000,3));
clients.add(client);
LeaderLatch leader=new LeaderLatch(client,"/francis/leader");
leader.addListener(new LeaderLatchListener(){
@Override
public void isLeader() {
// TODO Auto-generated method stub
System.out.println("I am Leader");
}
@Override
public void notLeader() {
// TODO Auto-generated method stub
System.out.println("I am not Leader");
}});
leaders.add(leader);
client.start();
leader.start();
}
Thread.sleep(Integer.MAX_VALUE);
}finally{
for(CuratorFramework client:clients){
CloseableUtils.closeQuietly(client);
}
for(LeaderLatch leader:leaders){
CloseableUtils.closeQuietly(leader);
}
CloseableUtils.closeQuietly(server);
}
Thread.sleep(Integer.MAX_VALUE);
}
}
LeaderLatch和LeaderLatchListener方法介绍
LeaderLatch提供了如下方法:
start()/close():启动/停止LeaderLatch
addListener(LeaderLatchListener)/removeListener(LeaderLatchListener):添加/移除LeaderLatchListener
hasLeadership():如果LeaderLatch是Leader,那么返回true,否则false。
getLeader():
await:等待Leaderlatch成为Leader。
LeaderLatchListener提供了如下方法:
isLeader():当LeaderLatch的hasLeaderShip()从false到true后,就会调用isLeader(),表明这个LeaderLatch成为leader了。
notLeader():当LeaderLatch的hahLeaderShip从true到false后,就会调用notLeader(),表明这个LeaderLatch不再是leader了。
LeaderLatch在Master-Slave中的应用
在一个典型的master-slave场景下。你可以在isLeader中做如下处理:
1.每一个master类都有一个state属性,初始值为standby.
2.在isLeader中,从持久话引擎中读取要恢复的数据到一个临时的内存缓存中
3.将这个master的state修改为recovering
4.通知所有worker将其内部的master修改为当前master。
5.将临时内存缓存中的数据恢复到master内部。
6.将master状态修改为alive,然后这个master就可以对外服务了。
注意第5步,由于将持久话引擎中的数据添加到了master内部的内存中,所以需要确保之多恢复一次语义。