add toplevel collect() function and add python adapter

sslivkoff committed Jul 22, 2023 · 1 parent bfbe755 · commit 953762d
Showing 9 changed files with 201 additions and 9 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

11 changes: 9 additions & 2 deletions crates/freeze/src/collect.rs
@@ -2,15 +2,22 @@ use std::collections::HashMap;
 
 use polars::prelude::*;
 
+use crate::types::Chunk;
 use crate::types::CollectError;
 use crate::types::Datatype;
 use crate::types::MultiQuery;
 use crate::types::SingleQuery;
 use crate::types::Source;
 
 /// collect data and return as dataframe
-pub async fn collect(_query: SingleQuery, _source: Source) -> Result<DataFrame, CollectError> {
-    todo!()
+pub async fn collect(query: SingleQuery, source: Source) -> Result<DataFrame, CollectError> {
+    let chunk: Chunk = query.chunks.into();
+    let filter = query.row_filter.as_ref();
+    query
+        .datatype
+        .dataset()
+        .collect_chunk(&chunk, &source, &query.schema, filter)
+        .await
 }
 
 /// collect data and return as dataframe
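With the stub filled in, collect() can be driven end to end. A minimal sketch of a caller, assuming SingleQuery and Source are re-exported at the cryo_freeze crate root and that the query and source were built elsewhere (e.g. by parse_opts in the python adapter below):

    use cryo_freeze::{collect, SingleQuery, Source}; // assumed re-exports
    use polars::prelude::DataFrame;

    // Hypothetical wrapper: dispatches to the datatype's dataset and
    // collects the query's chunks as a single DataFrame.
    async fn fetch(query: SingleQuery, source: Source) -> DataFrame {
        collect(query, source).await.expect("collection failed")
    }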
10 changes: 10 additions & 0 deletions crates/freeze/src/types/chunks/chunk.rs
@@ -38,3 +38,13 @@ impl Chunk {
         }
     }
 }
+
+impl From<Vec<Chunk>> for Chunk {
+    fn from(chunks: Vec<Chunk>) -> Self {
+        match chunks.len() {
+            0 => panic!("invalid empty chunk range"),
+            1 => chunks.into_iter().next().unwrap(),
+            _ => todo!("not implemented yet"),
+        }
+    }
+}
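In isolation the new conversion behaves as below; a sketch assuming Chunk is re-exported at the crate root (only the one-element case is implemented so far):

    use cryo_freeze::Chunk; // assumed re-export

    fn merge_one(c: Chunk) -> Chunk {
        vec![c].into() // one element: unwrapped directly
    }
    // vec![a, b].into()          // two or more elements: hits todo!()
    // Vec::<Chunk>::new().into() // empty: panics ("invalid empty chunk range")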
28 changes: 27 additions & 1 deletion crates/freeze/src/types/queries.rs
@@ -9,8 +9,10 @@ use crate::types::Table;
 /// Query a single datatype
 #[derive(Clone)]
 pub struct SingleQuery {
+    /// Datatype for query
+    pub datatype: Datatype,
     /// Schema for the datatype to collect
-    pub schemas: Table,
+    pub schema: Table,
     /// Block chunks to collect
     pub chunks: Vec<Chunk>,
     /// Row filter
@@ -36,3 +38,27 @@ pub struct RowFilter {
     /// address to filter for
     pub address: Option<ValueOrArray<H160>>,
 }
+
+impl From<MultiQuery> for SingleQuery {
+    fn from(query: MultiQuery) -> Self {
+        let (datatype, schema) = match query.schemas.len() {
+            0 => panic!("bad query, needs 1 datatype"),
+            1 => {
+                let datatype_schema = query
+                    .schemas
+                    .iter()
+                    .next()
+                    .expect("Expected at least one schema");
+                (*datatype_schema.0, datatype_schema.1.clone())
+            }
+            _ => panic!("bad query, needs 1 datatype"),
+        };
+        let row_filter = query.row_filters.get(&datatype).cloned();
+        SingleQuery {
+            datatype,
+            schema,
+            chunks: query.chunks,
+            row_filter,
+        }
+    }
+}
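This conversion is what lets the python adapter below feed the parsed CLI query straight into the new toplevel collect(). A sketch of the narrowing, assuming MultiQuery and SingleQuery are re-exported at the crate root and that the input's schemas map holds exactly one entry (zero or several entries panic rather than returning an error):

    use cryo_freeze::{MultiQuery, SingleQuery}; // assumed re-exports

    fn narrow(multi: MultiQuery) -> SingleQuery {
        // The lone (Datatype, Table) entry becomes datatype/schema; the
        // matching row filter, if any, is carried over.
        multi.into()
    }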
3 changes: 3 additions & 0 deletions crates/python/Cargo.toml
@@ -10,6 +10,9 @@ crate-type = ["cdylib"]
 
 [dependencies]
 cryo_cli = { version = "0.1.0", path = "../cli" }
+cryo_freeze = { version = "0.1.0", path = "../freeze" }
+polars = { version = "0.30.0", features = ["parquet", "string_encoding", "polars-lazy", "lazy", "binary_encoding", "json", "dtype-struct"] }
 pyo3 = { version = "0.18.0", features = ["extension-module"] }
 pyo3-asyncio = { version = "0.18.0", features = ["tokio-runtime"] }
+pyo3-polars = "0.4.0"
 tokio = "1.29.0"
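The new dependencies wire the adapter together: cryo_freeze supplies the toplevel collect(), polars the DataFrame type, and pyo3-polars the bridge that moves a DataFrame across the FFI boundary. A short sketch of that bridge (PyDataFrame is the newtype wrapper used in the adapter below):

    use polars::prelude::DataFrame;
    use pyo3_polars::PyDataFrame;

    // The wrapper converts into a polars DataFrame on the Python side.
    fn to_python(df: DataFrame) -> PyDataFrame {
        PyDataFrame(df)
    }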
2 changes: 1 addition & 1 deletion crates/python/python/cryo/_collect.py
@@ -12,7 +12,7 @@ async def async_collect(
 
     from . import _cryo_rust
 
-    result = _cryo_rust._collect(*args, **kwargs)
+    result = await _cryo_rust._collect(*args, **kwargs)
 
     if output_format == 'polars':
         return result
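The added await matters because the Rust _collect (below) now returns a pending asyncio awaitable built by pyo3-asyncio, not a finished DataFrame. A minimal self-contained sketch of that pattern (a toy function, not part of this commit):

    use pyo3::prelude::*;

    /// Returns an asyncio-compatible awaitable; Python code must `await` it.
    #[pyfunction]
    fn answer(py: Python<'_>) -> PyResult<&PyAny> {
        pyo3_asyncio::tokio::future_into_py(py, async move {
            Ok(42) // the value the awaitable resolves to on the Python side
        })
    }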
14 changes: 12 additions & 2 deletions crates/python/python/cryo/_freeze.py
@@ -13,7 +13,7 @@ async def async_freeze(
     end_block: _spec.BlockReference = 'latest',
     hex: bool = False,
     file_format: _spec.FileFormat = 'parquet',
-    compresion: _spec.FileCompression,
+    compresion: _spec.FileCompression = 'lz4',
     **kwargs,
 ) -> None:
     """asynchronously collect data and save to files"""
@@ -22,11 +22,21 @@
 
     if isinstance(datatype, str):
         datatypes = [datatype]
-    elif isinstance(datatypes, list):
+    elif isinstance(datatype, list):
         datatypes = datatype
     else:
         raise Exception('invalid format for datatype(s)')
 
+    if start_block is not None and end_block is not None:
+        blocks = str(start_block) + ':' + str(end_block)
+    elif start_block is not None:
+        blocks = str(start_block) + ':'
+    elif end_block is not None:
+        blocks = ':' + str(end_block)
+    else:
+        blocks = None
+    kwargs['blocks'] = [blocks]
+
     if file_format == 'parquet':
         pass
     elif file_format == 'json':
124 changes: 121 additions & 3 deletions crates/python/src/collect_adapter.rs
@@ -1,18 +1,136 @@
+use polars::prelude::*;
 use pyo3::exceptions::PyTypeError;
 use pyo3::prelude::*;
+use pyo3_polars::PyDataFrame;
 
+use cryo_cli::parse_opts;
 use cryo_cli::Args;
+use cryo_freeze::collect;
 
+#[pyfunction(
+    signature = (
+        datatype,
+        blocks,
+        *,
+        align = false,
+        reorg_buffer = 0,
+        include_columns = None,
+        exclude_columns = None,
+        columns = None,
+        hex = false,
+        sort = None,
+        rpc = None,
+        network_name = None,
+        requests_per_second = None,
+        max_concurrent_requests = None,
+        max_concurrent_chunks = None,
+        dry = false,
+        chunk_size = 1000,
+        n_chunks = None,
+        output_dir = ".".to_string(),
+        file_suffix = None,
+        overwrite = false,
+        csv = false,
+        json = false,
+        row_group_size = None,
+        n_row_groups = None,
+        no_stats = false,
+        compression = vec!["lz4".to_string()],
+        contract = None,
+        topic0 = None,
+        topic1 = None,
+        topic2 = None,
+        topic3 = None,
+        inner_request_size = 1,
+    )
+)]
+#[allow(clippy::too_many_arguments)]
+pub fn _collect(
+    py: Python<'_>,
+    datatype: Vec<String>,
+    blocks: Vec<String>,
+    align: bool,
+    reorg_buffer: u64,
+    include_columns: Option<Vec<String>>,
+    exclude_columns: Option<Vec<String>>,
+    columns: Option<Vec<String>>,
+    hex: bool,
+    sort: Option<Vec<String>>,
+    rpc: Option<String>,
+    network_name: Option<String>,
+    requests_per_second: Option<u32>,
+    max_concurrent_requests: Option<u64>,
+    max_concurrent_chunks: Option<u64>,
+    dry: bool,
+    chunk_size: u64,
+    n_chunks: Option<u64>,
+    output_dir: String,
+    file_suffix: Option<String>,
+    overwrite: bool,
+    csv: bool,
+    json: bool,
+    row_group_size: Option<usize>,
+    n_row_groups: Option<usize>,
+    no_stats: bool,
+    compression: Vec<String>,
+    contract: Option<String>,
+    topic0: Option<String>,
+    topic1: Option<String>,
+    topic2: Option<String>,
+    topic3: Option<String>,
+    inner_request_size: u64,
+) -> PyResult<&PyAny> {
+    let args = Args {
+        datatype,
+        blocks,
+        align,
+        reorg_buffer,
+        include_columns,
+        exclude_columns,
+        columns,
+        hex,
+        sort,
+        rpc,
+        network_name,
+        requests_per_second,
+        max_concurrent_requests,
+        max_concurrent_chunks,
+        dry,
+        chunk_size,
+        n_chunks,
+        output_dir,
+        file_suffix,
+        overwrite,
+        csv,
+        json,
+        row_group_size,
+        n_row_groups,
+        no_stats,
+        compression,
+        contract,
+        topic0,
+        topic1,
+        topic2,
+        topic3,
+        inner_request_size,
+    };
 
     pyo3_asyncio::tokio::future_into_py(py, async move {
-        match run(args).await {
-            Ok(()) => Ok(Python::with_gil(|py| py.None())),
+        match run_collect(args).await {
+            // Ok(df) => Ok(Python::with_gil(|py| py.None())),
+            Ok(df) => Ok(PyDataFrame(df)),
             Err(_e) => Err(PyErr::new::<PyTypeError, _>("failed")),
         }
     })
 }
+
+async fn run_collect(args: Args) -> PolarsResult<DataFrame> {
+    let (query, source, _sink) = match parse_opts(&args).await {
+        Ok(opts) => opts,
+        Err(_e) => panic!(),
+    };
+    match collect(query.into(), source).await {
+        Ok(df) => Ok(df),
+        Err(_e) => panic!(),
+    }
+}
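Both failure paths in run_collect panic, which aborts the interpreter rather than raising. A possible refinement (not part of this commit, sketched assuming parse_opts and collect keep the signatures used above) is to surface errors as Python exceptions:

    use polars::prelude::DataFrame;
    use pyo3::exceptions::PyTypeError;
    use pyo3::prelude::*;

    async fn run_collect(args: Args) -> PyResult<DataFrame> {
        // Turn each failure into a Python-visible exception instead of panicking.
        let (query, source, _sink) = parse_opts(&args)
            .await
            .map_err(|_| PyErr::new::<PyTypeError, _>("failed to parse options"))?;
        collect(query.into(), source)
            .await
            .map_err(|_| PyErr::new::<PyTypeError, _>("failed to collect data"))
    }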
2 changes: 2 additions & 0 deletions crates/python/src/lib.rs
@@ -1,3 +1,4 @@
+mod collect_adapter;
 mod freeze_adapter;
 
 use pyo3::prelude::*;
@@ -15,5 +16,6 @@ fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
 fn cryo_rust(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
     m.add_function(wrap_pyfunction!(freeze_adapter::_freeze, m)?)?;
+    m.add_function(wrap_pyfunction!(collect_adapter::_collect, m)?)?;
     Ok(())
 }
