forked from w6113/databass-public
-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
110 lines (87 loc) · 2.5 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from .util import guess_type
from .schema import Schema
from .tables import *
import pandas
import numbers
import os
# Module-level alias for the builtin open(); register_file_by_path opens
# data files through this name rather than calling open() directly.
# NOTE(review): presumably kept as a separate name so the opener can be
# rebound (e.g. in tests or non-filesystem environments) -- confirm.
openfile = open
def infer_schema_from_df(df):
    """
    Build a Schema describing the columns of a pandas DataFrame.

    One Attr is appended per column, in column order.  The type of each
    attribute is guessed from the value found in the first row of the
    frame; when the frame has no rows every attribute is typed "str".
    """
    from .exprs import guess_type, Attr

    colnames = list(df.columns)
    schema = Schema([])

    # Only the first row is sampled; None marks "no data to sample from".
    first_row = df.iloc[0].to_dict() if df.shape[0] else None

    for colname in colnames:
        coltype = guess_type(first_row[colname]) if first_row else "str"
        schema.attrs.append(Attr(colname, coltype))
    return schema
class Database(object):
    """
    Manages all tables registered in the database.

    Tables are stored by name in ``registry`` and by table id in
    ``id2table``.  The DataFrames that tables were built from are kept in
    ``_df_registry``.  A process-wide shared instance is available through
    ``Database.db()``.
    """

    # Lazily created shared instance returned by db().
    _db = None

    def __init__(self):
        """Create empty registries, then register the CSV files found by setup()."""
        self.registry = {}        # table name -> table
        self.id2table = {}        # table.id   -> table
        self._df_registry = {}    # table name -> source DataFrame
        # Not populated anywhere in this class.
        # NOTE(review): presumably filled in by code that registers
        # scalar / table-valued functions -- confirm.
        self.function_registry = {}
        self.table_function_registry = {}
        self.setup()

    @staticmethod
    def db():
        """Return the shared Database instance, creating it on first use."""
        # Compare against None explicitly: the class defines container
        # dunders, so relying on truthiness would break the singleton if a
        # __len__ were ever added and the database happened to be empty.
        if Database._db is None:
            Database._db = Database()
        return Database._db

    def setup(self):
        """
        Walks all CSV files under the current directory (recursively) and
        registers them in the database.
        """
        for root, dirs, files in os.walk("."):
            for fname in files:
                if fname.lower().endswith(".csv"):
                    self.register_file_by_path(os.path.join(root, fname))

    def register_file_by_path(self, path):
        """
        Load the delimited text file at ``path`` and register it as a table.

        The table name is the file name without its extension.  The file is
        parsed with ',', '|' and tab as separators, in that order; the first
        attempt that does not raise is registered.  If every attempt raises,
        a failure message and the last exception are printed and nothing is
        registered (no exception propagates from the read attempts).
        """
        root, fname = os.path.split(path)
        tablename, _ = os.path.splitext(fname)
        fpath = os.path.join(root, fname)

        exception = None
        # NOTE(review): a separator is accepted as soon as read_csv does not
        # raise; the parsed result is not checked (e.g. for column count), so
        # the later separators are only tried after a parse error.
        for sep in [',', '|', '\t']:
            try:
                with openfile(fpath) as f:
                    df = pandas.read_csv(f, sep=sep)
            except Exception as e:
                # Best effort: remember the failure and try the next separator.
                exception = e
                continue
            self.register_dataframe(tablename, df)
            break
        else:
            # No separator produced a DataFrame.
            print("Failed to read data file %s" % (fpath))
            print(exception)

    def register_table(self, tablename, schema, table):
        """
        Register ``table`` under ``tablename`` and under its ``table.id``.

        ``schema`` is accepted for interface compatibility but is not used
        here.  An existing entry with the same name or id is overwritten.
        """
        self.registry[tablename] = table
        self.id2table[table.id] = table

    def register_dataframe(self, tablename, df):
        """
        Register a pandas DataFrame as an in-memory table named ``tablename``.

        The schema is inferred with infer_schema_from_df, each row is turned
        into a list of values ordered like the schema's attributes, and the
        resulting InMemoryTable is registered.  The DataFrame itself is kept
        in ``_df_registry``.
        """
        self._df_registry[tablename] = df
        schema = infer_schema_from_df(df)
        # T.to_dict() keys each row by its index label, so rows that share a
        # label would silently overwrite one another.  Resetting to a
        # positional index first keeps every row; for a frame whose index is
        # already unique the result is unchanged.
        records = df.reset_index(drop=True).T.to_dict().values()
        rows = [[record[attr.aname] for attr in schema] for record in records]
        table = InMemoryTable(schema, rows)
        self.register_table(tablename, schema, table)

    @property
    def tablenames(self):
        """List of the names of all registered tables."""
        return list(self.registry.keys())

    def schema(self, tablename):
        """
        Return the ``schema`` attribute of the table named ``tablename``.

        Lookup goes through __getitem__, which yields None for an unknown
        name, so an unknown name raises AttributeError here.
        """
        return self[tablename].schema

    def table_by_id(self, id):
        """Return the table registered under ``id``, or None if there is none."""
        return self.id2table.get(id, None)

    def __contains__(self, tablename):
        """True if a table named ``tablename`` is registered."""
        return tablename in self.registry

    def __getitem__(self, tablename):
        """Return the table named ``tablename``, or None if it is not registered."""
        return self.registry.get(tablename, None)