[SPARK-7481] [build] Add spark-hadoop-cloud module to pull in object store access. #17834

Closed
Changes from all commits
28 commits
1da9a3d
[SPARK-7481] stripped down packaging only module
steveloughran Nov 18, 2016
028d9ed
[SPARK-7481] basic instantiation tests verify that dependency hadoop-…
steveloughran Nov 18, 2016
ace46e9
[SPARK-7481] tests restricted to instantiation; logging modified appr…
steveloughran Nov 18, 2016
3f6dfda
[SPARK-7481] declare httpcomponents:httpclient explicitly, as downstr…
steveloughran Nov 21, 2016
5f8f996
[SPARK-7481] update docs by culling section on cloud integration test…
steveloughran Nov 21, 2016
e92a493
[SPARK-7481] updated documentation as per review
steveloughran Nov 28, 2016
97e80e1
[SPARK-7481] SBT will build this now, optionally
steveloughran Nov 28, 2016
ef3cebf
[SPARK-7481] cloud POM includes jackson-dataformat-cbor, so that the …
steveloughran Nov 28, 2016
66650c7
[SPARK-7481] rebase with master; Pom had got out of sync
steveloughran Dec 1, 2016
31cc37e
[SPARK-7481] rename spark-cloud module to spark-hadoo-cloud, in POMs …
steveloughran Dec 2, 2016
2fc6f23
[SPARK-7841] bump up cloud pom to 2.2.0-SNAPSHOT; other minor pom cle…
steveloughran Dec 14, 2016
65f6814
[SPARK-7481] builds against Hadoop shaded 3.x clients failing as dire…
steveloughran Jan 10, 2017
73820a3
[SPARK-7481] update 2.7 dependencies to include azure, aws and openst…
steveloughran Jan 20, 2017
824d801
[SPARK-7481] add joda time as the dependency. Tested against hadoop b…
steveloughran Jan 30, 2017
12a1b84
SPARK-7481 purge all tests from the cloud module
steveloughran Feb 24, 2017
a7a2dec
SPARK-7481 add cloud module to sbt sequence
steveloughran Mar 20, 2017
02f6e19
SPARK-7481 break line of mvn XML declaration
steveloughran Mar 20, 2017
ce042d2
SPARK-7481 cloud pom is still JAR (not pom). works against Hadoop 2.6…
steveloughran Mar 20, 2017
a985753
SPARK-7481 move to Spark 2.3.0-SNAPSHOT
steveloughran Apr 27, 2017
0e0527d
tweaked pom; updated docs
steveloughran Apr 27, 2017
b78158f
SPARK-7481 strip down the docs to a bare minimum: FS differences, sec…
steveloughran Apr 28, 2017
de3e95b
SPARK-7481 doc review
steveloughran Apr 28, 2017
9b1579b
review comments
steveloughran Apr 28, 2017
844e255
SPARK-7481 more proofreading
steveloughran May 2, 2017
72a03ed
SPARK-7481 proofreading docs
steveloughran May 2, 2017
b788494
SPARK-7481 module rename, POM movement, docs
steveloughran May 3, 2017
e173e3f
SPARK-7481: artifact is spark-hadoop-cloud while project is hadoop-cl…
steveloughran May 4, 2017
32ebc8c
SPARK-7481: applied proofreading, moved links to https; also cut a co…
steveloughran May 5, 2017
14 changes: 14 additions & 0 deletions assembly/pom.xml
@@ -226,5 +226,19 @@
<parquet.deps.scope>provided</parquet.deps.scope>
</properties>
</profile>

<!--
Pull in spark-hadoop-cloud and its associated JARs,
-->
<profile>
<id>hadoop-cloud</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hadoop-cloud_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
200 changes: 200 additions & 0 deletions docs/cloud-integration.md
@@ -0,0 +1,200 @@
---
layout: global
displayTitle: Integration with Cloud Infrastructures
title: Integration with Cloud Infrastructures
description: Introduction to cloud storage support in Apache Spark SPARK_VERSION_SHORT
---
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->

* This will become a table of contents (this text will be scraped).
{:toc}

## Introduction


All major cloud providers offer persistent data storage in *object stores*.
These are not classic "POSIX" file systems.
In order to store hundreds of petabytes of data without any single points of failure,
object stores replace the classic filesystem directory tree
with a simpler model of `object-name => data`. To enable remote access, operations
on objects are usually offered as (slow) HTTP REST operations.

Spark can read and write data in object stores through filesystem connectors implemented
in Hadoop or provided by the infrastructure suppliers themselves.
These connectors make the object stores look *almost* like filesystems, with directories and files
and the classic operations on them such as list, delete and rename.


### Important: Cloud Object Stores are Not Real Filesystems

While the stores appear to be filesystems, underneath
they are still object stores, [and the difference is significant](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/introduction.html).

They cannot be used as a direct replacement for a cluster filesystem such as HDFS
*except where this is explicitly stated*.

Key differences are:

* Changes to stored objects may not be immediately visible, both in directory listings and actual data access.
* The means by which directories are emulated may make working with them slow.
* Rename operations may be very slow and, on failure, leave the store in an unknown state.
* Seeking within a file may require new HTTP calls, hurting performance.

How does this affect Spark?

1. Reading and writing data can be significantly slower than working with a normal filesystem.
1. Some directory structures may be very inefficient to scan during query split calculation.
1. The output of work may not be immediately visible to a follow-on query.
1. The rename-based algorithm by which Spark normally commits work when saving an RDD, DataFrame or Dataset
is potentially both slow and unreliable.

For these reasons, it is not always safe to use an object store as a direct destination of queries, or as
an intermediate store in a chain of queries. Consult the documentation of the object store and its
connector to determine which uses are considered safe.

In particular: *without some form of consistency layer, Amazon S3 cannot
be safely used as the direct destination of work with the normal rename-based committer.*

### Installation

With the relevant libraries on the classpath and Spark configured with valid credentials,
objects can be read or written by using their URLs as the path to data.
For example `sparkContext.textFile("s3a://landsat-pds/scene_list.gz")` will create
an RDD of the file `scene_list.gz` stored in S3, using the s3a connector.
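
The same URLs work for DataFrames and for output. Here is a minimal sketch, assuming a
`SparkSession` named `spark` (as in `spark-shell`), valid credentials, and a
placeholder bucket called `example-bucket`:

{% highlight scala %}
// Read a public object as an RDD of lines.
val sceneList = spark.sparkContext.textFile("s3a://landsat-pds/scene_list.gz")
println(sceneList.count())

// DataFrames use the same URL syntax for both reads and writes.
val csv = spark.read.option("header", "true").csv("s3a://example-bucket/input/data.csv")
csv.write.mode("overwrite").parquet("s3a://example-bucket/output/")
{% endhighlight %}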

To add the relevant libraries to an application's classpath, include the `hadoop-cloud`
module and its dependencies.

In Maven, add the following to the `pom.xml` file, assuming `spark.version`
is set to the chosen version of Spark:

{% highlight xml %}
<dependencyManagement>
...
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hadoop-cloud_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
...
</dependencyManagement>
{% endhighlight %}

Commercial products based on Apache Spark generally directly set up the classpath
for talking to cloud infrastructures, in which case this module may not be needed.

### Authenticating

Spark jobs must authenticate with the object stores to access data within them.

1. When Spark is running in a cloud infrastructure, the credentials are usually automatically set up.
1. `spark-submit` reads the `AWS_ACCESS_KEY`, `AWS_SECRET_KEY`
and `AWS_SESSION_TOKEN` environment variables and sets the associated authentication options
for the `s3n` and `s3a` connectors to Amazon S3.
1. In a Hadoop cluster, settings may be set in the `core-site.xml` file.
1. Authentication details may be manually added to the Spark configuration in `spark-defaults.conf`.
1. Alternatively, they can be programmatically set in the `SparkConf` instance used to configure
the application's `SparkContext`.
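
As a minimal sketch of the programmatic option, assuming the S3A connector and
credentials supplied through environment variables (the `fs.s3a.*` property names
are those documented by Hadoop):

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Read the secrets from the environment so nothing sensitive is hard-coded.
val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY", ""))
  .set("spark.hadoop.fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_KEY", ""))

val spark = SparkSession.builder()
  .appName("CloudAccess")
  .config(conf)
  .getOrCreate()
{% endhighlight %}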

*Important: never check authentication secrets into source code repositories,
especially public ones.*

Consult [the Hadoop documentation](https://hadoop.apache.org/docs/current/) for the relevant
configuration and security options.

## Configuring

Each cloud connector has its own set of configuration parameters; again,
consult the relevant documentation.

### Recommended settings for writing to object stores

For object stores whose consistency model means that rename-based commits are safe,
use the `FileOutputCommitter` v2 algorithm for performance:

```
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
```

This does less renaming at the end of a job than the "version 1" algorithm.
As it still uses `rename()` to commit files, it is unsafe to use
when the object store does not have consistent metadata/listings.

The committer can also be set to ignore failures when cleaning up temporary
files; this reduces the risk that a transient network problem is escalated into a
job failure:

```
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true
```

Storing temporary files can run up charges; delete
directories called `"_temporary"` on a regular basis to avoid this.
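
The same options can be supplied when the session is created; a sketch, assuming
they are not overridden elsewhere (for example in `spark-defaults.conf`):

{% highlight scala %}
import org.apache.spark.sql.SparkSession

// The committer options must be in place before the job writes any output.
val spark = SparkSession.builder()
  .appName("ObjectStoreWriter")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")
  .getOrCreate()
{% endhighlight %}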

### Parquet I/O Settings

For optimal performance when working with Parquet data, use the following settings:

```
spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true
```

These minimise the amount of data read during queries.

### ORC I/O Settings

For best performance when working with ORC data, use these settings:

```
spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true
```

Again, these minimise the amount of data read during queries.

## Spark Streaming and Object Storage

Spark Streaming can monitor files added to object stores by
creating a `FileInputDStream` that watches a path in the store through a call to
`StreamingContext.textFileStream()`; a minimal sketch follows the caveats below.

1. The time to scan for new files is proportional to the number of files
under the path, not the number of *new* files, so it can become a slow operation.
The size of the window needs to be set to handle this.

1. Files only appear in an object store once they are completely written; there
is no need for a workflow of write-then-rename to ensure that files aren't picked up
while they are still being written. Applications can write straight to the monitored directory.

1. Streams should only be checkpointed to a store implementing a fast and
atomic `rename()` operation. Otherwise the checkpointing may be slow and potentially unreliable.
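
A minimal sketch of such a stream; the bucket and path are placeholders, and the
60 second batch interval is only an illustration of leaving room for slow listings:

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("ObjectStoreStreaming")
// A generous batch interval leaves time for the (slow) object store listing.
val ssc = new StreamingContext(conf, Seconds(60))

// Each batch picks up files which have appeared under the monitored path.
val lines = ssc.textFileStream("s3a://example-bucket/incoming/")
lines.count().print()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}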

## Further Reading

Here is the documentation on the standard connectors, from both Apache and the cloud providers.

* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html). Hadoop 2.6+
* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html). Since Hadoop 2.7
* [Azure Data Lake](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html). Since Hadoop 2.8
* [Amazon S3 via S3A and S3N](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Hadoop 2.6+
* [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon
* [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector). From Google


1 change: 1 addition & 0 deletions docs/index.md
@@ -126,6 +126,7 @@ options for deployment:
* [Security](security.html): Spark security support
* [Hardware Provisioning](hardware-provisioning.html): recommendations for cluster hardware
* Integration with other storage systems:
* [Cloud Infrastructures](cloud-integration.html)
* [OpenStack Swift](storage-openstack-swift.html)
* [Building Spark](building-spark.html): build Spark using the Maven system
* [Contributing to Spark](http://spark.apache.org/contributing.html)
6 changes: 3 additions & 3 deletions docs/rdd-programming-guide.md
@@ -323,7 +323,7 @@ One important parameter for parallel collections is the number of *partitions* t

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html).

Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, etc URI) and reads it as a collection of lines. Here is an example invocation:
Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3a://`, etc URI) and reads it as a collection of lines. Here is an example invocation:

{% highlight scala %}
scala> val distFile = sc.textFile("data.txt")
@@ -356,7 +356,7 @@ Apart from text files, Spark's Scala API also supports several other data format

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html).

Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, etc URI) and reads it as a collection of lines. Here is an example invocation:
Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3a://`, etc URI) and reads it as a collection of lines. Here is an example invocation:

{% highlight java %}
JavaRDD<String> distFile = sc.textFile("data.txt");
@@ -388,7 +388,7 @@ Apart from text files, Spark's Java API also supports several other data formats

PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html).

Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, etc URI) and reads it as a collection of lines. Here is an example invocation:
Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3a://`, etc URI) and reads it as a collection of lines. Here is an example invocation:

{% highlight python %}
>>> distFile = sc.textFile("data.txt")
38 changes: 12 additions & 26 deletions docs/storage-openstack-swift.md
@@ -8,7 +8,8 @@ same URI formats as in Hadoop. You can specify a path in Swift as input through
URI of the form <code>swift://container.PROVIDER/path</code>. You will also need to set your
Swift security credentials, through <code>core-site.xml</code> or via
<code>SparkContext.hadoopConfiguration</code>.
Current Swift driver requires Swift to use Keystone authentication method.
The current Swift driver requires Swift to use the Keystone authentication method, or
its Rackspace-specific predecessor.

# Configuring Swift for Better Data Locality

@@ -19,41 +20,30 @@ Although not mandatory, it is recommended to configure the proxy server of Swift

# Dependencies

The Spark application should include <code>hadoop-openstack</code> dependency.
The Spark application should include <code>hadoop-openstack</code> dependency, which can
be done by including the `hadoop-cloud` module for the specific version of Spark used.
For example, for Maven support, add the following to the <code>pom.xml</code> file:

{% highlight xml %}
<dependencyManagement>
...
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-openstack</artifactId>
<version>2.3.0</version>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hadoop-cloud_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
...
</dependencyManagement>
{% endhighlight %}


# Configuration Parameters

Create <code>core-site.xml</code> and place it inside Spark's <code>conf</code> directory.
There are two main categories of parameters that should to be configured: declaration of the
Swift driver and the parameters that are required by Keystone.
The main parameters that need to be configured are the authentication parameters
required by Keystone.

Configuration of Hadoop to use Swift File system achieved via

<table class="table">
<tr><th>Property Name</th><th>Value</th></tr>
<tr>
<td>fs.swift.impl</td>
<td>org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem</td>
</tr>
</table>

Additional parameters required by Keystone (v2.0) and should be provided to the Swift driver. Those
parameters will be used to perform authentication in Keystone to access Swift. The following table
contains a list of Keystone mandatory parameters. <code>PROVIDER</code> can be any name.
The following table contains a list of Keystone mandatory parameters. <code>PROVIDER</code> can be
any (alphanumeric) name.

<table class="table">
<tr><th>Property Name</th><th>Meaning</th><th>Required</th></tr>
@@ -94,7 +84,7 @@ contains a list of Keystone mandatory parameters. <code>PROVIDER</code> can be a
</tr>
<tr>
<td><code>fs.swift.service.PROVIDER.public</code></td>
<td>Indicates if all URLs are public</td>
<td>Indicates whether to use the public (off cloud) or private (in cloud; no transfer fees) endpoints</td>
<td>Mandatory</td>
</tr>
</table>
@@ -104,10 +94,6 @@ defined for tenant <code>test</code>. Then <code>core-site.xml</code> should inc

{% highlight xml %}
<configuration>
<property>
<name>fs.swift.impl</name>
<value>org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem</value>
</property>
<property>
<name>fs.swift.service.SparkTest.auth.url</name>
<value>http://127.0.0.1:5000/v2.0/tokens</value>
Expand Down