1. Course summary
The following is a summary of this course.
This is only a short excerpt; you are strongly advised to buy the course for the full information.
2. Thread Pools
Working with threads is complicated and error-prone. Java therefore lets you hand the management of threads over to the platform, so that you can concentrate on the task to be done.
-
Availability: too many threads → OutOfMemoryError
-
Costs: creating threads is "expensive"
→ Thread Pools
A thread from the pool is used for a task and afterwards returned to the thread pool, where it is ready to be used again.
Java takes care of using the threads to execute the tasks. When there are not enough threads in the pool, the tasks wait in a task queue.
3. Executors
ThreadPoolExecutor is an implementation of the ExecutorService interface.
There are also
-
ScheduledThreadPoolExecutor and
-
ForkJoinPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorsDemo {
public static void show() {
final ExecutorService executor = Executors.newFixedThreadPool(2);
System.out.println(executor.getClass().getName());
}
}
Output:
java.util.concurrent.ThreadPoolExecutor
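The two other implementations listed above can be inspected in the same way. The following is a minimal sketch, assuming the `Executors` factory methods `newScheduledThreadPool(...)` and `newWorkStealingPool()` are used to obtain them; the class name `ExecutorTypesSketch` is made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class ExecutorTypesSketch {
    // Returns the runtime class name of a scheduled thread pool created by the factory.
    static String scheduledPoolClassName() {
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
        try {
            return scheduled.getClass().getName();
        } finally {
            scheduled.shutdown();
        }
    }

    // Returns the runtime class name of a work-stealing pool created by the factory.
    static String workStealingPoolClassName() {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            return pool.getClass().getName();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(scheduledPoolClassName());
        System.out.println(workStealingPoolClassName());
    }
}
```

On a standard JDK this should print the fully qualified names of ScheduledThreadPoolExecutor and ForkJoinPool.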
public class ExecutorsDemo {
public static void show() {
final ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
}
}
Output:
pool-1-thread-1
Now we have run our first task on a thread of the thread pool. We did not have to start the thread ourselves; we only provided the task as a lambda expression.
public class ExecutorsDemo {
public static void show() {
final ExecutorService executor = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
}
}
}
Output:
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
Internally there is a task queue; the two threads of the thread pool work through all 10 tasks from it.
The program is still running, because the thread pool is waiting for new tasks.
We have to call executor.shutdown().
public class ExecutorsDemo {
public static void show() {
final ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
executor.shutdown();
}
}
Output:
pool-1-thread-1
Process finished with exit code 0 (1)
1 | Now the program terminates. |
But there is still a problem: if an exception is thrown before shutdown() is reached, the executor service (thread pool) is never shut down. It is therefore best to place the shutdown() call in a finally block.
public class ExecutorsDemo {
public static void show() {
final ExecutorService executor = Executors.newFixedThreadPool(2);
try {
executor.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
} finally {
executor.shutdown(); (1)
}
}
}
1 | Now shutdown() is executed even if an error occurs. |
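shutdown() only stops the pool from accepting new tasks; it does not wait for running tasks. As a minimal sketch, assuming the caller wants to block until the pool has finished, `awaitTermination(...)` can follow the shutdown() call. The 5-second limit and the class name `ShutdownSketch` are arbitrary choices for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {
    // Submits one task, shuts the pool down and waits until it has terminated.
    static boolean runAndAwaitTermination() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            executor.submit(() -> System.out.println(Thread.currentThread().getName()));
        } finally {
            executor.shutdown(); // no new tasks are accepted; queued tasks still run
        }
        try {
            // Blocks until all tasks have finished or the assumed 5 second limit is reached.
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + runAndAwaitTermination());
    }
}
```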
4. Callables and Futures
Tasks are normally of type Runnable, but when you need a return value, e.g. a record from a database, the task type is Callable (interface Callable<V>).
Callable is very similar to Runnable, but the task returns a value: instead of a run() method it has a call() method that returns a value.
public class ExecutorsDemo {
public static void show() {
final ExecutorService executor = Executors.newFixedThreadPool(2);
try {
Future<Integer> future = executor.submit(() -> { (1)
System.out.println(Thread.currentThread().getName());
return 1; (2)
});
} finally {
executor.shutdown();
}
}
}
1 | The variable is of type Future<Integer>, because the result will only be available in the future, even if that is just a few milliseconds later. |
2 | The return value of the task. |
package at.htl.multithreading.executors;
public class LongTask {
public static void simulate() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ExecutorsDemo {
public static void show() {
final ExecutorService executor = Executors.newFixedThreadPool(2);
try {
Future<Integer> future = executor.submit(() -> {
LongTask.simulate(); (1)
return 1;
});
System.out.println("Do more work");
System.out.println(future); (2)
int result = future.get(); (3)
System.out.println(result);
System.out.println(future); (4)
} catch (InterruptedException | ExecutionException e) { (5)
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
Output:
Do more work
java.util.concurrent.FutureTask@6e8dacdf[Not completed, task = at.htl.multithreading.executors.ExecutorsDemo$$Lambda$14/0x0000000800066840@1e643faf]
1
java.util.concurrent.FutureTask@6e8dacdf[Completed normally]
Process finished with exit code 0
1 | First, we simulate a long-running task. |
2 | The future variable is printed to the console immediately; the task has not completed yet. |
3 | The get() method blocks for 3 seconds until the task has finished. The result is 1. |
4 | Printing the future variable again shows that the task has completed normally. |
5 | The get() method requires handling InterruptedException and ExecutionException. |
The Future interface also has other methods: cancel(), isCancelled(), isDone() and a get() variant with a timeout.
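A minimal sketch of the remaining methods of the Future interface, namely isDone(), get(timeout, unit), cancel() and isCancelled(). The 2-second sleep stands in for LongTask.simulate(); the class name and the timing values are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureMethodsSketch {
    // Returns "isDone,timedOut,cancelAccepted,isCancelled" as observed on a slow task.
    static String inspect() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> slow = executor.submit(() -> {
                Thread.sleep(2_000); // stands in for LongTask.simulate()
                return 1;
            });
            boolean doneEarly = slow.isDone(); // the task is still sleeping
            boolean timedOut = false;
            try {
                slow.get(100, TimeUnit.MILLISECONDS); // get() with an upper bound
            } catch (TimeoutException e) {
                timedOut = true;
            }
            boolean cancelAccepted = slow.cancel(true); // interrupts the sleeping task
            return doneEarly + "," + timedOut + "," + cancelAccepted + "," + slow.isCancelled();
        } catch (InterruptedException | ExecutionException e) {
            return "failed: " + e;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(inspect());
    }
}
```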
5. Asynchronous Programming
asynchronous \(\cong\) non-blocking
The get() method of a Future is blocking, so code that calls it is synchronous.
As an analogy for an asynchronous program, picture a restaurant in which the waiter serves several guests one after another without waiting for each guest's meal to be prepared.
6. CompletableFutures
6.1. Creating a CompletableFuture Object
public class CompletableFuturesDemo {
public static void show() {
Runnable task = () -> System.out.println("a");
final CompletableFuture<Void> future = CompletableFuture.runAsync(task);
}
}
Output:
a Process finished with exit code 0
-
When no thread pool is specified, ForkJoinPool.commonPool() is used under the hood.
-
The size of the common pool is derived from Runtime.getRuntime().availableProcessors(), which returns 8 on the current machine (4 cores with 2 threads each). By default the pool's parallelism is one less than that value.
-
There is no return value, so the return type is CompletableFuture<Void>.
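The default pool can be inspected, and it can be replaced by passing an executor as a second argument to runAsync(...) or supplyAsync(...). The following is a minimal sketch; the thread name "custom-worker" and the class name are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class CustomExecutorSketch {
    // Runs a task on a dedicated single-thread executor and returns the worker's name.
    static String runOnCustomExecutor() {
        // "custom-worker" is an illustrative thread name, not a JDK default.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Both values depend on the machine the code runs on.
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println(runOnCustomExecutor());
    }
}
```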
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
public class CompletableFuturesDemo {
public static void show() {
Supplier<Integer> task = () -> 1;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(task);
try {
int result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
Output:
1 Process finished with exit code 0
6.2. Implementing an Asynchronous API
6.2.1. Synchronous Operation
public class MailService {
public void send() {
LongTask.simulate();
System.out.println("Mail was sent.");
}
}
public class Main {
public static void main(String[] args) {
final MailService service = new MailService();
service.send();
System.out.println("Hello World!");
}
}
Output:
Mail was sent. Hello World! Process finished with exit code 0
send() is a blocking (synchronous) operation, so "Hello World!" is printed only after send() has completed.
6.2.2. Asynchronous Operation
public class MailService {
public void send() {
LongTask.simulate();
System.out.println("Mail was sent.");
}
public CompletableFuture<Void> sendAsync() {
return CompletableFuture.runAsync(() -> send()); (1)
}
}
public class Main {
public static void main(String[] args) {
final MailService service = new MailService();
service.sendAsync(); (2)
System.out.println("Hello World!");
}
}
1 | First, we wrap the invocation of send() into a CompletableFuture. |
2 | The wrapper-method sendAsync() is called. |
Output:
Hello World! Process finished with exit code 0
But now a problem occurs: the main() method finishes so quickly that the worker thread never gets to print "Mail was sent.". The threads of the common pool are daemon threads and do not keep the JVM alive.
public class Main {
public static void main(String[] args) {
final MailService service = new MailService();
service.sendAsync();
System.out.println("Hello World!");
try {
Thread.sleep(5000); (1)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Output:
Hello World! Mail was sent. (2) Process finished with exit code 0 (3)
1 | We have to wait until the mail is sent to see the output. |
2 | 3 seconds after "Hello World!" the mail-output is printed |
3 | After 5 seconds the program will end. |
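Sleeping for a fixed time is only a workaround. As a minimal sketch, the main thread could instead call join() on the returned CompletableFuture, which waits exactly as long as the task needs. The class below is a simplified stand-in for MailService with a shortened, assumed delay of 300 ms; it records messages in a list rather than printing them directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class JoinInsteadOfSleepSketch {
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    // Stand-in for MailService.sendAsync() with a shortened, assumed delay.
    static CompletableFuture<Void> sendAsync() {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            LOG.add("Mail was sent.");
        });
    }

    // Starts the mail task, continues with other work, then waits for the task.
    static List<String> run() {
        LOG.clear();
        CompletableFuture<Void> mail = sendAsync();
        LOG.add("Hello World!");
        mail.join(); // blocks until the mail task has completed
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```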
6.3. Running Code on Completion
6.3.1. CompletionStage
The java.util.concurrent.CompletionStage<T>
interface represents a stage of a computation (either synchronous or asynchronous).
Because the chaining methods declared in this interface return a CompletionStage themselves, multiple CompletionStages
can be chained together in different ways to complete a group of tasks.
Java - Basics of CompletionStage And CompletableFuture
6.3.2. thenRun()
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
future.thenRun(() -> { (1)
System.out.println(Thread.currentThread().getName());
System.out.println("Done");
});
}
}
Output:
main (2) Done Process finished with exit code 0
1 | After the task has finished, continue with the next task. |
2 | thenRun() does not switch threads: the future has already completed here, so the action runs synchronously in the calling thread (main). |
6.3.3. thenRunAsync()
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
future.thenRunAsync(() -> { (1)
System.out.println(Thread.currentThread().getName());
System.out.println("Done");
});
}
}
Output:
ForkJoinPool.commonPool-worker-3 (2) Done Process finished with exit code 0
1 | You can also continue with an asynchronous operation, … |
2 | … which is executed in a worker thread. |
6.3.4. thenAccept()
thenAccept() gives you access to the result of the CompletableFuture. It takes a consumer object (interface Consumer<T>), whose accept() method has no return value.
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
future.thenAccept(result -> {
System.out.println(Thread.currentThread().getName());
System.out.println(result);
});
}
}
Output:
main 1 Process finished with exit code 0
6.3.5. thenAcceptAsync()
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
future.thenAcceptAsync(result -> { (1)
System.out.println(Thread.currentThread().getName());
System.out.println(result);
});
}
}
Output
ForkJoinPool.commonPool-worker-3 1
1 | If the result (1) is not shown, the main thread has finished before the worker thread. In that case, use sleep() in the main thread. |
6.4. Handling Exceptions
public class CompletableFuturesDemo {
public static void show() {
final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Getting the current weather");
throw new IllegalStateException(); (1)
// return <temperature>
});
}
}
Output
Process finished with exit code 0
1 | An exception is thrown, but nothing appears in the console, because the exception is thrown in another thread. |
public class CompletableFuturesDemo {
public static void show() {
final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Getting the current weather");
throw new IllegalStateException(); (1)
// return <temperature>
});
try {
future.get(); (2)
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) { (3)
e.printStackTrace();
}
}
}
Output
Getting the current weather
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at at.htl.multithreading.executors.CompletableFuturesDemo.show(CompletableFuturesDemo.java:15)
    at at.htl.multithreading.Main.main(Main.java:15)
Caused by: java.lang.IllegalStateException
    at at.htl.multithreading.executors.CompletableFuturesDemo.lambda$show$0(CompletableFuturesDemo.java:11)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Process finished with exit code 0
1 | Now we get an ExecutionException caused by an IllegalStateException: the IllegalStateException is wrapped in an ExecutionException. |
With System.err.println(e.getCause());
in the catch block for the ExecutionException you can show the original exception.
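Unwrapping the cause can be sketched as follows. The exception message and the class name `CauseSketch` are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class CauseSketch {
    // Returns the original exception thrown inside the asynchronous task.
    static Throwable originalFailure() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("weather service unavailable"); // illustrative message
        });
        try {
            future.get();
            return null; // not reached in this sketch, because the task always fails
        } catch (ExecutionException e) {
            return e.getCause(); // the wrapped IllegalStateException
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        System.err.println(originalFailure());
    }
}
```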
Now we can add .exceptionally():
The function passed to this method receives a Throwable, the parent class of all Errors and Exceptions in Java.
The method exceptionally() returns a new CompletableFuture, which can supply a fallback value, e.g. the last valid temperature.
public class CompletableFuturesDemo {
public static void show() {
final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Getting the current weather");
throw new IllegalStateException();
// return <temperature>
});
try {
int temperature = future.exceptionally(ex -> 17).get(); (1)
System.out.println(temperature);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
1 | The new CompletableFuture returns 17 |
Output:
Getting the current weather 17 Process finished with exit code 0
6.5. Transforming a CompletableFuture
Sometimes you need to transform the result, e.g. into a complex data structure.
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 20);
try {
Double result = future
.thenApply(celsius -> (celsius * 1.8) + 32) (1)
.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
Output:
68.0 Process finished with exit code 0
1 | The temperature in Celsius is transformed to Fahrenheit. |
We can make the code more "beautiful":
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 20);
future
.thenApply(celsius -> (celsius * 1.8) + 32)
.thenAccept(f -> System.out.println(f)); (1)
}
}
1 | You could also use .thenAccept(System.out::println); |
The CompletionStage interface provides a number of methods:
-
thenRun: executes the given action (Runnable); the result is not passed to it
-
thenApply: executes the given function (Function) with the result as its argument and returns a new value
-
thenAccept: executes the given action (Consumer) with the result as its argument
-
etc.
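The three methods listed above can be chained in one pipeline. The following minimal sketch records what each stage does in a list instead of printing it directly; the class name and log texts are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainSketch {
    // Chains thenApply, thenAccept and thenRun and returns what the stages logged.
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture
                .supplyAsync(() -> 20)
                .thenApply(celsius -> celsius * 1.8 + 32)     // Function: value in, value out
                .thenAccept(f -> log.add("fahrenheit=" + f))  // Consumer: value in, nothing out
                .thenRun(() -> log.add("done"))               // Runnable: no value in, nothing out
                .join();                                      // wait so the log is complete
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```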
It is also possible to use a method reference.
public class CompletableFuturesDemo {
public static int toFahrenheit(int celsius) { (1)
return (int) (celsius * 1.8) + 32;
}
public static void show() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 20);
future
.thenApply(CompletableFuturesDemo::toFahrenheit) (2)
.thenAccept(System.out::println);
}
}
1 | The new method toFahrenheit(…). We introduce a method because in reality such actions are often more complex, like accessing a database. |
2 | the method reference |
6.6. Composing Completable Futures
Now one task is started, and after its completion a second task is processed. For example, with an id you can look up an email address, which is then used by a (fictitious) streaming website to get a playlist.
public class CompletableFuturesDemo {
public static void show() {
// id -> email
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "email");
// email -> playlist
future
.thenCompose(email -> CompletableFuture.supplyAsync(() -> "playlist"))
.thenAccept(playlist -> System.out.println(playlist));
}
}
Output:
playlist Process finished with exit code 0
thenCompose() creates a new CompletableFuture. It is similar to thenApply(), but it flattens the hierarchy.
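The flattening is visible in the types. The following minimal sketch uses a hypothetical `getPlaylistAsync(...)` helper that stands in for a remote call and compares both methods.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsApplySketch {
    // Hypothetical lookup standing in for a remote call.
    static CompletableFuture<String> getPlaylistAsync(String email) {
        return CompletableFuture.supplyAsync(() -> "playlist of " + email);
    }

    static String viaThenApply() {
        // thenApply wraps the returned future: the result is a future of a future.
        CompletableFuture<CompletableFuture<String>> nested =
                CompletableFuture.supplyAsync(() -> "email")
                        .thenApply(ComposeVsApplySketch::getPlaylistAsync);
        return nested.join().join(); // two joins are needed to reach the value
    }

    static String viaThenCompose() {
        // thenCompose unwraps it: the result is a plain future of the value.
        CompletableFuture<String> flat =
                CompletableFuture.supplyAsync(() -> "email")
                        .thenCompose(ComposeVsApplySketch::getPlaylistAsync);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenApply());
        System.out.println(viaThenCompose());
    }
}
```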
Now we get rid of the variable:
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture
.supplyAsync(() -> "email")
.thenCompose(email -> CompletableFuture.supplyAsync(() -> "playlist"))
.thenAccept(playlist -> System.out.println(playlist));
}
}
Now we introduce two methods, getUserEmail() and getPlaylistAsync().
public class CompletableFuturesDemo {
public static CompletableFuture<String> getUserEmail() { (1)
return CompletableFuture.supplyAsync(() -> "email");
}
public static CompletableFuture<String> getPlaylistAsync(String email) { (2)
return CompletableFuture.supplyAsync(() -> "playlist");
}
public static void show() {
getUserEmail()
.thenCompose(CompletableFuturesDemo::getPlaylistAsync) (3)
.thenAccept(playlist -> System.out.println(playlist));
}
}
1 | method getUserEmail() |
2 | method getPlaylistAsync() |
3 | the algorithm is now declarative |
6.7. Combining Completable Futures
thenCombine() makes it possible to start two tasks asynchronously and combine their results.
First, we access a remote service to get the price of a product in US$. Second, at the same time, we access another remote service to get the exchange rate (0.9 here).
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Double> second = CompletableFuture.supplyAsync(() -> 0.9);
first.thenCombine(second, (price, exchangeRate) -> price * exchangeRate) (1)
.thenAccept(result -> System.out.println(result));
}
}
Output:
18.0 Process finished with exit code 0
1 | The second parameter is a BiFunction, which provides the algorithm to combine the results of the two tasks. |
thenCombine(CompletionStage<? extends U> other, java.util.function.BiFunction<? super T, ? super U, ? extends V> fn)
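To make the three type parameters concrete, the combining function can be written as an explicit BiFunction variable. This is a minimal sketch; the variable and class names are chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

class CombineSketch {
    // Combines a price and an exchange rate that are fetched independently.
    static double priceInTargetCurrency() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 20);
        CompletableFuture<Double> exchangeRate = CompletableFuture.supplyAsync(() -> 0.9);
        // T = Integer (first result), U = Double (second result), V = Double (combined result)
        BiFunction<Integer, Double, Double> convert = (p, rate) -> p * rate;
        return price.thenCombine(exchangeRate, convert).join();
    }

    public static void main(String[] args) {
        System.out.println(priceInTargetCurrency());
    }
}
```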
What happens when we get the price as a string?
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> first = CompletableFuture
.supplyAsync(() -> "20USD")
.thenApply(str -> {
String price = str.replace("USD", "");
return Integer.parseInt(price);
});
CompletableFuture<Double> second = CompletableFuture.supplyAsync(() -> 0.9);
first.thenCombine(second, (price, exchangeRate) -> price * exchangeRate)
.thenAccept(result -> System.out.println(result));
}
}
6.8. Waiting for Many Tasks to Complete
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Void> all = CompletableFuture.allOf(first, second, third);
all.thenRun(() -> { (1)
try {
int firstResult = first.get(); (2)
int secondResult = second.get(); (2)
int thirdResult = third.get(); (2)
System.out.println("Result: " + (firstResult + secondResult + thirdResult));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("All tasks completed successfully");
});
}
}
Output
Result: 6 All tasks completed successfully Process finished with exit code 0
1 | CompletableFuture.allOf(…) returns a new CompletableFuture that completes when all given futures have completed. |
2 | You also have access to the result of each task; get() no longer blocks here. |
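When the individual results are needed as a list, one option is to map the completed futures with join(), which avoids the checked exceptions of get(). This is a minimal sketch of that variant, not the course's code; the class name is made up.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {
    // Waits for all futures and returns their results in the original order.
    static List<Integer> collectAll() {
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 3);
        List<CompletableFuture<Integer>> futures = List.of(first, second, third);
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block: all futures are complete here
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(collectAll());
    }
}
```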
6.9. Waiting for the First Task
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
LongTask.simulate();
return 20;
});
CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture
.anyOf(first, second) (1)
.thenAccept(temp -> System.out.println(temp));
}
}
Output:
20 Process finished with exit code 0
1 | anyOf() completes as soon as the first of the given futures finishes, so the result appears immediately.
There is no need to wait for the LongTask thread. |
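Note that anyOf(…) returns a CompletableFuture<Object>, so the value has to be cast back to its real type. The minimal sketch below uses two different return values (1 for the slow task, 2 for the fast one) to show which task won; the values, the 500 ms delay and the class name are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfSketch {
    // Returns the result of whichever future completes first.
    static int fastestResult() {
        CompletableFuture<Integer> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500); // shortened stand-in for LongTask.simulate()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 1;
        });
        CompletableFuture<Integer> fast = CompletableFuture.supplyAsync(() -> 2);
        // anyOf(...) yields CompletableFuture<Object>, so the value is cast back to Integer.
        Object winner = CompletableFuture.anyOf(slow, fast).join();
        return (Integer) winner;
    }

    public static void main(String[] args) {
        System.out.println(fastestResult());
    }
}
```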
6.10. Handling Timeouts
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
LongTask.simulate();
return 1;
});
int result = 0;
try {
result = future
.orTimeout(1, TimeUnit.SECONDS) (1)
.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
Output:
1 | The orTimeout(…) method completes the future exceptionally with a TimeoutException, because the LongTask thread takes longer than 1 second; get() reports it wrapped in an ExecutionException. |
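The timeout can be observed by inspecting the cause of the ExecutionException. This is a minimal sketch with shortened, assumed delays (a 2-second task and a 200 ms limit); orTimeout(…) requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
    // Returns true if the future failed because the time limit was exceeded.
    static boolean timesOut() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2_000); // shortened stand-in for LongTask.simulate()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 1;
        });
        try {
            future.orTimeout(200, TimeUnit.MILLISECONDS).get();
            return false; // only reached if the task finished within the limit
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timesOut());
    }
}
```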
For the end user it may be better if the program does not end with an exception:
public class CompletableFuturesDemo {
public static void show() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
LongTask.simulate();
return 1;
});
int result = 0;
try {
result = future
.completeOnTimeout(1, 1, TimeUnit.SECONDS) (1)
.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
Output:
1 Process finished with exit code 0
1 | Completes this CompletableFuture with the given value, if not otherwise completed before the given timeout. |
7. Project: Best Price Finder
Create an asynchronous program that gets the prices from three different sites.
The following output should be achieved:
Getting a quote from site3
Getting a quote from site1
Getting a quote from site2
Quote{site='site2', price=108}
Quote{site='site1', price=109}
Quote{site='site3', price=105}
Retrieved all quotes in 2415 msec
Process finished with exit code 0
7.1. Getting one quote
public class Quote {
private final String site;
private final int price;
public Quote(String site, int price) {
this.site = site;
this.price = price;
}
public String getSite() {
return site;
}
public int getPrice() {
return price;
}
@Override
public String toString() {
return "Quote{" +
"site='" + site + '\'' +
", price=" + price +
'}';
}
}
public class FlightService {
public CompletableFuture<Quote> getQuote(String site) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Getting a quote from " + site);
LongTask.simulate();
Random random = new Random();
int price = 100 + random.nextInt(10);
return new Quote(site, price);
});
}
}
public class CompletableFuturesDemo {
public static void show() {
FlightService service = new FlightService();
service
.getQuote("site1")
.thenAccept(System.out::println);
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Output:
Getting a quote from site1 Quote{site='site1', price=104} Process finished with exit code 0
7.2. Getting Many Quotes
public class FlightService {
public Stream<CompletableFuture<Quote>> getQuotes() { (1)
List<String> sites = List.of("site1", "site2", "site3");
return sites.stream()
.map(this::getQuote);
}
public CompletableFuture<Quote> getQuote(String site) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Getting a quote from " + site);
LongTask.simulate();
Random random = new Random();
int price = 100 + random.nextInt(10);
return new Quote(site, price);
});
}
}
public class CompletableFuturesDemo {
public static void show() {
FlightService service = new FlightService();
service
.getQuotes()
.map(future -> future.thenAccept(System.out::println))
.collect(Collectors.toList());
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Output:
Getting a quote from site2 Getting a quote from site3 Getting a quote from site1 Quote{site='site3', price=106} Quote{site='site1', price=100} Quote{site='site2', price=106} Process finished with exit code 0
1 | We introduced a new method getQuotes(). |
Note that the return type of getQuotes() is a Stream<…>.
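Because a Stream is lazy, the quotes are only requested once a terminal operation such as collect(…) runs. The following minimal sketch illustrates this with a counter; `getQuote(...)` here is a simplified stand-in that returns a String instead of a Quote, and all names are assumptions for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamSketch {
    static final AtomicInteger STARTED = new AtomicInteger();

    // Simplified stand-in for FlightService.getQuote(...): counts how many tasks were created.
    static CompletableFuture<String> getQuote(String site) {
        STARTED.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> "quote from " + site);
    }

    static Stream<CompletableFuture<String>> getQuotes() {
        return Stream.of("site1", "site2", "site3").map(LazyStreamSketch::getQuote);
    }

    // Returns {tasks started before collect, tasks started after collect}.
    static int[] startedBeforeAndAfterCollect() {
        STARTED.set(0);
        Stream<CompletableFuture<String>> quotes = getQuotes();
        int before = STARTED.get(); // map(...) has not run yet
        List<CompletableFuture<String>> futures = quotes.collect(Collectors.toList());
        futures.forEach(CompletableFuture::join);
        int after = STARTED.get(); // the terminal operation triggered getQuote for every site
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = startedBeforeAndAfterCollect();
        System.out.println("before collect: " + counts[0] + ", after collect: " + counts[1]);
    }
}
```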
7.3. Random Delays
public class LongTask {
public static void simulate() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void simulate(int delay) { (1)
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class FlightService {
public Stream<CompletableFuture<Quote>> getQuotes() {
List<String> sites = List.of("site1", "site2", "site3");
return sites.stream()
.map(this::getQuote);
}
public CompletableFuture<Quote> getQuote(String site) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Getting a quote from " + site);
Random random = new Random(); (2)
LongTask.simulate(1_000 + random.nextInt(2_000)); (2)
int price = 100 + random.nextInt(10);
return new Quote(site, price);
});
}
}
public class CompletableFuturesDemo {
public static void show() {
LocalTime start = LocalTime.now(); (3)
FlightService service = new FlightService();
List<CompletableFuture<Void>> futures = service
.getQuotes()
.map(future -> future.thenAccept(System.out::println))
.collect(Collectors.toList());
CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
LocalTime end = LocalTime.now();
Duration duration = Duration.between(start, end);
System.out.println("Retrieved all quotes in " + duration.toMillis() + " msec");
});
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Output:
Getting a quote from site2 Getting a quote from site3 Getting a quote from site1 Quote{site='site3', price=103} Quote{site='site1', price=103} Quote{site='site2', price=101} Retrieved all quotes in 2419 msec Process finished with exit code 0