Skip to content

Commit

Permalink
WIP: Transaction ops in translog
Browse files Browse the repository at this point in the history
Some very basic writing to the translog, I'm
not sure everything is needed yet.
  • Loading branch information
Nikola Grcevski committed Nov 17, 2021
1 parent 98f9902 commit 4d5d4b6
Show file tree
Hide file tree
Showing 6 changed files with 383 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -181,20 +182,32 @@ public static void performOnPrimary(

@Override
protected void doRun() throws Exception {
while (context.hasMoreOperationsToExecute()) {
if (executeBulkItemRequest(
context,
updateHelper,
nowInMillisSupplier,
mappingUpdater,
waitForMappingUpdate,
ActionListener.wrap(v -> executor.execute(this), this::onRejection)
) == false) {
// We are waiting for a mapping update on another thread, that will invoke this action again once its done
// so we just break out here.
return;
String uid = UUIDs.base64UUID();
long transactionId = -1L;
try {
transactionId = primary.startTransaction(uid);
while (context.hasMoreOperationsToExecute()) {
if (executeBulkItemRequest(
context,
updateHelper,
nowInMillisSupplier,
mappingUpdater,
waitForMappingUpdate,
ActionListener.wrap(v -> executor.execute(this), this::onRejection)
) == false) {
// We are waiting for a mapping update on another thread, that will invoke this action again once its done
// so we just break out here.
return;
}
assert context.isInitial(); // either completed and moved to next or reset
}
assert context.isInitial(); // either completed and moved to next or reset

primary.commitTransaction(uid, transactionId);
} catch (Exception x) {
logger.warn("Encountered an error while executing bulk transaction", x);
primary.rollbackTransaction(uid, transactionId);
} finally {
primary.closeTransaction(uid, transactionId);
}
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
// We're done, there's no more operations to execute so we resolve the wrapped listener
Expand All @@ -206,7 +219,6 @@ public void onRejection(Exception e) {
// We must finish the outstanding request. Finishing the outstanding request can include
// refreshing and fsyncing. Therefore, we must force execution on the WRITE thread.
executor.execute(new ActionRunnable<>(listener) {

@Override
protected void doRun() {
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
Expand Down
48 changes: 47 additions & 1 deletion server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,14 @@ public Condition newCondition() {
}
}

public abstract long startTransaction(String id) throws IOException;

public abstract boolean commitTransaction(String id, long transactionId) throws IOException;

public abstract boolean rollbackTransaction(String id, long transactionId) throws IOException;

public abstract boolean closeTransaction(String id, long transactionId) throws IOException;

/**
* Perform document index operation on the engine
* @param index operation to perform
Expand Down Expand Up @@ -1271,7 +1279,8 @@ public abstract static class Operation {
public enum TYPE {
INDEX,
DELETE,
NO_OP;
NO_OP,
TX_OP;

private final String lowercase;

Expand Down Expand Up @@ -1597,6 +1606,43 @@ public int estimatedSizeInBytes() {

}

public static class TxOp extends Operation {
public TxOp(final long startTime) {
super(null, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, Versions.NOT_FOUND, null, null, startTime);
}

@Override
public Term uid() {
throw new UnsupportedOperationException();
}

@Override
public long version() {
throw new UnsupportedOperationException();
}

@Override
public VersionType versionType() {
throw new UnsupportedOperationException();
}

@Override
String id() {
throw new UnsupportedOperationException();
}

@Override
public TYPE operationType() {
return TYPE.TX_OP;
}

@Override
public int estimatedSizeInBytes() {
return 2 * Long.BYTES;
}

}

public static class Get {
private final boolean realtime;
private final Term uid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,30 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws E
}
}

@Override
public long startTransaction(String id) throws IOException {
Translog.Location location = translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime()))));
return location.translogLocation;
}

@Override
public boolean commitTransaction(String id, long transactionId) throws IOException {
translog.add(new Translog.TxCommit(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
return true;
}

@Override
public boolean rollbackTransaction(String id, long transactionId) throws IOException {
translog.add(new Translog.TxRollback(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
return true;
}

@Override
public boolean closeTransaction(String id, long transactionId) throws IOException {
translog.add(new Translog.TxClose(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId));
return true;
}

private void pruneDeletedTombstones() {
/*
* We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,26 @@ public void skipTranslogRecovery() {}
@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {}

@Override
public long startTransaction(String id) throws IOException {
return 0;
}

@Override
public boolean commitTransaction(String id, long transactionId) throws IOException {
return false;
}

@Override
public boolean rollbackTransaction(String id, long transactionId) throws IOException {
return false;
}

@Override
public boolean closeTransaction(String id, long transactionId) throws IOException {
return false;
}

@Override
public void maybePruneDeletes() {}

Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,22 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
return previousState;
}

public long startTransaction(String id) throws IOException {
return getEngine().startTransaction(id);
}

public boolean commitTransaction(String id, long transactionId) throws IOException {
return getEngine().commitTransaction(id, transactionId);
}

public boolean rollbackTransaction(String id, long transactionId) throws IOException {
return getEngine().rollbackTransaction(id, transactionId);
}

public boolean closeTransaction(String id, long transactionId) throws IOException {
return getEngine().closeTransaction(id, transactionId);
}

public Engine.IndexResult applyIndexOperationOnPrimary(
long version,
VersionType versionType,
Expand Down
Loading

0 comments on commit 4d5d4b6

Please sign in to comment.