Skip to content

Commit

Permalink
feat: Convert internal interfaces to use protos
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed Feb 20, 2023
1 parent 4daf3ab commit c14b41b
Show file tree
Hide file tree
Showing 37 changed files with 370 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.Serializable;
import java.util.List;
import java.util.function.Consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ private boolean terminated() {
}

@VisibleForTesting
void onMessages(List<SequencedMessage> sequencedMessages) {
void onMessages(List<com.google.cloud.pubsublite.proto.SequencedMessage> sequencedMessages) {
try {
for (SequencedMessage message : sequencedMessages) {
for (com.google.cloud.pubsublite.proto.SequencedMessage proto : sequencedMessages) {
SequencedMessage message = SequencedMessage.fromProto(proto);
PubsubMessage userMessage = transformer.transform(message);
long bytes = message.byteSize();
Runnable trackerConsumer = ackSetTracker.track(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
return ApiFutures.immediateFailedFuture(e.underlying);
}
return ApiFutures.transform(
wirePublisher.publish(wireMessage),
wirePublisher.publish(wireMessage.toProto()),
MessageMetadata::encode,
MoreExecutors.directExecutor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.Optional;
import javax.annotation.concurrent.ThreadSafe;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import com.google.api.core.ApiService.State;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Collection;
Expand Down Expand Up @@ -110,7 +110,7 @@ public synchronized Optional<SequencedMessage> messageIfAvailable() throws Check
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(1)
.setAllowedBytes(msg.byteSize())
.setAllowedBytes(msg.getSizeBytes())
.build());
return Optional.of(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.errorprone.annotations.concurrent.GuardedBy;
Expand Down Expand Up @@ -85,13 +85,13 @@ public synchronized List<SequencedMessage> pull() throws CheckedApiException {
}
Deque<SequencedMessage> collection = messages;
messages = new ArrayDeque<>();
long bytes = collection.stream().mapToLong(SequencedMessage::byteSize).sum();
long bytes = collection.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedBytes(bytes)
.setAllowedMessages(collection.size())
.build());
lastDelivered = Optional.of(Iterables.getLast(collection).offset());
lastDelivered = Optional.of(Offset.of(Iterables.getLast(collection).getCursor().getOffset()));
return ImmutableList.copyOf(collection);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.io.Flushable;

/** A generic PubSub Lite publisher. Errors are handled out of band. Thread safe. */
Expand All @@ -28,7 +28,7 @@ public interface Publisher<ResponseT> extends ApiService, Flushable {
//
// Guarantees that if a single publish future has an exception set, all publish calls made after
// that will also have an exception set.
ApiFuture<ResponseT> publish(Message message);
ApiFuture<ResponseT> publish(PubSubMessage message);

// Attempts to cancel all outstanding publishes.
void cancelOutstandingPublishes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.io.Flushable;

/**
Expand All @@ -35,7 +35,7 @@ public interface SequencedPublisher<ResponseT> extends ApiService, Flushable {
* <p>Guarantees that if a single publish future has an exception set, all publish calls made
* after that will also have an exception set.
*/
ApiFuture<ResponseT> publish(Message message, PublishSequenceNumber sequenceNumber);
ApiFuture<ResponseT> publish(PubSubMessage message, PublishSequenceNumber sequenceNumber);

/** Attempts to cancel all outstanding publishes. */
void cancelOutstandingPublishes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.io.IOException;

public class ApiExceptionPublisher<T> extends ProxyService implements Publisher<T> {
Expand All @@ -34,7 +34,7 @@ public class ApiExceptionPublisher<T> extends ProxyService implements Publisher<
}

@Override
public ApiFuture<T> publish(Message message) {
public ApiFuture<T> publish(PubSubMessage message) {
return toClientFuture(publisher.publish(message));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@

import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.SubscribeStreamFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.MessageResponse;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.stream.Collectors;

class ConnectedSubscriberImpl
extends SingleConnection<SubscribeRequest, SubscribeResponse, List<SequencedMessage>>
Expand Down Expand Up @@ -103,14 +102,10 @@ private void onMessages(MessageResponse response) throws CheckedApiException {
response.getMessagesCount() > 0,
"Received an empty MessageResponse on stream with initial request %s.",
initialRequest);
List<SequencedMessage> messages =
response.getMessagesList().stream()
.map(SequencedMessage::fromProto)
.collect(Collectors.toList());
checkState(
Predicates.isOrdered(messages),
Predicates.isOrdered(response.getMessagesList()),
"Received out of order messages on the stream with initial request %s.",
initialRequest);
sendToClient(messages);
sendToClient(response.getMessagesList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.Collection;
import java.util.Optional;

Expand All @@ -40,7 +40,7 @@ void onClientFlowRequest(FlowControlRequest request) throws CheckedApiException
}

void onMessages(Collection<SequencedMessage> received) throws CheckedApiException {
long byteSize = received.stream().mapToLong(SequencedMessage::byteSize).sum();
long byteSize = received.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
clientTokens.sub(byteSize, received.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import static com.google.cloud.pubsublite.internal.wire.Predicates.isOrdered;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Optional;
Expand All @@ -37,13 +37,13 @@ public class NextOffsetTracker {
void onMessages(Collection<SequencedMessage> messages) throws CheckedApiException {
checkArgument(!messages.isEmpty());
checkArgument(isOrdered(messages));
Offset firstMessageOffset = messages.iterator().next().offset();
long firstMessageOffset = messages.iterator().next().getCursor().getOffset();
checkState(
!nextOffset.isPresent() || (nextOffset.get().value() <= firstMessageOffset.value()),
!nextOffset.isPresent() || (nextOffset.get().value() <= firstMessageOffset),
String.format(
"Received message with offset %s older than known cursor location %s.",
firstMessageOffset, nextOffset));
nextOffset = Optional.of(Offset.of(Iterables.getLast(messages).offset().value() + 1));
nextOffset = Optional.of(Offset.of(Iterables.getLast(messages).getCursor().getOffset() + 1));
}

// Gives the SeekRequest that should be sent on restart, or empty if none should be sent because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.RoutingPolicy;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
Expand All @@ -56,12 +56,12 @@ private PartitionsWithRouting(
this.routingPolicy = routingPolicy;
}

public ApiFuture<MessageMetadata> publish(Message message) throws CheckedApiException {
public ApiFuture<MessageMetadata> publish(PubSubMessage message) throws CheckedApiException {
try {
Partition routedPartition =
message.key().isEmpty()
message.getKey().isEmpty()
? routingPolicy.routeWithoutKey()
: routingPolicy.route(message.key());
: routingPolicy.route(message.getKey());
checkState(
publishers.containsKey(routedPartition),
"Routed to partition %s for which there is no publisher available.",
Expand Down Expand Up @@ -108,7 +108,7 @@ public void stop() {
}

@Override
public ApiFuture<MessageMetadata> publish(Message message) {
public ApiFuture<MessageMetadata> publish(PubSubMessage message) {
Optional<PartitionsWithRouting> partitions;
try (CloseableMonitor.Hold h = monitor.enter()) {
partitions = partitionsWithRouting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.DefaultRoutingPolicy;
import com.google.cloud.pubsublite.internal.Publisher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.Ordering;
import java.util.Comparator;

public final class Predicates {
public static boolean isOrdered(Iterable<SequencedMessage> messages) {
return Ordering.from(Comparator.comparingLong((SequencedMessage m) -> m.offset().value()))
return Ordering.from(
Comparator.comparingLong((SequencedMessage m) -> m.getCursor().getOffset()))
.isStrictlyOrdered(messages);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.Constants;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
Expand Down Expand Up @@ -246,8 +245,7 @@ private void terminateOutstandingPublishes(CheckedApiException e) {
}

@Override
public ApiFuture<Offset> publish(Message message, PublishSequenceNumber sequenceNumber) {
PubSubMessage proto = message.toProto();
public ApiFuture<Offset> publish(PubSubMessage message, PublishSequenceNumber sequenceNumber) {
try (CloseableMonitor.Hold h = batcherMonitor.enter()) {
ApiService.State currentState = state();
switch (currentState) {
Expand All @@ -258,7 +256,7 @@ public ApiFuture<Offset> publish(Message message, PublishSequenceNumber sequence
Code.FAILED_PRECONDITION);
case STARTING:
case RUNNING:
return batcher.add(proto, sequenceNumber);
return batcher.add(message, sequenceNumber);
default:
throw new CheckedApiException(
"Cannot publish when Publisher state is " + currentState.name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.RoutingPolicy;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.io.IOException;
import java.util.Map;

Expand All @@ -46,10 +46,10 @@ public class RoutingPublisher extends ProxyService implements Publisher<MessageM

// Publisher implementation.
@Override
public ApiFuture<MessageMetadata> publish(Message message) {
public ApiFuture<MessageMetadata> publish(PubSubMessage message) {
try {
Partition routedPartition =
message.key().isEmpty() ? policy.routeWithoutKey() : policy.route(message.key());
message.getKey().isEmpty() ? policy.routeWithoutKey() : policy.route(message.getKey());
checkState(
partitionPublishers.containsKey(routedPartition),
"Routed to partition %s for which there is no publisher available.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.SequencedPublisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.io.IOException;
import javax.annotation.concurrent.GuardedBy;

Expand All @@ -44,7 +44,7 @@ public class SequenceAssigningPublisher extends ProxyService implements Publishe

// Publisher implementation.
@Override
public synchronized ApiFuture<Offset> publish(Message message) {
public synchronized ApiFuture<Offset> publish(PubSubMessage message) {
ApiFuture<Offset> future = publisher.publish(message, nextSequence);
nextSequence = nextSequence.next();
return future;
Expand Down
Loading

0 comments on commit c14b41b

Please sign in to comment.