More replay doc
aandres committed Aug 5, 2023
1 parent ef4eaef commit b5c239a
Showing 2 changed files with 111 additions and 21 deletions.
20 changes: 18 additions & 2 deletions docs/concepts/3_replay.md
@@ -97,6 +97,22 @@ The replay driver is responsible for putting the dag, context, sources and sinks
```


## Reading files partitioned by time
## Reading Files Partitioned By Time

TODO
Assuming:

- you want to replay a dag for a long period of time.
- all that historic data doesn't fit in memory.
- the data is partitioned by time period, for example one file per day: `input_2023-01-01.csv`.

It's then possible, with the `IteratorDataSourceAdapter`, to load each file one by one, as it is needed.

In this example, csv files are stored under . We need to provide:

- a generator that will yield a `DataSource` for each file, in order
- a way to concatenate the outputs of two `DataSource`s. In this case we'll use `+` to merge two lists
- an empty value, for when there is no more data or we have reached the last file.
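
As a minimal standalone illustration of the last two requirements (independent of the full snippet below): `+` on lists concatenates them, and `[]` is the matching empty value.

```python
import operator

# Message batches as two consecutive file-based sources might emit them.
batch_a = [("2023-01-01T00:00:00Z", "Hello")]
batch_b = [("2023-01-02T01:00:00Z", "I'm fine")]

# `operator.add` concatenates two lists, so it can serve as the concatenator:
assert operator.add(batch_a, batch_b) == batch_a + batch_b

# `[]` is the empty value: concatenating with it leaves a batch unchanged.
assert operator.add([], batch_b) == batch_b
```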

```python
--8<-- "examples/replay_concepts.py:iterator_data_source_adapter"
```
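
The provider can then be passed to the replay driver like any other. Here is a sketch of the wiring, mirroring the `iterator_data_source_adapter_run` snippet in `examples/replay_concepts.py` (it assumes the `dag`, `CsvDataSinkProvider` and `PartitionedCsvDataSourceProvider` defined earlier):

```python
from beavers.replay import ReplayContext, ReplayDriver

ReplayDriver.create(
    dag=dag,
    replay_context=ReplayContext(
        start=pd.to_datetime("2023-01-01T00:00:00Z"),
        end=pd.to_datetime("2023-01-03T00:00:00Z"),
        frequency=pd.to_timedelta("1h"),
    ),
    data_source_providers={
        "my_source": PartitionedCsvDataSourceProvider("input_{date:%Y-%m-%d}.csv")
    },
    data_sink_providers={"my_sink": CsvDataSinkProvider("output.csv")},
).run()
```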
112 changes: 93 additions & 19 deletions examples/replay_concepts.py
@@ -1,8 +1,10 @@
# isort: skip_file
# ruff: noqa: E402
import operator

import beavers


# --8<-- [start:simple_dag]
dag = beavers.Dag()
my_source = dag.source_stream(name="my_source")
@@ -62,7 +64,9 @@ def get_next(self) -> pd.Timestamp:


# --8<-- [start:replay_context]
replay_context = beavers.replay.ReplayContext(
from beavers.replay import ReplayContext

replay_context = ReplayContext(
    start=pd.to_datetime("2023-01-01T00:00:00Z"),
    end=pd.to_datetime("2023-01-02T00:00:00Z"),
    frequency=pd.to_timedelta("1h"),
@@ -76,7 +80,7 @@ class CsvDataSourceProvider:
    file_name: str

    def __call__(
        self, replay_context: beavers.replay.ReplayContext
        self, replay_context: ReplayContext
    ) -> beavers.replay.DataSource[list[Message]]:
        df = pd.read_csv(self.file_name, parse_dates=["timestamp"])
        messages = [Message(*row) for _, row in df.iterrows()]
@@ -91,37 +95,31 @@ def __call__(
@dataclasses.dataclass(frozen=True)
class CsvDataSink:
    destination: str
    data: list[pd.DataFrame] = dataclasses.field(default_factory=list)
    data: list[Message] = dataclasses.field(default_factory=list)

    def append(self, timestamp: pd.Timestamp, data: pd.DataFrame):
        self.data.append(data)
    def append(self, timestamp: pd.Timestamp, data: list[Message]):
        self.data.extend(data)

    def close(self):
        pd.concat(self.data).to_csv(self.destination)
        pd.DataFrame([dataclasses.asdict(value) for value in self.data]).to_csv(
            self.destination, index=False
        )


# --8<-- [end:data_sink]


# --8<-- [start:data_sink_provider]
@dataclasses.dataclass(frozen=True)
class CsvDataSinkProvider:
    destination: str

    def __call__(self, replay_context: beavers.replay.ReplayContext) -> CsvDataSink:
    def __call__(self, replay_context: ReplayContext) -> CsvDataSink:
        return CsvDataSink(self.destination)


# --8<-- [end:data_sink_provider]

# --8<-- [start:replay_driver]
replay_driver = beavers.replay.ReplayDriver.create(
    dag=dag,
    replay_context=replay_context,
    data_source_providers={"my_source": CsvDataSourceProvider()},
    data_sink_providers={"my_sink": CsvDataSinkProvider()},
)
replay_driver.run()
# --8<-- [end:replay_driver]

# This is just to print the csv file:
file = "data.csv"
@@ -134,10 +132,86 @@ def __call__(self, replay_context: beavers.replay.ReplayContext) -> CsvDataSink:
"message": ["Hello", "How are you"],
}
)
df.to_csv("data.csv", index=False)
df.to_csv("input.csv", index=False)

df_after = pd.read_csv("data.csv", parse_dates=["timestamp"])
df_after = pd.read_csv("input.csv", parse_dates=["timestamp"])
pd.testing.assert_frame_equal(df, df_after)

messages = [Message(*row) for _, row in df_after.iterrows()]
print(messages)

df2 = pd.DataFrame(
    {
        "timestamp": [
            pd.Timestamp("2023-01-02T01:00:00Z"),
            pd.Timestamp("2023-01-02T01:01:00Z"),
        ],
        "message": ["I'm fine", "Thanks"],
    }
)
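# Write one sample input file per day; the last day gets a header-only (empty) file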
df.to_csv("input_2023-01-01.csv", index=False)
df2.to_csv("input_2023-01-02.csv", index=False)
df2[:0].to_csv("input_2023-01-03.csv", index=False)


# --8<-- [start:replay_driver]
from beavers.replay import ReplayDriver

replay_driver = ReplayDriver.create(
    dag=dag,
    replay_context=replay_context,
    data_source_providers={"my_source": CsvDataSourceProvider("input.csv")},
    data_sink_providers={"my_sink": CsvDataSinkProvider("output.csv")},
)
replay_driver.run()
# --8<-- [end:replay_driver]


# --8<-- [start:iterator_data_source_adapter]
from beavers.replay import IteratorDataSourceAdapter


@dataclasses.dataclass(frozen=True)
class PartitionedCsvDataSourceProvider:
    source_format: str

    def __call__(self, replay_context: ReplayContext):
        # One file name per day covered by the replay
        # (pd.date_range includes both endpoints)
        file_names = [
            self.source_format.format(date=date)
            for date in pd.date_range(replay_context.start, replay_context.end)
        ]
        # Load each file lazily, only as it is needed
        generator = (self._load_one_file(file_name) for file_name in file_names)
        return IteratorDataSourceAdapter(
            sources=generator,
            empty=[],
            concatenator=operator.add,
        )

    def _load_one_file(self, file_name: str) -> MessageDataSource:
        return MessageDataSource(
            [
                Message(*row)
                for _, row in pd.read_csv(
                    file_name, parse_dates=["timestamp"]
                ).iterrows()
            ]
        )


source_provider = PartitionedCsvDataSourceProvider("input_{date:%Y-%m-%d}.csv")
# --8<-- [end:iterator_data_source_adapter]

# --8<-- [start:iterator_data_source_adapter_run]
ReplayDriver.create(
    dag=dag,
    replay_context=ReplayContext(
        start=pd.to_datetime("2023-01-01T00:00:00Z"),
        end=pd.to_datetime("2023-01-03T00:00:00Z"),
        frequency=pd.to_timedelta("1h"),
    ),
    data_source_providers={
        "my_source": PartitionedCsvDataSourceProvider("input_{date:%Y-%m-%d}.csv")
    },
    data_sink_providers={"my_sink": CsvDataSinkProvider("output.csv")},
).run()

# --8<-- [end:iterator_data_source_adapter_run]
