Fork/Join

fork/join 框架是ExecutorServiceinterface的实现,可帮助您利用多个处理器。它是为可以递归分解为较小部分的工作而设计的。目标是使用所有可用的处理能力来增强应用程序的性能。

与任何ExecutorService实现一样,fork/join 框架将任务分配给线程池中的工作线程。 fork/join 框架与众不同,因为它使用工作窃取算法。工作用尽的工作线程可以从其他仍很忙的线程中窃取任务。

fork/join 框架的中心是ForkJoinPool类,是AbstractExecutorService类的扩展。 ForkJoinPool实现了核心的工作窃取算法,并且可以执行ForkJoinTask个进程。

Basic Use

使用 fork/join 框架的第一步是编写执行部分工作的代码。您的代码应类似于以下伪代码:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

将此代码包装在ForkJoinTask子类中,通常使用其更专门的类型之一,即RecursiveTask(可以返回结果)或RecursiveAction

ForkJoinTask子类准备就绪后,创建代表所有要完成的工作的对象,并将其传递给ForkJoinPool实例的invoke()方法。

清晰度模糊

为了帮助您了解 fork/join 框架的工作原理,请考虑以下示例。假设您要模糊图像。原始源图像由整数数组表示,其中每个整数都包含单个像素的颜色值。模糊的目标图像还由与源大小相同的整数数组表示。

执行模糊处理的方法是一次遍历源阵列一次一个像素。将每个像素与其周围的像素进行平均(对红色,绿色和蓝色分量进行平均),然后将结果放置在目标数组中。由于图像是大阵列,因此此过程可能需要很 Long 时间。通过使用 fork/join 框架实现算法,可以在 multiprocessing 器系统上利用并发处理的优势。这是一种可能的实现:

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }
  
  ...

现在,您可以实现抽象的compute()方法,该方法可以直接执行模糊处理,也可以将其拆分为两个较小的任务。简单的阵列 Long 度阈值有助于确定工作是执行还是拆分。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

如果以前的方法在RecursiveAction类的子类中,则将任务设置为在ForkJoinPool中运行很简单,并且涉及以下步骤:

  • 创建一个代表所有要完成工作的任务。
// source image pixels are in src
// destination image pixels are in dst
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  • 创建将运行任务的ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
  • 运行任务。
pool.invoke(fb);

有关完整的源代码,包括一些用于创建目标图像文件的其他代码,请参见ForkBlur示例。

Standard Implementations

除了使用 fork/join 框架为要在 multiprocessing 器系统上同时执行的任务实现自定义算法(例如上一节中的ForkBlur.java示例)之外,Java SE 中还有一些通常有用的功能,这些功能已经使用 fork /加入框架。 java.util.Arrays类将其 Java_SE 8 中引入的一种实现方式用于parallelSort()方法。这些方法类似于sort(),但是通过 fork/join 框架利用并发性。在 multiprocessing 器系统上运行时,大型数组的并行排序比 Sequences 排序要快。但是,这些方法如何精确地使用 fork/join 框架超出了 Java 教程的范围。有关此信息,请参阅 Java API 文档。

java.util.streams包中的方法使用 fork/join 框架的另一种实现,该包是为 Java SE 8 版本计划的Project Lambda的一部分。有关更多信息,请参见Lambda Expressions部分。