-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-44263][CONNECT] Custom Interceptors Support #41880
Conversation
|
||
override def thisUsesUnstableApi(): Unit = { | ||
// Marks this API is not stable. Left empty on purpose. | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API is no longer unstable (was stabilized) so this can be removed. See grpc/grpc-java#1914
}) | ||
.build() | ||
|
||
val session = SparkSession.builder().client(client).create() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client()
method is private[sql]
for anyone providing any kinds of wrappers on top of the SparkSession for custom auth integrations it's not possible to use this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The proposed usage is rather
SparkSession.builder().interceptor(my interceptor).create()
, and this one is public.
Perhaps I should add a test doing this exact line nearby
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now there is an external api test as well (in SparkSessionSuite)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think generally the approach goes in the right direction, we just need to figure out a better wrapping for the construction of the class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, lets polish and get it in.
...nnect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
Outdated
Show resolved
Hide resolved
private def startDummyServer(port: Int): Unit = { | ||
service = new DummySparkConnectService | ||
server = NettyServerBuilder | ||
.forPort(port) | ||
.addService(service) | ||
.build() | ||
server.start() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have something called e2e client test for this that does properly start the service.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you refer me to ClientE2ETestSuite? I will take a look. Here I was following how it's done in SparkConnectClientSuite. If there is a better way, we should replace how the server starts in both places
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check ClientE2ETestSuite
which uses RemoteSparkSession
which is probably what you want most.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well I was going to complain that ClientE2ETestSuite is not a great fit to reuse, for some reasons such as it owns how the session is created and it lasts for the whole suite, but actually I realized that my test doesn't event need a server, so I dropped the server creation altogether
@@ -656,6 +654,11 @@ object SparkSession extends Logging { | |||
this | |||
} | |||
|
|||
def interceptor(interceptor: ClientInterceptor): Builder = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doc please :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the public facing part of the interface and needs the most extensive documentation,.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, good point :). I didn't do it because other public methods here are undocumented, but that's probably not a good idea either
...nnect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
Outdated
Show resolved
Hide resolved
@cdkrot Please make the PR description very explicit, ideally with a code snippet of how to use it and why this is needed. |
…ql/connect/client/SparkConnectClient.scala Co-authored-by: Martin Grund <[email protected]>
…ql/connect/client/SparkConnectClient.scala Co-authored-by: Martin Grund <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's get the nits out the way and it's good to merge. @HyukjinKwon and @hvanhovell can you please have a look as well.
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
Show resolved
Hide resolved
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can merge when the tests pass.
I fixed tests, and also this weird SQL<->Connect comparison errors. Hope we can get this merged. cc @HyukjinKwon |
Merged to master. |
### What changes were proposed in this pull request? Extend SparkSession to allow custom interceptors (grpc's ClientInterceptor) ### Why are the changes needed? This can be used for different purposes, such as customized authentication mechanisms This is similar to Python which allows custom channel builders (apache#40993) ### Does this PR introduce _any_ user-facing change? SparkSession allows custom interceptors. The proposed usage is ``` val interceptor = new ClientInterceptor {....} val session = SparkSession.builder().interceptor(interceptor).create() ``` Or same with more than one interceptor ### How was this patch tested? UT Closes apache#41880 from cdkrot/scala_channel_builder_2. Lead-authored-by: Alice Sayutina <[email protected]> Co-authored-by: Alice Sayutina <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
Extend SparkSession to allow custom interceptors (grpc's ClientInterceptor)
Why are the changes needed?
This can be used for different purposes, such as customized authentication mechanisms
This is similar to Python which allows custom channel builders (#40993)
Does this PR introduce any user-facing change?
SparkSession allows custom interceptors. The proposed usage is
Or same with more than one interceptor
How was this patch tested?
UT