Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A few changes (w4) #404

Merged
merged 6 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/public_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"has_leak",
"event_set",
"input_node",
"input_node_from_schema",
"plot",
"compile",
"config",
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion temporian/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

# EventSetNodes
from temporian.core.data.node import EventSetNode
from temporian.core.data.node import input_node
from temporian.core.data.node import input_node, input_node_from_schema

# Dtypes
from temporian.core.data.dtype import float64
Expand Down
8 changes: 8 additions & 0 deletions temporian/beam/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ def run_multi_io(

data = {**inputs}

# Check that operators implementations are available
needed_operators = set()
for step in schedule.steps:
needed_operators.add(step.op.definition.key)
implementation_lib.check_operators_implementations_are_available(
needed_operators
)

num_steps = len(schedule.steps)
for step_idx, step in enumerate(schedule.steps):
operator_def = step.op.definition
Expand Down
14 changes: 13 additions & 1 deletion temporian/beam/implementation_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,26 @@

"""Registering mechanism for operator implementation classes."""

from typing import Any, Dict
from typing import Any, Dict, Set

_OPERATOR_IMPLEMENTATIONS = {}

# TODO: Create a "registration" module to handle in-process and beam operator
# registration.


def check_operators_implementations_are_available(needed: Set[str]):
"""Checks if operator implementations are available."""
missing = set(needed) - set(_OPERATOR_IMPLEMENTATIONS.keys())
if missing:
raise ValueError(
f"Unknown operator implementations '{missing}' for Beam backend. It"
" seems this operator is only available for the in-process"
" Temporian backend. Available Beam operator implementations are:"
f" {list(_OPERATOR_IMPLEMENTATIONS.keys())}."
)


def register_operator_implementation(
operator_class, operator_implementation_class
):
Expand Down
43 changes: 41 additions & 2 deletions temporian/core/data/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@ def __repr__(self) -> str:


def input_node(
features: List[Tuple[str, DType]],
indexes: Optional[List[Tuple[str, IndexDType]]] = None,
features: Union[List[FeatureSchema], List[Tuple[str, DType]]],
indexes: Optional[
Union[List[IndexSchema], List[Tuple[str, IndexDType]]]
] = None,
is_unix_timestamp: bool = False,
same_sampling_as: Optional[EventSetNode] = None,
name: Optional[str] = None,
Expand Down Expand Up @@ -245,6 +247,43 @@ def input_node(
)


def input_node_from_schema(
schema: Schema,
same_sampling_as: Optional[EventSetNode] = None,
name: Optional[str] = None,
) -> EventSetNode:
"""Creates an input [`EventSetNode`][temporian.EventSetNode] from a schema.

Usage example:

```python
>>> # Create two nodes with the same schema.
>>> a = tp.input_node(features=[("f1", tp.float64), ("f2", tp.str_)])
>>> b = tp.input_node_from_schema(a.schema)

```

Args:
schema: Schema of the node.
same_sampling_as: If set, the created EventSetNode is guaranteed to have the
same sampling as same_sampling_as`. In this case, `indexes` and
`is_unix_timestamp` should not be provided. Some operators require
for input EventSetNodes to have the same sampling.
name: Name for the EventSetNode.

Returns:
EventSetNode with the given specifications.
"""

return input_node(
features=schema.features,
indexes=schema.indexes,
is_unix_timestamp=schema.is_unix_timestamp,
same_sampling_as=same_sampling_as,
name=name,
)


@dataclass
class Sampling:
"""A sampling is a reference to the way data is sampled."""
Expand Down
48 changes: 48 additions & 0 deletions temporian/core/data/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import List, Tuple, Dict, Union

from temporian.core.data.dtype import DType, IndexDType
from google.protobuf import text_format


@dataclass(frozen=True)
Expand Down Expand Up @@ -134,6 +135,53 @@ def check_compatible_features(self, other: Schema, check_order: bool):
f"{self.feature_names} and {other.feature_names}."
)

def to_proto(self) -> "serialization.pb.Schema":
"""Converts the schema into a protobuf.

Usage example:
```
schema = tp.Schema(features=[("f1",int), (f2, float)])
proto_schema = schema.to_proto()
restored_schema = tp.Schema.from_proto(proto_schema)
```
"""
from temporian.core import serialization

return serialization._serialize_schema(self)

def to_proto_file(self, path: str) -> None:
"""Save the schema to a file with text protobuf format.

Usage example:
```
schema = tp.Schema(features=[("f1",int), (f2, float)])
path = "/tmp/my_schema.pbtxt"
schema.to_proto_file(path)
restored_schema = tp.Schema.from_proto_file(path)
```
"""
proto = self.to_proto()
with open(path, "wb") as f:
f.write(text_format.MessageToBytes(proto))

@classmethod
def from_proto(cls, proto: "serialization.pb.Schema") -> "Schema":
"""Creates a schema from a protobuf."""

from temporian.core import serialization

return serialization._unserialize_schema(proto)

@classmethod
def from_proto_file(cls, path: str) -> "Schema":
"""Creates a schema from a file text protobuf."""

from temporian.core import serialization

with open(path, "rb") as f:
proto = text_format.Parse(f.read(), serialization.pb.Schema())
return Schema.from_proto(proto)


def _normalize_feature(x):
if isinstance(x, FeatureSchema):
Expand Down
13 changes: 13 additions & 0 deletions temporian/core/data/test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,16 @@ py_test(
"//temporian/core/data:dtype",
],
)


py_test(
name = "schema_test",
srcs = ["schema_test.py"],
srcs_version = "PY3",
deps = [
# already_there/absl/testing:absltest
# already_there/absl/testing:parameterized
"//temporian/core/data:schema",
"//temporian/core:serialization",
],
)
29 changes: 29 additions & 0 deletions temporian/core/data/test/schema_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2021 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from absl.testing import absltest
from temporian.core.data import schema
from temporian.core.data.dtype import DType


class SchemaTest(absltest.TestCase):
def test_proto(self):
a = schema.Schema(features=[("f1", DType.INT32), ("f2", DType.FLOAT64)])
p = a.to_proto()
b = schema.Schema.from_proto(p)
self.assertEqual(a, b)


if __name__ == "__main__":
absltest.main()
8 changes: 8 additions & 0 deletions temporian/core/data/test/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

from temporian.core.test import utils
from temporian.implementation.numpy.data.event_set import EventSet
from temporian.core.data.node import (
input_node_from_schema,
)


class EventSetNodeTest(absltest.TestCase):
Expand All @@ -26,6 +29,11 @@ def test_run_input(self):
self.assertIsInstance(result, EventSet)
self.assertTrue(result is evset)

def test_input_node_from_schema(self):
node = utils.create_input_node()
other_node = input_node_from_schema(node.schema)
self.assertEqual(node.schema, other_node.schema)

def test_run_single_operator(self):
evset = utils.create_input_event_set()
result = evset.node().simple_moving_average(10)
Expand Down
2 changes: 1 addition & 1 deletion temporian/core/operators/filter_moving_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(
self.add_output(
"output",
create_node_new_features_new_sampling(
features=[],
features=input.schema.features,
indexes=input.schema.indexes,
is_unix_timestamp=input.schema.is_unix_timestamp,
creator=self,
Expand Down
10 changes: 10 additions & 0 deletions temporian/core/operators/test/test_filter_moving_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ def test_base(self, input_timestamps, expected_timestamps, win_length):
self, result, expected_output, check_sampling=False
)

def test_with_feature(
self,
):
evset = event_set([1, 2, 4], {"f": [10, 11, 14]})
expected_output = event_set([1, 4], {"f": [10, 14]})
result = evset.filter_moving_count(window_length=1.5)
assertOperatorResult(
self, result, expected_output, check_sampling=False
)


if __name__ == "__main__":
absltest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ def __call__(self, input: EventSet) -> Dict[str, EventSet]:

# Fill output EventSet's data
for index_key, index_data in input.data.items():
dst_timestamps = operators_cc.filter_moving_count(
sampling_idx = operators_cc.filter_moving_count(
index_data.timestamps,
window_length=window_length,
)

output_evset.set_index_value(
index_key,
IndexData(
features=[],
timestamps=dst_timestamps,
features=[f[sampling_idx] for f in index_data.features],
achoum marked this conversation as resolved.
Show resolved Hide resolved
timestamps=index_data.timestamps[sampling_idx],
schema=output_schema,
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
namespace {
namespace py = pybind11;

py::array_t<double> filter_moving_count(
py::array_t<Idx> filter_moving_count(
const py::array_t<double> &event_timestamps, const double window_length) {
// Input size
const Idx n_event = event_timestamps.shape(0);

// Access raw input / output data
auto v_event = event_timestamps.unchecked<1>();

std::vector<double> output;
std::vector<Idx> output;

// Index of the last emitted event. If -1, no event was emitted so far.
Idx last_emitted_idx = -1;
Expand All @@ -31,7 +31,7 @@ py::array_t<double> filter_moving_count(
(t - v_event[last_emitted_idx]) >= window_length) {
// Emitting event.
last_emitted_idx = event_idx;
output.push_back(t);
output.push_back(event_idx);
}
}

Expand Down
14 changes: 14 additions & 0 deletions temporian/test/api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,20 @@ def test_duration_to_string(self):
"2d3h",
)

def test_schema_to_from_proto(self):
a = tp.Schema(features=[("f1", tp.int32), ("f2", tp.float64)])
p = a.to_proto()
b = tp.Schema.from_proto(p)
self.assertEqual(a, b)

def test_schema_to_from_proto_file(self):
with tempfile.TemporaryDirectory() as tempdir:
path = os.path.join(tempdir, "schema.pbtxt")
a = tp.Schema(features=[("f1", tp.int32), ("f2", tp.float64)])
a.to_proto_file(path)
b = tp.Schema.from_proto_file(path)
self.assertEqual(a, b)


if __name__ == "__main__":
absltest.main()
Loading