Java 多线程
多线程 – 廖雪峰的官方网站 (liaoxuefeng.com) 学习笔记
创建新线程
一、从Thread
派生一个自定义类,然后重写run()
方法:
package org.example;
public class Main {
public static void main(String[] args) {
Thread t = new MyThread();
t.start(); // 启n动新线程
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println("start new thread!");
}
}
二、创建Thread
实例时,传入一个Runnable
实例:
package org.example;
public class Main {
public static void main(String[] args) {
Thread t = new Thread(new MyRunnable());
t.start(); // 启动新线程
}
}
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("start new thread!");
}
}
三、使用lambda
public class Main {
public static void main(String[] args) {
Thread t = new Thread(() -> {
System.out.println("start new thread!");
});
t.start(); // 启动新线程
}
}
线程状态
New:新创建的线程,尚未执行;
Runnable:运行中的线程,正在执行
run()
方法的Java代码;Blocked:运行中的线程,因为某些操作被阻塞而挂起;
Waiting:运行中的线程,因为某些操作在等待中;
Timed Waiting:运行中的线程,因为执行
sleep()
方法正在计时等待;Terminated:线程已终止,因为
run()
方法执行完毕。
Waiting状态
join()
方法,需要让线程按照指定的顺序执行时使用。
Thread.join()
方法表示调用此方法的线程被阻塞,仅当该方法完成以后,才能继续运行。
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
public static void main(String[] args) throws InterruptedException {
AtomicInteger count = new AtomicInteger();
Thread t1 = new Thread(() -> {
System.out.println("Thread t1 run");
count.set(10);
});
t1.start();
t1.join();//若把这一行注释掉,则打印的count可能为0
System.out.println("count=" + count);
}
}
Interrupted状态
interrupt()
方法可以中断Runnable
状态的线程,或者使Waiting
状态的线程抛出异常并结束运行
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t = new MyThread();
t.start();
Thread.sleep(1); // 暂停1毫秒
t.interrupt(); // 中断t线程
t.join(); // 等待t线程结束
System.out.println("end");
}
}
class MyThread extends Thread {
public void run() {
int n = 0;
while (!isInterrupted()) {
n++;
System.out.println(n + " hello!");
}
}
}
通过修改running
标志位来中断线程
public class Main {
public static void main(String[] args) throws InterruptedException {
HelloThread t = new HelloThread();
t.start();
Thread.sleep(1);
t.running = false; // 标志位置为false
}
}
class HelloThread extends Thread {
public volatile boolean running = true;
public void run() {
int n = 0;
while (running) {
n ++;
System.out.println(n + " hello!");
}
System.out.println("end!");
}
}
volatile
关键字修饰的变量可以使线程每次访问变量时,总是获取主内存的最新值,并且在每次修改变量后,立刻回写到主内存。
在x86架构下,JVM回写主内存速度很快,加不加volatile
影响不大,若是ARM架构,就会有明显的延迟。
守护线程
守护线程是为其他线程服务的线程,如定时触发的任务;
所有非守护线程都执行完毕后,虚拟机退出;
守护线程不能持有需要关闭的资源(如打开文件等)。
线程同步
synchronized()
方法
synchronized(Counter.lock) {
// 获取锁
...
}
// 释放锁
public class Main {
public static void main(String[] args) throws Exception {
Thread[] ts = new Thread[]{new Thread1(), new Thread2(), new Thread3(), new Thread4()};
for (Thread t : ts) {
t.start();
}
for (Thread t : ts) {
t.join();
}
System.out.println(Counter.count1);
System.out.println(Counter.count2);
}
}
class Counter {
public static final Object lock1 = new Object();
public static final Object lock2 = new Object();
public static int count1 = 0;
public static int count2 = 0;
}
class Thread1 extends Thread {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lock1) {
Counter.count1 += 1;
}
}
}
}
class Thread2 extends Thread {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lock1) {
Counter.count1 -= 1;
}
}
}
}
class Thread3 extends Thread {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lock2) {
Counter.count2 += 1;
}
}
}
}
class Thread4 extends Thread {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lock2) {
Counter.count2 -= 1;
}
}
}
}
线程安全
synchronized
方法加锁对象是this
,如以下代码即为封装后的Counter
类,它是一个“线程安全”的类
public class Counter {
private int count = 0;
public void add(int n) {
synchronized (this) {
count += n;
}
}
public void dec(int n) {
synchronized (this) {
count -= n;
}
}
public int get() {
return count;
}
}
wait()
与notify()
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Queue;
public class Main {
public static void main(String[] args) throws InterruptedException {
TaskQueue q = new TaskQueue();
ArrayList<Thread> ts = new ArrayList<Thread>();
for (int i = 0; i < 5; i++) {
Thread t = new Thread() {
public void run() {
// 执行task:
while (true) {
try {
String s = q.getTask();
System.out.println("execute task: " + s);
} catch (InterruptedException e) {
return;
}
}
}
};
t.start();
ts.add(t);
}
Thread add = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 放入task:
String s = "t-" + Math.random();
System.out.println("add task: " + s);
q.addTask(s);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
add.start();
add.join();
Thread.sleep(100);
for (Thread t : ts) {
t.interrupt();
}
}
}
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
this.notifyAll();
}
public synchronized String getTask() throws InterruptedException {
while (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}
}
若getTask
方法中不适用wait
,会一直占用this
锁,导致进入死循环,循环中添加对wait
方法的执行后,线程会进入等待状态,直到notify
将其唤醒,才会进行下次循环并再次判断队列是否为空。
addTask()
方法,内部调用了this.notifyAll()
而不是this.notify()
,使用notifyAll()
将唤醒所有当前正在this
锁等待的线程,而notify()
只会唤醒其中一个。这是因为可能有多个线程正在getTask()
方法内部的wait()
中等待,使用notifyAll()
将一次性全部唤醒。通常来说,notifyAll()
更安全
ReentrantLock
使用synchronized
public class Counter {
private int count;
public void add(int n) {
synchronized(this) {
count += n;
}
}
}
使用ReentrantLock
public class Counter {
private final Lock lock = new ReentrantLock();
private int count;
public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
}
使用ReentrantLock
尝试获取锁
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}
使用ReentrantLock
可以避免因为使用synchronized
时抛出异常导致的死锁。
Condition
使用ReentrantLock
时,wait
与notify
方法应使用Condition
对象实现
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
ReadWriteLock
使用ReadWriteLock
可以实现同一时间只允许一个线程写入,而在没有写入时,允许多个线程同时读,从而提高性能。
ublic class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];
public void inc(int index) {
wlock.lock(); // 加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock(); // 释放写锁
}
}
public int[] get() {
rlock.lock(); // 加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); // 释放读锁
}
}
}
StampedLock
(乐观锁)
乐观锁会乐观地估计读的过程中大概率不会有写入,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。
import java.util.concurrent.locks.StampedLock;
public class Point {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 释放写锁
}
}
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
// 注意下面两行代码不是原子操作
// 假设x,y = (100,200)
double currentX = x;
// 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
double currentY = y;
// 此处已读取到y,如果没有写入,读取是正确的(100,200)
// 如果有写入,读取是错误的(100,400)
if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
stamp = stampedLock.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
Concurrent
接口 | 非线程安全类 | 线程安全类 |
---|---|---|
List | ArrayList |
CopyOnWriteArrayList |
Map | HashMap |
ConcurrentHashMap |
Set | HashSet /TreeSet |
CopyOnWriteArraySet |
Queue | ArrayDeque /LinkedList |
ArrayBlockingQueue /LinkedBlockingQueue |
Deque | ArrayDeque /LinkedList |
LinkedBlockingDeque |
示例
Map<String, String> map = new ConcurrentHashMap<>();
// 在不同的线程读写:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");
Atomic
以AtomicInteger
为例,它提供的主要操作有:
-
增加值并返回新值:
intaddAndGet(intdelta)
-
加1后返回新值:
intincrementAndGet()
-
获取当前值:
intget()
-
用CAS方式设置:
intcompareAndSet(intexpect,intupdate)
例如 使用AtomicLong
编写一个多线程安全的全局唯一ID生成器:
class IdGenerator {
AtomicLong var = new AtomicLong(0);
public long getNextId() {
return var.incrementAndGet();
}
}
ExecutorService
(线程池)
-
FixedThreadPool
:线程数固定的线程池。 -
CachedThreadPool
:线程数根据任务动态调整的线程池。 -
SingleThreadExecutor
:仅单线程执行的线程池。 -
ScheduledThreadPool
:可以放入需要定期反复执行的线程池。
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
//2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
// 创建一个固定大小的线程池:
ExecutorService es = Executors.newFixedThreadPool(4);
for (int i = 0; i < 6; i++) {
es.submit(new Task("" + i));
}
// 关闭线程池:
es.shutdown();
}
}
class Task implements Runnable {
private final String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("start task " + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("end task " + name);
}
}
Future
Runnable
接口没有返回值,Callable
带有返回值。
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(4);
Future<String> future = es.submit(new Task("123"));
System.out.println(future.get());
es.shutdown();
}
}
class Task implements Callable<String> {
String name;
public Task(String code) {
this.name = code;
}
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "hello " + this.name;
}
}
一个Future<V>
接口表示一个未来可能会返回的结果,它定义的方法有:
-
get()
:获取结果(任务完成时返回结果,未完成时阻塞,直到完成时才返回结果) -
get(longtimeout,TimeUnitunit)
:获取结果,但只等待指定的时间; -
cancel(booleanmayInterruptIfRunning)
:取消当前任务; -
isDone()
:判断任务是否已完成。
CompletableFuture
CompletableFuture
的优点是:
-
异步任务结束时,会自动回调某个对象的方法;
-
异步任务出错时,会自动回调某个对象的方法;
-
主线程设置好回调后,不再关心异步任务的执行。
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) throws Exception {
// 两个CompletableFuture执行异步查询:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://money.163.com/code/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 两个CompletableFuture执行异步查询:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/price/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最终结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(200);
}
static String queryCode(String name, String url) {
System.out.println("query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code, String url) {
System.out.println("query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
anyOf()
表示“任意个CompletableFuture
只要一个成功”
allOf()
表示“所有CompletableFuture
都必须成功”
ForkJoin
一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
写一个不严谨的例子
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class Main {
public static void main(String[] args) throws Exception {
// 创建900000000个随机数组成的数组:
long[] array = new long[100000000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random();
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
long res = 0;
startTime = System.currentTimeMillis();
for (int i = 0; i < array.length; i++) {
res += array[i];
}
endTime = System.currentTimeMillis();
System.out.println("Common sum: " + res + " in " + (endTime - startTime) + " ms.");
}
static Random random = new Random(0);
static long random() {
return random.nextInt(10000);
}
}
class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 500;
long[] array;
int start;
int end;
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算:
long sum = 0;
for (int i = start; i < end; i++) {
sum += this.array[i];
// 故意放慢计算速度:
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {
// }
}
return sum;
}
// 任务太大,一分为二:
int middle = (end + start) / 2;
//System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
//System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
return result;
}
}
ThreadLocal
(线程本地变量)
其实就是为了避免冲突在每个线程中都为变量创建了一个副本
使用ThreadLocal
要用try...finally
结构,并在finally
中清除。
举例如下:
import java.util.stream.IntStream;
public class Main {
public static void main(String[] args) {
ThreadLocal<String> local = new ThreadLocal<>();
IntStream.range(0, 10).forEach(i -> new Thread(() -> {
try {
local.set(Thread.currentThread().getName() + ":" + i);
System.out.println("线程:" + Thread.currentThread().getName() + ",local:" + local.get());
} finally {
local.remove();
}
}).start());
}
}
原文始发于微信公众号(布沃布图):Java 多线程
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/24966.html