Skip to content

Commit

Permalink
remove zero-copy enabled in GrpcReplicationService
Browse files Browse the repository at this point in the history
  • Loading branch information
scolley31 committed Oct 23, 2024
1 parent 07706d6 commit 014514f
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,37 +58,24 @@ public class GrpcReplicationService extends
private final ContainerReplicationSource source;
private final ContainerImporter importer;

private final boolean zeroCopyEnabled;

private final ZeroCopyMessageMarshaller<SendContainerRequest>
sendContainerZeroCopyMessageMarshaller;

private final ZeroCopyMessageMarshaller<CopyContainerRequestProto>
copyContainerZeroCopyMessageMarshaller;

public GrpcReplicationService(ContainerReplicationSource source,
ContainerImporter importer, boolean zeroCopyEnabled) {
public GrpcReplicationService(ContainerReplicationSource source, ContainerImporter importer) {
this.source = source;
this.importer = importer;
this.zeroCopyEnabled = zeroCopyEnabled;

if (zeroCopyEnabled) {
sendContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
SendContainerRequest.getDefaultInstance());
copyContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
CopyContainerRequestProto.getDefaultInstance());
} else {
sendContainerZeroCopyMessageMarshaller = null;
copyContainerZeroCopyMessageMarshaller = null;
}

sendContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
SendContainerRequest.getDefaultInstance());
copyContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
CopyContainerRequestProto.getDefaultInstance());
}

public ServerServiceDefinition bindServiceWithZeroCopy() {
ServerServiceDefinition orig = super.bindService();
if (!zeroCopyEnabled) {
LOG.info("Zerocopy is not enabled.");
return orig;
}

Set<String> methodNames = new HashSet<>();
ServerServiceDefinition.Builder builder =
Expand Down Expand Up @@ -154,10 +141,7 @@ public void download(CopyContainerRequestProto request,
} finally {
// output may have already been closed, ignore such errors
IOUtils.cleanupWithLogger(LOG, outputStream);

if (copyContainerZeroCopyMessageMarshaller != null) {
copyContainerZeroCopyMessageMarshaller.release(request);
}
copyContainerZeroCopyMessageMarshaller.release(request);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,12 @@ public ReplicationServer(ContainerController controller,
new LinkedBlockingQueue<>(replicationQueueLimit),
threadFactory);

init(replicationConfig.isZeroCopyEnable());
init();
}

public void init(boolean enableZeroCopy) {
public void init() {
GrpcReplicationService grpcReplicationService = new GrpcReplicationService(
new OnDemandContainerReplicationSource(controller), importer,
enableZeroCopy);
new OnDemandContainerReplicationSource(controller), importer);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.addService(ServerInterceptors.intercept(
Expand Down Expand Up @@ -203,11 +202,6 @@ public static final class ReplicationConfig {
static final String REPLICATION_OUTOFSERVICE_FACTOR_KEY =
PREFIX + "." + OUTOFSERVICE_FACTOR_KEY;

public static final String ZEROCOPY_ENABLE_KEY = "zerocopy.enabled";
private static final boolean ZEROCOPY_ENABLE_DEFAULT = true;
private static final String ZEROCOPY_ENABLE_DEFAULT_VALUE =
"true";

/**
* The maximum number of replication commands a single datanode can execute
* simultaneously.
Expand Down Expand Up @@ -249,15 +243,6 @@ public static final class ReplicationConfig {
)
private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT;

@Config(key = ZEROCOPY_ENABLE_KEY,
type = ConfigType.BOOLEAN,
defaultValue = ZEROCOPY_ENABLE_DEFAULT_VALUE,
tags = {DATANODE, SCM},
description = "Specify if zero-copy should be enabled for " +
"replication protocol."
)
private boolean zeroCopyEnable = ZEROCOPY_ENABLE_DEFAULT;

public double getOutOfServiceFactor() {
return outOfServiceFactor;
}
Expand Down Expand Up @@ -291,14 +276,6 @@ public void setReplicationQueueLimit(int limit) {
this.replicationQueueLimit = limit;
}

public boolean isZeroCopyEnable() {
return zeroCopyEnable;
}

public void setZeroCopyEnable(boolean zeroCopyEnable) {
this.zeroCopyEnable = zeroCopyEnable;
}

@PostConstruct
public void validate() {
if (replicationMaxStreams < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,15 @@ class TestGrpcReplicationService {

@BeforeEach
public void setUp() throws Exception {
init(false);
init();
}

public void init(boolean isZeroCopy) throws Exception {
public void init() throws Exception {
conf = new OzoneConfiguration();

ReplicationServer.ReplicationConfig replicationConfig =
conf.getObject(ReplicationServer.ReplicationConfig.class);

replicationConfig.setZeroCopyEnable(isZeroCopy);

SecurityConfig secConf = new SecurityConfig(conf);

ContainerSet containerSet = new ContainerSet(1000);
Expand Down Expand Up @@ -230,7 +228,7 @@ public void copyData(long containerId, OutputStream destination,
};
ContainerImporter importer = mock(ContainerImporter.class);
GrpcReplicationService subject =
new GrpcReplicationService(source, importer, false);
new GrpcReplicationService(source, importer);

CopyContainerRequestProto request = CopyContainerRequestProto.newBuilder()
.setContainerID(1)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.ozone.client.rpc;

import org.apache.commons.lang3.NotImplementedException;
Expand Down Expand Up @@ -74,7 +75,7 @@
/**
* Tests key output stream.
*/
class TestECKeyOutputStreamWithZeroCopy {
public class TestECKeyOutputStream {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf = new OzoneConfiguration();
private static OzoneClient client;
Expand Down

0 comments on commit 014514f

Please sign in to comment.