data_sources.csv_data module
+module that reads a csv file from disk forms the result into a data table based on a column mapping
+ + +# Copyright 2020 James P Goodwin data table package to manage sparse columnar data +""" module that reads a csv file from disk forms the result into a data table based on a column mapping """ +import locale +locale.setlocale(locale.LC_ALL,'') +import sys +import os +import glob +import gzip +import re +import keyring +from datetime import datetime,timedelta +from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized,from_csv + + +class CSVDataTable( DataTable ): + """ class that collects data from a CSV file on disk and extracts columns based on a column map of the form [[CSV_column_name, DataTable_column_name,DataTable_type (one of _string,_int,_float,_date )],...] """ + def __init__(self, refresh_minutes=1, csv_spec = None, csv_map= None, csv_name= None ): + """ Initialize the CSVDataTable object from the file named in csv_spec and extract the columns in the provided csv_map, name the table based on the name provided or extracted from the CSV """ + self.csv_spec = csv_spec + self.csv_map = csv_map + self.csv_name = csv_name + DataTable.__init__(self,None,(csv_name if csv_name else "CSVDataTable"),refresh_minutes) + self.refresh() + + @synchronized + def refresh( self ): + """ refresh the table by opening the csv file and loading it into a table """ + dt = from_csv(open(self.csv_spec,"r"),self.name,self.csv_map) + if dt: + rows,cols = dt.get_bounds() + for idx in range(cols): + self.replace_column(idx,dt.get_column(idx)) + + if dt.get_name(): + self.name = dt.get_name() + + self.changed() + DataTable.refresh(self) +
Module variables
+var blank_type
+ + +var date_type
+ + +var float_type
+ + +var int_type
+ + +var string_type
+ + +Classes
+ +class CSVDataTable
+ + +class that collects data from a CSV file on disk and extracts columns based on a column map of the form [[CSV_column_name, DataTable_column_name,DataTable_type (one of _string,_int,_float,_date )],...]
class CSVDataTable( DataTable ): + """ class that collects data from a CSV file on disk and extracts columns based on a column map of the form [[CSV_column_name, DataTable_column_name,DataTable_type (one of _string,_int,_float,_date )],...] """ + def __init__(self, refresh_minutes=1, csv_spec = None, csv_map= None, csv_name= None ): + """ Initialize the CSVDataTable object from the file named in csv_spec and extract the columns in the provided csv_map, name the table based on the name provided or extracted from the CSV """ + self.csv_spec = csv_spec + self.csv_map = csv_map + self.csv_name = csv_name + DataTable.__init__(self,None,(csv_name if csv_name else "CSVDataTable"),refresh_minutes) + self.refresh() + + @synchronized + def refresh( self ): + """ refresh the table by opening the csv file and loading it into a table """ + dt = from_csv(open(self.csv_spec,"r"),self.name,self.csv_map) + if dt: + rows,cols = dt.get_bounds() + for idx in range(cols): + self.replace_column(idx,dt.get_column(idx)) + + if dt.get_name(): + self.name = dt.get_name() + + self.changed() + DataTable.refresh(self) +
Ancestors (in MRO)
+-
+
- CSVDataTable +
- data_sources.data_table.DataTable +
- builtins.object +
Static methods
+ +def __init__(
self, refresh_minutes=1, csv_spec=None, csv_map=None, csv_name=None)
+Initialize the CSVDataTable object from the file named in csv_spec and extract the columns in the provided csv_map, name the table based on the name provided or extracted from the CSV
def __init__(self, refresh_minutes=1, csv_spec = None, csv_map= None, csv_name= None ): + """ Initialize the CSVDataTable object from the file named in csv_spec and extract the columns in the provided csv_map, name the table based on the name provided or extracted from the CSV """ + self.csv_spec = csv_spec + self.csv_map = csv_map + self.csv_name = csv_name + DataTable.__init__(self,None,(csv_name if csv_name else "CSVDataTable"),refresh_minutes) + self.refresh() +
def acquire_refresh_lock(
self)
+acquire the refresh lock before reading/writing the table state
def acquire_refresh_lock(self): + """ acquire the refresh lock before reading/writing the table state """ + self.refresh_lock.acquire() +
def add_column(
self, *args, **kwargs)
+@synchronized +def add_column(self,column): + idx = len(self.columns) + column.set_idx(idx) + if not column.get_name(): + column.set_name("%s_%d"%(self.name,idx)) + self.columns.append(column) + self.cnames[column.get_name()] = column + column.set_table(self) +
def changed(
self)
+notify listeners that this table has been changed
def changed(self): + """ notify listeners that this table has been changed """ + for f in self.listeners: + f(self) +
def get(
self, *args, **kwargs)
+@synchronized +def get(self, row, reference ): + return self.columns[self.map_column(reference)].get(row) +
def get_bounds(
self, *args, **kwargs)
+return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols
@synchronized +def get_bounds(self): + """ return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols """ + cols = len(self.columns) + rows = -1 + for c in self.columns: + size = c.size() + if rows < 0 or size > rows: + rows = size + return (rows,cols) +
def get_column(
self, *args, **kwargs)
+@synchronized +def get_column(self, reference): + return self.columns[self.map_column(reference)] +
def get_columns(
self, *args, **kwargs)
+return the list of columns
@synchronized +def get_columns(self): + """ return the list of columns """ + return self.columns +
def get_name(
self)
+return the name of the table
def get_name(self): + """ return the name of the table """ + return self.name +
def get_names(
self, *args, **kwargs)
+return a list of the names of the columns in order
@synchronized +def get_names(self): + """ return a list of the names of the columns in order""" + return [c.get_name() for c in self.columns] +
def get_refresh_timestamp(
self)
+get the time that the table was last refreshed
def get_refresh_timestamp( self ): + """ get the time that the table was last refreshed """ + return self.refresh_timestamp +
def has_column(
self, *args, **kwargs)
+@synchronized +def has_column(self, reference ): + if type(reference) == str or type(reference) == str: + return reference in self.cnames + elif type(reference) == int: + return idx < len(self.columns) + else: + return False +
def insert_column(
self, *args, **kwargs)
+@synchronized +def insert_column(self,idx,column): + while idx > len(self.columns): + self.add_column(blank_column) + if idx == len(self.columns): + self.add_column(column) + else: + if not column.get_name(): + column.set_name("%s_%d"%(self.name,idx)) + self.columns.insert(idx,column) + self.cnames[column.get_name()] = column + column.set_table(self) + while idx < len(self.columns): + if column.get_name() == "%s_%d"%(self.name,idx-1): + column.set_name("%s_%d"%(self.name,idx)) + self.cnames[column.get_name()] = column + self.columns[idx].set_idx(idx) + idx += 1 +
def listen(
self, listen_func)
+register for notifications when a change event is raised on this table
def listen(self,listen_func): + """ register for notifications when a change event is raised on this table """ + self.listeners.append(listen_func) +
def map_column(
self, *args, **kwargs)
+@synchronized +def map_column(self, reference ): + if type(reference) == str or type(reference) == str: + return self.cnames[reference].get_idx() + elif type(reference) == int: + return reference + else: + raise TypeError("wrong type in mapping") +
def perform_refresh(
self)
+Thread worker that sleeps and refreshes the data on a schedule
def perform_refresh( self ): + """ Thread worker that sleeps and refreshes the data on a schedule """ + start_time = time.time() + while not self.refresh_thread_stop: + if time.time() - start_time >= self.refresh_minutes*60.0: + self.refresh() + start_time = time.time() + time.sleep(1) +
def put(
self, *args, **kwargs)
+@synchronized +def put(self, row, reference, value): + self.columns[self.map_column(reference)].put(row,value) +
def refresh(
self, *args, **kwargs)
+refresh the table by opening the csv file and loading it into a table
@synchronized +def refresh( self ): + """ refresh the table by opening the csv file and loading it into a table """ + dt = from_csv(open(self.csv_spec,"r"),self.name,self.csv_map) + if dt: + rows,cols = dt.get_bounds() + for idx in range(cols): + self.replace_column(idx,dt.get_column(idx)) + if dt.get_name(): + self.name = dt.get_name() + self.changed() + DataTable.refresh(self) +
def release_refresh_lock(
self)
+release the refresh lock after reading/writing the table state
def release_refresh_lock(self): + """ release the refresh lock after reading/writing the table state """ + self.refresh_lock.release() +
def replace_column(
self, *args, **kwargs)
+@synchronized +def replace_column(self,idx,column): + column.set_idx(idx) + if not column.get_name(): + column.set_name("%s_%d"%(self.name,idx)) + if idx == len(self.columns): + self.columns.append(column) + else: + self.columns[idx] = column + self.cnames[column.get_name()] = column + column.set_table(self) +
def start_refresh(
self)
+Start the background refresh thread
def start_refresh( self ): + """ Start the background refresh thread """ + self.stop_refresh() + self.refresh_thread = threading.Thread(target=self.perform_refresh) + self.refresh_thread.start() +
def stop_refresh(
self)
+Stop the background refresh thread
def stop_refresh( self ): + """ Stop the background refresh thread """ + self.refresh_thread_stop = True + if self.refresh_thread and self.refresh_thread.is_alive(): + self.refresh_thread.join() + self.refresh_thread = None + self.refresh_thread_stop = False +
def unlisten(
self, listen_func)
+unregister for notifications when a change event is raised on this table
def unlisten(self,listen_func): + """ unregister for notifications when a change event is raised on this table """ + self.listeners.remove(listen_func) +
Instance variables
+var csv_map
+ + + + +var csv_name
+ + + + +var csv_spec
+ + + + +