java.util.concurrent
Timer und TimerTask
- Timer erlaubt die Ausführung von nebenläufigen Aktionen
- TimerTask stellt den Code zur Verfügung, der ausgeführt werden soll
- Anwender muss sich nicht um Threads kümmern, das übernimmt die Klasse
Timer
, die einen Thread für die Ausführung der Tasks verwendet Timer
kommt mit sehr vielen (> 1000) Tasks gut zu recht- Tasks dürfen nicht zu lange laufen, da es sonst zu Staus kommt
Beispiel: Timer / TimerTask
public class Timebomb extends TimerTask {
public void run() {
System.out.print("Boom!");
cancel();
}
}
Timer timer = new Timer("Timebomb", true);
timer.schedule(new Timebomb(), 3000);
for (int i = 0; i < 1000; i++) {
System.out.print(i + " ");
Thread.sleep(1000);
}
timer.cancel();
0 1 2 3 Boom! 4 5
public class Countdown extends TimerTask {
private int zaehler;
public Countdown(int zaehler) { this.zaehler = zaehler; }
public void run() {
if (zaehler-- <= 0) {
System.out.print("Boom!");
cancel();
}
}
}
Timer timer = new Timer("Countdown");
timer.scheduleAtFixedRate(new Countdown(4), 0, 500);
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
Thread.sleep(1000);
}
timer.cancel();
0 1 Boom! 2 3 4
java.util.concurrent
Seit Java 5 gibt es fertige Lösungen für viele Thread-Probleme in java.util.concurrent
- Producer/Consumer mit
BlockingQueue
- Performante, threadsichere Collections
ConcurrentHashMap
CopyOnWriteArrayList
- Feingranulare Thread-Synchronisation und Steuerung
Barrier
,Latch
,Semaphore
- Ausführung paralleler Tasks
Executor
Future
Producer/Consumer mit BlockingQueue
BlockingQueue<E>
enthält bereits alle Zutaten um ein Producer/Consumer-Pattern zu implementieren
boolean offer(E e)
– Versucht ein Element hinzuzufügen, ist die Queue voll, wird false zurückgegebenboolean offer(E e, long timeout, TimeUnit unit)
– Wieoffer
, nur mit timeoutvoid put(E e)
– Versucht ein Element hinzuzufügen. Ist die Queue voll, blockiert die Methode solange bis wieder Platz in der Queue istE take()
– Versucht ein Element aus der Queue zu nehmen, ist sie leer, blockiert die MethodeE poll(long timeout, TimeUnit unit)
– Wietake
, nur mit Timeoutint remainingCapacity()
– Verbleibende Kapazität auf der Queue
Beispiel: BlockingQueue (Producer)
public class Producer implements Runnable {
BlockingQueue<String> queue;
...
public void run() {
int i = 0;
try {
while (true) {
queue.put("P" + ++i);
Thread.sleep(100);
}
}
catch (InterruptedException e) {
}
}
}
Beispiel: BlockingQueue (Consumer)
public class Consumer implements Runnable {
BlockingQueue<String> queue;
...
public void run() {
try {
while (true) {
String element = queue.take();
System.out.println(Thread.currentThread().getName()
+ ": " + element);
Thread.sleep(100);
}
}
catch (InterruptedException e) {
}
}
}
CountDownLatch
- Ein Latch erlaubt es Threads darauf warten zu lassen, dass ein bestimmter Zustand des Latches eintritt
- Ein CountDownLatch wird mit einem bestimmten Zahlenwert initialisiert und Threads können ihn herunter zählen. Wenn 0 erreicht ist, laufen all Threads weiter
void await()
– Thread blockiert bis der Zähler den Wert 0 erreicht hatboolean await(long timeout, TimeUnit unit)
– Thread blockiert bis der Zähler den Wert 0 erreicht hat oder timeout verstrichen istvoid countDown()
– Der Zähler wird um eins herunter gezählt- Der Zähler kann nicht zurückgesetzt werden
Beispiel: Threads vereinigen mit Latch
public class Join {
static CountDownLatch startLatch = new CountDownLatch(1);
static CountDownLatch endLatch = new CountDownLatch(3);
static class Runner implements Runnable {
String text;
Runner(String text) {
this.text = text;
}
public void run() {
...
}
}
}
public void run() {
try {
startLatch.await();
for (int i = 0; i < 5; i++) {
System.out.println(text);
Thread.yield();
}
System.out.println(text + " ist fertig.");
endLatch.countDown();
} catch (InterruptedException e) { // ignorieren
}
}
new Thread(new Runner("Runner 1")).start();
new Thread(new Runner("Runner 2")).start();
new Thread(new Runner("Runner 3")).start();
startLatch.countDown(); // alle laufen los
endLatch.await(); // warten bis alle fertig sind
System.out.println("Alle fertig");
Runner 3
Runner 2
Runner 1
Runner 3
Runner 2
Runner 1 ist fertig.
Runner 3
Runner 3 ist fertig.
Runner 2
Runner 2 ist fertig.
Alle fertig
Executor
- Führt Tasks in eigenen Threads aus
- Übernimmt das Pooling der Threads
- Erlaubt Tasks für spätere Zeiträume einzuplanen
Beispiel: Executor
class Task implements Runnable {
String name;
public Task(String name) { this.name = name; }
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println(name);
Thread.yield();
}
}
}
ExecutorService ex = Executors.newCachedThreadPool();
ex.execute(new Task("Task 1"));
ex.execute(new Task("Task 2"));
ScheduledExecutorService ex2 = Executors.newScheduledThreadPool(3);
ex2.schedule(new Task("Delayed 1"), 5, TimeUnit.SECONDS);
ex2.schedule(new Task("Delayed 2"), 10, TimeUnit.SECONDS);
Weggelassen sind im Beispiel die Imports:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Future
Berechnung deren Ergebnis zu einem späteren Zeitpunkt benötigt wird
boolean cancel(boolean mayInterruptIfRunning)
– Versucht die Ausführung abzubrechen bzw. zu verhindernboolean isCancelled()
– Zeigt an, ob der Task abgebrochen wurde bevor er zu Ende laufen konnteboolean isDone()
– Zeigt an, ob der Task beendet wurde (auch unterbrochen etc.)V get()
– Holt das Ergebnis ab und blockiert solange, bis das Ergebnis vorhanden ist, d. h. der Task beendet wurdeV get(long timeout, TimeUnit unit)
– Wieget()
wartet aber maximal für das angegebene Timeout
Beispiel: Callable
class RechnerCallable implements Callable<Integer> {
public Integer call() throws Exception {
return 2 + 2;
}
}
class RechnerRunnable implements Runnable {
private int result;
private Exception e;
public int getResult() { return result; }
public Exception getException() { return e; }
public void run() {
try {
result = 2+2;
}
catch (Exception e) {
this.e = e;
}
}
}
Beispiel: Future
class FutureCallable implements Callable<String> {
public String call() throws Exception {
Thread.sleep(1000); // unglaublich teure Berechnung ;-)
return "Ergebnis";
}
}
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(new FutureCallable());
System.out.println("Viele interessante Dinge parallel");
try {
String ergebnis = future.get();
System.out.println(ergebnis);
}
catch (ExecutionException ex) { /* oops */ }
System.out.println("Alles fertig");
Atomics
Komplexere atomare Operationen können mit java.util.concurrent.atomic gebaut werden
- Primitive Typen:
AtomicBoolean
,AtomicInteger
,AtomicLong
- Arrays:
AtomicIntegerArray
,AtomicLongArray
- Referenzen:
AtomicReference
,AtomicReferenceArray
public class AtomicExample implements Runnable {
private final AtomicInteger counter = new AtomicInteger(1);
public void run() {
while (counter.getAndIncrement() > 0) {
System.out.printf("Wert: %d%n", counter.get());
Thread.yield();
}
System.out.println("** Gestoppt **");
}
public static void main(String[] args) throws Exception {
AtomicExample st = new AtomicExample();
Thread thread = new Thread(st);
thread.start();
Thread.sleep(100);
st.counter.set(-1);
}
}