Java:将大任务 Fork 为多个小任务以提升性能

在执行一个大任务时,可以将其 fork 为多个小任务,最后 join 得到结果。fork 任务时主要是要设置一个阈值 THRESHOLD,它不宜过小,也不宜过大,否则起不到提升性能的作用。

package com.learn.corejava.threading;



import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.RecursiveTask;

import java.util.function.DoublePredicate;



/**
 * Demo entry point: counts how many of ten million random doubles exceed 0.5
 * by running a {@link Countr} task on a fork/join pool.
 */
public class ForkJoinMain {

    /**
     * Fills an array with {@code Math.random()} values, counts the entries greater
     * than 0.5 in parallel and prints the count to standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final int SIZE = 10000000;
        double[] numbers = new double[SIZE];
        for (int i = 0; i < SIZE; i++) {
            numbers[i] = Math.random();
        }

        Countr countr = new Countr(numbers, 0, numbers.length, x -> x > 0.5);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            // invoke() blocks until the task completes and returns its result,
            // so a separate join() on the task is not needed.
            System.out.println(forkJoinPool.invoke(countr));
        } finally {
            // Release the worker threads of the privately created pool.
            forkJoinPool.shutdown();
        }
    }
}



/**
 * Fork/join task that counts the elements of a {@code double[]} slice
 * {@code [from, to)} accepted by a predicate.
 *
 * <p>Slices shorter than {@link #THRESHOLD} are counted with a plain loop;
 * longer slices are split in half and the two halves are processed as subtasks.
 */
class Countr extends RecursiveTask<Integer> {

    /** Slices with fewer elements than this are counted sequentially instead of being split. */
    public static final int THRESHOLD = 1000;

    private final double[] values;
    private final int from;
    private final int to;
    private final DoublePredicate filter;

    /**
     * Creates a counting task over {@code values[from .. to - 1]}.
     *
     * @param values array to scan; it is stored by reference, not copied
     * @param from   index of the first element to examine (inclusive)
     * @param to     index one past the last element to examine (exclusive)
     * @param filter predicate deciding which elements are counted
     */
    public Countr(double[] values, int from, int to, DoublePredicate filter) {
        this.values = values;
        this.from = from;
        this.to = to;
        this.filter = filter;
    }

    /**
     * Counts the matching elements in this task's slice.
     *
     * @return number of elements in {@code [from, to)} for which the filter returns {@code true}
     */
    @Override
    protected Integer compute() {
        if (to - from < THRESHOLD) {
            // Small enough: count directly in the current thread.
            int count = 0;
            for (int i = from; i < to; i++) {
                if (filter.test(values[i])) {
                    count++;
                }
            }
            return count;
        } else {
            // from + (to - from) / 2 avoids the int overflow that (from + to) / 2
            // would hit once the indices exceed 2^30.
            int mid = from + (to - from) / 2;
            Countr first = new Countr(values, from, mid, filter);
            Countr second = new Countr(values, mid, to, filter);
            invokeAll(first, second);   // run both halves as subtasks and wait for them to finish
            return first.join() + second.join();
        }
    }
}


 

 

展开阅读全文