Java8 优化分治的解决方案详解

第 2 章管理大量线程–执行器第 3 章从执行器获取最大值第 4 章从任务获取数据–可调用和未来接口,您学习了如何使用执行器作为一种机制来提高执行大量并发任务的并发应用的性能。Java7 并发 API 通过 Fork/Join 框架引入了一种特殊的执行器。该框架旨在为那些可以使用分治设计范式解决的问题实现最佳并行解决方案。在本章中,我们将介绍以下主题:

Java5 中引入的 executor 框架提供了一种执行并发任务的机制,无需创建、启动和完成线程。这个框架使用一个线程池来执行发送给执行者的任务,并将它们用于多个任务。这种机制为程序员提供了一些优势,如下所示:

  • 编写并发应用更容易,因为您不必担心创建线程。
  • 更容易控制执行者和应用使用的资源。您可以创建仅使用预定义线程数的执行器。如果发送更多任务,执行器将它们存储在队列中,直到有线程可用。
  • 执行器减少了线程创建和重用线程所带来的开销。在内部,它管理一个线程池,该线程池重用线程来执行多个任务。

分治算法是一种非常流行的设计技术。要使用此技术解决问题,可以将其划分为更小的问题。你以递归的方式重复这个过程,直到你要解决的问题小到可以直接解决为止。这些类型的问题可以使用 executor 来解决,但是为了更有效地解决它们,Java7 并发 API 引入了 Fork/Join 框架。

该框架基于ForkJoinPool类,这是一种特殊的执行器、两种操作fork()join()方法(及其不同变体),以及内部算法工作窃取算法。在本章中,您将学习实现以下三个示例的 Fork/Join 框架的基本特征、限制和组件:

  • k-means 聚类算法在文档聚类中的应用
  • 一种数据过滤算法,用于获取满足特定条件的数据
  • 合并排序算法以高效的方式对大数据组进行排序

分叉/连接框架的基本特征

正如我们前面提到的,必须使用 Fork/Join 框架来实现基于分治技术的问题解决方案。你必须把原来的问题分成更小的问题,直到它们小到可以直接解决为止。使用此框架,您将实现其主要方法如下所示的任务:

if ( problem.size() > DEFAULT_SIZE) {
    divideTasks();
    executeTask();
    taskResults=joinTasksResult();
    return taskResults;
} else {
    taskResults=solveBasicProblem();
    return taskResults;
}

最重要的部分是允许您以有效的方式划分和执行子任务,并获得这些子任务的结果以计算父任务的结果。ForkJoinTask类提供的两种方法支持此功能,如下所示:

  • fork()方法:此方法允许您向 Fork/Join 执行器发送子任务
  • join()方法:此方法允许您等待子任务完成并返回其结果

这些方法有不同的变体,您将在示例中看到。Fork/Join 框架还有另一个关键部分:工作窃取算法,它决定要执行哪些任务。当一个任务正在等待使用join()方法完成子任务时,执行该任务的线程从正在等待的任务池中获取另一个任务并开始执行。这样,Fork/Join 执行器的线程总是通过提高应用的性能来执行任务。

Java8 在 Fork/Join 框架中包含了一个新特性。现在,每个 Java 应用都有一个名为 common pool 的默认ForkJoinPool。您可以通过调用ForkJoinPool.commonPool()静态方法获取。您不需要显式地创建一个(尽管您可以)。默认情况下,此默认分叉/联接执行器将使用由计算机的可用处理器确定的线程数。您可以通过更改系统属性java.util.concurrent.ForkJoinPool.common.parallelism的值来更改此默认行为。

JavaAPI 的一些特性使用 Fork/Join 框架来实现并发操作。例如,Arrays类对数组进行并行排序的parallelSort()方法和 Java 8 中引入的并行流(后面将在第 7 章中介绍,使用并行流处理海量数据集——Map and Reduce 模型第 8 章使用并行流处理海量数据集–映射和收集模型使用此框架。

Fork/Join 框架的局限性

由于 Fork/Join 框架被认为可以解决一类确定的问题,因此在使用它来实现问题时,您必须考虑到一些限制,如下所示:

  • 你不打算细分的基本问题必须不是很大,但也不是很小。根据 JavaAPI 文档,它应该有 100 到 10000 个基本计算步骤。
  • 您不应该使用阻塞 I/O 操作,例如在数据可用之前从网络套接字读取用户输入或数据。这样的操作会导致 CPU 内核闲置,降低并行度,因此无法获得完整的性能。
  • 您不能在任务内抛出已检查的异常。您必须包含处理它们的代码(例如,包装为 uncheckedRuntimeException)。未经检查的异常有一种特殊的处理方式,您将在示例中看到。

分叉/连接框架的组件

Fork/Join 框架中有五个基本类:

  • ForkJoinPool类:这个类实现ExecutorExecutorService接口,它是您将用来执行 Fork/Join 任务的Executor接口。Java 为您提供了一个默认的ForkJoinPool对象(名为 common pool),但如果需要,您可以使用一些构造函数来创建一个。您可以指定并行级别(运行并行线程的最大数量)。默认情况下,它使用可用处理器的数量作为并发级别。
  • ForkJoinTask类:这是所有 Fork/Join 任务的基本抽象类。它是一个抽象类,提供了fork()join()方法以及它们的一些变体。它还实现了Future接口,并提供了确定任务是否以正常方式完成、是否被取消或是否引发未检查异常的方法。RecursiveTaskRecursiveActionCountedCompleter类提供了compute()抽象方法,这些方法应该在子类中实现以执行实际计算。
  • RecursiveTask类:该类扩展了ForkJoinTask类。它也是一个抽象类,应该是实现返回结果的 Fork/Join 任务的起点。
  • RecursiveAction类:该类扩展了ForkJoinTask类。它也是一个抽象类,应该是实现不返回结果的 Fork/Join 任务的起点。
  • CountedCompleter类:该类扩展了ForkJoinTask类。这是 Java8API 的一个新特性,它应该是您实现任务的起点,这些任务在完成时会触发其他任务。

k-means 聚类算法是一种聚类算法,用于将之前未分类为预定义数量的 k 个聚类的一组项目分组。在数据挖掘和机器学习领域,以无监督的方式组织和分类数据非常流行。

每个项目通常由特征或属性向量定义。所有项目都具有相同数量的属性。每个簇也由一个向量定义,该向量具有相同数量的属性,表示分类到该簇中的所有项。这个向量被命名为质心。例如,如果项目是由数字向量定义的,则集群是由分类到该集群的项目的平均值定义的。

基本上,该算法有四个步骤:

  1. 初始化:在第一步中,您必须创建表示 K 簇的初始向量。通常,您将随机初始化这些向量。
  2. 作业:然后,将每个项目分类为一个集群。要选择簇,请计算项目与每个簇之间的距离。您将使用距离度量作为欧几里德距离来计算表示项目的向量和表示集群的向量之间的距离。您将以最短的距离将项目指定给簇。
  3. 更新:一旦所有项目都被分类,您必须重新计算定义每个集群的向量。正如我们前面提到的,您通常计算分类到集群中的项目的所有向量的平均值。
  4. 结束:最后检查某个项目是否改变了其分配集群。如果有任何更改,请再次转到分配步骤。否则,算法将结束,并对项目进行分类。

此算法有以下两个主要限制:

  • 如果您对集群的初始向量进行随机初始化,正如我们前面所建议的,两次执行对同一项目集进行分类可能会得到不同的结果。
  • 集群的数量是预先定义的。从分类的角度来看,此属性的错误选择将导致较差的结果。

尽管如此,该算法还是非常流行于对不同种类的项目进行聚类。为了测试我们的算法,您将实现一个应用来对一组文档进行集群。作为一个文档集合,我们采用了维基百科页面的简化版本,其中包含了我们在第 4 章中介绍的电影语料库信息,从任务中获取数据–可调用和未来接口。我们只拿了 1000 份文件。为了表示每个文档,我们必须使用向量空间模型表示。通过这种表示,每个文档都表示为一个数字向量,其中向量的每个维度表示一个单词或一个术语,其值是定义该单词或术语在文档中重要性的度量。

当使用向量空间模型表示文档集合时,向量的维数将与整个集合中不同单词的数量相同,因此向量将有很多零值,因为每个文档不包含所有单词。您可以在内存中使用更优化的表示,以避免所有这些零值,并节省内存,从而提高应用的性能。

在我们的案例中,我们选择术语频率–逆文档频率tf idf作为定义每个单词重要性的度量,而 tf idf 较高的 50 个单词作为表示每个文档的术语。

我们使用两个文件:movies.words文件存储向量中使用的所有单词的列表,movies.data存储每个文档的表示。movies.data文件的格式如下:

10000202,rabona:23.039285705435507,1979:8.09314752937111,argentina:7.953798614698405,la:5.440565539075689,argentine:4.058577338363469,editor:3.0401515284855267,spanish:2.9692083275217134,image_size:1.3701158713905104,narrator:1.1799670194306195,budget:0.286193223652206,starring:0.25519156764102785,cast:0.2540127604060545,writer:0.23904044207902764,distributor:0.20430284744786784,cinematography:0.182583823735518,music:0.1675671228903468,caption:0.14545085918028047,runtime:0.127767002869991,country:0.12493801913495534,producer:0.12321749670640451,director:0.11592975672109682,links:0.07925582303812376,image:0.07786973207561361,external:0.07764427108746134,released:0.07447174080087617,name:0.07214163435745059,infobox:0.06151153983466272,film:0.035415118094854446

这里,10000202是文档的标识符,文件的其余部分遵循共振峰word:tfxidf

与其他示例一样,我们将实现串行和并发版本,并执行这两个版本,以验证 Fork/Join 框架是否提高了该算法的性能。

普通班

有部分在串行版本和并发版本之间共享。这些部分包括:

  • VocabularyLoader:这个类用于加载构成语料库词汇表的单词列表。
  • WordDocumentDocumentLoader:这三个类用于加载文档信息。这些类在算法的串行版本和并发版本之间有一些差异。
  • DistanceMeasure:计算两个向量之间的欧几里德距离的类。
  • DocumentCluster:存储集群信息的类。

让我们详细了解这些类。

词汇加载器类

正如前面提到的一样,我们的数据存储在两个文件中。其中一个文件是movies.words文件。此文件存储一个包含文档中使用的所有单词的列表。VocabularyLoader类将该文件转换为HashMapHashMap的键是整个单词,该值是一个整数值,该单词的索引在列表中。我们使用该索引来确定单词在表示每个文档的向量空间模型中的位置。

该类只有一个名为load()的方法,该方法接收文件路径作为参数,并返回HashMap

public class VocabularyLoader {

    public static Map<String, Integer> load (Path path) throws IOException {
        int index=0;
        HashMap<String, Integer> vocIndex=new HashMap<String, Integer>();
        try(BufferedReader reader = Files.newBufferedReader(path)){
            String line = null;
            while ((line = reader.readLine()) != null) {
                vocIndex.put(line,index );
                index++;
            }
        }
        return vocIndex;

    }
}

Word、Document 和 DocumentLoader 类

这些类存储关于我们将在算法中使用的文档的所有信息。首先,Word类在文档中存储关于单词的信息。它包括文档中单词的索引和单词的 tf idf。这个类只包含那些属性(intdouble,并且实现了Comparable接口,使用它们的 tf idf 值对两个单词进行排序,所以我们不包含这个类的源代码。

Document类存储文档的所有相关信息。首先,文档中包含单词的Word对象数组。这是我们对向量空间模型的表示。我们只存储文档中使用的单词,以节省大量内存空间。然后是一个带有存储文档的文件名的String对象,最后是一个DocumentCluster对象,用于了解与文档关联的集群。它还包括一个构造函数来初始化这些属性和方法,以获取和设置它们的值。我们只包括setCluster()方法的代码。在这种情况下,此方法将返回一个布尔值,以指示此属性的新值是与旧值相同还是与新值相同。我们将使用该值确定是否停止算法:

public boolean setCluster(DocumentCluster cluster) {
    if (this.cluster == cluster) {
        return false;
    } else {
        this.cluster = cluster;
        return true;
    }
}

最后,DocumentLoader类加载有关文档的信息。它包括一个静态方法,load()接收文件的路径,以及带有词汇表的HashMap并返回Document对象的Array。它逐行加载文件,并将每一行转换为一个Document对象。我们有以下代码:

public static Document[] load(Path path, Map<String, Integer> vocIndex) throws IOException{
    List<Document> list = new ArrayList<Document>();
    try(BufferedReader reader = Files.newBufferedReader(path)) {
        String line = null;
        while ((line = reader.readLine()) != null) {
            Document item = processItem(line, vocIndex);
            list.add(item);
        }
    }
    Document[] ret = new Document[list.size()];
    return list.toArray(ret);

}

要将文本文件的一行转换为Document对象,我们使用processItem()方法:

private static Document processItem(String line,Map<String, Integer> vocIndex) {

    String[] tokens = line.split(",");
    int size = tokens.length - 1;

    Document document = new Document(tokens[0], size);
    Word[] data = document.getData();

    for (int i = 1; i < tokens.length; i++) {
        String[] wordInfo = tokens[i].split(":");
        Word word = new Word();
        word.setIndex(vocIndex.get(wordInfo[0]));
        word.setTfidf(Double.parseDouble(wordInfo[1]));
        data[i - 1] = word;
    }
    Arrays.sort(data);
    return document;
}

正如前面提到的,行中的第一项是文档的标识符。我们从tokens[0]获取,并将其传递给Document类构造函数。然后,对于其余的标记,我们再次拆分它们以获得每个单词的信息,其中包括整个单词和 tf idf 值。

距离测量器类

此类计算文档和簇(表示为向量)之间的欧氏距离。排序后单词数组中的单词的排列顺序与质心数组中的相同,但有些单词可能不存在。对于这些词,我们假设 tf idf 为零,因此距离仅为与质心阵列对应值的平方:

public class DistanceMeasurer {

    public static double euclideanDistance(Word[] words, double[] centroid) {
        double distance = 0;

        int wordIndex = 0;
        for (int i = 0; i < centroid.length; i++) {
            if ((wordIndex < words.length) (words[wordIndex].getIndex() == i)) {
                distance += Math.pow( (words[wordIndex].getTfidf() - centroid[i]), 2);
                wordIndex++;
            } else {
                distance += centroid[i] * centroid[i];
            }
        }

        return Math.sqrt(distance);
    }
}

DocumentCluster 类

这个类存储算法生成的每个集群的信息。此信息包括与此群集关联的所有文档的列表以及表示群集的向量的质心。在这种情况下,该向量的维数与词汇表中的单词相同。这个类有两个属性,一个初始化它们的构造函数,以及获取和设置它们的值的方法。它还包括两种非常重要的方法。首先是calculateCentroid()方法。它计算簇的质心,作为表示与该簇关联的文档的向量的平均值。我们有以下代码:

public void calculateCentroid() {

    Arrays.fill(centroid, 0);

    for (Document document : documents) {
        Word vector[] = document.getData();

        for (Word word : vector) {
            centroid[word.getIndex()] += word.getTfidf();
        }
    }

    for (int i = 0; i < centroid.length; i++) {
        centroid[i] /= documents.size();
    }
}

第二种方法是initialize()方法,它接收Random对象,并用随机数初始化簇的质心向量,如下所示:

public void initialize(Random random) {
    for (int i = 0; i < centroid.length; i++) {
        centroid[i] = random.nextDouble();
    }
}

序列版本

一旦我们描述了应用的公共部分,让我们看看如何实现 k-means 聚类算法的串行版本。我们将使用两个类:SerialKMeans,实现该算法,以及SerialMain,实现main()方法来执行该算法。

SerialKMeans 类

SerialKMeans类实现了 k-means 聚类算法的串行版本。该类的主要方法是calculate()方法。它接收以下参数:

  • 包含文档信息的Document对象数组
  • 要生成的群集数
  • 词汇量
  • 随机数发生器的种子

方法返回DocumentCluster对象的Array。每个集群都有与其关联的文档列表。首先,文档创建由numberClusters参数确定的集群Array并使用initialize()方法和Random对象对其进行初始化,如下所示:

public class SerialKMeans {

    public static DocumentCluster[] calculate(Document[] documents, int clusterCount, int vocSize, int seed) {
        DocumentCluster[] clusters = new DocumentCluster[clusterCount];

        Random random = new Random(seed);
        for (int i = 0; i < clusterCount; i++) {
            clusters[i] = new DocumentCluster(vocSize);
            clusters[i].initialize(random);
        }

然后,我们重复分配和更新阶段,直到所有文档都位于同一集群中。最后,我们返回包含文档最终组织的集群数组,如下所示:

        boolean change = true;

        int numSteps = 0;
        while (change) {
            change = assignment(clusters, documents);
            update(clusters);
            numSteps++;
        }
        System.out.println("Number of steps: "+numSteps);
        return clusters;
    }

分配阶段在assignment()方法中实现。此方法接收DocumentDocumentCluster对象的数组。对于每个文档,它计算文档与所有簇之间的欧氏距离,并将文档指定给距离最小的簇。它返回一个布尔值,以指示一个或多个文档是否已将其分配的集群从一个步骤更改为下一个步骤。我们有以下代码:

private static boolean assignment(DocumentCluster[] clusters, Document[] documents) {

    boolean change = false;

    for (DocumentCluster cluster : clusters) {
        cluster.clearClusters();
    }

    int numChanges = 0;
    for (Document document : documents) {
        double distance = Double.MAX_VALUE;
        DocumentCluster selectedCluster = null;
        for (DocumentCluster cluster : clusters) {
            double curDistance = DistanceMeasurer.euclideanDistance(document.getData(), cluster.getCentroid());
            if (curDistance < distance) {
                distance = curDistance;
                selectedCluster = cluster;
            }
        }
        selectedCluster.addDocument(document);
        boolean result = document.setCluster(selectedCluster);
        if (result)
            numChanges++;
    }
    System.out.println("Number of Changes: " + numChanges);
    return numChanges > 0;
}

update()方法中实现更新步骤。它接收带有簇信息的DocumentCluster数组,并简单地重新计算每个簇的质心:

    private static void update(DocumentCluster[] clusters) {
        for (DocumentCluster cluster : clusters) {
            cluster.calculateCentroid();
        }

    }

}

SerialMain 类SerialMain类包括启动 k-means 算法测试的main()方法。首先,它从文件中加载数据(文字和文档):

public class SerialMain {

    public static void main(String[] args) {
        Path pathVoc = Paths.get("data", "movies.words");

        Map<String, Integer> vocIndex=VocabularyLoader.load(pathVoc);
        System.out.println("Voc Size: "+vocIndex.size());

        Path pathDocs = Paths.get("data", "movies.data");
        Document[] documents = DocumentLoader.load(pathDocs, vocIndex);
        System.out.println("Document Size: "+documents.length);

然后,它初始化我们想要生成的集群数量和随机数生成器的种子。如果它们不是 main()方法的参数,我们使用默认值如下:

    if (args.length != 2) {
        System.err.println("Please specify K and SEED");
        return;
    }
    int K = Integer.valueOf(args[0]);
    int SEED = Integer.valueOf(args[1]);
}

最后,我们启动算法,测量其执行时间,并写入每个集群的文档数。

        Date start, end;
        start=new Date();
        DocumentCluster[] clusters = SerialKMeans.calculate(documents, K ,vocIndex.size(), SEED);
        end=new Date();
        System.out.println("K: "+K+"; SEED: "+SEED);
        System.out.println("Execution Time: "+(end.getTime()- start.getTime()));
        System.out.println(
            Arrays.stream(clusters).map (DocumentCluster::getDocumentCount).sorted (Comparator.reverseOrder())
                        .map(Object::toString).collect( Collectors.joining(", ", "Cluster sizes: ", "")));
    }
}

并发版本

为了实现算法的并发版本,我们使用了 Fork/Join 框架。我们基于RecursiveAction类实现了两个不同的任务。正如我们前面提到的,RecursiveAction任务是在您希望将 Fork/Join 框架用于不返回结果的任务时使用的。我们已经将分配和更新阶段实现为在 Fork/Join 框架中执行的任务。

为了实现 k-means 算法的并发版本,我们将修改一些常用类以使用并发数据结构。然后,我们将实现这两个任务,最后,我们将实现实现算法并发版本的ConcurrentKMeansConcurrentMain类来测试它。

Fork/Join 框架的两个任务–AssignmentTask 和 UpdateTask

如前所述,我们已经将分配和更新阶段作为任务实现,将在 Fork/Join 框架中实现。

分配阶段将文档分配给与文档欧氏距离最小的簇。因此,我们必须处理所有文档,并计算所有文档和所有簇的欧氏距离。我们将使用任务必须处理的文档数量作为控制是否必须拆分任务的度量。我们从必须处理所有文档的任务开始,我们将对它们进行拆分,直到我们的任务必须处理大量小于预定义大小的文档。

AssignmentTask类具有以下属性:

  • 包含集群数据的ConcurrentDocumentCluster对象数组
  • 包含文档数据的ConcurrentDocument对象数组
  • 两个整数属性startend,用于确定任务必须处理的文档数量
  • 一个AtomicInteger属性numChanges,用于存储已将其分配的集群从上次执行更改为当前执行的文档数
  • 整数属性maxSize,用于存储任务可以处理的最大文档数

我们已经实现了一个构造函数来初始化所有这些属性和方法,以获取和设置其值。

这些任务的主要方法是compute()方法(与每项任务一样)。首先,我们检查任务必须处理的文档数量。如果小于或等于maxSize属性,我们将处理这些文档。我们计算每个文档和所有聚类之间的欧氏距离,并选择距离最小的聚类。如果有必要,我们使用incrementAndGet()方法增加numChanges原子变量。原子变量可以由多个线程同时更新,而无需使用同步机制,也不会导致任何内存不一致。请参阅以下代码:

protected void compute() {
    if (end - start <= maxSize) {
        for (int i = start; i < end; i++) {
            ConcurrentDocument document = documents[i];
            double distance = Double.MAX_VALUE;
            ConcurrentDocumentCluster selectedCluster = null;
            for (ConcurrentDocumentCluster cluster : clusters) {
                double curDistance = DistanceMeasurer.euclideanDistance (document.getData(), cluster.getCentroid());
                if (curDistance < distance) {
                    distance = curDistance;
                    selectedCluster = cluster;
                }
            }
            selectedCluster.addDocument(document);
            boolean result = document.setCluster(selectedCluster);
            if (result) {
                numChanges.incrementAndGet();
            }

        }

如果任务必须处理的文档的数量太大,我们将该集合分为两部分,并创建两个新任务来处理这些部分,如下所示:

    } else {
        int mid = (start + end) / 2;
        AssignmentTask task1 = new AssignmentTask(clusters, documents, start, mid, numChanges, maxSize);
        AssignmentTask task2 = new AssignmentTask(clusters, documents, mid, end, numChanges, maxSize);

        invokeAll(task1, task2);
    }
}

为了在 Fork/Join 池中执行这些任务,我们使用了invokeAll()方法。此方法将在任务完成执行后返回。

更新阶段重新计算每个簇的质心作为所有文档的平均值。因此,我们必须处理所有集群。我们将使用任务必须处理的集群数量作为控制是否必须拆分任务的度量。我们从一个必须处理所有集群的任务开始,我们将对其进行拆分,直到我们的任务必须处理一些小于预定义大小的集群。

UpdateTask类具有以下属性:

  • 包含集群数据的ConcurrentDocumentCluster对象数组
  • 两个整数属性startend,用于确定任务必须处理的集群数量
  • 整数属性maxSize,用于存储任务可以处理的最大集群数

我们已经实现了构造函数来初始化所有这些属性和方法,以获取和设置其值。

compute()方法首先检查任务必须处理的集群数量。如果该数字小于或等于maxSize属性,它将处理这些簇并更新其质心:

@Override
protected void compute() {
    if (end - start <= maxSize) {
        for (int i = start; i < end; i++) {
            ConcurrentDocumentCluster cluster = clusters[i];
            cluster.calculateCentroid();
        }

如果任务必须处理的集群数量太多,我们将把任务必须处理的集群集分成两部分,并创建两个任务来处理每个部分,如下所示:

    } else {
        int mid = (start + end) / 2;
        UpdateTask task1 = new UpdateTask(clusters, start, mid, maxSize);
        UpdateTask task2 = new UpdateTask(clusters, mid, end, maxSize);

        invokeAll(task1, task2);
    }
}

ConcurrentKMeans 类

ConcurrentKMeans类实现了 k-means 聚类算法的并发版本。作为串行版本,类的主要方法是calculate()方法。它接收以下参数:

  • 包含文档信息的ConcurrentDocument对象数组
  • 要生成的群集数
  • 词汇量
  • 随机数生成器的一个种子
  • Fork/Join 任务在不将任务拆分为其他任务的情况下将处理的最大项目数

calculate()方法返回包含集群信息的ConcurrentDocumentCluster对象数组。每个集群都有与其关联的文档列表。首先,文档创建由numberClusters参数确定的集群数组,并使用initialize()方法和Random对象对其进行初始化:

public class ConcurrentKMeans {

    public static ConcurrentDocumentCluster[] calculate(ConcurrentDocument[] documents int numberCluster int vocSize, int seed, int maxSize) {
        ConcurrentDocumentCluster[] clusters = new ConcurrentDocumentCluster[numberClusters];

        Random random = new Random(seed);
        for (int i = 0; i < numberClusters; i++) {
            clusters[i] = new ConcurrentDocumentCluster(vocSize);
            clusters[i].initialize(random);
        }

然后,我们重复分配和更新阶段,直到所有文档都位于同一集群中。在循环之前,我们创建将执行该任务及其所有子任务的ForkJoinPool。循环完成后,与其他Executor对象一样,我们必须使用具有 Fork/Join 池的shutdown()方法来完成其执行。最后,我们返回带有文档最终组织的集群数组:

        boolean change = true;
        ForkJoinPool pool = new ForkJoinPool();

        int numSteps = 0;
        while (change) {
            change = assignment(clusters, documents, maxSize, pool);
            update(clusters, maxSize, pool);
            numSteps++;
        }
        pool.shutdown();
        System.out.println("Number of steps: "+numSteps); return clusters;
    }

分配阶段在assignment()方法中实现。此方法接收集群数组、文档数组和maxSize属性。首先,我们删除所有集群的关联文档列表:

    private static boolean assignment(ConcurrentDocumentCluster[] clusters, ConcurrentDocument[] documents, int maxSize, ForkJoinPool pool) {

        boolean change = false;

        for (ConcurrentDocumentCluster cluster : clusters) {
            cluster.clearDocuments();
        }

然后,我们初始化必要的对象:AtomicInteger存储分配的集群已更改的文档数量,以及将开始该过程的AssignmentTask

        AtomicInteger numChanges = new AtomicInteger(0);
        AssignmentTask task = new AssignmentTask(clusters, documents, 0, documents.length, numChanges, maxSize);

然后,我们使用ForkJoinPoolexecute()方法异步执行池中的任务,并使用AssignmentTask对象的join()方法等待其完成,如下所示:

        pool.execute(task);
        task.join();

最后,我们检查已更改其指定集群的文档数。如果有变化,我们返回true值。否则返回false值。我们有以下代码:

        System.out.println("Number of Changes: " + numChanges);
        return numChanges.get() > 0;
    }

更新阶段在update()方法中实现。它接收集群阵列和maxSize参数。首先,我们创建一个UpdateTask对象来更新所有集群。然后,我们在方法作为参数接收的ForkJoinPool对象中执行该任务,如下所示:

    private static void update(ConcurrentDocumentCluster[] clusters, int maxSize, ForkJoinPool pool) {
        UpdateTask task = new UpdateTask(clusters, 0, clusters.length, maxSize, ForkJoinPool pool);
         pool.execute(task);
         task.join();
    }
}

ConcurrentMain 类

ConcurrentMain类包括main()方法,用于启动 k-means 算法的测试。其代码与SerialMain类相同,但将串行类更改为并发类。

比较解决方案

为了比较两种解决方案,我们进行了不同的实验,改变三个不同参数的值:

  • k 参数将确定我们想要生成的集群数量。我们已经用值 5、10、15 和 20 测试了算法。
  • Random编号生成器的种子。该种子确定初始质心位置的方式。我们已经用值 1 和 13 测试了算法。
  • 对于并发算法,maxSize参数用于确定项目(文档或集群)的最大数量,一个任务可以在不拆分为其他任务的情况下进行处理。我们已经用值 1、20 和 400 测试了算法。

我们已经使用 JMH 框架(执行了实验 http://openjdk.java.net/projects/code-tools/jmh/ ),允许您在 Java 中实现微基准测试。使用基准测试框架是一个更好的解决方案,它可以使用currentTimeMillis()nanoTime()等方法简单地测量时间。我们已经在一台四核处理器的计算机上执行了 10 次,并计算了这 10 次的中间执行时间。以下是我们以毫秒为单位获得的执行时间:

|   |   |

电视连续剧

|

同时发生的

| | --- | --- | --- | --- | | K | 种子 |   | 最大尺寸=1 | 最大尺寸=20 | 最大尺寸=400 | | 5. | 1. | 6676.141 | 4696.414 | 3291.397 | 3179.673 | | 10 | 1. | 6780.088 | 3365.731 | 2970.056 | 2825.488 | | 15 | 1. | 12936.178 | 5308.734 | 4737.329 | 4490.443 | | 20 | 1. | 19824.729 | 7937.820 | 7347.445 | 6848.873 | | 5. | 13 | 3738.869 | 2714.325 | 1984.152 | 1916.053 | | 10 | 13 | 9567.416 | 4693.164 | 3892.526 | 3739.129 | | 15 | 13 | 12427.589 | 5598.996 | 4735.518 | 4468.721 | | 20 | 13 | 18157.913 | 7285.565 | 6671.283 | 6325.664 |

我们可以得出以下结论:

  • 种子在执行时间上具有重要且不可预测的影响。有时,种子 13 的执行时间较低,但种子 1 的执行时间较低。
  • 增加集群数量时,执行时间也会增加。
  • maxSize参数对执行时间影响不大。参数 K 或 seed 对执行时间的影响较大。如果增加参数的值,将获得更好的性能。1 和 20 之间的差异大于 20 和 400 之间的差异。
  • 在所有情况下,该算法的并发版本比串行版本具有更好的性能。

对于示例,如果我们将参数为 K=20 和 seed=13 的串行算法与参数为 K=20、seed=13 和 maxSize=400 的并发版本进行加速比较,我们得到以下结果:

Comparing the solutions

假设您有很多描述项目列表的数据。例如,您拥有许多人的许多属性(姓名、姓氏、地址、电话号码等)。获取符合特定标准的数据是一种常见的需求,例如,您希望获取居住在特定街道或具有特定姓名的人。

在本节中,您将实现其中一个过滤程序。我们使用了 UCI 的人口普查收入 KDD数据集(您可以从下载)https://archive.ics.uci.edu/ml/datasets/Census-Income+%28KDD%29,其中包含从美国人口普查局进行的 1994 年和 1995 年当前人口调查中提取的加权人口普查数据。

在本例的并发版本中,您将学习如何取消在 Fork/Join 池中运行的任务,以及如何管理任务中可能引发的未检查异常。

常用部件

我们实现了一些类来读取文件中的数据并过滤数据。这些类由算法的串行和并发版本使用。这些是课程:

  • CensusData类:这个类存储 39 个定义每个人的属性。它定义了获取和设置其值的属性和方法。我们将用一个数字来标识每个属性。这个类的evaluateFilter()方法包含属性的编号和名称之间的关联。您可以查看文件https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.names 获取每个属性的详细信息。
  • CensusDataLoader类:此类从文件中加载普查数据。它有一个load()方法,该方法接收文件路径作为输入参数,并返回一个包含文件中所有人员信息的CensusData数组。
  • FilterData类:这个类定义了一个数据过滤器。过滤器包括属性的编号和该属性的值。
  • Filter类:此类实现确定CensusData对象是否满足过滤器列表条件的方法。

我们不包括这些类的源代码。它们非常简单,您可以检查示例的源代码。

序列版本

我们已经在两个类中实现了滤波器算法的串行版本。SerialSearch类对数据进行过滤。它提供了两种方法:

  • findAny()方法:它接收CensusData对象的数组作为参数,其中包含来自文件的所有数据和过滤器列表,并返回一个CensusData对象,其中包含它从过滤器中找到的第一个满足所有条件的人。
  • findAll()方法:接收CensusData对象数组作为参数,包含来自文件的所有数据和过滤器列表,并从过滤器返回CensusData对象数组,其中包含满足所有条件的所有人员。

SerialMain类实现了该版本的main()方法,并对其进行测试,以测量该算法在某些情况下的执行时间。

SerialSearch 类

正如前面提到的类实现了对数据的过滤。它提供了两种方法。第一个方法是findAny()方法,它查找满足过滤器要求的第一个数据对象。当它找到第一个数据对象时,它将完成其执行。请参阅以下代码:

public class SerialSearch {

    public static CensusData findAny (CensusData[] data, List<FilterData> filters) {
        int index=0;
        for (CensusData censusData : data) {
            if (Filter.filter(censusData, filters)) {
                System.out.println("Found: "+index);
                return censusData;
            }
            index++;
        }

        return null;
    }

第二个方法是findAll()方法,返回一个CensusData对象数组,其中包含满足过滤器要求的所有对象,如下所示:

    public static List<CensusData> findAll (CensusData[] data, List<FilterData> filters) {
        List<CensusData> results=new ArrayList<CensusData>();

        for (CensusData censusData : data) {
            if (Filter.filter(censusData, filters)) {
                results.add(censusData);
            }
        }
        return results;
    }
}

主课

您将使用这个类在不同的情况下测试过滤算法。首先,我们从文件中加载数据,如下所示:

public class SerialMain {
    public static void main(String[] args) {
        Path path = Paths.get("data","census-income.data");

        CensusData data[]=CensusDataLoader.load(path);
        System.out.println("Number of items: "+data.length);

        Date start, end;

我们要测试的第一种情况是使用findAny()方法来查找数组第一个位置存在的对象。您构建一个过滤器列表,然后使用文件的数据和过滤器列表调用findAny()方法:

        List<FilterData> filters=new ArrayList<>();
        FilterData filter=new FilterData();
        filter.setIdField(32);
        filter.setValue("Dominican-Republic");
        filters.add(filter);
        filter=new FilterData();
        filter.setIdField(31);
        filter.setValue("Dominican-Republic");
        filters.add(filter);
        filter=new FilterData();
        filter.setIdField(1);
        filter.setValue("Not in universe");
        filters.add(filter);
        filter=new FilterData();
        filter.setIdField(14);
        filter.setValue("Not in universe");
        filters.add(filter);
        start=new Date();
        CensusData result=SerialSearch.findAny(data, filters);
        System.out.println("Test 1 - Result: "+result.getReasonForUnemployment());
        end=new Date();
        System.out.println("Test 1- Execution Time: "+(end.getTime()-start.getTime()));

我们的过滤器查找以下属性:

  • 32:这是生父属性的国家
  • 31:这是生母属性的国家
  • 1:工人属性的类别;Not in universe是它们可能的值之一
  • 14:这是失业属性的原因;Not in universe是它们可能的值之一

我们将测试其他情况,如下所示:

  • 使用findAny()方法查找数组最后位置存在的对象
  • 使用findAny()方法尝试查找不存在的对象
  • 在错误情况下使用findAny()方法
  • 使用findAll()方法获取满足过滤器列表的所有对象
  • 在错误情况下使用findAll()方法

并发版本

我们将在并发版本中包含更多元素:

  • 任务管理器:当您使用 Fork/Join 框架时,您从一个任务开始,然后将该任务拆分为两个(或多个)子任务,然后一次又一次地拆分,直到您的问题达到所需的大小。在某些情况下,您可能希望完成所有这些任务的执行。例如,当您实现findAny()方法并找到满足所有条件的对象时,您不需要继续执行其余任务。
  • 实现findAny()方法的RecursiveTask类:扩展RecursiveTaskIndividualTask类。
  • 实现findAll()方法的RecursiveTask类:扩展RecursiveTaskListTask类。

让我们看看所有这些类的详细信息。

TaskManager 类

我们将使用这个类来控制任务的取消。在以下两种情况下,我们将取消任务的执行:

  • 您正在执行findAny()操作,发现一个符合要求的对象
  • 您正在执行findAny()findAll()操作,其中一个任务中存在未检查的异常

该类声明了两个属性:ConcurrentLinkedDeque用于存储我们需要取消的所有任务,以及一个AtomicBoolean变量以确保只有一个任务执行cancelTasks()方法:

public class TaskManager {

    private Set<RecursiveTask> tasks;
    private AtomicBoolean cancelled;

    public TaskManager() {
        tasks = ConcurrentHashMap.newKeySet();
        cancelled = new AtomicBoolean(false);
    }

定义了向ConcurrentLinkedDeque添加任务、从ConcurrentLinkedDeque中删除任务、取消其中存储的所有任务的方法。为了取消任务,我们使用在ForkJoinTask类中定义的cancel()方法。true参数强制中断正在运行的任务,如下所示:

    public void addTask(RecursiveTask task) {
        tasks.add(task);
    }

    public void cancelTasks(RecursiveTask sourceTask) {

        if (cancelled.compareAndSet(false, true)) {
            for (RecursiveTask task : tasks) {
                if (task != sourceTask) {
                    if(cancelled.get()) {
                        task.cancel(true);
                    } 
                    else {
                        tasks.add(task);
                    }
                }
            }
        }
    }

    public void deleteTask(RecursiveTask task) {
        tasks.remove(task);
    }

cancelTasks()方法接收RecursiveTask对象作为参数。我们将取消所有任务,除了调用此方法的任务。我们不想取消找到结果的任务。compareAndSet(false, true)方法将AtomicBoolean变量设置为true,仅当当前值为false时返回true。如果AtomicBoolean变量已经有true值,则返回false。整个操作是以原子方式执行的,因此即使从不同线程并发调用了多次cancelTasks()方法,也可以保证 if 语句体最多执行一次。

个人任务类

IndividualTask类对CensusData任务参数化的RecursiveTask类进行扩展,实现findAny()操作。它定义了以下属性:

  • 包含所有CensusData对象的数组

  • 决定必须处理的元素的startend属性

  • size属性,用于确定任务将在不拆分任务的情况下处理的最大元素数

  • 一个TaskManager类,用于在必要时取消任务

  • 以下代码给出了要应用的过滤器列表:

    private CensusData[] data;
    private int start, end, size;
    private TaskManager manager;
    private List<FilterData> filters;
    
    public IndividualTask(CensusData[] data, int start, int end, TaskManager manager, int size, List<FilterData> filters) {
        this.data = data;
        this.start = start;
        this.end = end;
        this.manager = manager;
        this.size = size;
        this.filters = filters;
    }

本课程的主要方法是compute()方法。它返回一个CensusData对象。如果任务必须处理的元素数小于 size 属性,则直接查找对象。如果该方法找到所需的对象,则返回该对象并使用方法cancelTasks()取消其余任务的执行。如果该方法没有找到所需的对象,则返回 null。我们有以下代码:

if (end - start <= size) {
    for (int i = start; i < end && ! Thread.currentThread().isInterrupted(); i++) {
        CensusData censusData = data[i];
        if (Filter.filter(censusData, filters)) {
            System.out.println("Found: " + i);
            manager.cancelTasks(this);
            return censusData;
            }
        }
        return null;
    }

如果必须处理的项目数大于 size 属性,我们将创建两个子任务来处理一半的元素:

        } else {
            int mid = (start + end) / 2;
            IndividualTask task1 = new IndividualTask(data, start, mid, manager, size, filters);
            IndividualTask task2 = new IndividualTask(data, mid, end, manager, size, filters);

然后,我们将新创建的任务添加到任务管理器,并删除实际任务。如果要取消任务,则只需取消正在运行的任务:

            manager.addTask(task1);
            manager.addTask(task2);
            manager.deleteTask(this);

然后,我们使用异步方式发送任务的fork()方法将任务发送到ForkJoinPool,并使用quietlyJoin()方法等待任务完成。join()方法和quietlyJoin()方法的区别在于,如果任务被取消或在方法内部引发未检查的异常,join() 方法将启动异常, quietlyJoin()方法不会引发任何异常。

            task1.fork();
            task2.fork();
            task1.quietlyJoin();
            task2.quietlyJoin();

然后,我们从TaskManager类中删除子任务,如下所示:

            manager.deleteTask(task1);
            manager.deleteTask(task2);

现在,我们使用 join()方法获得任务的结果。如果任务抛出未经检查的异常,它将在不进行特殊处理的情况下传播,取消将被忽略,如下所示:

        try {
            CensusData res = task1.join();
            if (res != null)
                return res;
                manager.deleteTask(task1);
        } catch (CancellationException ex) {
        }
        try {
            CensusData res = task2.join();
            if (res != null)
                return res;
            manager.deleteTask(task2);
        } catch (CancellationException ex) {
        }
        return null;
    }
}

ListTask 类

ListTask类扩展了参数化为CensusDataList类。我们将使用此任务执行findAll()操作。这与IndividualTask任务非常相似。两者使用相同的属性,但在compute()方法中有所不同。

首先,我们初始化一个List对象以返回结果并检查任务必须处理的元素数量。如果任务必须处理的元素数小于“大小”属性,请将满足过滤器中指定条件的所有对象添加到结果列表中:

@Override
protected List<CensusData> compute() {
    List<CensusData> ret = new ArrayList<CensusData>();
    if (end - start <= size) {
        for (int i = start; i < end; i++) {
            CensusData censusData = data[i];
            if (Filter.filter(censusData, filters)) {
                ret.add(censusData);
            }
        }

如果需要处理的项数大于 size 属性,我们将创建两个子任务来处理一半的元素:

        int mid = (start + end) / 2;
        ListTask task1 = new ListTask(data, start, mid, manager, size, filters);
        ListTask task2 = new ListTask(data, mid, end, manager, size, filters);

然后,我们将新创建的任务添加到任务管理器中,并删除实际任务。实际任务不会被取消;其子任务将被取消,如下所示:

        manager.addTask(task1);
        manager.addTask(task2);
        manager.deleteTask(this);

然后,我们将使用异步发送任务的fork() 方法将任务发送到ForkJoinPool,并使用quietlyJoin()方法等待任务完成:

        task1.fork();
        task2.fork();
        task2.quietlyJoin();
        task1.quietlyJoin();

然后,我们将从TaskManager中删除子任务:

        manager.deleteTask(task1);
        manager.deleteTask(task2);

现在,我们使用 join()方法获得任务的结果。如果任务抛出未经检查的异常,它将在不进行特殊处理的情况下传播,取消将被忽略:

   try {
    List<CensusData> tmp = task1.join();
    if (tmp != null)
     ret.addAll(tmp);
    manager.deleteTask(task1);
   } catch (CancellationException ex) {
   }
   try {
    List<CensusData> tmp = task2.join();
    if (tmp != null)
     ret.addAll(tmp);
    manager.deleteTask(task2);
   } catch (CancellationException ex) {
   }

ConcurrentSearch 类

ConcurrentSearch类实现了findAny()findAll() 方法。它们具有与串行版本相同的接口。在内部,他们初始化TaskManager对象和第一个任务,并使用execute方法发送到默认ForkJoinPool;他们等待任务的最终完成并写出结果。这是findAny()方法的代码:

public class ConcurrentSearch {

    public static CensusData findAny (CensusData[] data, List<FilterData> filters, int size) {
        TaskManager manager=new TaskManager();
        IndividualTask task=new IndividualTask(data, 0, data.length, manager, size, filters);
        ForkJoinPool.commonPool().execute(task);
        try {
            CensusData result=task.join();
            if (result!=null) {
                System.out.println("Find Any Result: "+result.getCitizenship());
            return result;
        } catch (Exception e) {
            System.err.println("findAny has finished with an error: "+task.getException().getMessage());
        }

        return null;
    }

这是findAll()方法的代码:

    public static CensusData[] findAll (CensusData[] data, List<FilterData> filters, int size) {
        List<CensusData> results;
        TaskManager manager=new TaskManager();
        ListTask task=new ListTask(data,0,data.length,manager, size,filters);
        ForkJoinPool.commonPool().execute(task);
        try {
            results=task.join();

            return results;
        } catch (Exception e) {
            System.err.println("findAny has finished with an error: " + task.getException().getMessage());
        }
        return null;
    }

ConcurrentMain 类

ConcurrentMain类用于测试对象过滤器的并发版本。它与SerialMain类相同,但使用并发版本的操作。

比较两个版本

为了比较过滤算法的串行版本和并发版本,我们在六种不同的情况下对它们进行了测试:

  • 测试 1:我们测试findAny()方法,寻找存在于CensusData数组第一个位置的对象
  • 测试 2:我们测试findAny() 方法寻找CensusData数组最后位置存在的对象
  • 测试 3:我们测试findAny()方法,寻找一个不存在的对象
  • 测试 4:我们在错误情况下测试findAny()方法
  • 测试 5:我们在正常情况下测试findAll()方法
  • 测试 6:我们在错误情况下测试 findAll() 方法

对于该算法的并发版本,我们测试了大小参数的三个不同值,它们决定了一个任务在不分叉两个子任务的情况下可以处理的最大元素数。我们用 10200 和 2000 进行了测试。

我们已经使用 JMH 框架(执行了测试 http://openjdk.java.net/projects/code-tools/jmh/ ),允许您在 Java 中实现微基准测试。使用基准测试框架是一个更好的解决方案,它使用currentTimeMillis()nanoTime()等方法简单地测量时间。我们已经在一台四核处理器的计算机上执行了 10 次,并计算了这 10 次的中间执行时间。与其他示例一样,我们以毫秒为单位测量了执行时间:

|

测试用例

|

电视连续剧

|

并发大小=10

|

并发大小=200

|

并发大小=2000

|

最好的

| | --- | --- | --- | --- | --- | --- | | 试验 1 | 1.177 | 8.124 | 4.547 | 4.073 | 电视连续剧 | | 测试 2 | 95.237 | 157.412 | 34.581 | 35.691 | 同时发生的 | | 测试 3 | 66.616 | 41.916 | 74.829 | 37.140 | 同时发生的 | | 测试 4 | 0.540 | 25869.339 | 643.144 | 9.673 | 电视连续剧 | | 测试 5 | 61.752 | 37.349 | 40.344 | 22.911 | 同时发生的 | | 测试 6 | 0.802 | 31663.607 | 231.440 | 7.706 | 电视连续剧 |

我们可以得出以下结论:

  • 当我们必须处理较少数量的元素时,该算法的串行版本具有更好的性能。
  • 当我们必须处理所有元素或少量元素时,该算法的并发版本具有更好的性能。
  • 在错误情况下,该算法的串行版本比并发版本具有更好的性能。当size参数的值很小时,并发版本在这种情况下的性能非常差。

在这种情况下,并发并不总是能提高性能。

merge 排序算法是一种非常流行的排序算法,通常使用分治技术来实现,因此它是使用 Fork/Join 框架进行测试的非常好的候选者。

为了实现合并排序算法,我们将未排序的列表划分为一个元素的子列表。然后,我们合并那些未排序的子列表以生成有序的子列表,直到我们处理完所有的子列表,我们只有原始列表,但所有元素都已排序。

为了制作我们算法的并发版本,我们使用了新的 Fork/Join 任务,CountedCompleter任务,在 Java8 版本中引入。这些任务最重要的特征是,它们包含了一个方法,当它们的所有子任务都已完成执行时,该方法将被执行。

为了测试实现,我们使用了亚马逊产品联合购买网络元数据(您可以从下载)https://snap.stanford.edu/data/amazon-meta.html )。特别是,我们创建了一个 salesrank 为 542184 种产品的列表。我们将测试我们的算法版本,对这个产品列表进行排序,并将执行时间与Arrays类的sort()parallelSort()方法进行比较。

共享类

正如我们前面提到的一样,我们已经建立了一个包含 542184 个亚马逊产品的列表,其中包含每个产品的信息,包括 ID、名称、组、salesrank、评论数量、类似产品的数量以及产品所属类别的数量。我们已经实现了AmazonMetaData类来存储产品的信息。此类声明获取和设置其值所需的属性和方法。这个类实现了Comparable接口来比较这个类的两个实例。我们希望按 salesrank 按升序对元素进行排序。为了实现compare()方法,我们使用Long类的compare()方法来比较两个对象的 salesrank,如下所示:

    public int compareTo(AmazonMetaData other) {
        return Long.compare(this.getSalesrank(), other.getSalesrank());
    }

我们还实现了提供load()方法的AmazonMetaDataLoader。该方法以数据作为参数接收到文件的路由,并返回一个包含所有产品信息的AmazonMetaData对象数组。

我们不包括这些类的源代码来关注 Fork/Join 框架的特性。

序列版本

我们已经在SerialMergeSort类中实现了 merge 排序算法的串行版本,它实现了算法和SerialMetaData类,并提供了main()方法来测试算法。

SerialMergeSort 类

SerialMergeSort类实现了合并排序算法的串行版本。提供接收以下参数的mergeSort()方法:

  • 包含要排序的所有数据的数组
  • 方法必须处理的第一个元素(包括)
  • 方法必须处理的最后一个元素(不包括)

如果方法必须只处理一个元素,则返回。否则,它将对mergeSort()方法进行两次递归调用。第一个调用将处理前一半的元素,第二个调用将处理后一半的元素。最后,我们调用merge()方法来合并元素的两半,并得到元素的排序列表:

public void mergeSort (Comparable data[], int start, int end) {
    if (end-start < 2) { 
        return;
    }
    int middle = (end+start)>>>1;
    mergeSort(data,start,middle);
    mergeSort(data,middle,end);
    merge(data,start,middle,end);
}

我们已经使用了(end+start)>>>1操作符来获取中间元素来拆分数组。例如,如果您有 15 亿个元素(对于现代内存芯片来说并非不可能),它仍然适合 Java 阵列。但是,(结束+开始)/2将溢出,导致一个负数字数组。有关此问题的详细说明,请参见http://googleresearch.blogspot.ru/2006/06/extra-extra-read-all-about-it-nearly.html

merge()方法合并两个元素列表以获得排序列表。它接收以下参数:

  • 包含要排序的所有数据的数组
  • 三个元素(startmidend)决定了我们要合并和排序的数组的两个部分(开始-中间、中间-结束)

我们创建一个临时数组来对元素进行排序,对数组中处理列表两部分的元素进行排序,并将排序后的列表存储在原始数组的相同位置。检查以下代码:

    private void merge(Comparable[] data, int start, int middle, int end) {
        int length=end-start+1;
        Comparable[] tmp=new Comparable[length];
        int i, j, index;
        i=start;
        j=middle;
        index=0;
        while ((i<middle) && (j<end)) {
            if (data[i].compareTo(data[j])<=0) {
                tmp[index]=data[i];
                i++;
            } else {
                tmp[index]=data[j];
                j++;
            }
            index++;
        }

        while (i<middle) {
            tmp[index]=data[i];
            i++;
            index++;
        }

        while (j<end) {
            tmp[index]=data[j];
            j++;
            index++;
        }

        for (index=0; index < (end-start); index++) {
            data[index+start]=tmp[index];
        }
    }
}

SerialMetaData 类

SerialMetaData类提供了main()方法来测试算法。我们将执行每个排序算法 10 次,以计算平均执行时间。首先,我们从文件中加载数据并创建数组的副本:

public class SerialMetaData {

    public static void main(String[] args) {
    for (int j=0; j<10; j++) {
        Path path = Paths.get("data","amazon-meta.csv");

        AmazonMetaData[] data = AmazonMetaDataLoader.load(path);
        AmazonMetaData data2[] = data.clone();

然后,我们使用Arrays类的sort()方法对第一个数组进行排序:

        Date start, end;

        start = new Date();
        Arrays.sort(data);
        end = new Date();
        System.out.println("Execution Time Java Arrays.sort(): " + (end.getTime() - start.getTime()));

然后,我们使用合并排序算法的实现对第二个数组进行排序:

        SerialMergeSort mySorter = new SerialMergeSort();
        start = new Date();
        mySorter.mergeSort(data2, 0, data2.length);
        end = new Date();
        System.out.println("Execution Time Java SerialMergeSort: " + (end.getTime() - start.getTime()));

最后,我们检查排序的数组是否相同:

        for (int i = 0; i < data.length; i++) {
            if (data[i].compareTo(data2[i]) != 0) {
                System.err.println("There's a difference is position " + i);
                System.exit(-1);
            }
        }
        System.out.println("Both arrays are equal");
    }
}
}

并发版本

正如我们前面提到的,我们将使用新的 Java8CountedCompleter类作为 Fork/Join 任务的基类。此类提供了一种机制,用于在方法的所有子任务都已完成时执行该方法。这是onCompletion()方法。因此,我们使用compute()方法划分数组,并使用onCompletion()方法将子列表合并为有序列表。

您将要实现的并发解决方案有三个类:

  • 扩展CountedCompleter类并实现执行合并排序算法的任务的MergeSortTask
  • 启动第一个任务的ConcurrentMergeSort任务
  • ConcurrentMetaData类,提供main()方法来测试合并排序算法的并发版本

合并任务类

正如我们前面提到的,这个类实现了将要执行合并排序算法的任务。此类使用以下属性:

  • 我们要排序的数据数组
  • 任务必须排序的数组的起始位置和结束位置

该类还具有用于初始化其参数的构造函数:

public class MergeSortTask extends CountedCompleter<Void> {

    private Comparable[] data;
    private int start, end;
    private int middle;

    public MergeSortTask(Comparable[] data, int start, int end,
            MergeSortTask parent) {
        super(parent);

        this.data = data;
        this.start = start;
        this.end = end;
    }

compute()方法,如果开始索引和结束索引之间的差异大于或等于1024,我们将任务拆分为两个子任务,以处理原始集合的两个子集。两个任务都使用fork()方法以异步方式向ForkJoinPool发送任务。否则,我们执行SerialMergeSorg.mergeSort()对数组的部分(包含1024或更少的元素)进行排序,然后调用tryComplete()方法。当子任务完成执行后,此方法将在内部调用onCompletion()方法。请看下面的代码:

    @Override
    public void compute() {
        if (end - start >= 1024) {
                middle = (end+start)>>>1;
            MergeSortTask task1 = new MergeSortTask(data, start, middle, this);
            MergeSortTask task2 = new MergeSortTask(data, middle, end, this);
            addToPendingCount(1);
            task1.fork();
            task2.fork();
        } else {
            new SerialMergeSort().mergeSort(data, start, end);
            tryComplete();
        }

在本例中,我们将使用onCompletion()方法进行合并和排序操作以获得排序列表。一旦任务完成了onCompletion()方法的执行,它就会通过其父任务调用tryComplete()来尝试完成该任务。onCompletion() 方法的源代码与算法串行版本的merge()方法非常相似。请参阅以下代码:

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        if (middle==0) {
            return;
        }
        int length = end - start + 1;
        Comparable tmp[] = new Comparable[length];
        int i, j, index;
        i = start;
        j = middle;
        index = 0;
        while ((i < middle) && (j < end)) {
            if (data[i].compareTo(data[j]) <= 0) {
                tmp[index] = data[i];
                i++;
            } else {
                tmp[index] = data[j];
                j++;
            }
            index++;
        }
        while (i < middle) {
            tmp[index] = data[i];
            i++;
            index++;
        }
        while (j < end) {
            tmp[index] = data[j];
            j++;
            index++;
        }
        for (index = 0; index < (end - start); index++) {
            data[index + start] = tmp[index];
        }

    }

ConcurrentMergeSort 类

在并发版本中,这个类非常简单。它实现了mergeSort()方法,该方法接收要排序的数据数组,并将开始索引(始终为 0)和结束索引(始终为数组的长度)作为参数对数组进行排序。我们选择维护相同的接口,而不是串行版本。

该方法创建一个新的MergeSortTask,使用invoke()方法将其发送到默认的ForkJoinPool,该方法在任务完成执行并对数组进行排序时返回。

public class ConcurrentMergeSort {

    public void mergeSort (Comparable data[], int start, int end) {

        MergeSortTask task=new MergeSortTask(data, start, end,null);
        ForkJoinPool.commonPool().invoke(task);

    }
}

ConcurrentMetaData 类

ConcurrentMetaData类提供main()方法来测试合并排序算法的并发版本。在我们的例子中,代码与SerialMetaData类的代码相同,但是使用了类的并发版本和Arrays.parallelSort()方法,而不是Arrays.sort()方法,所以我们不包括类的源代码。

比较两个版本

我们已经执行了合并排序算法的串行和并发版本,并将它们之间的执行时间与Arrays.sort()Arrays.parallelSort()方法进行了比较。我们已经使用 JMH 框架(执行了四个版本 http://openjdk.java.net/projects/code-tools/jmh/ 允许您在 Java 中实现微基准测试。使用基准测试框架是一个更好的解决方案,它使用currentTimeMillis()nanoTime()等方法简单地测量时间。我们已经在一台四核处理器的计算机上执行了 10 次,并计算了这 10 次的中间执行时间。这些是我们在使用 542184 个对象对数据集进行排序时获得的执行时间(毫秒):

|   |

Arrays.sort()

|

串行合并排序

|

Arrays.parallelSort()

|

并发合并排序

| | --- | --- | --- | --- | --- | | 执行时间(毫秒) | 561.324 | 711.004 | 261.418 | 353.846 |

我们可以得出以下结论:

  • 方法Arrays.parallelSort()获得最佳结果。对于串行算法,Arrays.sort()方法比我们的实现获得更好的执行时间。
  • 对于我们的实现,该算法的并发版本比串行版本具有更好的性能。

我们可以使用以下速度比较合并排序算法的串行和并发版本:

Comparing the two versions

在本章的三个示例中,我们使用了构成 Fork/Join 框架的类的许多方法,但您还需要了解其他有趣的方法。

我们使用了ForkJoinPool类中的execute()invoke()方法将任务发送到池中。我们可以使用另一个名为submit()的方法。它们之间的主要区别在于execute()方法将任务发送到ForkJoinPool并立即返回一个无效值;invoke()方法将任务发送到ForkJoinPool并在任务执行完毕后返回;submit()方法将任务发送到ForkJoinPool并立即返回一个Future对象来控制任务的状态并获取其结果。

在本章的所有示例中,我们都使用了基于ForkJoinTask类的类,但是您可以使用基于RunnableCallable接口的ForkJoinPool任务。为此,您可以使用方法submit(),该方法的版本接受Runnable对象、带有结果的Runnable对象和Callable对象。

ForkJoinTask类提供方法get(long timeout, TimeUnit unit)获取任务返回的结果。此方法等待参数中为任务结果指定的时间段。如果任务在该时间段之前完成执行,则该方法返回结果。否则,它会抛出一个TimeoutException异常。

ForkJoinTask提供了invoke()方法的替代方法。这是quietlyInvoke()方法。这两个版本的主要区别在于invoke()方法返回任务执行的结果,或者在必要时抛出任何异常。quietlyInvoke()方法不会返回任务的结果,也不会引发任何异常。这与示例中使用的quietlyJoin()方法类似。

分治设计技术是解决各种问题的一种非常流行的方法。你把原来的问题分成几个小问题,然后把这些问题分成几个小问题,直到我们有足够的简单问题来直接解决它。在版本 7 中,Java 并发 API 引入了一种针对此类问题优化的特殊类型的Executor。这是 Fork/Join 框架。它基于以下两个操作:

  • fork:这允许您创建一个新的子任务
  • 加入:这允许您等待子任务的完成并获取其结果

使用这些操作,Fork/Join 任务具有以下外观:

if ( problem.size() > DEFAULT_SIZE) {
    childTask1=new Task();
    childTask2=new Task();
    childTask1.fork();
    childTask2.fork();
    childTaskResults1=childTask1.join();
    childTaskResults2=childTask2.join();
    taskResults=makeResults(childTaskResults1, childTaskResults2);
    return taskResults;
} else {
    taskResults=solveBasicProblem();
    return taskResults;
}

在本章中,您使用 Fork/Join 框架解决了三个不同的问题,例如 k-means 聚类算法、数据过滤算法和合并排序算法。

您已经使用了 API 提供的默认ForkJoinPool(这是 Java 8 版本的一个新特性),并创建了一个新的ForkJoinPool对象。您还使用了三种类型的ForkJoinTask``s

  • RecursiveAction类,用作不返回结果的ForkJoinTasks类的基类。
  • RecursiveTask类,用作返回结果的ForkJoinTasks类的基类。
  • CountedCompleter类,在 Java 8 中引入,用作ForkJoinTasks的基类,当其所有子任务完成执行时,需要执行一个方法或启动另一个任务。

在下一章中,您将学习如何使用 MapReduce 编程技术,使用新的 Java 8并行流来获得处理超大数据集的最佳性能。

教程来源于Github,感谢apachecn大佬的无私奉献,致敬!

技术教程推荐

JavaScript核心原理解析 -〔周爱民〕

现代C++编程实战 -〔吴咏炜〕

互联网人的英语私教课 -〔陈亦峰〕

Go 并发编程实战课 -〔晁岳攀(鸟窝)〕

Python自动化办公实战课 -〔尹会生〕

零基础入门Spark -〔吴磊〕

高并发系统实战课 -〔徐长龙〕

零基础学Python(2023版) -〔尹会生〕

手把手带你写一个 MiniTomcat -〔郭屹〕