分布式锁(二):基于ZooKeeper Curator的分布式锁实践

本文基于ZooKeeper Curator进行分布式锁的实践

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

abstract.png

搭建ZooKeeper环境

这里基于Docker搭建ZooKeeper环境

# 拉取 ZooKeeper 镜像
docker pull zookeeper:3.4

# 创建 ZooKeeper 容器
docker run -p 2181:2181 -d 
   --name ZooKeeper-Service-2 
   zookeeper:3.4

POM依赖

Curator,作为Netflix开源的ZooKeeper客户端框架,大大简化了我们操作、使用ZooKeeper的难度,并且提供了非常丰富的基于链式调用的API。故这里首先在POM中引入Curator依赖,其中我们需要在Curator依赖中排除ZooKeeper依赖,然后单独引入与服务端版本一致的ZooKeeper依赖

<dependencies>

  <!-- ZooKeeper Client: Curator -->
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
    <exclusions>
      <exclusion>
        <!--排除自带ZooKeeper依赖-->
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <!-- Zookeeper依赖版本需与服务端保持一致 -->
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.0</version>
  </dependency>

</dependencies>

Curator基本实践

创建节点

众所周知,在ZooKeeper中节点支持两种类型:持久/临时、有序/无序。即两两组合则共计四种节点。其中临时节点会在客户端连接断开后自动被删除,而持久节点则不会;有序节点在创建过程中则会被分配一个唯一的单调递增的序号,并将序号追加在节点名称中,而无序节点则不会。下面即是一个基于Curator创建节点的示例

/**
 * ZooKeeper Curator 基本实践
 * @author Aaron Zhu
 * @date 2022-03-28
 */

public class Basic {
    public static void main(String[] args) throws Exception {
        // 创建客户端
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();
        System.out.println("---------------------- 系统上线 ----------------------");

        /******************************* 创建节点 *******************************/
        zkClient.create()
            .creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
            .withMode( PERSISTENT )   // 节点类型: 持久节点
            .forPath("/AaronTest/nodeA""Hello, Node A".getBytes());

        zkClient.create()
            .creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
            .withMode( PERSISTENT_SEQUENTIAL )   // 节点类型: 持久有序节点
            .forPath("/AaronTest/nodeB""Hello, Node B".getBytes());

        zkClient.create()
            .creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
            .withMode( EPHEMERAL )   // 节点类型: 临时节点
            .forPath("/AaronTest/nodeC""Hello, Node C".getBytes());

        zkClient.create()
            .creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
            .withMode( EPHEMERAL_SEQUENTIAL )   // 节点类型: 临时有序节点
            .forPath("/AaronTest/nodeD""Hello, Node D".getBytes());

        List<String> childrenNodeNameList = zkClient.getChildren()
            .forPath("/AaronTest");

        for(String childrenNodeName : childrenNodeNameList) {
            // 获取节点的状态信息、数据信息
            Stat stat = new Stat();
            byte[] bytes = zkClient.getData()
                .storingStatIn(stat)
                .forPath( "/AaronTest/" + childrenNodeName );
            String data = new String( bytes );

            System.out.println("--------------------------------");
            System.out.println("childrenNodeName: " + childrenNodeName);
            System.out.println("data: " + data);
            System.out.println("stat: " + stat);
        }

        // 关闭客户端
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }
}

测试结果如下所示,符合预期

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 1.jpeg

Watcher机制

ZooKeeper通过引入Watched机制实现发布/订阅功能,但原生的Watcher机制一旦触发一次后就会失效。如果期望一直监听,则必须每次重复注册Watcher,使用起来较为繁琐。为此Curator对其进行了优化,实现了自动注册,以便进行重复监听。具体地,Curator中提供了三种监听器:NodeCache、PathChildrenCache、TreeCache。其中,NodeCache只可监听指定路径所在节点的创建、修改、删除;PathChildrenCache只可监听指定路径下的第一级子节点的创建、修改、删除,无法监听指定路径所在节点的事件,无法监听指定路径的子节点的子节点的事件;TreeCache可监听指定路径所在节点的创建、修改、删除,可监听指定路径下的所有各级子节点的创建、修改、删除

NodeCache实践

下面即是一个基于Curator实践NodeCache的示例

/**
 * NodeCache Demo
 * @author Aaron Zhu
 * @date 2022-03-29
 */

public class NodeCacheDemo {
    public static void main(String[] args) throws Exception {
        // 创建客户端
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();
        System.out.println("---------------------- 系统上线 ----------------------");

        // 只可监听指定路径所在节点的创建、修改、删除
        String node = "/Aaron/Bob";
        NodeCache nodeCache = new NodeCache(zkClient, node);
        nodeCache.start();
        nodeCache.getListenable()
            .addListener( () -> {
                System.out.print("监听当前节点的事件");
                ChildData currentNode = nodeCache.getCurrentData();
                if( currentNode==null ) {
                    System.out.println(": 当前节点已被删除");
                } else {
                    getNodeInfo( currentNode );
                }
            });

        System.out.println("Test 1: 创建当前节点");
        zkClient.create()
            .creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
            .withMode( PERSISTENT )   // 节点类型: 持久节点
            .forPath( node, "Good Morning".getBytes() );

        Thread.sleep(2000);

        System.out.println("Test 2: 修改当前节点的数据");
        zkClient.setData()
            .forPath( node, "Good Night".getBytes() );
        Thread.sleep(2000);

        System.out.println("Test 3: 删除当前节点");
        zkClient.delete()
            .forPath(node);
        Thread.sleep(2000);

        // 主线程等待执行完毕
        try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
        // 关闭客户端
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    private static void getNodeInfo(ChildData currentNode) {
        String info = ", Current Data Info: n"
            + "path: " + currentNode.getPath()
            + ", data: " + new String(currentNode.getData())
            + ", stat="+currentNode.getStat();
        System.out.println(info);
    }
}

测试结果如下所示,符合预期

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 2.jpeg

PathChildrenCache实践

下面即是一个基于Curator实践PathChildrenCache的示例

/**
 * PathChildrenCache Demo
 * @author Aaron Zhu
 * @date 2022-03-29
 */

public class PathChildrenCacheDemo {
    public static void main(String[] args) throws Exception {
        // 创建客户端
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();
        System.out.println("---------------------- 系统上线 ----------------------");

        // 只可监听指定路径下的第一级子节点的创建、修改、删除
        // 无法监听指定路径所在节点的事件
        // 无法监听指定路径的子节点的子节点的事件
        String parentPath = "/Aaron/Tony";
        PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, parentPath, true);
        pathChildrenCache.start();
        pathChildrenCache.getListenable()
            .addListener( (client, event) -> {
                PathChildrenCacheEvent.Type eventType = event.getType();
                ChildData currentNode = event.getData();
                if( PathChildrenCacheEvent.Type.CHILD_ADDED.equals( eventType ) ) {
                    System.out.println("添加 子节点, Current Data Info:");
                    getNodeInfo(currentNode);
                } else if( PathChildrenCacheEvent.Type.CHILD_UPDATED.equals( eventType ) ) {
                    System.out.println("修改 子节点, Current Data Info:");
                    getNodeInfo(currentNode);
                } else if( PathChildrenCacheEvent.Type.CHILD_REMOVED.equals( eventType ) ) {
                    System.out.println("删除 子节点, Current Data Info:");
                    getNodeInfo(currentNode);
                }
            });

        System.out.println("Test 1: 创建子节点");
        String childNode1 = parentPath + "/Lucy";
        zkClient.create()
            .creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
            .withMode( PERSISTENT_SEQUENTIAL )   // 节点类型: 持久有序节点
            .forPath( childNode1, "I'm a Dog".getBytes() );
        Thread.sleep(2000);

        System.out.println("Test 2: 创建子节点");
        String childNode2 = parentPath + "/Tony";
        zkClient.create()
            .creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
            .withMode( PERSISTENT )   // 节点类型: 持久节点
            .forPath( childNode2, "Good Morning".getBytes() );
        Thread.sleep(2000);

        System.out.println("Test 3: 修改子节点的数据");
        zkClient.setData()
            .forPath( childNode2, "Good Night".getBytes() );
        Thread.sleep(2000);

        System.out.println("Test 4: 删除子节点");
        zkClient.delete()
            .forPath(childNode2);
        Thread.sleep(2000);

        // 主线程等待执行完毕
        try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
        // 关闭客户端
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    private static void getNodeInfo(ChildData currentNode) {
        String info = "path: " + currentNode.getPath()
            + ", data: " + new String(currentNode.getData())
            + ", stat="+currentNode.getStat();
        System.out.println(info);
    }
}

测试结果如下所示,符合预期

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 3.jpeg

TreeCache实践

下面即是一个基于Curator实践TreeCache的示例

/**
 * TreeCache Demo
 * @author Aaron Zhu
 * @date 2022-03-29
 */

public class TreeCacheDemo {
    public static void main(String[] args) throws Exception {
        // 创建客户端
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();
        System.out.println("---------------------- 系统上线 ----------------------");

        // 可监听指定路径所在节点的创建、修改、删除
        // 可监听指定路径下的所有各级子节点的创建、修改、删除
        String parentPath = "/Aaron/Luca";
        TreeCache treeCache = new TreeCache(zkClient, parentPath);
        treeCache.start();
        treeCache.getListenable()
            .addListener( (client, event) -> {
                TreeCacheEvent.Type eventType = event.getType();
                ChildData currentNode = event.getData();
                if( TreeCacheEvent.Type.NODE_ADDED.equals( eventType ) ) {
                    System.out.println("添加 节点, Current Data Info:");
                    getNodeInfo(currentNode);
                } else if( TreeCacheEvent.Type.NODE_UPDATED.equals( eventType ) ) {
                    System.out.println("修改 节点, Current Data Info:");
                    getNodeInfo(currentNode);
                } else if( TreeCacheEvent.Type.NODE_REMOVED.equals( eventType ) ) {
                    System.out.println("删除 节点, Current Data Info:");
                    getNodeInfo(currentNode);
                }
            });

        System.out.println("Test 1: 创建指定路径所在节点");
        zkClient.create()
            .creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
            .withMode( PERSISTENT )   // 节点类型: 持久节点
            .forPath( parentPath, "I'm a Dog".getBytes() );
        Thread.sleep(2000);

        System.out.println("Test 2: 修改指定路径所在节点");
        zkClient.setData()
            .forPath( parentPath, "Good Night".getBytes() );
        Thread.sleep(2000);

        System.out.println("Test 3: 创建一级子节点");
        String childNode1 = parentPath + "/Cat";
        zkClient.create()
            .withMode( PERSISTENT )   // 节点类型: 持久节点
            .forPath( childNode1, "I'm a Cat".getBytes() );
        Thread.sleep(2000);

        System.out.println("Test 4: 创建二级子节点");
        String childNode2 = childNode1 + "/David";
        zkClient.create()
            .withMode( PERSISTENT )   // 节点类型: 持久节点
            .forPath( childNode2, "My Name is David".getBytes() );
        Thread.sleep(2000);

        System.out.println("Test 5: 修改二级子节点");
        zkClient.setData()
            .forPath( childNode2, "I'm Sorry".getBytes() );
        Thread.sleep(2000);

        System.out.println("Test 6: 删除二级子节点");
        zkClient.delete()
            .forPath(childNode2);
        Thread.sleep(2000);

        System.out.println("Test 7: 删除一级子节点");
        zkClient.delete()
            .forPath(childNode1);
        Thread.sleep(2000);

        System.out.println("Test 8: 删除指定路径所在节点");
        zkClient.delete()
            .forPath(parentPath);
        Thread.sleep(2000);

        // 主线程等待执行完毕
        try{ Thread.sleep( 60*1000 ); } catch (Exception e) {}
        // 关闭客户端
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    private static void getNodeInfo(ChildData currentNode) {
        String info = "path: " + currentNode.getPath()
            + ", data: " + new String(currentNode.getData())
            + ", stat="+currentNode.getStat();
        System.out.println(info);
    }
}

测试结果如下所示,符合预期

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 4.jpeg

分布式锁

Curator还进一步地提供了非常丰富的分布式锁特性,具体包括:

  • InterProcessMutex 分布式可重入互斥锁
  • InterProcessReadWriteLock 分布式可重入读写锁
  • InterProcessSemaphoreMutex 分布式不可重入互斥锁
  • InterProcessSemaphoreV2 分布式信号量

InterProcessMutex分布式可重入互斥锁

InterProcessMutex是一个分布式的可重入的互斥锁,示例代码如下所示

/**
 * InterProcessMutex Demo: 分布式可重入互斥锁
 * @author Aaron Zhu
 * @date 2022-03-31
 */

public class InterProcessMutexDemo {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static String zkLockPath = "/Aaron/Lock1";

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        // 创建客户端
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();
        System.out.println("---------------------- 系统上线 ----------------------");

        for(int i=1; i<=3; i++) {
            String taskName = "任务#"+i;
            Task task = new Task(taskName, zkClient, zkLockPath);
            threadPool.execute( task );
        }

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
        // 关闭客户端
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }

    private static class Task implements Runnable {
        private String taskName;

        private InterProcessMutex lock;

        public Task(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            this.lock = new InterProcessMutex(zkClient, zkLockPath);
        }

        @Override
        public void run() {
            try{
                lock.acquire();
                info(taskName + ": 成功获取锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
                methodA();
            } catch (Exception e) {
                System.out.println( taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放锁 #1n");
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        private void methodA() {
            try{
                lock.acquire();
                info(taskName + ": 成功获取锁 #2");

                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放锁 #2");
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试结果如下所示,符合预期

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 5.jpeg

InterProcessReadWriteLock分布式可重入读写锁

读写互斥

InterProcessReadWriteLock是一个分布式可重入读写锁,其中读锁为共享锁、写锁为互斥锁。示例代码如下所示

/**
 * InterProcessReadWriteLock Demo: 分布式读写锁, 读锁为共享锁、写锁为互斥锁
 * @author Aaron Zhu
 * @date 2022-03-31
 */

public class InterProcessReadWriteLockDemo1 {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    private static String zkLockPath = "/Aaron/Lock2";

    private static CuratorFramework zkClient;

    @BeforeClass
    public static void init() {
        // 创建客户端
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();
        System.out.println("---------------------- 系统上线 ----------------------");
    }

    /**
     * 测试: 读锁为共享锁
     */

    @Test
    public void test1Read() {
        System.out.println("n---------------------- Test 1 : Read ----------------------");
        for(int i=1; i<=3; i++) {
            String taskName = "读任务#"+i;
            Runnable task = new ReadTask(taskName, zkClient, zkLockPath);
            threadPool.execute( task );
        }
        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
    }

    /**
     * 测试: 写锁为互斥锁
     */

    @Test
    public void test2Write() {
        System.out.println("n---------------------- Test 2 : Write ----------------------");
        for(int i=1; i<=3; i++) {
            String taskName = "写任务#"+i;
            Runnable task = new WriteTask(taskName, zkClient, zkLockPath);
            threadPool.execute( task );
        }
        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 30*1000 ); } catch (Exception e) {}
    }

    /**
     * 测试: 读写互斥
     */

    @Test
    public void test2ReadWrite() {
        System.out.println("n---------------------- Test 3 : Read Write ----------------------");
        for(int i=1; i<=8; i++) {
            Runnable task = null;
            Boolean isReadTask = RandomUtils.nextBoolean();
            if( isReadTask ) {
                task = new ReadTask( "读任务#"+i, zkClient, zkLockPath );
            } else {
                task = new WriteTask( "写任务#"+i, zkClient, zkLockPath );
            }
            threadPool.execute( task );
        }
        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
    }

    @AfterClass
    public static void close() {
        // 关闭客户端
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }

    /**
     * 读任务
     */

    private static class ReadTask implements Runnable {
        private String taskName;

        private InterProcessMutex readLock;

        public ReadTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
            this.readLock = interProcessReadWriteLock.readLock();
        }

        @Override
        public void run() {
            try{
                readLock.acquire();
                info(taskName + ": 成功获取读锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println( taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放读锁 #1");
                try {
                    readLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 写任务
     */

    private static class WriteTask implements Runnable {
        private String taskName;

        private InterProcessMutex writeLock;

        public WriteTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
            this.writeLock = interProcessReadWriteLock.writeLock();
        }

        @Override
        public void run() {
            try{
                writeLock.acquire();
                info(taskName + ": 成功获取写锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println( taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放写锁 #1n");
                try {
                    writeLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试结果如下所示,符合预期

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 6.jpeg

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 7.jpeg

可重入性

由于读锁、写锁分别是基于InterProcessMutex实现的,故这二者自然也是支持可重入的。示例代码如下所示

/**
 * InterProcessReadWriteLock Demo: 分布式读写锁, 可重入性测试
 * @author Aaron Zhu
 * @date 2022-03-31
 */

public class InterProcessReadWriteLockDemo2 {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    private static String zkLockPath = "/Aaron/Lock3";

    private static CuratorFramework zkClient;

    @BeforeClass
    public static void init() {
        System.out.println("---------------------- 系统上线 ----------------------");

        // 创建客户端
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();
    }

    /**
     * 测试: 读锁具有可重入性
     */

    @Test
    public void test1Read() {
        System.out.println("n---------------------- Test 1 : Read ----------------------");
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        InterProcessMutex readLock = interProcessReadWriteLock.readLock();

        Runnable task = new Task("读任务", readLock, readLock);
        threadPool.execute( task );

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 5*1000 ); } catch (Exception e) {}
    }

    /**
     * 测试: 写锁具有可重入性
     */

    @Test
    public void test2Write() {
        System.out.println("n---------------------- Test 2 : Write ----------------------");
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

        Runnable task = new Task("写任务", writeLock, writeLock);
        threadPool.execute( task );

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 5*1000 ); } catch (Exception e) {}
    }

    @AfterClass
    public static void close() {
        // 关闭客户端
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }

    /**
     * 任务
     */

    private static class Task implements Runnable {
        private String taskName;

        private InterProcessMutex firstLock;

        private InterProcessMutex secondLock;

        public Task(String taskName, InterProcessMutex firstLock, InterProcessMutex secondLock) {
            this.taskName = taskName;
            this.firstLock = firstLock;
            this.secondLock = secondLock;
        }

        @Override
        public void run() {
            try{
                firstLock.acquire();
                info(taskName + ": 成功获取锁 firstLock");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
                methodB();
            } catch (Exception e) {
                System.out.println( taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放锁 firstLockn");
                try {
                    firstLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        public void methodB() {
            try{
                secondLock.acquire();
                info(taskName + ": 成功获取锁 secondLock");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println( taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放锁 secondLock");
                try {
                    secondLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试结果如下所示,符合预期

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 8.jpeg

锁升级、锁降级

所谓锁升级指的是读锁升级为写锁。当一个线程先获取到读锁再去申请写锁,显然其是不支持的。理由也很简单,读锁是可以多个服务实例同时持有的。若其中一个服务实例此锁线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他服务实例依然持有读锁;反之,其是支持锁降级的,即写锁降级为读锁。当一个服务实例的线程在获得写锁后,该线程依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的使用价值也很大,其一方面保证了安全,读锁在写锁释放前获取;另一方面保证了高效,因为读锁是共享的。

锁升级示例代码如下所示

/**
 * InterProcessReadWriteLock Demo: 分布式读写锁, 锁升级、锁降级测试
 * @author Aaron Zhu
 * @date 2022-03-31
 */

public class InterProcessReadWriteLockDemo3 {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static String zkLockPath = "/Aaron/Lock4";

    private static CuratorFramework zkClient;

    /**
     * 测试: 锁升级
     */

    @Test
    public void test1Read2Write() {
        System.out.println("---------------------- 系统上线 ----------------------");
        // 创建客户端
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();

        System.out.println("---------------------- Test 1 : Read -> Write ----------------------n");

        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        InterProcessMutex readLock = interProcessReadWriteLock.readLock();
        InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

        try {
            readLock.acquire();
            info("成功获取读锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            writeLock.acquire();
            info("成功获取写锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            readLock.release();
            info("成功释放读锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            writeLock.release();
            info("成功释放写锁");
        } catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        }

        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }
}

测试结果如下所示,在持有读锁的情况下,继续尝试获取写锁会被一直阻塞

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 9.jpeg

锁降级示例代码如下所示

/**
 * InterProcessReadWriteLock Demo: 分布式读写锁, 锁升级、锁降级测试
 * @author Aaron Zhu
 * @date 2022-03-31
 */

public class InterProcessReadWriteLockDemo3 {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static String zkLockPath = "/Aaron/Lock4";

    private static CuratorFramework zkClient;

    /**
     * 测试: 锁降级
     */

    @Test
    public void test2Write2Read() {
        System.out.println("---------------------- 系统上线 ----------------------");
        // 创建客户端
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();

        System.out.println("---------------------- Test 2 : Write -> Read ----------------------n");
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        InterProcessMutex readLock = interProcessReadWriteLock.readLock();
        InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

        try {
            writeLock.acquire();
            info("成功获取写锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            readLock.acquire();
            info("成功获取读锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            writeLock.release();
            info("成功释放写锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            readLock.release();
            info("成功释放读锁");
        } catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        }

        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }
}

测试结果如下所示,符合预期

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 10.jpeg

InterProcessSemaphoreMutex分布式不可重入互斥锁

InterProcessSemaphoreMutex则是一个分布式不可重入互斥锁,示例代码如下所示

/**
 * InterProcessSemaphoreMutex Demo : 分布式不可重入互斥锁
 * @author Aaron Zhu
 * @date 2022-04-03
 */

public class InterProcessSemaphoreMutexDemo {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    private static String zkLockPath = "/Aaron/Lock5";

    private static CuratorFramework zkClient;

    @BeforeClass
    public static void init() {
        System.out.println("---------------------- 系统上线 ----------------------");
        // 创建客户端
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();
    }

    /**
     * InterProcessSemaphoreMutex 是互斥锁
     */

    @Test
    public void test1() {
        System.out.println("n---------------------- Test 1 ----------------------");
        Runnable task = () -> {
            InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(zkClient, zkLockPath);

            try{
                lock.acquire();
                info("成功获取锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println("Happen Exception: " + e.getMessage());
            } finally {
                info("释放锁 #1n");
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        for(int i=1; i<=3; i++) {
            threadPool.execute( task );
        }

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
    }

    /**
     * InterProcessSemaphoreMutex 是不可重入锁
     */

    @Test
    public void test2() {
        System.out.println("n---------------------- Test 2 ----------------------");
        InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(zkClient, zkLockPath);

        try{
            lock.acquire();
            info("成功获取锁 #1");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            lock.acquire();
            info("成功获取锁 #2");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            lock.release();
            info("释放锁 #1n");

            lock.release();
            info("释放锁 #1n");
        } catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        }
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }
}

测试结果如下所示,符合预期。Test 1结果证明其是一个互斥锁,而Test 2则在第二次获取锁时被阻塞,证明其不可重入

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 11.jpeg

InterProcessSemaphoreV2分布式信号量

InterProcessSemaphoreV2是一个的分布式信号量,示例代码如下所示

/**
 * InterProcessSemaphoreV2 Demo : 分布式信号量
 * @author Aaron Zhu
 * @date 2022-04-03
 */

public class InterProcessSemaphoreV2Demo {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    private static String zkLockPath = "/Aaron/Lock6";

    private static CuratorFramework zkClient;

    @Test
    public void test1() {
        System.out.println("---------------------- 系统上线 ----------------------");
        // 创建客户端
        zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")    // ZK Server地址信息
            .connectionTimeoutMs(15 * 1000// 连接超时时间: 15s
            .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
            // 重试策略: 重试3次, 每次间隔1s
            .retryPolicy(new RetryNTimes(31000))
            .build();
        // 启动客户端
        zkClient.start();

        // 系统最大并发处理量
        int maxLimit = 2;

        IntStream.rangeClosed(1,5)
            .mapToObj( num -> new UserReq("用户#"+num, zkClient, zkLockPath, maxLimit) )
            .forEach( threadPool::execute );

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 20*1000 ); } catch (Exception e) {}
        // 关闭客户端
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }

    private static class UserReq implements Runnable {

        private String name;

        private InterProcessSemaphoreV2 interProcessSemaphoreV2;

        private Integer maxLimit;

        public UserReq(String name, CuratorFramework zkClient, String zkLockPath, Integer maxLimit) {
            this.name = name;
            this.maxLimit = maxLimit;
            this.interProcessSemaphoreV2 = new InterProcessSemaphoreV2(zkClient, zkLockPath, maxLimit);
        }

        @Override
        public void run() {
            try {
                // 模拟用户不定时发起请求
                Thread.sleep(RandomUtils.nextLong(5002000));
                String msg = name + ": 发起请求";
                info(msg);

                // 阻塞等待,直到获取许可
                Lease lease = interProcessSemaphoreV2.acquire();

                info(name + ": 系统开始处理请求");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextInt(520)*1000);

                // 用户请求处理完毕,释放许可
                interProcessSemaphoreV2.returnLease( lease );
                info(name + ": 系统处理完毕");
            }catch (Exception e) {
                System.out.println("Happen Exception: " + e.getMessage());
            }
        }
    }
}

测试结果如下所示,符合预期。其每次同时处理的用户请求数最大只有2个

分布式锁(二):基于ZooKeeper Curator的分布式锁实践

figure 12.jpeg


原文始发于微信公众号(青灯抽丝):分布式锁(二):基于ZooKeeper Curator的分布式锁实践

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/156673.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!