JAVA并发编程基础概念知识点(三)

人生之路不会是一帆风顺的,我们会遇上顺境,也会遇上逆境,在所有成功路上折磨你的,背后都隐藏着激励你奋发向上的动机,人生没有如果,只有后果与结果,成熟,就是用微笑来面对一切小事。

导读:本篇文章讲解 JAVA并发编程基础概念知识点(三),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

10、线程安全

线程安全是多线程编程时的计算机程序代码中的一个概念。在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况。

线程安全问题的具体表现它有三个方面:可见性、原子性和有序性

  • 可见性
    当一个线程修改了共享变量的值,其他线程能够看到修改的值,Java内存模型是通过在变量修改后将新值同步会主内存,在变量读取前从主内存刷新值,这种依赖主内存作为传递媒介的方法实现可见性
    • 可以通过以下方案解决
      • 通过 volatile 关键字保证可见性。
      • 通过 内存屏障保证可见性。
      • 通过 synchronized 关键字保证可见性。
      • 通过 Lock保证可见性。
      • 通过 final 关键字保证可见性
  • 有序性
    即程序执行的顺序按照代码的先后顺序执行,JVM存在指令重排,所以存在有序性问题,其实不止是JVM,CPU指令优化器也会重排
    • 可以通过以下方案解决
      • 通过 volatile 关键字保证有序性
      • 通过内存屏障保证有序性
      • 通过synchronized关键字保证有序性
      • 通过Lock保证有序性
  • 原子性
    一个或多个操作,要么全部执行且执行过程中不被任何因素打扰,要么全部不执行。在Java中,对基本数据类型变量的读取、赋值操作都是原子性的(64位处理器,32位处理器会分段)。不采取任何的原子性保障措施的自增操作并不是原子性的。
    • 可以通过以下方案解决
      • 通过synchronized关键字保证原子性
      • 通过Lock保证原子性
      • 通过Lock保证原子性

11、线程池

线程池相关知识点可以看我这三篇文章

线程池(一)—-综述&ThreadPoolExecutor

线程池(二)—-ThreadPoolTaskExecutor

线程池(三)—-ThreadPoolTaskExecutor的提交方法execute和submit

12、JUC的使用

JUC编程的使用(基于尚硅谷视频,持续更新中…)

1. 集合线程安全

package interview;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * 集合线程不安全的demo
 */
public class Demo01ContainerNotSafeDemo {
    public static void main(String[] args) {
        mapNotSafe();
    }

    public static void mapNotSafe() {
        Map<String, String> map = new ConcurrentHashMap<>();
        for (int i = 0; i <= 30; i++) {
            new Thread(() -> {
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 8));
            }, String.valueOf(i)).start();
        }
    }

    /**
     * set线程安全问题
     */
    public static void setNotSafe() {
        Set<String> set = new CopyOnWriteArraySet<>();
        for (int i = 0; i <= 30; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString().substring(0, 8));
            }, String.valueOf(i)).start();
        }
    }

    /**
     * list线程安全问题
     */
    public static void listNotSafe() {
        List<String> list = new ArrayList<>();
        for (int i = 0; i <= 30; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 8));
            }, String.valueOf(i)).start();
        }

        /**
         * 1. 故障现象:
         *      java.util.ConcurrentModificationException
         * 
         * 2. 导致原因:
         *      并发争抢导致。例如:一个人正在写入,另外一个同学过来抢夺。
         * 
         * 3. 解决方案:
         *      3.1 Vector<String> list = new Vector<>();
         *      3.2 List<String> list = Collections.synchroizedList<>(new ArrayList<>());
         *      3.3 List<String> list = new CopyOnWriteArrayList<>();
         * 
         * 4. 写时复制
         *      CopyOnWrite容器即写时复制的容器。往一个容器里添加元素的时候,不直接往当前容器Object[]添加,而是先将
         *      当前容器Object[]进行Copy,复制出一个新的容器Object[] new Elements,然后新的容器Object[] new Elements
         *      里添加元素,添加完元素之后,再将原容器的引用指向新的容器setArray(newElements);这样做的好处是CopyOnWrite
         *      容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读
         *      和写不同的容器。
         *      public boolean add(E e) {
                    final ReentrantLock lock = this.lock;
                    lock.lock();
                    try {
                        Object[] elements = getArray();
                        int len = elements.length;
                        Object[] newElements = Arrays.copyOf(elements, len + 1);
                        newElements[len] = e;
                        setArray(newElements);
                        return true;
                    } finally {
                        lock.unlock();
                    }
                }
         * 
         * 
         */
    }
}

2. 锁机制

package interview;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * java中的锁机制
 * 1. 公平锁和非公平锁(ReentrantLock(boolean flag):默认是非公平锁, Synchronized:非公平锁)
 *      1.1 公平锁:多个线程按照申请锁的顺序来获取锁,先来后到。
 *      1.2 非公平锁:多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先。
 *                   获取锁在高并发的情况下,有可能会造成优先级反转或者饥饿现象。
 * 2. 可重入锁(又名递归锁)(ReentrantLock/Synchronized就是典型的可重入锁)
 *      2.1 指的是同一线程外层函数获得锁之后,内层递归函数仍然能获取该锁的代码,
 *          在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。
 *          也就是说,线程可以进入任何一个它已经拥有的锁锁同步着的代码块
 *      2.2 可重入锁最大的作用就是避免死锁
 * 3. 独占锁/共享锁
 *      3.1 独占锁:该锁一次只能被一个线程所持有。对ReentrantLock和Synchronized而言都是独占锁。
 *      3.2 共享锁:指该锁可被多个线程所持有。
 *          对ReentrantReadWriteLock其读锁是共享锁,其写锁是独占锁。
 *          读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。
 * 4. 自旋锁
 *      4.1 指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,
 *          这样的好处是减少线程上下文的消耗,缺点是循环会消耗CPU。
 */
class Phone{
    public synchronized void sendSMS() throws Exception {
        System.out.println(Thread.currentThread().getId() + "\t invoked sendSMS()");
        sendEmail();
    }

    public synchronized void sendEmail() throws Exception {
        System.out.println(Thread.currentThread().getId() + "\t invoked sendEmail()");
    }
}

class MyCache {
    private volatile Map<String, Object> map = new HashMap<>();
    private ReentrantReadWriteLock rwlLock = new ReentrantReadWriteLock();
    
    public void put(String key, Object value) {
        rwlLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t 正在写入:" + key);
            try {TimeUnit.SECONDS.sleep(300);} catch (Exception e) {e.printStackTrace();}
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "\t 写入完成");   
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            rwlLock.writeLock().unlock();
        }
    }

    public void get(String key) {
        rwlLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t 正在读取:");
            try {TimeUnit.SECONDS.sleep(300);} catch (Exception e) {e.printStackTrace();}
            Object result = map.get(key);
            System.out.println(Thread.currentThread().getName() + "\t 读取完成:" + result);   
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            rwlLock.readLock().unlock();
        }
    }
}

public class Demo02Lock {
    public static void main(String[] args) {
        
    }

    /**
     * 读写锁
     */
    public static void ReadWriteLock() {
        MyCache cache = new MyCache();
        
        for (int i = 0; i <= 5; i++) {
            final int tempInt = i;
            new Thread(() -> {
                cache.put(tempInt + "", tempInt + "");
            }, String.valueOf(i)).start();
        }

        for (int i = 0; i <= 5; i++) {
            final int tempInt = i;
            new Thread(() -> {
                cache.get(tempInt + "");;
            }, String.valueOf(i)).start();
        }
    }

    /**
     * 自旋锁
     */
    AtomicReference<Thread> atomicReference = new AtomicReference<>();
    public static void SpinLock() {
        Demo02Lock lock = new Demo02Lock();
        
        new Thread(() -> {
            lock.myLock();
            try {TimeUnit.SECONDS.sleep(5);} catch (Exception e) {e.printStackTrace();}
            lock.unMyLock();
        }, "AA").start();
        
        try {TimeUnit.SECONDS.sleep(1);} catch (Exception e) {e.printStackTrace();}

        new Thread(() -> {
            lock.myLock();
            lock.unMyLock();
        }, "BB").start();
    }

    public void myLock() {
        // 加锁
        Thread thread = new Thread();
        while(!atomicReference.compareAndSet(null, thread)) {
            //自旋锁
        }
    }

    public void unMyLock() {
        // 解锁
        Thread thread = new Thread();
        atomicReference.compareAndSet(thread, null);
    }

    /**
     * 可重入锁的验证
     */
    public static void Reentrant() {
        Phone phone = new Phone();
        new Thread(() -> {
            try {
                phone.sendSMS();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            try {
                phone.sendSMS();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }
}

3. CountDownLatch

package interview;

import java.util.concurrent.CountDownLatch;

enum CountryEnum {
    ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "赵"), FIVE(5, "魏"), SIX(6, "韩");
    private Integer retCode;
    private String retMessage;
    
    public Integer getRetCode() {
        return retCode;
    }
    public String getRetMessage() {
        return retMessage;
    }
    CountryEnum(Integer retCode, String retMessage) {
        this.retCode = retCode;
        this.retMessage = retMessage;
    }
    public static CountryEnum forEach_CountryEnum(int index) {
        CountryEnum[] myArray = CountryEnum.values();
        for (CountryEnum element : myArray) {
            if(index == element.getRetCode()) {
                return element;
            }
        }
        return null;
    }
}
/**
 * CountDownLatch的使用方法
 *      1. 当一些线程阻塞直到另一些线程完成一系列操作后才能被唤醒
 *      2. CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,调用线程会被阻塞。
 *         其他线程调用CountDown方法会将计数器减1(调用countDown方法的线程不会被阻塞),
 *         当计数器的值变为0时,因调用await方法被阻塞的线程会被唤醒,继续执行。
 */
public class Demo03CountDownLatch {
    public static void main(String[] args) throws Exception{
        /**
         * 案例:
         *      6国被灭完,秦国统一华夏
         */
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "\t 国,被灭");
                countDownLatch.countDown();
            }, CountryEnum.forEach_CountryEnum(i).getRetMessage()).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t 秦帝国,一统华夏");
    }

    /**
     * 案例:
     *      学生都走完之后班长才能走。
     * @throws Exception
     */
    public static void closeDoor() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "\t 上完自习,离开教室");
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t 班长最后关门走人");
    }
}

4. CyclicBarrier

package interview;

import java.util.concurrent.CyclicBarrier;

/**
 * CyclicBarrier的使用方法:
 *      1. 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,
 *         屏障才会打开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法
 */
public class Demo04CyclicBarrier {
    public static void main(String[] args) {
        /**
         * 案例:
         *      集齐7颗龙珠才能召唤神龙
         */
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {System.out.println("***召唤神龙");});
        for (int i = 1; i <= 7; i++) {
            final int tempInt = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "\t 收集到第:" + tempInt + "龙珠");
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

5. Semaphore

package interview;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Semaphore的使用方法:
 *      信号量的主要用于两个目的:
 *          1. 用于多个共享资源的互斥使用
 *          2. 用于并发线程数的控制
 */
public class Demo05Semaphore {
    public static void main(String[] args) {
        /**
         * 抢车位案例
         */
        Semaphore semaphore = new Semaphore(3); // 三个车位
        for (int i = 1; i <= 6; i++) { // 6个车
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "\t抢到车位");
                    try {TimeUnit.SECONDS.sleep(3);} catch (Exception e) {e.printStackTrace();};
                    System.out.println(Thread.currentThread().getName() + "\t停车3秒后离开车位");
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}

6. BlockingQueue

package interview;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 阻塞队列:
 *      当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
 *      当阻塞队列是满时,从队列种添加元素的操作将会被阻塞
 * BlockingQueue的方法:
 * 方法类型         抛出异常        特殊值       阻塞        超时
 * 插入             add(e)         offer(e)    put(e)      offer(e, time, unit)
 * 移除             remove()       poll()      take()      poll(time, unit)
 * 检查             element()      peek()      不可用       不可用
 */
public class Demo06BlockingQueue {
    public static void main(String[] args) throws Exception{
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        /**
         * 抛出异常组:
         *      阻塞队列满时插入失败:抛出java.lang.IllegalStateException: Queue full异常
         *      阻塞队列空时移除失败:抛出java.util.NoSuchElementException异常
         * 特殊值组:
         *      插入方法:成功true,失败false
         *      移除方法:成功返回队列的元素,失败返回null
         * 阻塞组:
         *      阻塞队列满时插入:队列一直阻塞,直到put数据or响应中断退出
         *      阻塞队列空时移除:一直阻塞消费者线程直到队列可用
         * 超时组:
         *      阻塞队列满时,队列会阻塞生产者线程一定时间,超时后生产者线程会退出
         */
        blockingQueue.add("a");
        blockingQueue.add("b");
        blockingQueue.add("c");
        //blockingQueue.add("x"); // 抛出java.lang.IllegalStateException: Queue full异常

        System.out.println(blockingQueue.element()); // 元素空不空,队首元素是谁

        blockingQueue.remove();
        blockingQueue.remove();
        blockingQueue.remove();
        //blockingQueue.remove(); //抛出java.util.NoSuchElementException异常

        /**
         * 特殊值组
         */
        blockingQueue.offer("a");
        blockingQueue.offer("b");
        blockingQueue.offer("c");
        System.out.println(blockingQueue.offer("c")); // 输出false

        System.out.println(blockingQueue.peek()); // 输出a

        blockingQueue.poll();
        blockingQueue.poll();
        blockingQueue.poll();
        System.out.println(blockingQueue.poll()); // 输出null

        /**
         * 阻塞组
         */
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
        //blockingQueue.put("d"); // 一直阻塞
        
        blockingQueue.take();
        blockingQueue.take();
        blockingQueue.take();
        //blockingQueue.take(); // 一直阻塞

        /**
         * 超时组
         */
        blockingQueue.offer("a", 2L, TimeUnit.SECONDS);
        blockingQueue.offer("b", 2L, TimeUnit.SECONDS);
        blockingQueue.offer("c", 2L, TimeUnit.SECONDS);
        blockingQueue.offer("d", 2L, TimeUnit.SECONDS); // 阻塞,两秒后自己停止,返回false
    }
}

7. SynchronousQueue

package interview;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * SynchronousQueue是一个不存储元素的BlockingQueue;
 * 每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
 */
public class Demo07SynchronousQueue {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
        
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName() + "\t put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName() + "\t put 3");
                blockingQueue.put("3");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "AAA").start();

        new Thread(() -> {
            try {
                try {TimeUnit.SECONDS.sleep(5);} catch (Exception e) {e.printStackTrace();}
                System.out.println(Thread.currentThread().getName() + "\t take " + blockingQueue.take());
                try {TimeUnit.SECONDS.sleep(5);} catch (Exception e) {e.printStackTrace();}
                System.out.println(Thread.currentThread().getName() + "\t take " + blockingQueue.take());
                try {TimeUnit.SECONDS.sleep(5);} catch (Exception e) {e.printStackTrace();}
                System.out.println(Thread.currentThread().getName() + "\t take " + blockingQueue.take());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "BBB").start();
    }
}

8. 生产者与消费者

8.1 传统实现方式

package interview;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception {
        lock.lock();
        try {
            while(number != 0) {
                //等待,不能生产
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void decrement() throws Exception {
        lock.lock();
        try {
            while(number == 0) {
                //等待,不能生产
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

/**
 * 传统的生产者消费者的实现
 * 
 * 题目:一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,进行5轮
 */
public class Demo08ProdConsumer_Tradition {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();

        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();
    }
}

8.2 多条件绑定

package interview;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareResource{
    private int number = 1; //A:1, B:2, C:3
    private Lock lock = new ReentrantLock();
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
    private Condition c3 = lock.newCondition();

    public void print5() {
        lock.lock();
        try {
            while(number != 1) {
                c1.await();
            }
            for (int i = 1; i <= 5; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            number = 2;
            c2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void print10() {
        lock.lock();
        try {
            while(number != 2) {
                c2.await();
            }
            for (int i = 1; i <= 10; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            number = 3;
            c3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void print15() {
        lock.lock();
        try {
            while(number != 3) {
                c3.await();
            }
            for (int i = 1; i <= 15; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            number = 1;
            c1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

/**
 * 多Condition绑定
 * 
 * 题目:多线程之间按顺序调用,实现A->B->C三个线程启动,要求如下:
 * AA打印5次,BB打印10次,CC打印15次
 * 紧接着:
 * AA打印5次,BB打印10次,CC打印15次
 * ...
 * ...
 * 共尽心10轮
 */
public class Demo09SyncAndReentrantLock {
    public static void main(String[] args) {
        ShareResource shareResource = new ShareResource();
        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                shareResource.print5();
            }
        }, "AA").start();
        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                shareResource.print10();
            }
        }, "BB").start();
        new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                shareResource.print15();
            }
        }, "CC").start();
    }
}

8.3 阻塞队列实现

package interview;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource{
    private volatile boolean FLAG = true; // 默认开启,进行生产+消费
    private AtomicInteger atomicInteger = new AtomicInteger();

    BlockingQueue<String> blockingQueue = null;
    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    // 生产方法
    public void myProd() throws Exception {
        String data = null;
        boolean retValue = false;
        while(FLAG) {
            data = atomicInteger.incrementAndGet() + "";
            retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if(retValue) {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
            }else {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "\t大老板叫停了,表示FLAG=false,生产动作结束");
    }

    // 消费方法
    public void myConsumer() throws Exception {
        String result = null;
        while(FLAG) {
            result = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if(result == null || result.equalsIgnoreCase("")) {
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t超过2秒钟没有取到蛋糕,消费退出");
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t 消费队列" + result + "成功");
        }
    }

    public void stop() throws Exception {
        this.FLAG = false;
    }
}

/**
 * 阻塞队列实现生产者消费者问题
 */
public class Demo10ProdConsumer_BlockQueue {
    public static void main(String[] args) {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
        
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t生产线程启动");
            try {
                myResource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Prod").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t消费线程启动");
            try {
                myResource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Consumer").start();

        try {TimeUnit.SECONDS.sleep(5);} catch (Exception e) {e.printStackTrace();}
        System.out.println("5秒钟后停止生产消费");

        try {
            myResource.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

9. 线程池

package interview;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 线程池的使用
 *      1. 基本概念:线程池做的工作主要是控制运行的线程的数量,处理过程种将任务放入队列,然后在线程创建后启动这些任务,
 *         如果线程数量超过了最大数量超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
 *      2. 主要特点:线程复用;控制最大并发数;管理线程。
 * 
 * 线程池7大参数:
 *      public ThreadPoolExecutor(int corePoolSize,//线程池中的常驻核心线程数
 *                                int maximumPoolSize,//线程池能够容纳同时执行的最大线程数
 *                                long keepAliveTime,//多余的空闲线程的存活时间
 *                                TimeUnit unit,//keepAliveTime的单位
 *                                BlockingQueue<Runnable> workQueue,//任务队列,被提交但尚未被执行的任务
 *                                ThreadFactory threadFactory,//生成线程池中工作线程的线程工厂,用于创建线程一般用默认的即可
 *                                RejectedExecutionHandler handler)//拒绝策略,当队列满了并且工作线程大于等于线程池的最大线程数时如何
 * 
 * 线程池底层工作原理:
 *      1. 在创建了线程池后,等待提交过来的任务请求。
 *      2. 当调用execute()方法添加一个请求任务时,线程池会做如下判断:
 *          2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
 *          2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务加入队列;
 *          2.3 如果这时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
 *          2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
 *      3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
 *      4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程池会判断:
 *              如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
 *              所以线程池的所有任务完成后它最终会收缩到corePoolSize的大小。
 * 
 * 线程池的拒绝策略:
 *      1. AbortPolicy(默认):直接抛出RejectedExecutionException异常组织系统正常运行。
 *      2. CallerRunsPolicy:"调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新的任务流量。
 *      3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
 *      4. DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种方案。
 */
public class Demo12ThreadPool {
    public static void main(String[] args) {
        
    }

    /**
     * 自定义线程池
     */
    public static void initThreadPool() {
        ExecutorService threadPool = new ThreadPoolExecutor(2,
                                                            5,
                                                            1L, 
                                                            TimeUnit.SECONDS, 
                                                            new LinkedBlockingQueue<Runnable>(3),
                                                            Executors.defaultThreadFactory(), 
                                                            new ThreadPoolExecutor.AbortPolicy() );
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }
    }

    /**
     * 三种线程池:
     *      1. newFixedThreadPool(int nThreads){return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())}
     *      2. newSingleThreadExecutor(){return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())}
     *      3. newFixedThreadPool(){return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())}
     */
    public static void ThreadPool() {
        ExecutorService threadPool = Executors.newFixedThreadPool(5); // 5个处理线程
        // ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 1个处理线程
        // ExecutorService threadPool = Executors.newCachedThreadPool(); // N个处理线程

        // 模拟器10个用户来办理业务,每个用户就是一个来自外部的请求线程
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/125142.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!