[SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API #25007

Closed · 33 commits

Commits:
1957e82
[SPARK-25299] Introduce the new shuffle writer API (#5) (#520)
mccheah Mar 20, 2019
857552a
[SPARK-25299] Local shuffle implementation of the shuffle writer API …
mccheah Apr 3, 2019
d13037f
[SPARK-25299] Make UnsafeShuffleWriter use the new API (#536)
mccheah Apr 17, 2019
8f5fb60
[SPARK-25299] Use the shuffle writer plugin for the SortShuffleWriter…
mccheah Apr 15, 2019
e17c7ea
[SPARK-25299] Shuffle locations api (#517)
mccheah Apr 19, 2019
3f0c131
[SPARK-25299] Move shuffle writers back to being given specific parti…
mccheah Apr 19, 2019
f982df7
[SPARK-25299] Don't set map status twice in bypass merge sort shuffle…
mccheah Apr 19, 2019
6891197
[SPARK-25299] Propose a new NIO transfer API for partition writing. (…
mccheah May 24, 2019
7b44ed2
Remove shuffle location support.
mccheah Jun 27, 2019
df75f1f
Remove changes to UnsafeShuffleWriter
mccheah Jun 27, 2019
a8558af
Revert changes for SortShuffleWriter
mccheah Jun 27, 2019
806d7bb
Revert a bunch of other stuff
mccheah Jun 27, 2019
3167030
More reverts
mccheah Jun 27, 2019
70f59db
Set task contexts in failing test
mccheah Jun 28, 2019
3083d86
Fix style
mccheah Jun 28, 2019
4c3d692
Check for null on the block manager as well.
mccheah Jun 28, 2019
2421c92
Add task attempt id in the APIs
mccheah Jul 1, 2019
982f207
Address comments
mccheah Jul 8, 2019
594d1e2
Fix style
mccheah Jul 8, 2019
66aae91
Address comments.
mccheah Jul 12, 2019
8b432f9
Merge remote-tracking branch 'origin/master' into spark-shuffle-write…
mccheah Jul 17, 2019
9f597dd
Address comments.
mccheah Jul 18, 2019
86c1829
Restructure test
mccheah Jul 18, 2019
a7885ae
Add ShuffleWriteMetricsReporter to the createMapOutputWriter API.
mccheah Jul 19, 2019
9893c6c
Add more documentation
mccheah Jul 19, 2019
cd897e7
REfactor reading records from file in test
mccheah Jul 19, 2019
9f17b9b
Address comments
mccheah Jul 24, 2019
e53a001
Code tags
mccheah Jul 24, 2019
56fa450
Add some docs
mccheah Jul 24, 2019
b8b7b8d
Change mockito format in BypassMergeSortShuffleWriterSuite
mccheah Jul 25, 2019
2d29404
Remove metrics from the API.
mccheah Jul 29, 2019
06ea01a
Address more comments.
mccheah Jul 29, 2019
7dceec9
Args per line
mccheah Jul 30, 2019
49 changes: 49 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;
Contributor:

I saw it was recommended that this package be used instead of o.a.s.api. The problem is that org.apache.spark.shuffle is explicitly removed from the documentation, and we want this to (eventually) be documented. So we either need to go back to the old package, or tweak SparkBuild.scala to not filter this sub-package...

Contributor Author:

I'm concerned about conflicts with the other kinds of APIs in the org.apache.spark.api.* namespace, particularly because these are all related to other language bindings, e.g. org.apache.spark.api.java.function.Function, org.apache.spark.api.r.RRDD. Let's modify SparkBuild.scala instead - I'll look into what that would require. Can that be done in a follow-up PR?

Contributor:

Sure. Better to be proactive and file a bug to make these interfaces non-Private, and at the same time make sure they're showing up properly in documentation.

import org.apache.spark.annotation.Private;

/**
* :: Private ::
* An interface for plugging in modules for storing and reading temporary shuffle data.
* <p>
* This is the root of a plugin system for storing shuffle bytes to arbitrary storage
* backends in the sort-based shuffle algorithm implemented by the
* {@link org.apache.spark.shuffle.sort.SortShuffleManager}. If another shuffle algorithm is
Contributor:

It would be good to check how these links render in the final documentation, since as I mentioned that package is removed from public docs.

Member:

Seems like the link isn't made.
(screenshot: Screen Shot 2019-08-01 at 4 52 00 PM)

* needed instead of sort-based shuffle, one should implement
* {@link org.apache.spark.shuffle.ShuffleManager} instead.
* <p>
* A single instance of this module is loaded per process in the Spark application.
* The default implementation reads and writes shuffle data from the local disks of
* the executor, and is the implementation of shuffle file storage that has remained
* consistent throughout most of Spark's history.
* <p>
* Alternative implementations of shuffle data storage can be loaded via setting
* <code>spark.shuffle.sort.io.plugin.class</code>.
* @since 3.0.0
*/
@Private
Member:

Question from SPARK-28568. Is it an API or not? Looks so given the PR description. @Private is:

  This should be used only when the standard Scala / Java means of protecting classes are insufficient. In particular, Java has no equivalent of private[spark], so we use this annotation in its place.

So @Private doesn't look like it's meant for APIs. Shall we change it to @Unstable (maybe with an explicit warning)?

Contributor:

@HyukjinKwon it'll all eventually be @Experimental, but we decided to start by making it @Private just in case Spark 3.0 gets released in the middle (discussed here: #25007 (comment)).

Looks like we forgot to file a follow-up JIRA about that; I just filed https://issues.apache.org/jira/browse/SPARK-28592

Member:

Ah, okay, that's good.
My impression was that @Unstable guarantees less than @Experimental. Maybe we can consider this point later as well.

public interface ShuffleDataIO {

Contributor Author:

I'll add some JavaDoc explaining the difference between the ShuffleManager plugin and this plugin system.


A question that may be naive: why do we choose Java over Scala? I see that Spark classes, except the ones dealing with underlying memory, are written in Scala...

Contributor:

As a public interface, it is better to use Java, so that other users can implement it in Java, Scala, or other JVM languages.

If we defined the APIs in Scala, users could mostly only implement them in Scala, unless the APIs were carefully designed to avoid Scala-specific features so that they could also be used from Java.


/**
* Called once on executor processes to bootstrap the shuffle data storage modules that
* are only invoked on the executors.
*/
ShuffleExecutorComponents executor();
}
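
For illustration, a minimal sketch of how an alternative plugin would be selected via the config key documented above; the com.example class name is a hypothetical placeholder, not part of this PR:

import org.apache.spark.SparkConf;

final class PluginConfigExample {
  // Hypothetical: com.example.shuffle.MyShuffleDataIO stands in for a real
  // ShuffleDataIO implementation on the application classpath.
  static SparkConf withCustomShuffleIO() {
    return new SparkConf()
        .set("spark.shuffle.sort.io.plugin.class", "com.example.shuffle.MyShuffleDataIO");
  }
}
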
55 changes: 55 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import java.io.IOException;
Contributor:

nit: white space between different import groups.


import org.apache.spark.annotation.Private;

/**
* :: Private ::
* An interface for building shuffle support for Executors.
*
* @since 3.0.0
*/
@Private
public interface ShuffleExecutorComponents {

/**
* Called once per executor to bootstrap this module with state that is specific to
* that executor, specifically the application ID and executor ID.
*/
void initializeExecutor(String appId, String execId);

/**
* Called once per map task to create a writer that will be responsible for persisting all the
* partitioned bytes written by that map task.
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param mapId Within the shuffle, the identifier of the map task
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
* with the same (shuffleId, mapId) pair can be distinguished by the
* different values of mapTaskAttemptId.
* @param numPartitions The number of partitions that will be written by the map task. Some of
* these partitions may be empty.
*/
ShuffleMapOutputWriter createMapOutputWriter(
Member:

During the fix of SPARK-25341, we need to pass more parameters into the shuffle writer and shuffle block resolver; see #25361 for a quick review of the API change. Thanks :)

int shuffleId,
int mapId,
long mapTaskAttemptId,
int numPartitions) throws IOException;
}
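
A hedged sketch of what an implementation of this interface might look like; the class name and the attempt-scoped output key are illustrative assumptions, not this PR's local-disk implementation:

import java.io.IOException;

import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;

final class ExampleExecutorComponents implements ShuffleExecutorComponents {
  private String appId;
  private String execId;

  @Override
  public void initializeExecutor(String appId, String execId) {
    // Called once per executor; capture executor-scoped state here.
    this.appId = appId;
    this.execId = execId;
  }

  @Override
  public ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId,
      int mapId,
      long mapTaskAttemptId,
      int numPartitions) throws IOException {
    // Folding mapTaskAttemptId into the output key keeps retried attempts
    // of the same (shuffleId, mapId) task from clobbering each other.
    String outputKey = String.format("%s/%s/shuffle_%d_map_%d_attempt_%d",
        appId, execId, shuffleId, mapId, mapTaskAttemptId);
    // A real implementation would return a writer that persists numPartitions
    // partition chunks under outputKey; omitted in this sketch.
    throw new UnsupportedOperationException("sketch only: " + outputKey);
  }
}
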
71 changes: 71 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import java.io.IOException;

import org.apache.spark.annotation.Private;

/**
* :: Private ::
* A top-level writer that returns child writers for persisting the output of a map task,
* and then commits all of the writes as one atomic operation.
*
* @since 3.0.0
*/
@Private
public interface ShuffleMapOutputWriter {

/**
* Creates a writer that can open an output stream to persist bytes targeted for a given reduce
* partition id.
* <p>

Contributor Author:

I think we want these to have line breaks in the generated HTML. But I'm not sure what the stance is across the rest of the codebase - we can remove these if pretty-formatting with line breaks isn't necessary.

Contributor:

Yeah, I think it is needed for Javadoc, though it's not needed for Scaladoc. IMO it's worth keeping them.

https://www.oracle.com/technetwork/java/javase/documentation/index-137868.html#format

* The chunk corresponds to bytes in the given reduce partition. This will not be called twice
* for the same partition within any given map task. The partition identifier will be in the
* range of precisely 0 (inclusive) to numPartitions (exclusive), where numPartitions was

Should we mention that these are called in order?

Contributor Author:

I made the docs more thorough, indicating ordering and also indicating how there's no guarantee that this will be called for an empty partition.

* provided upon the creation of this map output writer via
* {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, long, int)}.
* <p>
* This method will be invoked with monotonically increasing reducePartitionIds; each
Contributor:

How useful is this? I think we can make Spark shuffle more flexible if we don't guarantee this. Do you have a concrete example of how an implementation can leverage this guarantee?

Contributor:

Spark's existing implementation makes this assumption. The index and data files assume partitions are written in sequential order.

Though it would be really easy to change the index format to allow the order to be random (just include a start and an end, rather than having the end be implicit).

* call will receive a reducePartitionId strictly greater than that of any previous call.
* This method is not guaranteed to be called for every partition id in the range described
* above; in particular, no guarantee is made that it will be called for empty partitions.
*/
ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;


Why "calls to this method will be invoked with monotonically increasing reducePartitionIds"? This may cause potential issues in future and cause burden on implementation. for example, if people want to implement multiple partition writers and write shuffle data in parallel. It cannot guarantee monotonically increasing reducePartitionIds.

Contributor Author:

People using this will be using it with SortShuffleManager which has a specific algorithm that won't open streams in parallel. If these invariants are broken, it implies the algorithm has changed, in which case we'd need to reconsider these APIs.
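
To make the invariant concrete, here is a hedged sketch of the caller-side loop implied by the Javadoc above; numPartitions and the hasDataFor check are illustrative stand-ins, not this PR's code:

import java.io.IOException;

import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;

final class OrderingExample {
  static void writeAllPartitions(ShuffleMapOutputWriter mapOutputWriter,
                                 int numPartitions) throws IOException {
    // Ids increase strictly; empty partitions may be skipped entirely.
    for (int reducePartitionId = 0; reducePartitionId < numPartitions; reducePartitionId++) {
      if (!hasDataFor(reducePartitionId)) {
        continue; // no guarantee of a call for empty partitions
      }
      ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(reducePartitionId);
      // ... write this partition's bytes before requesting the next writer ...
    }
  }

  private static boolean hasDataFor(int reducePartitionId) {
    return true; // illustrative placeholder
  }
}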


/**
* Commits the writes done by all partition writers returned by all calls to this object's
* {@link #getPartitionWriter(int)}.
* <p>
* This should ensure that the writes conducted by this module's partition writers are
* available to downstream reduce tasks. If this method throws any exception, this module's
* {@link #abort(Throwable)} method will be invoked before propagating the exception.
* <p>
* This can also close any resources and clean up temporary state if necessary.
*/
void commitAllPartitions() throws IOException;

Shouldn't this return Optional<MapShuffleLocations>?

Contributor:

@gczsjdy any reason to return Optional<MapShuffleLocations>?

Contributor Author:

We ended up adjusting the API for shuffle locations. This will come later.

Contributor Author:

I believe the SPIP has the latest API.


@jerryshao @mccheah has explained it well: Optional<MapShuffleLocations> lets implementers customize the locations recorded in the driver. @mccheah Will this be in the driver lifecycle sub-issue?

Contributor Author:

Something to that effect yeah - it also has implications on the reader API, but these are concerns to be addressed in subsequent patches.


Got it. : )


/**
* Aborts all of the writes done by any writers returned by {@link #getPartitionWriter(int)}.
* <p>
* This should invalidate the results of writing bytes. This can also close any resources and
* clean up temporary state if necessary.
*/
void abort(Throwable error) throws IOException;
}
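
The commit/abort contract above can be summarized as a hedged sketch of the calling sequence; writePartitions is a hypothetical stand-in for the map task's write loop:

import java.io.IOException;

import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;

final class CommitAbortExample {
  static void writeAndCommit(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
    try {
      writePartitions(mapOutputWriter);      // one getPartitionWriter(...) per non-empty partition
      mapOutputWriter.commitAllPartitions(); // make the output visible to reduce tasks
    } catch (Exception e) {
      mapOutputWriter.abort(e);              // invalidate partial output, clean up resources
      throw e;
    }
  }

  private static void writePartitions(ShuffleMapOutputWriter writer) throws IOException {
    // hypothetical: iterate reduce partitions in increasing id order
  }
}
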
98 changes: 98 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionWriter.java
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import java.io.IOException;
import java.util.Optional;
import java.io.OutputStream;

import org.apache.spark.annotation.Private;

/**
* :: Private ::
* An interface for opening streams to persist partition bytes to a backing data store.
Contributor:

I'd add that this stores bytes for one (mapper, reducer) pair, which corresponds to one ShuffleBlock

* <p>
* This writer stores bytes for one (mapper, reducer) pair, corresponding to one shuffle
* block.
*
* @since 3.0.0
*/
@Private
public interface ShufflePartitionWriter {

/**
* Open and return an {@link OutputStream} that can write bytes to the underlying
* data store.
* <p>
* This method will only be called once on this partition writer in the map task, to write the
* bytes to the partition. The output stream will only be used to write the bytes for this
* partition. The map task closes this output stream upon writing all the bytes for this
* block, or if the write fails for any reason.
* <p>
* Implementations that intend on combining the bytes for all the partitions written by this
* map task should reuse the same OutputStream instance across all the partition writers provided
* by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
* {@link OutputStream#close()} does not close the resource, since it will be reused across
* partition writes. The underlying resources should be cleaned up in
* {@link ShuffleMapOutputWriter#commitAllPartitions()} and
* {@link ShuffleMapOutputWriter#abort(Throwable)}.
*/
OutputStream openStream() throws IOException;
Contributor:

I think we need to say more here about the lifecycle of this output stream. In particular: (a) the framework will only keep one of these output streams open at a time per map task; (b) the framework ensures that the output streams are closed, even if there are exceptions; and (c) if an implementation wants to keep all the output for one map task together (like the index/data file organization of local shuffle output), it may want to reuse the real underlying output stream across all ShufflePartitionWriters of one ShuffleMapOutputWriter.

Contributor Author:

I added more docs.
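
A hedged sketch of the reuse pattern the Javadoc and this thread describe: a close-shielding decorator lets every partition writer share one underlying stream while keeping the per-partition close() from tearing it down. The class name is illustrative, not this PR's implementation:

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class CloseShieldOutputStream extends FilterOutputStream {
  CloseShieldOutputStream(OutputStream sharedOut) {
    super(sharedOut);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len); // bypass FilterOutputStream's byte-at-a-time default
  }

  @Override
  public void close() throws IOException {
    // Flush, but keep the shared stream open for the next partition writer;
    // the real close belongs in commitAllPartitions() or abort().
    out.flush();
  }
}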


/**
* Opens and returns a {@link WritableByteChannelWrapper} for transferring bytes from
* input byte channels to the underlying shuffle data store.
* <p>
* This method will only be called once on this partition writer in the map task, to write the
* bytes to the partition. The channel will only be used to write the bytes for this
* partition. The map task closes this channel upon writing all the bytes for this
* block, or if the write fails for any reason.
* <p>
* Implementations that intend on combining the bytes for all the partitions written by this
* map task should reuse the same channel instance across all the partition writers provided
* by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
* {@link WritableByteChannelWrapper#close()} does not close the resource, since the channel
* will be reused across partition writes. The underlying resources should be cleaned up in
* {@link ShuffleMapOutputWriter#commitAllPartitions()} and
* {@link ShuffleMapOutputWriter#abort(Throwable)}.
* <p>
* This method is primarily for advanced optimizations where bytes can be copied from the input
* spill files to the output channel without copying data into memory. If such optimizations are
* not supported, the implementation should return {@link Optional#empty()}. By default, the
* implementation returns {@link Optional#empty()}.
* <p>
* Note that the returned {@link WritableByteChannelWrapper} itself is closed, but not the
* underlying channel that is returned by {@link WritableByteChannelWrapper#channel()}. Ensure
* that the underlying channel is cleaned up in {@link WritableByteChannelWrapper#close()},
* {@link ShuffleMapOutputWriter#commitAllPartitions()}, or
* {@link ShuffleMapOutputWriter#abort(Throwable)}.
*/
default Optional<WritableByteChannelWrapper> openChannelWrapper() throws IOException {
return Optional.empty();
}

/**
* Returns the number of bytes written either by this writer's output stream opened by
* {@link #openStream()} or the byte channel opened by {@link #openChannelWrapper()}.
* <p>
* This can be different from the number of bytes given by the caller. For example, the
* stream might compress or encrypt the bytes before persisting the data to the backing
* data store.
*/
long getNumBytesWritten();
hiboyang (Jul 29, 2019):

This class delegates writing to the OutputStream returned by openStream(). Will getNumBytesWritten() in this class access internal state inside that OutputStream? How about letting the OutputStream track the number of bytes written, so this class does not need to access the OutputStream? One possible solution is to add a subclass of OutputStream that tracks the number of bytes, something like the existing TimeTrackingOutputStream class in Spark, which extends OutputStream.

Contributor Author:

The idea is that if the implementation also supports creating a custom WritableByteChannel, then the number of bytes written would come from the channel, not the output stream. One could see us having both a custom output stream and an added method on WritableByteChannelWrapper.

Contributor Author:

Ah, I also remember why we didn't attach it to the output stream - it's precisely because of the lifecycle. If we have an output stream for the partition that pads bytes upon closing the stream, it's unclear whether it is valid to keep calling methods on the output stream object after it has been closed. That's why we have the contract:

  1. Open stream for writing bytes.
  2. Write bytes
  3. Close stream
  4. Get written bytes for that partition, accounting for the fact that the above step closed the stream.
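
That contract, as a hedged caller-side sketch (the helper name and byte array are illustrative):

import java.io.IOException;
import java.io.OutputStream;

import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;

final class PartitionWriteSequence {
  static long writeOnePartition(ShuffleMapOutputWriter mapOutputWriter,
                                int reducePartitionId,
                                byte[] partitionBytes) throws IOException {
    ShufflePartitionWriter partWriter = mapOutputWriter.getPartitionWriter(reducePartitionId);
    OutputStream out = partWriter.openStream();  // 1. open
    try {
      out.write(partitionBytes);                 // 2. write
    } finally {
      out.close();                               // 3. close (may pad/flush trailing bytes)
    }
    return partWriter.getNumBytesWritten();      // 4. count is stable only after close
  }
}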


In this case, the OutputStream returned by openStream() is tightly coupled with ShufflePartitionWriter. Could we merge them together into one class? E.g.:

abstract class ShufflePartitionWriterStream extends OutputStream {
  abstract void open();
  abstract long getNumBytesWritten();
}

Contributor Author:

An OutputStream instance is considered opened as soon as the object exists, which is why OutputStream extends Closeable. As soon as I have a reference to the OutputStream object I can call write on it to push bytes to the sink. So having a separate open method doesn't make sense.

The open method belongs in the ShufflePartitionWriter API, which is effectively what we have with openStream and openChannel.


Oh, I mean that the OutputStream returned by openStream() is tightly coupled with ShufflePartitionWriter, so I suggest merging them together. For example, rename ShufflePartitionWriter to ShufflePartitionWriterStream, which extends OutputStream:

abstract class ShufflePartitionWriterStream extends OutputStream {
  abstract void open();
  abstract long getNumBytesWritten();
}

In this case, users do not need to create a ShufflePartitionWriter and then call its openStream() method to get an OutputStream. Instead, they would create a ShufflePartitionWriterStream, which is already an OutputStream.

Contributor Author:

But again, do we call getNumBytesWritten before or after calling close on this object? If before, does it include the bytes that might be padded in closing the stream? If after, are we invoking methods on a closed resource, and is that reasonable?

}
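
One way an implementation might keep getNumBytesWritten() valid after close(), as discussed in the thread above, is a byte-counting decorator placed closest to the backing sink; this is a hedged sketch, not code from this PR:

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class CountingOutputStream extends FilterOutputStream {
  private long count;

  CountingOutputStream(OutputStream delegate) {
    super(delegate);
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);
    count++;
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len);
    count += len;
  }

  // Reads a plain field, so it stays valid after close(). If this decorator
  // sits closest to the sink, it also counts bytes that upstream layers
  // (e.g. a compressor) emit while being closed.
  long getCount() {
    return count;
  }
}
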
42 changes: 42 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/WritableByteChannelWrapper.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import java.io.Closeable;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.annotation.Private;

/**
* :: Private ::
*
* A thin wrapper around a {@link WritableByteChannel}.
* <p>
* This is primarily provided for the local disk shuffle implementation to provide a
* {@link java.nio.channels.FileChannel} that keeps the channel open across partition writes.
*
* @since 3.0.0
*/
@Private
public interface WritableByteChannelWrapper extends Closeable {
Contributor:

Why do we only need a wrapper for WritableByteChannel, but not OutputStream?

Contributor Author:

We need to return the FileChannel object directly to the caller, because FileChannel#transferFrom and FileChannel#transferTo check instanceof on the argument channel in order to decide whether to optimize via a zero-memory copy. Extending FileChannel is nearly impossible, since it's a JDK abstract class with a lot of methods. But if we return the FileChannel directly, we have no way to shield the channel from being closed so that we can share the same channel resource across partitions.

This has come up in #25007 (comment) and palantir#535 and especially palantir#535 (comment). Given that this has come up as a question a number of times, I wonder if there's a better way we can make the semantics more accessible. I don't see a way to improve the architecture itself, but perhaps better documentation in the right places explaining why we went about this the way we did is warranted.


/**
* The underlying channel to write bytes into.
*/
WritableByteChannel channel();
}
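
A hedged sketch of the pattern just described: hand back the raw FileChannel so FileChannel#transferTo's instanceof check can take the zero-copy path, while the wrapper's close() deliberately leaves the shared channel open. Names are illustrative, not the PR's local-disk implementation:

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.shuffle.api.WritableByteChannelWrapper;

final class SharedFileChannelWrapper implements WritableByteChannelWrapper {
  private final FileChannel sharedChannel;

  SharedFileChannelWrapper(FileChannel sharedChannel) {
    this.sharedChannel = sharedChannel;
  }

  @Override
  public WritableByteChannel channel() {
    // Return the FileChannel itself so instanceof-based transfer
    // optimizations still apply.
    return sharedChannel;
  }

  @Override
  public void close() throws IOException {
    // Intentionally a no-op: the channel is shared across partition writes
    // and is closed in commitAllPartitions() or abort() instead.
  }
}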