www.zhblog.net

Java 非阻塞式通信中需要了解的概念:Future

Netty 中 ChannelFuture 和 Promise 都是继承 java.util.concurrent.Future,所以有必要好好总结一下。


1.Future 的介绍、用途

简而言之,Future表示异步计算的将来结果–在处理完成后最终会出现在Future中。

耗时间运行的方法非常适合异步处理和Future接口。这使我们能够在等待Future中封装的任务完成之前执行其他一些操作。

可以利用Future的异步特性的一些操作: 

密集型计算过程(数学和科学计算) 

处理大数据结构(或大数据) 

远程方法调用(下载文件,HTML抓取,Web服务)


2.FutureTask

下面的示例,我们将创建一个非常简单的类来计算Integer的平方。这绝对不适合“耗时操作”的方法类别,但是我们将对其进行Thread.sleep()调用,使其持续1秒才能完成:

public class SquareCalculator {    

     

    private ExecutorService executor 

      = Executors.newSingleThreadExecutor();

     

    public Future<Integer> calculate(Integer input) {        

        return executor.submit(() -> {

            Thread.sleep(1000);

            return input * input;

        });

    }

}


实际执行计算的部分代码包含在call()方法中,以lambda表达式形式提供。代码很简单,除了前面提到的sleep()调用外,没有什么特别的。

Callable是表示任务的接口,该接口返回结果并具有单一call()方法。在这里,我们使用lambda表达式创建了它的实例。

创建Callable实例后,我们必须将此实例传递给执行者,该执行者将负责在新线程中启动该任务并把宝贵的Future对象返回给我们。那就是ExecutorService的任务。

有几种方法可以获取ExecutorService实例,其中大多数是由实用程序类Executors的静态工厂方法提供的。在此示例中,我们使用了基本的newSingleThreadExecutor(),它为我们提供了一个ExecutorService,它使用单线程处理任务。

一旦有了ExecutorService对象,我们只需要调用Submit()并传递Callable作为参数即可。 Submit()将负责启动任务并返回FutureTask对象,该对象是Future接口的实现。


3.使用isDone()和get()从Future中获得结果

现在我们需要调用calculate()并使用返回的Future来获取结果Integer。 Future API中的两种方法将帮助我们完成此任务。

Future.isDone()告诉我们执行者是否已完成任务的处理。如果任务完成,则返回true,否则返回false。

从计算中返回实际结果的方法是Future.get()。请注意,该方法将阻塞直到任务完成为止,但是在我们的示例中,这不会成为问题,因为我们将通过调用isDone()首先检查任务是否完成。

通过使用这两个方法,我们可以在等待主要任务完成时运行其他代码:

Future<Integer> future = new SquareCalculator().calculate(10);

 

while(!future.isDone()) {

    System.out.println("Calculating...");

    Thread.sleep(300);

}

 

Integer result = future.get();


在这个例子中,我们在输出了一条简单的消息,让用户知道程序正在执行计算。

方法get()将阻塞执行,直到任务完成。但是我们不必担心,因为我们的示例仅在确保任务完成之后才调用get()。因此,在这种情况下,future.get()将始终立即返回。

另外,get()具有一个重载版本,该版本可以指定一个TimeUnit作为参数, 表示超时时间:

Integer result = future.get(500, TimeUnit.MILLISECONDS);


get(long,TimeUnit)和get()之间的区别在于,如果任务在指定的超时期限之前未返回,则前者将引发TimeoutException。


4.取消Future

假设我们已经触发了任务,但是由于某种原因,我们不再关心结果了。我们可以使用Future.cancel(boolean)告诉执行程序停止操作并中断其基础线程:

Future<Integer> future = new SquareCalculator().calculate(4);

boolean canceled = future.cancel(true);


上面代码中的Future实例永远无法完成其操作。实际上,如果尝试从该实例调用cancel()之后调用get(),结果将是CancellationException。 Future.isCancelled()会告诉我们Future是否已被取消。这对于避免出现CancellationException非常有用。

对cancel()的调用可能失败。在这种情况下,其返回值将为false。请注意,cancel()将布尔值作为参数–这控制执行此任务的线程是否应该被中断。


5.线程池多线程并发

我们当前的ExecutorService是单线程的,因为它是通过Executors.newSingleThreadExecutor获得的。为了突显这种“单线程”,让我们同时触发两个计算:

SquareCalculator squareCalculator = new SquareCalculator();

 

Future<Integer> future1 = squareCalculator.calculate(10);

Future<Integer> future2 = squareCalculator.calculate(100);

 

while (!(future1.isDone() && future2.isDone())) {

    System.out.println(

      String.format(

        "future1 is %s and future2 is %s", 

        future1.isDone() ? "done" : "not done", 

        future2.isDone() ? "done" : "not done"

      )

    );

    Thread.sleep(300);

}

 

Integer result1 = future1.get();

Integer result2 = future2.get();

 

System.out.println(result1 + " and " + result2);

 

squareCalculator.shutdown();


现在,让我们分析一下此代码的输出:

calculating square for: 10

future1 is not done and future2 is not done

future1 is not done and future2 is not done

future1 is not done and future2 is not done

future1 is not done and future2 is not done

calculating square for: 100

future1 is done and future2 is not done

future1 is done and future2 is not done

future1 is done and future2 is not done

100 and 10000


显然,该过程不是并行的。请注意,第二个任务仅在第一个任务完成后才开始,因此整个过程大约需要2秒钟才能完成。

为了使我们的程序真正成为多线程,我们应该使用另一种风格的ExecutorService。让我们看看如果使用工厂方法Executors.newFixedThreadPool()提供的线程池,示例的行为将如何改变:

public class SquareCalculator {

  

    private ExecutorService executor = Executors.newFixedThreadPool(2);

     

    //...

}


通过对SquareCalculator类进行简单的更改,现在我们有了一个执行程序,该执行程序可以同时使用2个线程。

如果再次运行完全相同的客户端代码,将得到以下输出:

calculating square for: 10

calculating square for: 100

future1 is not done and future2 is not done

future1 is not done and future2 is not done

future1 is not done and future2 is not done

future1 is not done and future2 is not done

100 and 10000


这两个任务是如何同时开始和完成运行的,整个过程大约需要1秒钟就能完成。


还有其他一些工厂方法可用于创建线程池,例如Executors.newCachedThreadPool()在可用的情况下重用线程,以及Executors.newScheduledThreadPool()安排任务在给定延迟后运行。


总结:

我们对Future接口进行了基本的介绍,并访问了其有用的方法。还学习了如何利用线程池的功能来触发多个并行操作。


 

展开阅读全文

评论

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 心情