Skip to content

Commit

Permalink
Add store ldshm subcmd
Browse files Browse the repository at this point in the history
Changed from the old `store clone` to instead simply load any shm buffer
matching a user provided `FQME: str` pattern; writing to parquet file is
only done if an explicit option flag is passed by user.

Implement new `iter_dfs_from_shms()` generator which allows iteratively
loading both 1m and 1s buffers delivering the `Path`, `ShmArray` and
`polars.DataFrame` instances per matching file B)

Also add a todo for a `NativeStorageClient.clear_range()` method.
  • Loading branch information
goodboy committed Jun 20, 2023
1 parent 0c99749 commit bad71fa
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 83 deletions.
226 changes: 144 additions & 82 deletions piker/storage/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
"""
from __future__ import annotations
from pathlib import Path
import time
from typing import Generator
# from typing import TYPE_CHECKING

import polars as pl
import numpy as np
import tractor
# import pendulum
from rich.console import Console
import trio
Expand All @@ -32,6 +35,16 @@

from piker.service import open_piker_runtime
from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
ShmArray,
)
from piker.data.history import (
_default_hist_size,
_default_rt_size,
)
from . import (
log,
)
Expand Down Expand Up @@ -132,8 +145,6 @@ def anal(

) -> np.ndarray:

import tractor

async def main():
async with (
open_piker_runtime(
Expand Down Expand Up @@ -171,99 +182,150 @@ async def main():
trio.run(main)


def iter_dfs_from_shms(fqme: str) -> Generator[
    tuple[Path, ShmArray, pl.DataFrame],
    None,
    None,
]:
    '''
    Scan `/dev/shm/` for files whose name contains the `fqme`
    pattern, attach to each one as a shm array and yield a
    `(Path, ShmArray, polars.DataFrame)` triple per file.

    Files with `_first` or `_last` in their name are skipped, as
    are files whose trailing dot-suffix has no entry in the buffer
    size table (only `hist` and `rt` are known); the latter are
    reported with a warning instead of aborting the whole scan.

    '''
    # shm buffer size table based on known sample rates
    sizes: dict[str, int] = {
        'hist': _default_hist_size,
        'rt': _default_rt_size,
    }

    # load all detected shm buffer files which have the
    # passed FQME pattern in the file name.
    shmfiles: list[Path] = []
    shmdir = Path('/dev/shm/')

    for shmfile in shmdir.glob(f'*{fqme}*'):
        filename: str = shmfile.name

        # skip index files
        if (
            '_first' in filename
            or '_last' in filename
        ):
            continue

        assert shmfile.is_file()
        log.debug(f'Found matching shm buffer file: {filename}')
        shmfiles.append(shmfile)

    for shmfile in shmfiles:

        # lookup array buffer size based on file suffix
        # being either .rt or .hist
        suffix: str = shmfile.name.rsplit('.', maxsplit=1)[-1]
        size: int | None = sizes.get(suffix)
        if size is None:
            # the glob is a plain substring match so it can pick up
            # files which are not one of the 2 known buffer kinds;
            # skip those instead of raising `KeyError`.
            log.warning(
                f'Skipping shm file with unknown suffix {suffix!r}: '
                f'{shmfile.name}'
            )
            continue

        # attach to the shm buffer and load its array into
        # a polars df.
        shm, opened = maybe_open_shm_array(
            key=shmfile.name,
            size=size,
            dtype=def_iohlcv_fields,
            readonly=True,
        )
        # NOTE(review): presumably `opened` is only set when a new
        # buffer had to be allocated, which should never happen for
        # a file we just found on disk -- confirm against
        # `maybe_open_shm_array()`.
        assert not opened
        ohlcv = shm.array

        start = time.time()

        # XXX: thanks to this SO answer for this conversion tip:
        # https://stackoverflow.com/a/72054819
        df = pl.DataFrame({
            field_name: ohlcv[field_name]
            for field_name in ohlcv.dtype.fields
        })
        delay: float = round(
            time.time() - start,
            ndigits=6,
        )
        log.info(
            f'numpy -> polars conversion took {delay} secs\n'
            f'polars df: {df}'
        )

        yield (
            shmfile,
            shm,
            df,
        )


@store.command()
def clone(
def ldshm(
fqme: str,

write_parquet: bool = False,

) -> None:
import time
from piker.config import get_conf_dir
from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
)
import polars as pl

# TODO: actually look up an existing shm buf (set) from
# an fqme and file name parsing..
# open existing shm buffer for kucoin backend
key: str = 'piker.brokerd[3595d316-3c15-46].xmrusdt.kucoin.hist'
shmpath: Path = Path('/dev/shm') / key
assert shmpath.is_file()
'''
Linux ONLY: load any fqme file name matching shm buffer from
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
optionally write to .parquet file.
'''
async def main():
async with (
open_piker_runtime(
'polars_boi',
enable_modules=['piker.data._sharedmem'],
),
):
# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=key,
dtype=def_iohlcv_fields,
)
assert not opened
ohlcv = shm.array

start = time.time()

# XXX: thanks to this SO answer for this conversion tip:
# https://stackoverflow.com/a/72054819
df = pl.DataFrame({
field_name: ohlcv[field_name]
for field_name in ohlcv.dtype.fields
})
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'numpy -> polars conversion took {delay} secs\n'
f'polars df: {df}'
)

# compute ohlc properties for naming
times: np.ndarray = ohlcv['time']
secs: float = times[-1] - times[-2]
if secs < 1.:
breakpoint()
raise ValueError(
f'Something is wrong with time period for {shm}:\n{ohlcv}'
)

timeframe: str = f'{secs}s'

# write to parquet file
datadir: Path = get_conf_dir() / 'parqdb'
if not datadir.is_dir():
datadir.mkdir()

path: Path = datadir / f'{fqme}.{timeframe}.parquet'

# write to fs
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
df: pl.DataFrame | None = None
for shmfile, shm, df in iter_dfs_from_shms(fqme):

# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'parquet read took {delay} secs\n'
f'polars df: {read_df}'
)
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
secs: float = times[-1] - times[-2]
if secs < 1.:
breakpoint()
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)

# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
await tractor.breakpoint()

# write to parquet file?
if write_parquet:
timeframe: str = f'{secs}s'

datadir: Path = get_conf_dir() / 'nativedb'
if not datadir.is_dir():
datadir.mkdir()

path: Path = datadir / f'{fqme}.{timeframe}.parquet'

# write to fs
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)

# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'parquet read took {delay} secs\n'
f'polars df: {read_df}'
)

if df is None:
log.error(f'No matching shm buffers for {fqme} ?')

trio.run(main)

Expand Down
27 changes: 26 additions & 1 deletion piker/storage/nativedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ def mk_ohlcv_shm_keyed_filepath(
return path


def unpack_fqme_from_parquet_filepath(path: Path) -> str:
    '''
    Return the fqme portion of a parquet file path whose name is
    laid out as `<fqme>.<format-descr>.parquet`.

    The name is split from the right so that an fqme which itself
    contains dots (eg. `xmrusdt.kucoin`) is returned intact.

    Raises `ValueError` when the file name has fewer than 3
    dot-separated parts and `AssertionError` when the final suffix
    is not `parquet`.

    '''
    filename: str = str(path.name)

    # only the last 2 fields are fixed-format; everything before
    # them belongs to the (possibly dotted) fqme.
    fqme, fmt_descr, suffix = filename.rsplit('.', maxsplit=2)
    assert suffix == 'parquet', f'Not a parquet file: {filename}'
    return fqme


ohlc_key_map = None


Expand Down Expand Up @@ -347,10 +355,27 @@ async def delete_ts(
path.unlink()
log.warning(f'Deleting parquet entry:\n{path}')
else:
log.warning(f'No path exists:\n{path}')
log.error(f'No path exists:\n{path}')

return path

# TODO: allow wiping and refetching a segment of the OHLCV timeseries
# data.
# def clear_range(
# self,
# key: str,
# start_dt: datetime,
# end_dt: datetime,
# timeframe: int | None = None,
# ) -> pl.DataFrame:
# '''
# Clear and re-fetch a range of datums for the OHLCV time series.

# Useful for series editing from a chart B)

# '''
# ...


@acm
async def get_client(
Expand Down

0 comments on commit bad71fa

Please sign in to comment.