RS-418: Implement removing record API (#114)
* remove single record

* implement remove query
atimin authored Sep 9, 2024
1 parent ed26632 commit e485ff2
Showing 5 changed files with 140 additions and 25 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -4,4 +4,5 @@ reduct/VERSION
.venv
.gitignore
*.pyc
.vscode
.vscode
venv/
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ requires = ["setuptools>=40.8.0", "wheel"]
[project]

name = "reduct-py"
version = "1.11.0"
version = "1.12.0"
description = "ReductStore Client SDK for Python"
requires-python = ">=3.8"
readme = "README.md"
112 changes: 92 additions & 20 deletions reduct/bucket.py
@@ -175,6 +175,77 @@ async def remove_entry(self, entry_name: str):
"""
await self._http.request_all("DELETE", f"/b/{self.name}/{entry_name}")

async def remove_record(
self, entry_name: str, timestamp: Union[int, datetime, float, str]
):
"""
Remove a record from an entry
Args:
entry_name: name of entry
timestamp: timestamp of the record (int as UNIX microseconds, datetime,
float as UNIX seconds, or ISO 8601 string)
Raises:
ReductError: if there is an HTTP error
"""
timestamp = unix_timestamp_from_any(timestamp)
await self._http.request_all(
"DELETE", f"/b/{self.name}/{entry_name}?ts={timestamp}"
)
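For reference, a minimal usage sketch of the new method (the client setup and the bucket/entry names are placeholders, not part of this change):
>>> bucket = await client.get_bucket("my-bucket")
>>> await bucket.remove_record("my-entry", "2024-09-09T10:00:00Z")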

async def remove_batch(
self, entry_name: str, batch: Batch
) -> Dict[int, ReductError]:
"""
Remove a batch of records from an entry in a single request
Args:
entry_name: name of entry in the bucket
batch: batch of record timestamps to remove
Returns:
dict of errors with timestamps as keys
Raises:
ReductError: if there is an HTTP error
"""
_, record_headers = self._make_headers(batch)
_, headers = await self._http.request_all(
"DELETE",
f"/b/{self.name}/{entry_name}/batch",
extra_headers=record_headers,
)

return self._parse_errors_from_headers(headers)
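A sketch of removing several records at once, assuming an existing bucket object and illustrative timestamps; records that could not be removed are reported in the returned dict:
>>> batch = Batch()
>>> batch.add(1000000)
>>> batch.add(2000000)
>>> errors = await bucket.remove_batch("my-entry", batch)
>>> for timestamp, error in errors.items():
>>>     print(f"Record at {timestamp} was not removed: {error}")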

async def remove_query(
self,
entry_name: str,
start: Optional[Union[int, datetime, float, str]] = None,
stop: Optional[Union[int, datetime, float, str]] = None,
**kwargs,
) -> int:
"""
Query data to remove within a time interval
The time interval is defined by the start and stop parameters that can be:
int (UNIX timestamp in microseconds), datetime,
float (UNIX timestamp in seconds) or str (ISO 8601 string).
Args:
entry_name: name of entry in the bucket
start: the beginning of the time interval.
If None, then from the first record
stop: the end of the time interval. If None, then to the latest record
Keyword Args:
include (dict): remove records which have all labels from this dict
exclude (dict): remove records which don't have all labels from this dict
each_s (Union[int, float]): remove a record for each S seconds
each_n (int): remove each N-th record
Returns:
number of removed records
"""
params = await self._parse_query_params(kwargs, start, stop)
resp, _ = await self._http.request_all(
"DELETE", f"/b/{self.name}/{entry_name}/q", params=params
)

return json.loads(resp)["removed_records"]
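A sketch of query-based removal; the time interval and label filter below are illustrative only:
>>> removed = await bucket.remove_query(
>>>     "my-entry",
>>>     start="2024-09-01T00:00:00Z",
>>>     stop="2024-09-09T00:00:00Z",
>>>     include={"status": "stale"},
>>> )
>>> print(f"Removed {removed} records")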

@asynccontextmanager
async def read(
self,
@@ -383,8 +454,6 @@ async def query(
>>> async for chunk in record.read(n=1024):
>>> print(chunk)
"""
start = unix_timestamp_from_any(start) if start else None
stop = unix_timestamp_from_any(stop) if stop else None

query_id = await self._query(entry_name, start, stop, ttl, **kwargs)
last = False
@@ -485,24 +554,7 @@ async def subscribe(
yield parse_record(resp, False)

async def _query(self, entry_name, start, stop, ttl, **kwargs):
params = {}
if start:
params["start"] = int(start)
if stop:
params["stop"] = int(stop)

if "include" in kwargs:
for name, value in kwargs["include"].items():
params[f"include-{name}"] = str(value)
if "exclude" in kwargs:
for name, value in kwargs["exclude"].items():
params[f"exclude-{name}"] = str(value)

if "each_s" in kwargs:
params["each_s"] = float(kwargs["each_s"])

if "each_n" in kwargs:
params["each_n"] = int(kwargs["each_n"])
params = await self._parse_query_params(kwargs, start, stop)

if "limit" in kwargs:
params["limit"] = kwargs["limit"]
@@ -522,6 +574,26 @@ async def _query(self, entry_name, start, stop, ttl, **kwargs):
query_id = json.loads(data)["id"]
return query_id

async def _parse_query_params(self, kwargs, start, stop):
start = unix_timestamp_from_any(start) if start else None
stop = unix_timestamp_from_any(stop) if stop else None
params = {}
if start:
params["start"] = start
if stop:
params["stop"] = stop
if "include" in kwargs:
for name, value in kwargs["include"].items():
params[f"include-{name}"] = str(value)
if "exclude" in kwargs:
for name, value in kwargs["exclude"].items():
params[f"exclude-{name}"] = str(value)
if "each_s" in kwargs:
params["each_s"] = float(kwargs["each_s"])
if "each_n" in kwargs:
params["each_n"] = int(kwargs["each_n"])
return params
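For illustration, assuming unix_timestamp_from_any returns integer microseconds, a call such as remove_query("entry-1", start=1000000, stop=2000000, include={"label1": "v1"}, each_n=2) would produce query parameters roughly equivalent to:
>>> {"start": 1000000, "stop": 2000000, "include-label1": "v1", "each_n": 2}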

@staticmethod
def _make_headers(batch: Batch) -> Tuple[int, Dict[str, str]]:
"""Make headers for batch"""
42 changes: 42 additions & 0 deletions tests/bucket_test.py
@@ -542,3 +542,45 @@ async def test_update_labels_batch(bucket_1):

async with bucket_1.read("entry-2", timestamp=4000000) as record:
assert record.labels == {"label1": "new-value", "label4": "value4"}


@pytest.mark.asyncio
@requires_api("1.12")
async def test_remove_single_record(bucket_1):
"""Should remove a single record"""
await bucket_1.remove_record("entry-2", 3000000)
records = [record async for record in bucket_1.query("entry-2")]
assert len(records) == 2
assert records[0].timestamp == 4000000
assert records[1].timestamp == 5000000


@pytest.mark.asyncio
@requires_api("1.12")
async def test_remove_batched_records(bucket_1):
"""Should remove batched records"""
batch = Batch()
batch.add(3000000)
batch.add(4000000)
batch.add(8000000)

errors = await bucket_1.remove_batch("entry-2", batch)
assert len(errors) == 1
assert errors[8000000] == ReductError(404, "No record with timestamp 8000000")

records = [record async for record in bucket_1.query("entry-2")]
assert len(records) == 1

assert records[0].timestamp == 5000000


@pytest.mark.asyncio
@requires_api("1.12")
async def test_remove_query(bucket_1):
"""Should remove records by query"""
removed = await bucket_1.remove_query("entry-2", start=3000000, stop=5000000)
assert removed == 2

records = [record async for record in bucket_1.query("entry-2")]
assert len(records) == 1
assert records[0].timestamp == 5000000
6 changes: 3 additions & 3 deletions tests/conftest.py
@@ -23,9 +23,9 @@ def requires_env(key):

def requires_api(version):
"""Skip test if API version is not supported"""
current_version = requests.get("http://127.0.0.1:8383/info", timeout=1.0).headers[
"x-reduct-api"
]
current_version = requests.get(
"http://127.0.0.1:8383/api/v1/info", timeout=1.0
).headers["x-reduct-api"]
return pytest.mark.skipif(
extract_api_version(version)[1] > extract_api_version(current_version)[1],
reason=f"Not suitable API version {current_version} for current test",