Das "Fork-Join-Framework"

Fork Join Klassen

Die Klasse ForkJoinPool

Die Klasse java.util.concurrent.ForkJoinPool wurde im JDK 7 eingeführt.

In diser Klasse werden eine Menge von Threads verwaltet und zur Ausführung von Task (Aufgaben) verwendet. Dies macht es dem Betriebsystem leichter die Threads auf die Prozessoren zu verteilen, da nicht beliebig viele Threads von Java erzeugt werden. Die Klasse ForkJoinPool verwaltet die Zuordnung der Task auf die konfigurierten (Software)threads.

Tasks können auf 3 Arten dem Framework übergeben werden:

  Aufrufe von ausserhalb Aufruf innerhalb einer Berechnung
asynchrone Ausführung execute(ForkJoinTask) ForkJoinTask.fork()
warte auf Ergebnis invoke(ForkJoinTask) ForkJoinTask.invoke()
asynchrone Ausführung mit Ergebnis submit(ForkJoinTask)
submit(Callable)
ForkJoinTask.fork(ForkJoinTasks sind Futures)

Die Implementierung wird eine RejectedExecutionException werden falls der Threadpool runtergefahren wird oder falls es keine Threads mehr in der VM gibt. 

Interessante Konstruktoren:

Paralleles Abarbeiten ohne Rückgabewerte

Ein Beispiel hierfür ist der Quicksort. Man kann parallel das linke und das rechte Teilintervall sortieren nachdem die Vorarbeit (seriell) erledigt wurde.

Hierzu nutzt man die abstrakte Klasse RecursiveAction die aus der abstrakten Klasse ForkJoinTask spezialisiert wird. Die Klasse ForkJoinPool kann Tasks die die Bedingungen der Klasse ForkJoinTask erfüllen, parallel ausführen. Hierzu gibt es in der Klasse die Methoden:

Es verbleibt das Problem, dass man einen bestimmten Code ausführen möchte. Hierzu dient die abstrakte Klasse RecursiveAction. Sie besitzt eine abstrakte Methode:

  • compute(): führt den Code aus. Diese Methode ist abstrakt!

Wie implementiert man so etwas?

  1. Man implementiert den Algorithmus in einer Unterklasse von RecursiveAction.
    1. Alle Eingabeparameter werden als Attribute im Objekt gespeichert
    2. Man implementiert einen Konstruktor der die Parameter entgegennimmt und abspeichert
    3. Der Algorithmus wird in compute() implementiert
  2. Man bettet seine Klasse als innere Klasse ein
    1. Man hat dann Zugriff auf alle Attribute der äusseren Klasse!
  3. Man legt sich einen Threadpool an
    1. Wahrscheinlich als globales, statisches Objekt
  4. Man startet seinen Algorithmus aus der äusseren Klasse
    1. Anlegen eines Objekts mit den Parametern
    2. Aufruf von invoke() im Treadpool

 Hilfe! Ich muß das am Beispiel sehen...

Paralleles Abarbeiten mit Rückgabewerte

Die Aufgabe parallelisieren und als Aufgabe starten reicht oft nicht. Oft müssen die Ergebnisse entgegen genommen werden und dann rekursiv zu neuen Ergebnissen aggregiert werden.

Hierzu benötigt man einen Mechanismus der

  • die Eingabeparameter für jede Aufgabe (Task) entgegennimmt
  • die Aufgaben (Tasks) ausführt
  • und die Ergebnisse erst zurückgibt wenn die Ergebnisse vorliegen

Die Schnittstelle Callable

Die Schnittstelle java.util.concurrent.Callable muss implementiert werden um die Eingabeparameter einer Aufgabe (Task) entgegenzunehmen. Hierfür muss man

  • einen generischen Typen wählen der das Ergebnis zur Verfügung stellt
  • einen Konstruktor implementieren der alle Eingabeparameter entgegen nimmt.
  • alle Parameter als Attribute implementiert
  • die Methode V call() die das Ergebnis zurückliefert

Ein Beispiel auf der Programmierübung Ariadnefaden

public class SearchCallable implements Callable<List<Position>> {
Position von;
Position nach;
Ariadne4Parallel a; /** * * @param a Instanz von Ariadne * @param von Start * @param nach Ziel */ SearchCallable(Ariadne4Parallel a,Position von, Position nach) { this.a =a; this.von = von; this.nach = nach; } /** * Führe Task in eigenem Thread aus und nutze Instanzvariablen * als Parameter um Aufgabe auszuführen. * @throws java.lang.Exception */ @Override public List<Position> call() throws Exception { return a.suche(von, nach); } }

In diesem Beispiel ist das Ergebnis eine Liste von Positionen die den Web aus dem Labyrint weisen.

Die Eingabeparameter sind ein Labyrinth, die Startposition und die Position mit dem Ausgang ais dem Labyrinth.

Die Schnittstelle Future

Die Schnittstelle Future erlaubt es das Ergebnis einer Aufgabe zu analysieren. Die Schnittstelle ist generisch und erlaubt es daher nur einen bestimmten Type einer Klasse auszulesen.

Die beiden wichtigsten Methoden der Schnittstelle sind:

  • V get(): wartet falls notwendig auf das Beenden der Aufgabe und liefert das Ergebnis ab
  • V get(long timeout, TimeUnit unit): wartet eine bestimmte Zeit auf auf ein bestimmtes Ergebnis

In der Schnittstellendefinition sind noch weitere Methoden zum Kontrollieren und Beenden von Aufgaben (Tasks) vorhanden.

Abschicken von Aufgaben (Tasks)

Die Klasse  java.util.concurrent.ForkJoinPool verfügt über eine Methode

  • public <T> ForkJoinTask<T> submit(Callable<T> task)

Diese Methode nimmt die Eingaben für die Aufgabe (Task) an solange es eine Spezialierung der Klasse Callable<T> ist. Das Ergebnis ist ein Objekt vom Typ ForkJoinTask. Die Klasse ForkJoinTask implementiert die Schnittstelle Future. Das Ergebnis der Aufgabe (Task) wird hier abgeliefert sobald die Aufgabe abgearbeitet ist.

Wie implementiert man so etwas?

  1. Man implementiert den Algorithmus in einer Unterklasse von RecursiveAction.
    1. Alle Eingabeparameter werden als Attribute in einem Objekt von Callable gespeichert
    2. Man implementiert einen Konstruktor der die Parameter entgegennimmt und abspeichert
    3. Der Algorithmus wird in compute() implementiert
      1. Der Algorithmus führt rekursive Aufrufe durch und nimmt die Ergebnisse vom Typ Future an.
  2. Man bettet seine Klasse die Callable spezialisiert als innere Klasse ein
    1. Man hat dann Zugriff auf alle Attribute der äusseren Klasse!
  3. Man legt sich einen Threadpool an
    1. Wahrscheinlich als globales, statisches Objekt
  4. Man startet seinen Algorithmus aus der äusseren Klasse
    1. Anlegen eines Objekts mit den Parametern
    2. Aufruf von invoke() im Treadpool

 Hilfe! Ich muß das am Beispiel sehen...