简介
是Netflix公司开源的一套Zookeeper客户端框架。在ZooKzookeeper原生API上做了封装,提供了一套可读性更强的Fluent风格的API框架。
引入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
创建节点
private String connectString = "192.168.1.201:2181";
// 会话超时时间,默认60000ms
private int sessionTimeoutMs = 15000;
// 创建连接超时时间,默认15000ms
private int connectionTimeoutMs = 15000;
private CuratorFramework client;
@Before
public void init() {
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
// 启动会话
client.start();
System.out.println("连接创建成功");
}
@After
public void close() {
client.close();
System.out.println("连接关闭成功");
}
查询节点数据
@Test
public void getDataTest() throws Exception {
// 查询节点数据
byte[] data = client.getData().forPath("/o");
System.out.println(new String(data));
}
修改节点数据
@Test
public void setDataTest() throws Exception {
// 修改节点数据
client.setData().forPath("/o", "1-1".getBytes());
}
删除节点
@Test
public void deleteTest() throws Exception {
// 删除节点
client.delete().forPath("/t");
// 删除节点和其下面的所有子节点
client.delete().deletingChildrenIfNeeded().forPath("/s");
}
事件监听
Curator引入了Cache来实现对Zookeeper服务端事件的监听。
Curator能够自动处理反复注册的监听,从而简化原生API开发的繁琐过程。Cache监听类型:节点监听和子节点监听。
NodeCache
用于监听指定节点的变化:
- 节点不存在时,创建这个节点,会触发监听事件
- 节点数据变更时,会触发监听事件
- 节点被删除时,不会触发监听事件
@Test
public void nodeCacheTest() throws Exception {
//dataIsCompressed:是否对数据压缩
final NodeCache cache = new NodeCache(client, "/o", false);
// buildInitial:第一次启动时是否读取指定节点的数据并保存在cache中,默认false
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() {
System.out.println("节点数据变更为:" + new String(cache.getCurrentData().getData()));
}
});
Thread.sleep(Integer.MAX_VALUE);
}
PathChildrenCache
监听指定节点的子节点变化情况
@Test
public void pathChildrenCacheTest() throws Exception {
// cacheData:是否把节点数据缓存起来
PathChildrenCache cache = new PathChildrenCache(client, "/o", false);
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("新增子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("子节点" + event.getData().getPath() + "数据有更新");
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
default:
break;
}
}
});
Thread.sleep(Integer.MAX_VALUE);
}
Master选举
基本思路:选择一个节点,例如/o,多个客户端同时向该节点创建子节点/o/o1,最终只有一个客户端能创建成功,即成为master。
Curator对创建节点、事件监听、自动选举做了封装,我们只需要条用API即可实现Master选举。