Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csharp/src/Drivers/Apache/Spark): add request_timeout_ms option to allow longer HTTP request length #2218

Merged
2 changes: 2 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ internal async Task OpenAsync()

protected internal HiveServer2TlsOption TlsOptions { get; set; } = HiveServer2TlsOption.Empty;

protected internal int HttpRequestTimeout { get; set; } = 30000;

protected abstract Task<TTransport> CreateTransportAsync();

protected abstract Task<TProtocol> CreateProtocolAsync(TTransport transport);
Expand Down
1 change: 1 addition & 0 deletions csharp/src/Drivers/Apache/Spark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ but can also be passed in the call to `AdbcDatabase.Connect`.
| `password` | The password for the user name used for basic authentication. | |
| `adbc.spark.data_type_conv` | Comma-separated list of data conversion options. Each option indicates the type of conversion to perform on data returned from the Spark server. <br><br>Allowed values: `none`, `scalar`. <br><br>Option `none` indicates there is no conversion from Spark type to native type (i.e., no conversion from String to Timestamp for Apache Spark over HTTP). Example `adbc.spark.conv_data_type=none`. <br><br>Option `scalar` will perform conversion (if necessary) from the Spark data type to corresponding Arrow data types for types `DATE/Date32/DateTime`, `DECIMAL/Decimal128/SqlDecimal`, and `TIMESTAMP/Timestamp/DateTimeOffset`. Example `adbc.spark.conv_data_type=scalar` | `scalar` |
| `adbc.spark.tls_options` | Comma-separated list of TLS/SSL options. Each option indicates the TLS/SSL option when connecting to a Spark server. <br><br>Allowed values: `allow_self_signed`, `allow_hostname_mismatch`. <br><br>Option `allow_self_signed` allows certificate errors due to an unknown certificate authority, typically when using a self-signed certificate. Option `allow_hostname_mismatch` allow certificate errors due to a mismatch of the hostname. (e.g., when connecting through an SSH tunnel). Example `adbc.spark.tls_options=allow_self_signed` | |
| `adbc.spark.http_request_timeout_ms` | Sets the timeout (in milliseconds) when making requests to the Spark server (type: `http`). Set the value higher than the default if you notice errors due to network timeouts. | `30000` |
| `adbc.statement.batch_size` | Sets the maximum number of rows to retrieve in a single batch request. | `50000` |
| `adbc.statement.polltime_milliseconds` | If polling is necessary to get a result, this option sets the length of time (in milliseconds) to wait between polls. | `500` |

Expand Down
15 changes: 13 additions & 2 deletions csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
Expand Down Expand Up @@ -118,7 +119,14 @@ protected override void ValidateOptions()
Properties.TryGetValue(SparkParameters.DataTypeConv, out string? dataTypeConv);
DataTypeConversion = DataTypeConversionParser.Parse(dataTypeConv);
Properties.TryGetValue(SparkParameters.TLSOptions, out string? tlsOptions);
TlsOptions = Hive2.TlsOptionsParser.Parse(tlsOptions);
TlsOptions = TlsOptionsParser.Parse(tlsOptions);
Properties.TryGetValue(SparkParameters.HttpRequestTimeoutMilliseconds, out string? requestTimeoutMs);
if (requestTimeoutMs != null)
{
HttpRequestTimeout = int.TryParse(requestTimeoutMs, NumberStyles.Integer, CultureInfo.InvariantCulture, out int requestTimeoutMsValue) && requestTimeoutMsValue > 0
? requestTimeoutMsValue
: throw new ArgumentOutOfRangeException(SparkParameters.HttpRequestTimeoutMilliseconds, requestTimeoutMs, $"must be a value between 1 .. {int.MaxValue}. default is 30000 milliseconds.");
}
}

internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, CancellationToken cancellationToken = default) => new HiveServer2Reader(statement, schema, dataTypeConversion: statement.Connection.DataTypeConversion, cancellationToken);
Expand Down Expand Up @@ -154,7 +162,10 @@ protected override Task<TTransport> CreateTransportAsync()
httpClient.DefaultRequestHeaders.ExpectContinue = false;

TConfiguration config = new();
ThriftHttpTransport transport = new(httpClient, config);
ThriftHttpTransport transport = new(httpClient, config)
{
ConnectTimeout = HttpRequestTimeout,
};
return Task.FromResult<TTransport>(transport);
}

Expand Down
1 change: 1 addition & 0 deletions csharp/src/Drivers/Apache/Spark/SparkParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static class SparkParameters
public const string Type = "adbc.spark.type";
public const string DataTypeConv = "adbc.spark.data_type_conv";
public const string TLSOptions = "adbc.spark.tls_options";
public const string HttpRequestTimeoutMilliseconds = "adbc.spark.http_request_timeout_ms";
}

public static class SparkAuthTypeConstants
Expand Down
3 changes: 3 additions & 0 deletions csharp/test/Drivers/Apache/ApacheTestConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,8 @@ public class ApacheTestConfiguration : TestConfiguration
[JsonPropertyName("polltime_milliseconds"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string PollTimeMilliseconds { get; set; } = string.Empty;

[JsonPropertyName("http_request_timeout_ms"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string HttpRequestTimeoutMilliseconds { get; set; } = string.Empty;

}
}
5 changes: 5 additions & 0 deletions csharp/test/Drivers/Apache/Spark/SparkConnectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public InvalidConnectionParametersTestData()
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Databricks, [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "httpxxz://hostname.com" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Databricks, [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "http-//hostname.com" }, typeof(UriFormatException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Databricks, [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "httpxxz://hostname.com:1234567890" }, typeof(UriFormatException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword" , [SparkParameters.HttpRequestTimeoutMilliseconds] = "0" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [SparkParameters.HttpRequestTimeoutMilliseconds] = "-1" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [SparkParameters.HttpRequestTimeoutMilliseconds] = ((long)int.MaxValue + 1).ToString() }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [SparkParameters.HttpRequestTimeoutMilliseconds] = "non-numeric" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [SparkParameters.HttpRequestTimeoutMilliseconds] = "" }, typeof(ArgumentOutOfRangeException)));
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions csharp/test/Drivers/Apache/Spark/SparkTestEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public override Dictionary<string, string> GetDriverParameters(SparkTestConfigur
{
parameters.Add(HiveServer2Statement.Options.PollTimeMilliseconds, testConfiguration.PollTimeMilliseconds!);
}
if (!string.IsNullOrEmpty(testConfiguration.HttpRequestTimeoutMilliseconds))
{
parameters.Add(SparkParameters.HttpRequestTimeoutMilliseconds, testConfiguration.HttpRequestTimeoutMilliseconds!);
}

return parameters;
}
Expand Down
Loading