Parallele Programming mit Lockset: Eine Lockset ist eine Mutex für mehrere Objekte, bei der die Sperrung der Ressource sich auf die Identität des Objektes selber bezieht.

Eine einfache Mutex sperrt üblicherweise eine einzelne Ressource für mehrere Konsumenten, so dass stets nur ein Konsument die Ressource verwenden kann. Klassisches Beispiel aus der realen Welt ist ein Drucker (nur einer kann gleichzeitig drucken) und in der Programmierung das Java Schlüsselwort synchronized, das nur einem Thread erlaubt, den synchronisierten Codebereich zu betreten.

Usecase

In meinem Anwendungsfall ging es darum, Objekte neu zu Erstellen, wenn sie noch nicht existierten. Das kommt bei Singleton-Klassen häufig vor, wobei es hier auch nur um eine Instanz einer Klasse geht.
In meinem Beispiel ging es um das Erstellen eines Objektes in der Datenbank, wenn dieses noch nicht existiert:

public MyObject createAndSaveIfNotExists(String naturalId) {
    MyObject myObject = null;
    if (naturalId != null){
        myObject = getByNaturalId(naturalId);
        if (myObject == null){
          // paralleles erzeugen verhindern
          myObject = create(naturalId);
        }
    }
    return myObject;
}

Thread-Unsicher

Dieser Code ist nicht Thread sicher, denn laufen zwei Prozesse parallel und führen if (myObject == null) mit true als Ergebnis aus, wird das Erzeugen des Objektes doppelt ausgeführt.

Die einfachste Lösung hier ist es, die Methode als synchronized zu deklarieren:

public synchronized MyObject createAndSaveIfNotExists(String naturalId) {
    MyObject myObject = null;
    if (naturalId != null){
        myObject = getByNaturalId(naturalId);
        if (myObject == null){
          // paralleles erzeugen verhindern
          myObject = create(naturalId);
        }
    }
    return myObject;
}

Nun ist sichergestellt, dass sich stets genau ein Prozess an der Stelle der Prüfung befindet. Für den Zeitraum der Akquise, der Prüfung und dem Erzeugen des Objektes sind alle anderen Zugriffe geblockt.

Zeit für Verbesserungen

Diese Sperrung wirkt sich also auf die Erzeugung aller Objekte aus, obwohl theoretisch die Prüfung und Erzeugung des Objektes mit der naturalId = "abc" unabhängig mit dem für naturalId = "xyz" ist. Es soll ja nur verhindert werden, dass es nachher zwei Objekte mit der Natural-ID "abc" gibt.

Die Lösung lässt sich verbessern, in dem ein eigener Mutex pro Natural-ID verwendet wird. So kann jede Natural-ID unabhängig von allen anderen gesperrt oder wieder frei gegeben werden.

Hierzu bediene ich mich einer Set als Collection, welche die gesperrten Schlüssel (in diesem Fall die Natural-IDs) aufbewahrt.

Nun muss zuerst sicher gestellt werden, dass die Zugriffe auf die Set selber atomar / synchronisiert sind.
Der findige Leser wird bemerkt haben, dass wir nun die Synchronisation einer Methode (createAndSaveIfNotExists) durch die Synchronisation einer anderen (der Set) austauschen. Warum also die zusätzliche Arbeit?

Die Set ermöglicht uns zwei Optimierungen zugleich:

Zuerst reduziert sie den Ausführungszeitraum, für den alle Objekte geblockt werden. Während in der obigen Lösung für die Zeitdauer der Akquise (getByNaturalId, evtl. Zugriff auf Datenbank, Netzwerk oder Festplatte), der Prüfung (ist das Resultat der Akquise erfolgreich?) und dem Erstellen selber (create) gesperrt wird, reduziert sich die Blockierung bei der Lockset auf ein contains, welches selber in einer HashSet sehr effizient gelöst ist.

Zweitens wird für die Dauer der Akquise, Prüfung und Erstellung des Objektes der gleiche Ablauf nur für Objekte mit gleicher Natural-ID blockiert. Mehrere Prozesse, die Objekte mit verschiedenen Natural-IDs abfragen, können parallel und zeitgleich abgearbeitet werden.

public class LockSet<T> {
    private final Set<T> lockedObjects = new HashSet<>();
 
    /**
     * Liefert {@code true}, wenn das Objekt noch nicht gesperrt ist und nun gesperrt wurde,
     * {@code false} wenn das Objekt bereits gesperrt ist.
     *
     * @param t
     * @return
     */
    public synchronized boolean lock(T t) {
        boolean locked = false;
        if (!lockedObjects.contains(t)) {
            lockedObjects.add(t);
            locked = true;
        }
        return locked;
    }
 
    /**
     * Entfernt die Sperre.<br/>
     * Diese Methode sollte <i>unbedingt</i> in einem {@code finally}-Block
     * nach Einholen einer Sperre aufgerufen werden.<br/>
     * Sollte die Sperre nicht vorhanden sein, wird nichts gemacht.
     *
     * @param t
     */
    public synchronized void unlock(T t) {
        if (t != null) {
            lockedObjects.remove(t);
        }
    }
 
    /**
     * Wartet so lange, bis die Sperre frei wird. Es wird
     * nach jedem Sperrversuch die angegebene Zeit gewartet.
     *
     * @param t
     * @param pollIntervallInMillis
     */
    public void waitAndLock(T t, long pollIntervallInMillis) {
        if (t != null) {
            while (!lock(t)) {
                try {
                    Thread.sleep(pollIntervallInMillis);
                } catch (InterruptedException e) {
                    // what to do?
                }
            }
        }
    }
}

Die Methode waitAndLock führt ein aktives Warten aus, um im Fall der Sperrung so lange zu warten, bis diese frei wird. Man kann das ganze auch mit passivem wait@/@notify machen, was eleganter ist:

public class LockSet<T> {
    private final HashSet<T> lockedObjects = new HashSet<T>();
 
    @Override
    public synchronized boolean lock(T t) {
        boolean locked = false;
        if (!lockedObjects.contains(t)) {
            lockedObjects.add(t);
            locked = true;
        }
        return locked;
    }
 
    @Override
    public synchronized void unlock(T t) {
        if (t != null) {
            lockedObjects.remove(t);
            notifyAll();
        }
    }
 
    @Override
    public synchronized void waitAndLock(T t) {
        if (t != null) {
            while (!lock(t)) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // what to do?
                }
            }
        }
    }
}

Wichtig ist hier das notify (sobald eine Lock freigegeben wurde, damit die Wartenden erneut eine Prüfung machen können) und das wait (wenn die Sperre auf die Natural-ID gerade aktiv ist).

Die Verwendung ändert sich wie folgt:

private LockSet<String> naturalIdLocks = new LockSet<>();
public MyObject createAndSaveIfNotExists(String naturalId) {
    MyObject myObject = null;
    if (naturalId != null){
        naturalIdLocks.waitAndLock(naturalId, 50L);
        try{
            myObject = getByNaturalId(naturalId);
            if (myObject == null){
                // paralleles erzeugen verhindern
                myObject = create(naturalId);
            }
        }
        finally {
            naturalIdLocks.unlock(naturalId);
        }
    }
    return myObject;
}

Wichtig: Die Lock muss mit einem finally Block immer entsperrt werden. Wird dies nicht sichergestellt, können Objekte bis zum Neustart gesperrt bleiben.

Noch besser?

Im gleichen Rahmen habe ich ein paar weitere Optimierungen versucht. In einer Variante setze ich statt dem synchronized der Methoden in der Lockset eine ReentrantReadWriteLock ein, damit parallele Lesezugriffe auf die Sperren der Lockset möglich sind. Der Code wird dadurch sehr viel komplexer:

public class ReentrantReadWriteLockSet<T> {
    private final Set<T> lockedObjects = new HashSet<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
    @Override
    public boolean lock(T t) {
        boolean locked = false;
        if (t != null) {
            lock.readLock().lock();
            try {
                if (!lockedObjects.contains(t)) {
                    lock.readLock().unlock();
                    lock.writeLock().lock();
                    try {
                        if (!lockedObjects.contains(t)) {
                            lockedObjects.add(t);
                            locked = true;
                        }
                    } finally {
                        lock.readLock().lock();
                        lock.writeLock().unlock();
                    }
                }
            } finally {
                lock.readLock().unlock();
            }
        }
        return locked;
    }
 
    @Override
    public void unlock(T t) {
        if (t != null) {
            lock.writeLock().lock();
            try {
                lockedObjects.remove(t);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
 
    @Override
    public void waitAndLock(T t, long pollIntervallInMillis) {
        ...
    }
}

Benchmark

Ergebnisse eines Microbenchmarks zeigen, dass die Effektivität der verschiedenen Ansätze erst bei großen Datenmengen, vielen Kollisionen auf der Natural-ID und langer Dauer der Resourcenbelegung beim Abfragen / Erstellen ersichtlich wird.

Threads=16
Objects per thread=1000000
Duration on get=10 ms
Duration on create=2 ms
Natural-ID poolsize=1000

LockSetCreator{lockSet=SynchronizedLockSet}=183
LockSetCreator{lockSet=ReentrantReadWriteLockSet}=168
LockSetCreator{lockSet=WaitNotifyAllSynchronizedLockSet}=177

Threads=16
Objects per thread=1000000
Duration on get=10 ms
Duration on create=2 ms
Natural-ID poolsize=100

Stats:
SynchronizedCreator{}=179
LockSetCreator{lockSet=SynchronizedLockSet}=187
LockSetCreator{lockSet=ReentrantReadWriteLockSet}=157
LockSetCreator{lockSet=WaitNotifyAllSynchronizedLockSet}=157

Threads=16
Objects per thread=1000000
Duration on get=30 ms
Duration on create=2 ms
Natural-ID poolsize=100

Stats:
SynchronizedCreator{}=182
LockSetCreator{lockSet=SynchronizedLockSet}=171
LockSetCreator{lockSet=ReentrantReadWriteLockSet}=164
LockSetCreator{lockSet=WaitNotifyAllSynchronizedLockSet}=160

Referenzen