Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read and write with content type #67

Merged
merged 4 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Update pip
run: python3 -m pip install --no-cache --upgrade pip setuptools wheel

Expand All @@ -25,7 +25,7 @@ jobs:
needs: format
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Update pip
run: python3 -m pip install --no-cache --upgrade pip pipx

Expand All @@ -41,11 +41,11 @@ jobs:
needs: build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Update pip
run: python3 -m pip install --no-cache --upgrade pip pipx

- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: actions/download-artifact@master
with:
name: package
Expand All @@ -71,7 +71,7 @@ jobs:
python: ["3.7", "3.8", "3.9", "3.10", "3.11"]
token: ["", "ACCESS_TOKEN"]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: actions/download-artifact@master
with:
name: package
Expand All @@ -95,6 +95,8 @@ jobs:
- name: Dump docker logs on failure
if: failure()
uses: jwalton/gh-docker-logs@v2
with:
dest: './logs'
- name: Tar logs
if: failure()
run: tar cvzf ./logs.tgz ./logs
Expand All @@ -108,7 +110,7 @@ jobs:
needs: build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: actions/download-artifact@master
with:
name: package
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Quick Start example and guide, [PR-65](https://github.com/reductstore/reduct-py/pull/65)
- Support labels for read, write and querying, [PR-66](https://github.com/reductstore/reduct-py/pull/66)
- 'Content-Type' header for read and write operations, [PR-67](https://github.com/reductstore/reduct-py/pull/67)

## [1.2.0] - 2022-12-22

Expand Down
48 changes: 27 additions & 21 deletions reduct/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class Record:
"""size of data"""
last: bool
"""last record in the query"""
content_type: str
"""content type of data"""
read_all: Callable[[None], Awaitable[bytes]]
"""read all data"""
read: Callable[[int], AsyncIterator[bytes]]
Expand All @@ -118,6 +120,27 @@ class Record:
LABEL_PREFIX = "x-reduct-label-"


def _parse_record(resp, last=True):
timestamp = int(resp.headers["x-reduct-time"])
size = int(resp.headers["content-length"])
content_type = resp.headers.get("content-type", "application/octet-stream")
labels = dict(
(name[len(LABEL_PREFIX) :], value)
for name, value in resp.headers.items()
if name.startswith(LABEL_PREFIX)
)

return Record(
timestamp=timestamp,
size=size,
last=last,
read_all=resp.read,
read=resp.content.iter_chunked,
labels=labels,
content_type=content_type,
)


class Bucket:
"""A bucket of data in Reduct Storage"""

Expand Down Expand Up @@ -195,7 +218,7 @@ async def read(
async with self._http.request(
"GET", f"/b/{self.name}/{entry_name}", params=params
) as resp:
yield self._parse_record(resp)
yield _parse_record(resp)

async def write(
self,
Expand All @@ -216,6 +239,7 @@ async def write(
needed only when the data is an iterator
Keyword Args:
labels (dict): labels as key-values
content_type (str): content type of data
Raises:
ReductError: if there is an HTTP error

Expand All @@ -232,14 +256,13 @@ async def write(

"""
params = {"ts": timestamp if timestamp else time.time_ns() / 1000}
labels = kwargs["labels"] if "labels" in kwargs else None
await self._http.request_all(
"POST",
f"/b/{self.name}/{entry_name}",
params=params,
data=data,
content_length=content_length if content_length is not None else len(data),
labels=labels,
**kwargs,
)

async def query(
Expand Down Expand Up @@ -298,7 +321,7 @@ async def query(
if resp.status == 202:
return
last = int(resp.headers["x-reduct-last"]) != 0
yield self._parse_record(resp, last)
yield _parse_record(resp, last)

async def get_full_info(self) -> BucketFullInfo:
"""
Expand All @@ -307,20 +330,3 @@ async def get_full_info(self) -> BucketFullInfo:
return BucketFullInfo.parse_raw(
await self._http.request_all("GET", f"/b/{self.name}")
)

def _parse_record(self, resp, last=True):
timestamp = int(resp.headers["x-reduct-time"])
size = int(resp.headers["content-length"])
labels = dict(
(name[len(LABEL_PREFIX) :], value)
for name, value in resp.headers.items()
if name.startswith(LABEL_PREFIX)
)
return Record(
timestamp=timestamp,
size=size,
last=last,
read_all=resp.read,
read=resp.content.iter_chunked,
labels=labels,
)
7 changes: 6 additions & 1 deletion reduct/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ async def request(
if "content_length" in kwargs:
extra_headers["Content-Length"] = str(kwargs["content_length"])
del kwargs["content_length"]

if "content_type" in kwargs:
extra_headers["Content-Type"] = str(kwargs["content_type"])
del kwargs["content_type"]

if "labels" in kwargs:
if kwargs["labels"]:
for name, value in kwargs["labels"].items():
Expand Down Expand Up @@ -68,7 +73,7 @@ async def request_all(self, method: str, path: str = "", **kwargs) -> bytes:
async with self.request(method, path, **kwargs) as response:
return await response.read()

async def request_by(
async def request_chunked(
self, method: str, path: str = "", chunk_size=1024, **kwargs
) -> AsyncIterator[bytes]:
"""Http request"""
Expand Down
18 changes: 17 additions & 1 deletion tests/bucket_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ async def test__read_by_timestamp(bucket_1):
async with bucket_1.read("entry-2", timestamp=3_000_000) as record:
data = await record.read_all()
assert data == b"some-data-3"
assert record.timestamp == 3_000_000
assert record.size == 11
assert record.content_type == "application/octet-stream"


@pytest.mark.asyncio
Expand Down Expand Up @@ -137,7 +140,7 @@ async def sender():

@pytest.mark.asyncio
async def test__write_with_labels(bucket_1):
"""Schould write data with labels"""
"""Should write data with labels"""
await bucket_1.write(
"entry-1", b"something", labels={"label1": 123, "label2": 0.1, "label3": "hey"}
)
Expand All @@ -147,6 +150,17 @@ async def test__write_with_labels(bucket_1):
assert record.labels == {"label1": "123", "label2": "0.1", "label3": "hey"}


@pytest.mark.asyncio
async def test__write_with_content_type(bucket_1):
"""Should write data with content_type"""
await bucket_1.write("entry-1", b"something", content_type="text/plain")

async with bucket_1.read("entry-1") as record:
data = await record.read_all()
assert data == b"something"
assert record.content_type == "text/plain"


@pytest.mark.asyncio
async def test_query_records(bucket_1):
"""Should query records for a time interval"""
Expand All @@ -158,10 +172,12 @@ async def test_query_records(bucket_1):

assert records[0].timestamp == 3000000
assert records[0].size == 11
assert records[0].content_type == "application/octet-stream"
assert not records[0].last

assert records[1].timestamp == 4000000
assert records[1].size == 11
assert records[1].content_type == "application/octet-stream"
assert records[1].last


Expand Down