Skip to content

Commit ebcd60b

Browse files
committed
wip
1 parent 319c76d commit ebcd60b

File tree

11 files changed

+183
-101
lines changed

11 files changed

+183
-101
lines changed

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java

Lines changed: 69 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public abstract class AbstractSailConnection implements SailConnection {
7474
private final AbstractSail sailBase;
7575

7676
private volatile boolean txnActive;
77+
private static final VarHandle TXN_ACTIVE;
7778

7879
private volatile boolean txnPrepared;
7980

@@ -91,16 +92,6 @@ public abstract class AbstractSailConnection implements SailConnection {
9192
private boolean isOpen = true;
9293
private static final VarHandle IS_OPEN;
9394

94-
static {
95-
try {
96-
IS_OPEN = MethodHandles.lookup()
97-
.in(AbstractSailConnection.class)
98-
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
99-
} catch (ReflectiveOperationException e) {
100-
throw new Error(e);
101-
}
102-
}
103-
10495
/**
10596
* Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a
10697
* transaction.
@@ -138,17 +129,20 @@ public abstract class AbstractSailConnection implements SailConnection {
138129

139130
private IsolationLevel transactionIsolationLevel;
140131

141-
// used to decide if we need to call flush()
132+
// Used to decide if we need to call flush(). Use the VarHandles below in relase/acquire mode instead.
142133
private volatile boolean statementsAdded;
143134
private volatile boolean statementsRemoved;
144135

136+
private static final VarHandle STATEMENTS_ADDED;
137+
private static final VarHandle STATEMENTS_REMOVED;
138+
145139
/*--------------*
146140
* Constructors *
147141
*--------------*/
148142

149143
public AbstractSailConnection(AbstractSail sailBase) {
150144
this.sailBase = sailBase;
151-
txnActive = false;
145+
TXN_ACTIVE.setRelease(this, false);
152146
if (debugEnabled) {
153147
activeIterationsDebug = new ConcurrentHashMap<>();
154148
} else {
@@ -177,7 +171,7 @@ protected void verifyIsOpen() throws SailException {
177171
* @throws SailException if no transaction is active.
178172
*/
179173
protected void verifyIsActive() throws SailException {
180-
if (!isActive()) {
174+
if (!((boolean) TXN_ACTIVE.getAcquire(this))) {
181175
throw new SailException("No active transaction");
182176
}
183177
}
@@ -215,7 +209,7 @@ public void begin(IsolationLevel isolationLevel) throws SailException {
215209
}
216210

217211
startTransactionInternal();
218-
txnActive = true;
212+
TXN_ACTIVE.setRelease(this, true);
219213
} finally {
220214
updateLock.unlock();
221215
}
@@ -268,15 +262,15 @@ public final void close() throws SailException {
268262
try {
269263
forceCloseActiveOperations();
270264

271-
if (txnActive) {
265+
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
272266
logger.warn("Rolling back transaction due to connection close",
273267
debugEnabled ? new Throwable() : null);
274268
try {
275269
// Use internal method to avoid deadlock: the public
276270
// rollback method will try to obtain a connection lock
277271
rollbackInternal();
278272
} finally {
279-
txnActive = false;
273+
TXN_ACTIVE.setRelease(this, false);
280274
txnPrepared = false;
281275
}
282276
}
@@ -417,7 +411,7 @@ public final long size(Resource... contexts) throws SailException {
417411
}
418412

419413
protected final boolean transactionActive() {
420-
return txnActive;
414+
return (boolean) TXN_ACTIVE.getAcquire(this);
421415
}
422416

423417
/**
@@ -459,7 +453,7 @@ public final void prepare() throws SailException {
459453

460454
updateLock.lock();
461455
try {
462-
if (txnActive) {
456+
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
463457
prepareInternal();
464458
txnPrepared = true;
465459
}
@@ -489,12 +483,12 @@ public final void commit() throws SailException {
489483

490484
updateLock.lock();
491485
try {
492-
if (txnActive) {
486+
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
493487
if (!txnPrepared) {
494488
prepareInternal();
495489
}
496490
commitInternal();
497-
txnActive = false;
491+
TXN_ACTIVE.setRelease(this, false);
498492
txnPrepared = false;
499493
}
500494
} finally {
@@ -525,11 +519,11 @@ public final void rollback() throws SailException {
525519

526520
updateLock.lock();
527521
try {
528-
if (txnActive) {
522+
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
529523
try {
530524
rollbackInternal();
531525
} finally {
532-
txnActive = false;
526+
TXN_ACTIVE.setRelease(this, false);
533527
txnPrepared = false;
534528
}
535529
} else {
@@ -553,7 +547,8 @@ public final void addStatement(Resource subj, IRI pred, Value obj, Resource... c
553547
flushPendingUpdates();
554548
}
555549
addStatement(null, subj, pred, obj, contexts);
556-
statementsAdded = true;
550+
551+
STATEMENTS_ADDED.setRelease(this, true);
557552
}
558553

559554
@Override
@@ -562,7 +557,7 @@ public final void removeStatements(Resource subj, IRI pred, Value obj, Resource.
562557
flushPendingUpdates();
563558
}
564559
removeStatement(null, subj, pred, obj, contexts);
565-
statementsRemoved = true;
560+
STATEMENTS_REMOVED.setRelease(this, true);
566561
}
567562

568563
@Override
@@ -604,7 +599,7 @@ public void addStatement(UpdateContext op, Resource subj, IRI pred, Value obj, R
604599
startUpdate(op);
605600
}
606601
}
607-
statementsAdded = true;
602+
STATEMENTS_ADDED.setRelease(this, true);
608603
}
609604

610605
/**
@@ -632,7 +627,7 @@ public void removeStatement(UpdateContext op, Resource subj, IRI pred, Value obj
632627
startUpdate(op);
633628
}
634629
}
635-
statementsRemoved = true;
630+
STATEMENTS_REMOVED.setRelease(this, true);
636631
}
637632

638633
@Override
@@ -703,7 +698,7 @@ public final void clear(Resource... contexts) throws SailException {
703698
try {
704699
verifyIsActive();
705700
clearInternal(contexts);
706-
statementsRemoved = true;
701+
STATEMENTS_REMOVED.setRelease(this, true);
707702
} finally {
708703
updateLock.unlock();
709704
}
@@ -836,19 +831,20 @@ public final void clearNamespaces() throws SailException {
836831

837832
@Override
838833
public boolean pendingRemovals() {
839-
return statementsRemoved;
834+
return (boolean) STATEMENTS_REMOVED.getAcquire(this);
835+
840836
}
841837

842838
protected boolean pendingAdds() {
843-
return statementsAdded;
839+
return (boolean) STATEMENTS_ADDED.getAcquire(this);
844840
}
845841

846842
protected void setStatementsAdded() {
847-
statementsAdded = true;
843+
STATEMENTS_ADDED.setRelease(this, true);
848844
}
849845

850846
protected void setStatementsRemoved() {
851-
statementsRemoved = true;
847+
STATEMENTS_REMOVED.setRelease(this, true);
852848
}
853849

854850
@Deprecated(forRemoval = true)
@@ -992,9 +988,10 @@ private void forceCloseActiveOperations() throws SailException {
992988
* @throws SailException
993989
*/
994990
private void flushPendingUpdates() throws SailException {
995-
if ((statementsAdded || statementsRemoved) && isActive()) {
991+
if ((pendingAdds() || pendingRemovals()) && isActive()) {
996992
if (isActive()) {
997993
synchronized (this) {
994+
// we are inside a synchornized block so there isn't much point using the VarHandle
998995
if ((statementsAdded || statementsRemoved) && isActive()) {
999996
flush();
1000997
statementsAdded = false;
@@ -1128,4 +1125,44 @@ public synchronized void release() {
11281125
}
11291126
}
11301127
}
1128+
1129+
static {
1130+
try {
1131+
IS_OPEN = MethodHandles.lookup()
1132+
.in(AbstractSailConnection.class)
1133+
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
1134+
} catch (ReflectiveOperationException e) {
1135+
throw new Error(e);
1136+
}
1137+
}
1138+
1139+
static {
1140+
try {
1141+
TXN_ACTIVE = MethodHandles.lookup()
1142+
.in(AbstractSailConnection.class)
1143+
.findVarHandle(AbstractSailConnection.class, "txnActive", boolean.class);
1144+
} catch (ReflectiveOperationException e) {
1145+
throw new Error(e);
1146+
}
1147+
}
1148+
1149+
static {
1150+
try {
1151+
STATEMENTS_REMOVED = MethodHandles.lookup()
1152+
.in(AbstractSailConnection.class)
1153+
.findVarHandle(AbstractSailConnection.class, "statementsRemoved", boolean.class);
1154+
} catch (ReflectiveOperationException e) {
1155+
throw new Error(e);
1156+
}
1157+
}
1158+
1159+
static {
1160+
try {
1161+
STATEMENTS_ADDED = MethodHandles.lookup()
1162+
.in(AbstractSailConnection.class)
1163+
.findVarHandle(AbstractSailConnection.class, "statementsAdded", boolean.class);
1164+
} catch (ReflectiveOperationException e) {
1165+
throw new Error(e);
1166+
}
1167+
}
11311168
}

core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/FileIO.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class FileIO {
8282

8383
public static final int INF_QUAD_MARKER = 5;
8484

85-
public static final int URI_MARKER = 6;
85+
public static final int IRI_MARKER = 6;
8686

8787
public static final int BNODE_MARKER = 7;
8888

@@ -270,7 +270,7 @@ private void readStatement(boolean hasContext, boolean isExplicit, DataInputStre
270270

271271
private void writeValue(Value value, DataOutputStream dataOut) throws IOException {
272272
if (value.isIRI()) {
273-
dataOut.writeByte(URI_MARKER);
273+
dataOut.writeByte(IRI_MARKER);
274274
writeString(((IRI) value).stringValue(), dataOut);
275275
} else if (value.isBNode()) {
276276
dataOut.writeByte(BNODE_MARKER);
@@ -301,9 +301,9 @@ private void writeValue(Value value, DataOutputStream dataOut) throws IOExceptio
301301
private Value readValue(DataInputStream dataIn) throws IOException, ClassCastException {
302302
int valueTypeMarker = dataIn.readByte();
303303

304-
if (valueTypeMarker == URI_MARKER) {
305-
String uriString = readString(dataIn);
306-
return vf.createIRI(uriString);
304+
if (valueTypeMarker == IRI_MARKER) {
305+
String iriString = readString(dataIn);
306+
return vf.createIRI(iriString);
307307
} else if (valueTypeMarker == BNODE_MARKER) {
308308
String bnodeID = readString(dataIn);
309309
return vf.createBNode(bnodeID);

core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemEvaluationStatistics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private int minStatementCount(Value subj, Value pred, Value obj, Value context)
9898
}
9999

100100
if (pred != null) {
101-
MemIRI memPred = valueFactory.getMemURI((IRI) pred);
101+
MemIRI memPred = valueFactory.getMemIRI((IRI) pred);
102102
if (memPred != null) {
103103
minListSizes = Math.min(minListSizes, memPred.getPredicateStatementCount());
104104
if (minListSizes == 0) {

core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemorySailStore.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ private CloseableIteration<MemStatement, SailException> createStatementIterator(
213213
return EMPTY_ITERATION;
214214
}
215215

216-
MemIRI memPred = valueFactory.getMemURI(pred);
216+
MemIRI memPred = valueFactory.getMemIRI(pred);
217217
if (pred != null && memPred == null) {
218218
// non-existent predicate
219219
return EMPTY_ITERATION;
@@ -345,7 +345,7 @@ private CloseableIteration<MemTriple, SailException> createTripleIterator(Resour
345345
return EMPTY_TRIPLE_ITERATION;
346346
}
347347

348-
MemIRI memPred = valueFactory.getMemURI(pred);
348+
MemIRI memPred = valueFactory.getMemIRI(pred);
349349
if (pred != null && memPred == null) {
350350
// non-existent predicate
351351
return EMPTY_TRIPLE_ITERATION;
@@ -592,7 +592,7 @@ public String toString() {
592592
} else {
593593
sb.append("inferred ");
594594
}
595-
if (txnLock) {
595+
if ((boolean) TXN_LOCK.getAcquire(this)) {
596596
sb.append("snapshot ").append(nextSnapshot);
597597
} else {
598598
sb.append(super.toString());
@@ -635,7 +635,7 @@ public synchronized void prepare() throws SailException {
635635

636636
@Override
637637
public synchronized void flush() throws SailException {
638-
if (txnLock) {
638+
if ((boolean) TXN_LOCK.getAcquire(this)) {
639639
invalidateCache();
640640
currentSnapshot = Math.max(currentSnapshot, nextSnapshot);
641641
if (requireCleanup) {
@@ -653,8 +653,8 @@ public void close() {
653653
reservedSnapshot.release();
654654
}
655655
} finally {
656-
boolean toCloseTxnLock = txnLock;
657-
txnLock = false;
656+
boolean toCloseTxnLock = (boolean) TXN_LOCK.getAcquire(this);
657+
TXN_LOCK.setRelease(this, false);
658658
if (toCloseTxnLock) {
659659
txnLockManager.unlock();
660660
}
@@ -792,7 +792,7 @@ private void innerDeprecate(Statement statement, int nextSnapshot) {
792792
}
793793

794794
private void acquireExclusiveTransactionLock() throws SailException {
795-
if (!txnLock) {
795+
if (!(boolean) TXN_LOCK.getAcquire(this)) {
796796
synchronized (this) {
797797
if (!txnLock) {
798798
txnLockManager.lock();
@@ -812,7 +812,7 @@ private MemStatement addStatement(Resource subj, IRI pred, Value obj, Resource c
812812

813813
// Get or create MemValues for the operands
814814
MemResource memSubj = valueFactory.getOrCreateMemResource(subj);
815-
MemIRI memPred = valueFactory.getOrCreateMemURI(pred);
815+
MemIRI memPred = valueFactory.getOrCreateMemIRI(pred);
816816
MemValue memObj = valueFactory.getOrCreateMemValue(obj);
817817
MemResource memContext = context == null ? null : valueFactory.getOrCreateMemResource(context);
818818

@@ -1118,7 +1118,7 @@ public ReservedSnapshot reserve(int snapshot, Object reservedBy) {
11181118
}
11191119
}
11201120

1121-
LongAdder longAdder = activeSnapshots.computeIfAbsent(snapshot, (k) -> new LongAdder());
1121+
LongAdder longAdder = activeSnapshots.computeIfAbsent(snapshot, k -> new LongAdder());
11221122
longAdder.increment();
11231123

11241124
return new ReservedSnapshot(snapshot, reservedBy, debug, longAdder, activeSnapshots,
@@ -1162,7 +1162,7 @@ public ReservedSnapshot(int snapshot, Object reservedBy, boolean debug,
11621162
this.frequency = frequency;
11631163
this.highestEverReservedSnapshot = highestEverReservedSnapshot;
11641164
cleanable = cleaner.register(reservedBy, () -> {
1165-
int tempSnapshot = ((int) SNAPSHOT.getVolatile(this));
1165+
int tempSnapshot = (int) SNAPSHOT.getVolatile(this);
11661166
if (tempSnapshot != SNAPSHOT_RELEASED) {
11671167
String message = "Releasing MemorySailStore snapshot {} which was reserved and never released (possibly unclosed MemorySailDataset or MemorySailSink).";
11681168
if (stackTraceForDebugging != null) {
@@ -1201,4 +1201,17 @@ public void release() {
12011201

12021202
}
12031203
}
1204+
1205+
private static final VarHandle TXN_LOCK;
1206+
1207+
static {
1208+
try {
1209+
TXN_LOCK = MethodHandles.lookup()
1210+
.in(MemorySailSink.class)
1211+
.findVarHandle(MemorySailSink.class, "txnLock", boolean.class);
1212+
} catch (ReflectiveOperationException e) {
1213+
throw new Error(e);
1214+
}
1215+
}
1216+
12041217
}

0 commit comments

Comments
 (0)