Skip to content

Commit

Permalink
Merge branch 'master' into feature/issue-3778-u
Browse files Browse the repository at this point in the history
  • Loading branch information
shuchu committed Feb 15, 2024
2 parents 35a224f + 5c9f592 commit e90b90e
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Callable, List, Literal, Optional, Sequence, Union, cast

import dill
import pandas
import pandas as pd
import pyarrow
from tqdm import tqdm
Expand Down Expand Up @@ -178,9 +179,9 @@ def _materialize_one(
self.repo_config.batch_engine.partitions
)

spark_df.foreachPartition(
lambda x: _process_by_partition(x, spark_serialized_artifacts)
)
spark_df.mapInPandas(
lambda x: _map_by_partition(x, spark_serialized_artifacts), "status int"
).count() # dummy action to force evaluation

return SparkMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
Expand Down Expand Up @@ -225,38 +226,40 @@ def unserialize(self):
return feature_view, online_store, repo_config


def _process_by_partition(rows, spark_serialized_artifacts: _SparkSerializedArtifacts):
"""Load pandas df to online store"""

# convert to pyarrow table
dicts = []
for row in rows:
dicts.append(row.asDict())
def _map_by_partition(iterator, spark_serialized_artifacts: _SparkSerializedArtifacts):
for pdf in iterator:
if pdf.shape[0] == 0:
print("Skipping")
return

df = pd.DataFrame.from_records(dicts)
if df.shape[0] == 0:
print("Skipping")
return
table = pyarrow.Table.from_pandas(pdf)

table = pyarrow.Table.from_pandas(df)
(
feature_view,
online_store,
repo_config,
) = spark_serialized_artifacts.unserialize()

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

# unserialize artifacts
feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()
join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
rows_to_write = _convert_arrow_to_proto(
table, feature_view, join_key_to_value_type
)
online_store.online_write_batch(
repo_config,
feature_view,
rows_to_write,
lambda x: None,
)

join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}

rows_to_write = _convert_arrow_to_proto(table, feature_view, join_key_to_value_type)
online_store.online_write_batch(
repo_config,
feature_view,
rows_to_write,
lambda x: None,
)
yield pd.DataFrame(
[pd.Series(range(1, 2))]
) # dummy result because mapInPandas needs to return something
48 changes: 22 additions & 26 deletions sdk/python/requirements/py3.10-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ black==22.12.0
# via feast (setup.py)
bleach==6.1.0
# via nbconvert
boto3==1.34.39
boto3==1.34.42
# via
# feast (setup.py)
# moto
botocore==1.34.39
botocore==1.34.42
# via
# boto3
# moto
Expand Down Expand Up @@ -123,9 +123,7 @@ comm==0.2.1
# ipykernel
# ipywidgets
coverage[toml]==7.4.1
# via
# coverage
# pytest-cov
# via pytest-cov
cryptography==41.0.7
# via
# azure-identity
Expand Down Expand Up @@ -177,7 +175,7 @@ executing==2.0.1
# via stack-data
fastapi==0.109.2
# via feast (setup.py)
fastavro==1.9.3
fastavro==1.9.4
# via
# feast (setup.py)
# pandavro
Expand All @@ -195,15 +193,15 @@ flake8==6.0.0
# via feast (setup.py)
fqdn==1.5.1
# via jsonschema
fsspec==2023.9.2
fsspec==2023.12.2
# via
# dask
# feast (setup.py)
geojson==2.5.0
# via rockset
geomet==0.2.1.post1
# via cassandra-driver
google-api-core[grpc]==2.17.0
google-api-core[grpc]==2.17.1
# via
# feast (setup.py)
# firebase-admin
Expand All @@ -215,7 +213,7 @@ google-api-core[grpc]==2.17.0
# google-cloud-datastore
# google-cloud-firestore
# google-cloud-storage
google-api-python-client==2.117.0
google-api-python-client==2.118.0
# via firebase-admin
google-auth==2.27.0
# via
Expand All @@ -233,6 +231,8 @@ google-cloud-bigquery[pandas]==3.12.0
# google-cloud-bigquery
google-cloud-bigquery-storage==2.24.0
# via feast (setup.py)
google-cloud-bigquery-storage==2.24.0
# via feast (setup.py)
google-cloud-bigtable==2.23.0
# via feast (setup.py)
google-cloud-core==2.4.1
Expand Down Expand Up @@ -304,7 +304,7 @@ hazelcast-python-client==5.3.0
# via feast (setup.py)
hiredis==2.3.2
# via feast (setup.py)
httpcore==1.0.2
httpcore==1.0.3
# via httpx
httplib2==0.22.0
# via
Expand All @@ -313,7 +313,9 @@ httplib2==0.22.0
httptools==0.6.1
# via uvicorn
httpx==0.26.0
# via feast (setup.py)
# via
# feast (setup.py)
# jupyterlab
identify==2.5.34
# via pre-commit
idna==3.6
Expand Down Expand Up @@ -410,11 +412,11 @@ jupyter-server==2.12.5
# notebook-shim
jupyter-server-terminals==0.5.2
# via jupyter-server
jupyterlab==4.0.12
jupyterlab==4.1.1
# via notebook
jupyterlab-pygments==0.3.0
# via nbconvert
jupyterlab-server==2.25.2
jupyterlab-server==2.25.3
# via
# jupyterlab
# notebook
Expand Down Expand Up @@ -487,7 +489,7 @@ nest-asyncio==1.6.0
# via ipykernel
nodeenv==1.8.0
# via pre-commit
notebook==7.0.8
notebook==7.1.0
# via great-expectations
notebook-shim==0.2.3
# via
Expand Down Expand Up @@ -566,7 +568,7 @@ portalocker==2.8.2
# via msal-extensions
pre-commit==3.3.1
# via feast (setup.py)
prometheus-client==0.19.0
prometheus-client==0.20.0
# via jupyter-server
prompt-toolkit==3.0.43
# via ipython
Expand Down Expand Up @@ -757,7 +759,7 @@ requests==2.31.0
# trino
requests-oauthlib==1.3.1
# via kubernetes
responses==0.24.1
responses==0.25.0
# via moto
rfc3339-validator==0.1.4
# via
Expand All @@ -769,7 +771,7 @@ rfc3986-validator==0.1.1
# jupyter-events
rockset==2.1.0
# via feast (setup.py)
rpds-py==0.17.1
rpds-py==0.18.0
# via
# jsonschema
# referencing
Expand Down Expand Up @@ -804,9 +806,7 @@ sniffio==1.3.0
snowballstemmer==2.2.0
# via sphinx
snowflake-connector-python[pandas]==3.7.0
# via
# feast (setup.py)
# snowflake-connector-python
# via feast (setup.py)
sortedcontainers==2.4.0
# via snowflake-connector-python
soupsieve==2.5
Expand All @@ -826,9 +826,7 @@ sphinxcontrib-qthelp==1.0.7
sphinxcontrib-serializinghtml==1.1.10
# via sphinx
sqlalchemy[mypy]==1.4.51
# via
# feast (setup.py)
# sqlalchemy
# via feast (setup.py)
sqlalchemy2-stubs==0.0.2a38
# via sqlalchemy
stack-data==0.6.3
Expand Down Expand Up @@ -959,9 +957,7 @@ urllib3==1.26.18
# responses
# rockset
uvicorn[standard]==0.27.1
# via
# feast (setup.py)
# uvicorn
# via feast (setup.py)
uvloop==0.19.0
# via uvicorn
virtualenv==20.23.0
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/requirements/py3.10-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ exceptiongroup==1.2.0
# via anyio
fastapi==0.109.2
# via feast (setup.py)
fastavro==1.9.3
fastavro==1.9.4
# via
# feast (setup.py)
# pandavro
Expand Down Expand Up @@ -74,7 +74,7 @@ h11==0.14.0
# via
# httpcore
# uvicorn
httpcore==1.0.2
httpcore==1.0.3
# via httpx
httptools==0.6.1
# via uvicorn
Expand Down Expand Up @@ -166,7 +166,7 @@ referencing==0.33.0
# jsonschema-specifications
requests==2.31.0
# via feast (setup.py)
rpds-py==0.17.1
rpds-py==0.18.0
# via
# jsonschema
# referencing
Expand Down
Loading

0 comments on commit e90b90e

Please sign in to comment.