Skip to content

Commit 2779ef3

Browse files
committed
various attempts at fixing issue
1 parent 8355988 commit 2779ef3

File tree

4 files changed

+105
-51
lines changed

4 files changed

+105
-51
lines changed

core/sail/api/src/main/java/org/eclipse/rdf4j/common/concurrent/locks/ExclusiveReentrantLockManager.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class ExclusiveReentrantLockManager {
3434

3535
private final int waitToCollect;
3636

37-
LockMonitoring<ExclusiveReentrantLock> lockMonitoring;
37+
LockMonitoring<Lock> lockMonitoring;
3838

3939
public ExclusiveReentrantLockManager() {
4040
this(false);
@@ -50,18 +50,18 @@ public ExclusiveReentrantLockManager(boolean trackLocks, int collectionFrequency
5050

5151
if (trackLocks || Properties.lockTrackingEnabled()) {
5252

53-
lockMonitoring = new LockTracking(
53+
lockMonitoring = new LockTracking<>(
5454
true,
55-
"ExclusiveReentrantLockManager",
55+
"ExclusiveReentrantLockManager-Tracking",
5656
LoggerFactory.getLogger(this.getClass()),
5757
waitToCollect,
5858
Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner)
5959
);
6060

6161
} else {
62-
lockMonitoring = new LockCleaner(
62+
lockMonitoring = new LockCleaner<>(
6363
false,
64-
"ExclusiveReentrantLockManager",
64+
"ExclusiveReentrantLockManager-Cleaner",
6565
LoggerFactory.getLogger(this.getClass()),
6666
Lock.ExtendedSupplier.wrap(this::getExclusiveLockInner, this::tryExclusiveLockInner)
6767
);
@@ -87,6 +87,8 @@ private Lock tryExclusiveLockInner() {
8787

8888
}
8989

90+
private final AtomicLong ownerIsDead = new AtomicLong();
91+
9092
private Lock getExclusiveLockInner() throws InterruptedException {
9193

9294
synchronized (owner) {
@@ -100,6 +102,14 @@ private Lock getExclusiveLockInner() throws InterruptedException {
100102
if (lock != null) {
101103
return lock;
102104
} else {
105+
if (!owner.get().isAlive()) {
106+
long l = ownerIsDead.incrementAndGet();
107+
if (l > 10) {
108+
ownerIsDead.set(0);
109+
continue;
110+
}
111+
112+
}
103113
lockMonitoring.runCleanup();
104114
owner.wait(waitToCollect);
105115
}
@@ -113,6 +123,12 @@ private Lock getExclusiveLockInner() throws InterruptedException {
113123
if (lock != null) {
114124
return lock;
115125
} else {
126+
if (!owner.get().isAlive()) {
127+
System.out.println("Owner thread is dead");
128+
// activeLocks.set(0);
129+
// owner.set(null);
130+
// continue;
131+
}
116132
owner.wait(waitToCollect);
117133
}
118134
}

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.List;
1919
import java.util.Map;
2020
import java.util.Objects;
21+
import java.util.WeakHashMap;
2122
import java.util.concurrent.locks.ReentrantReadWriteLock;
2223

2324
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
@@ -119,6 +120,7 @@ protected static boolean debugEnabled() {
119120
* debugging was disable at the time the connection was acquired.
120121
*/
121122
private final Map<SailConnection, Throwable> activeConnections = new IdentityHashMap<>();
123+
// private final Map<SailConnection, Throwable> activeConnections = new WeakHashMap<>();
122124

123125
/*
124126
* constructors

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import java.util.concurrent.atomic.AtomicReference;
2323
import java.util.concurrent.atomic.LongAdder;
2424
import java.util.concurrent.locks.LockSupport;
25-
import java.util.concurrent.locks.ReentrantLock;
2625

2726
import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
27+
import org.eclipse.rdf4j.common.concurrent.locks.ExclusiveReentrantLockManager;
2828
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
2929
import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner;
3030
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
@@ -98,17 +98,16 @@ public abstract class AbstractSailConnection implements SailConnection {
9898
private final AtomicReference<Thread> activeThread = new AtomicReference<>();
9999

100100
@SuppressWarnings("FieldMayBeFinal")
101-
private boolean isOpen = true;
101+
private volatile boolean isOpen = true;
102102
private static final VarHandle IS_OPEN;
103103

104104
private Thread owner;
105105

106106
/**
107107
* Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a
108108
* transaction.
109-
*
110109
*/
111-
private final ReentrantLock updateLock = new ReentrantLock();
110+
private final ExclusiveReentrantLockManager updateLock = new ExclusiveReentrantLockManager();
112111
private final LongAdder iterationsOpened = new LongAdder();
113112
private final LongAdder iterationsClosed = new LongAdder();
114113

@@ -200,8 +199,7 @@ public void begin(IsolationLevel isolationLevel) throws SailException {
200199
activeThread.setRelease(Thread.currentThread());
201200

202201
verifyIsOpen();
203-
204-
updateLock.lock();
202+
Lock exclusiveLock = updateLock.getExclusiveLock();
205203
try {
206204
if (isActive()) {
207205
throw new SailException("a transaction is already active on this connection.");
@@ -210,8 +208,11 @@ public void begin(IsolationLevel isolationLevel) throws SailException {
210208
startTransactionInternal();
211209
txnActive = true;
212210
} finally {
213-
updateLock.unlock();
211+
exclusiveLock.release();
214212
}
213+
} catch (InterruptedException e) {
214+
Thread.currentThread().interrupt();
215+
throw new SailException(e);
215216
} finally {
216217
try {
217218
activeThread.setRelease(null);
@@ -505,15 +506,19 @@ public final void prepare() throws SailException {
505506
activeThread.setRelease(Thread.currentThread());
506507
verifyIsOpen();
507508

508-
updateLock.lock();
509+
Lock exclusiveLock = updateLock.getExclusiveLock();
510+
509511
try {
510512
if (txnActive) {
511513
prepareInternal();
512514
txnPrepared = true;
513515
}
514516
} finally {
515-
updateLock.unlock();
517+
exclusiveLock.release();
516518
}
519+
} catch (InterruptedException e) {
520+
Thread.currentThread().interrupt();
521+
throw new SailException(e);
517522
} finally {
518523
try {
519524
activeThread.setRelease(null);
@@ -535,7 +540,8 @@ public final void commit() throws SailException {
535540

536541
verifyIsOpen();
537542

538-
updateLock.lock();
543+
Lock exclusiveLock = updateLock.getExclusiveLock();
544+
539545
try {
540546
if (txnActive) {
541547
if (!txnPrepared) {
@@ -546,8 +552,11 @@ public final void commit() throws SailException {
546552
txnPrepared = false;
547553
}
548554
} finally {
549-
updateLock.unlock();
555+
exclusiveLock.release();
550556
}
557+
} catch (InterruptedException e) {
558+
Thread.currentThread().interrupt();
559+
throw new SailException(e);
551560
} finally {
552561
try {
553562
activeThread.setRelease(null);
@@ -572,7 +581,8 @@ public final void rollback() throws SailException {
572581

573582
verifyIsOpen();
574583

575-
updateLock.lock();
584+
Lock exclusiveLock = updateLock.getExclusiveLock();
585+
576586
try {
577587
if (txnActive) {
578588
try {
@@ -586,8 +596,11 @@ public final void rollback() throws SailException {
586596
debugEnabled ? new Throwable() : null);
587597
}
588598
} finally {
589-
updateLock.unlock();
599+
exclusiveLock.release();
590600
}
601+
} catch (InterruptedException e) {
602+
Thread.currentThread().interrupt();
603+
throw new SailException(e);
591604
} finally {
592605
try {
593606
activeThread.setRelease(null);
@@ -694,13 +707,17 @@ public final void endUpdate(UpdateContext op) throws SailException {
694707

695708
verifyIsOpen();
696709

697-
updateLock.lock();
710+
Lock exclusiveLock = updateLock.getExclusiveLock();
711+
698712
try {
699713
verifyIsActive();
700714
endUpdateInternal(op);
701715
} finally {
702-
updateLock.unlock();
716+
exclusiveLock.release();
703717
}
718+
} catch (InterruptedException e) {
719+
Thread.currentThread().interrupt();
720+
throw new SailException(e);
704721
} finally {
705722
try {
706723
activeThread.setRelease(null);
@@ -750,14 +767,18 @@ public final void clear(Resource... contexts) throws SailException {
750767

751768
verifyIsOpen();
752769

753-
updateLock.lock();
770+
Lock exclusiveLock = updateLock.getExclusiveLock();
771+
754772
try {
755773
verifyIsActive();
756774
clearInternal(contexts);
757775
statementsRemoved = true;
758776
} finally {
759-
updateLock.unlock();
777+
exclusiveLock.release();
760778
}
779+
} catch (InterruptedException e) {
780+
Thread.currentThread().interrupt();
781+
throw new SailException(e);
761782
} finally {
762783
try {
763784
activeThread.setRelease(null);
@@ -820,13 +841,17 @@ public final void setNamespace(String prefix, String name) throws SailException
820841

821842
verifyIsOpen();
822843

823-
updateLock.lock();
844+
Lock exclusiveLock = updateLock.getExclusiveLock();
845+
824846
try {
825847
verifyIsActive();
826848
setNamespaceInternal(prefix, name);
827849
} finally {
828-
updateLock.unlock();
850+
exclusiveLock.release();
829851
}
852+
} catch (InterruptedException e) {
853+
Thread.currentThread().interrupt();
854+
throw new SailException(e);
830855
} finally {
831856
try {
832857
activeThread.setRelease(null);
@@ -848,13 +873,17 @@ public final void removeNamespace(String prefix) throws SailException {
848873

849874
verifyIsOpen();
850875

851-
updateLock.lock();
876+
Lock exclusiveLock = updateLock.getExclusiveLock();
877+
852878
try {
853879
verifyIsActive();
854880
removeNamespaceInternal(prefix);
855881
} finally {
856-
updateLock.unlock();
882+
exclusiveLock.release();
857883
}
884+
} catch (InterruptedException e) {
885+
Thread.currentThread().interrupt();
886+
throw new SailException(e);
858887
} finally {
859888
try {
860889
activeThread.setRelease(null);
@@ -873,13 +902,17 @@ public final void clearNamespaces() throws SailException {
873902

874903
verifyIsOpen();
875904

876-
updateLock.lock();
905+
Lock exclusiveLock = updateLock.getExclusiveLock();
906+
877907
try {
878908
verifyIsActive();
879909
clearNamespacesInternal();
880910
} finally {
881-
updateLock.unlock();
911+
exclusiveLock.release();
882912
}
913+
} catch (InterruptedException e) {
914+
Thread.currentThread().interrupt();
915+
throw new SailException(e);
883916
} finally {
884917
try {
885918
activeThread.setRelease(null);

core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ShaclSailConnection.java

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -735,56 +735,59 @@ synchronized public void close() throws SailException {
735735
if (closed) {
736736
return;
737737
}
738-
739-
if (getWrappedConnection() instanceof AbstractSailConnection) {
740-
AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection();
741-
742-
abstractSailConnection.waitForOtherOperations(true);
743-
}
744-
745738
try {
746-
if (isActive()) {
747-
rollback();
739+
if (getWrappedConnection() instanceof AbstractSailConnection) {
740+
AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection();
741+
742+
abstractSailConnection.waitForOtherOperations(true);
748743
}
749744
} finally {
750-
try {
751-
shapesRepoConnection.close();
752745

746+
try {
747+
if (isActive()) {
748+
rollback();
749+
}
753750
} finally {
754751
try {
755-
if (previousStateConnection != null) {
756-
previousStateConnection.close();
757-
}
752+
shapesRepoConnection.close();
758753

759754
} finally {
760755
try {
761-
if (serializableConnection != null) {
762-
serializableConnection.close();
756+
if (previousStateConnection != null) {
757+
previousStateConnection.close();
763758
}
764-
} finally {
765759

760+
} finally {
766761
try {
767-
super.close();
762+
if (serializableConnection != null) {
763+
serializableConnection.close();
764+
}
768765
} finally {
766+
769767
try {
770-
sail.closeConnection();
768+
super.close();
771769
} finally {
772770
try {
773-
cleanupShapesReadWriteLock();
771+
sail.closeConnection();
774772
} finally {
775773
try {
776-
cleanupReadWriteLock();
774+
cleanupShapesReadWriteLock();
777775
} finally {
778-
closed = true;
779-
}
776+
try {
777+
cleanupReadWriteLock();
778+
} finally {
779+
closed = true;
780+
}
780781

782+
}
781783
}
782784
}
783785
}
784786
}
785787
}
786788
}
787789
}
790+
788791
}
789792

790793
@Override

0 commit comments

Comments
 (0)