Merge pull request #54 from dandi/gh-53
Use a Dockerized Archive instance instead of staging for testing
jwodder authored Jul 26, 2024
2 parents 2ef1826 + 99e4daa commit d53f154
Showing 11 changed files with 465 additions and 96 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/test.yml
@@ -64,8 +64,6 @@ jobs:
       - name: Run tests with coverage
         if: matrix.toxenv == 'py'
         run: tox -e py -- -vv --cov-report=xml
-        env:
-          DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}
 
       - name: Run generic tests
         if: matrix.toxenv != 'py'
6 changes: 6 additions & 0 deletions README.md
@@ -40,6 +40,12 @@ Before running `backups2datalad`, the following setup must be performed:
   - When `dandi_instance` is `"dandi-staging"`, this should be
     `"dandi-api-staging-dandisets"`.
 
+- `s3endpoint` — The base endpoint URL of the S3 instance on which the
+  bucket is located. If this is set, the base bucket URL will be
+  calculated as `{s3endpoint}/{s3bucket}`; otherwise, it will be
+  `https://{s3bucket}.s3.amazonaws.com`. This option is intended primarily
+  for use in testing.
+
 - `content_url_regex` — A regular expression used to identify which of an
   asset's `contentUrl`s is its S3 URL. Defaults to
   `"amazonaws.com/.*blobs/"`.
4 changes: 1 addition & 3 deletions src/backups2datalad/blob.py
@@ -39,14 +39,12 @@ async def get_file_bucket_url(self, s3client: httpx.AsyncClient) -> str:
         self.log.debug("Fetching bucket URL")
         aws_url = self.asset.get_content_url(self.config.content_url_regex)
         urlbits = urlparse(aws_url)
-        key = urlbits.path.lstrip("/")
         self.log.debug("About to query S3")
         r = await arequest(
             s3client,
             "HEAD",
-            f"https://{self.config.s3bucket}.s3.amazonaws.com/{key}",
+            urlunparse(urlbits._replace(params="", query="", fragment="")),
         )
         r.raise_for_status()
         version_id = r.headers["x-amz-version-id"]
         self.log.debug("Got bucket URL")
         return urlunparse(urlbits._replace(query=f"versionId={version_id}"))
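The replacement line above derives the HEAD request URL from the asset's own content URL rather than rebuilding it against the hard-coded `s3.amazonaws.com` host, which would break against a Dockerized endpoint. A minimal standalone sketch of the `urlparse`/`_replace`/`urlunparse` approach, using a hypothetical URL:

```python
# Strip params, query, and fragment from a content URL, keeping the
# scheme, host, and key path intact. The URL below is illustrative.
from urllib.parse import urlparse, urlunparse

aws_url = "https://dandiarchive.s3.amazonaws.com/blobs/abc/123?versionId=xyz"
urlbits = urlparse(aws_url)
clean = urlunparse(urlbits._replace(params="", query="", fragment=""))
assert clean == "https://dandiarchive.s3.amazonaws.com/blobs/abc/123"
```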
8 changes: 8 additions & 0 deletions src/backups2datalad/config.py
@@ -48,6 +48,7 @@ class BackupConfig(BaseModel):
     # config file is given
     dandi_instance: str = "dandi"
     s3bucket: str = "dandiarchive"
+    s3endpoint: str | None = None
     content_url_regex: str = r"amazonaws.com/.*blobs/"
     dandisets: ResourceConfig = Field(
         default_factory=lambda: ResourceConfig(path="dandisets")
@@ -88,6 +89,13 @@ def load_yaml(cls, filepath: Path) -> BackupConfig:
     def dump_yaml(self, filepath: Path) -> None:
         filepath.write_text(yaml_dump(self.model_dump(mode="json", exclude_unset=True)))
 
+    @property
+    def bucket_url(self) -> str:
+        if self.s3endpoint is not None:
+            return f"{self.s3endpoint}/{self.s3bucket}"
+        else:
+            return f"https://{self.s3bucket}.s3.amazonaws.com"
+
     @property
     def dandiset_root(self) -> Path:
         return self.backup_root / self.dandisets.path
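A minimal sketch of how the new `bucket_url` property behaves, assuming the package is importable and the remaining fields keep their defaults; the bucket name and local endpoint below are hypothetical:

```python
from backups2datalad.config import BackupConfig

# Default: virtual-hosted-style AWS URL
cfg = BackupConfig()
assert cfg.bucket_url == "https://dandiarchive.s3.amazonaws.com"

# With a custom endpoint (e.g. a Dockerized instance): path-style URL
cfg = BackupConfig(s3bucket="test-bucket", s3endpoint="http://localhost:9000")
assert cfg.bucket_url == "http://localhost:9000/test-bucket"
```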
46 changes: 30 additions & 16 deletions src/backups2datalad/zarr.py
@@ -19,7 +19,7 @@
 from .adandi import RemoteZarrAsset
 from .adataset import AsyncDataset
 from .annex import AsyncAnnex
-from .config import ZarrMode
+from .config import BackupConfig, ZarrMode
 from .consts import MAX_ZARR_SYNCS
 from .logging import PrefixedLogger
 from .manager import Manager
@@ -105,34 +105,49 @@ def get_summary(self) -> str:
 @dataclass
 class ZarrSyncer:
     asset: RemoteZarrAsset
-    api_url: str = field(init=False)
-    zarr_id: str = field(init=False)
     ds: AsyncDataset
-    repo: Path = field(init=False)
     annex: AsyncAnnex
-    s3bucket: str
-    s3prefix: str = field(init=False)
+    config: BackupConfig
     backup_remote: str | None
     checksum: str | None
     log: PrefixedLogger
-    mode: ZarrMode
     last_timestamp: datetime | None = None
     error_on_change: bool = False
     report: ZarrReport = field(default_factory=ZarrReport)
     _local_checksum: str | None = None
 
-    def __post_init__(self) -> None:
-        self.api_url = self.asset.aclient.api_url
-        self.zarr_id = self.asset.zarr
-        self.repo = self.ds.pathobj
-        self.s3prefix = f"zarr/{self.zarr_id}/"
+    @property
+    def api_url(self) -> str:
+        return self.asset.aclient.api_url
+
+    @property
+    def zarr_id(self) -> str:
+        return self.asset.zarr
+
+    @property
+    def repo(self) -> Path:
+        return self.ds.pathobj
+
+    @property
+    def s3bucket(self) -> str:
+        return self.config.s3bucket
+
+    @property
+    def s3prefix(self) -> str:
+        return f"zarr/{self.zarr_id}/"
+
+    @property
+    def mode(self) -> ZarrMode:
+        return self.config.zarr_mode
 
     async def run(self) -> None:
         last_sync = self.read_sync_file()
         async with aclosing(self.annex.list_files()) as fileiter:
             to_delete = {f async for f in fileiter if not is_meta_file(f)}
         async with get_session().create_client(
-            "s3", config=AioConfig(signature_version=UNSIGNED)
+            "s3",
+            config=AioConfig(signature_version=UNSIGNED),
+            endpoint_url=self.config.s3endpoint,
         ) as client:
             if not await self.needs_sync(client, last_sync, to_delete):
                 self.log.info("backup up to date")
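The hunk above trades `field(init=False)` attributes populated in `__post_init__` for read-only properties, so derived values can never drift from `asset`, `ds`, or `config`. A self-contained sketch of that pattern, with illustrative names rather than the real class:

```python
from dataclasses import dataclass


@dataclass
class Syncer:
    zarr_id: str

    # Derived on demand instead of copied at init time, so it always
    # reflects the current zarr_id.
    @property
    def s3prefix(self) -> str:
        return f"zarr/{self.zarr_id}/"


assert Syncer(zarr_id="0000").s3prefix == "zarr/0000/"
```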
@@ -443,7 +458,7 @@ async def aiter_file_entries(
                     size=v["Size"],
                     md5_digest=v["ETag"].strip('"'),
                     last_modified=v["LastModified"],
-                    bucket_url=f"https://{self.s3bucket}.s3.amazonaws.com/{quote(v['Key'])}?versionId={v['VersionId']}",
+                    bucket_url=f"{self.config.bucket_url}/{quote(v['Key'])}?versionId={v['VersionId']}",
                 )
             for dm in page.get("DeleteMarkers", []):
                 if dm["IsLatest"]:
@@ -543,12 +558,11 @@ async def sync_zarr(
         asset=asset,
         ds=ds,
         annex=annex,
-        s3bucket=manager.config.s3bucket,
+        config=manager.config,
         backup_remote=backup_remote,
         checksum=checksum,
         log=manager.log,
         error_on_change=error_on_change,
-        mode=manager.config.zarr_mode,
     )
     await zsync.run()
     report = zsync.report
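In `run()` above, the optional `s3endpoint` is threaded into the aiobotocore client, so listing hits the Dockerized instance when one is configured and falls back to AWS otherwise. A hedged sketch of creating such an anonymous, unsigned client, assuming `aiobotocore` is installed; the bucket, prefix, and commented-out local endpoint are illustrative:

```python
import asyncio

from aiobotocore.config import AioConfig
from aiobotocore.session import get_session
from botocore import UNSIGNED


async def list_versions(endpoint: str | None) -> None:
    # endpoint=None makes create_client() fall back to the public AWS
    # endpoint, mirroring the behavior in ZarrSyncer.run().
    async with get_session().create_client(
        "s3",
        config=AioConfig(signature_version=UNSIGNED),
        endpoint_url=endpoint,
    ) as client:
        resp = await client.list_object_versions(
            Bucket="dandiarchive", Prefix="zarr/", MaxKeys=5
        )
        for v in resp.get("Versions", []):
            print(v["Key"], v["VersionId"])


asyncio.run(list_versions(None))
# asyncio.run(list_versions("http://localhost:9000"))  # hypothetical local endpoint
```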