在第 2 章、管理大量线程–执行器、第 3 章、从执行器获取最大值、第 4 章、从任务获取数据–可调用和未来接口,您学习了如何使用执行器作为一种机制来提高执行大量并发任务的并发应用的性能。Java7 并发 API 通过 Fork/Join 框架引入了一种特殊的执行器。该框架旨在为那些可以使用分治设计范式解决的问题实现最佳并行解决方案。在本章中,我们将介绍以下主题:
Java5 中引入的 executor 框架提供了一种执行并发任务的机制,无需创建、启动和完成线程。这个框架使用一个线程池来执行发送给执行者的任务,并将它们用于多个任务。这种机制为程序员提供了一些优势,如下所示:
分治算法是一种非常流行的设计技术。要使用此技术解决问题,可以将其划分为更小的问题。你以递归的方式重复这个过程,直到你要解决的问题小到可以直接解决为止。这些类型的问题可以使用 executor 来解决,但是为了更有效地解决它们,Java7 并发 API 引入了 Fork/Join 框架。
该框架基于ForkJoinPool
类,这是一种特殊的执行器、两种操作fork()
和join()
方法(及其不同变体),以及内部算法工作窃取算法。在本章中,您将学习实现以下三个示例的 Fork/Join 框架的基本特征、限制和组件:
正如我们前面提到的,必须使用 Fork/Join 框架来实现基于分治技术的问题解决方案。你必须把原来的问题分成更小的问题,直到它们小到可以直接解决为止。使用此框架,您将实现其主要方法如下所示的任务:
if ( problem.size() > DEFAULT_SIZE) {
divideTasks();
executeTask();
taskResults=joinTasksResult();
return taskResults;
} else {
taskResults=solveBasicProblem();
return taskResults;
}
最重要的部分是允许您以有效的方式划分和执行子任务,并获得这些子任务的结果以计算父任务的结果。ForkJoinTask
类提供的两种方法支持此功能,如下所示:
fork()
方法:此方法允许您向 Fork/Join 执行器发送子任务join()
方法:此方法允许您等待子任务完成并返回其结果这些方法有不同的变体,您将在示例中看到。Fork/Join 框架还有另一个关键部分:工作窃取算法,它决定要执行哪些任务。当一个任务正在等待使用join()
方法完成子任务时,执行该任务的线程从正在等待的任务池中获取另一个任务并开始执行。这样,Fork/Join 执行器的线程总是通过提高应用的性能来执行任务。
Java8 在 Fork/Join 框架中包含了一个新特性。现在,每个 Java 应用都有一个名为 common pool 的默认ForkJoinPool
。您可以通过调用ForkJoinPool.commonPool()
静态方法获取。您不需要显式地创建一个(尽管您可以)。默认情况下,此默认分叉/联接执行器将使用由计算机的可用处理器确定的线程数。您可以通过更改系统属性java.util.concurrent.ForkJoinPool.common.parallelism
的值来更改此默认行为。
JavaAPI 的一些特性使用 Fork/Join 框架来实现并发操作。例如,Arrays
类对数组进行并行排序的parallelSort()
方法和 Java 8 中引入的并行流(后面将在第 7 章中介绍,使用并行流处理海量数据集——Map and Reduce 模型和第 8 章、使用并行流处理海量数据集–映射和收集模型使用此框架。
由于 Fork/Join 框架被认为可以解决一类确定的问题,因此在使用它来实现问题时,您必须考虑到一些限制,如下所示:
RuntimeException
)。未经检查的异常有一种特殊的处理方式,您将在示例中看到。Fork/Join 框架中有五个基本类:
ForkJoinPool
类:这个类实现Executor
和ExecutorService
接口,它是您将用来执行 Fork/Join 任务的Executor
接口。Java 为您提供了一个默认的ForkJoinPool
对象(名为 common pool),但如果需要,您可以使用一些构造函数来创建一个。您可以指定并行级别(运行并行线程的最大数量)。默认情况下,它使用可用处理器的数量作为并发级别。ForkJoinTask
类:这是所有 Fork/Join 任务的基本抽象类。它是一个抽象类,提供了fork()
和join()
方法以及它们的一些变体。它还实现了Future
接口,并提供了确定任务是否以正常方式完成、是否被取消或是否引发未检查异常的方法。RecursiveTask
、RecursiveAction
和CountedCompleter
类提供了compute()
抽象方法,这些方法应该在子类中实现以执行实际计算。RecursiveTask
类:该类扩展了ForkJoinTask
类。它也是一个抽象类,应该是实现返回结果的 Fork/Join 任务的起点。RecursiveAction
类:该类扩展了ForkJoinTask
类。它也是一个抽象类,应该是实现不返回结果的 Fork/Join 任务的起点。CountedCompleter
类:该类扩展了ForkJoinTask
类。这是 Java8API 的一个新特性,它应该是您实现任务的起点,这些任务在完成时会触发其他任务。k-means 聚类算法是一种聚类算法,用于将之前未分类为预定义数量的 k 个聚类的一组项目分组。在数据挖掘和机器学习领域,以无监督的方式组织和分类数据非常流行。
每个项目通常由特征或属性向量定义。所有项目都具有相同数量的属性。每个簇也由一个向量定义,该向量具有相同数量的属性,表示分类到该簇中的所有项。这个向量被命名为质心。例如,如果项目是由数字向量定义的,则集群是由分类到该集群的项目的平均值定义的。
基本上,该算法有四个步骤:
此算法有以下两个主要限制:
尽管如此,该算法还是非常流行于对不同种类的项目进行聚类。为了测试我们的算法,您将实现一个应用来对一组文档进行集群。作为一个文档集合,我们采用了维基百科页面的简化版本,其中包含了我们在第 4 章中介绍的电影语料库信息,从任务中获取数据–可调用和未来接口。我们只拿了 1000 份文件。为了表示每个文档,我们必须使用向量空间模型表示。通过这种表示,每个文档都表示为一个数字向量,其中向量的每个维度表示一个单词或一个术语,其值是定义该单词或术语在文档中重要性的度量。
当使用向量空间模型表示文档集合时,向量的维数将与整个集合中不同单词的数量相同,因此向量将有很多零值,因为每个文档不包含所有单词。您可以在内存中使用更优化的表示,以避免所有这些零值,并节省内存,从而提高应用的性能。
在我们的案例中,我们选择术语频率–逆文档频率(tf idf作为定义每个单词重要性的度量,而 tf idf 较高的 50 个单词作为表示每个文档的术语。
我们使用两个文件:movies.words
文件存储向量中使用的所有单词的列表,movies.data
存储每个文档的表示。movies.data
文件的格式如下:
10000202,rabona:23.039285705435507,1979:8.09314752937111,argentina:7.953798614698405,la:5.440565539075689,argentine:4.058577338363469,editor:3.0401515284855267,spanish:2.9692083275217134,image_size:1.3701158713905104,narrator:1.1799670194306195,budget:0.286193223652206,starring:0.25519156764102785,cast:0.2540127604060545,writer:0.23904044207902764,distributor:0.20430284744786784,cinematography:0.182583823735518,music:0.1675671228903468,caption:0.14545085918028047,runtime:0.127767002869991,country:0.12493801913495534,producer:0.12321749670640451,director:0.11592975672109682,links:0.07925582303812376,image:0.07786973207561361,external:0.07764427108746134,released:0.07447174080087617,name:0.07214163435745059,infobox:0.06151153983466272,film:0.035415118094854446
这里,10000202
是文档的标识符,文件的其余部分遵循共振峰word:tfxidf
。
与其他示例一样,我们将实现串行和并发版本,并执行这两个版本,以验证 Fork/Join 框架是否提高了该算法的性能。
有部分在串行版本和并发版本之间共享。这些部分包括:
VocabularyLoader
:这个类用于加载构成语料库词汇表的单词列表。Word
、Document
、DocumentLoader
:这三个类用于加载文档信息。这些类在算法的串行版本和并发版本之间有一些差异。DistanceMeasure
:计算两个向量之间的欧几里德距离的类。DocumentCluster
:存储集群信息的类。让我们详细了解这些类。
正如前面提到的一样,我们的数据存储在两个文件中。其中一个文件是movies.words
文件。此文件存储一个包含文档中使用的所有单词的列表。VocabularyLoader
类将该文件转换为HashMap
。HashMap
的键是整个单词,该值是一个整数值,该单词的索引在列表中。我们使用该索引来确定单词在表示每个文档的向量空间模型中的位置。
该类只有一个名为load()
的方法,该方法接收文件路径作为参数,并返回HashMap
:
public class VocabularyLoader {
public static Map<String, Integer> load (Path path) throws IOException {
int index=0;
HashMap<String, Integer> vocIndex=new HashMap<String, Integer>();
try(BufferedReader reader = Files.newBufferedReader(path)){
String line = null;
while ((line = reader.readLine()) != null) {
vocIndex.put(line,index );
index++;
}
}
return vocIndex;
}
}
这些类存储关于我们将在算法中使用的文档的所有信息。首先,Word
类在文档中存储关于单词的信息。它包括文档中单词的索引和单词的 tf idf。这个类只包含那些属性(int
和double
,并且实现了Comparable
接口,使用它们的 tf idf 值对两个单词进行排序,所以我们不包含这个类的源代码。
Document
类存储文档的所有相关信息。首先,文档中包含单词的Word
对象数组。这是我们对向量空间模型的表示。我们只存储文档中使用的单词,以节省大量内存空间。然后是一个带有存储文档的文件名的String
对象,最后是一个DocumentCluster
对象,用于了解与文档关联的集群。它还包括一个构造函数来初始化这些属性和方法,以获取和设置它们的值。我们只包括setCluster()
方法的代码。在这种情况下,此方法将返回一个布尔值,以指示此属性的新值是与旧值相同还是与新值相同。我们将使用该值确定是否停止算法:
public boolean setCluster(DocumentCluster cluster) {
if (this.cluster == cluster) {
return false;
} else {
this.cluster = cluster;
return true;
}
}
最后,DocumentLoader
类加载有关文档的信息。它包括一个静态方法,load()
接收文件的路径,以及带有词汇表的HashMap
并返回Document
对象的Array
。它逐行加载文件,并将每一行转换为一个Document
对象。我们有以下代码:
public static Document[] load(Path path, Map<String, Integer> vocIndex) throws IOException{
List<Document> list = new ArrayList<Document>();
try(BufferedReader reader = Files.newBufferedReader(path)) {
String line = null;
while ((line = reader.readLine()) != null) {
Document item = processItem(line, vocIndex);
list.add(item);
}
}
Document[] ret = new Document[list.size()];
return list.toArray(ret);
}
要将文本文件的一行转换为Document
对象,我们使用processItem()
方法:
private static Document processItem(String line,Map<String, Integer> vocIndex) {
String[] tokens = line.split(",");
int size = tokens.length - 1;
Document document = new Document(tokens[0], size);
Word[] data = document.getData();
for (int i = 1; i < tokens.length; i++) {
String[] wordInfo = tokens[i].split(":");
Word word = new Word();
word.setIndex(vocIndex.get(wordInfo[0]));
word.setTfidf(Double.parseDouble(wordInfo[1]));
data[i - 1] = word;
}
Arrays.sort(data);
return document;
}
正如前面提到的,行中的第一项是文档的标识符。我们从tokens[0]
获取,并将其传递给Document
类构造函数。然后,对于其余的标记,我们再次拆分它们以获得每个单词的信息,其中包括整个单词和 tf idf 值。
此类计算文档和簇(表示为向量)之间的欧氏距离。排序后单词数组中的单词的排列顺序与质心数组中的相同,但有些单词可能不存在。对于这些词,我们假设 tf idf 为零,因此距离仅为与质心阵列对应值的平方:
public class DistanceMeasurer {
public static double euclideanDistance(Word[] words, double[] centroid) {
double distance = 0;
int wordIndex = 0;
for (int i = 0; i < centroid.length; i++) {
if ((wordIndex < words.length) (words[wordIndex].getIndex() == i)) {
distance += Math.pow( (words[wordIndex].getTfidf() - centroid[i]), 2);
wordIndex++;
} else {
distance += centroid[i] * centroid[i];
}
}
return Math.sqrt(distance);
}
}
这个类存储算法生成的每个集群的信息。此信息包括与此群集关联的所有文档的列表以及表示群集的向量的质心。在这种情况下,该向量的维数与词汇表中的单词相同。这个类有两个属性,一个初始化它们的构造函数,以及获取和设置它们的值的方法。它还包括两种非常重要的方法。首先是calculateCentroid()
方法。它计算簇的质心,作为表示与该簇关联的文档的向量的平均值。我们有以下代码:
public void calculateCentroid() {
Arrays.fill(centroid, 0);
for (Document document : documents) {
Word vector[] = document.getData();
for (Word word : vector) {
centroid[word.getIndex()] += word.getTfidf();
}
}
for (int i = 0; i < centroid.length; i++) {
centroid[i] /= documents.size();
}
}
第二种方法是initialize()
方法,它接收Random
对象,并用随机数初始化簇的质心向量,如下所示:
public void initialize(Random random) {
for (int i = 0; i < centroid.length; i++) {
centroid[i] = random.nextDouble();
}
}
一旦我们描述了应用的公共部分,让我们看看如何实现 k-means 聚类算法的串行版本。我们将使用两个类:SerialKMeans
,实现该算法,以及SerialMain
,实现main()
方法来执行该算法。
SerialKMeans
类实现了 k-means 聚类算法的串行版本。该类的主要方法是calculate()
方法。它接收以下参数:
Document
对象数组方法返回DocumentCluster
对象的Array
。每个集群都有与其关联的文档列表。首先,文档创建由numberClusters
参数确定的集群Array
并使用initialize()
方法和Random
对象对其进行初始化,如下所示:
public class SerialKMeans {
public static DocumentCluster[] calculate(Document[] documents, int clusterCount, int vocSize, int seed) {
DocumentCluster[] clusters = new DocumentCluster[clusterCount];
Random random = new Random(seed);
for (int i = 0; i < clusterCount; i++) {
clusters[i] = new DocumentCluster(vocSize);
clusters[i].initialize(random);
}
然后,我们重复分配和更新阶段,直到所有文档都位于同一集群中。最后,我们返回包含文档最终组织的集群数组,如下所示:
boolean change = true;
int numSteps = 0;
while (change) {
change = assignment(clusters, documents);
update(clusters);
numSteps++;
}
System.out.println("Number of steps: "+numSteps);
return clusters;
}
分配阶段在assignment()
方法中实现。此方法接收Document
和DocumentCluster
对象的数组。对于每个文档,它计算文档与所有簇之间的欧氏距离,并将文档指定给距离最小的簇。它返回一个布尔值,以指示一个或多个文档是否已将其分配的集群从一个步骤更改为下一个步骤。我们有以下代码:
private static boolean assignment(DocumentCluster[] clusters, Document[] documents) {
boolean change = false;
for (DocumentCluster cluster : clusters) {
cluster.clearClusters();
}
int numChanges = 0;
for (Document document : documents) {
double distance = Double.MAX_VALUE;
DocumentCluster selectedCluster = null;
for (DocumentCluster cluster : clusters) {
double curDistance = DistanceMeasurer.euclideanDistance(document.getData(), cluster.getCentroid());
if (curDistance < distance) {
distance = curDistance;
selectedCluster = cluster;
}
}
selectedCluster.addDocument(document);
boolean result = document.setCluster(selectedCluster);
if (result)
numChanges++;
}
System.out.println("Number of Changes: " + numChanges);
return numChanges > 0;
}
在update()
方法中实现更新步骤。它接收带有簇信息的DocumentCluster
数组,并简单地重新计算每个簇的质心:
private static void update(DocumentCluster[] clusters) {
for (DocumentCluster cluster : clusters) {
cluster.calculateCentroid();
}
}
}
SerialMain 类SerialMain
类包括启动 k-means 算法测试的main()
方法。首先,它从文件中加载数据(文字和文档):
public class SerialMain {
public static void main(String[] args) {
Path pathVoc = Paths.get("data", "movies.words");
Map<String, Integer> vocIndex=VocabularyLoader.load(pathVoc);
System.out.println("Voc Size: "+vocIndex.size());
Path pathDocs = Paths.get("data", "movies.data");
Document[] documents = DocumentLoader.load(pathDocs, vocIndex);
System.out.println("Document Size: "+documents.length);
然后,它初始化我们想要生成的集群数量和随机数生成器的种子。如果它们不是 main()
方法的参数,我们使用默认值如下:
if (args.length != 2) {
System.err.println("Please specify K and SEED");
return;
}
int K = Integer.valueOf(args[0]);
int SEED = Integer.valueOf(args[1]);
}
最后,我们启动算法,测量其执行时间,并写入每个集群的文档数。
Date start, end;
start=new Date();
DocumentCluster[] clusters = SerialKMeans.calculate(documents, K ,vocIndex.size(), SEED);
end=new Date();
System.out.println("K: "+K+"; SEED: "+SEED);
System.out.println("Execution Time: "+(end.getTime()- start.getTime()));
System.out.println(
Arrays.stream(clusters).map (DocumentCluster::getDocumentCount).sorted (Comparator.reverseOrder())
.map(Object::toString).collect( Collectors.joining(", ", "Cluster sizes: ", "")));
}
}
为了实现算法的并发版本,我们使用了 Fork/Join 框架。我们基于RecursiveAction
类实现了两个不同的任务。正如我们前面提到的,RecursiveAction
任务是在您希望将 Fork/Join 框架用于不返回结果的任务时使用的。我们已经将分配和更新阶段实现为在 Fork/Join 框架中执行的任务。
为了实现 k-means 算法的并发版本,我们将修改一些常用类以使用并发数据结构。然后,我们将实现这两个任务,最后,我们将实现实现算法并发版本的ConcurrentKMeans
和ConcurrentMain
类来测试它。
如前所述,我们已经将分配和更新阶段作为任务实现,将在 Fork/Join 框架中实现。
分配阶段将文档分配给与文档欧氏距离最小的簇。因此,我们必须处理所有文档,并计算所有文档和所有簇的欧氏距离。我们将使用任务必须处理的文档数量作为控制是否必须拆分任务的度量。我们从必须处理所有文档的任务开始,我们将对它们进行拆分,直到我们的任务必须处理大量小于预定义大小的文档。
AssignmentTask
类具有以下属性:
ConcurrentDocumentCluster
对象数组ConcurrentDocument
对象数组start
和end
,用于确定任务必须处理的文档数量AtomicInteger
属性numChanges
,用于存储已将其分配的集群从上次执行更改为当前执行的文档数maxSize
,用于存储任务可以处理的最大文档数我们已经实现了一个构造函数来初始化所有这些属性和方法,以获取和设置其值。
这些任务的主要方法是compute()
方法(与每项任务一样)。首先,我们检查任务必须处理的文档数量。如果小于或等于maxSize
属性,我们将处理这些文档。我们计算每个文档和所有聚类之间的欧氏距离,并选择距离最小的聚类。如果有必要,我们使用incrementAndGet()
方法增加numChanges
原子变量。原子变量可以由多个线程同时更新,而无需使用同步机制,也不会导致任何内存不一致。请参阅以下代码:
protected void compute() {
if (end - start <= maxSize) {
for (int i = start; i < end; i++) {
ConcurrentDocument document = documents[i];
double distance = Double.MAX_VALUE;
ConcurrentDocumentCluster selectedCluster = null;
for (ConcurrentDocumentCluster cluster : clusters) {
double curDistance = DistanceMeasurer.euclideanDistance (document.getData(), cluster.getCentroid());
if (curDistance < distance) {
distance = curDistance;
selectedCluster = cluster;
}
}
selectedCluster.addDocument(document);
boolean result = document.setCluster(selectedCluster);
if (result) {
numChanges.incrementAndGet();
}
}
如果任务必须处理的文档的数量太大,我们将该集合分为两部分,并创建两个新任务来处理这些部分,如下所示:
} else {
int mid = (start + end) / 2;
AssignmentTask task1 = new AssignmentTask(clusters, documents, start, mid, numChanges, maxSize);
AssignmentTask task2 = new AssignmentTask(clusters, documents, mid, end, numChanges, maxSize);
invokeAll(task1, task2);
}
}
为了在 Fork/Join 池中执行这些任务,我们使用了invokeAll()
方法。此方法将在任务完成执行后返回。
更新阶段重新计算每个簇的质心作为所有文档的平均值。因此,我们必须处理所有集群。我们将使用任务必须处理的集群数量作为控制是否必须拆分任务的度量。我们从一个必须处理所有集群的任务开始,我们将对其进行拆分,直到我们的任务必须处理一些小于预定义大小的集群。
UpdateTask
类具有以下属性:
ConcurrentDocumentCluster
对象数组start
和end
,用于确定任务必须处理的集群数量maxSize
,用于存储任务可以处理的最大集群数我们已经实现了构造函数来初始化所有这些属性和方法,以获取和设置其值。
compute()
方法首先检查任务必须处理的集群数量。如果该数字小于或等于maxSize
属性,它将处理这些簇并更新其质心:
@Override
protected void compute() {
if (end - start <= maxSize) {
for (int i = start; i < end; i++) {
ConcurrentDocumentCluster cluster = clusters[i];
cluster.calculateCentroid();
}
如果任务必须处理的集群数量太多,我们将把任务必须处理的集群集分成两部分,并创建两个任务来处理每个部分,如下所示:
} else {
int mid = (start + end) / 2;
UpdateTask task1 = new UpdateTask(clusters, start, mid, maxSize);
UpdateTask task2 = new UpdateTask(clusters, mid, end, maxSize);
invokeAll(task1, task2);
}
}
ConcurrentKMeans
类实现了 k-means 聚类算法的并发版本。作为串行版本,类的主要方法是calculate()
方法。它接收以下参数:
ConcurrentDocument
对象数组calculate()
方法返回包含集群信息的ConcurrentDocumentCluster
对象数组。每个集群都有与其关联的文档列表。首先,文档创建由numberClusters
参数确定的集群数组,并使用initialize()
方法和Random
对象对其进行初始化:
public class ConcurrentKMeans {
public static ConcurrentDocumentCluster[] calculate(ConcurrentDocument[] documents int numberCluster int vocSize, int seed, int maxSize) {
ConcurrentDocumentCluster[] clusters = new ConcurrentDocumentCluster[numberClusters];
Random random = new Random(seed);
for (int i = 0; i < numberClusters; i++) {
clusters[i] = new ConcurrentDocumentCluster(vocSize);
clusters[i].initialize(random);
}
然后,我们重复分配和更新阶段,直到所有文档都位于同一集群中。在循环之前,我们创建将执行该任务及其所有子任务的ForkJoinPool
。循环完成后,与其他Executor
对象一样,我们必须使用具有 Fork/Join 池的shutdown()
方法来完成其执行。最后,我们返回带有文档最终组织的集群数组:
boolean change = true;
ForkJoinPool pool = new ForkJoinPool();
int numSteps = 0;
while (change) {
change = assignment(clusters, documents, maxSize, pool);
update(clusters, maxSize, pool);
numSteps++;
}
pool.shutdown();
System.out.println("Number of steps: "+numSteps); return clusters;
}
分配阶段在assignment()
方法中实现。此方法接收集群数组、文档数组和maxSize
属性。首先,我们删除所有集群的关联文档列表:
private static boolean assignment(ConcurrentDocumentCluster[] clusters, ConcurrentDocument[] documents, int maxSize, ForkJoinPool pool) {
boolean change = false;
for (ConcurrentDocumentCluster cluster : clusters) {
cluster.clearDocuments();
}
然后,我们初始化必要的对象:AtomicInteger
存储分配的集群已更改的文档数量,以及将开始该过程的AssignmentTask
。
AtomicInteger numChanges = new AtomicInteger(0);
AssignmentTask task = new AssignmentTask(clusters, documents, 0, documents.length, numChanges, maxSize);
然后,我们使用ForkJoinPool
的execute()
方法异步执行池中的任务,并使用AssignmentTask
对象的join()
方法等待其完成,如下所示:
pool.execute(task);
task.join();
最后,我们检查已更改其指定集群的文档数。如果有变化,我们返回true
值。否则返回false
值。我们有以下代码:
System.out.println("Number of Changes: " + numChanges);
return numChanges.get() > 0;
}
更新阶段在update()
方法中实现。它接收集群阵列和maxSize
参数。首先,我们创建一个UpdateTask
对象来更新所有集群。然后,我们在方法作为参数接收的ForkJoinPool
对象中执行该任务,如下所示:
private static void update(ConcurrentDocumentCluster[] clusters, int maxSize, ForkJoinPool pool) {
UpdateTask task = new UpdateTask(clusters, 0, clusters.length, maxSize, ForkJoinPool pool);
pool.execute(task);
task.join();
}
}
ConcurrentMain
类包括main()
方法,用于启动 k-means 算法的测试。其代码与SerialMain
类相同,但将串行类更改为并发类。
为了比较两种解决方案,我们进行了不同的实验,改变三个不同参数的值:
Random
编号生成器的种子。该种子确定初始质心位置的方式。我们已经用值 1 和 13 测试了算法。maxSize
参数用于确定项目(文档或集群)的最大数量,一个任务可以在不拆分为其他任务的情况下进行处理。我们已经用值 1、20 和 400 测试了算法。我们已经使用 JMH 框架(执行了实验 http://openjdk.java.net/projects/code-tools/jmh/ ),允许您在 Java 中实现微基准测试。使用基准测试框架是一个更好的解决方案,它可以使用currentTimeMillis()
或nanoTime()
等方法简单地测量时间。我们已经在一台四核处理器的计算机上执行了 10 次,并计算了这 10 次的中间执行时间。以下是我们以毫秒为单位获得的执行时间:
电视连续剧
|
同时发生的
| | --- | --- | --- | --- | | K | 种子 | | 最大尺寸=1 | 最大尺寸=20 | 最大尺寸=400 | | 5. | 1. | 6676.141 | 4696.414 | 3291.397 | 3179.673 | | 10 | 1. | 6780.088 | 3365.731 | 2970.056 | 2825.488 | | 15 | 1. | 12936.178 | 5308.734 | 4737.329 | 4490.443 | | 20 | 1. | 19824.729 | 7937.820 | 7347.445 | 6848.873 | | 5. | 13 | 3738.869 | 2714.325 | 1984.152 | 1916.053 | | 10 | 13 | 9567.416 | 4693.164 | 3892.526 | 3739.129 | | 15 | 13 | 12427.589 | 5598.996 | 4735.518 | 4468.721 | | 20 | 13 | 18157.913 | 7285.565 | 6671.283 | 6325.664 |
我们可以得出以下结论:
maxSize
参数对执行时间影响不大。参数 K 或 seed 对执行时间的影响较大。如果增加参数的值,将获得更好的性能。1 和 20 之间的差异大于 20 和 400 之间的差异。对于示例,如果我们将参数为 K=20 和 seed=13 的串行算法与参数为 K=20、seed=13 和 maxSize=400 的并发版本进行加速比较,我们得到以下结果:
假设您有很多描述项目列表的数据。例如,您拥有许多人的许多属性(姓名、姓氏、地址、电话号码等)。获取符合特定标准的数据是一种常见的需求,例如,您希望获取居住在特定街道或具有特定姓名的人。
在本节中,您将实现其中一个过滤程序。我们使用了 UCI 的人口普查收入 KDD数据集(您可以从下载)https://archive.ics.uci.edu/ml/datasets/Census-Income+%28KDD%29,其中包含从美国人口普查局进行的 1994 年和 1995 年当前人口调查中提取的加权人口普查数据。
在本例的并发版本中,您将学习如何取消在 Fork/Join 池中运行的任务,以及如何管理任务中可能引发的未检查异常。
我们实现了一些类来读取文件中的数据并过滤数据。这些类由算法的串行和并发版本使用。这些是课程:
CensusData
类:这个类存储 39 个定义每个人的属性。它定义了获取和设置其值的属性和方法。我们将用一个数字来标识每个属性。这个类的evaluateFilter()
方法包含属性的编号和名称之间的关联。您可以查看文件https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.names 获取每个属性的详细信息。CensusDataLoader
类:此类从文件中加载普查数据。它有一个load()
方法,该方法接收文件路径作为输入参数,并返回一个包含文件中所有人员信息的CensusData
数组。FilterData
类:这个类定义了一个数据过滤器。过滤器包括属性的编号和该属性的值。Filter
类:此类实现确定CensusData
对象是否满足过滤器列表条件的方法。我们不包括这些类的源代码。它们非常简单,您可以检查示例的源代码。
我们已经在两个类中实现了滤波器算法的串行版本。SerialSearch
类对数据进行过滤。它提供了两种方法:
findAny()
方法:它接收CensusData
对象的数组作为参数,其中包含来自文件的所有数据和过滤器列表,并返回一个CensusData
对象,其中包含它从过滤器中找到的第一个满足所有条件的人。findAll()
方法:接收CensusData
对象数组作为参数,包含来自文件的所有数据和过滤器列表,并从过滤器返回CensusData
对象数组,其中包含满足所有条件的所有人员。SerialMain
类实现了该版本的main()
方法,并对其进行测试,以测量该算法在某些情况下的执行时间。
正如前面提到的类实现了对数据的过滤。它提供了两种方法。第一个方法是findAny()
方法,它查找满足过滤器要求的第一个数据对象。当它找到第一个数据对象时,它将完成其执行。请参阅以下代码:
public class SerialSearch {
public static CensusData findAny (CensusData[] data, List<FilterData> filters) {
int index=0;
for (CensusData censusData : data) {
if (Filter.filter(censusData, filters)) {
System.out.println("Found: "+index);
return censusData;
}
index++;
}
return null;
}
第二个方法是findAll()
方法,返回一个CensusData
对象数组,其中包含满足过滤器要求的所有对象,如下所示:
public static List<CensusData> findAll (CensusData[] data, List<FilterData> filters) {
List<CensusData> results=new ArrayList<CensusData>();
for (CensusData censusData : data) {
if (Filter.filter(censusData, filters)) {
results.add(censusData);
}
}
return results;
}
}
您将使用这个类在不同的情况下测试过滤算法。首先,我们从文件中加载数据,如下所示:
public class SerialMain {
public static void main(String[] args) {
Path path = Paths.get("data","census-income.data");
CensusData data[]=CensusDataLoader.load(path);
System.out.println("Number of items: "+data.length);
Date start, end;
我们要测试的第一种情况是使用findAny()
方法来查找数组第一个位置存在的对象。您构建一个过滤器列表,然后使用文件的数据和过滤器列表调用findAny()
方法:
List<FilterData> filters=new ArrayList<>();
FilterData filter=new FilterData();
filter.setIdField(32);
filter.setValue("Dominican-Republic");
filters.add(filter);
filter=new FilterData();
filter.setIdField(31);
filter.setValue("Dominican-Republic");
filters.add(filter);
filter=new FilterData();
filter.setIdField(1);
filter.setValue("Not in universe");
filters.add(filter);
filter=new FilterData();
filter.setIdField(14);
filter.setValue("Not in universe");
filters.add(filter);
start=new Date();
CensusData result=SerialSearch.findAny(data, filters);
System.out.println("Test 1 - Result: "+result.getReasonForUnemployment());
end=new Date();
System.out.println("Test 1- Execution Time: "+(end.getTime()-start.getTime()));
我们的过滤器查找以下属性:
32
:这是生父属性的国家31
:这是生母属性的国家1
:工人属性的类别;Not in universe
是它们可能的值之一14
:这是失业属性的原因;Not in universe
是它们可能的值之一我们将测试其他情况,如下所示:
findAny()
方法查找数组最后位置存在的对象findAny()
方法尝试查找不存在的对象findAny()
方法findAll()
方法获取满足过滤器列表的所有对象findAll()
方法我们将在并发版本中包含更多元素:
findAny()
方法并找到满足所有条件的对象时,您不需要继续执行其余任务。findAny()
方法的RecursiveTask
类:扩展RecursiveTask
的IndividualTask
类。findAll()
方法的RecursiveTask
类:扩展RecursiveTask
的ListTask
类。让我们看看所有这些类的详细信息。
我们将使用这个类来控制任务的取消。在以下两种情况下,我们将取消任务的执行:
findAny()
操作,发现一个符合要求的对象findAny()
或findAll()
操作,其中一个任务中存在未检查的异常该类声明了两个属性:ConcurrentLinkedDeque
用于存储我们需要取消的所有任务,以及一个AtomicBoolean
变量以确保只有一个任务执行cancelTasks()
方法:
public class TaskManager {
private Set<RecursiveTask> tasks;
private AtomicBoolean cancelled;
public TaskManager() {
tasks = ConcurrentHashMap.newKeySet();
cancelled = new AtomicBoolean(false);
}
定义了向ConcurrentLinkedDeque
添加任务、从ConcurrentLinkedDeque
中删除任务、取消其中存储的所有任务的方法。为了取消任务,我们使用在ForkJoinTask
类中定义的cancel()
方法。true
参数强制中断正在运行的任务,如下所示:
public void addTask(RecursiveTask task) {
tasks.add(task);
}
public void cancelTasks(RecursiveTask sourceTask) {
if (cancelled.compareAndSet(false, true)) {
for (RecursiveTask task : tasks) {
if (task != sourceTask) {
if(cancelled.get()) {
task.cancel(true);
}
else {
tasks.add(task);
}
}
}
}
}
public void deleteTask(RecursiveTask task) {
tasks.remove(task);
}
cancelTasks()
方法接收RecursiveTask
对象作为参数。我们将取消所有任务,除了调用此方法的任务。我们不想取消找到结果的任务。compareAndSet(false, true)
方法将AtomicBoolean
变量设置为true
,仅当当前值为false
时返回true
。如果AtomicBoolean
变量已经有true
值,则返回false
。整个操作是以原子方式执行的,因此即使从不同线程并发调用了多次cancelTasks()
方法,也可以保证 if 语句体最多执行一次。
IndividualTask
类对CensusData
任务参数化的RecursiveTask
类进行扩展,实现findAny()
操作。它定义了以下属性:
包含所有CensusData
对象的数组
决定必须处理的元素的start
和end
属性
size
属性,用于确定任务将在不拆分任务的情况下处理的最大元素数
一个TaskManager
类,用于在必要时取消任务
以下代码给出了要应用的过滤器列表:
private CensusData[] data;
private int start, end, size;
private TaskManager manager;
private List<FilterData> filters;
public IndividualTask(CensusData[] data, int start, int end, TaskManager manager, int size, List<FilterData> filters) {
this.data = data;
this.start = start;
this.end = end;
this.manager = manager;
this.size = size;
this.filters = filters;
}
本课程的主要方法是compute()
方法。它返回一个CensusData
对象。如果任务必须处理的元素数小于 size 属性,则直接查找对象。如果该方法找到所需的对象,则返回该对象并使用方法cancelTasks()
取消其余任务的执行。如果该方法没有找到所需的对象,则返回 null。我们有以下代码:
if (end - start <= size) {
for (int i = start; i < end && ! Thread.currentThread().isInterrupted(); i++) {
CensusData censusData = data[i];
if (Filter.filter(censusData, filters)) {
System.out.println("Found: " + i);
manager.cancelTasks(this);
return censusData;
}
}
return null;
}
如果必须处理的项目数大于 size 属性,我们将创建两个子任务来处理一半的元素:
} else {
int mid = (start + end) / 2;
IndividualTask task1 = new IndividualTask(data, start, mid, manager, size, filters);
IndividualTask task2 = new IndividualTask(data, mid, end, manager, size, filters);
然后,我们将新创建的任务添加到任务管理器,并删除实际任务。如果要取消任务,则只需取消正在运行的任务:
manager.addTask(task1);
manager.addTask(task2);
manager.deleteTask(this);
然后,我们使用异步方式发送任务的fork()
方法将任务发送到ForkJoinPool
,并使用quietlyJoin()
方法等待任务完成。join()
方法和quietlyJoin()
方法的区别在于,如果任务被取消或在方法内部引发未检查的异常,join()
方法将启动异常, quietlyJoin()
方法不会引发任何异常。
task1.fork();
task2.fork();
task1.quietlyJoin();
task2.quietlyJoin();
然后,我们从TaskManager
类中删除子任务,如下所示:
manager.deleteTask(task1);
manager.deleteTask(task2);
现在,我们使用 join()
方法获得任务的结果。如果任务抛出未经检查的异常,它将在不进行特殊处理的情况下传播,取消将被忽略,如下所示:
try {
CensusData res = task1.join();
if (res != null)
return res;
manager.deleteTask(task1);
} catch (CancellationException ex) {
}
try {
CensusData res = task2.join();
if (res != null)
return res;
manager.deleteTask(task2);
} catch (CancellationException ex) {
}
return null;
}
}
ListTask
类扩展了参数化为CensusData
的List
类。我们将使用此任务执行findAll()
操作。这与IndividualTask
任务非常相似。两者使用相同的属性,但在compute()
方法中有所不同。
首先,我们初始化一个List
对象以返回结果并检查任务必须处理的元素数量。如果任务必须处理的元素数小于“大小”属性,请将满足过滤器中指定条件的所有对象添加到结果列表中:
@Override
protected List<CensusData> compute() {
List<CensusData> ret = new ArrayList<CensusData>();
if (end - start <= size) {
for (int i = start; i < end; i++) {
CensusData censusData = data[i];
if (Filter.filter(censusData, filters)) {
ret.add(censusData);
}
}
如果需要处理的项数大于 size 属性,我们将创建两个子任务来处理一半的元素:
int mid = (start + end) / 2;
ListTask task1 = new ListTask(data, start, mid, manager, size, filters);
ListTask task2 = new ListTask(data, mid, end, manager, size, filters);
然后,我们将新创建的任务添加到任务管理器中,并删除实际任务。实际任务不会被取消;其子任务将被取消,如下所示:
manager.addTask(task1);
manager.addTask(task2);
manager.deleteTask(this);
然后,我们将使用异步发送任务的fork()
方法将任务发送到ForkJoinPool
,并使用quietlyJoin()
方法等待任务完成:
task1.fork();
task2.fork();
task2.quietlyJoin();
task1.quietlyJoin();
然后,我们将从TaskManager
中删除子任务:
manager.deleteTask(task1);
manager.deleteTask(task2);
现在,我们使用 join()
方法获得任务的结果。如果任务抛出未经检查的异常,它将在不进行特殊处理的情况下传播,取消将被忽略:
try {
List<CensusData> tmp = task1.join();
if (tmp != null)
ret.addAll(tmp);
manager.deleteTask(task1);
} catch (CancellationException ex) {
}
try {
List<CensusData> tmp = task2.join();
if (tmp != null)
ret.addAll(tmp);
manager.deleteTask(task2);
} catch (CancellationException ex) {
}
ConcurrentSearch
类实现了findAny()
和findAll()
方法。它们具有与串行版本相同的接口。在内部,他们初始化TaskManager
对象和第一个任务,并使用execute
方法发送到默认ForkJoinPool
;他们等待任务的最终完成并写出结果。这是findAny()
方法的代码:
public class ConcurrentSearch {
public static CensusData findAny (CensusData[] data, List<FilterData> filters, int size) {
TaskManager manager=new TaskManager();
IndividualTask task=new IndividualTask(data, 0, data.length, manager, size, filters);
ForkJoinPool.commonPool().execute(task);
try {
CensusData result=task.join();
if (result!=null) {
System.out.println("Find Any Result: "+result.getCitizenship());
return result;
} catch (Exception e) {
System.err.println("findAny has finished with an error: "+task.getException().getMessage());
}
return null;
}
这是findAll()
方法的代码:
public static CensusData[] findAll (CensusData[] data, List<FilterData> filters, int size) {
List<CensusData> results;
TaskManager manager=new TaskManager();
ListTask task=new ListTask(data,0,data.length,manager, size,filters);
ForkJoinPool.commonPool().execute(task);
try {
results=task.join();
return results;
} catch (Exception e) {
System.err.println("findAny has finished with an error: " + task.getException().getMessage());
}
return null;
}
ConcurrentMain
类用于测试对象过滤器的并发版本。它与SerialMain
类相同,但使用并发版本的操作。
为了比较过滤算法的串行版本和并发版本,我们在六种不同的情况下对它们进行了测试:
findAny()
方法,寻找存在于CensusData
数组第一个位置的对象findAny()
方法寻找CensusData
数组最后位置存在的对象findAny()
方法,寻找一个不存在的对象findAny()
方法findAll()
方法findAll()
方法对于该算法的并发版本,我们测试了大小参数的三个不同值,它们决定了一个任务在不分叉两个子任务的情况下可以处理的最大元素数。我们用 10200 和 2000 进行了测试。
我们已经使用 JMH 框架(执行了测试 http://openjdk.java.net/projects/code-tools/jmh/ ),允许您在 Java 中实现微基准测试。使用基准测试框架是一个更好的解决方案,它使用currentTimeMillis()
或nanoTime()
等方法简单地测量时间。我们已经在一台四核处理器的计算机上执行了 10 次,并计算了这 10 次的中间执行时间。与其他示例一样,我们以毫秒为单位测量了执行时间:
测试用例
|
电视连续剧
|
并发大小=10
|
并发大小=200
|
并发大小=2000
|
最好的
| | --- | --- | --- | --- | --- | --- | | 试验 1 | 1.177 | 8.124 | 4.547 | 4.073 | 电视连续剧 | | 测试 2 | 95.237 | 157.412 | 34.581 | 35.691 | 同时发生的 | | 测试 3 | 66.616 | 41.916 | 74.829 | 37.140 | 同时发生的 | | 测试 4 | 0.540 | 25869.339 | 643.144 | 9.673 | 电视连续剧 | | 测试 5 | 61.752 | 37.349 | 40.344 | 22.911 | 同时发生的 | | 测试 6 | 0.802 | 31663.607 | 231.440 | 7.706 | 电视连续剧 |
我们可以得出以下结论:
size
参数的值很小时,并发版本在这种情况下的性能非常差。在这种情况下,并发并不总是能提高性能。
merge 排序算法是一种非常流行的排序算法,通常使用分治技术来实现,因此它是使用 Fork/Join 框架进行测试的非常好的候选者。
为了实现合并排序算法,我们将未排序的列表划分为一个元素的子列表。然后,我们合并那些未排序的子列表以生成有序的子列表,直到我们处理完所有的子列表,我们只有原始列表,但所有元素都已排序。
为了制作我们算法的并发版本,我们使用了新的 Fork/Join 任务,CountedCompleter
任务,在 Java8 版本中引入。这些任务最重要的特征是,它们包含了一个方法,当它们的所有子任务都已完成执行时,该方法将被执行。
为了测试实现,我们使用了亚马逊产品联合购买网络元数据(您可以从下载)https://snap.stanford.edu/data/amazon-meta.html )。特别是,我们创建了一个 salesrank 为 542184 种产品的列表。我们将测试我们的算法版本,对这个产品列表进行排序,并将执行时间与Arrays
类的sort()
和parallelSort()
方法进行比较。
正如我们前面提到的一样,我们已经建立了一个包含 542184 个亚马逊产品的列表,其中包含每个产品的信息,包括 ID、名称、组、salesrank、评论数量、类似产品的数量以及产品所属类别的数量。我们已经实现了AmazonMetaData
类来存储产品的信息。此类声明获取和设置其值所需的属性和方法。这个类实现了Comparable
接口来比较这个类的两个实例。我们希望按 salesrank 按升序对元素进行排序。为了实现compare()
方法,我们使用Long
类的compare()
方法来比较两个对象的 salesrank,如下所示:
public int compareTo(AmazonMetaData other) {
return Long.compare(this.getSalesrank(), other.getSalesrank());
}
我们还实现了提供load()
方法的AmazonMetaDataLoader
。该方法以数据作为参数接收到文件的路由,并返回一个包含所有产品信息的AmazonMetaData
对象数组。
我们不包括这些类的源代码来关注 Fork/Join 框架的特性。
我们已经在SerialMergeSort
类中实现了 merge 排序算法的串行版本,它实现了算法和SerialMetaData
类,并提供了main()
方法来测试算法。
SerialMergeSort
类实现了合并排序算法的串行版本。提供接收以下参数的mergeSort()
方法:
如果方法必须只处理一个元素,则返回。否则,它将对mergeSort()
方法进行两次递归调用。第一个调用将处理前一半的元素,第二个调用将处理后一半的元素。最后,我们调用merge()
方法来合并元素的两半,并得到元素的排序列表:
public void mergeSort (Comparable data[], int start, int end) {
if (end-start < 2) {
return;
}
int middle = (end+start)>>>1;
mergeSort(data,start,middle);
mergeSort(data,middle,end);
merge(data,start,middle,end);
}
我们已经使用了(end+start)>>>1
操作符来获取中间元素来拆分数组。例如,如果您有 15 亿个元素(对于现代内存芯片来说并非不可能),它仍然适合 Java 阵列。但是,(结束+开始)/2将溢出,导致一个负数字数组。有关此问题的详细说明,请参见http://googleresearch.blogspot.ru/2006/06/extra-extra-read-all-about-it-nearly.html 。
merge()
方法合并两个元素列表以获得排序列表。它接收以下参数:
start
、mid
和end
)决定了我们要合并和排序的数组的两个部分(开始-中间、中间-结束)我们创建一个临时数组来对元素进行排序,对数组中处理列表两部分的元素进行排序,并将排序后的列表存储在原始数组的相同位置。检查以下代码:
private void merge(Comparable[] data, int start, int middle, int end) {
int length=end-start+1;
Comparable[] tmp=new Comparable[length];
int i, j, index;
i=start;
j=middle;
index=0;
while ((i<middle) && (j<end)) {
if (data[i].compareTo(data[j])<=0) {
tmp[index]=data[i];
i++;
} else {
tmp[index]=data[j];
j++;
}
index++;
}
while (i<middle) {
tmp[index]=data[i];
i++;
index++;
}
while (j<end) {
tmp[index]=data[j];
j++;
index++;
}
for (index=0; index < (end-start); index++) {
data[index+start]=tmp[index];
}
}
}
SerialMetaData
类提供了main()
方法来测试算法。我们将执行每个排序算法 10 次,以计算平均执行时间。首先,我们从文件中加载数据并创建数组的副本:
public class SerialMetaData {
public static void main(String[] args) {
for (int j=0; j<10; j++) {
Path path = Paths.get("data","amazon-meta.csv");
AmazonMetaData[] data = AmazonMetaDataLoader.load(path);
AmazonMetaData data2[] = data.clone();
然后,我们使用Arrays
类的sort()
方法对第一个数组进行排序:
Date start, end;
start = new Date();
Arrays.sort(data);
end = new Date();
System.out.println("Execution Time Java Arrays.sort(): " + (end.getTime() - start.getTime()));
然后,我们使用合并排序算法的实现对第二个数组进行排序:
SerialMergeSort mySorter = new SerialMergeSort();
start = new Date();
mySorter.mergeSort(data2, 0, data2.length);
end = new Date();
System.out.println("Execution Time Java SerialMergeSort: " + (end.getTime() - start.getTime()));
最后,我们检查排序的数组是否相同:
for (int i = 0; i < data.length; i++) {
if (data[i].compareTo(data2[i]) != 0) {
System.err.println("There's a difference is position " + i);
System.exit(-1);
}
}
System.out.println("Both arrays are equal");
}
}
}
正如我们前面提到的,我们将使用新的 Java8CountedCompleter
类作为 Fork/Join 任务的基类。此类提供了一种机制,用于在方法的所有子任务都已完成时执行该方法。这是onCompletion()
方法。因此,我们使用compute()
方法划分数组,并使用onCompletion()
方法将子列表合并为有序列表。
您将要实现的并发解决方案有三个类:
CountedCompleter
类并实现执行合并排序算法的任务的MergeSortTask
类ConcurrentMergeSort
任务ConcurrentMetaData
类,提供main()
方法来测试合并排序算法的并发版本正如我们前面提到的,这个类实现了将要执行合并排序算法的任务。此类使用以下属性:
该类还具有用于初始化其参数的构造函数:
public class MergeSortTask extends CountedCompleter<Void> {
private Comparable[] data;
private int start, end;
private int middle;
public MergeSortTask(Comparable[] data, int start, int end,
MergeSortTask parent) {
super(parent);
this.data = data;
this.start = start;
this.end = end;
}
compute()
方法,如果开始索引和结束索引之间的差异大于或等于1024
,我们将任务拆分为两个子任务,以处理原始集合的两个子集。两个任务都使用fork()
方法以异步方式向ForkJoinPool
发送任务。否则,我们执行SerialMergeSorg.mergeSort()
对数组的部分(包含1024
或更少的元素)进行排序,然后调用tryComplete()
方法。当子任务完成执行后,此方法将在内部调用onCompletion()
方法。请看下面的代码:
@Override
public void compute() {
if (end - start >= 1024) {
middle = (end+start)>>>1;
MergeSortTask task1 = new MergeSortTask(data, start, middle, this);
MergeSortTask task2 = new MergeSortTask(data, middle, end, this);
addToPendingCount(1);
task1.fork();
task2.fork();
} else {
new SerialMergeSort().mergeSort(data, start, end);
tryComplete();
}
在本例中,我们将使用onCompletion()
方法进行合并和排序操作以获得排序列表。一旦任务完成了onCompletion()
方法的执行,它就会通过其父任务调用tryComplete()
来尝试完成该任务。onCompletion()
方法的源代码与算法串行版本的merge()
方法非常相似。请参阅以下代码:
@Override
public void onCompletion(CountedCompleter<?> caller) {
if (middle==0) {
return;
}
int length = end - start + 1;
Comparable tmp[] = new Comparable[length];
int i, j, index;
i = start;
j = middle;
index = 0;
while ((i < middle) && (j < end)) {
if (data[i].compareTo(data[j]) <= 0) {
tmp[index] = data[i];
i++;
} else {
tmp[index] = data[j];
j++;
}
index++;
}
while (i < middle) {
tmp[index] = data[i];
i++;
index++;
}
while (j < end) {
tmp[index] = data[j];
j++;
index++;
}
for (index = 0; index < (end - start); index++) {
data[index + start] = tmp[index];
}
}
在并发版本中,这个类非常简单。它实现了mergeSort()
方法,该方法接收要排序的数据数组,并将开始索引(始终为 0)和结束索引(始终为数组的长度)作为参数对数组进行排序。我们选择维护相同的接口,而不是串行版本。
该方法创建一个新的MergeSortTask
,使用invoke()
方法将其发送到默认的ForkJoinPool
,该方法在任务完成执行并对数组进行排序时返回。
public class ConcurrentMergeSort {
public void mergeSort (Comparable data[], int start, int end) {
MergeSortTask task=new MergeSortTask(data, start, end,null);
ForkJoinPool.commonPool().invoke(task);
}
}
ConcurrentMetaData
类提供main()
方法来测试合并排序算法的并发版本。在我们的例子中,代码与SerialMetaData
类的代码相同,但是使用了类的并发版本和Arrays.parallelSort()
方法,而不是Arrays.sort()
方法,所以我们不包括类的源代码。
我们已经执行了合并排序算法的串行和并发版本,并将它们之间的执行时间与Arrays.sort()
和Arrays.parallelSort()
方法进行了比较。我们已经使用 JMH 框架(执行了四个版本 http://openjdk.java.net/projects/code-tools/jmh/ 允许您在 Java 中实现微基准测试。使用基准测试框架是一个更好的解决方案,它使用currentTimeMillis()
或nanoTime()
等方法简单地测量时间。我们已经在一台四核处理器的计算机上执行了 10 次,并计算了这 10 次的中间执行时间。这些是我们在使用 542184 个对象对数据集进行排序时获得的执行时间(毫秒):
Arrays.sort()
|
串行合并排序
|
Arrays.parallelSort()
|
并发合并排序
| | --- | --- | --- | --- | --- | | 执行时间(毫秒) | 561.324 | 711.004 | 261.418 | 353.846 |
我们可以得出以下结论:
Arrays.parallelSort()
获得最佳结果。对于串行算法,Arrays.sort()
方法比我们的实现获得更好的执行时间。我们可以使用以下速度比较合并排序算法的串行和并发版本:
在本章的三个示例中,我们使用了构成 Fork/Join 框架的类的许多方法,但您还需要了解其他有趣的方法。
我们使用了ForkJoinPool
类中的execute()
和invoke()
方法将任务发送到池中。我们可以使用另一个名为submit()
的方法。它们之间的主要区别在于execute()
方法将任务发送到ForkJoinPool
并立即返回一个无效值;invoke()
方法将任务发送到ForkJoinPool
并在任务执行完毕后返回;submit()
方法将任务发送到ForkJoinPool
并立即返回一个Future
对象来控制任务的状态并获取其结果。
在本章的所有示例中,我们都使用了基于ForkJoinTask
类的类,但是您可以使用基于Runnable
和Callable
接口的ForkJoinPool
任务。为此,您可以使用方法submit()
,该方法的版本接受Runnable
对象、带有结果的Runnable
对象和Callable
对象。
ForkJoinTask
类提供方法get(long timeout, TimeUnit unit)
获取任务返回的结果。此方法等待参数中为任务结果指定的时间段。如果任务在该时间段之前完成执行,则该方法返回结果。否则,它会抛出一个TimeoutException
异常。
ForkJoinTask
提供了invoke()
方法的替代方法。这是quietlyInvoke()
方法。这两个版本的主要区别在于invoke()
方法返回任务执行的结果,或者在必要时抛出任何异常。quietlyInvoke()
方法不会返回任务的结果,也不会引发任何异常。这与示例中使用的quietlyJoin()
方法类似。
分治设计技术是解决各种问题的一种非常流行的方法。你把原来的问题分成几个小问题,然后把这些问题分成几个小问题,直到我们有足够的简单问题来直接解决它。在版本 7 中,Java 并发 API 引入了一种针对此类问题优化的特殊类型的Executor
。这是 Fork/Join 框架。它基于以下两个操作:
使用这些操作,Fork/Join 任务具有以下外观:
if ( problem.size() > DEFAULT_SIZE) {
childTask1=new Task();
childTask2=new Task();
childTask1.fork();
childTask2.fork();
childTaskResults1=childTask1.join();
childTaskResults2=childTask2.join();
taskResults=makeResults(childTaskResults1, childTaskResults2);
return taskResults;
} else {
taskResults=solveBasicProblem();
return taskResults;
}
在本章中,您使用 Fork/Join 框架解决了三个不同的问题,例如 k-means 聚类算法、数据过滤算法和合并排序算法。
您已经使用了 API 提供的默认ForkJoinPool
(这是 Java 8 版本的一个新特性),并创建了一个新的ForkJoinPool
对象。您还使用了三种类型的ForkJoinTask``s
:
RecursiveAction
类,用作不返回结果的ForkJoinTasks
类的基类。RecursiveTask
类,用作返回结果的ForkJoinTasks
类的基类。CountedCompleter
类,在 Java 8 中引入,用作ForkJoinTasks
的基类,当其所有子任务完成执行时,需要执行一个方法或启动另一个任务。在下一章中,您将学习如何使用 MapReduce 编程技术,使用新的 Java 8并行流来获得处理超大数据集的最佳性能。