Skip to content

Commit

Permalink
Add transactionId almost everwhere
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikola Grcevski committed Nov 17, 2021
1 parent 4d5d4b6 commit a781eb6
Show file tree
Hide file tree
Showing 17 changed files with 334 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ public void testMaybeFlush() throws Exception {
SequenceNumbers.UNASSIGNED_SEQ_NO,
0,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false
false,
IndexShard.NO_TRANSACTION_ID
);
assertTrue(shard.shouldPeriodicallyFlush());
final Translog translog = getTranslog(shard);
Expand Down Expand Up @@ -447,7 +448,8 @@ public void testMaybeRollTranslogGeneration() throws Exception {
SequenceNumbers.UNASSIGNED_SEQ_NO,
0,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false
false,
IndexShard.NO_TRANSACTION_ID
);
final Translog.Location location = result.getTranslogLocation();
shard.afterWriteOperation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ protected void doRun() throws Exception {
nowInMillisSupplier,
mappingUpdater,
waitForMappingUpdate,
ActionListener.wrap(v -> executor.execute(this), this::onRejection)
ActionListener.wrap(v -> executor.execute(this), this::onRejection),
transactionId
) == false) {
// We are waiting for a mapping update on another thread, that will invoke this action again once its done
// so we just break out here.
Expand Down Expand Up @@ -262,6 +263,25 @@ private void finishRequest() {
}.run();
}

static boolean executeBulkItemRequest(
BulkPrimaryExecutionContext context,
UpdateHelper updateHelper,
LongSupplier nowInMillisSupplier,
MappingUpdatePerformer mappingUpdater,
Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<Void> itemDoneListener
) throws Exception {
return executeBulkItemRequest(
context,
updateHelper,
nowInMillisSupplier,
mappingUpdater,
waitForMappingUpdate,
itemDoneListener,
IndexShard.NO_TRANSACTION_ID
);
}

/**
* Executes bulk item requests and handles request execution exceptions.
* @return {@code true} if request completed on this thread and the listener was invoked, {@code false} if the request triggered
Expand All @@ -273,7 +293,8 @@ static boolean executeBulkItemRequest(
LongSupplier nowInMillisSupplier,
MappingUpdatePerformer mappingUpdater,
Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<Void> itemDoneListener
ActionListener<Void> itemDoneListener,
long transactionId
) throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();

Expand Down Expand Up @@ -318,7 +339,8 @@ static boolean executeBulkItemRequest(
request.id(),
request.versionType(),
request.ifSeqNo(),
request.ifPrimaryTerm()
request.ifPrimaryTerm(),
transactionId
);
} else {
final IndexRequest request = context.getRequestToExecute();
Expand All @@ -336,7 +358,8 @@ static boolean executeBulkItemRequest(
request.ifSeqNo(),
request.ifPrimaryTerm(),
request.getAutoGeneratedTimestamp(),
request.isRetry()
request.isRetry(),
transactionId
);
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
Expand Down Expand Up @@ -521,7 +544,7 @@ private static BulkItemResponse processUpdateResponse(
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
final long startBulkTime = System.nanoTime();
final Translog.Location location = performOnReplica(request, replica);
final Translog.Location location = performOnReplica(request, replica, IndexShard.NO_TRANSACTION_ID);
replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
return new WriteReplicaResult<>(request, location, null, replica, logger);
});
Expand All @@ -537,7 +560,7 @@ protected int replicaOperationCount(BulkShardRequest request) {
return request.items().length;
}

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica, long transactionId) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
Expand Down Expand Up @@ -565,7 +588,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
continue; // ignore replication as it's a noop
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica, transactionId);
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
Expand All @@ -576,7 +599,8 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
private static Engine.Result performOpOnReplica(
DocWriteResponse primaryResponse,
DocWriteRequest<?> docWriteRequest,
IndexShard replica
IndexShard replica,
long transactionId
) throws Exception {
final Engine.Result result;
switch (docWriteRequest.opType()) {
Expand All @@ -596,7 +620,8 @@ private static Engine.Result performOpOnReplica(
primaryResponse.getVersion(),
indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(),
sourceToParse
sourceToParse,
transactionId
);
break;
case DELETE:
Expand All @@ -605,7 +630,8 @@ private static Engine.Result performOpOnReplica(
primaryResponse.getSeqNo(),
primaryResponse.getPrimaryTerm(),
primaryResponse.getVersion(),
deleteRequest.id()
deleteRequest.id(),
transactionId
);
break;
default:
Expand Down
60 changes: 60 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,7 @@ public static class Index extends Operation {
private final boolean isRetry;
private final long ifSeqNo;
private final long ifPrimaryTerm;
private final long transactionId;

public Index(
Term uid,
Expand All @@ -1386,6 +1387,38 @@ public Index(
boolean isRetry,
long ifSeqNo,
long ifPrimaryTerm
) {
this(
uid,
doc,
seqNo,
primaryTerm,
version,
versionType,
origin,
startTime,
autoGeneratedIdTimestamp,
isRetry,
ifSeqNo,
ifPrimaryTerm,
-1L
);
}

public Index(
Term uid,
ParsedDocument doc,
long seqNo,
long primaryTerm,
long version,
VersionType versionType,
Origin origin,
long startTime,
long autoGeneratedIdTimestamp,
boolean isRetry,
long ifSeqNo,
long ifPrimaryTerm,
long transactionId
) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
Expand All @@ -1398,6 +1431,7 @@ public Index(
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
this.ifSeqNo = ifSeqNo;
this.ifPrimaryTerm = ifPrimaryTerm;
this.transactionId = transactionId;
}

public Index(Term uid, long primaryTerm, ParsedDocument doc) {
Expand Down Expand Up @@ -1476,13 +1510,18 @@ public long getIfSeqNo() {
public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}

public long getTransactionId() {
return transactionId;
}
}

public static class Delete extends Operation {

private final String id;
private final long ifSeqNo;
private final long ifPrimaryTerm;
private final long transactionId;

public Delete(
String id,
Expand All @@ -1495,6 +1534,22 @@ public Delete(
long startTime,
long ifSeqNo,
long ifPrimaryTerm
) {
this(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm, -1L);
}

public Delete(
String id,
Term uid,
long seqNo,
long primaryTerm,
long version,
VersionType versionType,
Origin origin,
long startTime,
long ifSeqNo,
long ifPrimaryTerm,
long transactionId
) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
Expand All @@ -1505,6 +1560,7 @@ public Delete(
this.id = Objects.requireNonNull(id);
this.ifSeqNo = ifSeqNo;
this.ifPrimaryTerm = ifPrimaryTerm;
this.transactionId = transactionId;
}

public Delete(String id, Term uid, long primaryTerm) {
Expand Down Expand Up @@ -1559,6 +1615,10 @@ public long getIfSeqNo() {
public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}

public long getTransactionId() {
return transactionId;
}
}

public static class NoOp extends Operation {
Expand Down
Loading

0 comments on commit a781eb6

Please sign in to comment.