Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove apache_beam import in BeamBasedBuilder._save_info #6265

Merged
merged 5 commits into from
Sep 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions src/datasets/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2099,14 +2099,15 @@ def _download_and_prepare(self, dl_manager, verification_mode, **prepare_splits_
self._rename(src_fpath, dst_fpath)

def _save_info(self):
import apache_beam as beam

fs = beam.io.filesystems.FileSystems
path_join = os.path.join if not is_remote_filesystem(self._fs) else posixpath.join
with fs.create(path_join(self._output_dir, config.DATASET_INFO_FILENAME)) as f:
download_config = (
self.dl_manager.download_config
if self.dl_manager
else DownloadConfig(token=self.token, storage_options=self._fs.storage_options)
)
with xopen(f"{self._output_dir}/{config.DATASET_INFO_FILENAME}", "wb", download_config=download_config) as f:
self.info._dump_info(f)
if self.info.license:
with fs.create(path_join(self._output_dir, config.LICENSE_FILENAME)) as f:
with xopen(f"{self._output_dir}/{config.LICENSE_FILENAME}", "wb", download_config=download_config) as f:
self.info._dump_license(f)

def _prepare_split(
Expand Down Expand Up @@ -2176,8 +2177,13 @@ def _generate_examples_from_hf_gcs(self, split: SplitInfo):
else:
remote_prepared_urls = [f"{self._remote_cache_dir_from_hf_gcs}/{self.name}-{split.name}.arrow"]
key = 0
download_config = (
self.dl_manager.download_config
if self.dl_manager
else DownloadConfig(token=self.token, storage_options=self._fs.storage_options)
)
for remote_prepared_url in remote_prepared_urls:
with xopen(remote_prepared_url, "rb") as f:
with xopen(remote_prepared_url, "rb", download_config=download_config) as f:
with pa.ipc.open_stream(f) as reader:
for record_batch in reader:
for record in record_batch.to_pylist():
Expand All @@ -2189,7 +2195,12 @@ def _request_info_from_hf_gcs(self):

remote_dataset_info = f"{self._remote_cache_dir_from_hf_gcs}/{config.DATASET_INFO_FILENAME}"
try:
with xopen(remote_dataset_info) as f:
download_config = download_config = (
self.dl_manager.download_config
if self.dl_manager
else DownloadConfig(token=self.token, storage_options=self._fs.storage_options)
)
with xopen(remote_dataset_info, download_config=download_config) as f:
import json

_info = json.load(f)
Expand Down
Loading