Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for issue reduct-py-40 #43

Merged
merged 7 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
build/
pkg/reduct/VERSION
*.egg-info
.venv
.gitignore
13 changes: 9 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Removed:

- Deprecated entry `list` function, [PR-42](https://github.com/reduct-storage/reduct-py/pull/42)

### Added:

- `/api/v1/` prefix to all http endpoints, [PR-42](https://github.com/reduct-storage/reduct-py/pull/42)

### Changed:

- `bucket.read()` now returns a Record yielded from an asynccontext, [PR-43](https://github.com/reduct-storage/reduct-py/pull/43)

### Removed:

- Deprecated entry `list` function, [PR-42](https://github.com/reduct-storage/reduct-py/pull/42)
- `bucket.read_by` method, [PR-43](https://github.com/reduct-storage/reduct-py/pull/43)

## [0.5.1] - 2022-09-14

### Removed:
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ async def main():

ts = time.time_ns() / 1000
await bucket.write("entry-1", b"Hey!!", ts)
data = await bucket.read("entry-1", ts)
print(data)
async with bucket.read("entry-1", ts) as record:
data = await record.read_all()
print(data)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Expand Down
55 changes: 24 additions & 31 deletions pkg/reduct/bucket.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Bucket API"""
from contextlib import asynccontextmanager
import json
import time
from dataclasses import dataclass
Expand Down Expand Up @@ -99,7 +100,7 @@ class Record:
"""Record in a query"""

timestamp: int
"""UNIX timestamp in microsecods"""
"""UNIX timestamp in microseconds"""
size: int
"""size of data"""
last: bool
Expand Down Expand Up @@ -165,46 +166,38 @@ async def remove(self):
"""
await self._http.request_all("DELETE", f"/b/{self.name}")

async def read(self, entry_name: str, timestamp: Optional[int] = None) -> bytes:
@asynccontextmanager
async def read(
self, entry_name: str, timestamp: Optional[int] = None
) -> AsyncIterator[Record]:
"""
Read a record from entry
Args:
entry_name: name of entry in the bucket
timestamp: UNIX timestamp in microseconds - if None: get the latest record
Returns:
bytes:
Raises:
ReductError: if there is an HTTP error
"""
blob = b""
async for chunk in self.read_by(entry_name, timestamp):
blob += chunk

return blob

async def read_by(
self, entry_name: str, timestamp: Optional[int] = None, chunk_size: int = 1024
) -> AsyncIterator[bytes]:
"""
Read a record from entry in chunks

>>> async for chunk in bucket.read_by("entry-1", chunk_size=1024):
>>> print(chunk)
>>> async def reader():
>>> async with bucket.read("entry", timestamp=123456789) as record:
>>> data = await record.read_all()
Args:
entry_name: name of entry in the bucket
timestamp: UNIX timestamp in microseconds
if None get the latest record
chunk_size:
timestamp: UNIX timestamp in microseconds - if None: get the latest record
Returns:
bytes:
async context, which generates Records
Raises:
ReductError: if there is an HTTP error
"""
params = {"ts": timestamp} if timestamp else None
async for chunk in self._http.request_by(
"GET", f"/b/{self.name}/{entry_name}", params=params, chunk_size=chunk_size
):
yield chunk
async with self._http.request(
"GET", f"/b/{self.name}/{entry_name}", params=params
) as resp:
timestamp = int(resp.headers["x-reduct-time"])
size = int(resp.headers["content-length"])

yield Record(
timestamp=timestamp,
size=size,
last=True,
read_all=resp.read,
read=resp.content.iter_chunked,
)

async def write(
self,
Expand Down
4 changes: 3 additions & 1 deletion pkg/reduct/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ def __init__(
self.timeout = ClientTimeout(timeout)

@asynccontextmanager
async def request(self, method: str, path: str = "", **kwargs) -> ClientResponse:
async def request(
self, method: str, path: str = "", **kwargs
) -> AsyncIterator[ClientResponse]:
"""HTTP request with ReductError exception"""

extra_headers = {}
Expand Down
35 changes: 15 additions & 20 deletions tests/bucket_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,33 +79,26 @@ async def test__get_entries(bucket_1):
@pytest.mark.asyncio
async def test__read_by_timestamp(bucket_1):
"""Should read a record by timestamp"""
data = await bucket_1.read("entry-2", timestamp=3_000_000)
assert data == b"some-data-3"
async with bucket_1.read("entry-2", timestamp=3_000_000) as record:
data = await record.read_all()
assert data == b"some-data-3"


@pytest.mark.asyncio
async def test__read_latest(bucket_1):
"""Should read the latest record if no timestamp"""
data = await bucket_1.read("entry-2")
assert data == b"some-data-4"


@pytest.mark.asyncio
async def test__read_by_chunks(bucket_1):
"""Should read by chunks"""
data = b""
async for chunk in bucket_1.read_by("entry-2", chunk_size=3):
data += chunk

assert data == b"some-data-4"
async with bucket_1.read("entry-2") as record:
data = await record.read_all()
assert data == b"some-data-4"


@pytest.mark.asyncio
async def test__write_by_timestamp(bucket_2):
"""Should write a record by timestamp"""
await bucket_2.write("entry-3", b"test-data", timestamp=5_000_000)
data = await bucket_2.read("entry-3", timestamp=5_000_000)
assert data == b"test-data"
async with bucket_2.read("entry-3", timestamp=5_000_000) as record:
data = await record.read_all()
assert data == b"test-data"


@pytest.mark.asyncio
Expand All @@ -115,8 +108,9 @@ async def test__write_with_current_time(bucket_2):

await bucket_2.write("entry-3", b"test-data")
await bucket_2.write("entry-3", b"old-data", timestamp=belated_timestamp)
data = await bucket_2.read("entry-3")
assert data == b"test-data"
async with bucket_2.read("entry-3") as record:
data = await record.read_all()
assert data == b"test-data"


@pytest.mark.asyncio
Expand All @@ -128,8 +122,9 @@ async def sender():
yield chunk

await bucket_2.write("entry-1", sender(), content_length=10)
data = await bucket_2.read("entry-1")
assert data == b"part1part2"
async with bucket_2.read("entry-1") as record:
data = await record.read_all()
assert data == b"part1part2"


@pytest.mark.asyncio
Expand Down