Java 多线程

Java 多线程

多线程 – 廖雪峰的官方网站 (liaoxuefeng.com) 学习笔记

创建新线程

一、从Thread派生一个自定义类,然后重写run()方法:

  1. package org.example;


  2. public class Main {

  3. public static void main(String[] args) {

  4. Thread t = new MyThread();

  5. t.start(); // 启n动新线程

  6. }

  7. }


  8. class MyThread extends Thread {

  9. @Override

  10. public void run() {

  11. System.out.println("start new thread!");

  12. }

  13. }

Java 多线程

二、创建Thread实例时,传入一个Runnable实例:

  1. package org.example;


  2. public class Main {

  3. public static void main(String[] args) {

  4. Thread t = new Thread(new MyRunnable());

  5. t.start(); // 启动新线程

  6. }

  7. }


  8. class MyRunnable implements Runnable {

  9. @Override

  10. public void run() {

  11. System.out.println("start new thread!");

  12. }

  13. }

Java 多线程

三、使用lambda

  1. public class Main {

  2. public static void main(String[] args) {

  3. Thread t = new Thread(() -> {

  4. System.out.println("start new thread!");

  5. });

  6. t.start(); // 启动新线程

  7. }

  8. }

Java 多线程

线程状态

  • New:新创建的线程,尚未执行;

  • Runnable:运行中的线程,正在执行run()方法的Java代码;

  • Blocked:运行中的线程,因为某些操作被阻塞而挂起;

  • Waiting:运行中的线程,因为某些操作在等待中;

  • Timed Waiting:运行中的线程,因为执行sleep()方法正在计时等待;

  • Terminated:线程已终止,因为run()方法执行完毕。

Waiting状态

join()方法,需要让线程按照指定的顺序执行时使用。

Thread.join()方法表示调用此方法的线程被阻塞,仅当该方法完成以后,才能继续运行。

  1. import java.util.concurrent.atomic.AtomicInteger;


  2. public class Main {

  3. public static void main(String[] args) throws InterruptedException {

  4. AtomicInteger count = new AtomicInteger();

  5. Thread t1 = new Thread(() -> {

  6. System.out.println("Thread t1 run");

  7. count.set(10);

  8. });

  9. t1.start();

  10. t1.join();//若把这一行注释掉,则打印的count可能为0

  11. System.out.println("count=" + count);

  12. }

  13. }

Interrupted状态

interrupt()方法可以中断Runnable状态的线程,或者使Waiting状态的线程抛出异常并结束运行

  1. public class Main {

  2. public static void main(String[] args) throws InterruptedException {

  3. Thread t = new MyThread();

  4. t.start();

  5. Thread.sleep(1); // 暂停1毫秒

  6. t.interrupt(); // 中断t线程

  7. t.join(); // 等待t线程结束

  8. System.out.println("end");

  9. }

  10. }


  11. class MyThread extends Thread {

  12. public void run() {

  13. int n = 0;

  14. while (!isInterrupted()) {

  15. n++;

  16. System.out.println(n + " hello!");

  17. }

  18. }

  19. }

通过修改running标志位来中断线程

  1. public class Main {

  2. public static void main(String[] args) throws InterruptedException {

  3. HelloThread t = new HelloThread();

  4. t.start();

  5. Thread.sleep(1);

  6. t.running = false; // 标志位置为false

  7. }

  8. }


  9. class HelloThread extends Thread {

  10. public volatile boolean running = true;

  11. public void run() {

  12. int n = 0;

  13. while (running) {

  14. n ++;

  15. System.out.println(n + " hello!");

  16. }

  17. System.out.println("end!");

  18. }

  19. }

volatile 关键字修饰的变量可以使线程每次访问变量时,总是获取主内存的最新值,并且在每次修改变量后,立刻回写到主内存。

在x86架构下,JVM回写主内存速度很快,加不加volatile影响不大,若是ARM架构,就会有明显的延迟。

守护线程

守护线程是为其他线程服务的线程,如定时触发的任务;

所有非守护线程都执行完毕后,虚拟机退出;

守护线程不能持有需要关闭的资源(如打开文件等)。

线程同步

synchronized()方法

  1. synchronized(Counter.lock) {

  2. // 获取锁

  3. ...

  4. }

  5. // 释放锁

  1. public class Main {

  2. public static void main(String[] args) throws Exception {

  3. Thread[] ts = new Thread[]{new Thread1(), new Thread2(), new Thread3(), new Thread4()};

  4. for (Thread t : ts) {

  5. t.start();

  6. }

  7. for (Thread t : ts) {

  8. t.join();

  9. }

  10. System.out.println(Counter.count1);

  11. System.out.println(Counter.count2);

  12. }

  13. }


  14. class Counter {

  15. public static final Object lock1 = new Object();

  16. public static final Object lock2 = new Object();

  17. public static int count1 = 0;

  18. public static int count2 = 0;

  19. }


  20. class Thread1 extends Thread {

  21. public void run() {

  22. for (int i = 0; i < 10000; i++) {

  23. synchronized (Counter.lock1) {

  24. Counter.count1 += 1;

  25. }

  26. }

  27. }

  28. }


  29. class Thread2 extends Thread {

  30. public void run() {

  31. for (int i = 0; i < 10000; i++) {

  32. synchronized (Counter.lock1) {

  33. Counter.count1 -= 1;

  34. }

  35. }

  36. }

  37. }


  38. class Thread3 extends Thread {

  39. public void run() {

  40. for (int i = 0; i < 10000; i++) {

  41. synchronized (Counter.lock2) {

  42. Counter.count2 += 1;

  43. }

  44. }

  45. }

  46. }


  47. class Thread4 extends Thread {

  48. public void run() {

  49. for (int i = 0; i < 10000; i++) {

  50. synchronized (Counter.lock2) {

  51. Counter.count2 -= 1;

  52. }

  53. }

  54. }

  55. }

Java 多线程

线程安全

synchronized方法加锁对象是this,如以下代码即为封装后的Counter类,它是一个“线程安全”的类

  1. public class Counter {

  2. private int count = 0;


  3. public void add(int n) {

  4. synchronized (this) {

  5. count += n;

  6. }

  7. }


  8. public void dec(int n) {

  9. synchronized (this) {

  10. count -= n;

  11. }

  12. }


  13. public int get() {

  14. return count;

  15. }

  16. }

wait() 与notify()

  1. import java.util.ArrayList;

  2. import java.util.LinkedList;

  3. import java.util.Queue;


  4. public class Main {

  5. public static void main(String[] args) throws InterruptedException {

  6. TaskQueue q = new TaskQueue();

  7. ArrayList<Thread> ts = new ArrayList<Thread>();

  8. for (int i = 0; i < 5; i++) {

  9. Thread t = new Thread() {

  10. public void run() {

  11. // 执行task:

  12. while (true) {

  13. try {

  14. String s = q.getTask();

  15. System.out.println("execute task: " + s);

  16. } catch (InterruptedException e) {

  17. return;

  18. }

  19. }

  20. }

  21. };

  22. t.start();

  23. ts.add(t);

  24. }

  25. Thread add = new Thread(() -> {

  26. for (int i = 0; i < 10; i++) {

  27. // 放入task:

  28. String s = "t-" + Math.random();

  29. System.out.println("add task: " + s);

  30. q.addTask(s);

  31. try {

  32. Thread.sleep(100);

  33. } catch (InterruptedException e) {

  34. throw new RuntimeException(e);

  35. }

  36. }

  37. });

  38. add.start();

  39. add.join();

  40. Thread.sleep(100);

  41. for (Thread t : ts) {

  42. t.interrupt();

  43. }

  44. }

  45. }


  46. class TaskQueue {

  47. Queue<String> queue = new LinkedList<>();


  48. public synchronized void addTask(String s) {

  49. this.queue.add(s);

  50. this.notifyAll();

  51. }


  52. public synchronized String getTask() throws InterruptedException {

  53. while (queue.isEmpty()) {

  54. this.wait();

  55. }

  56. return queue.remove();

  57. }

  58. }

getTask方法中不适用wait,会一直占用this锁,导致进入死循环,循环中添加对wait方法的执行后,线程会进入等待状态,直到notify将其唤醒,才会进行下次循环并再次判断队列是否为空。

addTask()方法,内部调用了this.notifyAll()而不是this.notify(),使用notifyAll()将唤醒所有当前正在this锁等待的线程,而notify()只会唤醒其中一个。这是因为可能有多个线程正在getTask()方法内部的wait()中等待,使用notifyAll()将一次性全部唤醒。通常来说,notifyAll()更安全

ReentrantLock

使用synchronized

  1. public class Counter {

  2. private int count;


  3. public void add(int n) {

  4. synchronized(this) {

  5. count += n;

  6. }

  7. }

  8. }

使用ReentrantLock

  1. public class Counter {

  2. private final Lock lock = new ReentrantLock();

  3. private int count;


  4. public void add(int n) {

  5. lock.lock();

  6. try {

  7. count += n;

  8. } finally {

  9. lock.unlock();

  10. }

  11. }

  12. }

使用ReentrantLock尝试获取锁

  1. if (lock.tryLock(1, TimeUnit.SECONDS)) {

  2. try {

  3. ...

  4. } finally {

  5. lock.unlock();

  6. }

  7. }

使用ReentrantLock可以避免因为使用synchronized时抛出异常导致的死锁。

Condition

使用ReentrantLock时,waitnotify方法应使用Condition对象实现

  1. class TaskQueue {

  2. private final Lock lock = new ReentrantLock();

  3. private final Condition condition = lock.newCondition();

  4. private Queue<String> queue = new LinkedList<>();


  5. public void addTask(String s) {

  6. lock.lock();

  7. try {

  8. queue.add(s);

  9. condition.signalAll();

  10. } finally {

  11. lock.unlock();

  12. }

  13. }


  14. public String getTask() {

  15. lock.lock();

  16. try {

  17. while (queue.isEmpty()) {

  18. condition.await();

  19. }

  20. return queue.remove();

  21. } finally {

  22. lock.unlock();

  23. }

  24. }

  25. }

ReadWriteLock

使用ReadWriteLock可以实现同一时间只允许一个线程写入,而在没有写入时,允许多个线程同时读,从而提高性能。

  1. ublic class Counter {

  2. private final ReadWriteLock rwlock = new ReentrantReadWriteLock();

  3. private final Lock rlock = rwlock.readLock();

  4. private final Lock wlock = rwlock.writeLock();

  5. private int[] counts = new int[10];


  6. public void inc(int index) {

  7. wlock.lock(); // 加写锁

  8. try {

  9. counts[index] += 1;

  10. } finally {

  11. wlock.unlock(); // 释放写锁

  12. }

  13. }


  14. public int[] get() {

  15. rlock.lock(); // 加读锁

  16. try {

  17. return Arrays.copyOf(counts, counts.length);

  18. } finally {

  19. rlock.unlock(); // 释放读锁

  20. }

  21. }

  22. }

StampedLock(乐观锁)

乐观锁会乐观地估计读的过程中大概率不会有写入,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。

  1. import java.util.concurrent.locks.StampedLock;


  2. public class Point {

  3. private final StampedLock stampedLock = new StampedLock();


  4. private double x;

  5. private double y;


  6. public void move(double deltaX, double deltaY) {

  7. long stamp = stampedLock.writeLock(); // 获取写锁

  8. try {

  9. x += deltaX;

  10. y += deltaY;

  11. } finally {

  12. stampedLock.unlockWrite(stamp); // 释放写锁

  13. }

  14. }


  15. public double distanceFromOrigin() {

  16. long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁

  17. // 注意下面两行代码不是原子操作

  18. // 假设x,y = (100,200)

  19. double currentX = x;

  20. // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)

  21. double currentY = y;

  22. // 此处已读取到y,如果没有写入,读取是正确的(100,200)

  23. // 如果有写入,读取是错误的(100,400)

  24. if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生

  25. stamp = stampedLock.readLock(); // 获取一个悲观读锁

  26. try {

  27. currentX = x;

  28. currentY = y;

  29. } finally {

  30. stampedLock.unlockRead(stamp); // 释放悲观读锁

  31. }

  32. }

  33. return Math.sqrt(currentX * currentX + currentY * currentY);

  34. }

  35. }

Concurrent

接口 非线程安全类 线程安全类
List ArrayList CopyOnWriteArrayList
Map HashMap ConcurrentHashMap
Set HashSet /TreeSet CopyOnWriteArraySet
Queue ArrayDeque /LinkedList ArrayBlockingQueue /LinkedBlockingQueue
Deque ArrayDeque /LinkedList LinkedBlockingDeque

示例

  1. Map<String, String> map = new ConcurrentHashMap<>();

  2. // 在不同的线程读写:

  3. map.put("A", "1");

  4. map.put("B", "2");

  5. map.get("A", "1");

Atomic

AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:intaddAndGet(intdelta)

  • 加1后返回新值:intincrementAndGet()

  • 获取当前值:intget()

  • 用CAS方式设置:intcompareAndSet(intexpect,intupdate)

例如 使用AtomicLong编写一个多线程安全的全局唯一ID生成器:

  1. class IdGenerator {

  2. AtomicLong var = new AtomicLong(0);


  3. public long getNextId() {

  4. return var.incrementAndGet();

  5. }

  6. }

ExecutorService(线程池)

  • FixedThreadPool:线程数固定的线程池。

  • CachedThreadPool:线程数根据任务动态调整的线程池。

  • SingleThreadExecutor:仅单线程执行的线程池。

  • ScheduledThreadPool:可以放入需要定期反复执行的线程池。

  1. import java.util.concurrent.*;


  2. public class Main {

  3. public static void main(String[] args) {

  4. ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

  5. // 1秒后执行一次性任务:

  6. ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

  7. // 2秒后开始执行定时任务,每3秒执行:

  8. ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

  9. //2秒后开始执行定时任务,以3秒为间隔执行:

  10. ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);


  11. // 创建一个固定大小的线程池:

  12. ExecutorService es = Executors.newFixedThreadPool(4);

  13. for (int i = 0; i < 6; i++) {

  14. es.submit(new Task("" + i));

  15. }

  16. // 关闭线程池:

  17. es.shutdown();

  18. }

  19. }


  20. class Task implements Runnable {

  21. private final String name;


  22. public Task(String name) {

  23. this.name = name;

  24. }


  25. @Override

  26. public void run() {

  27. System.out.println("start task " + name);

  28. try {

  29. Thread.sleep(1000);

  30. } catch (InterruptedException e) {

  31. }

  32. System.out.println("end task " + name);

  33. }

  34. }

Future

Runnable接口没有返回值,Callable带有返回值。

  1. import java.math.BigDecimal;

  2. import java.math.RoundingMode;

  3. import java.util.concurrent.Callable;

  4. import java.util.concurrent.ExecutorService;

  5. import java.util.concurrent.Executors;

  6. import java.util.concurrent.Future;


  7. public class Main {

  8. public static void main(String[] args) throws Exception {

  9. ExecutorService es = Executors.newFixedThreadPool(4);

  10. Future<String> future = es.submit(new Task("123"));

  11. System.out.println(future.get());

  12. es.shutdown();

  13. }

  14. }


  15. class Task implements Callable<String> {

  16. String name;


  17. public Task(String code) {

  18. this.name = code;

  19. }


  20. @Override

  21. public String call() throws Exception {

  22. Thread.sleep(1000);

  23. return "hello " + this.name;

  24. }

  25. }

一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(任务完成时返回结果,未完成时阻塞,直到完成时才返回结果)

  • get(longtimeout,TimeUnitunit):获取结果,但只等待指定的时间;

  • cancel(booleanmayInterruptIfRunning):取消当前任务;

  • isDone():判断任务是否已完成。

CompletableFuture

CompletableFuture的优点是:

  • 异步任务结束时,会自动回调某个对象的方法;

  • 异步任务出错时,会自动回调某个对象的方法;

  • 主线程设置好回调后,不再关心异步任务的执行。

  1. import java.util.concurrent.CompletableFuture;


  2. public class Main {

  3. public static void main(String[] args) throws Exception {

  4. // 两个CompletableFuture执行异步查询:

  5. CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {

  6. return queryCode("中国石油", "https://finance.sina.com.cn/code/");

  7. });

  8. CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {

  9. return queryCode("中国石油", "https://money.163.com/code/");

  10. });

  11. // 用anyOf合并为一个新的CompletableFuture:

  12. CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);

  13. // 两个CompletableFuture执行异步查询:

  14. CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {

  15. return fetchPrice((String) code, "https://finance.sina.com.cn/price/");

  16. });

  17. CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {

  18. return fetchPrice((String) code, "https://money.163.com/price/");

  19. });

  20. // 用anyOf合并为一个新的CompletableFuture:

  21. CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

  22. // 最终结果:

  23. cfFetch.thenAccept((result) -> {

  24. System.out.println("price: " + result);

  25. });

  26. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:

  27. Thread.sleep(200);

  28. }


  29. static String queryCode(String name, String url) {

  30. System.out.println("query code from " + url + "...");

  31. try {

  32. Thread.sleep((long) (Math.random() * 100));

  33. } catch (InterruptedException e) {

  34. }

  35. return "601857";

  36. }


  37. static Double fetchPrice(String code, String url) {

  38. System.out.println("query price from " + url + "...");

  39. try {

  40. Thread.sleep((long) (Math.random() * 100));

  41. } catch (InterruptedException e) {

  42. }

  43. return 5 + Math.random() * 20;

  44. }

  45. }

anyOf()表示“任意个CompletableFuture只要一个成功”

allOf()表示“所有CompletableFuture都必须成功”

ForkJoin

一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

写一个不严谨的例子

  1. import java.util.Random;

  2. import java.util.concurrent.ForkJoinPool;

  3. import java.util.concurrent.ForkJoinTask;

  4. import java.util.concurrent.RecursiveTask;


  5. public class Main {

  6. public static void main(String[] args) throws Exception {

  7. // 创建900000000个随机数组成的数组:

  8. long[] array = new long[100000000];

  9. long expectedSum = 0;

  10. for (int i = 0; i < array.length; i++) {

  11. array[i] = random();

  12. expectedSum += array[i];

  13. }

  14. System.out.println("Expected sum: " + expectedSum);

  15. // fork/join:

  16. ForkJoinTask<Long> task = new SumTask(array, 0, array.length);

  17. long startTime = System.currentTimeMillis();

  18. Long result = ForkJoinPool.commonPool().invoke(task);

  19. long endTime = System.currentTimeMillis();

  20. System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");

  21. long res = 0;

  22. startTime = System.currentTimeMillis();

  23. for (int i = 0; i < array.length; i++) {

  24. res += array[i];

  25. }

  26. endTime = System.currentTimeMillis();

  27. System.out.println("Common sum: " + res + " in " + (endTime - startTime) + " ms.");

  28. }


  29. static Random random = new Random(0);


  30. static long random() {

  31. return random.nextInt(10000);

  32. }

  33. }


  34. class SumTask extends RecursiveTask<Long> {

  35. static final int THRESHOLD = 500;

  36. long[] array;

  37. int start;

  38. int end;


  39. SumTask(long[] array, int start, int end) {

  40. this.array = array;

  41. this.start = start;

  42. this.end = end;

  43. }


  44. @Override

  45. protected Long compute() {

  46. if (end - start <= THRESHOLD) {

  47. // 如果任务足够小,直接计算:

  48. long sum = 0;

  49. for (int i = start; i < end; i++) {

  50. sum += this.array[i];

  51. // 故意放慢计算速度:

  52. // try {

  53. // Thread.sleep(1);

  54. // } catch (InterruptedException e) {

  55. // }

  56. }

  57. return sum;

  58. }

  59. // 任务太大,一分为二:

  60. int middle = (end + start) / 2;

  61. //System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));

  62. SumTask subtask1 = new SumTask(this.array, start, middle);

  63. SumTask subtask2 = new SumTask(this.array, middle, end);

  64. invokeAll(subtask1, subtask2);

  65. Long subresult1 = subtask1.join();

  66. Long subresult2 = subtask2.join();

  67. Long result = subresult1 + subresult2;

  68. //System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);

  69. return result;

  70. }

  71. }

Java 多线程

ThreadLocal(线程本地变量)

其实就是为了避免冲突在每个线程中都为变量创建了一个副本

使用ThreadLocal要用try...finally结构,并在finally中清除。

举例如下:

  1. import java.util.stream.IntStream;


  2. public class Main {

  3. public static void main(String[] args) {

  4. ThreadLocal<String> local = new ThreadLocal<>();

  5. IntStream.range(0, 10).forEach(i -> new Thread(() -> {

  6. try {

  7. local.set(Thread.currentThread().getName() + ":" + i);

  8. System.out.println("线程:" + Thread.currentThread().getName() + ",local:" + local.get());

  9. } finally {

  10. local.remove();

  11. }

  12. }).start());

  13. }

  14. }


原文始发于微信公众号(布沃布图):Java 多线程

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/24966.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!