Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Publisher.fromInputStream(InputStream, ByteArrayMapper) #2989

Merged
merged 3 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.api.FromInputStreamPublisher.ToByteArrayMapper;

import static io.servicetalk.concurrent.api.FromInputStreamPublisher.DEFAULT_MAX_BUFFER_SIZE;
import static io.servicetalk.concurrent.api.FromInputStreamPublisher.ToByteArrayMapper.DEFAULT_TO_BYTE_ARRAY_MAPPER;

/**
* A mapper to transform {@code byte[]} buffer regions into a desired type {@code T}.
*
* @param <T> Type of the result of this mapper
*/
@FunctionalInterface
public interface ByteArrayMapper<T> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this interface bcz concurrent-api doesn't have a dependency on buffer-api module.
Eventually, I think it's worth it. We can consolidate configuration parameters for how to read data from InputStream as default methods on this interface instead of adding more and more overloads for Publisher.fromInputStream. Publisher is already quite a lengthy class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


/**
* Maps a specified {@code byte[]} buffer region into a {@code T}.
* <p>
* The mapper can operate only with the specified region of the {@code buffer}. Access to other parts of the buffer
* may lead to unexpected results. The specified region can be safely used by the mapper without a need to copy
* data.
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
*
* @param buffer {@code byte[]} buffer with data
* @param offset the offset of the region
* @param length the length of the region
* @return result of type {@code T}
*/
T map(byte[] buffer, int offset, int length);

/**
* Returns the maximum allowed buffer size for the {@link #map(byte[], int, int)} operation.
* <p>
* Must be a positive number.
*
* @return the maximum allowed buffer size for the {@link #map(byte[], int, int)} operation
*/
default int maxBufferSize() {
return DEFAULT_MAX_BUFFER_SIZE;
}

/**
* Mapper from the buffer region to an independent {@code byte[]} buffer.
* <p>
* Returns {@link #toByteArray(int)} with default {@link #maxBufferSize()}.
*
* @return a mapper from the buffer region to an independent {@code byte[]} buffer
*/
static ByteArrayMapper<byte[]> toByteArray() {
return DEFAULT_TO_BYTE_ARRAY_MAPPER;
}

/**
* Mapper from the buffer region to an independent {@code byte[]} buffer.
* <p>
* Returns the original {@code byte[]} buffer as-is if it was completely full of data or allocates a new buffer for
* the specified length and copies data. Returned {@code byte[]} buffer is always completely full.
*
* @param maxBufferSize the value for {@link #maxBufferSize()}
* @return a mapper from the buffer region to an independent {@code byte[]} buffer
*/
static ByteArrayMapper<byte[]> toByteArray(final int maxBufferSize) {
return new ToByteArrayMapper(maxBufferSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,22 @@
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
import static java.lang.Math.min;
import static java.lang.System.arraycopy;
import static java.util.Objects.requireNonNull;

/**
* A {@link Publisher} created from an {@link InputStream} such that any data requested from the {@link Publisher} is
* read from the {@link InputStream} until it terminates.
*
* <p>
* Given that {@link InputStream} is a blocking API, requesting data from the {@link Publisher} can block on {@link
* Subscription#request(long)} until there is sufficient data available. The implementation attempts to minimize
* blocking, however by reading data faster than the writer is sending, blocking is inevitable.
*
* @param <T> Type of items emitted to the {@link PublisherSource.Subscriber}.
*/
final class FromInputStreamPublisher extends Publisher<byte[]> implements PublisherSource<byte[]> {
final class FromInputStreamPublisher<T> extends Publisher<T> implements PublisherSource<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(FromInputStreamPublisher.class);
// While sun.nio.ch.FileChannelImpl and java.io.InputStream.transferTo(...) use 8Kb chunks,
// we use 16Kb-32B because 16Kb is:
Expand All @@ -53,7 +56,8 @@ final class FromInputStreamPublisher extends Publisher<byte[]> implements Publis
// write hits SslHandler. This helps utilize the full potential of the transport without fragmentation at TLS/HTTP/2
// layers or introducing too many flushes (they are expensive!) for large payloads. Benchmarks confirmed that
// subtraction of 32B significantly improves throughput and latency for TLS and has no effect on plaintext traffic.
private static final int DEFAULT_READ_CHUNK_SIZE = 16 * 1024 - 32;
static final int DEFAULT_MAX_BUFFER_SIZE = 16 * 1024 - 32;
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<FromInputStreamPublisher> subscribedUpdater =
AtomicIntegerFieldUpdater.newUpdater(FromInputStreamPublisher.class, "subscribed");

Expand All @@ -71,42 +75,30 @@ final class FromInputStreamPublisher extends Publisher<byte[]> implements Publis
private volatile int subscribed;

private final InputStream stream;
private final int readChunkSize;
private final ByteArrayMapper<T> mapper;

/**
* A new instance.
*
* @param stream the {@link InputStream} to expose as a {@link Publisher}
* @param mapper a mapper to transform a {@code byte[]} buffer into a desired type {@code T} that will be emitted by
* the {@link Publisher}
*/
FromInputStreamPublisher(final InputStream stream) {
this(stream, DEFAULT_READ_CHUNK_SIZE);
}

/**
* A new instance.
*
* @param stream the {@link InputStream} to expose as a {@link Publisher}
* @param readChunkSize the maximum length of {@code byte[]} chunks which will be read from the {@link InputStream}
* and emitted by the {@link Publisher}.
*/
FromInputStreamPublisher(final InputStream stream, final int readChunkSize) {
FromInputStreamPublisher(final InputStream stream, final ByteArrayMapper<T> mapper) {
this.stream = requireNonNull(stream);
if (readChunkSize <= 0) {
throw new IllegalArgumentException("readChunkSize: " + readChunkSize + " (expected: >0)");
}
this.readChunkSize = readChunkSize;
this.mapper = requireNonNull(mapper);
}

@Override
public void subscribe(final Subscriber<? super byte[]> subscriber) {
public void subscribe(final Subscriber<? super T> subscriber) {
subscribeInternal(subscriber);
}

@Override
protected void handleSubscribe(final Subscriber<? super byte[]> subscriber) {
protected void handleSubscribe(final Subscriber<? super T> subscriber) {
if (subscribedUpdater.compareAndSet(this, 0, 1)) {
try {
subscriber.onSubscribe(new InputStreamPublisherSubscription(stream, subscriber, readChunkSize));
subscriber.onSubscribe(new InputStreamPublisherSubscription<>(stream, subscriber, mapper));
} catch (Throwable t) {
handleExceptionFromOnSubscribe(subscriber, t);
}
Expand All @@ -115,7 +107,7 @@ protected void handleSubscribe(final Subscriber<? super byte[]> subscriber) {
}
}

private static final class InputStreamPublisherSubscription implements Subscription {
private static final class InputStreamPublisherSubscription<T> implements Subscription {

private static final int END_OF_FILE = -1;
/**
Expand All @@ -124,8 +116,8 @@ private static final class InputStreamPublisherSubscription implements Subscript
private static final int TERMINAL_SENT = -1;

private final InputStream stream;
private final Subscriber<? super byte[]> subscriber;
private final int readChunkSize;
private final Subscriber<? super T> subscriber;
private final ByteArrayMapper<T> mapper;
/**
* Contains the outstanding demand or {@link #TERMINAL_SENT} indicating when {@link InputStream} and {@link
* Subscription} are terminated.
Expand All @@ -134,11 +126,11 @@ private static final class InputStreamPublisherSubscription implements Subscript
private int writeIdx;
private boolean ignoreRequests;

InputStreamPublisherSubscription(final InputStream stream, final Subscriber<? super byte[]> subscriber,
final int readChunkSize) {
InputStreamPublisherSubscription(final InputStream stream, final Subscriber<? super T> subscriber,
final ByteArrayMapper<T> mapper) {
this.stream = stream;
this.subscriber = subscriber;
this.readChunkSize = readChunkSize;
this.mapper = mapper;
}

@Override
Expand Down Expand Up @@ -170,7 +162,7 @@ public void cancel() {
}
}

private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
private void readAndDeliver(final Subscriber<? super T> subscriber) {
try {
do {
// Initialize readByte with a negative value different from END_OF_FILE as an indicator that it was
Expand All @@ -191,8 +183,8 @@ private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
if (available == 0) {
// This InputStream either does not implement available() method at all, or does not honor
// the 0 == EOF contract, or does not prefetch data in larger chunks.
// In this case, we attempt to read based on the configured readChunkSize:
available = readChunkSize;
// In this case, we attempt to read based on the configured maxBufferSize:
available = mapper.maxBufferSize();
}
}
available = readAvailableAndEmit(available, readByte);
Expand All @@ -207,9 +199,10 @@ private void readAndDeliver(final Subscriber<? super byte[]> subscriber) {
}

private int readAvailableAndEmit(final int available, final int readByte) throws IOException {
final int readChunkSize = mapper.maxBufferSize();
final byte[] buffer;
if (readByte >= 0) {
buffer = new byte[available < readChunkSize ? available + 1 : readChunkSize];
buffer = new byte[min(available + 1, readChunkSize)];
buffer[writeIdx++] = (byte) readByte;
} else {
buffer = new byte[min(available, readChunkSize)];
Expand All @@ -233,29 +226,21 @@ private int fillBuffer(final byte[] buffer, int available) throws IOException {
return available;
}

private void emitSingleBuffer(final Subscriber<? super byte[]> subscriber,
private void emitSingleBuffer(final Subscriber<? super T> subscriber,
final byte[] buffer, final int remainingLength) {
if (writeIdx < 1) {
assert remainingLength == END_OF_FILE :
"unexpected writeIdx == 0 while we still have some remaining data to read";
return;
}
assert writeIdx <= buffer.length : "writeIdx can not be grater than buffer.length";
final byte[] b;
if (writeIdx == buffer.length) {
b = buffer;
} else {
// this extra copy is necessary when we read the last chunk and total number of bytes read before EOF
// is less than guesstimated buffer size
b = new byte[writeIdx];
arraycopy(buffer, 0, b, 0, writeIdx);
}
final T item = mapper.map(buffer, 0, writeIdx);
requested--;
writeIdx = 0;
subscriber.onNext(b);
subscriber.onNext(item);
}

private void sendOnComplete(final Subscriber<? super byte[]> subscriber) {
private void sendOnComplete(final Subscriber<? super T> subscriber) {
closeStream(subscriber);
if (trySetTerminalSent()) {
try {
Expand All @@ -266,7 +251,7 @@ private void sendOnComplete(final Subscriber<? super byte[]> subscriber) {
}
}

private void sendOnError(final Subscriber<? super byte[]> subscriber, final Throwable t) {
private void sendOnError(final Subscriber<? super T> subscriber, final Throwable t) {
if (trySetTerminalSent()) {
try {
subscriber.onError(t);
Expand All @@ -285,7 +270,7 @@ private Throwable closeStreamOnError(Throwable t) {
return t;
}

private void closeStream(final Subscriber<? super byte[]> subscriber) {
private void closeStream(final Subscriber<? super T> subscriber) {
try {
stream.close();
} catch (Throwable e) {
Expand All @@ -306,4 +291,32 @@ private boolean trySetTerminalSent() {
return true;
}
}

static final class ToByteArrayMapper implements ByteArrayMapper<byte[]> {

static final ByteArrayMapper<byte[]> DEFAULT_TO_BYTE_ARRAY_MAPPER =
new ToByteArrayMapper(DEFAULT_MAX_BUFFER_SIZE);

private final int maxBufferSize;

ToByteArrayMapper(final int maxBufferSize) {
this.maxBufferSize = ensurePositive(maxBufferSize, "maxBufferSize");
}

@Override
public byte[] map(final byte[] buffer, final int offset, final int length) {
if (offset == 0 && length == buffer.length) {
return buffer;
} else {
final byte[] partial = new byte[length];
daschl marked this conversation as resolved.
Show resolved Hide resolved
arraycopy(buffer, offset, partial, 0, length);
return partial;
}
}

@Override
public int maxBufferSize() {
return maxBufferSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4522,9 +4522,12 @@ public static <T> Publisher<T> fromBlockingIterable(BlockingIterable<? extends T
* InputStream#read(byte[], int, int)}.
* @return a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the
* {@link Subscriber} and then {@link Subscriber#onComplete()}.
* @deprecated Use {@link #fromInputStream(InputStream, ByteArrayMapper)} with
* {@link ByteArrayMapper#toByteArray()}.
*/
@Deprecated // FIXME: 0.43 - remove deprecated method
public static Publisher<byte[]> fromInputStream(InputStream stream) {
return new FromInputStreamPublisher(stream);
return fromInputStream(stream, ByteArrayMapper.toByteArray());
}

/**
Expand Down Expand Up @@ -4552,9 +4555,45 @@ public static Publisher<byte[]> fromInputStream(InputStream stream) {
* and emitted by the returned {@link Publisher}.
* @return a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the
* {@link Subscriber} and then {@link Subscriber#onComplete()}.
* @deprecated Use {@link #fromInputStream(InputStream, ByteArrayMapper)} with
* {@link ByteArrayMapper#toByteArray(int)}.
*/
@Deprecated // FIXME: 0.43 - remove deprecated method
public static Publisher<byte[]> fromInputStream(InputStream stream, int readChunkSize) {
return new FromInputStreamPublisher(stream, readChunkSize);
return fromInputStream(stream, ByteArrayMapper.toByteArray(readChunkSize));
}

/**
* Create a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the
* {@link Subscriber} as a mapped type {@code T} and then {@link Subscriber#onComplete()}.
* <p>
* The resulting publisher is not replayable and supports only a single {@link Subscriber}.
* <p>
* After a returned {@link Publisher} is subscribed, it owns the passed {@link InputStream}, meaning that the
* {@link InputStream} will be automatically closed when the {@link Publisher} is cancelled or terminated. Not
* necessary to close the {@link InputStream} after subscribe, but it should be closed when control flow never
* subscribes to the returned {@link Publisher}.
* <p>
* The Reactive Streams specification provides two criteria (
* <a href="https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.4">3.4</a>, and
* <a href="https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5">3.5</a>) stating
* the {@link Subscription} should be "responsive". The responsiveness of the associated {@link Subscription}s will
* depend upon the behavior of the {@code stream} below. Make sure the {@link Executor} for this execution chain
* can tolerate this responsiveness and any blocking behavior.
* <p>
* Given the blocking nature of {@link InputStream}, assume {@link Subscription#request(long)} can block when the
* underlying {@link InputStream} blocks on {@link InputStream#read(byte[], int, int)}.
*
* @param stream provides the data in the form of {@code byte[]} buffer regions for the specified
* {@link ByteArrayMapper}.
* @param mapper a mapper to transform raw {@code byte[]} buffer regions into a desired type {@code T} to be emitted
* to the {@link Subscriber} by the returned {@link Publisher}.
* @param <T> Type of the items emitted by the returned {@link Publisher}.
* @return a new {@link Publisher} that when subscribed will emit all data from the {@link InputStream} to the
* {@link Subscriber} as a mapped type {@code T} and then {@link Subscriber#onComplete()}.
*/
public static <T> Publisher<T> fromInputStream(InputStream stream, ByteArrayMapper<T> mapper) {
return new FromInputStreamPublisher<>(stream, mapper);
}

/**
Expand Down
Loading
Loading