Przewodnik po java.util.concurrent.Future

1. Przegląd

W tym artykule dowiemy się o przyszłości . Interfejs, który istnieje od wersji Java 1.5 i może być bardzo przydatny podczas pracy z wywołaniami asynchronicznymi i współbieżnym przetwarzaniem.

2. Tworzenie futures

Mówiąc najprościej, klasa Future reprezentuje przyszły wynik obliczeń asynchronicznych - wynik, który ostatecznie pojawi się w przyszłości po zakończeniu przetwarzania.

Zobaczmy, jak napisać metody, które tworzą i zwracają instancję Future .

Długotrwałe metody są dobrymi kandydatami do przetwarzania asynchronicznego i interfejsu Future . To pozwala nam wykonać inny proces, czekając na zakończenie zadania zawartego w Future .

Oto kilka przykładów operacji, które mogłyby wykorzystać asynchroniczny charakter Future :

  • procesy intensywne obliczeniowo (obliczenia matematyczne i naukowe)
  • manipulowanie dużymi strukturami danych (big data)
  • zdalne wywołania metod (pobieranie plików, wycinanie HTML, usługi sieciowe).

2.1. Wdrażanie kontraktów futures z FutureTask

W naszym przykładzie utworzymy bardzo prostą klasę, która oblicza kwadrat liczby całkowitej . To zdecydowanie nie pasuje do kategorii „długotrwałych” metod, ale zamierzamy wywołać Thread.sleep () , aby trwało 1 sekundę:

public class SquareCalculator { private ExecutorService executor = Executors.newSingleThreadExecutor(); public Future calculate(Integer input) { return executor.submit(() -> { Thread.sleep(1000); return input * input; }); } }

Bit kodu, który faktycznie wykonuje obliczenia, jest zawarty w metodzie call () , dostarczonej jako wyrażenie lambda. Jak widać, nie ma w tym nic specjalnego, poza wspomnianym wcześniej wywołaniem sleep () .

Ciekawie robi się, gdy skierujemy naszą uwagę na użycie Callable i ExecutorService .

Callable to interfejs reprezentujący zadanie, które zwraca wynik i ma jedną metodę call () . Tutaj utworzyliśmy jego instancję przy użyciu wyrażenia lambda.

Utworzenie instancji Callable nie prowadzi nas nigdzie, nadal musimy przekazać tę instancję do executora, który zajmie się uruchomieniem tego zadania w nowym wątku i zwróci nam wartościowy obiekt Future . W tym miejscu pojawia się ExecutorService .

Istnieje kilka sposobów na uzyskanie instancji ExecutorService , większość z nich jest udostępniana przez statyczne metody fabryki klasy użytkowej Executorów . W tym przykładzie użyliśmy podstawowej metody newSingleThreadExecutor () , która daje nam usługę ExecutorService zdolną do obsługi pojedynczego wątku naraz.

Gdy już mamy obiekt ExecutorService , musimy po prostu wywołać funkcję submit (), przekazując naszą Callable jako argument. submit () zajmie się uruchomieniem zadania i zwróci obiekt FutureTask , który jest implementacją interfejsu Future .

3. Konsumpcja kontraktów futures

Do tego momentu nauczyliśmy się, jak tworzyć instancję Future .

W tej sekcji nauczymy się, jak pracować z tą instancją, badając wszystkie metody, które są częścią interfejsu API Future .

3.1. Użycie isDone () i get () w celu uzyskania wyników

Teraz musimy wywołać funkcję kalkulator () i użyć zwróconej przyszłości, aby uzyskać wynikową liczbę całkowitą . Pomogą nam w tym dwie metody z Future API.

Future.isDone () informuje nas, czy executor zakończył przetwarzanie zadania. Jeśli zadanie zostanie zakończone, zwróci true, w przeciwnym razie zwróci false .

Metodą zwracającą rzeczywisty wynik obliczeń jest Future.get () . Zauważ, że ta metoda blokuje wykonanie do momentu zakończenia zadania, ale w naszym przykładzie nie będzie to problemem, ponieważ najpierw sprawdzimy, czy zadanie zostało ukończone, wywołując isDone () .

Korzystając z tych dwóch metod, możemy uruchomić inny kod, czekając na zakończenie głównego zadania:

Future future = new SquareCalculator().calculate(10); while(!future.isDone()) { System.out.println("Calculating..."); Thread.sleep(300); } Integer result = future.get();

W tym przykładzie na wyjściu napiszemy prosty komunikat, aby poinformować użytkownika, że ​​program wykonuje obliczenia.

Metoda get () zablokuje wykonanie do czasu zakończenia zadania. Ale nie musimy się tym martwić, ponieważ w naszym przykładzie dochodzi się do punktu, w którym metoda get () jest wywoływana po upewnieniu się, że zadanie zostało zakończone. Tak więc w tym scenariuszu future.get () zawsze zwróci natychmiast.

Warto wspomnieć, że get () ma przeciążoną wersję, która pobiera limit czasu i TimeUnit jako argumenty:

Integer result = future.get(500, TimeUnit.MILLISECONDS);

Różnica między get (long, TimeUnit) i get () polega na tym, że ta pierwsza zgłosi wyjątek TimeoutException, jeśli zadanie nie zwróci się przed upływem określonego limitu czasu.

3.2. Anulowanie przyszłości z anuluj ()

Załóżmy, że uruchomiliśmy zadanie, ale z jakiegoś powodu nie dbamy już o wynik. Możemy użyć Future.cancel (boolean), aby nakazać executorowi zatrzymanie operacji i przerwanie wątku bazowego:

Future future = new SquareCalculator().calculate(4); boolean canceled = future.cancel(true);

Nasza instancja Future z powyższego kodu nigdy nie zakończyłaby swojego działania. W rzeczywistości, jeśli spróbujemy wywołać metodę get () z tej instancji, po wywołaniu funkcji cancel () , wynikiem będzie wyjątek CancellationException . Future.isCancelled () powie nam, czy Future została już anulowana. Może to być bardzo przydatne, aby uniknąć uzyskania wyjątku CancellationException .

Możliwe, że wywołanie cancel () nie powiedzie się. W takim przypadku jego zwrócona wartość będzie fałszywa . Zauważ, że cancel () przyjmuje wartość logiczną jako argument - to kontroluje, czy wątek wykonujący to zadanie powinien zostać przerwany, czy nie.

4. Więcej Wielowątkowość Z wątku Baseny

Nasza bieżąca usługa ExecutorService jest jednowątkowa, ponieważ została uzyskana za pomocą Executors.newSingleThreadExecutor. Aby podkreślić tę „pojedynczą nitkę”, uruchommy jednocześnie dwa obliczenia:

SquareCalculator squareCalculator = new SquareCalculator(); Future future1 = squareCalculator.calculate(10); Future future2 = squareCalculator.calculate(100); while (!(future1.isDone() && future2.isDone())) { System.out.println( String.format( "future1 is %s and future2 is %s", future1.isDone() ? "done" : "not done", future2.isDone() ? "done" : "not done" ) ); Thread.sleep(300); } Integer result1 = future1.get(); Integer result2 = future2.get(); System.out.println(result1 + " and " + result2); squareCalculator.shutdown();

Teraz przeanalizujmy dane wyjściowe dla tego kodu:

calculating square for: 10 future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done calculating square for: 100 future1 is done and future2 is not done future1 is done and future2 is not done future1 is done and future2 is not done 100 and 10000

It is clear that the process is not parallel. Notice how the second task only starts once the first task is completed, making the whole process take around 2 seconds to finish.

To make our program really multi-threaded we should use a different flavor of ExecutorService. Let's see how the behavior of our example changes if we use a thread pool, provided by the factory method Executors.newFixedThreadPool():

public class SquareCalculator { private ExecutorService executor = Executors.newFixedThreadPool(2); //... }

With a simple change in our SquareCalculator class now we have an executor which is able to use 2 simultaneous threads.

If we run the exact same client code again, we'll get the following output:

calculating square for: 10 calculating square for: 100 future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done 100 and 10000

This is looking much better now. Notice how the 2 tasks start and finish running simultaneously, and the whole process takes around 1 second to complete.

There are other factory methods that can be used to create thread pools, like Executors.newCachedThreadPool() that reuses previously used Threads when they are available, and Executors.newScheduledThreadPool() which schedules commands to run after a given delay.

For more information about ExecutorService, read our article dedicated to the subject.

5. Overview of ForkJoinTask

ForkJoinTask is an abstract class which implements Future and is capable of running a large number of tasks hosted by a small number of actual threads in ForkJoinPool.

In this section, we are going to quickly cover the main characteristics of ForkJoinPool. For a comprehensive guide about the topic, check our Guide to the Fork/Join Framework in Java.

Then the main characteristic of a ForkJoinTask is that it usually will spawn new subtasks as part of the work required to complete its main task. It generates new tasks by calling fork() and it gathers all results with join(), thus the name of the class.

There are two abstract classes that implement ForkJoinTask: RecursiveTask which returns a value upon completion, and RecursiveAction which doesn't return anything. As the names imply, those classes are to be used for recursive tasks, like for example file-system navigation or complex mathematical computation.

Let's expand our previous example to create a class that, given an Integer, will calculate the sum squares for all its factorial elements. So, for instance, if we pass the number 4 to our calculator, we should get the result from the sum of 4² + 3² + 2² + 1² which is 30.

First of all, we need to create a concrete implementation of RecursiveTask and implement its compute() method. This is where we'll write our business logic:

public class FactorialSquareCalculator extends RecursiveTask { private Integer n; public FactorialSquareCalculator(Integer n) { this.n = n; } @Override protected Integer compute() { if (n <= 1) { return n; } FactorialSquareCalculator calculator = new FactorialSquareCalculator(n - 1); calculator.fork(); return n * n + calculator.join(); } }

Notice how we achieve recursiveness by creating a new instance of FactorialSquareCalculator within compute(). By calling fork(), a non-blocking method, we ask ForkJoinPool to initiate the execution of this subtask.

The join() method will return the result from that calculation, to which we add the square of the number we are currently visiting.

Now we just need to create a ForkJoinPool to handle the execution and thread management:

ForkJoinPool forkJoinPool = new ForkJoinPool(); FactorialSquareCalculator calculator = new FactorialSquareCalculator(10); forkJoinPool.execute(calculator);

6. Conclusion

In this article, we had a comprehensive view of the Future interface, visiting all its methods. We've also learned how to leverage the power of thread pools to trigger multiple parallel operations. The main methods from the ForkJoinTask class, fork() and join() were briefly covered as well.

We have many other great articles on parallel and asynchronous operations in Java. Here are three of them that are closely related to the Future interface (some of them are already mentioned in the article):

  • Guide to CompletableFuture – an implementation of Future with many extra features introduced in Java 8
  • Przewodnik po strukturze Fork / Join w Javie - więcej o ForkJoinTask omówiliśmy w rozdziale 5
  • Przewodnik po Java ExecutorService - poświęcony interfejsowi ExecutorService

Sprawdź kod źródłowy użyty w tym artykule w naszym repozytorium GitHub.