1. Course summary
The following is a summary of this course.
This is only a short excerpt. You are strongly advised to buy the course for the full information.
2. Processes and Threads
- Process: an instance of a program or an application.
- An operating system can execute many processes at the same time.
- Concurrency at the process level
- Concurrency within the process or within the application.
- Thread: technically a sequence of instructions → a thread executes the code
- Each process has at least one thread - the main thread
Thread … literally a strand of yarn (German: Faden)
Concurrency … simultaneity (German: Gleichzeitigkeit)
3. Threads
You can create threads to run several tasks concurrently
- e.g. a web server serves each client in a separate thread
- e.g. an application downloads multiple files concurrently
4. Starting a Thread
public static void main(String[] args) {
//Thread thread = new Thread();
ThreadDemo.show();
}
4.1. Download Threads 1
public class ThreadDemo {
public static void show() {
System.out.println(Thread.currentThread().getName());
Thread thread = new Thread(new DownloadFileTask());
thread.start();
}
}
public class DownloadFileTask implements Runnable {
@Override
public void run() {
System.out.println("Downloading a file: "
+ Thread.currentThread().getName());
}
}
5. Pausing a Thread
public class DownloadFileTask implements Runnable {
@Override
public void run() {
System.out.println("Downloading a file: " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Download complete: " + Thread.currentThread().getName());
}
}
How long will these tasks run?
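One way to explore this question is to measure it. The following is a minimal sketch, not part of the course code: the class name `SleepTimingDemo`, the helper `runTasks` and the shortened pause of 200 ms are assumptions chosen for illustration. Because every task sleeps in its own thread, the pauses overlap instead of adding up.

```java
import java.util.ArrayList;
import java.util.List;

class SleepTimingDemo {
    // Starts `count` threads that each sleep for `millis` milliseconds
    // and returns the elapsed wall-clock time in milliseconds.
    static long runTasks(int count, long millis) {
        List<Thread> threads = new ArrayList<>();
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            Thread thread = new Thread(() -> pause(millis));
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join(); // wait until this task has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Ten tasks of 200 ms each: expect roughly 200 ms in total, not 2000 ms.
        System.out.println("Elapsed: " + runTasks(10, 200) + " ms");
    }
}
```

The exact number depends on the machine and the scheduler, so the sketch prints the measurement rather than a fixed value.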
6. Joining a Thread
- Problem
  - First, we want to download a file.
  - Second, we want to scan this file for viruses.
  - Each task gets its own thread.
  - How will the second task know that the first task has finished?
- The solution is called joining.
public class ThreadDemo {
public static void show() {
Thread thread = new Thread(new DownloadFileTask());
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("File is ready to be scanned.");
}
}
7. Interrupting a Thread
public class DownloadFileTask implements Runnable {
@Override
public void run() {
System.out.println("Downloading a file: " + Thread.currentThread().getName());
for (int i = 0; i < Integer.MAX_VALUE; i++) {
System.out.println("Downloading byte " + i);
}
System.out.println("Download complete: " + Thread.currentThread().getName());
}
}
public class ThreadDemo {
public static void show() {
Thread thread = new Thread(new DownloadFileTask());
thread.start();
try {
Thread.sleep(1000); (1)
} catch (InterruptedException e) {
e.printStackTrace();
}
thread.interrupt(); (2)
}
}
1 | wait a second |
2 | then interrupt the DownloadFileTask |
This code doesn’t work.
interrupt() only sends a request to the thread to stop. It is up to the thread whether it actually stops.
public class DownloadFileTask implements Runnable {
@Override
public void run() {
System.out.println("Downloading a file: " + Thread.currentThread().getName());
for (int i = 0; i < Integer.MAX_VALUE; i++) {
if (Thread.currentThread().isInterrupted()) {
return;
}
System.out.println("Downloading byte " + i);
}
System.out.println("Download complete: " + Thread.currentThread().getName());
}
}
Now the code works.
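A related case is a thread that is paused with Thread.sleep() when the interrupt request arrives. The following sketch assumes a hypothetical class `SleepInterruptDemo` (not part of the course code) and shows that sleep() ends early with an InterruptedException, which is why the catch block in the pausing example exists.

```java
class SleepInterruptDemo {
    // Returns true if the sleeping worker was woken up by an InterruptedException.
    static boolean interruptSleepingThread() {
        boolean[] wokenByInterrupt = {false};
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000); // would normally pause for ten seconds
            } catch (InterruptedException e) {
                wokenByInterrupt[0] = true; // sleep() ended early because of interrupt()
            }
        });
        worker.start();
        worker.interrupt(); // request the worker to stop
        try {
            worker.join(); // after join() the worker's write is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return wokenByInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptSleepingThread()); // prints true
    }
}
```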
8. Concurrency Issues
- Until now the threads were isolated
- Sometimes they need to modify shared resources
- When at least one thread changes a shared object, several issues can occur
Example: three people eating one hamburger together
Race condition: multiple threads compete to modify a shared resource
Visibility problem: one thread changes a shared resource, but the other threads do not notice the change
Thread-safe code: code that can be safely executed by many threads in parallel
9. Race Conditions
public class DownloadStatus {
private int totalBytes;
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
totalBytes++;
}
}
public class DownloadFileTask implements Runnable {
private DownloadStatus status;
public DownloadFileTask(DownloadStatus status) {
this.status = status;
}
@Override
public void run() {
System.out.println("Downloading a file: " + Thread.currentThread().getName());
for (int i = 0; i < 10_000; i++) { (1)
if (Thread.currentThread().isInterrupted()) {
return;
}
status.incrementTotalBytes();
}
System.out.println("Download complete: " + Thread.currentThread().getName());
}
}
1 | instead of MAX_VALUE we use an upper bound of 10,000 (each file has 10,000 bytes) |
import java.util.ArrayList;
import java.util.List;
public class ThreadDemo {
public static void show() {
final DownloadStatus status = new DownloadStatus();
List<Thread> threads = new ArrayList<>(); (1)
for (int i = 0; i < 10; i++) {
final Thread thread = new Thread(new DownloadFileTask(status)); (2)
threads.add(thread); (3)
thread.start(); (4)
}
for (Thread thread : threads) {
try {
thread.join(); (5)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(status.getTotalBytes()); (6)
}
}
1 | A list for the threads is created |
2 | A new thread is created |
3 | This thread is added to the list … |
4 | … and then started |
5 | This for loop waits for all threads to finish |
6 | Number of downloaded bytes |
Expected number: 100,000
The number changes from run to run. This is a typical race condition: multiple threads compete to increment the same variable. totalBytes++ is not a single step (read the value, add one, write it back), so updates from different threads can overwrite each other.
10. Strategies for Thread Safety
- Confinement
- Immutability
- Synchronization
- Atomic objects
- Partitioning
10.1. Confinement
(i.e. restriction, limitation, …)
Restrict each thread to its own data. Instead of sharing one download status object between multiple download tasks, each download task can have its own download status object. When all the tasks are complete, you can combine the results.
There are several kinds of confinement:
- stack confinement
- thread confinement with ThreadLocal
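Thread confinement with ThreadLocal could look roughly like the following sketch. The class name `ConfinementDemo`, the variable `BYTES` and the one-element int[] used as a counter are assumptions for illustration and do not come from the course code.

```java
import java.util.Arrays;

class ConfinementDemo {
    // Thread confinement: every thread that calls BYTES.get() receives its own counter.
    private static final ThreadLocal<int[]> BYTES = ThreadLocal.withInitial(() -> new int[1]);

    // Runs `threadCount` threads and returns the final value each thread saw in its own counter.
    static int[] countPerThread(int threadCount, int increments) {
        int[] seen = new int[threadCount];
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            final int index = t;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < increments; i++) {
                    BYTES.get()[0]++; // no lock needed: the counter belongs to this thread only
                }
                seen[index] = BYTES.get()[0]; // each thread writes only its own slot
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countPerThread(3, 10_000))); // prints [10000, 10000, 10000]
    }
}
```

Stack confinement is the simpler variant: a local variable such as `i` in the loop above lives on the stack of one thread and is never visible to another thread.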
10.2. Immutability
(unchangeable objects)
An object is immutable if its value cannot be changed after it is created. Sharing immutable objects is perfectly fine, because the threads can only read them.
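As an illustration, an immutable class might be written as in this sketch. `DownloadInfo` and its fields are hypothetical names, not taken from the course. All fields are final, there are no setters, and a "change" produces a new object.

```java
final class DownloadInfo {
    private final String fileName;
    private final int totalBytes;

    DownloadInfo(String fileName, int totalBytes) {
        this.fileName = fileName;
        this.totalBytes = totalBytes;
    }

    String getFileName() {
        return fileName;
    }

    int getTotalBytes() {
        return totalBytes;
    }

    // Does not modify this object; returns a new one with the changed value.
    DownloadInfo withTotalBytes(int newTotalBytes) {
        return new DownloadInfo(fileName, newTotalBytes);
    }

    public static void main(String[] args) {
        DownloadInfo before = new DownloadInfo("file.zip", 0);
        DownloadInfo after = before.withTotalBytes(10_000);
        System.out.println(before.getTotalBytes()); // prints 0
        System.out.println(after.getTotalBytes());  // prints 10000
    }
}
```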
10.3. Synchronization
Prevent multiple threads from accessing the same object concurrently. We are synchronizing (or coordinating) the access to an object across different threads.
This is done using locks.
Only one thread at a time can execute this part → the code runs sequentially.
However, executing code sequentially contradicts the idea of concurrency.
Implementing synchronization is challenging and error-prone.
10.4. Deadlock
One of these problems can be a deadlock: two threads wait for each other indefinitely.
When a deadlock occurs, the affected threads never continue and the app hangs. Deadlocks must be avoided.
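One common way to avoid this kind of deadlock, which the course summary does not spell out, is to acquire locks in the same order in every thread. The sketch below assumes two hypothetical lock objects `LOCK_A` and `LOCK_B` and a class `LockOrderingDemo` invented for this example.

```java
class LockOrderingDemo {
    private static final Object LOCK_A = new Object();
    private static final Object LOCK_B = new Object();
    private static int counter;

    // Every thread takes LOCK_A first and LOCK_B second.
    // If one thread took A then B while another took B then A, each could end up
    // holding one lock and waiting forever for the other: a deadlock.
    static void work() {
        synchronized (LOCK_A) {
            synchronized (LOCK_B) {
                counter++;
            }
        }
    }

    // Runs work() from two threads and returns the final counter value.
    static int runTwoThreads(int iterations) {
        counter = 0;
        Runnable task = () -> {
            for (int i = 0; i < iterations; i++) {
                work();
            }
        };
        Thread first = new Thread(task);
        Thread second = new Thread(task);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(runTwoThreads(10_000)); // prints 20000
    }
}
```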
11. Confinement
package at.htl.multithreading.concurrency;
public class DownloadStatus {
private int totalBytes;
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
totalBytes++;
}
}
package at.htl.multithreading.concurrency;
public class DownloadFileTask implements Runnable {
private DownloadStatus status;
public DownloadFileTask() { (1)
this.status = new DownloadStatus(); (2)
}
@Override
public void run() {
System.out.println("Downloading a file: " + Thread.currentThread().getName());
for (int i = 0; i < 10_000; i++) {
if (Thread.currentThread().isInterrupted()) {
return;
}
status.incrementTotalBytes();
}
System.out.println("Download complete: " + Thread.currentThread().getName());
}
public DownloadStatus getStatus() { (3)
return status;
}
}
1 | The constructor parameter disappears |
2 | A new DownloadStatus-Object is instantiated |
3 | A getter is necessary |
package at.htl.multithreading.concurrency;
import java.util.ArrayList;
import java.util.List;
public class ThreadDemo {
public static void show() {
List<Thread> threads = new ArrayList<>();
List<DownloadFileTask> tasks = new ArrayList<>(); (1)
for (int i = 0; i < 10; i++) {
final DownloadFileTask task = new DownloadFileTask();
tasks.add(task); (2)
Thread thread = new Thread(task); (3)
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int totalBytes = tasks.stream()
.map((t -> t.getStatus().getTotalBytes()))
//.reduce(0, (a,b) -> a + b);
.reduce(0, Integer::sum); (4)
System.out.println(totalBytes); (5)
}
}
1 | Instead of the single status object (of type DownloadStatus), we now need a list of DownloadFileTask objects, each holding its own DownloadStatus. |
2 | Now multiple tasks are created and … |
3 | each thread gets its own task. |
4 | Now the bytes of each task are collected … |
5 | and printed to the console. |
12. Locks
In a hotel room only one guest is sleeping. After he enters his room, the door will be locked.
12.1. Situation w/o locks
import java.util.concurrent.locks.ReentrantLock;
public class DownloadStatus {
private int totalBytes;
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
totalBytes++;
}
}
public class DownloadFileTask implements Runnable {
private DownloadStatus status;
public DownloadFileTask(DownloadStatus status) {
this.status = status;
}
@Override
public void run() {
System.out.println("Downloading a file: " + Thread.currentThread().getName());
for (int i = 0; i < 10_000; i++) {
if (Thread.currentThread().isInterrupted()) {
return;
}
status.incrementTotalBytes();
}
System.out.println("Download complete: " + Thread.currentThread().getName());
}
}
import java.util.ArrayList;
import java.util.List;
public class ThreadDemo {
public static void show() {
var status = new DownloadStatus();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final Thread thread = new Thread(new DownloadFileTask(status));
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(status.getTotalBytes());
}
}
12.2. Introduction of Locks
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class DownloadStatus {
private int totalBytes;
private Lock lock = new ReentrantLock(); (1)
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
lock.lock(); (2)
try {
totalBytes++;
} finally {
lock.unlock(); (3)
}
}
}
1 | we introduce a new Class → Lock |
2 | before incrementing, we lock |
3 | unlocking should always happen in a finally block (even though no exception will be thrown in this case) |
As the name says, ReentrantLock allows a thread to acquire the same lock more than once. When the thread first acquires the lock, the hold count is set to one. Before unlocking, the thread can acquire the lock again, and each time the hold count is incremented by one. For every unlock request the hold count is decremented by one, and when the hold count reaches 0, the lock is released.
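The hold count described above can be observed with ReentrantLock.getHoldCount(). This is a small sketch; the class name `HoldCountDemo` is an assumption and not part of the course code.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Records the hold count of the current thread after each lock/unlock call.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquisition
        lock.lock();
        counts[1] = lock.getHoldCount(); // re-entered by the same thread
        lock.unlock();
        counts[2] = lock.getHoldCount(); // one hold released
        lock.unlock();
        counts[3] = lock.getHoldCount(); // lock fully released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```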
13. The synchronized Keyword
public class DownloadStatus {
private int totalBytes;
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
totalBytes++;
}
}
Now we have a race condition again.
13.1. this as Monitor
import java.util.concurrent.locks.ReentrantLock;
public class DownloadStatus {
private int totalBytes;
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
synchronized (this) { (1)
totalBytes++;
}
}
}
1 | we wrap the operation in a synchronized block.
this is called the monitor object (every Java object has a monitor).
Using this as the monitor is bad practice,
because all synchronized blocks of the object then share one monitor, so only one of them can run at a time. |
13.2. A Dedicated Object as Monitor
package at.htl.multithreading.concurrency;
public class DownloadStatus {
private int totalBytes;
private final Object totalBytesLock = new Object();
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
synchronized (totalBytesLock) {
totalBytes++;
}
}
}
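The benefit of dedicated monitor objects shows once a class guards more than one field. The following sketch assumes a hypothetical second counter `totalFiles`, which the course's DownloadStatus does not have, and a class name `TwoLocksStatus` invented for this example. Threads that update different counters do not block each other.

```java
import java.util.Arrays;

class TwoLocksStatus {
    private int totalBytes;
    private int totalFiles; // hypothetical second counter, not part of the course code
    private final Object totalBytesLock = new Object();
    private final Object totalFilesLock = new Object();

    void incrementTotalBytes() {
        synchronized (totalBytesLock) {
            totalBytes++;
        }
    }

    void incrementTotalFiles() {
        synchronized (totalFilesLock) {
            totalFiles++;
        }
    }

    // Starts two threads per counter and returns {totalBytes, totalFiles}.
    static int[] run(int iterations) {
        TwoLocksStatus status = new TwoLocksStatus();
        Runnable bytesTask = () -> {
            for (int i = 0; i < iterations; i++) {
                status.incrementTotalBytes();
            }
        };
        Runnable filesTask = () -> {
            for (int i = 0; i < iterations; i++) {
                status.incrementTotalFiles();
            }
        };
        Thread[] threads = {
            new Thread(bytesTask), new Thread(bytesTask),
            new Thread(filesTask), new Thread(filesTask)
        };
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // All threads have been joined, so plain reads are safe here.
        return new int[] {status.totalBytes, status.totalFiles};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(10_000))); // prints [20000, 20000]
    }
}
```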
13.3. A Synchronized Method
package at.htl.multithreading.concurrency;
public class DownloadStatus {
private int totalBytes;
public int getTotalBytes() {
return totalBytes;
}
public synchronized void incrementTotalBytes() { (1)
totalBytes++;
}
}
1 | This would be the same as using synchronized (this) { … } |
Don’t use this in new code. Always use a dedicated monitor object to avoid the problems described for this as monitor.
14. The volatile Keyword
volatile is another tool for writing thread-safe code in Java, but without the overhead of synchronization.
package at.htl.multithreading.concurrency;
public class DownloadFileTask implements Runnable {
private DownloadStatus status;
public DownloadFileTask(DownloadStatus status) {
this.status = status;
}
@Override
public void run() {
System.out.println("Downloading a file: " + Thread.currentThread().getName());
for (int i = 0; i < 1_000_000; i++) {
if (Thread.currentThread().isInterrupted()) {
return;
}
status.incrementTotalBytes();
}
status.done(); (1)
System.out.println("Download complete: " + Thread.currentThread().getName());
}
}
1 | After finishing the download task, we set the status done |
public class ThreadDemo {
public static void show() {
final DownloadStatus status = new DownloadStatus();
final Thread thread1 = new Thread(new DownloadFileTask(status)); (1)
final Thread thread2 = new Thread(() -> { (2)
while (!status.isDone()) { }
System.out.println(status.getTotalBytes());
});
thread1.start(); (3)
thread2.start();
}
}
1 | The first thread downloads something |
2 | The second thread waits for the completion of the first thread. Notice that we use a lambda expression instead of an object with a run() method. |
3 | we start both threads |
The program fails: the first thread has already stopped, but the second thread is still running (the program does not terminate).
public class DownloadStatus {
private boolean isDone;
private int totalBytes;
public int getTotalBytes() {
return totalBytes;
}
public synchronized void incrementTotalBytes() {
totalBytes++;
}
public synchronized boolean isDone() { (1)
return isDone;
}
public synchronized void done() { (2)
isDone = true;
}
}
1 | we have to synchronize the getter |
2 | and the setter |
Now the program works, even though synchronized methods are bad practice.
The problem now is the overhead in the second thread: it constantly checks whether the first thread has finished.
14.1. Use volatile Keyword
public class DownloadStatus {
private volatile boolean isDone; (1)
private int totalBytes;
private final Object totalBytesLock = new Object();
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
synchronized (totalBytesLock) { (2)
totalBytes++;
}
}
public boolean isDone() { (3)
return isDone;
}
public void done() { (4)
isDone = true;
}
}
1 | isDone is declared volatile |
2 | we use a dedicated monitor object (because it is good practice) |
3 | we don’t have to synchronize the getter |
4 | and the setter |
Without volatile, the problem is that the CPUs may cache the value of the isDone variable.
When one CPU changes the value, the other CPU may not see it. Even if the first CPU writes the value back to main memory, the other CPU may keep reading its own cached value. Declaring the field volatile ensures that a read sees the most recent write.
14.2. Thread signalling with wait() and notify()
public class ThreadDemo {
public static void show() {
final DownloadStatus status = new DownloadStatus();
final Thread thread1 = new Thread(new DownloadFileTask(status));
final Thread thread2 = new Thread(() -> {
synchronized (status) { (2)
while (!status.isDone()) {
try {
status.wait(); (1)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println(status.getTotalBytes());
});
thread1.start();
thread2.start();
}
}
1 | status.wait() is added (inherited from the Object class) |
2 | wait() must be wrapped in a synchronized block. The isDone() check sits inside the same block, so a notification cannot slip in between the check and the call to wait(). |
public class DownloadFileTask implements Runnable {
private DownloadStatus status;
public DownloadFileTask(DownloadStatus status) {
this.status = status;
}
@Override
public void run() {
System.out.println("Downloading a file: " + Thread.currentThread().getName());
for (int i = 0; i < 1_000_000; i++) {
if (Thread.currentThread().isInterrupted()) {
return;
}
status.incrementTotalBytes();
}
status.done();
synchronized (status) { (2)
status.notify(); (1)
}
System.out.println("Download complete: " + Thread.currentThread().getName());
}
}
1 | thread2 is notified by the task |
2 | notify() (like wait()) has to be wrapped in a synchronized block. |
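This implementation is more efficient, because the second thread no longer checks the flag in a busy loop.
wait() and notify() are easy to misuse (lost notifications and spurious wakeups are typical pitfalls). Don’t use them in new programs.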
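One possible alternative from java.util.concurrent, which the course summary does not name, is CountDownLatch. The sketch below swaps it in for wait() and notify(); the class `LatchDemo` and the simplified counter are assumptions for illustration. The waiting thread blocks in await() without busy waiting until the downloader calls countDown().

```java
import java.util.concurrent.CountDownLatch;

class LatchDemo {
    // Simulates a download of `bytes` bytes in a second thread and waits for it to finish.
    static int downloadAndWait(int bytes) {
        int[] totalBytes = new int[1];
        CountDownLatch done = new CountDownLatch(1);
        Thread downloader = new Thread(() -> {
            for (int i = 0; i < bytes; i++) {
                totalBytes[0]++; // only the downloader thread writes the counter
            }
            done.countDown(); // signal that the download is complete
        });
        downloader.start();
        try {
            done.await(); // blocks until countDown() has been called
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Writes made before countDown() are visible after await() returns.
        return totalBytes[0];
    }

    public static void main(String[] args) {
        System.out.println(downloadAndWait(1_000_000)); // prints 1000000
    }
}
```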
15. Atomic Objects
With atomic classes we can perform atomic operations
15.1. The Code with a race condition
import java.util.ArrayList;
import java.util.List;
public class ThreadDemo {
public static void show() {
final DownloadStatus status = new DownloadStatus();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final Thread thread = new Thread(new DownloadFileTask(status));
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(status.getTotalBytes());
}
}
public class DownloadFileTask implements Runnable {
private DownloadStatus status;
public DownloadFileTask(DownloadStatus status) {
this.status = status;
}
@Override
public void run() {
System.out.println("Downloading a file: " + Thread.currentThread().getName());
for (int i = 0; i < 10_000; i++) {
if (Thread.currentThread().isInterrupted()) {
return;
}
status.incrementTotalBytes();
}
System.out.println("Download complete: " + Thread.currentThread().getName());
}
}
public class DownloadStatus {
private boolean isDone;
private int totalBytes;
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
totalBytes++;
}
public boolean isDone() {
return isDone;
}
public void done() {
isDone = true;
}
}
15.2. Introducing AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;
public class DownloadStatus {
private boolean isDone;
private AtomicInteger totalBytes = new AtomicInteger(); (1)
public int getTotalBytes() {
return totalBytes.get(); (2)
}
public void incrementTotalBytes() {
totalBytes.incrementAndGet(); // ++a (3)
// totalBytes.getAndIncrement(); // a++
}
public boolean isDone() {
return isDone;
}
public void done() {
isDone = true;
}
}
1 | change the type to AtomicInteger |
2 | add .get() |
3 | incrementAndGet() |
Now the program works again.
The primary use of AtomicInteger is in a multithreaded context where you need thread-safe operations on an integer without using synchronized. Assignment and retrieval of the primitive type int are already atomic, but AtomicInteger offers many operations that are not atomic on an int, such as incrementing.
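A few of these operations are shown in the following sketch. The class name `AtomicOpsDemo` and the chosen numbers are assumptions for illustration. Each call is a single atomic step, even when several threads use the same AtomicInteger.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicOpsDemo {
    // Applies a fixed sequence of atomic operations and returns the final value.
    static int run() {
        AtomicInteger value = new AtomicInteger(10);
        int afterAdd = value.addAndGet(5);                // value becomes 15, returns 15
        int beforeIncrement = value.getAndIncrement();    // returns 15, value becomes 16
        boolean swapped = value.compareAndSet(16, 100);   // succeeds only if value is 16
        int doubled = value.updateAndGet(v -> v * 2);     // value becomes 200, returns 200
        System.out.println(afterAdd + " " + beforeIncrement + " " + swapped + " " + doubled);
        return value.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "15 15 true 200" and then 200
    }
}
```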
16. Adders
Adders such as LongAdder are recommended instead of the atomic classes when multiple threads update a value frequently and read it less frequently. They are designed to grow dynamically under high contention. (Source: medium.com)
public class DownloadStatus {
private boolean isDone;
private int totalBytes;
public int getTotalBytes() {
return totalBytes;
}
public void incrementTotalBytes() {
totalBytes++;
}
public boolean isDone() {
return isDone;
}
public void done() {
isDone = true;
}
}
import java.util.concurrent.atomic.LongAdder;
public class DownloadStatus {
private boolean isDone;
private LongAdder totalBytes = new LongAdder(); (1)
public int getTotalBytes() {
return totalBytes.intValue(); (2)
}
public void incrementTotalBytes() {
totalBytes.increment(); (3)
}
public boolean isDone() {
return isDone;
}
public void done() {
isDone = true;
}
}
1 | A LongAdder object is created. |
2 | Internally, the count is spread over several long variables, so that contending threads rarely update the same one. When .intValue() is called, these variables are summed up. Further methods: floatValue(), doubleValue(), longValue(), shortValue(), … |
3 | increment the variable |
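To see the adder under contention, the following sketch lets several threads increment one LongAdder and reads the total with sum(). The class `LongAdderDemo` and the thread and iteration counts are assumptions for illustration.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderDemo {
    // Lets `threadCount` threads each call increment() `perThread` times and returns the sum.
    static long count(int threadCount, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    adder.increment();
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return adder.sum(); // all threads are finished, so the sum is exact
    }

    public static void main(String[] args) {
        System.out.println(count(10, 10_000)); // prints 100000
    }
}
```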
17. Synchronized Collections
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
public class ThreadDemo {
public static void show() {
Collection<Integer> collection = new ArrayList<>();
Thread thread1 = new Thread(() -> {
collection.addAll(Arrays.asList(1, 2, 3));
});
Thread thread2 = new Thread(() -> {
collection.addAll(Arrays.asList(4, 5, 6));
});
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(collection);
}
}
Possible output:
[4, 5, 6]
This is a race condition: two threads compete to change a single resource.
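import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
public class ThreadDemo {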
public static void show() {
Collection<Integer> collection = Collections.synchronizedCollection(new ArrayList<>()); (1)
Thread thread1 = new Thread(() -> {
collection.addAll(Arrays.asList(1, 2, 3));
});
Thread thread2 = new Thread(() -> {
collection.addAll(Arrays.asList(4, 5, 6));
});
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(collection);
}
}
1 | wrap the ArrayList in a synchronized Collection. |
The synchronized collection uses locks internally.
[1, 2, 3, 4, 5, 6]
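The wrapper locks each single method call, but iterating over the collection is not covered. According to the Javadoc of Collections.synchronizedCollection, the caller has to synchronize on the returned collection while traversing it. A minimal sketch, with the class name `SynchronizedIterationDemo` and its helper methods assumed for illustration:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

class SynchronizedIterationDemo {
    // Sums all elements while holding the collection's own lock.
    static int sum(Collection<Integer> collection) {
        int sum = 0;
        synchronized (collection) { // iteration must be guarded manually
            for (int value : collection) {
                sum += value;
            }
        }
        return sum;
    }

    // Two threads each add `perThread` ones; returns the sum of all elements.
    static int fillAndSum(int perThread) {
        Collection<Integer> collection = Collections.synchronizedCollection(new ArrayList<>());
        Runnable task = () -> {
            for (int i = 0; i < perThread; i++) {
                collection.add(1); // each add() call is synchronized by the wrapper
            }
        };
        Thread thread1 = new Thread(task);
        Thread thread2 = new Thread(task);
        thread1.start();
        thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum(collection);
    }

    public static void main(String[] args) {
        System.out.println(fillAndSum(1_000)); // prints 2000
    }
}
```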
18. Concurrent Collections
Concurrent collections are usually faster than synchronized collections, which guard every access with a lock.
import java.util.HashMap;
import java.util.Map;
public class ThreadDemo {
public static void show() {
Map<Integer, String> map = new HashMap<>();
map.put(1, "HTL Leonding");
map.get(1);
map.remove(1);
}
}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ThreadDemo {
public static void show() {
Map<Integer, String> map = new ConcurrentHashMap<>(); (1)
map.put(1, "HTL Leonding");
map.get(1);
map.remove(1);
}
}
1 | Only the implementation is changed |
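The example above uses the map from a single thread only. The following sketch shows concurrent use, with several threads counting into one key via merge(), which ConcurrentHashMap performs atomically. The class `ConcurrentMapDemo`, the key "file.zip" and the counts are assumptions for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentMapDemo {
    // Lets `threadCount` threads each add 1 to the same key `perThread` times.
    static int countDownloads(int threadCount, int perThread) {
        Map<String, Integer> downloads = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // merge() inserts 1 for a new key or atomically adds 1 to the old value
                    downloads.merge("file.zip", 1, Integer::sum);
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return downloads.getOrDefault("file.zip", 0);
    }

    public static void main(String[] args) {
        System.out.println(countDownloads(10, 10_000)); // prints 100000
    }
}
```

With a plain HashMap the same code would be a race condition, because HashMap makes no guarantees under concurrent modification.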