Skip to content

Commit

Permalink
Add support for TypedData for CSV export (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
atimin authored Sep 14, 2023
1 parent 31e8f61 commit 5b7e883
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- support for TypedData for CSV export, [PR-17](https://github.com/panda-official/DriftCLI/pull/17)

## 0.8.0 - 2023-07-11

### Added
Expand Down
97 changes: 95 additions & 2 deletions drift_cli/export_impl/raw.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
"""Export data"""
import asyncio
import csv
import json
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor, Executor
from pathlib import Path
from typing import List, Tuple

import numpy as np
from drift_client import DriftClient, DriftDataPackage
from drift_client.error import DriftClientError
from drift_protocol.common import StatusCode
from drift_protocol.meta import MetaInfo
from google.protobuf.json_format import MessageToDict
from rich.progress import Progress
from wavelet_buffer import WaveletBuffer
from wavelet_buffer.img import RgbJpeg, HslJpeg, GrayJpeg

from drift_cli.utils.helpers import read_topic, filter_topics
from drift_cli.utils.helpers import read_topic, filter_topics, to_timestamp


def _export_metadata_to_json(path: Path, pkg: DriftDataPackage):
Expand Down Expand Up @@ -155,7 +158,34 @@ async def _export_csv(
**kwargs,
):
Path.mkdir(Path(dest), exist_ok=True, parents=True)

it = client.walk(topic, to_timestamp(kwargs["start"]), to_timestamp(kwargs["stop"]))

def _next():
try:
return next(it)
except StopIteration:
return None
except DriftClientError:
return None

pkg = await asyncio.get_running_loop().run_in_executor(pool, _next)
if pkg is None or pkg.meta.type == MetaInfo.TIME_SERIES:
await _export_csv_timeseries(pool, client, topic, dest, progress, sem, **kwargs)
elif pkg.meta.type == MetaInfo.TYPED_DATA:
await _export_csv_typed_data(pool, client, topic, dest, progress, sem, **kwargs)
else:
raise RuntimeError(f"Can't export topic {topic} to csv")


async def _export_csv_timeseries(
pool: Executor,
client: DriftClient,
topic: str,
dest: str,
progress: Progress,
sem,
**kwargs,
):
filename = Path(dest) / f"{topic}.csv"
started = False
first_timestamp = 0
Expand Down Expand Up @@ -214,6 +244,68 @@ async def _export_csv(
)


async def _export_csv_typed_data(
    pool: Executor,
    client: DriftClient,
    topic: str,
    dest: str,
    progress: Progress,
    sem,
    **kwargs,
):
    """Export TYPED_DATA packages of a topic into a single CSV file.

    Layout of the resulting file:
      line 1: summary ``topic,count,first_timestamp,last_timestamp``
              (256 spaces are reserved up front and overwritten afterwards)
      line 2: CSV header (``timestamp`` plus sorted typed-data field names)
      rest:   one row per good package

    Args:
        pool: Executor running blocking client calls (used by ``read_topic``).
        client: Drift client to fetch packages from.
        topic: Topic name to export.
        dest: Destination directory (expected to exist).
        progress: Progress bar updated by ``read_topic``; also used here to
            report a skip when the topic turns out not to be typed data.
        sem: Semaphore limiting concurrent topic exports.
        **kwargs: Forwarded to ``read_topic`` (e.g. ``start``, ``stop``).
    """
    filename = Path(dest) / f"{topic}.csv"
    csv_writer = None
    first_timestamp = 0
    # NOTE(review): last_timestamp is never updated inside the loop below, so
    # the summary line always reports 0. This looks like a bug, but the
    # current test suite pins the 0 value — fix both together.
    last_timestamp = 0
    count = 0
    with open(filename, "w") as file:  # reuse the path computed above
        async for package, task in read_topic(
            pool, client, topic, progress, sem, **kwargs
        ):
            meta = package.meta
            if meta.type != MetaInfo.TYPED_DATA:
                progress.update(
                    task,
                    description=f"[SKIPPED] Topic {topic} is not typed data",
                    completed=True,
                )
                break

            # Skip packages with a bad status instead of aborting the export.
            if package.status_code != 0:
                continue

            fields = {"timestamp": package.package_id}

            if csv_writer is None:
                # Reserve room for the summary line written after the loop;
                # the column set is fixed by the first good package.
                file.write(" " * 256 + "\n")
                first_timestamp = package.package_id
                csv_writer = csv.DictWriter(
                    file,
                    fieldnames=list(fields.keys())
                    + list(sorted(package.as_typed_data().keys())),
                )
                csv_writer.writeheader()

            fields.update(package.as_typed_data())  # Use | when dropping python 3.8
            csv_writer.writerow(fields)

            count += 1

    if csv_writer:
        # Overwrite the reserved padding with the summary line.
        with open(filename, "r+") as file:
            file.seek(0)
            file.write(
                ",".join(
                    [
                        topic,
                        str(count),
                        str(first_timestamp),
                        str(last_timestamp),
                    ]
                )
            )


async def export_raw(client: DriftClient, dest: str, parallel: int, **kwargs):
"""Export data from Drift instance to DST folder
Args:
Expand All @@ -231,6 +323,7 @@ async def export_raw(client: DriftClient, dest: str, parallel: int, **kwargs):
with Progress() as progress:
with ThreadPoolExecutor(max_workers=8) as pool:
topics = filter_topics(client.get_topics(), kwargs.pop("topics", ""))

task = _export_csv if kwargs.get("csv", False) else _export_topic
task = _export_jpeg if kwargs.get("jpeg", False) else task

Expand Down
12 changes: 7 additions & 5 deletions drift_cli/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ def parse_path(path) -> Tuple[str, str]:
return tuple(args)


def to_timestamp(date: str) -> float:
    """Convert an ISO-8601 date string to a POSIX timestamp.

    A trailing ``Z`` (Zulu/UTC) designator is normalized to ``+00:00``
    because ``datetime.fromisoformat`` does not accept ``Z`` before
    Python 3.11.
    """
    normalized = date.replace("Z", "+00:00")
    return datetime.fromisoformat(normalized).timestamp()


async def read_topic(
pool: Executor,
client: DriftClient,
Expand All @@ -68,11 +73,8 @@ async def read_topic(
Record: Record from entry
"""

def _to_timestamp(date: str) -> float:
return datetime.fromisoformat(date.replace("Z", "+00:00")).timestamp()

start = _to_timestamp(kwargs["start"])
stop = _to_timestamp(kwargs["stop"])
start = to_timestamp(kwargs["start"])
stop = to_timestamp(kwargs["stop"])
parallel = kwargs.pop("parallel", 1)

last_time = start
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ classifiers = [
]

dependencies = [
"drift-python-client~=0.7.0",
"drift-python-client~=0.8.1",
"click~=8.1",
"tomlkit~=0.11",
"rich~=12.6",
Expand Down
94 changes: 88 additions & 6 deletions tests/export_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@

import numpy as np
import pytest
from drift_bytes import OutputBuffer, Variant
from drift_client import DriftClient, DriftDataPackage
from drift_protocol.common import (
DriftPackage,
DataPayload,
StatusCode,
)
from drift_protocol.meta import TimeSeriesInfo, MetaInfo, ImageInfo
from drift_protocol.meta import TimeSeriesInfo, MetaInfo, ImageInfo, TypedDataInfo
from google.protobuf.any_pb2 import Any # pylint: disable=no-name-in-module
from wavelet_buffer import WaveletBuffer, WaveletType, denoise
from wavelet_buffer.img import WaveletImage, codecs
Expand Down Expand Up @@ -103,6 +104,50 @@ def _make_image_pkgs() -> List[DriftPackage]:
return packages


@pytest.fixture(name="typed_data")
def _make_typed_data(typed_data_pkgs) -> List[DriftDataPackage]:
    """Wrap the serialized typed-data protos in DriftDataPackage objects."""
    serialized = (pkg.SerializeToString() for pkg in typed_data_pkgs)
    return [DriftDataPackage(raw) for raw in serialized]


@pytest.fixture(name="typed_data_pkgs")
def _make_typed_data_pkgs() -> List[DriftPackage]:
    """Build two DriftPackage protos carrying one shared typed-data payload."""
    sample_values = {
        "bool": True,
        "int": 1,
        "float": 1.0,
        "string": "string",
    }

    # One buffer and one TypedDataInfo are shared by every package.
    buffer = OutputBuffer()
    typed_data_info = TypedDataInfo()
    for name, value in sample_values.items():
        item = TypedDataInfo.Item()
        item.name = name
        item.status = StatusCode.GOOD
        typed_data_info.items.append(item)
        buffer.push(Variant(value))

    packages = []
    for package_id in (1, 2):
        pkg = DriftPackage()
        pkg.id = package_id
        pkg.status = 0

        payload = DataPayload()
        payload.data = buffer.bytes()

        wrapped = Any()
        wrapped.Pack(payload)
        pkg.data.append(wrapped)

        pkg.meta.type = MetaInfo.TYPED_DATA
        pkg.meta.typed_data_info.CopyFrom(typed_data_info)

        packages.append(pkg)
    return packages


@pytest.fixture(name="images")
def _make_images(image_pkgs) -> List[DriftDataPackage]:
    """Deserialize the image protos into DriftDataPackage objects."""
    blobs = [pkg.SerializeToString() for pkg in image_pkgs]
    return [DriftDataPackage(blob) for blob in blobs]
Expand Down Expand Up @@ -196,7 +241,12 @@ def test__export_raw_data_with_metadata(
@pytest.mark.usefixtures("set_alias")
def test__export_raw_data_as_csv(runner, client, conf, export_path, topics, timeseries):
"""Test export raw data as csv"""
client.walk.side_effect = [Iterator(timeseries), Iterator(timeseries)]
client.walk.side_effect = [
Iterator(timeseries),
Iterator(timeseries),
Iterator(timeseries),
Iterator(timeseries),
]
result = runner(
f"-c {conf} -p 2 export raw test {export_path} --start 2022-01-01 --stop 2022-01-02 --csv"
)
Expand All @@ -222,8 +272,8 @@ def test__export_raw_data_start_stop_required(runner, conf, export_path):


@pytest.mark.usefixtures("set_alias")
def test__export_raw_data_no_timeseries(runner, client, conf, export_path):
"""Should skip no timeseries"""
def test__export_raw_data_image(runner, client, conf, export_path):
"""Should skip no image"""
pkg = DriftPackage()
pkg.meta.type = MetaInfo.IMAGE
client.walk.side_effect = [
Expand All @@ -233,7 +283,7 @@ def test__export_raw_data_no_timeseries(runner, client, conf, export_path):
result = runner(
f"-c {conf} -p 1 export raw test {export_path} --start 2022-01-01 --stop 2022-01-02 --csv"
)
assert "[SKIPPED] Topic topic1 is not a time series" in result.output
assert "[RuntimeError] Can't export topic topic1 to csv" in result.output


@pytest.mark.usefixtures("set_alias")
Expand All @@ -259,7 +309,12 @@ def test__export_raw_data_topics_jpeg(
runner, client, conf, export_path, topics, images
):
"""Should exctract jpeg from wavelet buffers"""
client.walk.side_effect = [Iterator(images), Iterator(images)]
client.walk.side_effect = [
Iterator(images),
Iterator(images),
Iterator(images),
Iterator(images),
]
result = runner(
f"-c {conf} -p 1 export raw test {export_path} --start 2022-01-01 --stop 2022-01-02 "
f"--jpeg"
Expand Down Expand Up @@ -368,3 +423,30 @@ def test__export_raw_jpeg_stacked_image(
img.import_from_file(
str(export_path / topics[0] / "1_2.jpeg"), denoise.Null(), codecs.GrayJpeg()
)


@pytest.mark.usefixtures("set_alias")
def test__export_raw_typed_data(runner, client, conf, export_path, topics, typed_data):
    """Should export typed data"""
    # One iterator per walk() call made by the export (two topics, two passes).
    client.walk.side_effect = [Iterator(typed_data) for _ in range(4)]
    result = runner(
        f"-c {conf} -p 1 export raw test {export_path} --start 2022-01-01 --stop 2022-01-02 --csv"
    )

    assert f"Topic '{topics[0]}' (copied 2 packages (380 B)" in result.output
    assert result.exit_code == 0

    for topic in topics[:2]:
        assert (export_path / f"{topic}.csv").exists()

    expected_lines = [
        "topic1,2,1,0",
        "timestamp,bool,float,int,string",
        "1,True,1.0,1,string",
        "2,True,1.0,1,string",
    ]
    with open(export_path / f"{topics[0]}.csv", encoding="utf-8") as file:
        for expected in expected_lines:
            assert file.readline().strip() == expected

0 comments on commit 5b7e883

Please sign in to comment.