From a646780479b96e166ee10596541900fdcd486ec4 Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Thu, 26 Jan 2023 21:36:13 +0100 Subject: [PATCH] Read and write with content type (#67) * read and write with content type * update CHANGELOG * fix dump logs * update checkout action --- .github/workflows/ci.yml | 14 +++++++----- CHANGELOG.md | 1 + reduct/bucket.py | 48 ++++++++++++++++++++++------------------ reduct/http.py | 7 +++++- tests/bucket_test.py | 18 ++++++++++++++- 5 files changed, 59 insertions(+), 29 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ff1f0d..6ded9bb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,7 @@ jobs: format: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Update pip run: python3 -m pip install --no-cache --upgrade pip setuptools wheel @@ -25,7 +25,7 @@ jobs: needs: format runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Update pip run: python3 -m pip install --no-cache --upgrade pip pipx @@ -41,11 +41,11 @@ jobs: needs: build runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Update pip run: python3 -m pip install --no-cache --upgrade pip pipx - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - uses: actions/download-artifact@master with: name: package @@ -71,7 +71,7 @@ jobs: python: ["3.7", "3.8", "3.9", "3.10", "3.11"] token: ["", "ACCESS_TOKEN"] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - uses: actions/download-artifact@master with: name: package @@ -95,6 +95,8 @@ jobs: - name: Dump docker logs on failure if: failure() uses: jwalton/gh-docker-logs@v2 + with: + dest: './logs' - name: Tar logs if: failure() run: tar cvzf ./logs.tgz ./logs @@ -108,7 +110,7 @@ jobs: needs: build runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - uses: actions/download-artifact@master with: name: package diff --git a/CHANGELOG.md b/CHANGELOG.md index c21a78b..24482b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Quick Start example and guide, [PR-65](https://github.com/reductstore/reduct-py/pull/65) - Support labels for read, write and querying, [PR-66](https://github.com/reductstore/reduct-py/pull/66) +- 'Content-Type' header for read and write operations, [PR-67](https://github.com/reductstore/reduct-py/pull/67) ## [1.2.0] - 2022-12-22 diff --git a/reduct/bucket.py b/reduct/bucket.py index c3c0241..d10c8e4 100644 --- a/reduct/bucket.py +++ b/reduct/bucket.py @@ -106,6 +106,8 @@ class Record: """size of data""" last: bool """last record in the query""" + content_type: str + """content type of data""" read_all: Callable[[None], Awaitable[bytes]] """read all data""" read: Callable[[int], AsyncIterator[bytes]] @@ -118,6 +120,27 @@ class Record: LABEL_PREFIX = "x-reduct-label-" +def _parse_record(resp, last=True): + timestamp = int(resp.headers["x-reduct-time"]) + size = int(resp.headers["content-length"]) + content_type = resp.headers.get("content-type", "application/octet-stream") + labels = dict( + (name[len(LABEL_PREFIX) :], value) + for name, value in resp.headers.items() + if name.startswith(LABEL_PREFIX) + ) + + return Record( + timestamp=timestamp, + size=size, + last=last, + read_all=resp.read, + read=resp.content.iter_chunked, + labels=labels, + content_type=content_type, + ) + + class Bucket: """A bucket of data in Reduct Storage""" @@ -195,7 +218,7 @@ async def read( async with self._http.request( "GET", f"/b/{self.name}/{entry_name}", params=params ) as resp: - yield self._parse_record(resp) + yield _parse_record(resp) async def write( self, @@ -216,6 +239,7 @@ async def write( needed only when the data is an iterator Keyword Args: labels (dict): labels as key-values + content_type (str): content type of data Raises: ReductError: if there is an HTTP error @@ -232,14 +256,13 @@ async def write( """ params = {"ts": timestamp if timestamp else time.time_ns() / 1000} - labels = kwargs["labels"] if "labels" in kwargs else None await self._http.request_all( "POST", f"/b/{self.name}/{entry_name}", params=params, data=data, content_length=content_length if content_length is not None else len(data), - labels=labels, + **kwargs, ) async def query( @@ -298,7 +321,7 @@ async def query( if resp.status == 202: return last = int(resp.headers["x-reduct-last"]) != 0 - yield self._parse_record(resp, last) + yield _parse_record(resp, last) async def get_full_info(self) -> BucketFullInfo: """ @@ -307,20 +330,3 @@ async def get_full_info(self) -> BucketFullInfo: return BucketFullInfo.parse_raw( await self._http.request_all("GET", f"/b/{self.name}") ) - - def _parse_record(self, resp, last=True): - timestamp = int(resp.headers["x-reduct-time"]) - size = int(resp.headers["content-length"]) - labels = dict( - (name[len(LABEL_PREFIX) :], value) - for name, value in resp.headers.items() - if name.startswith(LABEL_PREFIX) - ) - return Record( - timestamp=timestamp, - size=size, - last=last, - read_all=resp.read, - read=resp.content.iter_chunked, - labels=labels, - ) diff --git a/reduct/http.py b/reduct/http.py index af697be..6b8aa0e 100644 --- a/reduct/http.py +++ b/reduct/http.py @@ -34,6 +34,11 @@ async def request( if "content_length" in kwargs: extra_headers["Content-Length"] = str(kwargs["content_length"]) del kwargs["content_length"] + + if "content_type" in kwargs: + extra_headers["Content-Type"] = str(kwargs["content_type"]) + del kwargs["content_type"] + if "labels" in kwargs: if kwargs["labels"]: for name, value in kwargs["labels"].items(): @@ -68,7 +73,7 @@ async def request_all(self, method: str, path: str = "", **kwargs) -> bytes: async with self.request(method, path, **kwargs) as response: return await response.read() - async def request_by( + async def request_chunked( self, method: str, path: str = "", chunk_size=1024, **kwargs ) -> AsyncIterator[bytes]: """Http request""" diff --git a/tests/bucket_test.py b/tests/bucket_test.py index deace15..c20fea9 100644 --- a/tests/bucket_test.py +++ b/tests/bucket_test.py @@ -90,6 +90,9 @@ async def test__read_by_timestamp(bucket_1): async with bucket_1.read("entry-2", timestamp=3_000_000) as record: data = await record.read_all() assert data == b"some-data-3" + assert record.timestamp == 3_000_000 + assert record.size == 11 + assert record.content_type == "application/octet-stream" @pytest.mark.asyncio @@ -137,7 +140,7 @@ async def sender(): @pytest.mark.asyncio async def test__write_with_labels(bucket_1): - """Schould write data with labels""" + """Should write data with labels""" await bucket_1.write( "entry-1", b"something", labels={"label1": 123, "label2": 0.1, "label3": "hey"} ) @@ -147,6 +150,17 @@ async def test__write_with_labels(bucket_1): assert record.labels == {"label1": "123", "label2": "0.1", "label3": "hey"} +@pytest.mark.asyncio +async def test__write_with_content_type(bucket_1): + """Should write data with content_type""" + await bucket_1.write("entry-1", b"something", content_type="text/plain") + + async with bucket_1.read("entry-1") as record: + data = await record.read_all() + assert data == b"something" + assert record.content_type == "text/plain" + + @pytest.mark.asyncio async def test_query_records(bucket_1): """Should query records for a time interval""" @@ -158,10 +172,12 @@ async def test_query_records(bucket_1): assert records[0].timestamp == 3000000 assert records[0].size == 11 + assert records[0].content_type == "application/octet-stream" assert not records[0].last assert records[1].timestamp == 4000000 assert records[1].size == 11 + assert records[1].content_type == "application/octet-stream" assert records[1].last