从第 2 章管理大量线程–执行器,到第 8 章使用并行流处理海量数据集–映射和收集模型,您使用 Java 并发 API 的最重要部分实现了不同的示例。通常,这些示例是真实的,但大多数情况下,这些示例可能是更大系统的一部分。例如,在第 4 章中从任务获取数据–可调用和未来接口中,您实现了一个应用来构建一个反向索引,用于信息检索系统。在第 6 章优化分治的解决方案——Fork/Join 框架中,您实现了 k-means 聚类算法对一组文档进行聚类。但是,您可以实现一个完整的信息检索应用,该应用读取一组文档,使用向量空间模型表示它们,并使用 K-NN 算法对它们进行聚类。在这些情况下,您有可能使用不同并发技术的不同部分(执行器、流等),但它们必须在它们之间进行同步和通信,以获得所需的结果。
此外,本书中介绍的所有示例都可以使用 Java 并发 API 的其他组件实现。我们也将讨论其中一些备选方案。
在本章中,我们将介绍以下主题:
大型计算机应用由不同的组件组成,这些组件共同工作以获得所需的功能。这些组件必须在它们之间进行同步和通信。在第 9 章中深入到并发数据结构和同步工具中,您了解到可以使用不同的 Java 类来同步任务并在它们之间进行通信。但是,如果要同步的组件也是可以使用不同机制实现其并发性的并发系统,则此任务组织会更加复杂。对于示例,您在应用中有一个组件,该组件使用 Fork/Join 框架生成结果,这些结果由使用Phaser
类同步的其他任务使用。
在这些情况下,您可以使用以下两种机制来同步和通信这些组件:
在本节中,您将实现一个应用来对文档进行集群,该应用由四个子系统组成,它们之间通过通信和同步来对文档进行集群。
此应用将读取一组文档,并使用 k-均值聚类算法对其进行组织。为此,我们将使用四个组件:
String
对象列表。所有这些系统都是并发的,并使用它们自己的任务来实现它们的功能。让我们看看如何实现这个示例。
让我们看看如何实现阅读器、索引器、映射器和集群系统。
我们已经在DocumentReader
类中实现了这个系统。此类实现Runnable
接口,内部使用三个属性:
ConcurrentLinkedDeque
类的String
对象,包含您必须处理的所有文件的名称ConcurrentLinkedQueue
类TextFile
对象CountDownLatch
对象类的构造函数初始化这些属性(构造函数将这三个属性作为参数接收),此处给出的run()
方法实现所有功能:
String route;
System.out.println(Thread.currentThread().getName()+": Reader start");
while ((route = files.pollFirst()) != null) {
Path file = Paths.get(route);
TextFile textFile;
try {
textFile = new TextFile(file);
buffer.offer(textFile);
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+": Reader end: "+buffer.size());
readersCounter.countDown();
}
}
首先,我们读取所有文件的内容。对于每个文件,我们创建一个TextFile
类的对象。此类包含文本文件的名称和内容。它有一个构造函数,该构造函数接收具有文件路由的Path
对象。最后,我们在控制台中编写一条消息,并使用CountDownLatch
对象的countDown()
方法来指示此任务的结束。
这是TextFile
类的代码。在内部,它有两个属性来存储文件名及其内容。使用Files
类的readAllLines()
方法将文件内容转换为List<String>
数据结构:
public class TextFile {
private String fileName;
private List<String> content;
public TextFile(String fileName, List<String> content) {
this.fileName = fileName;
this.content = content;
}
public TextFile(Path path) throws IOException {
this(path.getFileName().toString(), Files.readAllLines(path));
}
public String getFileName() {
return fileName;
}
public List<String> getContent() {
return content;
}
}
该系统在Indexer
类中实现,该类也实现了Runnable
接口。在本例中,我们使用五个内部属性,如下所示:
TextFile
的ConcurrentLinkedQueue
Document
对象的ConcurrentLinkedDeque
用于存储构成每个文档的单词列表Reader
系统终结的CountDownLatch
对象CountDownLatch
对象,用于指示此系统任务的完成Vocabulary
对象,用于存储构成文档集合的所有单词类的构造函数初始化此属性(将所有属性作为参数接收):
public class Indexer implements Runnable {
private ConcurrentLinkedQueue<TextFile> buffer;
private ConcurrentLinkedDeque<Document> documents;
private CountDownLatch readersCounter;
private CountDownLatch indexersCounter;
private Vocabulary voc;
run()
方法实现了所有功能,如图所示:
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+": Indexer start");
do {
TextFile textFile= buffer.poll();
if (textFile!=null) {
Document document= parseDoc(textFile);
首先,它从队列中获取TextFile
,如果不是null
,则使用parseDoc()
方法将其转换为Document
对象。然后,它处理文档中的所有单词以将它们存储在全局词汇表对象中,并将文档存储在文档列表中,如以下代码所示:
document.getVoc().values()
.forEach(voc::addWord);
documents.offer(document);
}
} while ((readersCounter.getCount()>0) || (!buffer.isEmpty()));
当Reader
系统正在运行或缓冲区中仍有文档时,重复此过程。最后,后面提到的代码片段中显示的run()
方法在控制台中写入一条消息,并使用CountDownLatch
对象的countDown()
方法指示此任务已完成执行:
indexersCounter.countDown();
System.out.println(Thread.currentThread().getName()+": Indexer end");
}
parseDoc()
方法接收带文档内容的List<String>
并返回一个Document
对象。它创建一个Document
对象,使用forEach()
方法处理所有行,如下所示:
private Document parseDoc(TextFile textFile) {
Document doc=new Document();
doc.setName(textFile.getFileName());
textFile.getContent().forEach(line -> parseLine(line,doc));
return doc;
}
parseLine()
方法将行拆分为字,并存储到doc
对象中,如下所示:
private static void parseLine(String inputLine, Document doc) {
// Clean string
String line=new String(inputLine);
line = Normalizer.normalize(line, Normalizer.Form.NFKD);
line = line.replaceAll("[^\\p{ASCII}]", "");
line = line.toLowerCase();
// Tokenizer
StringTokenizer tokenizer = new StringTokenizer(line,
" ,.;:-{}[]¿?¡!|\\=*+/()\"@\t~#<>", false);
while (tokenizer.hasMoreTokens()) {
doc.addWord(tokenizer.nextToken());
}
}
在预编译replaceAll()
方法中使用的正则表达式之前,您可以在提供的代码中包含优化:
static final Pattern NON_ASCII = Pattern.compile("[^\\p{ASCII}]");
line = NON_ASCII.matcher(line).replaceAll("");
}
该系统在Mapper
类中实现,该类也实现了Runnable
接口。在内部,它使用以下两个属性:
Document
对象的ConcurrentLinkedDeque
Vocabulary
对象其代码如下:
public class Mapper implements Runnable {
private ConcurrentLinkedDeque<Document> documents;
private Vocabulary voc;
类的构造函数初始化这些属性,run()
方法实现该系统的功能:
public void run() {
Document doc;
int counter=0;
System.out.println(Thread.currentThread().getName()+": Mapper start");
while ((doc=documents.pollFirst())!=null) {
counter++;
首先,它使用pollFirst()
方法从文档的Deque
对象获取文档。然后,它处理文档中的所有单词,计算tfxidf
度量值,并创建一个新的Attribute
对象来存储这些值。这些属性存储在一个列表中。
List<Attribute> attributes=new ArrayList<>();
doc.getVoc().forEach((key, item)-> {
Word word=voc.getWord(key);
item.setTfxidf(item.getTfxidf()/word.getDf());
Attribute attribute=new Attribute();
attribute.setIndex(word.getIndex());
attribute.setValue(item.getTfxidf());
attributes.add(attribute);
});
最后,我们将列表转换为一个Attribute
对象数组,并将该数组存储在Document
对象中:
Collections.sort(attributes);
doc.setExample(attributes);
}
System.out.println(Thread.currentThread().getName()+": Mapper end: "+counter);
}
本系统实现了 k-均值聚类算法。您可以使用第 5 章中介绍的元素,将运行任务分为阶段–相位器类来实现此系统。这一实现具有以下要素:
Attribute
对象之间的欧氏距离RecursiveAction
类(Fork/Join 框架),执行算法的赋值任务,计算每个文档与所有簇的距离,确定每个文档的簇RecursiveAction
类(Fork/Join 框架)并执行算法的更新任务,该算法将每个集群的质心重新计算为其上存储的文档的平均值calculate()
,该方法执行聚类算法并返回一个包含所有生成的聚类的DocumentCluster
对象数组我们只添加了一个新类,ClusterTask
类,它实现了Runnable
接口,并将调用ConcurrentKMeans
类的calculate()
方法。在内部,它使用两个属性,如下所示:
Document
对象数组Vocabulary
对象构造函数初始化这些属性,run()
方法实现任务的逻辑。我们调用ConcurrentKMeans
类的calculate()
方法传递五个参数如下:
Document
对象数组。Vocabulary
对象。10
作为集群的数量。991
作为种子。10
作为最小尺寸。此为该类代码:
@Override
public void run() {
System.out.println("Documents to cluster: "+documents.length);
ConcurrentKMeans.calculate(documents, 10, voc.getVocabulary().size(), 991, 10);
}
一旦我们实现了应用中使用的所有元素,我们就必须实现系统的main()
方法。在这种情况下,此方法至关重要,因为它负责启动系统并创建同步系统所需的元素。Reader
和Indexer
系统将同时执行。他们将使用缓冲区在他们之间共享信息。当读卡器读取一个文档时,它将写入缓冲区中的String
对象列表,然后继续处理下一个文档。它不会等待处理List
的任务。这是异步消息传递的一个示例。Indexer
系统将从缓冲区中取出文档,对其进行处理,并生成包含文档所有单词的Vocabulary
对象。Indexer
系统执行的所有任务共享Vocabulary
类的实例。这是共享内存的一个示例。
主类将使用CountDownLatch
对象的await()
方法以同步方式等待Reader
和Indexer
系统的终结。此方法阻止调用线程的执行,直到其内部计数器到达 0。
两个系统完成执行后,Mapper
系统将使用Vocabulary
对象和Document
信息获取每个文档的向量空间模型表示。当Mapper
执行完毕后,Clustering
系统对所有单据进行集群处理。我们已经使用了CompletableFuture
类来同步Mapper
系统的结束和Clustering
系统的开始。这是两个系统之间异步通信的另一个示例。
我们已经在ClusteringDocs
类中实现了主类。
首先,我们创建一个ThreadPoolExecutor
对象,使用readFileNames()
方法获取包含文档的文件的ConcurrentLinkedDeque
:
public class ClusteringDocs {
private static int NUM_READERS = 2;
private static int NUM_WRITERS = 4;
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor=(ThreadPoolExecutor) Executors.newCachedThreadPool();
ConcurrentLinkedDeque<String> files=readFiles("data");
System.out.println(new Date()+":"+files.size()+" files read.");
然后,我们创建文档缓冲区ConcurrentLinkedDeque
,用于存储Document
对象、Vocabulary
对象和两个CountDownLatch
对象,一个用于控制Reader
系统任务的结束,另一个用于控制Indexer
系统任务的结束。我们有以下代码:
ConcurrentLinkedQueue<List<String>> buffer=new ConcurrentLinkedQueue<>();
CountDownLatch readersCounter=new CountDownLatch(2);
ConcurrentLinkedDeque<Document> documents=new ConcurrentLinkedDeque<>();
CountDownLatch indexersCounter=new CountDownLatch(4);
Vocabulary voc=new Vocabulary();
然后,我们启动两个任务来执行DocumentReader
类的Reader
系统,另外四个任务来执行Indexer
类的Indexer
系统。所有这些任务都在我们前面创建的Executor
对象中执行:
System.out.println(new Date()+":"+"Launching the tasks");
for (int i=0; i<NUM_READERS; i++) {
DocumentReader reader=new DocumentReader(files,buffer,readersCounter);
executor.execute(reader);
}
for (int i=0; i<NUM_WRITERS; i++) {
Indexer indexer=new Indexer(documents, buffer, readersCounter, indexersCounter, voc);
executor.execute(indexer);
}
然后,main()
方法等待该任务的完成;首先,针对DocumentReader
任务,然后针对Indexer
任务,如下所示:
System.out.println(new Date()+":"+"Waiting for the readers");
readersCounter.await();
System.out.println(new Date()+":"+"Waiting for the indexers");
indexersCounter.await();
然后,我们将Document
对象的ConcurrentLinkedDeque
类转换成一个数组:
Document[] documentsArray=new Document[documents.size()];
documentsArray=documents.toArray(documentsArray);
我们使用CompletableFuture
类的runAsync()
方法启动Indexer
系统,执行Mapper
类的四个任务,如下所示:
System.out.println(new Date()+":"+"Launching the mappers");
CompletableFuture<Void>[] completables = Stream.generate(() -> new Mapper(documents, voc))
.limit(4)
.map(CompletableFuture::runAsync)
.toArray(CompletableFuture[]::new);
然后,我们启动Clustering
系统来启动ClusterTask
类的一个任务(记住这些任务将启动其他任务来执行算法)。main()
方法使用CompletableFuture
类的allOf()
方法等待Mapper
任务的完成,然后在Mapper
系统完成后使用thenRunAsync()
方法启动聚类算法:
System.out.println(new Date()+":"+"Launching the cluster calculation");
CompletableFuture<Void> completableMappers= CompletableFuture.allOf(completables);
ClusterTask clusterTask=new ClusterTask(documentsArray, voc);
CompletableFuture<Void> completableClustering= completableMappers.thenRunAsync(clusterTask);
最后,我们使用get()
方法等待Clustering
系统的最终确定,并按如下方式完成程序的执行:
System.out.println(new Date()+":"+"Wating for the cluster calculation");
try {
completableClustering.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println(new Date()+":"+"Execution finished");
executor.shutdown();
}
readFileNames()
方法接收一个字符串作为参数,该字符串必须是存储文档集合的目录的路径,并使用该目录中包含的文件名生成一个ConcurrentLinkedDeque
类的String
对象。
为了测试这个应用,我们使用了 100673 个文档中的 10052 个文档的子集,其中包含从维基百科获取的电影信息,作为文档集合。在下图中,您可以看到从执行开始到索引器执行结束的第一部分执行的结果:
下图显示了示例的其余执行过程:
您可以看到任务是如何同步的,如本章前面所述。首先,Reader
和Indexer
任务以并发方式执行。完成后,映射器对数据进行转换,最后,聚类算法组织示例。
我们在本书各章中实现的大多数示例都可以使用 Java 并发 API 的其他组件实现。在本节中,我们将描述如何实现其中一些替代方案。
您已经在第 2章中实现了 k-最近邻算法,使用一个执行器管理大量线程–执行器。这是一个用于监督分类的简单机器学习算法。您有一组以前分类的示例。要获得新示例的类,请计算从该示例到示例训练集的距离。最近示例中的大多数类都是为示例选择的类。您还可以使用并发 API 的以下组件之一实现该算法:
Thread
对象实现此示例。您必须使用普通线程执行在执行器中执行的任务。每个线程将计算示例与训练集子集之间的距离,并将该距离保存在所有线程之间共享的数据结构中。完成所有线程后,可以使用距离对数据结构进行排序,并计算示例的类。limit()
获得最接近的结构,并计算最终的结果类。我们已经在第 4 章中实现了这个示例,使用执行器从任务中获取数据–可调用和未来接口。倒排索引是一种用于信息检索领域的数据结构,用于加速信息搜索。它存储文档集合中显示的单词以及每个单词出现的文档。搜索信息时,不需要处理文档。您可以查看反向索引来提取您插入的单词出现的文档,并构建结果列表。您还可以使用并发 API 的以下组件之一实现此算法:
您已经在第 4 章中实现了这个示例,从任务获取数据–可调用和未来接口。该算法的主要目标是找到最类似于作为参数传递的字符串的单词。您还可以使用并发 API 的以下组件之一实现此算法:
您已经在第 5 章中实现了这个示例,运行分为阶段的任务–相位器类。遗传算法是一种基于自然选择原则的自适应启发式搜索算法,用于生成优化和搜索问题的良好解决方案。对于遗传算法,使用多线程有不同的方法。最经典的是创造岛屿。每个线程代表一个岛屿,其中一部分种群进化。有时,岛屿之间的迁移发生在将一些个体从一个岛屿转移到另一个岛屿的过程中。算法完成后,选择所有岛屿上的最佳物种。这种方法大大减少了争用,因为线程很少相互通信。
还有许多出版物和网站中详细描述的其他方法。例如,这个讲义集在中非常好地总结了这些方法 https://cw.fel.cvut.cz/wiki/_media/courses/a0m33eoa/prednasky/08pgas-handouts.pdf 。
您还可以使用并发 API 的以下组件之一实现此算法:
您已经在第 5 章中实现了这个示例,运行分为阶段的任务–相位器类。我们使用这种算法来提取描述文档的一小部分单词。我们试图通过 Tf Idf 等方法找到信息量最大的单词。您还可以使用并发 API 的以下组件实现此示例:
您已经在第 6 章中优化分治解决方案——Fork/Join 框架中实现了该算法。该算法将一组元素分类为先前定义的数量的簇。您没有关于元素类的任何信息,因此这是一个无监督的学习算法,尝试查找类似的项。您还可以使用并发 API 的以下组件实现此示例:
您已经在第 6 章中优化分治解决方案——Fork/Join 框架中实现了该算法。该算法的主要目标是从一组非常大的对象中选择满足特定条件的对象。您还可以使用并发 API 的以下组件实现此示例:
Stream
类的filter()
方法对对象进行搜索。然后,您可以减少这些结果,以获得所需的格式。您已经在第 7 章中使用并行流处理海量数据集实现了该算法——Map and Reduce 模型。在前面的一个示例中,我们讨论了如何实现创建反向索引的算法以加快信息搜索。这就是搜索信息的算法。您还可以使用并发 API 的以下组件实现此示例:
您已经在第 7 章中使用并行流处理海量数据集的过程中实现了这个示例—映射和简化模型。这种算法希望获得关于一组非常大的数据的统计信息。您还可以使用并发 API 的以下组件实现此示例:
您已经在第 8 章中实现了这个示例,使用并行流处理海量数据集–映射和收集模型。该算法在没有反向索引的情况下获取满足特定条件的对象,以加快搜索速度。在这些情况下,进行搜索时必须处理所有元素。您还可以使用并发 API 的以下组件实现此示例:
您已经在第 8 章中实现了这个示例,使用并行流处理海量数据集–映射和收集模型。推荐系统根据客户购买/使用的产品/服务以及购买/使用与其相同服务的用户购买/使用的产品/服务,向客户推荐产品或服务。您还可以使用并发 API 的 Phaser 组件实现此示例。该算法分为三个阶段:
在本书中,您实现了许多实际示例。其中一些示例可以作为更大系统的一部分使用。这些较大的系统通常具有不同的并发部分,这些部分必须共享信息并在它们之间进行同步。为了实现同步,我们可以使用三种机制:共享内存,当两个或多个任务共享一个对象或数据结构时,异步消息传递,当一个任务向另一个任务发送消息而不等待其处理时,以及同步消息传递,当一个任务向另一个任务发送消息并等待其处理时。
在本章中,我们实现了一个由四个子系统组成的集群文档应用。我们使用前面介绍的机制在这四个子系统之间同步和共享信息。
我们还修改了书中介绍的一些示例,以讨论其实现的其他替代方案。
在下一章中,您将学习如何获取并发 API 组件的调试信息,以及如何监视和测试并发应用。