Skip to content

Commit

Permalink
Merge pull request #110 from Point72/pit/pandas_extension_example
Browse files Browse the repository at this point in the history
Add example for the pandas extension types
  • Loading branch information
timkpaine authored Feb 22, 2024
2 parents 2cc848e + 7c74506 commit dbade0c
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 74 deletions.
74 changes: 0 additions & 74 deletions csp/impl/wiring/edge.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,3 @@
from types import MappingProxyType

_UFUNC_MAP = MappingProxyType(
{
"add": lambda x, y: x.__add__(y) if isinstance(x, Edge) else y.__add__(x),
"subtract": lambda x, y: x.__sub__(y) if isinstance(x, Edge) else y.__sub__(x),
"multiply": lambda x, y: x.__mul__(y) if isinstance(x, Edge) else y.__mul__(x),
"divide": lambda x, y: x.__truediv__(y) if isinstance(x, Edge) else y.__truediv__(x),
"floor_divide": lambda x, y: x.__floordiv__(y) if isinstance(x, Edge) else y.__floordiv__(x),
"power": lambda x, y: x.pow(y),
"pos": lambda x: x.pos(),
"neg": lambda x: x.neg(),
"abs": lambda x: x.abs(),
"log": lambda x: x.ln(),
"log2": lambda x: x.log2(),
"log10": lambda x: x.log10(),
"exp": lambda x: x.exp(),
"exp2": lambda x: x.exp2(),
"sin": lambda x: x.sin(),
"cos": lambda x: x.cos(),
"tan": lambda x: x.tan(),
"arcsin": lambda x: x.asin(),
"arccos": lambda x: x.acos(),
"arctan": lambda x: x.atan(),
"sqrt": lambda x: x.sqrt(),
"erf": lambda x: x.erf(),
}
)


class Edge:
__slots__ = ["tstype", "nodedef", "output_idx", "basket_idx"]
Expand Down Expand Up @@ -209,36 +180,6 @@ def exp(self):

return csp.exp(self)

def sin(self):
import csp

return csp.sin(self)

def cos(self):
import csp

return csp.cos(self)

def tan(self):
import csp

return csp.tan(self)

def arcsin(self):
import csp

return csp.arcsin(self)

def arccos(self):
import csp

return csp.arccos(self)

def arctan(self):
import csp

return csp.arctan(self)

def sqrt(self):
import csp

Expand All @@ -249,21 +190,6 @@ def erf(self):

return csp.erf(self)

def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
ufunc_func = _UFUNC_MAP.get(ufunc.__name__, None)
if ufunc_func:
if ufunc.__name__ in (
"add",
"subtract",
"multiply",
"divide",
"floor_divide",
"power",
):
return ufunc_func(inputs[0], inputs[1])
return ufunc_func(self)
raise NotImplementedError("Not Implemented for type csp.Edge: {}".format(ufunc))

def __getattr__(self, key):
from csp.impl.struct import Struct

Expand Down
98 changes: 98 additions & 0 deletions examples/2_intermediate/e_18_pandas_extension_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
This example illustrates how csp edges can be used inside a pandas data frame via the pandas extension type mechanism.
"""
import numpy as np
import pandas as pd
import random
from datetime import datetime, timedelta

import csp
import csp.impl.pandas_accessor # This registers the "csp" accessors on pd.Series and pd.DataFrame
from csp.impl.pandas_ext_type import TsDtype
from csp.random import brownian_motion
from csp.stats import numpy_to_list


def main():
random.seed(1234)
rng = np.random.default_rng(seed=None)
N = 10
symbols = [f"SYMBOL_{i}" for i in range(N)]
regions = [random.choice(["US", "EU", "AP"]) for _ in range(N)]
exchanges = [region + random.choice(["X", "Y"]) for region in regions]
open_prices = np.round(np.random.rand(N) * 200, 2)

# Create a static data frame
df = pd.DataFrame(
{
"region": regions,
"exchange": exchanges,
"open_price": open_prices,
},
index=symbols,
)

print("Create a standard dataframe...")
print(df)
print()
print("Add some time series...")
trigger = csp.timer(timedelta(seconds=2))
mids = (
brownian_motion(
trigger, drift=csp.const(np.zeros(N)), covariance=csp.const(0.01 * np.diag(np.ones(N))), seed=rng
).apply(np.exp)
* open_prices
) # Ignore drift adjustment for simplicity
df["mid"] = pd.Series(numpy_to_list(mids, len(df.index)), index=df.index, dtype=TsDtype(float))
print(df)
print()
print("Compute bid and ask columns...")
width = csp.const(0.25)
df["bid"] = df["mid"] - width / 2.0
df["ask"] = df["mid"] + width / 2.0
print(df)
print()
print("Notice the dtypes on the frame...")
print(df.dtypes)
print()
print('Snap a "live" version of the frame...')
print(df.csp.snap())
print()
print("Compute an edge for the weighted average price...")
weights = np.array([random.randint(0, 10) for _ in symbols])
weighted_price = (df["mid"] * weights).sum() / weights.sum()
print(weighted_price)
print()
print("Run the weighted price as a graph...")
for timestamp, value in weighted_price.run(starttime=datetime.utcnow(), endtime=timedelta(seconds=6)):
print(timestamp, value)
print()
print()
# Numeric group-by aggregation is not supported on extension types until pandas version 1.3
if pd.__version__ >= "1.3":
print("Aggregate by exchange (mix of csp and non-csp results)...")
df_agg = df.groupby("exchange")["mid"].agg(["count", "mean", "sum"])
print(df_agg)
print()
print("Run the aggregate frame as of now")
print(df_agg.csp.run(datetime.utcnow(), timedelta(seconds=6)))
print()
print()
print()
print("Convert the original frame to a standard pandas frame with an extra index for the time dimension")
out = df.csp.run(datetime(2024, 1, 1), timedelta(seconds=10))
print(out)
print(
"Convert the above result back to the original dataframe by turning the mid, bid, and ask column into edges and applying "
'a "last" aggregation to the static columns'
)
df2 = out.to_csp(columns=["bid", "ask", "mid"], agg="last")
print(df2)
print()
print('Apply a non-csp function (i.e. np.log) to transform all the edges in the "mid" column, and run it')
out2 = df2["mid"].csp.apply(np.log).csp.run(starttime=datetime(2024, 1, 1), endtime=timedelta(seconds=10))
print(out2)


if __name__ == "__main__":
main()

0 comments on commit dbade0c

Please sign in to comment.