
Commit

Rename parquet_dfs -> rawohlcv_dfs; hist_df -> mergedohlcv_df; update related
trentmc committed Dec 12, 2023
1 parent 11b6bd0 commit 4460259
Showing 9 changed files with 279 additions and 275 deletions.
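Of the 9 changed files, three are captured below. Across their hunks, the renames are:

- parquet_dfs -> rawohlcv_dfs
- hist_df -> mergedohlcv_df
- ParquetDataFactory -> OhlcvDataFactory, with get_hist_df() -> get_mergedohlcv_df()
- save_parquet() / load_parquet() -> save_rawohlcv_file() / load_rawohlcv_file()
- helpers: _update_parquet -> _update_rawohlcv_files, _load_parquet -> _load_rawohlcv_files, _merge_parquet_dfs -> _merge_rawohlcv_dfs, _hist_parquet_filename -> _rawohlcv_filename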
28 changes: 14 additions & 14 deletions pdr_backend/data_eng/model_data_factory.py
@@ -15,10 +15,10 @@
class ModelDataFactory:
"""
Roles:
- From hist_df, create (X, y, x_df) for model building
- From mergedohlcv_df, create (X, y, x_df) for model building
Where
parquet files -> parquet_dfs -> hist_df, via parquet_data_factory
parquet files -> parquet_dfs -> mergedohlcv_df, via ohlcv_data_factory
X -- 2d array of [sample_i, var_i] : value -- inputs for model
y -- 1d array of [sample_i] -- target outputs for model
@@ -45,35 +45,35 @@ def __init__(self, pp: DataPP, ss: DataSS):

def create_xy(
self,
hist_df: pl.DataFrame,
mergedohlcv_df: pl.DataFrame,
testshift: int,
do_fill_nans: bool = True,
) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame]:
"""
@arguments
hist_df -- *polars* DataFrame. See class docstring
mergedohlcv_df -- *polars* DataFrame. See class docstring
testshift -- to simulate across historical test data
do_fill_nans -- if any values are nan, fill them? (Via interpolation)
If you turn this off and hist_df has nans, then X/y/etc gets nans
If you turn this off and mergedohlcv_df has nans, then X/y/etc gets nans
@return --
X -- 2d array of [sample_i, var_i] : value -- inputs for model
y -- 1d array of [sample_i] -- target outputs for model
x_df -- *pandas* DataFrame. See class docstring.
"""
# preconditions
assert isinstance(hist_df, pl.DataFrame), pl.__class__
assert "timestamp" in hist_df.columns
assert "datetime" in hist_df.columns
assert isinstance(mergedohlcv_df, pl.DataFrame), pl.__class__
assert "timestamp" in mergedohlcv_df.columns
assert "datetime" in mergedohlcv_df.columns

# every column should be ordered with oldest first, youngest last.
# let's verify! The timestamps should be in ascending order
uts = hist_df["timestamp"].to_list()
uts = mergedohlcv_df["timestamp"].to_list()
assert uts == sorted(uts, reverse=False)

# condition inputs
if do_fill_nans and has_nan(hist_df):
hist_df = fill_nans(hist_df)
if do_fill_nans and has_nan(mergedohlcv_df):
mergedohlcv_df = fill_nans(mergedohlcv_df)
ss = self.ss

# main work
@@ -85,8 +85,8 @@ def create_xy(
]

for hist_col in target_hist_cols:
assert hist_col in hist_df.columns, f"missing data col: {hist_col}"
z = hist_df[hist_col].to_list() # [..., z(t-3), z(t-2), z(t-1)]
assert hist_col in mergedohlcv_df.columns, f"missing data col: {hist_col}"
z = mergedohlcv_df[hist_col].to_list() # [..., z(t-3), z(t-2), z(t-1)]
maxshift = testshift + ss.autoregressive_n
N_train = min(ss.max_n_train, len(z) - maxshift - 1)
if N_train <= 0:
@@ -111,7 +111,7 @@ def create_xy(
# eg y = [BinEthC_-1, BinEthC_-2, ..., BinEthC_-450, BinEthC_-451]
pp = self.pp
hist_col = f"{pp.exchange_str}:{pp.pair_str}:{pp.signal_str}"
z = hist_df[hist_col].to_list()
z = mergedohlcv_df[hist_col].to_list()
y = np.array(_slice(z, -testshift - N_train - 1, -testshift))

# postconditions
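For concreteness, here is a worked sketch of the windowing arithmetic in the hunks above. `_slice` itself is not shown in this commit, so the negative-index semantics in the first comment are an assumption:

```python
# Assumption: _slice(z, st, fin) ~= [z[i] for i in range(st, fin)], st < 0, fin <= 0.
z = list(range(100))   # 100 rows of one column, oldest first (made-up data)
testshift = 0          # no shift into historical test data
autoregressive_n = 3   # stand-in for ss.autoregressive_n
max_n_train = 10       # stand-in for ss.max_n_train

maxshift = testshift + autoregressive_n            # 3
N_train = min(max_n_train, len(z) - maxshift - 1)  # min(10, 96) -> 10

# y takes the N_train + 1 newest usable values:
y = [z[i] for i in range(-testshift - N_train - 1, -testshift)]
assert y == list(range(89, 100))  # 11 targets, newest last
```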
pdr_backend/data_eng/ohlcv_data_factory.py
@@ -16,8 +16,8 @@
initialize_df,
transform_df,
concat_next_df,
load_parquet,
save_parquet,
load_rawohlcv_file,
save_rawohlcv_file,
has_data,
oldest_ut,
newest_ut,
@@ -28,19 +28,19 @@


@enforce_types
class ParquetDataFactory:
class OhlcvDataFactory:
"""
Roles:
- From each CEX API, fill >=1 parquet_dfs -> parquet files data lake
- From parquet_dfs, fill 1 hist_df -- historical data across all CEXes
- From each CEX API, fill >=1 rawohlcv_dfs -> rawohlcv files data lake
- From rawohlcv_dfs, fill 1 mergedohlcv_df -- all data across all CEXes
Where:
parquet_dfs -- dict of [exch_str][pair_str] : df
rawohlcv_dfs -- dict of [exch_str][pair_str] : df
And df has columns of: "open", "high", .., "volume", "datetime"
Where pair_str must have '/' not '-', to avoid key issues
(and index = timestamp)
hist_df -- polars DataFrame with cols like:
mergedohlcv_df -- polars DataFrame with cols like:
"timestamp",
"binanceus:ETH-USDT:open",
"binanceus:ETH-USDT:high",
@@ -77,13 +77,13 @@ def __init__(self, pp: DataPP, ss: DataSS):
self.pp = pp
self.ss = ss

def get_hist_df(self) -> pl.DataFrame:
def get_mergedohlcv_df(self) -> pl.DataFrame:
"""
@description
Get historical dataframe, across many exchanges & pairs.
Get dataframe of all ohlcv data: merged from many exchanges & pairs.
@return
hist_df -- *polars* Dataframe. See class docstring
mergedohlcv_df -- *polars* Dataframe. See class docstring
"""
print("Get historical data, across many exchanges & pairs: begin.")

@@ -95,22 +95,23 @@ def get_hist_df(self) -> pl.DataFrame:
print(f" Data start: {pretty_timestr(self.ss.st_timestamp)}")
print(f" Data fin: {pretty_timestr(fin_ut)}")

self._update_parquet(fin_ut)
parquet_dfs = self._load_parquet(fin_ut)
hist_df = self._merge_parquet_dfs(parquet_dfs)
self._update_rawohlcv_files(fin_ut)
rawohlcv_dfs = self._load_rawohlcv_files(fin_ut)
mergedohlcv_df = self._merge_rawohlcv_dfs(rawohlcv_dfs)

print("Get historical data, across many exchanges & pairs: done.")

# postconditions
assert isinstance(hist_df, pl.DataFrame)
return hist_df
assert isinstance(mergedohlcv_df, pl.DataFrame)
return mergedohlcv_df

def _update_parquet(self, fin_ut: int):
print(" Update parquet.")
def _update_rawohlcv_files(self, fin_ut: int):
print(" Update all rawohlcv files: begin")
for exch_str, pair_str in self.ss.exchange_pair_tups:
self._update_hist_parquet_at_exch_and_pair(exch_str, pair_str, fin_ut)
self._update_rawohlcv_files_at_exch_and_pair(exch_str, pair_str, fin_ut)
print(" Update all rawohlcv files: done")

def _update_hist_parquet_at_exch_and_pair(
def _update_rawohlcv_files_at_exch_and_pair(
self, exch_str: str, pair_str: str, fin_ut: int
):
"""
@@ -120,9 +121,9 @@ def _update_hist_parquet_at_exch_and_pair(
fin_ut -- a timestamp, in ms, in UTC
"""
assert "/" in pair_str, f"pair_str={pair_str} needs '/'"
print(f" Update parquet at exchange={exch_str}, pair={pair_str}.")
print(f" Update rawohlcv file at exchange={exch_str}, pair={pair_str}.")

filename = self._hist_parquet_filename(exch_str, pair_str)
filename = self._rawohlcv_filename(exch_str, pair_str)
print(f" filename={filename}")

st_ut = self._calc_start_ut_maybe_delete(filename)
@@ -178,8 +179,8 @@ def _update_hist_parquet_at_exch_and_pair(
# add "datetime" col, more
df = transform_df(df)

# output to parquet
save_parquet(filename, df)
# output to file
save_rawohlcv_file(filename, df)

def _calc_start_ut_maybe_delete(self, filename: str) -> int:
"""
@@ -215,104 +216,107 @@ def _calc_start_ut_maybe_delete(self, filename: str) -> int:
os.remove(filename)
return self.ss.st_timestamp

def _load_parquet(self, fin_ut: int) -> Dict[str, Dict[str, pl.DataFrame]]:
def _load_rawohlcv_files(self, fin_ut: int) -> Dict[str, Dict[str, pl.DataFrame]]:
"""
@arguments
fin_ut -- finish timestamp
@return
parquet_dfs -- dict of [exch_str][pair_str] : df
rawohlcv_dfs -- dict of [exch_str][pair_str] : df
Where df has columns=OHLCV_COLS+"datetime", and index=timestamp
And pair_str is eg "BTC/USDT". Not "BTC-USDT", to avoid key issues
"""
print(" Load parquet.")
print(" Load rawohlcv file.")
st_ut = self.ss.st_timestamp

parquet_dfs: Dict[str, Dict[str, pl.DataFrame]] = {} # [exch][pair] : df
rawohlcv_dfs: Dict[str, Dict[str, pl.DataFrame]] = {} # [exch][pair] : df
for exch_str in self.ss.exchange_strs:
parquet_dfs[exch_str] = {}
rawohlcv_dfs[exch_str] = {}

for exch_str, pair_str in self.ss.exchange_pair_tups:
assert "/" in pair_str, f"pair_str={pair_str} needs '/'"
filename = self._hist_parquet_filename(exch_str, pair_str)
filename = self._rawohlcv_filename(exch_str, pair_str)
cols = [
signal_str # cols is a subset of TOHLCV_COLS
for e, signal_str, p in self.ss.input_feed_tups
if e == exch_str and p == pair_str
]
parquet_df = load_parquet(filename, cols, st_ut, fin_ut)
rawohlcv_df = load_rawohlcv_file(filename, cols, st_ut, fin_ut)

assert "datetime" in parquet_df.columns
assert "timestamp" in parquet_df.columns
assert "datetime" in rawohlcv_df.columns
assert "timestamp" in rawohlcv_df.columns

parquet_dfs[exch_str][pair_str] = parquet_df
rawohlcv_dfs[exch_str][pair_str] = rawohlcv_df

return parquet_dfs
return rawohlcv_dfs

def _merge_parquet_dfs(self, parquet_dfs: dict) -> pl.DataFrame:
def _merge_rawohlcv_dfs(self, rawohlcv_dfs: dict) -> pl.DataFrame:
"""
@arguments
parquet_dfs -- see class docstring
rawohlcv_dfs -- see class docstring
@return
hist_df -- see class docstring
mergedohlcv_df -- see class docstring
"""
# init hist_df such that it can do basic operations
print(" Merge parquet DFs.")
hist_df = initialize_df()
hist_df_cols = ["timestamp"]
for exch_str in parquet_dfs.keys():
for pair_str, parquet_df in parquet_dfs[exch_str].items():
# init mergedohlcv_df such that it can do basic operations
print(" Merge rawohlcv dataframes.")
mergedohlcv_df = initialize_df() # grow this
mergedohlcv_cols = ["timestamp"] # grow this
for exch_str in rawohlcv_dfs.keys():
for pair_str, rawohlcv_df in rawohlcv_dfs[exch_str].items():
assert "/" in pair_str, f"pair_str={pair_str} needs '/'"
assert "datetime" in parquet_df.columns
assert "timestamp" in parquet_df.columns
assert "datetime" in rawohlcv_df.columns
assert "timestamp" in rawohlcv_df.columns

for parquet_col in parquet_df.columns:
if parquet_col in ["timestamp", "datetime"]:
for rawohlcv_col in rawohlcv_df.columns:
if rawohlcv_col in ["timestamp", "datetime"]:
continue

signal_str = parquet_col # eg "close"
hist_col = f"{exch_str}:{pair_str}:{signal_str}"
signal_str = rawohlcv_col # eg "close"
mergedohlcv_col = f"{exch_str}:{pair_str}:{signal_str}"

parquet_df = parquet_df.with_columns(
[pl.col(parquet_col).alias(hist_col)]
rawohlcv_df = rawohlcv_df.with_columns(
[pl.col(rawohlcv_col).alias(mergedohlcv_col)]
)
hist_df_cols.append(hist_col)
mergedohlcv_cols.append(mergedohlcv_col)

# drop columns we won't merge
# drop original OHLCV cols and datetime
parquet_df = parquet_df.drop(OHLCV_COLS)
if "datetime" in hist_df.columns:
parquet_df = parquet_df.drop("datetime")
rawohlcv_df = rawohlcv_df.drop(OHLCV_COLS)
if "datetime" in mergedohlcv_df.columns:
rawohlcv_df = rawohlcv_df.drop("datetime")

# drop OHLCV from hist_df_cols
hist_df_cols = [x for x in hist_df_cols if x not in OHLCV_COLS]
# drop raw OHLCV cols from mergedohlcv_cols
mergedohlcv_cols = [col for col in mergedohlcv_cols
if col not in OHLCV_COLS]

# join to hist_df
if hist_df.shape[0] == 0:
hist_df = parquet_df
# join rawohlcv_df into mergedohlcv_df
if mergedohlcv_df.shape[0] == 0:
mergedohlcv_df = rawohlcv_df
else:
hist_df = hist_df.join(parquet_df, on="timestamp", how="outer")
mergedohlcv_df = mergedohlcv_df.join(
rawohlcv_df, on="timestamp", how="outer"
)

# select columns in-order [timestamp, ..., datetime]
hist_df = hist_df.select(hist_df_cols + ["datetime"])
mergedohlcv_df = mergedohlcv_df.select(mergedohlcv_cols + ["datetime"])

assert "datetime" in hist_df.columns
assert "timestamp" in hist_df.columns
assert "datetime" in mergedohlcv_df.columns
assert "timestamp" in mergedohlcv_df.columns

return hist_df
return mergedohlcv_df

def _hist_parquet_filename(self, exch_str, pair_str) -> str:
def _rawohlcv_filename(self, exch_str, pair_str) -> str:
"""
@description
Computes a name for the parquet file.
Computes a filename for the rawohlcv data.
@arguments
exch_str -- eg "binanceus"
pair_str -- eg "BTC/USDT" or "BTC-USDT"
@return
parquet_filename -- name for parquet file.
rawohlcv_filename --
@notes
If pair_str has '/', it will become '-' in the filename.
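Tying the two factories together, a hedged end-to-end sketch. Only model_data_factory.py's path appears in this commit; the other import path and the pp/ss construction are assumptions:

```python
from pdr_backend.data_eng.model_data_factory import ModelDataFactory
from pdr_backend.data_eng.ohlcv_data_factory import OhlcvDataFactory  # assumed path

def build_xy(pp, ss, testshift: int = 0):
    """pp: DataPP, ss: DataSS -- how these are built is not shown here."""
    # update the rawohlcv files, load them, merge across exchanges & pairs
    mergedohlcv_df = OhlcvDataFactory(pp, ss).get_mergedohlcv_df()

    # build model inputs/targets from the merged frame
    X, y, x_df = ModelDataFactory(pp, ss).create_xy(mergedohlcv_df, testshift)
    return X, y, x_df
```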
6 changes: 3 additions & 3 deletions pdr_backend/data_eng/plutil.py
@@ -62,7 +62,7 @@ def concat_next_df(df: pl.DataFrame, next_df: pl.DataFrame) -> pl.DataFrame:


@enforce_types
def save_parquet(filename: str, df: pl.DataFrame):
def save_rawohlcv_file(filename: str, df: pl.DataFrame):
"""write to parquet file
parquet only supports appending via the pyarrow engine
"""
@@ -88,7 +88,7 @@ def save_parquet(filename: str, df: pl.DataFrame):


@enforce_types
def load_parquet(filename: str, cols=None, st=None, fin=None) -> pl.DataFrame:
def load_rawohlcv_file(filename: str, cols=None, st=None, fin=None) -> pl.DataFrame:
"""Load parquet file as a dataframe.
Features:
@@ -109,7 +109,7 @@ def load_parquet(filename: str, cols=None, st=None, fin=None) -> pl.DataFrame:
Polars does not have an index. "timestamp" is a regular col and required for "datetime"
(1) Don't specify "datetime" as a column, as that'll get calc'd from timestamp
TO DO: Fix (1), save_parquet already saves out dataframe.
TO DO: Fix (1), save_rawohlcv_file already saves out dataframe.
Either don't save datetime, or save it and load it so it doesn't have to be re-computed.
"""
# handle cols
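A minimal save/load roundtrip sketch for the renamed helpers, assuming transform_df also lives in plutil (the factory's import list above suggests so) and using a made-up path:

```python
import polars as pl
from pdr_backend.data_eng.plutil import (
    transform_df, save_rawohlcv_file, load_rawohlcv_file)

df = pl.DataFrame({
    "timestamp": [1702339200000, 1702339260000],  # ms, UTC, ascending
    "open": [1.0, 2.0], "high": [1.5, 2.5], "low": [0.5, 1.5],
    "close": [1.2, 2.2], "volume": [10.0, 20.0],
})
df = transform_df(df)  # adds the "datetime" col, per the factory code above

filename = "/tmp/binanceus_ETH-USDT.parquet"  # '/' becomes '-' in filenames
save_rawohlcv_file(filename, df)

# per the docstring above, don't request "datetime"; it is recomputed from timestamp
df2 = load_rawohlcv_file(filename, cols=["close"], st=None, fin=None)
```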
