1. Course summary

Following you find the summary of this course:

This is only a short excerpt - you are strongly advised to buy this course for the full information.

2. Thread Pools

Das Arbeiten mit Threads ist kompliziert unf fehleranfällig. Es gibt daher die Möglichkeit Java die Arbeit mit den Threads zu überlassen und sich auf die zu erfüllende aufgabe zu konzentrieren.

Problems of Thread Manipulation
  • Availability: too many threads → out-of-memory exception

  • Costs: the creating of threads is "expensive"

→ Thread Pools

A Thread Pool is a pool of threads called worker threads.

Thread of this pool can be used and afterwards they are returned to the thread pool, where they are ready for new usage.

0500 thread pool

Java takes care of using the threads for executing the threads. When not enough threads in the pool, the tasks are waiting in a task queue.

3. Executors

ThreadPoolExecutor is a implementation of Interface ExecutorService

There are also

  • ScheduledThreadPoolExecutor and

  • ForkJoinPool

When we create a threadPool, there are several static methods available:

0510 new fixed thread pool

creating a ThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorsDemo {

  public static void show() {
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    System.out.println(executor.getClass().getName());
  }
}

Output:

java.util.concurrent.ThreadPoolExecutor
first run of a thread
public class ExecutorsDemo {

  public static void show() {
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.submit(() -> {
      System.out.println(Thread.currentThread().getName());
    });
  }
}

Output:

pool-1-thread-1
0515 seqd first run thread

Now we run our first thread of the thread pool. We didn’t have to start the thread, we only provided the task as lambda expression.

Now we have 10 tasks but only to threads in the thread pool
public class ExecutorsDemo {

  public static void show() {
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 10; i++) {
      executor.submit(() -> {
        System.out.println(Thread.currentThread().getName());
      });
    }
  }
}

Output:

pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2

Internally exists a task queue, which processes all 10 tasks with the two threads of the thread pool.

The program is still running, because the tread pool is awaiting new tasks. We have to use executor.shutdown()
public class ExecutorsDemo {

  public static void show() {
    final ExecutorService executor = Executors.newFixedThreadPool(2);
      executor.submit(() -> {
        System.out.println(Thread.currentThread().getName());
      });

      executor.shutdown();
  }
}

Output:

pool-1-thread-1

Process finished with exit code 0 (1)
1 Now, the program is finished.
But now is a problem. When an error occurs, the executor service (thread pool) will not be ended. So it is best tu place the shutdown()-method into a finally block.
public class ExecutorsDemo {

  public static void show() {
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      executor.submit(() -> {
        System.out.println(Thread.currentThread().getName());
      });
    } finally {
      executor.shutdown(); (1)
    }
  }
}
1 Now the shutdown()-method will be executed even when there occurs an error.

4. Callables and Futures

0520 callable

The tasks are normally of type Runnable, but when you need a return-value ie an record of a database, the type is CallableInterface Callable<V>

Callable is very similar to Runnable, but this task returns a value. Instead of a method run() it has a method call() which returns a value.

0530 call method

public class ExecutorsDemo {

  public static void show() {
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Future<Integer> future = executor.submit(() -> { (1)
        System.out.println(Thread.currentThread().getName());
        return 1;  (2)
      });
    } finally {
      executor.shutdown();
    }
  }
}
1 The variable is from type Future<Integer>, because the result will available in the future, even this are only milliseconds.
2 The return value of the task.
LongClass for simulating a long lasting task
package at.htl.multithreading.executors;

public class LongTask {
  public static void simulate() {
    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
public class ExecutorsDemo {

  public static void show() {
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Future<Integer> future = executor.submit(() -> {
        LongTask.simulate();  (1)
        return 1;
      });
      System.out.println("Do more work");
      System.out.println(future);  (2)
      int result = future.get();   (3)
      System.out.println(result);
      System.out.println(future);  (4)
    } catch (InterruptedException | ExecutionException e) {  (5)
      e.printStackTrace();
    } finally {
      executor.shutdown();
    }
  }
}

Output:

Do more work
java.util.concurrent.FutureTask@6e8dacdf[Not completed, task = at.htl.multithreading.executors.ExecutorsDemo$$Lambda$14/0x0000000800066840@1e643faf]
1
java.util.concurrent.FutureTask@6e8dacdf[Completed normally]

Process finished with exit code 0
1 First, we simulate a long run of the task.
2 Now immediately the future-Variable will be printed to the console
3 The get() method is blocking for 3 seconds, until the task will be finished. 1 is the result.
4 Printing the future-variable again, shows that the task has completed normally
5 For the get-method you need InterruptedException and ExecutionException

There are also another methods in the Future-class:

0540 future

5. Asynchronous Programming

asynchronous \(\cong\) non-blocking

The get()-method of a future is blocking and therefore a synchronous code.

Man kann sich als Analogie für ein asynchrones Programm ein Restaurant vorstellen, in dem der Keller auch mehrere Gäste hintereinander bedienen kann, ohne auf die Zubereitung des Essens für jeden Gast zu warten.

6. CompletableFutures

0550 completable future

6.1. Creating a CompletableFuture - Object

0560 completable future methods

public class CompletableFuturesDemo {
  public static void show() {
    Runnable task = () -> System.out.println("a");
    final CompletableFuture<Void> future = CompletableFuture.runAsync(task);
  }
}

Output:

a

Process finished with exit code 0
  • When no thread pool is defined, under the hood a ForkJoinPool.commonPool() is used with 8 Threads.

  • The number of 8 threads is calculated by Runtime.getRuntime().availableProcessors(): The current machine has 4 cores á 2 threads.

  • There is no return value, so the return type is CompletableFuture<Void>

0565 seqd thread with get
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

public class CompletableFuturesDemo {
  public static void show() {
    Supplier<Integer> task = () -> 1;
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(task);
    try {
      int result = future.get();
      System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
  }
}

Output:

1

Process finished with exit code 0

6.2. Implementing an Asynchronous API

6.2.1. Synchronous Operation

public class MailService {
  public void send() {
    LongTask.simulate();
    System.out.println("Mail was sent.");
  }
}
public class Main {
  public static void main(String[] args) {
    final MailService service = new MailService();
    service.send();
    System.out.println("Hello World!");
  }
}

Output:

Mail was sent.
Hello World!

Process finished with exit code 0

send() is a blocking (synchronous) operation, so "Hello World" is printed after waiting for the completion od send().

6.2.2. Asynchronous Operation

public class MailService {
  public void send() {
    LongTask.simulate();
    System.out.println("Mail was sent.");
  }

  public CompletableFuture<Void> sendAsync() {
    return CompletableFuture.runAsync(() -> send()); (1)
  }
}
public class Main {
  public static void main(String[] args) {
    final MailService service = new MailService();
    service.sendAsync(); (2)
    System.out.println("Hello World!");
  }
}
1 First, we wrap the invocation of send() into a CompletableFuture.
2 The wrapper-method sendAsync() is called.

Output:

Hello World!

Process finished with exit code 0

But now occurs a problem. The main()-Method has finished so fast, that the thread is not able to print "Mail was sent."

public class Main {

  public static void main(String[] args) {
    final MailService service = new MailService();
    service.sendAsync();
    System.out.println("Hello World!");

    try {
      Thread.sleep(5000);  (1)
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Output:

Hello World!
Mail was sent.  (2)

Process finished with exit code 0  (3)
1 We have to wait until the mail is sent to see the output.
2 3 seconds after "Hello World!" the mail-output is printed
3 After 5 seconds the program will end.

6.3. Running Code on Completion

6.3.1. CompletionStage

java.util.concurrent.CompletionStage<T> interface represents a commutation task (either synchronous or asynchronous). As all methods declared in this interface return an instance of CompletionStage itself, multiple CompletionStages can be chained together in different ways to complete a group of tasks. Java - Basics of CompletionStage And CompletableFuture

6.3.2. thenRun()

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
    future.thenRun(() -> {  (1)
      System.out.println(Thread.currentThread().getName());
      System.out.println("Done");
    });
  }
}

Output:

main (2)
Done

Process finished with exit code 0
1 After finishing the task continue with the next task.
2 This is a synchronous operation and therefore executed in the main-thread

6.3.3. thenRunAsync()

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
    future.thenRunAsync(() -> {  (1)
      System.out.println(Thread.currentThread().getName());
      System.out.println("Done");
    });
  }
}

Output:

ForkJoinPool.commonPool-worker-3 (2)
Done

Process finished with exit code 0
1 You can also continue with a asynchronous operation, …​
2 …​ which is executed in a worker thread.

6.3.4. then Accept()

To get the result of the CompletableFuture. It uses a consumer-object (Interface Consumer<T>). The accept-method has no return value.

0570 interface consumer

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
    future.thenAccept(result -> {
      System.out.println(Thread.currentThread().getName());
      System.out.println(result);
    });
  }
}

Output:

main
1

Process finished with exit code 0

6.3.5. thenAcceptAsync()

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
    future.thenAcceptAsync(result -> { (1)
      System.out.println(Thread.currentThread().getName());
      System.out.println(result);
    });
  }
}

Output

ForkJoinPool.commonPool-worker-3
1
1 when the result (1) is not shown, then the main thread finishes earlier than the thread. Then you need to use sleep() in the main-thread.

0575 thenAcceptAsync

6.4. Handling Exceptions

public class CompletableFuturesDemo {
  public static void show() {
    final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("Getting the current weather");
      throw new IllegalStateException(); (1)
      // return <temperature>
    });
  }
}

Output

Process finished with exit code 0
1 A exception is thrown, but nothing happens in the console, because the exception is thrown in another thread.
The get()-method can throw InterruptedException and ExecutionException

0580 completable future get exceptions

public class CompletableFuturesDemo {
  public static void show() {
    final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("Getting the current weather");
      throw new IllegalStateException(); (1)
      // return <temperature>
    });

    try {
      future.get(); (2)
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) { (3)
      e.printStackTrace();
    }
  }
}

Output

Getting the current weather
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at at.htl.multithreading.executors.CompletableFuturesDemo.show(CompletableFuturesDemo.java:15)
	at at.htl.multithreading.Main.main(Main.java:15)
Caused by: java.lang.IllegalStateException
	at at.htl.multithreading.executors.CompletableFuturesDemo.lambda$show$0(CompletableFuturesDemo.java:11)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

Process finished with exit code 0
1 Now we get a ExecutionException caused by a IllegalStateException. The IllegalStateException is wrapped by a ExecutionException

With System.err.println(e.getCause()); in the ExecutionException you can show the original exception.

Now we cann add .exceptionally():

0590 completable future get exceptionally

This method throws a Throwable, the parent class of all Errors and Exceptions in Java.

0600 throwable

0610 completable future exceptionally

The method exceptionally() returns a new CompletableFuture and shows ie the last valid temperature.

public class CompletableFuturesDemo {
  public static void show() {
    final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("Getting the current weather");
      throw new IllegalStateException();
      // return <temperature>
    });

    try {
      int temperature = future.exceptionally(ex -> 17).get(); (1)
      System.out.println(temperature);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }
}
1 The new CompletableFuture returns 17

Output:

Getting the current weather
17

Process finished with exit code 0

6.5. Transforming a CompletableFuture

Sometimes you need to transorm the result into ie complex data structure.

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 20);
    try {
      Double result = future
          .thenApply(celsius -> (celsius * 1.8) + 32) (1)
          .get();
      System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
  }
}

Output:

68.0

Process finished with exit code 0
1 The temperature in Celsius is transformed to Fahrenheit.

We can make a more "beautiful" code

public class CompletableFuturesDemo {

  public static void show() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 20);
    future
        .thenApply(celsius -> (celsius * 1.8) + 32)
        .thenAccept(f -> System.out.println(f)); (1)
  }
}
1 You could als use .thenAccept(System.out::println);

The CompletionStage-Interface provides a bunch of methods:

  • thenRun: executes the given action (Runnable)

  • thenApply: executes the given function (Function) with parameter to it

  • thenAccept: executes the given action (Consumer) with parameter to it

  • etc

Einen guten Überblick (v.a. ab Abschnitt 4) gibt Leitfaden für CompletableFuture

It is also possible to use a method reference.

public class CompletableFuturesDemo {

  public static int toFahrenheit(int celsius) { (1)
    return (int) (celsius * 1.8) + 32;
  }

  public static void show() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 20);
    future
        .thenApply(CompletableFuturesDemo::toFahrenheit) (2)
        .thenAccept(System.out::println);
  }
}
1 the new method toFahrenheit(…​). We introduce a method, because in reality this actions often more complex, like accessing a database.
2 the method reference

6.6. Composing Completables Futures

Now a task is started, after completion a second task is processed. As example with an id you can get the email, which is used for a (fictive) streaming website to get a playlist.

public class CompletableFuturesDemo {
  public static void show() {
    // id -> email
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "email");
    // email -> playlist
    future
        .thenCompose(email -> CompletableFuture.supplyAsync(() -> "playlist"))
        .thenAccept(playlist -> System.out.println(playlist));
  }
}

Output:

playlist

Process finished with exit code 0

thenCompose() creates a new CompletableFuture. It is similar to thenApply(), but it flattens the hierarchy.

Now we get rid of the variable:

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture
        .supplyAsync(() -> "email")
        .thenCompose(email -> CompletableFuture.supplyAsync(() -> "playlist"))
        .thenAccept(playlist -> System.out.println(playlist));
  }
}

Now we introduce two methods getUserEmail() and getPlaylistAsync().

public class CompletableFuturesDemo {

  public static CompletableFuture<String> getUserEmail() { (1)
    return CompletableFuture.supplyAsync(() -> "email");
  }

  public static CompletableFuture<String> getPlaylistAsync(String email) { (2)
    return CompletableFuture.supplyAsync(() -> "playlist");
  }

  public static void show() {
    getUserEmail()
        .thenCompose(CompletableFuturesDemo::getPlaylistAsync)  (3)
        .thenAccept(playlist -> System.out.println(playlist));
  }
}
1 method getUserEmail()
2 method getPlaylistAsync()
3 the algorithm is now declarative

6.7. Combining Completable Futures

Ability to start two tasks asynchronosly and to combine the results.

Example

First we access a remote service to get a price of a product in US$. Second, at the same time we access another remote service to get the exchange rate → 0.9

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 20);
    CompletableFuture<Double> second = CompletableFuture.supplyAsync(() -> 0.9);

    first.thenCombine(second, (price, exchangeRate) -> price * exchangeRate) (1)
         .thenAccept(result -> System.out.println(result));
  }
}

Output:

18.0

Process finished with exit code 0
1 The second parameter is a Bifunction, which provides the algorithm to combine the results of the two threads.
java.util.function.BiFunction<? super T, ? super U, ? extends V> fn)

What happens, when we get the price as string?

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> first = CompletableFuture
        .supplyAsync(() -> "20USD")
        .thenApply(str -> {
          String price = str.replace("USD", "");
          return Integer.parseInt(price);
        });
    CompletableFuture<Double> second = CompletableFuture.supplyAsync(() -> 0.9);

    first.thenCombine(second, (price, exchangeRate) -> price * exchangeRate)
        .thenAccept(result -> System.out.println(result));
  }
}

6.8. Waiting for Many Tasks to Complete

0620 acd allof
public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 1);
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2);
    CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 3);

    CompletableFuture<Void> all = CompletableFuture.allOf(first, second, third);
    all.thenRun(() -> { (1)
      try {
        int firstResult = first.get(); (2)
        int secondResult = second.get(); (2)
        int thirdResult = third.get(); (2)
        System.out.println("Result: " + (firstResult + secondResult + thirdResult));
      } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
      }
      System.out.println("All tasks completed successfully");
    });
  }
}

Output

Result: 6
All tasks completed successfully

Process finished with exit code 0
1 CompletableFuture.allOf(…​) joins the threads.
2 You have also access to the result of each thread.

6.9. Waiting for the First Task

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
      LongTask.simulate();
      return 20;
    });

    CompletableFuture<Integer> second =CompletableFuture.supplyAsync(() -> 20);

    CompletableFuture
        .anyOf(first, second)  (1)
        .thenAccept(temp -> System.out.println(temp));
  }
}

Output:

20

Process finished with exit code 0
1 anyOf() waits for the first thread, which finishes. So the result will appear immediately. There is no need to wait for the LongTask-thread

6.10. Handling Timeouts

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      LongTask.simulate();
      return 1;
    });

    int result = 0;
    try {
      result = future
          .orTimeout(1, TimeUnit.SECONDS) (1)
          .get();
      System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
  }
}

Output: 0640 future timeout exception

1 The .timeout(…​)-method throws an exception, because the LongTask-thread processes longer than 1 second.
pay attention to the great support of the IDE

0630 future or timeout

For the end-user it might be better, that the program is not exiting with an exception

public class CompletableFuturesDemo {
  public static void show() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      LongTask.simulate();
      return 1;
    });

    int result = 0;
    try {
      result = future
          .completeOnTimeout(1, 1, TimeUnit.SECONDS) (1)
          .get();
      System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
  }
}

Output:

1

Process finished with exit code 0
1 Completes this CompletableFuture with the given value, if not otherwise completed before the given timeout.

7. Project: Best Price Finder

Create a asynchronous program to get the prices of three different sites.

This output should be achieved.

Getting a quote from site3
Getting a quote from site1
Getting a quote from site2
Quote{site='site2', price=108}
Quote{site='site1', price=109}
Quote{site='site3', price=105}
Retreived all quotes in 2415 msec

Process finished with exit code 0

7.1. Getting one quote

Class Quote
public class Quote {
  private final String site;
  private final int price;

  public Quote(String site, int price) {
    this.site = site;
    this.price = price;
  }

  public String getSite() {
    return site;
  }

  public int getPrice() {
    return price;
  }

  @Override
  public String toString() {
    return "Quote{" +
        "site='" + site + '\'' +
        ", price=" + price +
        '}';
  }
}
CLass FlightService
public class FlightService {
  public CompletableFuture<Quote> getQuote(String site) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("Getting a quote from " + site);

      LongTask.simulate();

      Random random = new Random();
      int price = 100 + random.nextInt(10);

      return new Quote(site, price);
    });
  }
}
public class CompletableFuturesDemo {
  public static void show() {
    FlightService service = new FlightService();
    service
        .getQuote("site1")
        .thenAccept(System.out::println);

    try {
      Thread.sleep(10_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Output:

Getting a quote from site1
Quote{site='site1', price=104}

Process finished with exit code 0

7.2. Getting Many Quotes

public class FlightService {
  public Stream<CompletableFuture<Quote>> getQuotes() { (1)
    List<String> sites = List.of("site1", "site2", "site3");

    return sites.stream()
        .map(this::getQuote);
  }


  public CompletableFuture<Quote> getQuote(String site) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("Getting a quote from " + site);

      LongTask.simulate();

      Random random = new Random();
      int price = 100 + random.nextInt(10);

      return new Quote(site, price);
    });
  }
}
0650 acd many sites
public class CompletableFuturesDemo {
  public static void show() {
    FlightService service = new FlightService();
    service
        .getQuotes()
        .map(future -> future.thenAccept(System.out::println))
        .collect(Collectors.toList());

    try {
      Thread.sleep(10_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Output:

Getting a quote from site2
Getting a quote from site3
Getting a quote from site1
Quote{site='site3', price=106}
Quote{site='site1', price=100}
Quote{site='site2', price=106}

Process finished with exit code 0
1 We introduced a new method getQuotes().
Notice, the return value type of getQuotes(…​) is a Stream<…​>.

7.3. Random Delays

Overwrite the simulate()-method
public class LongTask {
  public static void simulate() {
    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public static void simulate(int delay) { (1)
    try {
      Thread.sleep(delay);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
generate a random waiting duration
public class FlightService {
  public Stream<CompletableFuture<Quote>> getQuotes() {
    List<String> sites = List.of("site1", "site2", "site3");

    return sites.stream()
        .map(this::getQuote);
  }


  public CompletableFuture<Quote> getQuote(String site) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("Getting a quote from " + site);

      Random random = new Random(); (2)
      LongTask.simulate(1_000 + random.nextInt(2_000)); (2)
      int price = 100 + random.nextInt(10);

      return new Quote(site, price);
    });
  }
}
0660 acd all of
public class CompletableFuturesDemo {
  public static void show() {
    LocalTime start = LocalTime.now(); (3)

    FlightService service = new FlightService();
    List<CompletableFuture<Void>> futures = service
        .getQuotes()
        .map(future -> future.thenAccept(System.out::println))
        .collect(Collectors.toList());

    CompletableFuture
        .allOf(futures.toArray(new CompletableFuture[0]))
        .thenRun(() -> {
          LocalTime end = LocalTime.now();
          Duration duration = Duration.between(start, end);
          System.out.println("Retrieved all quotes in " + duration.toMillis() + " msec");
        });

    try {
      Thread.sleep(10_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Output:

Getting a quote from site2
Getting a quote from site3
Getting a quote from site1
Quote{site='site3', price=103}
Quote{site='site1', price=103}
Quote{site='site2', price=101}
Retrieved all quotes in 2419 msec

Process finished with exit code 0

8. Sources

A very comprehensive book is:

  • Modern Java in Action, Lambdas, Streams, Functional and Reactive Programming, Urma et. al., Manning, 2019

  • Java 11 Cookbook - A Definitive Guide to Learning the Key Concepts of Modern Application Development, Second Ed., 2018, packt

Links: