Fixes for schema caching (#260)
* Fixes for schema caching

* Handle errors caching schema to disk

* Fix flake
philippjfr authored Mar 25, 2022
1 parent ffaeae0 commit 1f45403
Showing 1 changed file with 42 additions and 13 deletions.
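For context, a minimal sketch of the failure mode this commit addresses (an assumed example, not code from the Lumen repository): `json.dump` cannot serialize the `pd.Timestamp` bounds stored for datetime columns, so writing the schema cache to disk could fail outright, and any bounds that did survive a round trip came back as plain strings.

# Assumed minimal example (not from the Lumen codebase): JSON cannot
# serialize pandas Timestamps without a fallback encoder.
import json
import pandas as pd

schema = {
    'date': {
        'type': 'string', 'format': 'datetime',
        'inclusiveMinimum': pd.Timestamp('2022-01-01'),
        'inclusiveMaximum': pd.Timestamp('2022-03-25'),
    }
}

try:
    json.dumps(schema)
except TypeError as e:
    print(e)  # Object of type Timestamp is not JSON serializable

# default=str stringifies anything json cannot encode natively,
# which is the approach the commit takes in _set_schema_cache below.
print(json.dumps(schema, default=str))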
lumen/sources/base.py (42 additions, 13 deletions)
@@ -1,6 +1,7 @@
 import hashlib
 import json
 import os
+import pathlib
 import re
 import shutil
 import sys
@@ -188,22 +189,42 @@ def _get_key(self, table, **query):
         return key
 
     def _get_schema_cache(self):
-        if self._schema_cache:
-            return self._schema_cache
-        elif self.cache_dir:
+        schema = self._schema_cache if self._schema_cache else None
+        if self.cache_dir:
             path = os.path.join(self.root, self.cache_dir, f'{self.name}.json')
-            if os.path.isfile(path):
-                with open(path) as f:
-                    return json.load(f)
-        return None
+            if not os.path.isfile(path):
+                return schema
+            with open(path) as f:
+                json_schema = json.load(f)
+            if schema is None:
+                schema = {}
+            for table, tschema in json_schema.items():
+                if table in schema:
+                    continue
+                for col, cschema in tschema.items():
+                    if cschema.get('type') == 'string' and cschema.get('format') == 'datetime':
+                        cschema['inclusiveMinimum'] = pd.to_datetime(
+                            cschema['inclusiveMinimum']
+                        )
+                        cschema['inclusiveMaximum'] = pd.to_datetime(
+                            cschema['inclusiveMaximum']
+                        )
+                schema[table] = tschema
+        return schema
 
     def _set_schema_cache(self, schema):
         self._schema_cache = schema
         if self.cache_dir:
             path = Path(os.path.join(self.root, self.cache_dir))
             path.mkdir(parents=True, exist_ok=True)
-            with open(path / f'{self.name}.json', 'w') as f:
-                json.dump(schema, f)
+            try:
+                with open(path / f'{self.name}.json', 'w') as f:
+                    json.dump(schema, f, default=str)
+            except Exception as e:
+                self.param.warning(
+                    f"Could not cache schema to disk. Error while "
+                    f"serializing schema to disk: {e}"
+                )
 
     def _get_cache(self, table, **query):
         query.pop('__dask', None)
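Taken together, the two methods above write Timestamp bounds out as strings (via `default=str`) and revive them on read. A hedged sketch of that round trip, using hypothetical 'sales'/'date' table and column names:

# Hedged sketch of the schema round trip (hypothetical 'sales'/'date'
# names): serialize with default=str, then revive the datetime bounds.
import json
import pandas as pd

on_disk = json.dumps({
    'sales': {
        'date': {
            'type': 'string', 'format': 'datetime',
            'inclusiveMinimum': pd.Timestamp('2022-01-01'),
            'inclusiveMaximum': pd.Timestamp('2022-03-25'),
        }
    }
}, default=str)

schema = json.loads(on_disk)
for table, tschema in schema.items():
    for col, cschema in tschema.items():
        if cschema.get('type') == 'string' and cschema.get('format') == 'datetime':
            cschema['inclusiveMinimum'] = pd.to_datetime(cschema['inclusiveMinimum'])
            cschema['inclusiveMaximum'] = pd.to_datetime(cschema['inclusiveMaximum'])

# The revived bounds are Timestamps again, not strings.
assert isinstance(schema['sales']['date']['inclusiveMinimum'], pd.Timestamp)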
@@ -218,7 +239,7 @@ def _get_cache(self, table, **query):
                 filename = f'{table}.parq'
             path = os.path.join(self.root, self.cache_dir, filename)
             if os.path.isfile(path) or os.path.isdir(path):
-                if 'dask.dataframe' in sys.modules:
+                if 'dask.dataframe' in sys.modules or os.path.isdir(path):
                     import dask.dataframe as dd
                     return dd.read_parquet(path), not bool(query)
                 return pd.read_parquet(path), not bool(query)
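The extra `os.path.isdir(path)` check matters because dask persists parquet as a directory of part files, so a directory-shaped cache entry must be read back with dask even when `dask.dataframe` has not yet been imported in the session. A rough illustration (hypothetical path; assumes dask and a parquet engine such as pyarrow are installed):

# Rough illustration (hypothetical path; assumes dask[dataframe] and
# pyarrow are installed): dask writes parquet as a directory.
import os
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': range(10)})
dd.from_pandas(df, npartitions=2).to_parquet('cache/table.parq')

assert os.path.isdir('cache/table.parq')  # a directory, not a file
restored = dd.read_parquet('cache/table.parq').compute()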
@@ -236,11 +257,19 @@ def _set_cache(self, data, table, write_to_file=True, **query):
                 filename = f'{sha}_{table}.parq'
             else:
                 filename = f'{table}.parq'
+            filepath = os.path.join(path, filename)
             try:
-                data.to_parquet(os.path.join(path, filename))
+                data.to_parquet(filepath)
             except Exception as e:
-                self.param.warning(f"Could not cache '{table}' to parquet"
-                                   f"file. Error during saving process: {e}")
+                path = pathlib.Path(filepath)
+                if path.is_file():
+                    path.unlink()
+                elif path.is_dir():
+                    shutil.rmtree(path)
+                self.param.warning(
+                    f"Could not cache '{table}' to parquet file. "
+                    f"Error during saving process: {e}"
+                )
 
     def clear_cache(self):
         """
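The new `except` branch also cleans up whatever `to_parquet` managed to write before failing; otherwise a truncated file or half-written dask directory would be treated as a valid cache hit on the next read. A minimal sketch of that pattern, with a hypothetical helper name:

# Minimal sketch of the cleanup pattern (remove_partial is a
# hypothetical helper name, not part of Lumen's API).
import pathlib
import shutil

def remove_partial(filepath: str) -> None:
    path = pathlib.Path(filepath)
    if path.is_file():        # partial single-file parquet (pandas)
        path.unlink()
    elif path.is_dir():       # partial multi-part directory (dask)
        shutil.rmtree(path)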
