Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

zookeeper---客户端---Curator

简介

是Netflix公司开源的一套Zookeeper客户端框架。在ZooKzookeeper原生API上做了封装,提供了一套可读性更强的Fluent风格的API框架。

引入依赖

        <dependency>
         <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
        </dependency>

创建节点

    private String connectString = "192.168.1.201:2181";

    // 会话超时时间,默认60000ms
    private int sessionTimeoutMs = 15000;
    // 创建连接超时时间,默认15000ms
    private int connectionTimeoutMs = 15000;
    private CuratorFramework client;

    @Before
    public void init() {
        // 重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
        // 启动会话
        client.start();
        System.out.println("连接创建成功");
    }

    @After
    public void close() {
        client.close();
        System.out.println("连接关闭成功");
    }

查询节点数据

    @Test
    public void getDataTest() throws Exception {
        // 查询节点数据
        byte[] data = client.getData().forPath("/o");
        System.out.println(new String(data));
    }

修改节点数据

    @Test
    public void setDataTest() throws Exception {
        // 修改节点数据
        client.setData().forPath("/o", "1-1".getBytes());
    }

删除节点

    @Test
    public void deleteTest() throws Exception {
        // 删除节点
        client.delete().forPath("/t");
        // 删除节点和其下面的所有子节点
        client.delete().deletingChildrenIfNeeded().forPath("/s");
    }

事件监听

Curator引入了Cache来实现对Zookeeper服务端事件的监听。
Curator能够自动处理反复注册的监听,从而简化原生API开发的繁琐过程。Cache监听类型:节点监听和子节点监听。

NodeCache

用于监听指定节点的变化:

  • 节点不存在时,创建这个节点,会触发监听事件
  • 节点数据变更时,会触发监听事件
  • 节点被删除时,不会触发监听事件
    @Test
    public void nodeCacheTest() throws Exception {
        //dataIsCompressed:是否对数据压缩
        final NodeCache cache = new NodeCache(client, "/o", false);
        // buildInitial:第一次启动时是否读取指定节点的数据并保存在cache中,默认false
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() {
               System.out.println("节点数据变更为:" + new String(cache.getCurrentData().getData()));
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

PathChildrenCache

监听指定节点的子节点变化情况

    @Test
    public void pathChildrenCacheTest() throws Exception {
        // cacheData:是否把节点数据缓存起来
        PathChildrenCache cache = new PathChildrenCache(client, "/o", false);
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client,
                                   PathChildrenCacheEvent event) {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("新增子节点:" + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("子节点" + event.getData().getPath() + "数据有更新");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("删除子节点:" + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

Master选举

基本思路:选择一个节点,例如/o,多个客户端同时向该节点创建子节点/o/o1,最终只有一个客户端能创建成功,即成为master。
Curator对创建节点、事件监听、自动选举做了封装,我们只需要条用API即可实现Master选举。