DRIFT-660: Implement export time series data as .dp and csv files #2

Merged: 9 commits, Feb 20, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,3 +10,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `drift-cli alias` command to manage aliases, [PR-1](https://github.com/panda-official/DriftCLI/pull/1)
- `drift-cli export raw` command to export data from blob
storage, [PR-2](https://github.com/panda-official/DriftCLI/pull/2)
13 changes: 11 additions & 2 deletions README.md
@@ -8,8 +8,7 @@ Drift CLI is a command line client for [PANDA|Drift](https://driftpythonclient.r

## Features

* Export data from Drift Blob Storage

## Requirements

@@ -23,3 +22,13 @@ To install the Drift CLI, simply use pip:
```
pip install drift-cli
```
## Usage

```
drift-cli --help
drift-cli alias add drift-device --address 127.0.0.1 --password SOME_PASSWORD
drift-cli export raw drift-device ./tmp --start 2021-01-01 --stop 2021-01-02
```
## Links

* [Documentation](https://driftcli.readthedocs.io/en/latest/)
53 changes: 53 additions & 0 deletions docs/export.md
@@ -0,0 +1,53 @@
# Export Data

In this section, we will show how to export data from a Drift instance by using the `drift-cli export` command.

## Export Raw Data


The `drift-cli export raw` command allows you to export data from a Drift instance to a local folder
on your computer. This can be useful if you want to make a copy of your data for backup or to process it locally.

The `drift-cli export raw` command has the following syntax:

```
drift-cli export raw [OPTIONS] SRC DEST
```

`SRC` should be an alias of a Drift instance you want to export data from.

`DEST` should be the destination folder on your computer where you want to save the exported data.

Here is an example of how you might use the `drift-cli export raw` command:

```
drift-cli export raw drift-device ./exported-data --start 2021-01-23 --stop 2021-01-24
```

This will export all the raw data from the `drift-device` Drift instance to the `./exported-data` folder on your computer.
For each topic, the CLI creates a separate folder, and each package is saved as a file named `<timestamp>.dp`.
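
Under these conventions, the destination folder might look like this (the topic names and timestamps below are illustrative):

```
exported-data/
├── acc-1/
│   ├── 1611360000000.dp
│   └── 1611360001000.dp
└── gyro-1/
    └── 1611360000000.dp
```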

## Available options

Here is a list of the options that you can use with the `drift-cli export` commands:

* `--start`: This option allows you to specify a starting time point for the data that you want to export. Data with
timestamps newer than this time point will be included in the export. The time point should be in ISO format (e.g.,
2022-01-01T00:00:00Z).

* `--stop`: This option allows you to specify an ending time point for the data that you want to export. Data with
timestamps older than this time point will be included in the export. The time point should be in ISO format (e.g.,
2022-01-01T00:00:00Z).

* `--csv`: This option exports the data in CSV format. It creates a separate CSV file for each topic and saves the
  time series data in a single column, with meta information in the first row. The meta information has the
  following format: `topic, package count, first timestamp, last timestamp`. Timestamps are Unix time in milliseconds.
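
As a sketch of what consuming such a file could look like, the following parses the meta row and the value column (the parser and the sample values are illustrative, not part of the CLI):

```python
# Hypothetical parser for a CSV file produced by `drift-cli export raw --csv`:
# the first row is "topic, package count, first timestamp, last timestamp",
# followed by one sample value per line.

def parse_export_csv(text: str) -> dict:
    lines = text.strip().splitlines()
    topic, count, first_ts, last_ts = (s.strip() for s in lines[0].split(","))
    return {
        "topic": topic,
        "package_count": int(count),
        "first_timestamp_ms": int(first_ts),  # Unix time in milliseconds
        "last_timestamp_ms": int(last_ts),
        "values": [float(v) for v in lines[1:]],
    }

sample = "acc-1,2,1609459200000,1609459201000\n0.10000\n0.20000\n"
print(parse_export_csv(sample)["topic"])  # → acc-1
```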


You can also use the global `--parallel` option to specify the number of topics that you want to export in parallel:

```
drift-cli --parallel 10 export raw drift-device ./exported-data --start 2021-01-01T00:00:00Z --stop 2021-01-02T00:00:00Z
```
14 changes: 3 additions & 11 deletions drift_cli/cli.py
@@ -6,6 +6,8 @@
import click

from drift_cli.alias import alias
from drift_cli.export import export

from drift_cli.config import write_config, Config


@@ -17,12 +19,6 @@
    type=Path,
    help="Path to config file. Default ${HOME}/drift-cli/config.toml",
)
@click.option(
    "--timeout",
    "-t",
    type=int,
    help="Timeout for requests in seconds. Default 5",
)
@click.option(
    "--parallel",
    "-p",
@@ -33,25 +29,21 @@
def cli(
    ctx,
    config: Optional[Path] = None,
    timeout: Optional[int] = None,
    parallel: Optional[int] = None,
):
    """CLI client for PANDA | Drift Platform"""
    if config is None:
        config = Path.home() / ".drift-cli" / "config.toml"

    if timeout is None:
        timeout = 5

    if parallel is None:
        parallel = 10

    if not Path.exists(config):
        write_config(config, Config(aliases={}))

    ctx.obj["config_path"] = config
    ctx.obj["timeout"] = timeout
    ctx.obj["parallel"] = parallel


cli.add_command(alias, "alias")
cli.add_command(export, "export")
83 changes: 83 additions & 0 deletions drift_cli/export.py
@@ -0,0 +1,83 @@
"""Export Command"""
import asyncio

import click
from click import Abort
from drift_client import DriftClient

from drift_cli.config import Alias
from drift_cli.config import read_config
from drift_cli.export_impl.raw import export_raw
from drift_cli.utils.consoles import error_console
from drift_cli.utils.error import error_handle
from drift_cli.utils.helpers import (
    parse_path,
)

start_option = click.option(
    "--start",
    help="Export records with timestamps newer than this time point in ISO format e.g. 2023-01-01T00:00:00.000Z",
)

stop_option = click.option(
    "--stop",
    help="Export records with timestamps older than this time point in ISO format e.g. 2023-01-01T00:00:00.000Z",
)


@click.group()
def export():
    """Export data from a bucket somewhere else"""


@export.command()
@click.argument("src")
@click.argument("dest")
@stop_option
@start_option
@click.option(
    "--csv",
    help="Export data as CSV instead of raw data (only for timeseries)",
    default=False,
    is_flag=True,
)
@click.pass_context
def raw(
    ctx,
    src: str,
    dest: str,
    start: str,
    stop: str,
    csv: bool,
):  # pylint: disable=too-many-arguments
    """Export data from the SRC bucket to the DEST folder

    SRC should be in the format of ALIAS/BUCKET_NAME.
    DEST should be a path to a folder.

    As a result, the folder will contain a folder for each entry in the bucket.
    Each entry folder will contain a file for each record
    in the entry with the timestamp as the name.
    """
    if start is None or stop is None:
        error_console.print("Error: --start and --stop are required")
        raise Abort()

    alias_name, _ = parse_path(src)
    alias: Alias = read_config(ctx.obj["config_path"]).aliases[alias_name]

    loop = asyncio.get_event_loop()
    run = loop.run_until_complete
    client = DriftClient(alias.address, alias.password, loop=loop)

    with error_handle():
        run(
            export_raw(
                client,
                dest,
                parallel=ctx.obj["parallel"],
                start=start,
                stop=stop,
                csv=csv,
            )
        )
122 changes: 122 additions & 0 deletions drift_cli/export_impl/raw.py
@@ -0,0 +1,122 @@
"""Export data"""
import asyncio
from concurrent.futures import ThreadPoolExecutor, Executor
from pathlib import Path

import numpy as np
from drift_client import DriftClient
from drift_protocol.meta import MetaInfo
from rich.progress import Progress

from drift_cli.utils.helpers import read_topic


async def _export_topic(
    pool: Executor,
    client: DriftClient,
    topic: str,
    dest: str,
    progress: Progress,
    sem,
    **kwargs,
):
    async for package, _task in read_topic(pool, client, topic, progress, sem, **kwargs):
        Path.mkdir(Path(dest) / topic, exist_ok=True, parents=True)
        with open(Path(dest) / topic / f"{package.package_id}.dp", "wb") as file:
            file.write(package.blob)


async def _export_csv(
    pool: Executor,
    client: DriftClient,
    topic: str,
    dest: str,
    progress: Progress,
    sem,
    **kwargs,
):
    Path.mkdir(Path(dest), exist_ok=True, parents=True)

    filename = Path(dest) / f"{topic}.csv"
    started = False
    first_timestamp = 0
    last_timestamp = 0
    count = 0
    async for package, task in read_topic(pool, client, topic, progress, sem, **kwargs):
        meta = package.meta
        if meta.type != MetaInfo.TIME_SERIES:
            progress.update(
                task,
                description=f"[SKIPPED] Topic {topic} is not a time series",
                completed=True,
            )
            break

        if not started:
            # Reserve a fixed-width first line for the meta header;
            # it is backfilled after the last package is written.
            with open(filename, "w") as file:
                file.write(" " * 256 + "\n")
            started = True
            first_timestamp = meta.time_series_info.start_timestamp.ToMilliseconds()
        else:
            if last_timestamp != meta.time_series_info.start_timestamp.ToMilliseconds():
                progress.update(
                    task,
                    description=f"[ERROR] Topic {topic} has gaps",
                    completed=True,
                )
                break

        if package.status_code != 0:
            progress.update(
                task,
                description=f"[ERROR] Topic {topic} has a bad package",
                completed=True,
            )
            break

        last_timestamp = meta.time_series_info.stop_timestamp.ToMilliseconds()
        with open(filename, "a") as file:
            np.savetxt(file, package.as_np(), delimiter=",", fmt="%.5f")

        count += 1

    if started:
        with open(filename, "r+") as file:
            file.seek(0)
            file.write(
                ",".join(
                    [
                        topic,
                        str(count),
                        str(first_timestamp),
                        str(last_timestamp),
                    ]
                )
            )
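
The CSV writer above reserves a 256-character first line, streams the samples, then seeks back and backfills the meta header once the counts are known. A minimal standalone sketch of that reserve-then-backfill pattern (the file name, header contents, and values here are illustrative):

```python
import os
import tempfile

HEADER_WIDTH = 256  # reserved width for the header line, as in _export_csv

def write_with_backfilled_header(path: str, values: list) -> None:
    # Reserve a fixed-width first line so the header can be rewritten
    # later without shifting the data that follows it.
    with open(path, "w") as file:
        file.write(" " * HEADER_WIDTH + "\n")
        for value in values:
            file.write(f"{value:.5f}\n")
    # Backfill the header in place; it must fit in the reserved width.
    with open(path, "r+") as file:
        file.seek(0)
        file.write(f"count,{len(values)}")

path = os.path.join(tempfile.mkdtemp(), "topic.csv")
write_with_backfilled_header(path, [0.1, 0.2, 0.3])
with open(path) as file:
    print(file.readline().strip())  # → count,3
```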


async def export_raw(client: DriftClient, dest: str, parallel: int, **kwargs):
    """Export data from a Drift instance to the DEST folder
    Args:
        client: Drift client
        dest: Path to a folder
        parallel: Number of parallel tasks
    Kwargs:
        start: Export records with timestamps newer than this time point in ISO format
        stop: Export records with timestamps older than this time point in ISO format
        csv: Export data as CSV instead of raw data
    """
    sem = asyncio.Semaphore(parallel)
    with Progress() as progress:
        with ThreadPoolExecutor() as pool:
            topics = client.get_topics()
            csv = kwargs.pop("csv", False)
            task = _export_csv if csv else _export_topic

            tasks = [
                task(pool, client, topic, dest, progress, sem, topics=topics, **kwargs)
                for topic in topics
            ]
            await asyncio.gather(*tasks)
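
`export_raw` schedules one task per topic and bounds concurrency with a semaphore (in the real code, acquired inside `read_topic`). A minimal sketch of that bounded fan-out pattern, with an illustrative stand-in for the per-topic work:

```python
import asyncio

async def bounded_gather(items, worker, parallel=10):
    # Schedule all tasks at once, but let only `parallel` of them
    # run their critical section concurrently.
    sem = asyncio.Semaphore(parallel)

    async def run_one(item):
        async with sem:
            return await worker(item)

    return await asyncio.gather(*(run_one(item) for item in items))

async def fake_export(topic):
    await asyncio.sleep(0)  # stand-in for real blob-storage I/O
    return f"exported {topic}"

print(asyncio.run(bounded_gather(["acc-1", "gyro-1"], fake_export, parallel=2)))
# → ['exported acc-1', 'exported gyro-1']
```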