Zookeeper详解(三)——开源客户端curator

2023-05-16

开源客户端curator

(true re de)

curator是Netflix公司开源的一个zookeeper客户端,后捐献给apache,curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。

原生zookeeperAPI的不足:

  • 连接对象异步创建,需要开发人员自行编码等待
  • 连接没有自动重连超时机制
  • watcher一次注册生效一次
  • 不支持递归创建树形节点

curator特点:

  • 解决session会话超时重连
  • watcher反复注册
  • 简化开发api
  • 遵循Fluent风格的API
  • 提供了分布式锁服务、共享计数器、缓存机制等机制

连接

依赖:

		<!-- zookeeper -->
        <zookeeper.version>3.4.14</zookeeper.version>
        <!-- curator -->
        <curator.version>2.6.0</curator.version>		

		<!--添加zookeeper3.4.9版本-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <exclusions>
                <!--排除zookeeper自带的slf4j,不然有引用slf4j会jar冲突-->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
            <version>${zookeeper.version}</version>
        </dependency>

        <!-- curator -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>${curator.version}</version>
            <type>jar</type>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>${curator.version}</version>
            <type>jar</type>
        </dependency>
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Title:Curator连接Zookeeper
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2021/2/5
 */
public class CuratorConnection {

    public static void main(String[] args) {
        // session重连策略
        /* 3秒后重连一次,只重连1次
        RetryPolicy retryPolicy = new RetryOneTime(3000);
        */

        /* 每3秒重连一次,重连3次
        RetryPolicy retryPolicy = new RetryNTimes(3,3000);
        */

        /* 每3秒重连一次,总等待时间超过10秒后停止重连
        RetryPolicy retryPolicy=new RetryUntilElapsed(10000,3000);
        */
        
        // 基于参数1000计算,retryCount重连次数越多,时间间隔就越长
        // baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // 创建连接对象
        CuratorFramework client = CuratorFrameworkFactory.builder()
                // IP地址端口号
                .connectString("192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:21 83")
                // 会话超时时间
                .sessionTimeoutMs(5000)
                // session超时重连机制
                .retryPolicy(retryPolicy)
                // 命名空间
                .namespace("create")
                // 构建连接对象
                .build();

        // 打开连接
        client.start();
        System.out.println(client.isStarted());

        // 关闭连接
        client.close();
    }
}

新增节点

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class CuratorCreate {

    String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory
                .builder()
                .connectString(IP)
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("create")
                .build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }

    @Test
    public void create1() throws Exception {
        // 新增节点
        client.create()
                // 节点的类型
                .withMode(CreateMode.PERSISTENT)
                // 节点的权限列表 world:anyone:cdrwa
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                // arg1:节点的路径
                // arg2:节点的数据
                .forPath("/node1", "node1".getBytes());

        System.out.println("结束");
    }

    @Test
    public void create2() throws Exception {
        // 自定义权限列表
        // 权限列表
        List<ACL> list = new ArrayList<ACL>();
        // 授权模式和授权对象
        Id id = new Id("ip", "192.168.60.130");
        list.add(new ACL(ZooDefs.Perms.ALL, id));
        client.create().withMode(CreateMode.PERSISTENT)
                .withACL(list)
                .forPath("/node2", "node2".getBytes());
        System.out.println("结束");
    }

    @Test
    public void create3() throws Exception {
        // 递归创建节点树
        client.create()
                // 递归节点的创建/node3/node31, 2个节点
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/node3/node31", "node31".getBytes());
        System.out.println("结束");
    }

    @Test
    public void create4() throws Exception {
        // 异步方式创建节点
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                // 异步回调接口
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        // 节点的路径
                        System.out.println(curatorEvent.getPath());
                        // 时间类型
                        System.out.println(curatorEvent.getType());
                    }
                })
                .forPath("/node4", "node4".getBytes());
        Thread.sleep(5000);
        System.out.println("结束");
    }
}

更新节点

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorSet {

    String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(IP)
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("set")
                .build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }

    @Test
    public void set1() throws Exception {
        // 更新节点
        client.setData()
                // arg1:节点的路径
                // arg2:节点的数据
                .forPath("/node1", "node11".getBytes());
        System.out.println("结束");
    }

    @Test
    public void set2() throws Exception {
        client.setData()
                // 指定版本号
                .withVersion(2)
                .forPath("/node1", "node1111".getBytes());
        System.out.println("结束");
    }

    @Test
    public void set3() throws Exception {
        // 异步方式修改节点数据
        client.setData()
                // -1自增
                .withVersion(-1)
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        // 节点的路径
                        System.out.println(curatorEvent.getPath());
                        // 事件的类型
                        System.out.println(curatorEvent.getType());
                    }
                })
                .forPath("/node1", "node1".getBytes());
        Thread.sleep(5000);
        System.out.println("结束");
    }

}

删除节点

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorDelete {

    String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("delete").build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }

    @Test
    public void delete1() throws Exception {
        // 删除节点
        client.delete()
                // 节点的路径
                .forPath("/node1");
        System.out.println("结束");
    }

    @Test
    public void delete2() throws Exception {
        client.delete()
                // 版本号
                .withVersion(0)
                .forPath("/node1");
        System.out.println("结束");
    }

    @Test
    public void delete3() throws Exception {
        //删除包含字节点的节点
        client.delete()
                .deletingChildrenIfNeeded()
                .withVersion(-1)
                .forPath("/node1");
        System.out.println("结束");
    }

    @Test
    public void delete4() throws Exception {
        // 异步方式删除节点
        client.delete()
                .deletingChildrenIfNeeded()
                .withVersion(-1)
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        // 节点路径
                        System.out.println(curatorEvent.getPath());
                        // 事件类型
                        System.out.println(curatorEvent.getType());
                    }
                })
                .forPath("/node1");
        Thread.sleep(5000);
        System.out.println("结束");
    }
}

查看节点

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorGet {

    String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("get").build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }

    @Test
    public void get1() throws Exception {
        // 读取节点数据
        byte[] bys = client.getData()
                // 节点的路径
                .forPath("/node1");
        System.out.println(new String(bys));
    }

    @Test
    public void get2() throws Exception {
        // 读取数据时读取节点的属性
        Stat stat = new Stat();
        byte[] bys = client.getData()
                // 读取属性
                .storingStatIn(stat)
                .forPath("/node1");
        System.out.println(new String(bys));
        System.out.println(stat.getVersion());
    }

    @Test
    public void get3() throws Exception {
        // 异步方式读取节点的数据
        client.getData()
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        // 节点的路径
                        System.out.println(curatorEvent.getPath());
                        // 事件类型
                        System.out.println(curatorEvent.getType());
                        // 数据
                        System.out.println(new String(curatorEvent.getData()));
                    }
                })
                .forPath("/node1");
        Thread.sleep(5000);
        System.out.println("结束");
    }
}

查看子节点

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class CuratorGetChild {

    String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }

    @Test
    public void getChild1() throws Exception {
        // 读取子节点数据
        List<String> list = client.getChildren()
                // 节点路径
                .forPath("/get");
        for (String str : list) {
            System.out.println(str);
        }
    }

    @Test
    public void getChild2() throws Exception {
        // 异步方式读取子节点数据 
        client.getChildren()
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        // 节点路径
                        System.out.println(curatorEvent.getPath());
                        // 事件类型 
                        System.out.println(curatorEvent.getType());
                        // 读取子节点数据 
                        List<String> list = curatorEvent.getChildren();
                        for (String str : list) {
                            System.out.println(str);
                        }
                    }
                })
                .forPath("/get");
        Thread.sleep(5000);
        System.out.println("结束");
    }

}

检查节点是否存在

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorExists {

    String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("get").build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }

    @Test
    public void exists1() throws Exception {
        // 判断节点是否存在
        Stat stat = client.checkExists()
                // 节点路径
                .forPath("/node2");
        System.out.println(stat.getVersion());
    }

    @Test
    public void exists2() throws Exception {
        // 异步方式判断节点是否存在
        client.checkExists()
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        // 节点路径
                        System.out.println(curatorEvent.getPath());
                        // 事件类型
                        System.out.println(curatorEvent.getType());
                        System.out.println(curatorEvent.getStat().getVersion());
                    }
                })
                .forPath("/node2");
        Thread.sleep(5000);
        System.out.println("结束");
    }
}

watcher API

curator提供了两种Watcher(Cache)来监听结点的变化

  • Node Cache : 只是监听某一个特定的节点,监听节点的新增和修改
  • PathChildren Cache : 监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorWatcher {

    String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }

    @Test
    public void watcher1() throws Exception {
        // 监视某个节点的数据变化
        // arg1:连接对象
        // arg2:监视的节点路径
        final NodeCache nodeCache = new NodeCache(client, "/watcher1");
        // 启动监视器对象
        nodeCache.start();
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            // 节点变化时回调的方法
            // 打印变化后的数据
            @Override
            public void nodeChanged() throws Exception {
                System.out.println(nodeCache.getCurrentData().getPath());
                System.out.println(new String(nodeCache.getCurrentData().getData()));
            }
        });
        Thread.sleep(100000);
        System.out.println("结束");
        //关闭监视器对象
        nodeCache.close();
    }

    @Test
    public void watcher2() throws Exception {
        // 监视子节点的变化
        // arg1:连接对象
        // arg2:监视的节点路径
        // arg3:事件中是否可以获取节点的数据
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/watcher1", true);
        // 启动监听
        pathChildrenCache.start();
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            // 当子节点方法变化时回调的方法
            // 某个子节点变化就打印
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                // 节点的事件类型
                System.out.println(pathChildrenCacheEvent.getType());
                // 节点的路径
                System.out.println(pathChildrenCacheEvent.getData().getPath());
                // 节点数据
                System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
            }

        });
        Thread.sleep(100000);
        System.out.println("结束");
        // 关闭监听
        pathChildrenCache.close();
    }

}

事务

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorTransaction {

    String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("create").build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }

    //保证原子性
    @Test
    public void tra1() throws Exception {
        client.inTransaction()
                .create()
                .forPath("node1", "node1".getBytes())
                .and()
                .create()
                .forPath("node2", "node2".getBytes())
                .and()
                .commit();
    }
}

分布式锁

InterProcessMutex:分布式可重入排它锁

InterProcessReadWriteLock:分布式读写锁

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorLock {

    String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
    CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
        client.start();
    }

    @After
    public void after() {
        client.close();
    }

    @Test
    public void lock1() throws Exception {
        // 排他锁
        // arg1:连接对象
        // arg2:节点路径
        InterProcessLock interProcessLock = new InterProcessMutex(client, "/lock1");
        System.out.println("等待获取锁对象!");
        // 获取锁
        interProcessLock.acquire();
        for (int i = 1; i <= 10; i++) {
            Thread.sleep(3000);
            System.out.println(i);
        }
        // 释放锁
        interProcessLock.release();
        System.out.println("等待释放锁!");
    }

    @Test
    public void lock2() throws Exception {
        // 读写锁
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1");
        // 获取读锁对象
        InterProcessLock interProcessLock = interProcessReadWriteLock.readLock();
        System.out.println("等待获取锁对象!");
        // 获取锁
        interProcessLock.acquire();
        for (int i = 1; i <= 10; i++) {
            Thread.sleep(3000);
            System.out.println(i);
        }
        // 释放锁
        interProcessLock.release();
        System.out.println("等待释放锁!");
    }

    @Test
    public void lock3() throws Exception {
        // 读写锁
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1");
        // 获取写锁对象 
        InterProcessLock interProcessLock = interProcessReadWriteLock.writeLock();
        System.out.println("等待获取锁对象!");
        // 获取锁 
        interProcessLock.acquire();
        for (int i = 1; i <= 10; i++) {
            Thread.sleep(3000);
            System.out.println(i);
        }
        // 释放锁 
        interProcessLock.release();
        System.out.println("等待释放锁!");
    }
}

四字监控命令

zooKeeper支持某些特定的四字命令与其的交互。它们大多是查询命令,用来获取 zooKeeper服务的当前状态及相关信息。用户在客户端可以通过 telnet 或 nc 向zooKeeper提交相应的命令。 zooKeeper常用四字命令见下表所示:

命令描述
conf输出相关服务配置的详细信息。比如端口、zk数据及日志配置路径、最大连接数,session超时时间、serverId等
cons列出所有连接到这台服务器的客户端连接/会话的详细信息。包括“接受/发送”的包数量、session id 、操作延迟、最后的操作执行等信息
crst重置当前这台服务器所有连接/会话的统计信息
dump列出未经处理的会话和临时节点
envi输出关于服务器的环境详细信息
ruok测试服务是否处于正确运行状态。如果正常返回"imok",否则返回空
stat输出服务器的详细信息:接收/发送包数量、连接数、模式(leader/follower)、节点总数、延迟。 所有客户端的列表
srst重置server状态
wchs列出服务器watches的简洁信息:连接总数、watching节点总数和watches总数
wchc通过session分组,列出watch的所有节点,它的输出是一个与 watch 相关的会话的节点列表
wchp通过路径分组,列出所有的watchsession id 信息
mntr列出集群的健康状态。包括“接受/发送”的包数量、操作延迟、当前服务模式(leader/follower)、节点总数、watch总数、临时节点总数

telnet 使用:

telnet 192.168.60.130 2181
#之后便可以输入命令

nc命令工具安装:

#root用户安装 
#下载安装包 
wget http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84- 22.el6.x86_64.rpm
#rpm安装
rpm -iUv nc-1.84-22.el6.x86_64.rpm

使用方式,在shell终端输入:echo mntr(具体命令) | nc localhost 2181

  1. conf命令

    conf:输出相关服务配置的详细信息

    shell终端输入:echo conf | nc localhost 2181

    属性含义
    clientPort客户端端口号
    dataDir数据快照文件目录 默认情况下100000次事务操作生成一次快照
    dataLogDir事物日志文件目录,生产环境中放在独立的磁盘上
    tickTime服务器之间或客户端与服务器之间维持心跳的时间间隔(以毫秒为单位)
    maxClientCnxns最大连接数
    minSessionTimeout最小session超时 minSessionTimeout=tickTime*2
    maxSessionTimeout最大session超时 maxSessionTimeout=tickTime*20
    serverId服务器编号
    initLimit集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数
    syncLimit集群中的follower服务器(F)与leader服务器(L)之间 请求和应答之间能容忍的最多心跳数
    electionAlg0:基于UDP的LeaderElection;1:基于UDP的FastLeaderElection;2:基于UDP和认证的FastLeaderElection;3: 基于TCP的FastLeaderElection 在3.4.10版本中,默认值为3另外三种算法已经被弃用,并且有计划在之后的版本中将它们彻底删除而不再支持
    electionPort选举端口
    quorumPort数据通信端口
    peerType是否为观察者 1为观察者
  2. cons命令

    cons:列出所有连接到这台服务器的客户端连接/会话的详细信息

    shell终端输入:echo cons | nc localhost 2181

    属性含义
    ipip地址
    port端口号
    queued等待被处理的请求数,请求缓存在队列中
    received收到的包数
    sent发送的包数
    sid会话id
    lop最后的操作 GETD-读取数据 DELE-删除数据 CREA-创建数据
    est连接时间戳
    to超时时间
    lcxid当前会话的操作id
    lzxid最大事务id
    lresp最后响应时间戳
    llat最后/最新 延时
    minlat最小延时
    maxlat最大延时
    avglat平均延时
  3. crst命令

    crst:重置当前这台服务器所有连接/会话的统计信息

    shell终端输入:echo crst| nc localhost 2181

  4. dump命令

    dump:列出未经处理的会话和临时节点

    shell终端输入:echo dump| nc localhost 2181

    属性含义
    session idznode path(1对多 , 处于队列中排队的session和临时节点)
  5. envi命令

    envi:输出关于服务器的环境配置信息

    shell终端输入:echo envi| nc localhost 2181

    属性含义
    zookeeper.version版本
    host.namehost信息
    java.versionjava版本
    java.vendor供应商
    java.home运行环境所在目录
    java.class.pathclasspath
    java.library.path第三方库指定非Java类包的为止(如:dll,so)
    java.io.tmpdir默认的临时文件路径
    java.compilerJIT编辑器的名称
    os.nameLinux
    os.archamd64
    os.version3.10.0-1062.el7.x86_64
    user.namezookeeper
    user.home/opt/zookeeper
    user.dir`/opt/zookeeper/zookeeper2181/bin
  6. ruok命令

    ruok:测试服务是否处于正确运行状态

    shell终端输入:echo ruok| nc localhost 2181

  7. stat命令

    stat:输出服务器的详细信息与srvr相似,但是多了每个连接的会话信息

    shell终端输入:echo stat| nc localhost 2181

    属性含义
    zookeeper version版本
    Latency min/avg/max延时
    Received收包
    Sent发包
    Connections当前服务器连接数
    Outstanding服务器堆积的未处理请求数
    Zxid最大事务id
    Mode服务器角色
    Node count节点数
  8. srst命令

    srst:重置server状态

    shell终端输入:echo srst| nc localhost 2181

  9. wchs命令

    wchs:列出服务器watches的简洁信息

    shell终端输入:echo wchs| nc localhost 2181

    属性含义
    connectsions连接数
    watch-pathswatch节点数
    watcherswatcher数量
  10. wchc命令

    wchc:通过session分组,列出watch的所有节点,它的输出的是一个与 watch 相关的会话的节点列表问题:

    wchc is not executed because it is not in the whitelist

    解决办法

    # 修改启动指令zkServer.sh
    # 注意找到这个信息
    else
    	echo "JMX disabled by user request" >&2
    	ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi
    # 下面添加如下信息
    ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
    

    每一个客户端的连接的watcher信息都会被收集起来,并且监控的路径都会被展示出来(代价高,消耗性能)

    [root@localhost bin]# echo wchc | nc 192.168.133.133 2180
    0x171be6c6faf0000
            /node2
            /node1
    0x171be6c6faf0001
            /node3
    

    shell终端输入:echo wchc| nc localhost 2181

  11. wchp命令

    wchp:通过路径分组,列出所有的 watch 的session id信息

    问题:

    wchp is not executed because it is not in the whitelist.

    解决方法:

    # 修改启动指令 zkServer.sh 
    # 注意找到这个信息 
    
    else
    	echo "JMX disabled by user request" >&2 
    	ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" 
    fi
    
    # 下面添加如下信息 
    ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
    

    shell终端输入:echo wchp| nc localhost 2181

  12. mntr命令

    列出服务器的健康状态

    shell终端输入:echo mntr| nc localhost 2181

    属性含义
    zk_version版本
    zk_avg_latency平均延时
    zk_max_latency最大延时
    zk_min_latency最小延时
    zk_packets_received收包数
    zk_packets_sent发包数
    zk_num_alive_connections连接数
    zk_outstanding_requests堆积请求数
    zk_server_stateleader/follower状态
    zk_znode_countznode数量
    zk_watch_countwatch数量
    zk_ephemerals_countl临时节点(znode)
    zk_approximate_data_size数据大小
    zk_open_file_descriptor_count打开的文件描述符数量
    zk_max_file_descriptor_count最大文件描述符数量

ZooInspector图形化工具

ZooInspector下载地址:

https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip

解压后进入目录ZooInspector\build,运行zookeeper-dev-ZooInspector.jar

#执行命令如下 
java -jar zookeeper-dev-ZooInspector.jar

在这里插入图片描述

点击左上角连接按钮,输入zk服务地址:ip或者主机名:2181

在这里插入图片描述

之后即可查看ZK节点信息

taokeeper监控工具

基于zookeeper的监控管理工具taokeeper,由淘宝团队开源的zk管理中间件,安装前要求服务前先配置nc 和 sshd

1.下载数据库脚本

wget https://github.com/downloads/alibaba/taokeeper/taokeeper.sql

2.下载主程序

wget https://github.com/downloads/alibaba/taokeeper/taokeeper-monitor.tar.gz

3.下载配置文件

wget https://github.com/downloads/alibaba/taokeeper/taokeeper-monitor-config.properties

4.配置 taokeeper-monitor-config.properties

#Daily 
systemInfo.envName=DAILY 
#DBCP 
dbcp.driverClassName=com.mysql.jdbc.Driver 
#mysql连接的ip地址端口号 dbcp.dbJDBCUrl=jdbc:mysql://192.168.60.130:3306/taokeeper dbcp.characterEncoding=GBK
#用户名
dbcp.username=root 
#密码 
dbcp.password=root 
dbcp.maxActive=30
dbcp.maxIdle=10 
dbcp.maxWait=10000 
#SystemConstant 
#用户存储内部数据的文件夹 
#创建/home/zookeeper/taokeeperdata/ZooKeeperClientThroughputStat SystemConstent.dataStoreBasePath=/home/zookeeper/taokeeperdata 
#ssh用户 
SystemConstant.userNameOfSSH=zookeeper
#ssh密码 
SystemConstant.passwordOfSSH=zookeeper 
#Optional 
SystemConstant.portOfSSH=22

5.安装配置 tomcat,修改catalina.sh

#指向配置文件所在的位置 
JAVA_OPTS=-DconfigFilePath="/home/zookeeper/taokeeper-monitor-tomcat/webapps/ROOT/conf/taokeeper-monitor-config.properties"

6.部署工程启动

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Zookeeper详解(三)——开源客户端curator 的相关文章

  • 【Python】多进程 AttributeError: Can‘t pickle local object

    Python 多进程 AttributeError Can t pickle local object 最近写了一个在电脑磁盘搜索全部文件的的一个小程序 xff0c 效果达到了 xff0c 但是效率5 6分钟 xff0c 效率是十分的不理想
  • 【QT】自定义事件 QCustomEvent

    QT 自定义事件 一 自定义事件的优势 尽管 Qt 已经提供了很多事件 xff0c 但对于更加千变万化的需求来说 xff0c 有限的事件都是不够的 例如 xff0c 我要支持一种新的设备 xff0c 这个设备提供一种崭新的交互方式 xff0
  • 【QT】Qt学习之资源文件(qrc)的添加以及使用

    QT Qt学习之资源文件 xff08 qrc xff09 的添加以及使用 前言 当Qt工程打包好发给他人使用时可能会出现一些图片不显示或者一张图片都加载不出来情况 xff0c 那么可能就是他人的电脑没有这些图片资源 xff0c 或者源程序加
  • 【VS】VS、ReSharper 设置修改代码颜色、提高代码辨识度!附VS超实用快捷!

    VS VS ReSharper 设置修改代码颜色 提高代码辨识度 xff01 附VS超实用快捷 xff01 最终效果 xff1a 色彩由自己定义 可一眼辨识出哪些是变量 常量 方法 字符串等 xff0c 非常强大 方便 xff01 1 In
  • 【QT】 Qt高级——Qt自定义标题栏

    QT Qt高级 Qt自定义标题栏 一 Qt自定义标题栏简介 QWidget及其子类窗体组件的标题栏受操作系统的控制 xff0c 即标题栏的界面风格与操作系统的主题风格相同 xff0c 工程实践中需要开发者自行定义 xff0c 达到美化应用程
  • 第四次月模拟题-201809-3

    题目分析 xff1a 本题很绕 xff0c 但是关键的部分非常简洁 xff0c 实际上就是对应的查找字符而已 xff0c 只不过复杂一点的是可能会有分级 xff0c 这样我们就要用一个树结构来储存 xff0c 首先我们先要定义一个结构题 x
  • 【QT】 QSS类的用法及基本语法介绍

    QT QSS类的用法及基本语法介绍 文章目录 QT QSS类的用法及基本语法介绍摘要1 何为Qt样式表2 样式表语法基础3 方箱模型5 创建可缩放样式6 控制大小7 处理伪状态8 使用子部件定义微观样式8 1 相对定位8 2 绝对定位 四
  • 简易数字时钟 按键可校准

    1 课程设计目的 2 课程设计内容及要求 2 1 设计任务 2 2 设计要求 3 verilog程序设计 3 1方案论证 3 2 系统结构框图 3 3设计思路与方法 3 3 1功能模块 1 按键消抖 2 时钟状态 3 时钟调整状态 3 3
  • Chrome - develop for the web

    问题描述 xff1a X Chrome develop for the web Cannot find Chrome executable at Google Chrome Application chrome exe Cannot fin
  • 树莓派4b开启vnc和无法连接解决方法

    树莓派4b开启vnc vnc开启 通过ssh连接到树莓派后运行如下命令 打开命令行 xff0c 输入 sudo raspi config xff0c 打开树莓派软件设置工具 选择 3 Interfacing Options 选择 I3 VN
  • Ubuntu安装软件时报错(报错:dpkg: 处理软件包 xxx (--configure)时出错: 依赖关系问题 - 仍未被配置)

    Ubuntu安装软件时报错 报错 xff1a dpkg 处理软件包 configure 时出错 xff1a 依赖关系问题 仍未被配置 报错信息 xff1a done update alternatives 使用 var lib mecab
  • 修改WIN11右键菜单为经典右键菜单(这一招足够)

    1 首先我们在电脑桌面主页按下 Win 43 R 键 xff0c 打开运行对话框 2 接着在对话框中输入指令 regedit 然后按下回车确认 xff0c 打开注册表界面 3 找到如下注册表路径 xff1a 计算机 HKEY LOCAL M
  • 新版IDEA maven项目不自动下jar包如何解决

    因为是学生 xff0c 可以免费试用jetbrains的产品 xff0c 就下了2020 1 1版的IntelliJ IDEA 在maven项目上 xff0c 它跟之前不同是在pom加入坐标后不能自动从中央仓库下载jar包 2019之前的版
  • 快速搭建私有pip镜像源

    1 快速体验 span class token keyword import span os span class token keyword import span sys span class token keyword import
  • 虚拟机Ubuntu18.04突然连不上网怎么解决

    本来正常使用ubuntu18 04 xff0c 突然连不上网 使用sudo apt get update无法连接到域名 采用如下方法解决 xff01 xff01 xff01 原文链接 xff1a https blog csdn net qq
  • rpm方式安装 mysql5.7.29

    一 rpm方式安装 mysql5 7 29 1 下载mysql5 7 29的rpm安装包 rpm的mysql包 安装起来简单 解压版的mysql还需要做许多配置 xff0c 稍有不慎就会出错 xff01 xff01 xff01 下载地址 x
  • 必须知道的C语言知识细节:函数形参和实参的区别

    当你选择了一种语言 xff0c 意味着你还选择了一组技术 一个社区 Joshua Bloch C语言中函数形参和实参是十分重要的概念 xff0c 初学者很容易混淆 形参 xff1a 顾名思义 xff0c 形式参数 xff0c 仅仅是声明了参
  • windows和虚拟机互传文件的三种方式

    大家好 xff0c 在平时学习工作的时候可能有这样的需求 xff1a 要将windows中的文件传到虚拟机中或者将虚拟机的文件传到windows xff0c 大家都是怎么实现的呢 xff1f 今天给大家介绍下windows和虚拟机互传文件的
  • dpkg命令详解

    用法 xff1a dpkg lt 选项 gt lt 命令 gt Commands i install lt deb file name gt R recursive unpack lt deb file name gt R recursiv
  • 结构体字节对齐之嵌套结构体

    搜狐畅游2020游戏研发笔试题目 xff1a 以下输出的结果是 xff1f xff1f xff1f span class token macro property span class token directive keyword inc

随机推荐

  • 程序设计CSP-M4-补题——T1-TT数鸭子

    T1 TT数鸭子 题目描述InputOutput解题思路实现代码总结 题目描述 给出n个数 xff0c 求有多少个数其数位中不同的数字的个数小于k Input 第一行两个数n k 第二行n个数 Output 输出满足题目要求的数字个数 解题
  • ceph 分布式 存储服务 恢复

    文章目录 一条命令执行恢复 xff08 你最好还是读读 为什么可以一条命令恢复 ceph 服务 xff09 版本信息ceph 容器服务恢复前提条件安装cephadm查看ceph 服务依赖删除多余的集群 可选 一条命令执行恢复 systemc
  • svn: E230001: Server SSL certificate verification failed: certificate issued

    svn E230001 Server SSL certificate verification failed certificate issued 字面上的大致意思是服务器的SSL证书验证失败 解决方法 xff1a 在终端执行svn ls
  • linuxQt程序打包

    linux程序打包 qt程序打包与执行 将release版本生成的移动到新建文件夹中 xff1b linux下qt打包的sh文件 例如 xff0c 生成pack sh span class token shebang important b
  • JAVA判断时间格式为 “YYYY-MM-DD“

    常用的方法如下 xff1a import java text DateFormat import java text SimpleDateFormat import java util Date public class DataTest
  • win系统修改右键新建菜单

    win系统修改右键新建菜单 在右键新建中添加自己想要的文件修改右键新建顺序修改右键新建中菜单项的名字 在右键新建中添加自己想要的文件 首先win 43 R再regedit调出注册表在HKEY CLASSES ROOT目录下找到对应后缀名 x
  • Django基础(一)

    目录 创建项目 创建一个应用 启动服务 创建项目 D pythonProject3 Django gt django admin startproject hello 执行完成命令 大概10s之后会出现一个以hello命名的文件夹 创建一个
  • 二分图多重匹配——小结

    二分图的重匹配 xff0c 说白了就说一对多的匹配 还是匈牙利算法 xff0c 一般都是给出两个集合 xff0c 然后让你对这两个集合进行匹配 xff0c 但是其中一个集合是可以多次匹配的 xff0c 但是匹配的次数是有限的 xff08 假
  • C.Garland(DP)

    题目链接 xff1a C Garland 题意 给你了一个序列 xff0c 包含n个数 xff0c 这个序列是由1 n数字构成 xff0c 但是题目给你的这个序列并不完整 xff0c 让你去补完整 xff0c 那些输入的值为0的位置的就是让
  • P1908 逆序对(离散化)

    洛谷P1908 逆序对 逆序对就不用解释了 xff0c 题上也说的很清楚 那我分别用归并排序和树状数组来解决一下这道题目 归并排序 我们都知道 xff0c 归并排序是通过把大区间一直分 xff0c 分成小区间 xff0c 然后小区间排序好了
  • Codeforces Round #618 (Div. 2)

    太菜了 xff0c 也只能补补题了 A Non zero 这道题瞎弄一下就过了 xff0c 数0的个数 xff0c 把0全变成1 xff0c 然后再判断现在和是不是0 xff0c 和是0的话就再加上1 span class token ma
  • HDU 1025最长递增子序列(二分法)

    最长递增子序列 xff08 二分 xff09 HDU1025 https www felix021 com blog read php 1587 找最长递增子序列 xff0c 以前一般用DP的方法找 xff0c 因为理解简单 xff0c 实
  • Codeforces Round #658 (Div. 2)

    比赛链接 xff1a https codeforces com contest 1382 A Common Subsequence 题意 给你两组数 xff0c 问你有没有相同 的书 xff0c 有的话 xff0c 输出最短的那组 xff0
  • mysql学习笔记之数据库

    我的mysql学习参考于github文章 数据库 xff1a 高效的存储和处理数据的介质 xff08 比如磁盘和内存 xff09 xff0c 又根据介质的不同 xff0c 分为关系数据库和非关系数据 关系数据库特点 xff1a 1 xff0
  • Python_pytorch (三)分解网络模型

    python pytorch 小土堆pytotch学习视频链接 from的是一个个的包 xff08 package import 的是一个个的py文件 file py 所使用的一般是文件中的类 class 第一步实例化所使用的类 然后调用类
  • Python_pytorch(四)网络搭建

    搭建架构 span class token keyword import span torch span class token keyword import span torchvision span class token keywor
  • Python_pytorch(五)模型训练

    反向传播 Loss Function span class token keyword import span torchvision span class token keyword from span torch span class
  • 【计蒜客】泥塑课C++

    泥塑课 描述 小米是一个幼儿园老师 xff0c 每学期的泥塑课上 xff0c 她都会给每个学生发不超过 250 立方厘米的等量橡皮泥 xff0c 教大家做泥塑 在上课过程中 xff0c 她发现每个班都恰好有一个小朋友会去抢另一个小朋友的橡皮
  • linux无法粘贴文件

    无粘贴功能的主要原因是无权限复制 xff0c 所以解决方案是 xff1a 打开终端 xff0c 输入 xff1a sudo nautilus 那么就会打开一个有管理员权限的文件夹资源器 xff0c 现在右键就有粘贴功能了
  • Zookeeper详解(三)——开源客户端curator

    开源客户端curator true re de curator是Netflix公司开源的一个zookeeper客户端 xff0c 后捐献给apache xff0c curator框架在zookeeper原生API接口上进行了包装 xff0c