Skip to content

Commit

Permalink
Merge pull request #87 from Keviinplz/feature/df-overall-columns-sema…
Browse files Browse the repository at this point in the history
…ntics

[INRIA Internship] Dataframe Columns Usage
  • Loading branch information
caterinaurban authored Oct 18, 2023
2 parents 802a148 + e9891a5 commit c19ebbf
Show file tree
Hide file tree
Showing 17 changed files with 492 additions and 26 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,9 @@ ENV/

# generated docs
/docs/_build/

# examples
/src/lyra/tests/example.py

# mac
.DS_Store
103 changes: 91 additions & 12 deletions src/lyra/abstract_domains/usage/dataframe_usage_domain.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
from copy import deepcopy
from typing import Set, Union, List
from typing import Set, Union, Dict

from lyra.abstract_domains.lattice import BoundedLattice
from lyra.abstract_domains.stack import Stack
from lyra.core.expressions import walk, Input
from lyra.abstract_domains.state import State
from lyra.abstract_domains.usage.usage_lattice import UsageLattice
from lyra.core.expressions import Slicing, Expression, Subscription, VariableIdentifier, BinaryComparisonOperation, \
Literal, ListDisplay
from lyra.core.types import DataFrameLyraType

ColumnName = Union[str, None]


def _get_columns(df: VariableIdentifier, expr: Expression):
    """Collect the keys used to subscript ``df`` anywhere inside ``expr``.

    Only sub-expressions that are ``Subscription`` nodes typed as
    ``DataFrameLyraType`` and whose target equals ``df`` contribute a key.

    :param df: the dataframe variable whose subscriptions are of interest
    :param expr: the expression tree to traverse with ``walk``
    :return: set of the ``key`` attributes of the matching subscriptions
    """
    return {
        node.key
        for node in walk(expr)
        if isinstance(node, Subscription)
        and isinstance(node.typ, DataFrameLyraType)
        and node.target == df
    }


class DataFrameColumnUsageState(BoundedLattice, State):

def __init__(self, variables: Set[VariableIdentifier], precursory: State = None):
    """Create a state holding one default entry per variable.

    :param variables: variables to track; each one becomes a key of the store
    :param precursory: forwarded unchanged to ``State.__init__``
    """
    # Each base class is initialised explicitly, and only once.
    super().__init__()  # BoundedLattice
    State.__init__(self, precursory)  # State
    # Every variable starts with a single entry under the ``None`` column key
    # (the key that ``__repr__`` prints as ``_``).
    self._store = {v: {None: UsageLattice()} for v in variables}

@property
Expand All @@ -26,16 +39,40 @@ def __repr__(self):
def do(columns):
def name(column):
return str(column) if column else '_'
itms = sorted(columns.items(), key=lambda x: name(x[0]))
return "{" + ", ".join("{}: {}".format(name(column), usage) for column, usage in itms) + "}"

_items = sorted(columns.items(), key=lambda x: name(x[0]))
return "{" + ", ".join("{}: {}".format(name(column), usage) for column, usage in _items) + "}"

items = sorted(self.store.items(), key=lambda x: x[0].name)
return "; ".join("{} -> {}".format(variable, do(value)) for variable, value in items)

# Not implemented for this domain yet: calling it always raises NotImplementedError.
def _less_equal(self, other: 'DataFrameColumnUsageState') -> bool:
raise NotImplementedError('_less_equal in DataFrameColumnUsageState is not yet implemented!')

@staticmethod
def _merge_var_stores(s1: Dict[ColumnName, UsageLattice], s2: Dict[ColumnName, UsageLattice]) -> dict:
    """Combine two per-column usage mappings into a new dictionary.

    Columns of ``s1`` come first: a column with no (or a ``None``) counterpart
    in ``s2`` keeps its ``s1`` value, otherwise the value is
    ``s1[column].join(s2[column])``. Columns that only ``s2`` knows are
    appended afterwards with their ``s2`` value.

    :param s1: first column -> usage mapping
    :param s2: second column -> usage mapping
    :return: a new dictionary covering the columns of both mappings
    """
    merged = {}
    for column, first in s1.items():
        second = s2.get(column)
        merged[column] = first if second is None else first.join(second)

    for column, second in s2.items():
        if column not in s1:
            merged[column] = second
    return merged

def _join(self, other: 'DataFrameColumnUsageState') -> 'DataFrameColumnUsageState':
    """Merge the store of ``other`` into this state, in place.

    A variable for which this state has no (or an empty) column mapping takes
    over the mapping of ``other``; otherwise the two column mappings are
    combined with ``_merge_var_stores``.

    :param other: state whose store is merged into this one
    :return: ``self``, after the update
    """
    for var, other_columns in other.store.items():
        own_columns = self.store.get(var)
        if not own_columns:
            # nothing recorded here for this variable: adopt the other side as is
            self.store[var] = other_columns
            continue
        self.store[var] = self._merge_var_stores(own_columns, other_columns)

    return self

# Not implemented for this domain yet: calling it always raises NotImplementedError.
def _meet(self, other: 'DataFrameColumnUsageState') -> 'DataFrameColumnUsageState':
raise NotImplementedError('_meet in DataFrameColumnUsageState is not yet implemented!')
Expand Down Expand Up @@ -105,19 +142,61 @@ def forget_variable(self, variable: VariableIdentifier) -> 'DataFrameColumnUsage

def _output(self, output: Expression) -> 'DataFrameColumnUsageState':
    """Record that ``output`` is emitted by the program.

    * For a variable, every column entry currently stored for it is replaced
      by the top usage element. No entry is removed, so the ``None`` default
      key stays available to later lookups.
    * For a subscription, only the entry for its key is set to top; a target
      without a store entry gets a fresh default mapping first.
    * Any other expression leaves the state untouched.

    :param output: the expression being output
    :return: ``self``, after the update
    """
    if isinstance(output, VariableIdentifier):
        self.store[output] = {col: UsageLattice().top() for col in self.store[output]}
    elif isinstance(output, Subscription):
        analysis = self.store.get(output.target, {None: UsageLattice()})
        analysis[output.key] = UsageLattice().top()
        self.store[output.target] = analysis
    return self

def _substitute_variable(self, left: VariableIdentifier, right: Expression) -> 'DataFrameColumnUsageState':
    """Update the state for the assignment ``left = right``.

    The cases are tried in order:

    1. ``right`` is an ``Input``: ``left`` is reset to a single written
       default entry.
    2. ``right`` is not typed as a dataframe: the state is left unchanged.
    3. ``right`` is a ``Subscription``: for each identifier in ``right`` that
       has no entry for the subscription key yet, the usage that ``left``
       holds for that key (or its default entry) is copied over, with a
       written usage replaced by a fresh ``UsageLattice()``. ``left`` is then
       reset to a written default entry.
    4. ``right`` is a variable: its entries are copied onto ``left``, skipping
       only a written default entry, and the default entry of ``left`` is set
       to written.
    5. Anything else: ``left`` is reset to a written default entry.

    :param left: the assigned variable
    :param right: the assigned expression
    :return: ``self``, after the update
    """
    if isinstance(right, Input):
        self.store[left] = {None: UsageLattice().written()}
        return self

    # Ignore variable if the substitution is not a DataFrame
    if not isinstance(right.typ, DataFrameLyraType):
        return self

    if isinstance(right, Subscription):
        for idn in right.ids():
            if right.key in self.store[idn]:
                continue

            state = self.store[left].get(right.key, self.store[left][None])
            self.store[idn][right.key] = UsageLattice() if state == UsageLattice().written() else state

        self.store[left] = {None: UsageLattice().written()}
        return self

    if isinstance(right, VariableIdentifier):
        # Because we know the columns that are on the right side, we keep them on the left side,
        # ignoring a written default entry (we cannot tell whether columns were created or modified)
        self.store[left].update({
            col: usage for col, usage in self.store[right].items()
            if (col is not None) or (usage != UsageLattice().written())
        })
        self.store[left][None] = UsageLattice().written()
        return self

    # W state in any other case
    self.store[left] = {None: UsageLattice().written()}
    return self

def _substitute_subscription(self, left: Subscription, right: Expression) -> 'DataFrameColumnUsageState':
    """Update the state for the assignment ``left.target[left.key] = right``.

    The assigned column is marked as written. Then, for every
    dataframe-typed identifier occurring in ``right``, each column of that
    identifier that ``right`` subscripts (as found by ``_get_columns``) is set
    to the top usage element.

    :param left: the assigned subscription
    :param right: the assigned expression
    :return: ``self``, after the update
    """
    self.store[left.target][left.key] = UsageLattice().written()

    for _id in right.ids():
        if not isinstance(_id.typ, DataFrameLyraType):
            continue

        columns = _get_columns(_id, right)
        self.store[_id].update({column: UsageLattice().top() for column in columns})

    return self

# Not implemented for this domain yet: calling it always raises NotImplementedError.
def _substitute_slicing(self, left: Slicing, right: Expression) -> 'DataFrameColumnUsageState':
raise NotImplementedError('_substitute_slicing in DataFrameColumnUsageState is not yet implemented!')
Expand Down
4 changes: 2 additions & 2 deletions src/lyra/core/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def ids(self) -> Set['VariableIdentifier']:
:return: set of identifiers that appear in the expression
"""
ids = set()
for expr in _walk(self):
for expr in walk(self):
if isinstance(expr, VariableIdentifier):
ids.add(expr)
return ids
Expand All @@ -93,7 +93,7 @@ def _iter_child_exprs(expr: Expression):
yield item


def _walk(expr: Expression):
def walk(expr: Expression):
"""
Recursively yield all expressions in an expression tree
starting at ``expr`` (including ``expr`` itself),
Expand Down
132 changes: 125 additions & 7 deletions src/lyra/semantics/dataframe_usage_semantics.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,141 @@
import itertools

from lyra.abstract_domains.state import State
from lyra.abstract_domains.usage.dataframe_usage_domain import DataFrameColumnUsageState
from lyra.core.statements import Call
from lyra.core.expressions import Subscription, Literal, VariableIdentifier, ListDisplay, BinaryArithmeticOperation, Input
from lyra.core.statements import Call, SubscriptionAccess, SlicingAccess, VariableAccess
from lyra.core.types import (
StringLyraType,
ListLyraType,
SetLyraType,
DictLyraType,
TupleLyraType,
DataFrameLyraType,
)
from lyra.engine.interpreter import Interpreter
from lyra.semantics.backward import DefaultPandasBackwardSemantics


class DataFrameColumnUsageSemantics(DefaultPandasBackwardSemantics):
    """Backward semantics of statements with support for Pandas library calls for dataframe column usage analysis."""

    def _summarized_view(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Evaluate the first call argument and hand the result to ``state.output``.

        Shared by ``head``, ``hist``, ``tail``, ``describe`` and ``info``.

        :param stmt: call statement whose first argument is evaluated
        :param state: state before the call
        :param interpreter: interpreter running the analysis
        :return: whatever ``state.output`` returns for the evaluated argument
        """
        dfs = self.semantics(stmt.arguments[0], state, interpreter).result
        return state.output(dfs)

    def _first_argument_as_result(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Evaluate the first call argument and expose it as the result of the call.

        Shared by ``min``, ``max``, ``median``, ``fillna`` and ``replace``.

        :param stmt: call statement whose first argument is evaluated
        :param state: state before the call
        :param interpreter: interpreter running the analysis
        :return: ``state`` with ``result`` set to a fresh set of the evaluated expressions
        """
        dfs = self.semantics(stmt.arguments[0], state, interpreter).result
        state.result = set(dfs)
        return state

    def subscription_access_semantics(
        self,
        stmt: SubscriptionAccess,
        state: DataFrameColumnUsageState,
        interpreter: Interpreter,
    ) -> DataFrameColumnUsageState:
        """Semantics of a subscription access on a dataframe.

        Every (target, key) combination yields ``Subscription`` expressions:
        one per item for a ``ListDisplay`` key, a single one for a ``Literal``
        or ``VariableIdentifier`` key.

        :param stmt: subscription access statement
        :param state: state before the access
        :param interpreter: interpreter running the analysis
        :return: ``state`` with ``result`` set to the built subscriptions
        :raises NotImplementedError: for a non-dataframe target or an unsupported key expression
        """
        target = self.semantics(stmt.target, state, interpreter).result
        key = self.semantics(stmt.key, state, interpreter).result
        result = set()
        for primary, index in itertools.product(target, key):
            if not isinstance(primary.typ, DataFrameLyraType):
                error = (
                    f"Semantics for subscription of {primary} is not yet implemented!"
                )
                raise NotImplementedError(error)
            if isinstance(index, ListDisplay):
                # df[[a, b]]: one subscription per listed column
                result.update(Subscription(primary.typ, primary, idx) for idx in index.items)
            elif isinstance(index, (Literal, VariableIdentifier)):
                result.add(Subscription(primary.typ, primary, index))
            else:
                error = f"Semantics for subscription of {primary} and {index} is not yet implemented!"
                raise NotImplementedError(error)

        state.result = result
        return state

    def drop_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``drop``: delegate to ``state.drop_dataframe_column``.

        :param stmt: call statement; argument 0 is evaluated as the dataframes, argument 1 as the columns
        :param state: state before the call
        :param interpreter: interpreter running the analysis
        :return: whatever ``state.drop_dataframe_column`` returns
        """
        dataframes = self.semantics(stmt.arguments[0], state, interpreter).result
        columns = self.semantics(stmt.arguments[1], state, interpreter).result
        return state.drop_dataframe_column(dataframes, columns)

    def head_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``head``: the first argument is output."""
        return self._summarized_view(stmt, state, interpreter)

    def hist_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``hist``: the first argument is output."""
        return self._summarized_view(stmt, state, interpreter)

    def tail_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``tail``: the first argument is output."""
        return self._summarized_view(stmt, state, interpreter)

    def describe_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``describe``: the first argument is output."""
        return self._summarized_view(stmt, state, interpreter)

    def info_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``info``: the first argument is output."""
        return self._summarized_view(stmt, state, interpreter)

    def min_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``min``: the call evaluates to its first argument."""
        return self._first_argument_as_result(stmt, state, interpreter)

    def max_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``max``: the call evaluates to its first argument."""
        return self._first_argument_as_result(stmt, state, interpreter)

    def median_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``median``: the call evaluates to its first argument."""
        return self._first_argument_as_result(stmt, state, interpreter)

    def fillna_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``fillna``: the call evaluates to its first argument."""
        return self._first_argument_as_result(stmt, state, interpreter)

    def replace_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``replace``: the call evaluates to its first argument."""
        return self._first_argument_as_result(stmt, state, interpreter)

    def concat_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``concat``: the call evaluates to the items of the concatenated list.

        :param stmt: call statement; argument 1 is evaluated as the collection of dataframes
        :param state: state before the call
        :param interpreter: interpreter running the analysis
        :return: ``state`` with ``result`` set to the union of the items of every evaluated list
        :raises NotImplementedError: if an evaluated argument is not a ``ListDisplay``
        """
        # Concat always receives a sequence (or mapping) of dfs
        lists_dfs = self.semantics(stmt.arguments[1], state, interpreter).result
        result = set()
        for lists in lists_dfs:
            if not isinstance(lists, ListDisplay):
                # report the offending expression itself, not the builtin ``list`` type
                error = f"Semantics for concat of {lists} is not yet implemented!"
                raise NotImplementedError(error)

            result.update(lists.items)
        state.result = result
        return state

    def read_csv_call_semantics(
        self, stmt: Call, state: DataFrameColumnUsageState, interpreter: Interpreter
    ) -> DataFrameColumnUsageState:
        """Semantics of ``read_csv``: the call evaluates to a single string-typed ``Input``."""
        state.result = {Input(typ=StringLyraType())}
        return state
5 changes: 0 additions & 5 deletions src/lyra/tests/example.py

This file was deleted.

48 changes: 48 additions & 0 deletions src/lyra/unittests/dataframe_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""
Data Usage Analysis Dataframes - Unit Tests
===========================================
:Authors: Caterina Urban and Kevin Pinochet
"""


import glob
import os
import unittest

import sys

from lyra.abstract_domains.usage.dataframe_usage_domain import DataFrameColumnUsageState
from lyra.engine.backward import BackwardInterpreter
from lyra.semantics.dataframe_usage_semantics import DataFrameColumnUsageSemantics
from lyra.unittests.runner import TestRunner


class UsageTest(TestRunner):
    """Test case wiring the dataframe column usage analysis into ``TestRunner``."""

    def interpreter(self):
        """Return a ``BackwardInterpreter`` using ``DataFrameColumnUsageSemantics``."""
        semantics = DataFrameColumnUsageSemantics()
        return BackwardInterpreter(self.cfgs, self.fargs, semantics, 3)

    def state(self):
        """Return a ``DataFrameColumnUsageState`` built over ``self.variables``."""
        tracked = self.variables
        return DataFrameColumnUsageState(tracked)

def test_suite():
    """Build a suite containing one ``UsageTest`` per dataframe example file.

    The examples are looked up in ``usage/dataframes`` next to this module, so
    the suite has the same content whatever the current working directory is.
    Files are added in sorted order to keep runs reproducible, and
    ``__init__.py`` is skipped.

    :return: the populated ``unittest.TestSuite``
    """
    here = os.path.dirname(os.path.abspath(__file__))
    pattern = os.path.join(here, 'usage', 'dataframes', '*.py')
    suite = unittest.TestSuite()
    for path in sorted(glob.iglob(pattern)):
        filename = os.path.basename(path)
        if filename == "__init__.py":
            continue
        print(filename)
        suite.addTest(UsageTest(path))
    return suite


if __name__ == '__main__':
    # Run the suite with the text runner; a failing run is reported through exit status 1.
    outcome = unittest.TextTestRunner().run(test_suite())
    if not outcome.wasSuccessful():
        sys.exit(1)
Empty file.
12 changes: 12 additions & 0 deletions src/lyra/unittests/usage/dataframes/handcraft_example_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import pandas as pd

# INITIAL: df -> {_: W}
df: pd.DataFrame = pd.read_csv("...")

# STATE: df -> {"id": N, "t": U, _: N}
df.drop(['id'], axis=1, inplace=True)

# STATE: df -> {"t": U, _: N}
df["t"].head()

# FINAL: df -> {_: N}
Loading

0 comments on commit c19ebbf

Please sign in to comment.