2014/06/27

Java Fork/Join

★The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.

As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.
===> Nice!!



★Example:

package testfork;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class MaximumFinder extends RecursiveTask<Integer> {

    // Ranges shorter than this are scanned sequentially instead of being split.
    private static final int SEQUENTIAL_THRESHOLD = 100;

    private final int[] data;
    private final int start; // inclusive
    private final int end;   // exclusive

    public MaximumFinder(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    public MaximumFinder(int[] data) {
        this(data, 0, data.length);
    }

    // Sequential scan of [start, end); logs which worker thread handled the range.
    private Integer computeDirectly() {
        System.out.println(Thread.currentThread() + " computing: " + start
                + " to " + end);
        int max = Integer.MIN_VALUE;
        for (int i = start; i < end; i++) {
            if (data[i] > max) {
                max = data[i];
            }
        }
        return max;
    }

    @Override
    protected Integer compute() {
        final int length = end - start;
        if (length < SEQUENTIAL_THRESHOLD) {
            return computeDirectly();
        }
        // Fork the left half so another worker can pick it up, compute the
        // right half in this thread, then join the left result.
        final int split = length / 2;
        final MaximumFinder left = new MaximumFinder(data, start, start + split);
        left.fork();
        final MaximumFinder right = new MaximumFinder(data, start + split, end);
        return Math.max(right.compute(), left.join());
    }

    public static void main(String[] args) {
        // create a random data set
        final int[] data = new int[100000];
        final Random random = new Random();

        for (int i = 0; i < data.length; i++) {
            data[i] = random.nextInt(100);
        }

        // submit the task to the pool
        final ForkJoinPool pool = new ForkJoinPool(4);
        final MaximumFinder finder = new MaximumFinder(data);
        System.out.println(pool.invoke(finder));
       
    }
}


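★Tasks submitted to a ForkJoinPool are of type ForkJoinTask, but in practice it is convenient to derive from either RecursiveTask or RecursiveAction.
RecursiveTask is used to define a task that returns a value; RecursiveAction is used to define a task that returns nothing.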
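
The MaximumFinder example above covers RecursiveTask. For the RecursiveAction side, here is a minimal sketch that doubles every element of an array in place. The class name ArrayDoubler, the helper doubleAll and the threshold of 1000 are made up for illustration and are not from any library.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: a task with no return value that modifies an array in place.
class ArrayDoubler extends RecursiveAction {

    // Arbitrary cut-off below which a range is processed sequentially.
    private static final int SEQUENTIAL_THRESHOLD = 1000;

    private final int[] data;
    private final int start; // inclusive
    private final int end;   // exclusive

    ArrayDoubler(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start < SEQUENTIAL_THRESHOLD) {
            for (int i = start; i < end; i++) {
                data[i] *= 2;
            }
            return;
        }
        final int mid = start + (end - start) / 2;
        // invokeAll runs both halves and returns once both have completed.
        invokeAll(new ArrayDoubler(data, start, mid),
                  new ArrayDoubler(data, mid, end));
    }

    // Illustrative helper: runs the action on a small pool and returns the same array.
    static int[] doubleAll(int[] data) {
        final ForkJoinPool pool = new ForkJoinPool(4);
        try {
            pool.invoke(new ArrayDoubler(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
        return data;
    }

    public static void main(String[] args) {
        final int[] data = new int[10000];
        Arrays.fill(data, 21);
        doubleAll(data);
        System.out.println(data[0] + " " + data[data.length - 1]); // prints "42 42"
    }
}
```

Since there is nothing to return, the two halves do not need the fork/compute/join ordering used in MaximumFinder; invokeAll is enough.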

★You can also use the ForkJoinTask.adapt() method to convert a Callable or Runnable into a ForkJoinTask.
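
A small sketch of how adapt() might be used, assuming Java 8 or later for the lambda syntax. The class name AdaptExample and its helper methods are hypothetical; only ForkJoinTask.adapt() and ForkJoinPool.invoke() are library calls.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class showing both adapt() overloads used here.
class AdaptExample {

    // Wraps a Callable; the task's result is whatever call() returns.
    static int runCallable(ForkJoinPool pool) {
        final Callable<Integer> callable = () -> 6 * 7;
        final ForkJoinTask<Integer> task = ForkJoinTask.adapt(callable);
        return pool.invoke(task);
    }

    // Wraps a Runnable; the task has no meaningful result, so the side effect is checked instead.
    static int runRunnable(ForkJoinPool pool) {
        final AtomicInteger counter = new AtomicInteger();
        final Runnable runnable = counter::incrementAndGet;
        final ForkJoinTask<?> task = ForkJoinTask.adapt(runnable);
        pool.invoke(task);
        return counter.get();
    }

    public static void main(String[] args) {
        final ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println(runCallable(pool)); // prints 42
            System.out.println(runRunnable(pool)); // prints 1
        } finally {
            pool.shutdown();
        }
    }
}
```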


ForkJoinPool pool = new ForkJoinPool(
        Runtime.getRuntime().availableProcessors(),
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null,
        true); // asyncMode = true: queue forked tasks in FIFO order (the default is LIFO, i.e. stack order)
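
FIFO mode is mainly meant for event-style tasks that are submitted and never joined. The following is a rough sketch of that kind of use, not a benchmark. The class AsyncModePool and its methods are hypothetical names, and the 10-second timeout is an arbitrary choice.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class built around the four-argument constructor shown above.
class AsyncModePool {

    static ForkJoinPool newAsyncPool() {
        return new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,   // no custom UncaughtExceptionHandler
                true);  // asyncMode: FIFO scheduling
    }

    // Submits n fire-and-forget tasks and returns how many of them ran.
    static int runEvents(int n) {
        final ForkJoinPool pool = newAsyncPool();
        final AtomicInteger handled = new AtomicInteger();
        final Runnable event = handled::incrementAndGet;
        for (int i = 0; i < n; i++) {
            pool.execute(event); // never joined
        }
        pool.shutdown(); // already submitted tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(newAsyncPool().getAsyncMode()); // prints true
        System.out.println(runEvents(50));
    }
}
```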