Skip to content

Commit

Permalink
fix(transport): don't reuse buffer for request data
Browse files Browse the repository at this point in the history
Depending on the implementation of the request handler, reusing buffers
is unsafe. For example, the Query API has an asynchronous request
handler which means that modifying the buffer in `handleAtomixRequest`
is a data race.

Constructing an UnsafeBuffer is relatively cheap, so we can just do it
for every request to be on the safe side. Alternatively we could have
changed the interface for `RequestHandler` to accept the raw byte array
instead of a buffer.

(cherry picked from commit 768cc36)
  • Loading branch information
lenaschoenburg authored and github-actions[bot] committed Nov 25, 2021
1 parent 764962f commit cf9d5a8
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
Expand All @@ -33,15 +32,13 @@ public class AtomixServerTransport extends Actor implements ServerTransport {
private final Int2ObjectHashMap<Long2ObjectHashMap<CompletableFuture<byte[]>>>
partitionsRequestMap;
private final AtomicLong requestCount;
private final DirectBuffer reusableRequestBuffer;
private final MessagingService messagingService;
private final String actorName;

public AtomixServerTransport(final int nodeId, final MessagingService messagingService) {
this.messagingService = messagingService;
partitionsRequestMap = new Int2ObjectHashMap<>();
requestCount = new AtomicLong(0);
reusableRequestBuffer = new UnsafeBuffer(0, 0);
actorName = buildActorName(nodeId, "ServerTransport");
}

Expand Down Expand Up @@ -122,9 +119,13 @@ private CompletableFuture<byte[]> handleAtomixRequest(
}

try {
reusableRequestBuffer.wrap(requestBytes);
requestHandler.onRequest(
this, partitionId, requestId, reusableRequestBuffer, 0, requestBytes.length);
this,
partitionId,
requestId,
new UnsafeBuffer(requestBytes),
0,
requestBytes.length);
if (LOG.isTraceEnabled()) {
LOG.trace(
"Handled request {} for topic {}",
Expand Down

0 comments on commit cf9d5a8

Please sign in to comment.