ZooKeeper Java Api 操作

导读:本篇文章讲解 ZooKeeper Java Api 操作,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

目录

zk 客户端与 zk 服务端的连接

zk 会话重连

zk 同步创建节点 必须带权限

zk 异步创建节点 必须带权限

zk 设置节点数据

zk 同步删除节点

zk 异步删除节点

zk 获取节点数据

zk 获取子节点

zk 判断节点是否存在

zk ACL digest 用户权限

zk ACL ip 权限


添加依赖

<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>最新版本</version>
</dependency>

zk 客户端与 zk 服务端的连接

	public static void main(String[] args) throws InterruptedException, IOException {
		/*
		 * 构造参数说明:
		 * *** 参数一:zookeeper 服务连接地址和端口,
		 * *** 参数二:会话超时时间,
		 * *** 参数三:watcher事件,如果不需要可以传null
		 */
		ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 50000, (event) -> {
			log.info("监听到 watcher 通知:{}", event.toString());
		});
		log.info("开始连接 zk 服务器");
		log.info("连接状态:{}", zooKeeper.getState());

		Thread.sleep(1000);

		log.info("连接状态:{}", zooKeeper.getState());
	}

zk 会话重连

	public static void main(String[] args) throws Exception {
		ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 50000, null);

		long sessionId = zk.getSessionId();

		String ssid = "0x" + Long.toHexString(sessionId);
		System.out.println(ssid);
		byte[] sessionPassword = zk.getSessionPasswd();

		log.warn("客户端开始连接zookeeper服务器...");
		log.warn("连接状态:{}", zk.getState());
		Thread.sleep(1000);
		log.warn("连接状态:{}", zk.getState());

		Thread.sleep(200);

		// 开始会话重连
		log.warn("开始会话重连...");

		ZooKeeper zkSession = new ZooKeeper("127.0.0.1:2181",
				50000,
				null,
				sessionId,
				sessionPassword);
		log.warn("重新连接状态zkSession:{}", zkSession.getState());
		Thread.sleep(1000);
		log.warn("重新连接状态zkSession:{}", zkSession.getState());
	}

zk 同步创建节点 必须带权限

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);

		// 使用 ZooDefs.Ids.CREATOR_ALL_ACL 一直报错Invalid ACL只能自己创建一个
		/*
		 * acl 权限
		 * 参数一:perms:1:r, 2:w, 3:c, 4:d, 5:a
		 * 参数二:权限类型
		 */
		Id id = ZooDefs.Ids.ANYONE_ID_UNSAFE;
		ACL acl = new ACL(ZooDefs.Perms.ALL, id);
		List<ACL> aclList = new ArrayList<>();
		aclList.add(acl);
		
		/*
		 * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
		 * 参数:
		 * path:创建的路径
		 * data:存储的数据的byte[]
		 * acl:控制权限策略
		 *  ***		Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
		 *  ***		Ids.CREATOR_ALL_ACL --> auth:user:password:cdrwa
		 * createMode:节点类型, 是一个枚举
		 * 	***		PERSISTENT:持久节点
		 * 	***		PERSISTENT_SEQUENTIAL:持久顺序节点
		 * 	***		EPHEMERAL:临时节点
		 * 	***		EPHEMERAL_SEQUENTIAL:临时顺序节点
		 * 返回节点名称
		 */
		// 同步创建节点
		String nodeName = zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT);
		log.info("创建节点 {} 成功", nodeName);
	}

zk 异步创建节点 必须带权限

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);

		// 使用 ZooDefs.Ids.CREATOR_ALL_ACL 一直报错Invalid ACL只能自己创建一个
		Id id = ZooDefs.Ids.ANYONE_ID_UNSAFE;
		ACL acl = new ACL(ZooDefs.Perms.ALL, id);
		List<ACL> aclList = new ArrayList<>();
		aclList.add(acl);
		// 异步创建创建节点,不支持子节点递归创建
		zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT,
				new AsyncCallback.StringCallback() {
					@Override
					public void processResult(int rc, String path, Object ctx, String name) {
						log.info("创建节点成功");
						log.info("========== {}", rc);    // 0 的时候才表示操作成功
						log.info("========== {}", path);
						log.info("========== {}", ctx);
						log.info("========== {}", name);
					}
				}, "扩展数据,原样返回");

		Thread.sleep(2000L);    // 阻塞等待回调,不让程序结束,不然回调不打印
	}

zk 设置节点数据

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);

		Thread.sleep(2000L);

//		zookeeper.addAuthInfo("");	// 登录 acl 权限
		/*
		 * 设置节点数据
		 * 参数一:节点
		 * 参数二:数据
		 * 参数三:版本号,通过 命令 [ls -s 节点] 获得 dataVersion 版本号
		 */
		Stat stat = zookeeper.setData("/test", "123".getBytes(), 0);
		System.err.println(stat);
	}

zk 同步删除节点

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
		/*
		 * 同步删除节点,java api 没有 递归删除的操作
		 * 参数一:节点
		 * 参数二:版本号,通过 命令 [ls -s 节点] 获得 dataVersion 版本号
		 */
		zookeeper.delete("/test", 1);
	}

zk 异步删除节点

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
		/*
		 * 同步删除节点,java api 没有 递归删除的操作
		 * 参数一:节点
		 * 参数二:版本号,通过 命令 [ls -s 节点] 获得 dataVersion 版本号
		 * 参数三:异步回调
		 * 参数四:扩展数据,原样返回
		 */
		zookeeper.delete("/test", 0, new AsyncCallback.VoidCallback () {
			@Override
			public void processResult(int rc, String path, Object ctx) {
				log.info("========== {}", rc);	// 0 的时候才表示操作成功
				log.info("========== {}", path);
				log.info("========== {}", ctx);
			}
		}, "扩展数据,原样返回");

		Thread.sleep(2000L);
	}

zk 获取节点数据

    // 没有 watcher 事件的示例	
    public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
		Thread.sleep(2000L);
		log.warn("连接状态:{}", zookeeper.getState());

		Stat stat = new Stat();
		/*
		 * 参数一:节点路径
		 * 参数二:是否注册一个watch事件
		 * 参数三:获取数据后,节点存放stat的对象
		 */
		byte[] data = zookeeper.getData("/test", false, stat);
		String result = new String(data);
		log.info(result);
		Thread.sleep(2000L);
	}
    // 带 watcher 事件的示例
	private static final CountDownLatch countDown = new CountDownLatch(1);

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
		Thread.sleep(2000L);
		log.warn("连接状态:{}", zookeeper.getState());

		Stat stat = new Stat();
		/*
		 * 参数一:节点路径
		 * 参数二:是否注册一个watch事件
		 * 参数三:获取数据后,节点存放stat的对象
		 * return:返回节点数据
		 */
		byte[] data = zookeeper.getData("/test", (event) -> {
			log.info("当节点发生变化时,才会触发此事件,监听到 watcher 通知:{}", event.toString());
			countDown.countDown();
		}, stat);

		String result = new String(data);
		log.info(result);

		countDown.await();	// 不让线程结束,等 watcher 触发后,才让线程结束
		log.info("执行完成");
	}

zk 获取子节点

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
		Thread.sleep(2000L);
		log.warn("连接状态:{}", zookeeper.getState());

		/*
		 * 参数一:节点路径
		 * 参数二:是否注册一个watch事件。watch事件前面的示例都有演示过,这里就不演示了
		 * return:返回子节点的路径名称
		 */
		List<String> childrenList = zookeeper.getChildren("/test", false);
		log.info("子节点列表:{}", childrenList);
	}

zk 判断节点是否存在

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
		Thread.sleep(2000L);
		log.warn("连接状态:{}", zookeeper.getState());

		/*
		 * 参数一:节点路径
		 * 参数二:是否注册一个watch事件。watch事件前面的示例都有演示过,这里就不演示了
		 * return:返回节点的 stat 信息
		 */
		Stat stat = zookeeper.exists("/test", false);
		if (null == stat) {
			log.info("节点不存在");
		} else {
			log.info("节点存在 {}", stat);
		}
	}

zk ACL digest 用户权限

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
		Thread.sleep(2000L);
		log.warn("连接状态:{}", zookeeper.getState());

		// 创建 digest 认证机制 的用户
		Id zs = new Id("digest", DigestAuthenticationProvider.generateDigest("zhangsan:123456"));
		Id ls = new Id("digest", DigestAuthenticationProvider.generateDigest("lisi:123456"));

		// 指定 acl 权限
		List<ACL> aclList = new ArrayList<>();
		aclList.add(new ACL(ZooDefs.Perms.ALL, zs));    // 用户张三,有rwcda所有权限
		aclList.add(new ACL(ZooDefs.Perms.READ, ls));   // 用户李四,有 r 权限
		aclList.add(new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.WRITE, ls));   // 用户李四,有 cw 权限

		// 创建节点
		String nodeName = zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT);
		log.info(nodeName);

		// 操作节点
		// 设置了 digest 权限的节点,如果需要操作节点,必须通过 addAuthInfo 进行登录才能操作节点
		zookeeper.addAuthInfo("digest", "zhangsan:123456".getBytes());
		byte[] data = zookeeper.getData("/test", false, new Stat());
		log.info("数据:{}", new String(data));
	}

ZooKeeper Java Api 操作

zk ACL ip 权限

	public static void main(String[] args) throws Exception {
		ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
		Thread.sleep(2000L);
		log.warn("连接状态:{}", zookeeper.getState());

		// 创建 ip 认证机制
		Id localIp = new Id("ip", "127.0.0.1");		// 指定ip
		Id otherIp = new Id("ip", "1.10.10.10");	// 指定ip

		// 指定 acl 权限
		List<ACL> aclList = new ArrayList<>();
		aclList.add(new ACL(ZooDefs.Perms.ALL, localIp));	// 指定的ip 有所有权限
		aclList.add(new ACL(ZooDefs.Perms.READ, otherIp));	// 指定的ip 只有 r 权限

		// 创建节点
		String nodeName = zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT);
		log.info(nodeName);

		// 操作节点
		// 设置了 ip 权限的节点,如果需要操作节点,必须有相应权限的ip电脑才能进行操作,其他ip都无权操作
		byte[] data = zookeeper.getData("/test", false, new Stat());
		log.info("数据:{}", new String(data));
	}

ZooKeeper Java Api 操作

完。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/72499.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!