Skip to content

Commit

Permalink
Read and write with content type (#67)
Browse files Browse the repository at this point in the history
* read and write with content type

* update CHANGELOG

* fix dump logs

* update checkout action
  • Loading branch information
atimin authored Jan 26, 2023
1 parent dd0aede commit a646780
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 29 deletions.
14 changes: 8 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Update pip
run: python3 -m pip install --no-cache --upgrade pip setuptools wheel

Expand All @@ -25,7 +25,7 @@ jobs:
needs: format
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Update pip
run: python3 -m pip install --no-cache --upgrade pip pipx

Expand All @@ -41,11 +41,11 @@ jobs:
needs: build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Update pip
run: python3 -m pip install --no-cache --upgrade pip pipx

- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: actions/download-artifact@master
with:
name: package
Expand All @@ -71,7 +71,7 @@ jobs:
python: ["3.7", "3.8", "3.9", "3.10", "3.11"]
token: ["", "ACCESS_TOKEN"]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: actions/download-artifact@master
with:
name: package
Expand All @@ -95,6 +95,8 @@ jobs:
- name: Dump docker logs on failure
if: failure()
uses: jwalton/gh-docker-logs@v2
with:
dest: './logs'
- name: Tar logs
if: failure()
run: tar cvzf ./logs.tgz ./logs
Expand All @@ -108,7 +110,7 @@ jobs:
needs: build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: actions/download-artifact@master
with:
name: package
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Quick Start example and guide, [PR-65](https://github.com/reductstore/reduct-py/pull/65)
- Support labels for read, write and querying, [PR-66](https://github.com/reductstore/reduct-py/pull/66)
- 'Content-Type' header for read and write operations, [PR-67](https://github.com/reductstore/reduct-py/pull/67)

## [1.2.0] - 2022-12-22

Expand Down
48 changes: 27 additions & 21 deletions reduct/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class Record:
"""size of data"""
last: bool
"""last record in the query"""
content_type: str
"""content type of data"""
read_all: Callable[[None], Awaitable[bytes]]
"""read all data"""
read: Callable[[int], AsyncIterator[bytes]]
Expand All @@ -118,6 +120,27 @@ class Record:
LABEL_PREFIX = "x-reduct-label-"


def _parse_record(resp, last=True):
    """Build a ``Record`` from an HTTP response.

    Args:
        resp: aiohttp-style response object exposing ``headers``,
            ``read`` and ``content.iter_chunked``.
        last: True when this record is the final one in a query stream.
    Returns:
        Record populated from the response headers, with lazy body readers.
    """
    headers = resp.headers
    # Labels arrive as "x-reduct-label-<name>" headers; strip the prefix
    # to recover the bare label names.
    labels = {
        key[len(LABEL_PREFIX) :]: val
        for key, val in headers.items()
        if key.startswith(LABEL_PREFIX)
    }
    return Record(
        timestamp=int(headers["x-reduct-time"]),
        size=int(headers["content-length"]),
        last=last,
        read_all=resp.read,
        read=resp.content.iter_chunked,
        labels=labels,
        content_type=headers.get("content-type", "application/octet-stream"),
    )


class Bucket:
"""A bucket of data in Reduct Storage"""

Expand Down Expand Up @@ -195,7 +218,7 @@ async def read(
async with self._http.request(
"GET", f"/b/{self.name}/{entry_name}", params=params
) as resp:
yield self._parse_record(resp)
yield _parse_record(resp)

async def write(
self,
Expand All @@ -216,6 +239,7 @@ async def write(
needed only when the data is an iterator
Keyword Args:
labels (dict): labels as key-values
content_type (str): content type of data
Raises:
ReductError: if there is an HTTP error
Expand All @@ -232,14 +256,13 @@ async def write(
"""
params = {"ts": timestamp if timestamp else time.time_ns() / 1000}
labels = kwargs["labels"] if "labels" in kwargs else None
await self._http.request_all(
"POST",
f"/b/{self.name}/{entry_name}",
params=params,
data=data,
content_length=content_length if content_length is not None else len(data),
labels=labels,
**kwargs,
)

async def query(
Expand Down Expand Up @@ -298,7 +321,7 @@ async def query(
if resp.status == 202:
return
last = int(resp.headers["x-reduct-last"]) != 0
yield self._parse_record(resp, last)
yield _parse_record(resp, last)

async def get_full_info(self) -> BucketFullInfo:
"""
Expand All @@ -307,20 +330,3 @@ async def get_full_info(self) -> BucketFullInfo:
return BucketFullInfo.parse_raw(
await self._http.request_all("GET", f"/b/{self.name}")
)

def _parse_record(self, resp, last=True):
    """Convert an HTTP response into a ``Record``.

    NOTE(review): this is the pre-change version removed by this commit;
    it duplicates the new module-level ``_parse_record`` helper except that
    it does not populate ``content_type``.

    Args:
        resp: aiohttp-style response object exposing ``headers``,
            ``read`` and ``content.iter_chunked``.
        last: True when this record is the final one in a query stream.
    Returns:
        Record populated from the response headers.
    """
    timestamp = int(resp.headers["x-reduct-time"])
    size = int(resp.headers["content-length"])
    # Labels arrive as "x-reduct-label-<name>" headers; strip the prefix.
    labels = dict(
        (name[len(LABEL_PREFIX) :], value)
        for name, value in resp.headers.items()
        if name.startswith(LABEL_PREFIX)
    )
    return Record(
        timestamp=timestamp,
        size=size,
        last=last,
        read_all=resp.read,
        read=resp.content.iter_chunked,
        labels=labels,
    )
7 changes: 6 additions & 1 deletion reduct/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ async def request(
if "content_length" in kwargs:
extra_headers["Content-Length"] = str(kwargs["content_length"])
del kwargs["content_length"]

if "content_type" in kwargs:
extra_headers["Content-Type"] = str(kwargs["content_type"])
del kwargs["content_type"]

if "labels" in kwargs:
if kwargs["labels"]:
for name, value in kwargs["labels"].items():
Expand Down Expand Up @@ -68,7 +73,7 @@ async def request_all(self, method: str, path: str = "", **kwargs) -> bytes:
async with self.request(method, path, **kwargs) as response:
return await response.read()

async def request_by(
async def request_chunked(
self, method: str, path: str = "", chunk_size=1024, **kwargs
) -> AsyncIterator[bytes]:
"""Http request"""
Expand Down
18 changes: 17 additions & 1 deletion tests/bucket_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ async def test__read_by_timestamp(bucket_1):
async with bucket_1.read("entry-2", timestamp=3_000_000) as record:
data = await record.read_all()
assert data == b"some-data-3"
assert record.timestamp == 3_000_000
assert record.size == 11
assert record.content_type == "application/octet-stream"


@pytest.mark.asyncio
Expand Down Expand Up @@ -137,7 +140,7 @@ async def sender():

@pytest.mark.asyncio
async def test__write_with_labels(bucket_1):
"""Schould write data with labels"""
"""Should write data with labels"""
await bucket_1.write(
"entry-1", b"something", labels={"label1": 123, "label2": 0.1, "label3": "hey"}
)
Expand All @@ -147,6 +150,17 @@ async def test__write_with_labels(bucket_1):
assert record.labels == {"label1": "123", "label2": "0.1", "label3": "hey"}


@pytest.mark.asyncio
async def test__write_with_content_type(bucket_1):
    """Writing with an explicit content_type must round-trip it on read."""
    await bucket_1.write("entry-1", b"something", content_type="text/plain")

    async with bucket_1.read("entry-1") as record:
        # Header-derived metadata first, then the streamed payload.
        assert record.content_type == "text/plain"
        payload = await record.read_all()
        assert payload == b"something"


@pytest.mark.asyncio
async def test_query_records(bucket_1):
"""Should query records for a time interval"""
Expand All @@ -158,10 +172,12 @@ async def test_query_records(bucket_1):

assert records[0].timestamp == 3000000
assert records[0].size == 11
assert records[0].content_type == "application/octet-stream"
assert not records[0].last

assert records[1].timestamp == 4000000
assert records[1].size == 11
assert records[1].content_type == "application/octet-stream"
assert records[1].last


Expand Down

0 comments on commit a646780

Please sign in to comment.