Java 多线程并发【15】Executor 框架

Java 多线程并发【15】Executor 框架

在 Java 中,线程执行的任务被抽象为了 Runnable 接口,Runnable 通过唯一的 run 方法表示着任务运行的能力。而执行任务的能力,并不应该仅仅是 Thread ,可能别的需要执行任务的东西,也具备执行任务的能力,所以就有了 Executor 接口。

Executor 是用来执行 Runnable 的执行器,它具有将任务的提交与任务的执行进行解耦的能力。

Executor 代表的是任务的执行器;Runnable 代表的是任务本身。

在 Java 中通常需要通过线程来运行代码,所以 Executor 的实现离不开线程。

Executor 接口提供了一种将任务提交与每个任务将如何运行的机制解耦的方法,包括线程使用、调度等的细节。通常使用 Executor 而不是显式创建线程,能够更好地管理系统资源。

public interface Executor {
    /**
     * 在将来的某个时间执行给定的命令。 
     * 命令将由 Executor 的实现决定会在一个新的线程、一个线程池中的线程还是调用线程执行。
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be accepted for execution
     * @throws NullPointerException if command is null
     */

    void execute(Runnable command);
}

Executor 接口通过 execute(Runnable command) 来表示它所具备的执行任务的能力。

优点

在 Executor 的注释中提到了通常使用 Executor 而不是显式创建线程,例如,不是为一组任务中的每个任务都创建一个新线程:

new Thread(new(RunnableTask1())).start();
new Thread(new(RunnableTask2())).start();
...

而是通过创建一个统一的执行器,使用执行器去执行不同的任务:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

执行器可以实现不同的任务运行逻辑,可以更灵活地管理线程的调度。

同步与异步

Executor 只是代表了一个执行器,但这个执行器是同步还是异步的,需要根据其具体实现来判断。例如,以下是一个简单的同步实现:

class DirectExecutor implements Executor {     
   public void execute(Runnable r) {       
       r.run();     
    }   
}

DirectExecutor 的实现,直接调用了 Runnable 的 run 方法,没有任何导致代码切换到其他线程的操作,所以它是同步的。

一个典型的异步执行器的实现是:

class ThreadPerTaskExecutor implements Executor {     
   public void execute(Runnable r) {       
       new Thread(r).start();     
    }   
}

这种实现就是等同于上面的例子中的:

new Thread(new(RunnableTask1())).start();
new Thread(new(RunnableTask2())).start();
...

许多 Executor 的实现对任务如何调度以及何时调度进行了限制。下面的例子就是将任务添加到一个队列,然后按照先进先出的规则连续执行的一个执行器:

class SerialExecutor implements Executor {     
   
   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();     
   final Executor executor;     
   Runnable active; 
  
   SerialExecutor(Executor executor) {       
      this.executor = executor;     
    }
  
   public synchronized void execute(final Runnable r) {       
       tasks.offer(new Runnable() {         
           public void run() {           
               try {              
                   r.run();           
                } finally {             
                   scheduleNext();           
                }         
            }       
        });       
       if (active == null) {         
           scheduleNext();       
        }     
    }
  
   protected synchronized void scheduleNext() {       
       if ((active = tasks.poll()) != null) {         
           executor.execute(active);       
        }     
    }   
}

这个 Executor 处理了调度:在当前线程执行;时间:在插入元素后,立刻执行,并且在执行完成后,检查队列是否为空,不为空就会连续按序执行。

Executor 的实现

J.U.C. 包中提供了 Executor 的实现 ExecutorService , 这是一个执行器的定义更加完善的接口。

public interface ExecutorService extends Executor {

    /**
     * 启动有序关闭,其中执行先前提交的任务,但不会接受新任务。 如果已经关闭,调用没有额外的效果。
     * 此方法不等待先前提交的任务完成执行。 使用 awaitTermination 来做到这一点。
     *
     */

    void shutdown();

    /**
     * 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
     * 此方法不等待主动执行的任务终止。 使用 awaitTermination 来做到这一点。
     * 除了尽最大努力停止处理正在执行的任务之外,没有任何保证。 例如,典型的实现将通过 Thread.interrupt 取消,因此任何未能响应中断的任务可能永远不会终止。
     */

    List<Runnable> shutdownNow();

    /**
     * 如果此执行程序已关闭,则返回 {@code true}。
     */

    boolean isShutdown();

    /**
     * 如果所有任务在关闭后都已完成,则返回 true。 
     * 请注意,除非首先调用了 shutdown 或 shutdownNow,否则 isTerminated 永远不会为 true。
     */

    boolean isTerminated();

    /**
     * 阻塞直到所有任务在关闭请求后完成执行,或者发生超时,或者当前线程被中断,以先发生者为准。
     */

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 提交一个返回值的任务以供执行,并返回一个代表任务未决结果的 Future。Future 的 get 方法将在成功完成后返回任务的结果。
     * 如果您想立即阻止等待任务,可以使用 result = exec.submit(aCallable).get(); 形式的结构。
     * 注意:Executors 类包含一组方法,可以将一些其他常见的类似闭包的对象(例如 java.security.PrivilegedAction)转换为 Callable 形式,以便可以提交它们。
     */

    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交 Runnable 任务以执行并返回代表该任务的 Future。Future 的 get 方法将在成功完成后返回给定的结果。
     */

    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交 Runnable 任务以执行并返回代表该任务的 Future。Future 的 get 方法将在成功完成后返回 null。
     */

    Future<?> submit(Runnable task);

    /**
     * 执行给定的任务,返回一个 Futures 列表,在所有完成时保存它们的状态和结果。 
     * Future.isDone 对于返回列表的每个元素都是 true。 
     * 请注意,已完成的任务可能已经正常终止,也可能通过引发异常终止。 如果在此操作进行时修改了给定的集合,则此方法的结果是不确定的。
     */

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * 执行给定的任务,当全部完成或超时到期(以先发生者为准)时,返回保存其状态和结果的 Futures 列表。 
     * Future.isDone 对于返回列表的每个元素都是 true。 返回时,未完成的任务将被取消。 
     * 请注意,已完成的任务可能已经正常终止,也可能通过引发异常终止。 如果在此操作进行时修改了给定的集合,则此方法的结果是不确定的。
     */

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 执行给定的任务,返回已成功完成的任务的结果(即不抛出异常),如果有的话。 正常或异常返回时,取消未完成的任务。 
     * 如果在此操作进行时修改了给定的集合,则此方法的结果是不确定的。
     */

    <T> invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    /**
     * 执行给定任务,返回已成功完成的任务的结果(即,不抛出异常),如果在给定超时过去之前有任何操作。 正常或异常返回时,取消未完成的任务。 
     * 如果在此操作进行时修改了给定的集合,则此方法的结果是不确定的。
     */

    <T> invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException
;
}

ExecutorService 的能力是三个,也是三个方法组:

  • Shotdown 系列方法,表示关闭执行器。
  • Submit 系列方法,表示提交任务。
  • Invoke 系列方法,表示执行任务。

而 ExecutorService 实现,就是丰富的线程池了。

ExecutorService 的使用通过下面这个网络请求中的服务端示例来说明,这个示例通过 J.U.C. 提供的便捷工厂方法 Executors.newFixedThreadPool 创建了线程池:

class NetworkService implements Runnable {     
   private final ServerSocket serverSocket;     
   private final ExecutorService pool;        
   
   public NetworkService(int port, int poolSize) throws IOException {       
       serverSocket = new ServerSocket(port);       
       pool = Executors.newFixedThreadPool(poolSize);     
    }        
   public void run() 
       // run the service       
       try {         
            for (;;) {           
                 pool.execute(new Handler(serverSocket.accept()));         
              }       
        } catch (IOException ex) {         
           pool.shutdown();       
        }     
    }   
}      

class Handler implements Runnable {     
   private final Socket socket;     
   Handler(Socket socket) { this.socket = socket; }     
   public void run() {       
       // read and service request on socket     
    }   
}

关闭方法的示例,以下方法分两个阶段关闭 ExecutorService,首先调用 shutdown 拒绝传入任务,然后在必要时调用 shutdownNow 取消任何延迟任务:

void shutdownAndAwaitTermination(ExecutorService pool) {     
   pool.shutdown(); 
   // Disable new tasks from being submitted     
   try {       
       // Wait a while for existing tasks to terminate       
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {         
           pool.shutdownNow(); // Cancel currently executing tasks         
           // Wait a while for tasks to respond to being cancelled         
           if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 
               System.err.println("Pool did not terminate");       
        }     
    } catch (InterruptedException ie) {       
       // (Re-)Cancel if current thread also interrupted       
       pool.shutdownNow();       
       // Preserve interrupt status      
       Thread.currentThread().interrupt();     
    }   
}

内存一致性效果:在将 Runnable 或 Callable 任务提交给 ExecutorService 之前,线程中的操作发生在该任务采取的任何操作之前,这反过来又发生在通过 Future.get() 检索结果之前。

ExecutorService 的实现是抽象类 AbstractExecutorService ,它是线程池 ThreadPoolExecutor 的父类。AbstractExecutorService 中对 ExecutorService 提供的方法进行了默认实现。

AbstractExecutorService 使用 newTaskFor 方法返回的 RunnableFuture ,从而在 ExecutorService 的三组方法中通过 RunnableFuture 实现了逻辑。例如:

    public Future<?> submit(Runnable task) {
        if (task == nullthrow new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

子类可以覆盖 newTaskFor 方法以返回 FutureTask 以外的 RunnableFuture 实现。

两种 newTaskFor 方法在 AbstractExecutorService 中的默认实现:

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

关于 AbstractExecutorService 的细节可自行查看源码。

总结

从 Executor 、 ExecutorService 再到 AbstractExecutorService ,这是 Executor 体系中的顶层逻辑。由它们衍生出来的实现类就是各种各样的线程池和其他执行器。理解好了顶层逻辑的思想,才能继续深入线程池的实现。


原文始发于微信公众号(八千里路山与海):Java 多线程并发【15】Executor 框架

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/85084.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!