Skip to content

Commit

Permalink
feat(csharp/src/Drivers/Apache/Spark): add request_timeout_ms option …
Browse files Browse the repository at this point in the history
…to allow longer HTTP request length (#2218)

Adds a new connection option that allows a longer HTTP request timeout.

| Property               | Description | Default |
| :---                   | :---        | :---    |
| `adbc.spark.http_request_timeout_ms` | Sets the timeout (in
milliseconds) when making requests to the Spark server (type: `http`).
Set the value higher than the default if you notice errors due to
network timeouts. | `30000` |
  • Loading branch information
birschick-bq authored Oct 9, 2024
1 parent a5eb60c commit 5f1f675
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 2 deletions.
2 changes: 2 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ internal async Task OpenAsync()

protected internal HiveServer2TlsOption TlsOptions { get; set; } = HiveServer2TlsOption.Empty;

/// <summary>
/// Timeout, in milliseconds, for HTTP requests made by the connection. Defaults to 30000.
/// </summary>
protected internal int HttpRequestTimeout { get; set; } = 30000;

protected abstract Task<TTransport> CreateTransportAsync();

protected abstract Task<TProtocol> CreateProtocolAsync(TTransport transport);
Expand Down
1 change: 1 addition & 0 deletions csharp/src/Drivers/Apache/Spark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ but can also be passed in the call to `AdbcDatabase.Connect`.
| `password` | The password for the user name used for basic authentication. | |
| `adbc.spark.data_type_conv` | Comma-separated list of data conversion options. Each option indicates the type of conversion to perform on data returned from the Spark server. <br><br>Allowed values: `none`, `scalar`. <br><br>Option `none` indicates there is no conversion from Spark type to native type (i.e., no conversion from String to Timestamp for Apache Spark over HTTP). Example `adbc.spark.data_type_conv=none`. <br><br>Option `scalar` will perform conversion (if necessary) from the Spark data type to corresponding Arrow data types for types `DATE/Date32/DateTime`, `DECIMAL/Decimal128/SqlDecimal`, and `TIMESTAMP/Timestamp/DateTimeOffset`. Example `adbc.spark.data_type_conv=scalar` | `scalar` |
| `adbc.spark.tls_options` | Comma-separated list of TLS/SSL options. Each option indicates the TLS/SSL option when connecting to a Spark server. <br><br>Allowed values: `allow_self_signed`, `allow_hostname_mismatch`. <br><br>Option `allow_self_signed` allows certificate errors due to an unknown certificate authority, typically when using a self-signed certificate. Option `allow_hostname_mismatch` allows certificate errors due to a mismatch of the hostname (e.g., when connecting through an SSH tunnel). Example `adbc.spark.tls_options=allow_self_signed` | |
| `adbc.spark.http_request_timeout_ms` | Sets the timeout (in milliseconds) when making requests to the Spark server (type: `http`). Set the value higher than the default if you notice errors due to network timeouts. | `30000` |
| `adbc.statement.batch_size` | Sets the maximum number of rows to retrieve in a single batch request. | `50000` |
| `adbc.statement.polltime_milliseconds` | If polling is necessary to get a result, this option sets the length of time (in milliseconds) to wait between polls. | `500` |

Expand Down
15 changes: 13 additions & 2 deletions csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
Expand Down Expand Up @@ -118,7 +119,14 @@ protected override void ValidateOptions()
Properties.TryGetValue(SparkParameters.DataTypeConv, out string? dataTypeConv);
DataTypeConversion = DataTypeConversionParser.Parse(dataTypeConv);
Properties.TryGetValue(SparkParameters.TLSOptions, out string? tlsOptions);
TlsOptions = Hive2.TlsOptionsParser.Parse(tlsOptions);
TlsOptions = TlsOptionsParser.Parse(tlsOptions);
Properties.TryGetValue(SparkParameters.HttpRequestTimeoutMilliseconds, out string? requestTimeoutMs);
if (requestTimeoutMs != null)
{
HttpRequestTimeout = int.TryParse(requestTimeoutMs, NumberStyles.Integer, CultureInfo.InvariantCulture, out int requestTimeoutMsValue) && requestTimeoutMsValue > 0
? requestTimeoutMsValue
: throw new ArgumentOutOfRangeException(SparkParameters.HttpRequestTimeoutMilliseconds, requestTimeoutMs, $"must be a value between 1 .. {int.MaxValue}. default is 30000 milliseconds.");
}
}

/// <summary>
/// Creates a <see cref="HiveServer2Reader"/> for the given statement and schema,
/// using the data type conversion configured on the statement's connection.
/// </summary>
/// <param name="statement">The statement whose connection supplies the data type conversion setting.</param>
/// <param name="schema">The schema passed through to the reader.</param>
/// <param name="cancellationToken">Token passed through to the reader.</param>
/// <returns>The newly created reader.</returns>
internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, CancellationToken cancellationToken = default)
{
    return new HiveServer2Reader(
        statement,
        schema,
        dataTypeConversion: statement.Connection.DataTypeConversion,
        cancellationToken);
}
Expand Down Expand Up @@ -154,7 +162,10 @@ protected override Task<TTransport> CreateTransportAsync()
httpClient.DefaultRequestHeaders.ExpectContinue = false;

TConfiguration config = new();
ThriftHttpTransport transport = new(httpClient, config);
ThriftHttpTransport transport = new(httpClient, config)
{
ConnectTimeout = HttpRequestTimeout,
};
return Task.FromResult<TTransport>(transport);
}

Expand Down
1 change: 1 addition & 0 deletions csharp/src/Drivers/Apache/Spark/SparkParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static class SparkParameters
public const string Type = "adbc.spark.type";
public const string DataTypeConv = "adbc.spark.data_type_conv";
public const string TLSOptions = "adbc.spark.tls_options";
/// <summary>
/// Connection option key for the HTTP request timeout, expressed in milliseconds.
/// </summary>
public const string HttpRequestTimeoutMilliseconds = "adbc.spark.http_request_timeout_ms";
}

public static class SparkAuthTypeConstants
Expand Down
3 changes: 3 additions & 0 deletions csharp/test/Drivers/Apache/ApacheTestConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,8 @@ public class ApacheTestConfiguration : TestConfiguration
[JsonPropertyName("polltime_milliseconds"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string PollTimeMilliseconds { get; set; } = string.Empty;

/// <summary>
/// HTTP request timeout, in milliseconds, kept as a string and read from the
/// <c>http_request_timeout_ms</c> JSON property. Defaults to an empty string.
/// </summary>
[JsonPropertyName("http_request_timeout_ms"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string HttpRequestTimeoutMilliseconds { get; set; } = string.Empty;

}
}
5 changes: 5 additions & 0 deletions csharp/test/Drivers/Apache/Spark/SparkConnectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public InvalidConnectionParametersTestData()
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Databricks, [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "httpxxz://hostname.com" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Databricks, [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "http-//hostname.com" }, typeof(UriFormatException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Databricks, [SparkParameters.HostName] = "valid.server.com", [SparkParameters.Token] = "abcdef", [AdbcOptions.Uri] = "httpxxz://hostname.com:1234567890" }, typeof(UriFormatException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword" , [SparkParameters.HttpRequestTimeoutMilliseconds] = "0" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [SparkParameters.HttpRequestTimeoutMilliseconds] = "-1" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [SparkParameters.HttpRequestTimeoutMilliseconds] = ((long)int.MaxValue + 1).ToString() }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [SparkParameters.HttpRequestTimeoutMilliseconds] = "non-numeric" }, typeof(ArgumentOutOfRangeException)));
Add(new(new() { [SparkParameters.Type] = SparkServerTypeConstants.Http, [SparkParameters.HostName] = "valid.server.com", [AdbcOptions.Username] = "user", [AdbcOptions.Password] = "myPassword", [SparkParameters.HttpRequestTimeoutMilliseconds] = "" }, typeof(ArgumentOutOfRangeException)));
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions csharp/test/Drivers/Apache/Spark/SparkTestEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public override Dictionary<string, string> GetDriverParameters(SparkTestConfigur
{
parameters.Add(HiveServer2Statement.Options.PollTimeMilliseconds, testConfiguration.PollTimeMilliseconds!);
}
if (!string.IsNullOrEmpty(testConfiguration.HttpRequestTimeoutMilliseconds))
{
parameters.Add(SparkParameters.HttpRequestTimeoutMilliseconds, testConfiguration.HttpRequestTimeoutMilliseconds!);
}

return parameters;
}
Expand Down

0 comments on commit 5f1f675

Please sign in to comment.