[BUG] Fix credentials issues in colab/CI (#1539)
* We currently throw a ProviderError in colab/CI when attempting to query for S3 credentials
* To circumvent this, we need to specify `anonymous=True` in all S3 accesses (see the usage sketch below)
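
A minimal sketch of the resulting usage pattern (the sample path comes from the tutorial changes in this commit; `anonymous=True` makes Daft skip the S3 credential lookup that fails in colab/CI):

    import daft

    # Anonymous-mode S3 access: no credential provider is queried, so no ProviderError in colab/CI
    IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))

    # read_json now accepts io_config and threads it through to globbing and reading
    df = daft.read_json(
        "s3://daft-public-data/redpajama-1t-sample/stackexchange_sample.jsonl",
        io_config=IO_CONFIG,
    )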

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Oct 27, 2023
1 parent d24f0df commit 6bd8f51
Showing 9 changed files with 51 additions and 18 deletions.
3 changes: 2 additions & 1 deletion daft/daft.pyi
@@ -347,8 +347,9 @@ class PythonStorageConfig:
    """

    fs: fsspec.AbstractFileSystem
    io_config: IOConfig

    def __init__(self, fs: fsspec.AbstractFileSystem | None = None): ...
    def __init__(self, fs: fsspec.AbstractFileSystem | None = None, io_config: IOConfig | None = None): ...

class StorageConfig:
    """
2 changes: 1 addition & 1 deletion daft/io/_csv.py
@@ -69,6 +69,6 @@ def read_csv(
    if use_native_downloader:
        storage_config = StorageConfig.native(NativeStorageConfig(io_config))
    else:
        storage_config = StorageConfig.python(PythonStorageConfig(None))
        storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config))
    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
    return DataFrame(builder)
5 changes: 4 additions & 1 deletion daft/io/_json.py
@@ -5,6 +5,7 @@
from daft.api_annotations import PublicAPI
from daft.daft import (
    FileFormatConfig,
    IOConfig,
    JsonSourceConfig,
    PythonStorageConfig,
    StorageConfig,
@@ -18,6 +19,7 @@
def read_json(
    path: Union[str, List[str]],
    schema_hints: Optional[Dict[str, DataType]] = None,
    io_config: Optional["IOConfig"] = None,
) -> DataFrame:
    """Creates a DataFrame from line-delimited JSON file(s)
@@ -31,6 +33,7 @@
        path (str): Path to JSON files (allows for wildcards)
        schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will
            disable all schema inference on data being read, and throw an error if data being read is incompatible.
        io_config (IOConfig): Config to be used with the native downloader
    returns:
        DataFrame: parsed DataFrame
@@ -40,6 +43,6 @@

    json_config = JsonSourceConfig()
    file_format_config = FileFormatConfig.from_json_config(json_config)
    storage_config = StorageConfig.python(PythonStorageConfig(None))
    storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config))
    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
    return DataFrame(builder)
5 changes: 2 additions & 3 deletions daft/io/_parquet.py
@@ -38,8 +38,7 @@ def read_parquet(
        schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will
            disable all schema inference on data being read, and throw an error if data being read is incompatible.
        io_config (IOConfig): Config to be used with the native downloader
        use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
            is currently experimental.
        use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet.
        _multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
            the amount of system resources (number of connections and thread contention) when running in the Ray runner.
            Defaults to None, which will let Daft decide based on the runner it is currently using.
@@ -63,7 +62,7 @@
    if use_native_downloader:
        storage_config = StorageConfig.native(NativeStorageConfig(io_config))
    else:
        storage_config = StorageConfig.python(PythonStorageConfig(None))
        storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config))

    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
    return DataFrame(builder)
14 changes: 12 additions & 2 deletions daft/io/common.py
@@ -3,7 +3,12 @@
from typing import TYPE_CHECKING

from daft.context import get_context
from daft.daft import FileFormatConfig, NativeStorageConfig, StorageConfig
from daft.daft import (
    FileFormatConfig,
    NativeStorageConfig,
    PythonStorageConfig,
    StorageConfig,
)
from daft.datatype import DataType
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.schema import Schema
@@ -30,10 +35,15 @@ def _get_tabular_files_scan(
    schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None

    # Glob the path using the Runner
    # NOTE: Globbing will
    # NOTE: Globbing will always need the IOConfig, regardless of whether "native reads" are used
    io_config = None
    if isinstance(storage_config.config, NativeStorageConfig):
        io_config = storage_config.config.io_config
    elif isinstance(storage_config.config, PythonStorageConfig):
        io_config = storage_config.config.io_config
    else:
        raise NotImplementedError(f"Tabular scan with config not implemented: {storage_config.config}")

    runner_io = get_context().runner().runner_io()
    file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)

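The practical effect of the daft/io/common.py change above, sketched for the legacy (non-native) read path (the bucket and glob pattern below are hypothetical, for illustration only): even with `use_native_downloader=False`, the IOConfig now reaches `glob_paths_details`, so anonymous S3 globbing works in colab/CI.

    import daft

    io_config = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))

    # Before this commit, the Python storage path dropped io_config, so globbing an S3
    # prefix here would query for credentials and raise a ProviderError in colab/CI.
    df = daft.read_parquet(
        "s3://my-bucket/dataset/*.parquet",  # hypothetical path
        use_native_downloader=False,
        io_config=io_config,
    )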
25 changes: 19 additions & 6 deletions src/daft-plan/src/source_info/storage_config.rs
@@ -21,7 +21,7 @@ use {
pub enum StorageConfig {
    Native(Arc<NativeStorageConfig>),
    #[cfg(feature = "python")]
    Python(PythonStorageConfig),
    Python(Arc<PythonStorageConfig>),
}

/// Storage configuration for the Rust-native I/O layer.
@@ -54,23 +54,36 @@ impl NativeStorageConfig {
/// Storage configuration for the legacy Python I/O layer.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg(feature = "python")]
#[cfg_attr(feature = "python", pyclass(module = "daft.daft", get_all))]
#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))]
pub struct PythonStorageConfig {
    /// An fsspec filesystem instance.
    #[pyo3(get)]
    #[serde(
        serialize_with = "serialize_py_object_optional",
        deserialize_with = "deserialize_py_object_optional",
        default
    )]
    pub fs: Option<PyObject>,
    /// IOConfig is used for globbing still, but not for actual data read
    pub io_config: Option<IOConfig>,
}

#[cfg(feature = "python")]
#[pymethods]
impl PythonStorageConfig {
    #[new]
    pub fn new(fs: Option<PyObject>) -> Self {
        Self { fs }
    pub fn new(fs: Option<PyObject>, io_config: Option<python::IOConfig>) -> Self {
        Self {
            fs,
            io_config: io_config.map(|c| c.config),
        }
    }

    #[getter]
    pub fn io_config(&self) -> Option<python::IOConfig> {
        self.io_config
            .as_ref()
            .map(|c| python::IOConfig { config: c.clone() })
    }
}

@@ -126,7 +139,7 @@ impl PyStorageConfig {
    /// Create from a Python storage config.
    #[staticmethod]
    fn python(config: PythonStorageConfig) -> Self {
        Self(Arc::new(StorageConfig::Python(config)))
        Self(Arc::new(StorageConfig::Python(config.into())))
    }

    /// Get the underlying storage config.
@@ -136,7 +149,7 @@

        match self.0.as_ref() {
            Native(config) => config.as_ref().clone().into_py(py),
            Python(config) => config.clone().into_py(py),
            Python(config) => config.as_ref().clone().into_py(py),
        }
    }
}
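A small sketch of the Python-facing surface these Rust changes add (the io_config constructor argument and the new getter on PythonStorageConfig), assuming daft.io.IOConfig is the same native config class that daft.daft.PythonStorageConfig accepts:

    import daft
    from daft.daft import PythonStorageConfig, StorageConfig

    io_config = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))

    # No fsspec filesystem is given; the IOConfig is stored only so that globbing can use it
    py_config = PythonStorageConfig(None, io_config=io_config)
    storage_config = StorageConfig.python(py_config)

    # The new #[getter] exposes the stored IOConfig back to Python
    assert py_config.io_config is not None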
@@ -103,8 +103,9 @@
"import daft\n",
"\n",
"SAMPLE_DATA_PATH = \"s3://daft-public-data/redpajama-1t-sample/stackexchange_sample.jsonl\"\n",
"IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True)) # Use anonymous-mode for accessing AWS S3\n",
"\n",
"df = daft.read_json(SAMPLE_DATA_PATH)\n",
"df = daft.read_json(SAMPLE_DATA_PATH, io_config=IO_CONFIG)\n",
"\n",
"if CI:\n",
" df = df.limit(500)\n",
9 changes: 7 additions & 2 deletions tutorials/image_querying/top_n_red_color.ipynb
@@ -112,7 +112,12 @@
"source": [
"import daft\n",
"\n",
"df = daft.from_glob_path(\"s3://daft-public-data/open-images/validation-images/*\")\n",
"IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True)) # Use anonymous S3 access\n",
"\n",
"df = daft.from_glob_path(\n",
" \"s3://daft-public-data/open-images/validation-images/*\",\n",
" io_config=IO_CONFIG,\n",
")\n",
"\n",
"if USE_RAY:\n",
" df = df.limit(10000)\n",
@@ -318,7 +323,7 @@
},
"outputs": [],
"source": [
"df = df.with_column(\"image\", df[\"path\"].url.download())"
"df = df.with_column(\"image\", df[\"path\"].url.download(io_config=IO_CONFIG))"
]
},
{
3 changes: 2 additions & 1 deletion tutorials/text_to_image/using_cloud_with_ray.ipynb
@@ -142,7 +142,8 @@
},
"outputs": [],
"source": [
"images_df = parquet_df.with_column(\"images\", col(\"URL\").url.download())\n",
"io_config = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True)) # Use anonymous-mode for accessing S3\n",
"images_df = parquet_df.with_column(\"images\", col(\"URL\").url.download(io_config=io_config))\n",
"images_df.collect()"
]
},
