在第 2 章管理大量线程–执行器中,我们介绍了执行器的基本特性,以此提高执行大量并发任务的并发应用的性能。在本章中,我们将为您的并发应用提供一个强大的工具,并进一步解释它们的特点。在本章中,我们将介绍以下内容:
executor 是类,允许程序员执行并发任务,而不必担心线程的创建和管理。程序员创建Runnable
对象并将其发送给执行器,执行器创建并管理执行这些任务所需的线程。在第 2 章管理大量线程–执行器中,我们介绍了执行器框架的基本特征:
然而,执行者可以给你更多的选择,使它们成为并发应用中的强大机制。
您可以在将任务发送给执行者后取消任务的执行。当您使用submit()
方法向执行器发送Runnable
对象时,它返回Future
接口的实现。此类允许您控制任务的执行。它有cancel()
方法,试图取消任务的执行。它接收一个布尔值作为参数。如果它接受了true
值,并且执行器正在执行该任务,则执行该任务的线程将被中断。
以下是无法取消要取消的任务的情况:
false
作为参数提供给cancel()
方法cancel()
方法返回一个布尔值,指示任务是否已取消。
ThreadPoolExecutor
类是Executor
和ExecutorService
接口的基本实现。但是 Java 并发 API 提供了此类的扩展,以允许执行计划任务。这是ScheduledThreadPoolExeuctor
课程,您可以:
执行器框架是一种非常灵活的机制。您可以实现自己的执行器来扩展现有类(ThreadPoolExecutor
或ScheduledThreadPoolExecutor
之一)以获得所需的行为。这些类包括使更改执行器工作方式变得容易的方法。如果您覆盖ThreadPoolExecutor
,您可以覆盖以下方法:
beforeExecute()
:在执行器中执行并发任务之前调用此方法。它接收将要执行的Runnable
对象和将执行它们的Thread
对象。此方法接收的Runnable
对象是FutureTask
类的实例,而不是您使用submit()
方法发送给执行者的Runnable
对象。afterExecute()
:在执行器中执行并发任务后调用此方法。它接收已执行的Runnable
对象和存储任务内抛出的可能异常的Throwable
对象。在beforeExecute()
方法中,Runnable
对象是FutureTask
类的实例。newTaskFor()
:此方法创建要执行您使用submit()
方法发送的Runnable
对象的任务。它必须返回一个RunnableFuture
接口的实现。默认情况下,OpenJDK8 和 OracleJDK8 返回一个FutureTask
类的实例,但这种情况在将来的实现中可能会发生变化。如果扩展ScheduledThreadPoolExecutor
类,可以重写decorateTask()
方法。此方法与计划任务的newTaskFor()
类似。它允许您覆盖执行者执行的任务。
您还可以通过在执行器创建时更改一些参数来更改执行器的行为。最有用的是:
BlockingQueue<Runnable>
:每个执行者都使用内部BlockingQueue
来存储等待其执行的任务。您可以将此接口的任何实现作为参数传递。例如,您可以更改执行者用于执行任务的默认顺序。ThreadFactory
:您可以指定ThreadFactory
接口的实现,执行者将使用该工厂创建执行任务的线程。例如,您可以使用ThreadFactory
接口创建Thread
类的扩展,该扩展保存有关任务执行时间的日志信息。RejectedExecutionHandler
:调用shutdown()
或shutdownNow()
方法后,发送给执行者的所有任务都将被拒绝。您可以指定RejectedExecutionHandler
接口的实现来管理这种情况。在第 2 章管理大量线程–执行器中,我们给出了一个客户机/服务器应用的示例。我们实现了一个服务器来搜索世界银行世界发展指标的数据,并实现了一个客户端,该客户端对该服务器进行多次调用,以测试执行器的性能。
在本节中,我们将扩展该示例以添加以下特征:
为了实现这些新特性,我们对服务器进行了以下更改:
q;username;priority;codCountry;codIndicator;year
其中username
为用户名,priority
为查询优先级,codCountry
为国家代码,codIndicator
为指标代码,year
为查询年份的可选参数。r;username;priority;codIndicator
其中username
为用户名称,priority
为查询优先级,codIndicator
为您要报告的指标代码。s;username;priority
其中username
为用户名称,priority
为查询优先级。z;username;priority
其中username
为用户名称,priority
为查询优先级。c;username;priority
其中username
为用户名称,priority
为查询优先级。ConcurrentServer
和RequestTask
以考虑服务器的新元素服务器的其余元素(缓存系统、日志系统和DAO
类)是相同的,因此不再赘述。
正如我们前面提到的,我们已经实现了我们自己的执行器来执行服务器的任务。我们还实现了一些额外但必要的类来提供所有功能。让我们描述一下这些类。
我们的服务器将计算每个用户在其上执行的任务数量以及这些任务的总执行时间。为了存储这些数据,我们实现了ExecutorStatistics
类。它有两个属性来存储信息:
public class ExecutorStatistics {
private AtomicLong executionTime = new AtomicLong(0L);
private AtomicInteger numTasks = new AtomicInteger(0);
这些属性是支持单变量原子操作的AtomicVariables
。这允许您在不同的线程中使用这些变量,而无需使用任何同步机制。然后,它有两种方法来增加任务数量和执行时间:
public void addExecutionTime(long time) {
executionTime.addAndGet(time);
}
public void addTask() {
numTasks.incrementAndGet();
}
最后,我们添加了获取两个属性值的方法,并覆盖了toString()
方法,以可读的方式获取信息:
@Override
public String toString() {
return "Executed Tasks: "+getNumTasks()+". Execution Time: "+getExecutionTime();
}
当您创建一个执行器时,您可以指定一个类来管理被拒绝的任务。当您在执行器中调用了shutdown()
或shutdownNow()
方法后提交任务时,执行器将拒绝该任务。
为了控制这种情况,我们实现了RejectedTaskController
类。此类实现了RejectedExecutionHandler
接口,实现了rejectedExecution()
方法:
public class RejectedTaskController implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
ConcurrentCommand command=(ConcurrentCommand)task;
Socket clientSocket=command.getSocket();
try {
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),true);
String message="The server is shutting down."
+" Your request can not be served."
+" Shutting Down: "
+String.valueOf(executor.isShutdown())
+". Terminated: "
+String.valueOf(executor.isTerminated())
+". Terminating: "
+String.valueOf(executor.isTerminating());
out.println(message);
out.close();
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
每个被拒绝的任务调用一次rejectedExecution()
方法,并将被拒绝的任务和拒绝该任务的执行者作为参数接收。
当向执行者提交Runnable
对象时,它不会直接执行该Runnable
对象。它创建了一个新对象,FutureTask
类的一个实例,这个任务由执行器的工作线程执行。
在我们的案例中,为了度量任务的执行时间,我们在ServerTask
类中实现了自己的FutureTask
实现。它扩展了FutureTask
类,实现了Comparable
接口,如下所示:
public class ServerTask<V> extends FutureTask<V> implements Comparable<ServerTask<V>>{
在内部,它存储将要作为ConcurrentCommand
对象执行的查询:
private ConcurrentCommand command;
在构造函数中使用FutureTask
类的构造函数并存储ConcurrentCommand
对象:
public ServerTask(ConcurrentCommand command) {
super(command, null);
this.command=command;
}
public ConcurrentCommand getCommand() {
return command;
}
public void setCommand(ConcurrentCommand command) {
this.command = command;
}
最后,它实现了比较两个ServerTask
实例存储的命令以进行比较的compareTo()
操作。这可以在以下代码中看到:
@Override
public int compareTo(ServerTask<V> other) {
return command.compareTo(other.getCommand());
}
现在我们有了执行器的辅助类,我们必须实现执行器本身。为此,我们实现了ServerExecutor
类。它扩展了ThreadPoolExecutor
类,并具有一些内部属性,如下所示:
startTimes
:这是一个ConcurrentHashMap
,用于存储每个任务的开始日期。类的键将是ServerTask
对象(一个Runnable
对象),值将是Date
对象。executionStatistics
:这是一个ConcurrentHashMap
用于存储每个用户的使用统计信息。键将是用户名,值将是一个ExecutorStatistics
对象。CORE_POOL_SIZE
、MAXIMUM_POOL_SIZE
、KEEP_ALIVE_TIME
:这些是定义执行者特征的常数。REJECTED_TASK_CONTROLLER
:这是一个RejectedTaskController
类属性,用于控制被执行者拒绝的任务。此可由以下代码解释:
public class ServerExecutor extends ThreadPoolExecutor {
private ConcurrentHashMap<Runnable, Date> startTimes;
private ConcurrentHashMap<String, ExecutorStatistics> executionStatistics;
private static int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();
private static int MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors();
private static long KEEP_ALIVE_TIME = 10;
private static RejectedTaskController REJECTED_TASK_CONTROLLER = new RejectedTaskController();
public ServerExecutor() {
super(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), REJECTED_TASK_CONTROLLER);
startTimes = new ConcurrentHashMap<>();
executionStatistics = new ConcurrentHashMap<>();
}
类的构造函数调用父构造函数,创建一个PriorityBlockingQueue
类来存储将在 executor 中执行的任务。此类根据compareTo()
方法的执行结果对元素进行排序(因此存储在其中的元素必须实现Comparable
接口)。此类的使用将允许我们按优先级执行任务。
然后,我们重写了ThreadPoolExecutor
类的一些方法。首先是beforeExecute()
方法。此方法在执行每个任务之前执行。它接收ServerTask
对象作为参数,以及将要执行任务的线程。在本例中,我们将实际日期存储在ConcurrentHashMap
中,每个任务的开始日期如下:
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startTimes.put(r, new Date());
}
第二种方法是afterExecute()
方法。此方法在执行器中的每个任务执行后执行,并接收已被执行的对象作为参数和Throwable
对象。最后一个参数只有在任务执行期间引发异常时才有值。在我们的案例中,我们将使用此方法:
计算任务的执行时间。
按以下方式更新用户的统计信息:
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
ServerTask<?> task=(ServerTask<?>)r;
ConcurrentCommand command=task.getCommand();
if (t==null) {
if (!task.isCancelled()) {
Date startDate = startTimes.remove(r);
Date endDate=new Date();
long executionTime= endDate.getTime() - startDate.getTime();
;
ExecutorStatistics statistics = executionStatistics.computeIfAbsent (command.getUsername(), n -> new ExecutorStatistics());
statistics.addExecutionTime(executionTime);
statistics.addTask();
ConcurrentServer.finishTask (command.getUsername(), command);
}
else {
String message="The task" + command.hashCode() + "of user" + command.getUsername() + "has been cancelled.";
Logger.sendMessage(message);
}
} else {
String message="The exception "
+t.getMessage()
+" has been thrown.";
Logger.sendMessage(message);
}
}
最后,我们已经覆盖了newTaskFor()
方法。将执行此方法来转换我们发送给执行者的Runnable
对象,在执行者将执行的FutureTask
实例中使用submit()
方法。在我们的例子中,我们用我们的ServerTask
对象替换默认的FutureTask
类:
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new ServerTask<T>(runnable);
}
我们在 executor 中包含了一个附加方法,用于将 executor 中存储的所有统计信息写入日志系统。此方法将在服务器执行结束时调用,稍后您将看到。我们有以下代码:
public void writeStatistics() {
for(Entry<String, ExecutorStatistics> entry: executionStatistics.entrySet()) {
String user = entry.getKey();
ExecutorStatistics stats = entry.getValue(); Logger.sendMessage(user+":"+stats);
}
}
命令类执行您可以发送到服务器的不同查询。您可以向我们的服务器发送五个不同的查询:
ConcurrentQueryCommand
类实现的。ConcurrentReportCommand
类实现的。ConcurrentStatusCommand
类实现的。ConcurrentCancelCommand
类实现的。ConcurrentStopCommand
类实现的。我们还有ConcurrentErrorCommand
类来管理未知命令到达服务器时的情况,而ConcurrentCommand
是所有命令的基类。
这是每个命令的基类。它包括所有命令共有的行为,包括:
该类扩展了Command
类,实现了Comparable
和Runnable
接口。在第 2 章的示例管理大量线程–执行器中,命令是简单类,但在本例中,并发命令是将发送给执行器的Runnable
对象:
public abstract class ConcurrentCommand extends Command implements Comparable<ConcurrentCommand>, Runnable{
它有三个属性:
username
:存储发送查询的用户名称。priority
:存储查询的优先级。它将确定查询的执行顺序。socket
:用于与客户端通信的套接字。类的构造函数初始化这些属性:
private String username;
private byte priority;
private Socket socket;
public ConcurrentCommand(Socket socket, String[] command) {
super(command);
username=command[1];
priority=Byte.parseByte(command[2]);
this.socket=socket;
}
这个类的主要功能是抽象的execute()
方法和run()
方法,该方法将由每个具体的命令来实现,以计算和返回查询结果。run()
方法调用execute()
方法,将结果存储在缓存中,将结果写入套接字,并关闭通信中使用的所有资源。我们有以下几点:
@Override
public abstract String execute();
@Override
public void run() {
String message="Running a Task: Username: "
+username
+"; Priority: "
+priority;
Logger.sendMessage(message);
String ret=execute();
ParallelCache cache = ConcurrentServer.getCache();
if (isCacheable()) {
cache.put(String.join(";",command), ret);
}
try {
PrintWriter out = new PrintWriter(socket.getOutputStream(),true);
out.println(ret);
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(ret);
}
最后,compareTo()
方法使用优先级属性来确定任务的顺序。这个将被PriorityBlockingQueue
类用来排序任务,因此优先级较高的任务将首先执行。考虑到当getPriority()
方法返回较低的值时,任务具有较高的优先级。如果任务的getPriority()
返回1
,则该任务的优先级将高于getPriority()
方法返回2
的任务:
@Override
public int compareTo(ConcurrentCommand o) {
return Byte.compare(o.getPriority(), this.getPriority());
}
我们对实现不同命令的类做了一些小改动,并添加了一个由ConcurrentCancelCommand
类实现的新类。这些类的主要逻辑包含在execute()
方法中,该方法计算对查询的响应并将其作为字符串返回。
新的ConcurrentCancelCommand
的execute()
方法调用ConcurrentServer
类的cancelTasks()
方法。此方法将停止执行与作为参数传递的用户关联的所有挂起任务:
@Override
public String execute() {
ConcurrentServer.cancelTasks(getUsername());
String message = "Tasks of user "
+getUsername()
+" has been cancelled.";
Logger.sendMessage(message);
return message;
}
ConcurrentReportCommand
的execute()
方法使用WDIDAO
类的query()
方法获取用户请求的数据。在第 2 章管理大量线程–执行器中,您可以找到此方法的实现。实现几乎相同。唯一的区别是命令数组索引如下所示:
@Override
public String execute() {
WDIDAO dao=WDIDAO.getDAO();
if (command.length==5) {
return dao.query(command[3], command[4]);
} else if (command.length==6) {
try {
return dao.query(command[3], command[4], Short.parseShort(command[5]));
} catch (NumberFormatException e) {
return "ERROR;Bad Command";
}
} else {
return "ERROR;Bad Command";
}
}
ConcurrentQueryCommand
的execute()
方法使用WDIDAO
类的report()
方法获取数据。在第 2 章管理大量线程–执行器中,您也可以找到此方法的实现。这里的实现几乎相同。唯一的区别是命令数组索引:
@Override
public String execute() {
WDIDAO dao=WDIDAO.getDAO();
return dao.report(command[3]);
}
ConcurrentStatusCommand
在其构造函数中有一个额外的参数:Executor
对象将执行命令。此命令使用此对象获取有关执行器的信息,并将其作为响应发送给用户。该实现与第 2 章中的管理大量线程–执行器中的实现几乎相同。我们使用了相同的方法来获取Executor
对象的状态。
ConcurrentStopCommand
和ConcurrentErrorCommand
也与第 2 章中的管理大量线程–执行器相同,所以我们没有包含它们的源代码。
服务器部分从服务器的客户端接收查询,并创建执行查询的命令类并将其发送给执行器。它由两个类实现:
ConcurrentServer
类:包括服务器的main()
方法和其他取消任务、完成系统执行的方法RequestTask
类:该类创建命令并将其发送给执行器与第 2 章的示例管理大量线程–执行器的主要区别在于RequestTask
类的角色。在SimpleServer
示例中,ConcurrentServer
类为每个查询创建一个RequestTask
对象,并将它们发送给执行器。在本例中,我们只有一个将作为线程执行的RequestTask
实例。当ConcurrentServer
接收到连接时,它将套接字存储在挂起连接的并发列表中,以便与客户端通信。RequestTask
线程读取该套接字,处理客户端发送的数据,创建相应的命令,并将该命令发送给执行器。
对于代码中的更改,仅将代码中的更改留给执行器进行预处理。
ConcurrentServer
类需要一些内部属性才能正常工作:
ParallelCache
实例。ServerSocket
实例。boolean
值,用于知道何时必须停止执行。LinkedBlockingQueue
用于存储向服务器发送消息的客户端的套接字。这些套接字将由RequestTask
类处理。ConcurrentHashMap
用于存储与执行器中执行的每个任务相关联的Future
对象。键将是发送查询的用户的用户名,值将是另一个键为ConcurrenCommand
对象的Map
,值将是与该任务关联的Future
实例。我们使用这些Future
实例来取消任务的执行。RequestTask
实例,用于创建命令并将其发送给执行器。Thread
对象来执行RequestTask
对象。其代码如下:
public class ConcurrentServer {
private static ParallelCache cache;
private static volatile boolean stopped=false;
private static LinkedBlockingQueue<Socket> pendingConnections;
private static ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController;
private static Thread requestThread;
private static RequestTask task;
此类的main()
方法初始化这些对象,并打开ServerSocket
实例来监听来自客户端的连接。此外,它还创建了RequestTask
对象,并将其作为线程执行。它将处于循环中,直到shutdown()
方法更改停止属性的值。在此之后,它使用RequestTask
对象的endTermination()
方法等待Executor
对象的终结,并使用finishServer()
方法关闭Logger
系统和RequestTask
对象:
public static void main(String[] args) {
WDIDAO dao=WDIDAO.getDAO();
cache=new ParallelCache();
Logger.initializeLog();
pendingConnections = new LinkedBlockingQueue<Socket>();
taskController = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Future<?>>>();
task=new RequestTask(pendingConnections, taskController);
requestThread=new Thread(task);
requestThread.start();
System.out.println("Initialization completed.");
serverSocket= new ServerSocket(Constants.CONCURRENT_PORT);
do {
try {
Socket clientSocket = serverSocket.accept();
pendingConnections.put(clientSocket);
} catch (Exception e) {
e.printStackTrace();
}
} while (!stopped);
finishServer();
System.out.println("Shutting down cache");
cache.shutdown();
System.out.println("Cache ok" + new Date());
}
包括两种关闭服务器执行器的方法。shutdown()
方法更改stopped
变量的值并关闭serverSocket
实例。finishServer()
方法停止执行器,中断执行RequestTask
对象的线程,并关闭Logger
系统。我们将此过程分为两部分使用Logger
系统,直到服务器的最后一条指令:
public static void shutdown() {
stopped=true;
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void finishServer() {
System.out.println("Shutting down the server...");
task.shutdown();
System.out.println("Shutting down Request task");
requestThread.interrupt();
System.out.println("Request task ok");
System.out.println("Closing socket");
System.out.println("Shutting down logger");
Logger.sendMessage("Shutting down the logger");
Logger.shutdown();
System.out.println("Logger ok");
System.out.println("Main server thread ended");
}
服务器包括取消与用户关联的任务的方法。如前所述,Server
类使用嵌套的ConcurrentHashMap
来存储与用户关联的所有任务。首先,我们获得用户所有任务的Map
,然后我们处理这些任务的所有Future
对象,调用Future
对象的cancel()
方法。我们将值true
作为参数传递,因此如果执行者正在运行来自该用户的任务,它将被中断。我们已包含必要的代码,以避免取消ConcurrentCancelCommand
:
public static void cancelTasks(String username) {
ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks = taskController.get(username);
if (userTasks == null) {
return;
}
int taskNumber = 0;
Iterator<ServerTask<?>> it = userTasks.values().iterator();
while(it.hasNext()) {
ServerTask<?> task = it.next();
ConcurrentCommand command = task.getCommand();
if(!(command instanceof ConcurrentCancelCommand) && task.cancel(true)) {
taskNumber++;
Logger.sendMessage("Task with code "+command.hashCode()+"cancelled: "+command.getClass().getSimpleName());
it.remove();
}
}
String message=taskNumber+" tasks has been cancelled.";
Logger.sendMessage(message);
}
最后,我们提供了一种方法,当任务正常完成执行时,从ServerTask
对象的嵌套映射中删除与任务关联的Future
对象。这是finishTask()
方法:
public static void finishTask(String username, ConcurrentCommand command) {
ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks = taskController.get(username);
userTasks.remove(command);
String message = "Task with code "+command.hashCode()+" has finished";
Logger.sendMessage(message);
}
RequestTask
类是连接到客户端的ConcurrentServer
类和Executor
执行并发任务的类之间的中介。它打开客户端的套接字,读取查询数据,创建适当的命令,并将其发送给执行器。
它使用一些内部属性:
ConcurrentServer
类存储客户机套接字的LinkedBlockingQueue
类ServerExecutor
将命令作为并发任务执行Future
对象的ConcurrentHashMap
类的构造函数初始化所有这些对象:
public class RequestTask implements Runnable {
private LinkedBlockingQueue<Socket> pendingConnections;
private ServerExecutor executor = new ServerExecutor();
private ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController;
public RequestTask(LinkedBlockingQueue<Socket> pendingConnections, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Future<?>>> taskController) {
this.pendingConnections = pendingConnections;
this.taskController = taskController;
}
这个类的主要方法是run()
方法。它执行一个循环,直到线程在处理存储在pendingConnections
对象中的套接字时被中断。在这个对象中,ConcurrentServer
类存储套接字,以便与向服务器发送查询的不同客户端通信。它打开套接字,读取数据,并创建相应的命令。这也会将命令发送给执行者,并将Future
对象存储在与任务的hashCode
和发送查询的用户关联的双ConcurrentHashMap
中:
public void run() {
try {
while (!Thread.currentThread().interrupted()) {
try {
Socket clientSocket = pendingConnections.take();
BufferedReader in = new BufferedReader(new InputStreamReader (clientSocket.getInputStream()));
String line = in.readLine();
Logger.sendMessage(line);
ConcurrentCommand command;
ParallelCache cache = ConcurrentServer.getCache();
String ret = cache.get(line);
if (ret == null) {
String[] commandData = line.split(";");
System.out.println("Command: " + commandData[0]);
switch (commandData[0]) {
case "q":
System.out.println("Query");
command = new ConcurrentQueryCommand(clientSocket, commandData);
break;
case "r":
System.out.println("Report");
command = new ConcurrentReportCommand (clientSocket, commandData);
break;
case "s":
System.out.println("Status");
command = new ConcurrentStatusCommand(executor, clientSocket, commandData);
break;
case "z":
System.out.println("Stop");
command = new ConcurrentStopCommand(clientSocket, commandData);
break;
case "c":
System.out.println("Cancel");
command = new ConcurrentCancelCommand (clientSocket, commandData);
break;
default:
System.out.println("Error");
command = new ConcurrentErrorCommand(clientSocket, commandData);
break;
}
ServerTask<?> controller = (ServerTask<?>)executor.submit(command);
storeContoller(command.getUsername(), controller, command);
} else {
PrintWriter out = new PrintWriter (clientSocket.getOutputStream(),true);
out.println(ret);
clientSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
// No Action Required
}
}
storeController()
方法是将Future
对象存储在双ConcurrentHashMap
中的方法:
private void storeContoller(String userName, ServerTask<?> controller, ConcurrentCommand command) {
taskController.computeIfAbsent(userName, k -> new ConcurrentHashMap<>()).put(command, controller);
}
最后,我们包含了两种方法来管理Executor
类的执行,一种是为执行者调用shutdown()
方法,另一种是等待其完成。请记住,必须显式调用shutdown()
或shutdownNow()
方法来结束执行器的执行。否则,程序将不会终止。请看以下代码:
public void shutdown() {
String message="Request Task: "
+pendingConnections.size()
+" pending connections.";
Logger.sendMessage(message);
executor.shutdown();
}
public void terminate() {
try {
executor.awaitTermination(1,TimeUnit.DAYS);
executor.writeStatistics();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
现在是测试我们服务器的时候了。在这种情况下,我们不会太担心执行时间。我们测试的主要目的是检查新特性是否工作良好。
我们将客户机部分分为以下两类:
我们已经包含了一个执行器来执行对服务器的并发请求,以提高客户机的并发级别。
在下图中,您可以看到取消任务的结果:
在此情况下,用户用户的四个任务被取消。
下图显示了有关每个用户的任务数和执行时间的最终统计信息:
在前面的执行者示例中,任务只执行一次,并尽快执行。executor 框架包括其他 executor 实现,它为我们提供了更多关于任务执行时间的灵活性。ScheduledThreadPoolExecutor
类允许我们定期执行任务,并在延迟后执行任务。
在本部分中,您将学习如何执行实现RSS 提要阅读器的定期任务。这是一个简单的例子,您需要定期执行相同的任务(阅读 RSS 提要的新闻)。我们的示例将具有以下特征:
Runnable
对象。每次执行器运行对象时,它都会解析 RSS 源并将其转换为包含 RSS 内容的CommonInformationItem
对象列表。CommonInformationItem
写入缓冲区。只有新项目才会存储在缓冲区中。使用者将是一个独立的线程,从缓冲区读取新闻并将其写入磁盘。我们还实现了示例的高级版本,其中一个任务的两次执行之间的时间可能不同。
正如我们前面提到的,我们阅读 RSS 提要并将其转换为对象列表。为了解析 RSS 文件,我们将它们视为 XML 文件,并且我们在RSSDataCapturer
类中实现了一个SAX(简称Simple API for XML)解析器。它解析文件并创建一个CommonInformationItem
列表。此类为每个 RSS 项目存储以下信息:
* 标题**:RSS 项目的标题。
我们使用 Producer/Consumer 设计模式将新闻存储到磁盘中,因此我们需要一个缓冲区来存储新闻和一个Consumer
类,在本例中,该类从缓冲区读取新闻并将其存储到磁盘中。
我们在NewsBuffer
类中实现了缓冲区。它有两个内部属性:
CommonInformationItems
。HashMap
的并发实现。我们将使用它来存储之前存储在缓冲区中的新闻项的 ID。我们将只在缓冲区中插入之前未插入的新闻:
public class NewsBuffer {
private LinkedBlockingQueue<CommonInformationItem> buffer;
private ConcurrentHashMap<String, String> storedItems;
public NewsBuffer() {
buffer=new LinkedBlockingQueue<>();
storedItems=new ConcurrentHashMap<String, String>();
}
NewsBuffer
类中有两种方法:一种是在缓冲区中存储一个项,用于检查该项之前是否已插入;另一种是从缓冲区中获取下一个项。我们使用compute()
方法在ConcurrentHashMap
中插入元素。此方法将 lambda 表达式作为参数接收,该参数包含键和与此键关联的实际值(如果键没有关联值,则为 null)。在本例中,我们将该项添加到缓冲区中,该缓冲区以前没有处理过。我们使用add()
和take()
方法从队列中插入、获取和删除元素:
public void add (CommonInformationItem item) {
storedItems.compute(item.getId(), (id, oldSource) -> {
if(oldSource == null) {
buffer.add(item);
return item.getSource();
} else {
System.out.println("Item "+item.getId()+" has been processed before");
return oldSource;
}
});
}
public CommonInformationItem get() throws InterruptedException {
return buffer.take();
}
缓冲区的项目将由NewsWriter
类写入磁盘,该类将作为独立线程执行。它只有一个指向应用中使用的NewsBuffer
类的内部属性:
public class NewsWriter implements Runnable {
private NewsBuffer buffer;
public NewsWriter(NewsBuffer buffer) {
this.buffer=buffer;
}
此Runnable
对象的run()
方法从缓冲区中获取CommonInformationItem
实例,并将其保存到磁盘中。由于我们使用阻塞方法take
,如果缓冲区为空,则该线程将被阻塞,直到缓冲区中有元素:
public void run() {
try {
while (!Thread.currentThread().interrupted()) {
CommonInformationItem item=buffer.get();
Path path=Paths.get ("output\\"+item.getFileName());
try (BufferedWriter fileWriter = Files.newBufferedWriter(path, StandardOpenOption.CREATE)) {
fileWriter.write(item.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
//Normal execution
}
}
基本的读取器将使用标准的ScheduledThreadPoolExecutor
类定期执行任务。我们将对每个 RSS 源执行一个任务,从一个任务的一次执行结束到下一次执行开始之间有一分钟的时间。这些并发任务在NewsTask
类中实现。它有三个内部属性来存储 RSS 提要的名称、URL 和存储新闻的NewsBuffer
类:
public class NewsTask implements Runnable {
private String name;
private String url;
private NewsBuffer buffer;
public NewsTask (String name, String url, NewsBuffer buffer) {
this.name=name;
this.url=url;
this.buffer=buffer;
}
这个Runnable
对象的run()
方法只是解析 RSS 提要,获取 CommonItemInterface 实例列表,并将它们存储在缓冲区中。此方法将以周期性方式执行。在每次执行中,run()
方法将从头到尾执行:
@Override
public void run() {
System.out.println(name+": Running. " + new Date());
RSSDataCapturer capturer=new RSSDataCapturer(name);
List<CommonInformationItem> items=capturer.load(url);
for (CommonInformationItem item: items) {
buffer.add(item);
}
}
在本例中,我们还实现了另一个线程来实现执行器和任务的初始化以及等待执行的最终完成。我们将这个类命名为NewsSystem
。它有三个内部属性来存储 RSS 源文件的路径、存储新闻的缓冲区和控制执行结束的CountDownLatch
对象。CountDownLatch
类是一种同步机制,允许您让线程等待事件。我们将在第 9 章中详细介绍此类的使用,将深入探讨并发数据结构和同步工具。我们有以下代码:
public class NewsSystem implements Runnable {
private String route;
private ScheduledThreadPoolExecutor executor;
private NewsBuffer buffer;
private CountDownLatch latch=new CountDownLatch(1);
public NewsSystem(String route) {
this.route = route;
executor = new ScheduledThreadPoolExecutor (Runtime.getRuntime().availableProcessors());
buffer=new NewsBuffer();
}
在run()
方法中,我们读取所有 RSS 源,为每个源创建一个NewsTask
类,并将它们发送给我们的ScheduledThreadPool
执行器。我们已经使用Executors
类的newScheduledThreadPool()
方法创建了 executor,并使用scheduleAtFixedDelay()
方法将任务发送给它。我们还将NewsWriter
实例作为线程启动。run()
方法等待有人告诉它使用CountDownLatch
类的 await()
方法完成执行,并结束NewsWriter
任务和ScheduledExecutor
的执行。
@Override
public void run() {
Path file = Paths.get(route);
NewsWriter newsWriter=new NewsWriter(buffer);
Thread t=new Thread(newsWriter);
t.start();
try (InputStream in = Files.newInputStream(file);
BufferedReader reader = new BufferedReader(
new InputStreamReader(in))) {
String line = null;
while ((line = reader.readLine()) != null) {
String data[] = line.split(";");
NewsTask task = new NewsTask(data[0], data[1], buffer);
System.out.println("Task "+task.getName());
executor.scheduleWithFixedDelay(task,0, 1, TimeUnit.MINUTES);
}
} catch (Exception e) {
e.printStackTrace();
}
synchronized (this) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Shutting down the executor.");
executor.shutdown();
t.interrupt();
System.out.println("The system has finished.");
}
我们还实现了shutdown()
方法。此方法将通知NewsSystem
类使用CountDownLatch
类的countDown()
方法结束其执行。此方法将唤醒run()
方法,因此将关闭运行NewsTask
对象的执行器:
public void shutdown() {
latch.countDown();
}
本例的最后一个类是实现本例main()
方法的Main
类。它以线程的形式启动一个NewsSystem
实例,等待 10 分钟,并通知线程其终结,从而完成整个系统的执行,如下所示:
public class Main {
public static void main(String[] args) {
// Creates the System an execute it as a Thread
NewsSystem system=new NewsSystem("data\\sources.txt");
Thread t=new Thread(system);
t.start();
// Waits 10 minutes
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Notifies the finalization of the System
(
system.shutdown();
}
执行此示例时,您会看到不同的任务是如何周期性执行的,以及新闻项是如何写入磁盘的,如以下屏幕截图所示:
基本的新闻阅读器是使用ScheduledThreadPoolExecutor
类的一个例子,但我们可以更进一步。与ThreadPoolExecutor
一样,我们可以实现自己的ScheduledThreadPoolExecutor
来获得特定的行为。在我们的例子中,我们希望周期性任务的延迟时间根据一天中的某个时刻而变化。在本部分中,您将学习如何实现此行为。
第一步是实现一个类,该类告诉我们周期性任务的两次执行之间的延迟。我们把它命名为Timer
类。它只有一个名为getPeriod()
的静态方法,返回一次执行结束到下一次执行开始之间的毫秒数。这是我们的实现,但您可以自己制作:
public class Timer {
public static long getPeriod() {
Calendar calendar = Calendar.getInstance();
int hour = calendar.get(Calendar.HOUR_OF_DAY);
if ((hour >= 6) && (hour <= 8)) {
return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
}
if ((hour >= 13) && (hour <= 14)) {
return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
}
if ((hour >= 20) && (hour <= 22)) {
return TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
}
return TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
}
}
接下来,我们必须执行执行器的内部任务。当您向执行者发送一个Runnable
对象时,在外部,您将该对象视为并发任务,但执行者将该对象转换为另一个任务,即FutureTask
类的实例,该类包括执行任务的run()
方法和管理任务执行的Future
接口的方法。为了实现这个示例,我们必须实现一个扩展FutureTask
类的类,并且,由于我们将在调度执行器中执行这些任务,它必须实现RunnableScheduledFuture
接口。此接口提供了返回下一次执行任务剩余时间的getDelay()
方法。我们已经在ExecutorTask
课上完成了这些内部任务。它有四个内部属性:
ScheduledThreadPoolExecutor
类创建的原始RunnableScheduledFuture
内部任务其代码如下:
public class ExecutorTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
private RunnableScheduledFuture<V> task;
private NewsExecutor executor;
private long startDate;
private String name;
public ExecutorTask(Runnable runnable, V result, RunnableScheduledFuture<V> task, NewsExecutor executor) {
super(runnable, result);
this.task = task;
this.executor = executor;
this.name=((NewsTask)runnable).getName();
this.startDate=new Date().getTime();
}
我们在这个类中重写或实现了不同的方法。第一种是getDelay()
方法,正如我们之前所说的,它返回给定单位时间内下一次执行任务的剩余时间:
@Override
public long getDelay(TimeUnit unit) {
long delay;
if (!isPeriodic()) {
delay = task.getDelay(unit);
} else {
if (startDate == 0) {
delay = task.getDelay(unit);
} else {
Date now = new Date();
delay = startDate - now.getTime();
delay = unit.convert(delay, TimeUnit.MILLISECONDS);
}
}
return delay;
}
下一个方法是compareTo()
方法,考虑到下一次执行任务的开始日期,比较两个任务:
@Override
public int compareTo(Delayed object) {
return Long.compare(this.getStartDate(), ((ExecutorTask<V>)object).getStartDate());
}
然后,如果任务是周期性的,isPeriodic()
方法返回true
,如果任务不是周期性的,false
方法返回:
@Override
public boolean isPeriodic() {
return task.isPeriodic();
}
最后,我们有run()
方法,它实现了本例中最重要的部分。首先,我们调用FutureTask
类的runAndReset()
方法。此方法执行任务并重置其状态,以便可以再次执行。然后,我们使用Timer
类计算下一次执行的开始日期,最后,我们必须再次将任务插入ScheduledThreadPoolExecutor
类的队列中。如果我们不执行最后一步,任务将不会再次执行,如下所示:
@Override
public void run() {
if (isPeriodic() && (!executor.isShutdown())) {
super.runAndReset();
Date now=new Date();
startDate=now.getTime()+Timer.getPeriod();
executor.getQueue().add(this);
System.out.println("Start Date: "+new Date(startDate));
}
}
一旦我们有了执行者的任务,我们就必须执行执行者。我们已经实现了扩展ScheduledThreadPoolExecutor
类的NewsExecutor
类。我们已经覆盖了decorateTask()
方法。使用此方法,可以替换计划执行者使用的内部任务。默认情况下,它返回一个RunnableScheduledFuture
接口的默认实现,但在我们的例子中,它将返回一个ExecutorClass
实例的实例:
public class NewsExecutor extends ScheduledThreadPoolExecutor {
public NewsExecutor(int corePoolSize) {
super(corePoolSize);
}
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,
RunnableScheduledFuture<V> task) {
ExecutorTask<V> myTask = new ExecutorTask<>(runnable, null, task, this);
return myTask;
}
}
我们必须实现其他版本的NewsSystem
和Main
类来使用NewsExecutor
。为此,我们实现了NewsAdvancedSystem
和AdvancedMain
。
现在您可以运行高级新闻系统来查看执行之间的延迟时间是如何变化的。
在本章中,我们扩展了ThreadPoolExecutor
和ScheduledThreadPoolExecutor
类,并重写了它们的一些方法。但是,如果需要更特殊的行为,可以重写更多的方法。以下是一些可以覆盖的方法:
shutdown()
:必须显式调用此方法才能结束执行器的执行。您可以覆盖它以添加一些代码来释放您自己的执行器使用的其他资源。shutdownNow()
:shutdown()
和shutdownNow()
的区别在于方法等待在执行器中等待的所有任务完成。submit()
、invokeall()
或invokeany()
:调用这些方法将并发任务发送给执行者。如果需要在将任务插入执行器的任务队列之前或之后执行某些操作,则可以覆盖它们。请注意,在任务排队之前或之后添加自定义操作与在执行之前或之后添加自定义操作不同,这是我们在重写beforeExecute()
和afterExecute()
方法时所做的。在新闻阅读器示例中,我们使用scheduleWithFixedDelay()
方法将任务发送给执行者。但是ScheduledThreadPoolExecutor
类有其他方法来执行周期性任务或延迟后的任务:
schedule()
:此方法在给定延迟后执行任务。该任务只执行一次。scheduleAtFixedRate()
:该方法执行给定周期的周期性任务。与 scheduleWithFixedDelay()
方法的区别在于,在最后一种方法中,两次执行之间的延迟从第一次执行的结束到第二次执行的开始,而在第一种方法中,两次执行之间的延迟介于两次执行的开始之间。在本章中,我们给出了两个示例,探讨了执行者的高级特征。在第一个示例中,我们继续使用第 2 章的客户机/服务器示例管理大量线程–执行器。我们实现了我们自己的执行器,扩展了ThreadPoolExecutor
类,以按优先级执行任务,并测量每个用户的任务执行时间。我们还包括一个新命令,允许取消任务。
在第二个示例中,我们解释了如何使用ScheduledThreadPoolExecutor
类执行周期性任务。我们实现了两个版本的新闻阅读器。第一个演示了如何使用ScheduledExecutorService,
的基本功能,第二个演示了如何重写ScheduledExecutorService
类的行为,例如更改任务两次执行之间的延迟时间。
在下一章中,您将学习如何执行返回结果的Executor
任务。如果您扩展Thread
类或实现Runnable
接口,run()
方法不会返回任何结果,但 executor 框架包含Callable
接口,允许您实现返回结果的任务。**