Java8 最大程度利用执行器详解

第 2 章管理大量线程–执行器中,我们介绍了执行器的基本特性,以此提高执行大量并发任务的并发应用的性能。在本章中,我们将为您的并发应用提供一个强大的工具,并进一步解释它们的特点。在本章中,我们将介绍以下内容:

executor 是类,允许程序员执行并发任务,而不必担心线程的创建和管理。程序员创建Runnable对象并将其发送给执行器,执行器创建并管理执行这些任务所需的线程。在第 2 章管理大量线程–执行器中,我们介绍了执行器框架的基本特征:

  • 如何创建执行器以及创建执行器时的不同选项
  • 如何将并发任务发送给执行者
  • 如何控制执行者使用的资源
  • 执行器如何在内部使用线程池来优化应用的性能

然而,执行者可以给你更多的选择,使它们成为并发应用中的强大机制。

取消任务

您可以在将任务发送给执行者后取消任务的执行。当您使用submit()方法向执行器发送Runnable对象时,它返回Future接口的实现。此类允许您控制任务的执行。它有cancel()方法,试图取消任务的执行。它接收一个布尔值作为参数。如果它接受了true值,并且执行器正在执行该任务,则执行该任务的线程将被中断。

以下是无法取消要取消的任务的情况:

  • 任务已被取消
  • 任务已完成执行
  • 任务正在运行,您将false作为参数提供给cancel()方法
  • API 文档中未指定的其他原因

cancel()方法返回一个布尔值,指示任务是否已取消。

安排任务的执行

ThreadPoolExecutor类是ExecutorExecutorService接口的基本实现。但是 Java 并发 API 提供了此类的扩展,以允许执行计划任务。这是ScheduledThreadPoolExeuctor课程,您可以:

  • 延迟后执行任务
  • 定期执行任务;这包括以固定速率或固定延迟执行任务

覆盖执行器方法

执行器框架是一种非常灵活的机制。您可以实现自己的执行器来扩展现有类(ThreadPoolExecutorScheduledThreadPoolExecutor之一)以获得所需的行为。这些类包括使更改执行器工作方式变得容易的方法。如果您覆盖ThreadPoolExecutor,您可以覆盖以下方法:

  • beforeExecute():在执行器中执行并发任务之前调用此方法。它接收将要执行的Runnable对象和将执行它们的Thread对象。此方法接收的Runnable对象是FutureTask类的实例,而不是您使用submit()方法发送给执行者的Runnable对象。
  • afterExecute():在执行器中执行并发任务后调用此方法。它接收已执行的Runnable对象和存储任务内抛出的可能异常的Throwable对象。在beforeExecute()方法中,Runnable对象是FutureTask类的实例。
  • newTaskFor():此方法创建要执行您使用submit()方法发送的Runnable对象的任务。它必须返回一个RunnableFuture接口的实现。默认情况下,OpenJDK8 和 OracleJDK8 返回一个FutureTask类的实例,但这种情况在将来的实现中可能会发生变化。

如果扩展ScheduledThreadPoolExecutor类,可以重写decorateTask()方法。此方法与计划任务的newTaskFor()类似。它允许您覆盖执行者执行的任务。

更改一些初始化参数

您还可以通过在执行器创建时更改一些参数来更改执行器的行为。最有用的是:

  • BlockingQueue<Runnable>:每个执行者都使用内部BlockingQueue来存储等待其执行的任务。您可以将此接口的任何实现作为参数传递。例如,您可以更改执行者用于执行任务的默认顺序。
  • ThreadFactory:您可以指定ThreadFactory接口的实现,执行者将使用该工厂创建执行任务的线程。例如,您可以使用ThreadFactory接口创建Thread类的扩展,该扩展保存有关任务执行时间的日志信息。
  • RejectedExecutionHandler:调用shutdown()shutdownNow()方法后,发送给执行者的所有任务都将被拒绝。您可以指定RejectedExecutionHandler接口的实现来管理这种情况。

第 2 章管理大量线程–执行器中,我们给出了一个客户机/服务器应用的示例。我们实现了一个服务器来搜索世界银行世界发展指标的数据,并实现了一个客户端,该客户端对该服务器进行多次调用,以测试执行器的性能。

在本节中,我们将扩展该示例以添加以下特征:

  • 您可以使用新的取消查询在服务器中取消查询的执行。
  • 您可以使用优先级参数控制查询的执行顺序。优先级较高的任务将首先执行。
  • 服务器将计算使用服务器的不同用户使用的任务数和总执行时间。

为了实现这些新特性,我们对服务器进行了以下更改:

  • 我们为每个查询添加了两个参数。第一个是发送查询的用户的名称,另一个是查询的优先级。查询的新格式如下:
    • 查询q;username;priority;codCountry;codIndicator;year其中username为用户名,priority为查询优先级,codCountry为国家代码,codIndicator为指标代码,year为查询年份的可选参数。
    • 报告r;username;priority;codIndicator其中username为用户名称,priority为查询优先级,codIndicator为您要报告的指标代码。
    • 状态s;username;priority其中username为用户名称,priority为查询优先级。
    • 停止z;username;priority其中username为用户名称,priority为查询优先级。
  • 我们实现了一个新的查询:
    • 取消c;username;priority其中username为用户名称,priority为查询优先级。
  • 我们已经实现了我们自己的执行器,以:
    • 计算每个用户的服务器使用率
    • 按优先级执行任务
    • 控制任务的拒绝
    • 我们调整了ConcurrentServerRequestTask以考虑服务器的新元素

服务器的其余元素(缓存系统、日志系统和DAO类)是相同的,因此不再赘述。

ServerExecutor 类

正如我们前面提到的,我们已经实现了我们自己的执行器来执行服务器的任务。我们还实现了一些额外但必要的类来提供所有功能。让我们描述一下这些类。

统计对象

我们的服务器将计算每个用户在其上执行的任务数量以及这些任务的总执行时间。为了存储这些数据,我们实现了ExecutorStatistics类。它有两个属性来存储信息:

public class ExecutorStatistics {
    private AtomicLong executionTime = new AtomicLong(0L);
    private AtomicInteger numTasks = new AtomicInteger(0);

这些属性是支持单变量原子操作的AtomicVariables。这允许您在不同的线程中使用这些变量,而无需使用任何同步机制。然后,它有两种方法来增加任务数量和执行时间:

    public void addExecutionTime(long time) {
        executionTime.addAndGet(time);
    }
    public void addTask() {
        numTasks.incrementAndGet();
    }

最后,我们添加了获取两个属性值的方法,并覆盖了toString()方法,以可读的方式获取信息:

    @Override
    public String toString() {
        return "Executed Tasks: "+getNumTasks()+". Execution Time: "+getExecutionTime();
    }

被拒绝的任务控制器

当您创建一个执行器时,您可以指定一个类来管理被拒绝的任务。当您在执行器中调用了shutdown()shutdownNow()方法后提交任务时,执行器将拒绝该任务。

为了控制这种情况,我们实现了RejectedTaskController类。此类实现了RejectedExecutionHandler接口,实现了rejectedExecution()方法:

public class RejectedTaskController implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        ConcurrentCommand command=(ConcurrentCommand)task;
        Socket clientSocket=command.getSocket();
        try {
            PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),true);

            String message="The server is shutting down."
                +" Your request can not be served."
                +" Shutting Down: "
                +String.valueOf(executor.isShutdown())
                +". Terminated: "
                +String.valueOf(executor.isTerminated())
                +". Terminating: "
                +String.valueOf(executor.isTerminating());
            out.println(message);
            out.close();
            clientSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

每个被拒绝的任务调用一次rejectedExecution()方法,并将被拒绝的任务和拒绝该任务的执行者作为参数接收。

执行者的任务

当向执行者提交Runnable对象时,它不会直接执行该Runnable对象。它创建了一个新对象,FutureTask类的一个实例,这个任务由执行器的工作线程执行。

在我们的案例中,为了度量任务的执行时间,我们在ServerTask类中实现了自己的FutureTask实现。它扩展了FutureTask类,实现了Comparable接口,如下所示:

public class ServerTask<V> extends FutureTask<V> implements Comparable<ServerTask<V>>{

在内部,它存储将要作为ConcurrentCommand对象执行的查询:

    private ConcurrentCommand command;

在构造函数中使用FutureTask类的构造函数并存储ConcurrentCommand对象:

    public ServerTask(ConcurrentCommand command) {
        super(command, null);
        this.command=command;
    }

    public ConcurrentCommand getCommand() {
        return command;
    }

    public void setCommand(ConcurrentCommand command) {
        this.command = command;
    }

最后,它实现了比较两个ServerTask实例存储的命令以进行比较的compareTo()操作。这可以在以下代码中看到:

    @Override
    public int compareTo(ServerTask<V> other) {
        return command.compareTo(other.getCommand());
    }

遗嘱执行器

现在我们有了执行器的辅助类,我们必须实现执行器本身。为此,我们实现了ServerExecutor类。它扩展了ThreadPoolExecutor类,并具有一些内部属性,如下所示:

  • startTimes:这是一个ConcurrentHashMap,用于存储每个任务的开始日期。类的键将是ServerTask对象(一个Runnable对象),值将是Date对象。
  • executionStatistics:这是一个ConcurrentHashMap用于存储每个用户的使用统计信息。键将是用户名,值将是一个ExecutorStatistics对象。
  • CORE_POOL_SIZEMAXIMUM_POOL_SIZEKEEP_ALIVE_TIME:这些是定义执行者特征的常数。
  • REJECTED_TASK_CONTROLLER:这是一个RejectedTaskController类属性,用于控制被执行者拒绝的任务。

此可由以下代码解释:

public class ServerExecutor extends ThreadPoolExecutor {
    private ConcurrentHashMap<Runnable, Date> startTimes;
    private ConcurrentHashMap<String, ExecutorStatistics> executionStatistics;
    private static int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();
    private static int MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors();
    private static long KEEP_ALIVE_TIME = 10;

    private static RejectedTaskController REJECTED_TASK_CONTROLLER = new RejectedTaskController();

    public ServerExecutor() {
        super(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), REJECTED_TASK_CONTROLLER);

        startTimes = new ConcurrentHashMap<>();
        executionStatistics = new ConcurrentHashMap<>();
    }

类的构造函数调用父构造函数,创建一个PriorityBlockingQueue类来存储将在 executor 中执行的任务。此类根据compareTo()方法的执行结果对元素进行排序(因此存储在其中的元素必须实现Comparable接口)。此类的使用将允许我们按优先级执行任务。

然后,我们重写了ThreadPoolExecutor类的一些方法。首先是beforeExecute()方法。此方法在执行每个任务之前执行。它接收ServerTask对象作为参数,以及将要执行任务的线程。在本例中,我们将实际日期存储在ConcurrentHashMap中,每个任务的开始日期如下:

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTimes.put(r, new Date());
    }

第二种方法是afterExecute()方法。此方法在执行器中的每个任务执行后执行,并接收已被执行的对象作为参数和Throwable对象。最后一个参数只有在任务执行期间引发异常时才有值。在我们的案例中,我们将使用此方法:

  • 计算任务的执行时间。

  • 按以下方式更新用户的统计信息:

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            ServerTask<?> task=(ServerTask<?>)r;
            ConcurrentCommand command=task.getCommand();
    
            if (t==null) {
                if (!task.isCancelled()) {
                    Date startDate = startTimes.remove(r);
                    Date endDate=new Date();
                    long executionTime= endDate.getTime() - startDate.getTime();
                                ;
                    ExecutorStatistics statistics = executionStatistics.computeIfAbsent (command.getUsername(), n -> new ExecutorStatistics());
                    statistics.addExecutionTime(executionTime);
                    statistics.addTask();
                    ConcurrentServer.finishTask (command.getUsername(), command);
                }
                else {
    
                    String message="The task" + command.hashCode() + "of user" + command.getUsername() + "has been cancelled.";
                    Logger.sendMessage(message);
                }
    
            } else {
    
                String message="The exception "
                        +t.getMessage()
                        +" has been thrown.";
                Logger.sendMessage(message);
            }
        }

最后,我们已经覆盖了newTaskFor()方法。将执行此方法来转换我们发送给执行者的Runnable对象,在执行者将执行的FutureTask实例中使用submit()方法。在我们的例子中,我们用我们的ServerTask对象替换默认的FutureTask类:

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ServerTask<T>(runnable);
    }

我们在 executor 中包含了一个附加方法,用于将 executor 中存储的所有统计信息写入日志系统。此方法将在服务器执行结束时调用,稍后您将看到。我们有以下代码:

    public void writeStatistics() {

        for(Entry<String, ExecutorStatistics> entry: executionStatistics.entrySet()) {
             String user = entry.getKey();
             ExecutorStatistics stats = entry.getValue(); Logger.sendMessage(user+":"+stats);
        }
    }

命令类

命令类执行您可以发送到服务器的不同查询。您可以向我们的服务器发送五个不同的查询:

  • 查询:获取一个国家、一个指标和一年的信息。它是由ConcurrentQueryCommand类实现的。
  • 报告:获取指标信息。它是由ConcurrentReportCommand类实现的。
  • 状态:获取服务器的状态信息。它是由ConcurrentStatusCommand类实现的。
  • 取消:取消用户任务的执行。它是由ConcurrentCancelCommand类实现的。
  • 停止:停止服务器的执行。它是由ConcurrentStopCommand类实现的。

我们还有ConcurrentErrorCommand类来管理未知命令到达服务器时的情况,而ConcurrentCommand是所有命令的基类。

ConcurrentCommand 类

这是每个命令的基类。它包括所有命令共有的行为,包括:

  • 调用实现每个命令的特定逻辑的方法
  • 将结果写入客户端
  • 关闭通讯中使用的所有资源

该类扩展了Command类,实现了ComparableRunnable接口。在第 2 章的示例管理大量线程–执行器中,命令是简单类,但在本例中,并发命令是将发送给执行器的Runnable对象:

public abstract class ConcurrentCommand extends Command implements Comparable<ConcurrentCommand>, Runnable{

它有三个属性:

  • username:存储发送查询的用户名称。
  • priority:存储查询的优先级。它将确定查询的执行顺序。
  • socket:用于与客户端通信的套接字。

类的构造函数初始化这些属性:

    private String username;
    private byte priority;
    private Socket socket;

    public ConcurrentCommand(Socket socket, String[] command) {
        super(command);
        username=command[1];
        priority=Byte.parseByte(command[2]);
        this.socket=socket;

    }

这个类的主要功能是抽象的execute()方法和run()方法,该方法将由每个具体的命令来实现,以计算和返回查询结果。run()方法调用execute()方法,将结果存储在缓存中,将结果写入套接字,并关闭通信中使用的所有资源。我们有以下几点:

    @Override
    public abstract String execute();

    @Override
    public void run() {

        String message="Running a Task: Username: "
                +username
                +"; Priority: "
                +priority;
        Logger.sendMessage(message);

        String ret=execute();

        ParallelCache cache = ConcurrentServer.getCache();

        if (isCacheable()) {
            cache.put(String.join(";",command), ret);
        }

        try {
            PrintWriter out = new PrintWriter(socket.getOutputStream(),true);
            out.println(ret);
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println(ret);
    }

最后,compareTo()方法使用优先级属性来确定任务的顺序。这个将被PriorityBlockingQueue类用来排序任务,因此优先级较高的任务将首先执行。考虑到当getPriority()方法返回较低的值时,任务具有较高的优先级。如果任务的getPriority()返回1,则该任务的优先级将高于getPriority()方法返回2的任务:

    @Override
    public int compareTo(ConcurrentCommand o) {
        return Byte.compare(o.getPriority(), this.getPriority());
    }

具体指令

我们对实现不同命令的类做了一些小改动,并添加了一个由ConcurrentCancelCommand类实现的新类。这些类的主要逻辑包含在execute()方法中,该方法计算对查询的响应并将其作为字符串返回。

新的ConcurrentCancelCommandexecute()方法调用ConcurrentServer类的cancelTasks()方法。此方法将停止执行与作为参数传递的用户关联的所有挂起任务:

    @Override
    public String execute() {
        ConcurrentServer.cancelTasks(getUsername());

        String message = "Tasks of user "
                +getUsername()
                +" has been cancelled.";
        Logger.sendMessage(message);
        return message;
    }

ConcurrentReportCommandexecute()方法使用WDIDAO类的query()方法获取用户请求的数据。在第 2 章管理大量线程–执行器中,您可以找到此方法的实现。实现几乎相同。唯一的区别是命令数组索引如下所示:

    @Override
    public String execute() {

        WDIDAO dao=WDIDAO.getDAO();

        if (command.length==5) {
            return dao.query(command[3], command[4]);
        } else if (command.length==6) {
            try {
                return dao.query(command[3], command[4], Short.parseShort(command[5]));
            } catch (NumberFormatException e) {
                return "ERROR;Bad Command";
            }
        } else {
            return "ERROR;Bad Command";
        }
    }

ConcurrentQueryCommandexecute()方法使用WDIDAO类的report()方法获取数据。在第 2 章管理大量线程–执行器中,您也可以找到此方法的实现。这里的实现几乎相同。唯一的区别是命令数组索引:

    @Override
    public String execute() {

        WDIDAO dao=WDIDAO.getDAO();
        return dao.report(command[3]);
    }

ConcurrentStatusCommand在其构造函数中有一个额外的参数:Executor对象将执行命令。此命令使用此对象获取有关执行器的信息,并将其作为响应发送给用户。该实现与第 2 章中的管理大量线程–执行器中的实现几乎相同。我们使用了相同的方法来获取Executor对象的状态。

ConcurrentStopCommandConcurrentErrorCommand也与第 2 章中的管理大量线程–执行器相同,所以我们没有包含它们的源代码。

服务器部分

服务器部分从服务器的客户端接收查询,并创建执行查询的命令类并将其发送给执行器。它由两个类实现:

  • ConcurrentServer类:包括服务器的main()方法和其他取消任务、完成系统执行的方法
  • RequestTask类:该类创建命令并将其发送给执行器

第 2 章的示例管理大量线程–执行器的主要区别在于RequestTask类的角色。在SimpleServer示例中,ConcurrentServer类为每个查询创建一个RequestTask对象,并将它们发送给执行器。在本例中,我们只有一个将作为线程执行的RequestTask实例。当ConcurrentServer接收到连接时,它将套接字存储在挂起连接的并发列表中,以便与客户端通信。RequestTask线程读取该套接字,处理客户端发送的数据,创建相应的命令,并将该命令发送给执行器。

对于代码中的更改,仅将代码中的更改留给执行器进行预处理。

ConcurrentServer 类

ConcurrentServer类需要一些内部属性才能正常工作:

  • 使用缓存系统的ParallelCache实例。
  • 从客户端获取连接的ServerSocket实例。
  • 一个boolean值,用于知道何时必须停止执行。
  • LinkedBlockingQueue用于存储向服务器发送消息的客户端的套接字。这些套接字将由RequestTask类处理。
  • ConcurrentHashMap用于存储与执行器中执行的每个任务相关联的Future对象。键将是发送查询的用户的用户名,值将是另一个键为ConcurrenCommand对象的Map,值将是与该任务关联的Future实例。我们使用这些Future实例来取消任务的执行。
  • 一个RequestTask实例,用于创建命令并将其发送给执行器。
  • 一个Thread对象来执行RequestTask对象。

其代码如下:

public class ConcurrentServer {
    private static ParallelCache cache;
    private static volatile boolean stopped=false;
    private static LinkedBlockingQueue<Socket> pendingConnections;
    private static ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController;
    private static Thread requestThread;
    private static RequestTask task;

此类的main()方法初始化这些对象,并打开ServerSocket实例来监听来自客户端的连接。此外,它还创建了RequestTask对象,并将其作为线程执行。它将处于循环中,直到shutdown()方法更改停止属性的值。在此之后,它使用RequestTask对象的endTermination()方法等待Executor对象的终结,并使用finishServer()方法关闭Logger系统和RequestTask对象:

    public static void main(String[] args) {

        WDIDAO dao=WDIDAO.getDAO();
        cache=new ParallelCache();
        Logger.initializeLog();
        pendingConnections = new LinkedBlockingQueue<Socket>();
        taskController = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Future<?>>>();
        task=new RequestTask(pendingConnections, taskController);
        requestThread=new Thread(task);
        requestThread.start();

        System.out.println("Initialization completed.");

        serverSocket= new ServerSocket(Constants.CONCURRENT_PORT);
        do {
            try {
                Socket clientSocket = serverSocket.accept();
                pendingConnections.put(clientSocket);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } while (!stopped);
        finishServer();
        System.out.println("Shutting down cache");
        cache.shutdown();
        System.out.println("Cache ok" + new Date());

    }

包括两种关闭服务器执行器的方法。shutdown()方法更改stopped变量的值并关闭serverSocket实例。finishServer()方法停止执行器,中断执行RequestTask对象的线程,并关闭Logger系统。我们将此过程分为两部分使用Logger系统,直到服务器的最后一条指令:

    public static void shutdown() {
        stopped=true;
        try {
            serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void finishServer() {
        System.out.println("Shutting down the server...");
        task.shutdown();
        System.out.println("Shutting down Request task");
        requestThread.interrupt();
        System.out.println("Request task ok");
        System.out.println("Closing socket");
        System.out.println("Shutting down logger");
        Logger.sendMessage("Shutting down the logger");
        Logger.shutdown();
        System.out.println("Logger ok");
        System.out.println("Main server thread ended");
    }

服务器包括取消与用户关联的任务的方法。如前所述,Server类使用嵌套的ConcurrentHashMap来存储与用户关联的所有任务。首先,我们获得用户所有任务的Map,然后我们处理这些任务的所有Future对象,调用Future对象的cancel()方法。我们将值true作为参数传递,因此如果执行者正在运行来自该用户的任务,它将被中断。我们已包含必要的代码,以避免取消ConcurrentCancelCommand

    public static void cancelTasks(String username) {

        ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks = taskController.get(username);
        if (userTasks == null) {
            return;
        }
        int taskNumber = 0;

        Iterator<ServerTask<?>> it = userTasks.values().iterator();
        while(it.hasNext()) {
            ServerTask<?> task = it.next();
             ConcurrentCommand command = task.getCommand();
              if(!(command instanceof ConcurrentCancelCommand) && task.cancel(true)) {
                    taskNumber++;
                    Logger.sendMessage("Task with code "+command.hashCode()+"cancelled: "+command.getClass().getSimpleName());
                    it.remove();
              }
        }
        String message=taskNumber+" tasks has been cancelled.";
        Logger.sendMessage(message);
    }

最后,我们提供了一种方法,当任务正常完成执行时,从ServerTask对象的嵌套映射中删除与任务关联的Future对象。这是finishTask()方法:

    public static void finishTask(String username, ConcurrentCommand command) {

        ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks = taskController.get(username);
        userTasks.remove(command);
        String message = "Task with code "+command.hashCode()+" has finished";
        Logger.sendMessage(message);

    }

请求任务类

RequestTask类是连接到客户端的ConcurrentServer类和Executor执行并发任务的类之间的中介。它打开客户端的套接字,读取查询数据,创建适当的命令,并将其发送给执行器。

它使用一些内部属性:

  • ConcurrentServer类存储客户机套接字的LinkedBlockingQueue
  • AServerExecutor将命令作为并发任务执行
  • 用于存储与任务关联的Future对象的ConcurrentHashMap

类的构造函数初始化所有这些对象:

public class RequestTask implements Runnable {
    private LinkedBlockingQueue<Socket> pendingConnections;
    private ServerExecutor executor = new ServerExecutor();
    private ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController;
    public RequestTask(LinkedBlockingQueue<Socket> pendingConnections, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Future<?>>> taskController) {
        this.pendingConnections = pendingConnections;
        this.taskController = taskController;
    }

这个类的主要方法是run()方法。它执行一个循环,直到线程在处理存储在pendingConnections对象中的套接字时被中断。在这个对象中,ConcurrentServer类存储套接字,以便与向服务器发送查询的不同客户端通信。它打开套接字,读取数据,并创建相应的命令。这也会将命令发送给执行者,并将Future对象存储在与任务的hashCode和发送查询的用户关联的双ConcurrentHashMap中:

    public void run() {
        try {
            while (!Thread.currentThread().interrupted()) {
                try {
                    Socket clientSocket = pendingConnections.take();
                    BufferedReader in = new BufferedReader(new InputStreamReader (clientSocket.getInputStream()));
                    String line = in.readLine();

                    Logger.sendMessage(line);

                    ConcurrentCommand command;

                    ParallelCache cache = ConcurrentServer.getCache();
                    String ret = cache.get(line);
                    if (ret == null) {
                        String[] commandData = line.split(";");
                        System.out.println("Command: " + commandData[0]);
                        switch (commandData[0]) {
                        case "q":
                            System.out.println("Query");
                            command = new ConcurrentQueryCommand(clientSocket, commandData);
                            break;
                        case "r":
                            System.out.println("Report");
                            command = new ConcurrentReportCommand (clientSocket, commandData);
                            break;
                        case "s":
                            System.out.println("Status");
                            command = new ConcurrentStatusCommand(executor, clientSocket, commandData);
                            break;
                        case "z":
                            System.out.println("Stop");
                            command = new ConcurrentStopCommand(clientSocket, commandData);
                            break;
                        case "c":
                            System.out.println("Cancel");
                            command = new ConcurrentCancelCommand (clientSocket, commandData);
                            break;
                        default:
                            System.out.println("Error");
                            command = new ConcurrentErrorCommand(clientSocket, commandData);
                            break;
                        }

                        ServerTask<?> controller = (ServerTask<?>)executor.submit(command);
                        storeContoller(command.getUsername(), controller, command);
                    } else {
                        PrintWriter out = new PrintWriter (clientSocket.getOutputStream(),true);
                        out.println(ret);
                        clientSocket.close();
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            // No Action Required
        }
    }

storeController()方法是将Future对象存储在双ConcurrentHashMap中的方法:

    private void storeContoller(String userName, ServerTask<?> controller, ConcurrentCommand command) {
        taskController.computeIfAbsent(userName, k -> new ConcurrentHashMap<>()).put(command, controller);
    }

最后,我们包含了两种方法来管理Executor类的执行,一种是为执行者调用shutdown()方法,另一种是等待其完成。请记住,必须显式调用shutdown()shutdownNow()方法来结束执行器的执行。否则,程序将不会终止。请看以下代码:

    public void shutdown() {

        String message="Request Task: "
                +pendingConnections.size()
                +" pending connections.";
        Logger.sendMessage(message);
        executor.shutdown();
    }

    public void terminate() {
        try {
            executor.awaitTermination(1,TimeUnit.DAYS);
            executor.writeStatistics();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

客户方

现在是测试我们服务器的时候了。在这种情况下,我们不会太担心执行时间。我们测试的主要目的是检查新特性是否工作良好。

我们将客户机部分分为以下两类:

  • ConcurrentClient 类:实现服务器的单个客户端。此类的每个实例都有不同的用户名。它进行 100 次查询,90 次类型查询,10 次类型报告。查询查询的优先级为 5,报表查询的优先级较低(10)。
  • MultipleConcurrentClient 类:此度量多个并发客户端并行的行为。我们已经用一到五个并发客户端测试了服务器。此类还测试取消和停止命令。

我们已经包含了一个执行器来执行对服务器的并发请求,以提高客户机的并发级别。

在下图中,您可以看到取消任务的结果:

The client part

在此情况下,用户用户的四个任务被取消。

下图显示了有关每个用户的任务数和执行时间的最终统计信息:

The client part

在前面的执行者示例中,任务只执行一次,并尽快执行。executor 框架包括其他 executor 实现,它为我们提供了更多关于任务执行时间的灵活性。ScheduledThreadPoolExecutor类允许我们定期执行任务,并在延迟后执行任务

在本部分中,您将学习如何执行实现RSS 提要阅读器的定期任务。这是一个简单的例子,您需要定期执行相同的任务(阅读 RSS 提要的新闻)。我们的示例将具有以下特征:

  • 将 RSS 源存储在一个文件中。我们从一些重要的报纸上选择了世界新闻,如《纽约时报》、《每日新闻》或《卫报》。
  • 我们向每个 RSS 源的执行器发送了一个Runnable对象。每次执行器运行对象时,它都会解析 RSS 源并将其转换为包含 RSS 内容的CommonInformationItem对象列表。
  • 我们使用生产者/消费者设计模式将 RSS 新闻写入磁盘。生产者将是执行者的任务,执行者将每个CommonInformationItem写入缓冲区。只有新项目才会存储在缓冲区中。使用者将是一个独立的线程,从缓冲区读取新闻并将其写入磁盘。
  • 任务的最终执行与下一次执行之间的时间为一分钟。

我们还实现了示例的高级版本,其中一个任务的两次执行之间的时间可能不同。

常见部位

正如我们前面提到的,我们阅读 RSS 提要并将其转换为对象列表。为了解析 RSS 文件,我们将它们视为 XML 文件,并且我们在RSSDataCapturer类中实现了一个SAX(简称Simple API for XML解析器。它解析文件并创建一个CommonInformationItem列表。此类为每个 RSS 项目存储以下信息:

* 标题**:RSS 项目的标题。

  • 日期:RSS 项目的日期。
  • 链接:指向 RSS 项目的链接。
  • 说明:RSS 项目的文本。
  • ID:RSS 项目的 ID。如果项目不包含 ID,我们将计算它。
  • :RSS 源的名称。

我们使用 Producer/Consumer 设计模式将新闻存储到磁盘中,因此我们需要一个缓冲区来存储新闻和一个Consumer类,在本例中,该类从缓冲区读取新闻并将其存储到磁盘中。

我们在NewsBuffer类中实现了缓冲区。它有两个内部属性:

  • 一个 LinkedBlockingQueue:这是一个带有阻塞操作的并发数据结构。如果我们想要从列表中获取一个空的元素,那么我们将从列表中获取一个空的元素。我们将使用此结构来存储CommonInformationItems
  • ConcurrentHashMap:这是HashMap的并发实现。我们将使用它来存储之前存储在缓冲区中的新闻项的 ID。

我们将只在缓冲区中插入之前未插入的新闻:

public class NewsBuffer {
    private LinkedBlockingQueue<CommonInformationItem> buffer;
    private ConcurrentHashMap<String, String> storedItems;

    public NewsBuffer() {
        buffer=new LinkedBlockingQueue<>();
        storedItems=new ConcurrentHashMap<String, String>();
    }

NewsBuffer类中有两种方法:一种是在缓冲区中存储一个项,用于检查该项之前是否已插入;另一种是从缓冲区中获取下一个项。我们使用compute()方法在ConcurrentHashMap中插入元素。此方法将 lambda 表达式作为参数接收,该参数包含键和与此键关联的实际值(如果键没有关联值,则为 null)。在本例中,我们将该项添加到缓冲区中,该缓冲区以前没有处理过。我们使用add()take()方法从队列中插入、获取和删除元素:

    public void add (CommonInformationItem item) {
        storedItems.compute(item.getId(), (id, oldSource) -> {
              if(oldSource == null) {
                buffer.add(item);
                return item.getSource();
              } else {
                System.out.println("Item "+item.getId()+" has been processed before");
                return oldSource;
              }
            });
    }

    public CommonInformationItem get() throws InterruptedException {
        return buffer.take();
    }

缓冲区的项目将由NewsWriter类写入磁盘,该类将作为独立线程执行。它只有一个指向应用中使用的NewsBuffer类的内部属性:

public class NewsWriter implements Runnable {
    private NewsBuffer buffer;
    public NewsWriter(NewsBuffer buffer) {
        this.buffer=buffer;
    }

Runnable对象的run()方法从缓冲区中获取CommonInformationItem实例,并将其保存到磁盘中。由于我们使用阻塞方法take,如果缓冲区为空,则该线程将被阻塞,直到缓冲区中有元素:

    public void run() {
        try {
            while (!Thread.currentThread().interrupted()) {
                CommonInformationItem item=buffer.get();

                Path path=Paths.get ("output\\"+item.getFileName());

                try (BufferedWriter fileWriter = Files.newBufferedWriter(path, StandardOpenOption.CREATE)) {
                    fileWriter.write(item.toString());
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        } catch (InterruptedException e) {
            //Normal execution
        }
    }

基本读者

基本的读取器将使用标准的ScheduledThreadPoolExecutor类定期执行任务。我们将对每个 RSS 源执行一个任务,从一个任务的一次执行结束到下一次执行开始之间有一分钟的时间。这些并发任务在NewsTask类中实现。它有三个内部属性来存储 RSS 提要的名称、URL 和存储新闻的NewsBuffer类:

public class NewsTask implements Runnable {
    private String name;
    private String url;
    private NewsBuffer buffer;

    public NewsTask (String name, String url, NewsBuffer buffer) {
        this.name=name;
        this.url=url;
        this.buffer=buffer;
    }

这个Runnable对象的run()方法只是解析 RSS 提要,获取 CommonItemInterface 实例列表,并将它们存储在缓冲区中。此方法将以周期性方式执行。在每次执行中,run()方法将从头到尾执行:

    @Override
    public void run() {
        System.out.println(name+": Running. " + new Date());
        RSSDataCapturer capturer=new RSSDataCapturer(name);
        List<CommonInformationItem> items=capturer.load(url);

        for (CommonInformationItem item: items) {
            buffer.add(item);
        }
    }

在本例中,我们还实现了另一个线程来实现执行器和任务的初始化以及等待执行的最终完成。我们将这个类命名为NewsSystem。它有三个内部属性来存储 RSS 源文件的路径、存储新闻的缓冲区和控制执行结束的CountDownLatch对象。CountDownLatch类是一种同步机制,允许您让线程等待事件。我们将在第 9 章中详细介绍此类的使用,将深入探讨并发数据结构和同步工具。我们有以下代码:

public class NewsSystem implements Runnable {
    private String route;
    private ScheduledThreadPoolExecutor executor;
    private NewsBuffer buffer;
    private CountDownLatch latch=new CountDownLatch(1);

    public NewsSystem(String route) {
        this.route = route;
        executor = new ScheduledThreadPoolExecutor (Runtime.getRuntime().availableProcessors());
        buffer=new NewsBuffer();
    }

run()方法中,我们读取所有 RSS 源,为每个源创建一个NewsTask类,并将它们发送给我们的ScheduledThreadPool执行器。我们已经使用Executors类的newScheduledThreadPool()方法创建了 executor,并使用scheduleAtFixedDelay()方法将任务发送给它。我们还将NewsWriter实例作为线程启动。run()方法等待有人告诉它使用CountDownLatch类的 await()方法完成执行,并结束NewsWriter任务和ScheduledExecutor的执行。

    @Override
    public void run() {
        Path file = Paths.get(route);
        NewsWriter newsWriter=new NewsWriter(buffer);
        Thread t=new Thread(newsWriter);
        t.start();

        try (InputStream in = Files.newInputStream(file);
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(in))) {
            String line = null;
            while ((line = reader.readLine()) != null) {
                String data[] = line.split(";");

                NewsTask task = new NewsTask(data[0], data[1], buffer);
                System.out.println("Task "+task.getName());
                executor.scheduleWithFixedDelay(task,0, 1, TimeUnit.MINUTES);
            }
        }  catch (Exception e) {
            e.printStackTrace();
        }

        synchronized (this) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("Shutting down the executor.");
        executor.shutdown();
        t.interrupt();
        System.out.println("The system has finished.");

    }

我们还实现了shutdown()方法。此方法将通知NewsSystem类使用CountDownLatch类的countDown() 方法结束其执行。此方法将唤醒run()方法,因此将关闭运行NewsTask对象的执行器:

    public void shutdown() {
        latch.countDown();
    }

本例的最后一个类是实现本例main()方法的Main类。它以线程的形式启动一个NewsSystem实例,等待 10 分钟,并通知线程其终结,从而完成整个系统的执行,如下所示:

public class Main {

    public static void main(String[] args) {

        // Creates the System an execute it as a Thread
        NewsSystem system=new NewsSystem("data\\sources.txt");

        Thread t=new Thread(system);

        t.start();

        // Waits 10 minutes
        try {
            TimeUnit.MINUTES.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Notifies the finalization of the System
         (
        system.shutdown();
    }

执行此示例时,您会看到不同的任务是如何周期性执行的,以及新闻项是如何写入磁盘的,如以下屏幕截图所示:

The basic reader

高级读物

基本的新闻阅读器是使用ScheduledThreadPoolExecutor类的一个例子,但我们可以更进一步。与ThreadPoolExecutor一样,我们可以实现自己的ScheduledThreadPoolExecutor来获得特定的行为。在我们的例子中,我们希望周期性任务的延迟时间根据一天中的某个时刻而变化。在本部分中,您将学习如何实现此行为。

第一步是实现一个类,该类告诉我们周期性任务的两次执行之间的延迟。我们把它命名为Timer类。它只有一个名为getPeriod()的静态方法,返回一次执行结束到下一次执行开始之间的毫秒数。这是我们的实现,但您可以自己制作:

public class Timer {
    public static long getPeriod() {
        Calendar calendar = Calendar.getInstance();
        int hour = calendar.get(Calendar.HOUR_OF_DAY);

        if ((hour >= 6) && (hour <= 8)) {
            return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
        }

        if ((hour >= 13) && (hour <= 14)) {
            return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
        }

        if ((hour >= 20) && (hour <= 22)) {
            return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
        }
        return TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
    }
}

接下来,我们必须执行执行器的内部任务。当您向执行者发送一个Runnable对象时,在外部,您将该对象视为并发任务,但执行者将该对象转换为另一个任务,即FutureTask类的实例,该类包括执行任务的run()方法和管理任务执行的Future接口的方法。为了实现这个示例,我们必须实现一个扩展FutureTask类的类,并且,由于我们将在调度执行器中执行这些任务,它必须实现RunnableScheduledFuture接口。此接口提供了返回下一次执行任务剩余时间的getDelay()方法。我们已经在ExecutorTask课上完成了这些内部任务。它有四个内部属性:

  • ScheduledThreadPoolExecutor类创建的原始RunnableScheduledFuture内部任务
  • 将执行任务的计划执行者
  • 下一次执行任务的开始日期
  • RSS 源的名称

其代码如下:

public class ExecutorTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
    private RunnableScheduledFuture<V> task;

    private NewsExecutor executor;

    private long startDate;

    private String name;

    public ExecutorTask(Runnable runnable, V result, RunnableScheduledFuture<V> task, NewsExecutor executor) {
        super(runnable, result);
        this.task = task;
        this.executor = executor;
        this.name=((NewsTask)runnable).getName();
        this.startDate=new Date().getTime();
    }

我们在这个类中重写或实现了不同的方法。第一种是getDelay()方法,正如我们之前所说的,它返回给定单位时间内下一次执行任务的剩余时间:

    @Override
    public long getDelay(TimeUnit unit) {
        long delay;
        if (!isPeriodic()) {
            delay = task.getDelay(unit);
        } else {
            if (startDate == 0) {
                delay = task.getDelay(unit);
            } else {
                Date now = new Date();
                delay = startDate - now.getTime();
                delay = unit.convert(delay, TimeUnit.MILLISECONDS);
            }

        }

        return delay;
    }

下一个方法是compareTo()方法,考虑到下一次执行任务的开始日期,比较两个任务:

    @Override
    public int compareTo(Delayed object) {
        return Long.compare(this.getStartDate(), ((ExecutorTask<V>)object).getStartDate());
    }

然后,如果任务是周期性的,isPeriodic()方法返回true,如果任务不是周期性的,false方法返回:

    @Override
    public boolean isPeriodic() {
        return task.isPeriodic();
    }

最后,我们有run()方法,它实现了本例中最重要的部分。首先,我们调用FutureTask类的runAndReset()方法。此方法执行任务并重置其状态,以便可以再次执行。然后,我们使用Timer类计算下一次执行的开始日期,最后,我们必须再次将任务插入ScheduledThreadPoolExecutor类的队列中。如果我们不执行最后一步,任务将不会再次执行,如下所示:

    @Override
    public void run() {
        if (isPeriodic() && (!executor.isShutdown())) {
            super.runAndReset();
            Date now=new Date();
            startDate=now.getTime()+Timer.getPeriod();
            executor.getQueue().add(this);
            System.out.println("Start Date: "+new Date(startDate));
        }
    }

一旦我们有了执行者的任务,我们就必须执行执行者。我们已经实现了扩展ScheduledThreadPoolExecutor类的NewsExecutor类。我们已经覆盖了decorateTask()方法。使用此方法,可以替换计划执行者使用的内部任务。默认情况下,它返回一个RunnableScheduledFuture接口的默认实现,但在我们的例子中,它将返回一个ExecutorClass实例的实例:

public class NewsExecutor extends ScheduledThreadPoolExecutor {

    public NewsExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,
            RunnableScheduledFuture<V> task) {
        ExecutorTask<V> myTask = new ExecutorTask<>(runnable, null, task, this);
        return myTask;
    }
}

我们必须实现其他版本的NewsSystemMain类来使用NewsExecutor。为此,我们实现了NewsAdvancedSystemAdvancedMain

现在您可以运行高级新闻系统来查看执行之间的延迟时间是如何变化的。

在本章中,我们扩展了ThreadPoolExecutorScheduledThreadPoolExecutor类,并重写了它们的一些方法。但是,如果需要更特殊的行为,可以重写更多的方法。以下是一些可以覆盖的方法:

  • shutdown():必须显式调用此方法才能结束执行器的执行。您可以覆盖它以添加一些代码来释放您自己的执行器使用的其他资源。
  • shutdownNow()shutdown()shutdownNow()的区别在于方法等待在执行器中等待的所有任务完成。
  • submit()invokeall()invokeany():调用这些方法将并发任务发送给执行者。如果需要在将任务插入执行器的任务队列之前或之后执行某些操作,则可以覆盖它们。请注意,在任务排队之前或之后添加自定义操作与在执行之前或之后添加自定义操作不同,这是我们在重写beforeExecute()afterExecute()方法时所做的。

在新闻阅读器示例中,我们使用scheduleWithFixedDelay()方法将任务发送给执行者。但是ScheduledThreadPoolExecutor类有其他方法来执行周期性任务或延迟后的任务:

  • schedule():此方法在给定延迟后执行任务。该任务只执行一次。
  • scheduleAtFixedRate():该方法执行给定周期的周期性任务。与 scheduleWithFixedDelay()方法的区别在于,在最后一种方法中,两次执行之间的延迟从第一次执行的结束到第二次执行的开始,而在第一种方法中,两次执行之间的延迟介于两次执行的开始之间。

在本章中,我们给出了两个示例,探讨了执行者的高级特征。在第一个示例中,我们继续使用第 2 章的客户机/服务器示例管理大量线程–执行器。我们实现了我们自己的执行器,扩展了ThreadPoolExecutor类,以按优先级执行任务,并测量每个用户的任务执行时间。我们还包括一个新命令,允许取消任务。

在第二个示例中,我们解释了如何使用ScheduledThreadPoolExecutor类执行周期性任务。我们实现了两个版本的新闻阅读器。第一个演示了如何使用ScheduledExecutorService,的基本功能,第二个演示了如何重写ScheduledExecutorService类的行为,例如更改任务两次执行之间的延迟时间。

在下一章中,您将学习如何执行返回结果的Executor任务。如果您扩展Thread类或实现Runnable接口,run()方法不会返回任何结果,但 executor 框架包含Callable接口,允许您实现返回结果的任务。**

教程来源于Github,感谢apachecn大佬的无私奉献,致敬!

技术教程推荐

图解 Google V8 -〔李兵〕

视觉笔记入门课 -〔高伟〕

重学线性代数 -〔朱维刚〕

高楼的性能工程实战课 -〔高楼〕

中间件核心技术与实战 -〔丁威〕

商业思维案例笔记 -〔曹雄峰〕

JavaScript进阶实战课 -〔石川〕

云计算的必修小课 -〔吕蕴偲〕

AI 应用实战课 -〔黄佳〕