Skip to content

Commit

Permalink
Gromet Wiring Detector PR (#732)
Browse files Browse the repository at this point in the history
This PR adds a new script
`skema/program_analysis/gromet_wire_diagnosis.py` that can be used to do
some simple analysis and error detecting in the wires of GroMEt FNs. It
currently checks the ports of all types of wires, and detects whether
the ports are out of bounds (in either negative or positive indices)
within their respective port tables. It also attempts to find the most
relevant SourceCodeReference metadata that is associated with the wires
and displays the line number information contained within it.

## Summary of Changes
- Adds `skema/program_analysis/gromet_wire_diagnosis.py` script
- Modifies `skema/program_analysis/JSON2GroMEt/json2gromet.py` script so
that it can ingest newer GroMEt JSON that uses the updated Gromet
metadata fields.
- Fixes a small issue with the incorrect SourceCodeReference metadata
type being used in
`skema/program_analysis/CAST2FN/ann_cast/to_gromet_pass.py`
- Adds a test script
`skema/program_analysis/tests/test_wiring_diagnosis.py` that can test
the consistency of the individual wire checker utility without needing a
GroMEt JSON.

### Potential Next steps
- Need to determine more things that can be easily analyzed in a GroMEt
JSON
- Come up with a more robust way of determining what line numbers go
with the wires.
- Determine what to do with the Metadata fields that don't have an
"is_metadatum" field attached to them. (NOTE: A solution to this has
been currently proposed.)

Resolves #697

---------

Co-authored-by: Vincent Raymond <[email protected]>
  • Loading branch information
titomeister and vincentraymond-ua authored Dec 22, 2023
1 parent 3e38c48 commit 97f7e50
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 8 deletions.
4 changes: 2 additions & 2 deletions skema/gromet/fn/gromet_fn_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def fn_array(self, fn_array):
def metadata_collection(self):
"""Gets the metadata_collection of this GrometFNModule. # noqa: E501
Table (array) of lists (arrays) of metadata, where each list in the Table-array represents the collection of metadata associated with a GroMEt object. # noqa: E501
Table (array) of lists (arrays) of metadata, where each list in the Table-array represents the collection of metadata associated with a GrometFNModule object. # noqa: E501
:return: The metadata_collection of this GrometFNModule. # noqa: E501
:rtype: list[list[Metadata]]
Expand All @@ -202,7 +202,7 @@ def metadata_collection(self):
def metadata_collection(self, metadata_collection):
"""Sets the metadata_collection of this GrometFNModule.
Table (array) of lists (arrays) of metadata, where each list in the Table-array represents the collection of metadata associated with a GroMEt object. # noqa: E501
Table (array) of lists (arrays) of metadata, where each list in the Table-array represents the collection of metadata associated with a GrometFNModule object. # noqa: E501
:param metadata_collection: The metadata_collection of this GrometFNModule. # noqa: E501
:type: list[list[Metadata]]
Expand Down
1 change: 0 additions & 1 deletion skema/program_analysis/CAST2FN/ann_cast/to_gromet_pass.py
Original file line number Diff line number Diff line change
Expand Up @@ -3239,7 +3239,6 @@ def visit_literal_value(
)

code_data_metadata = SourceCodeDataType(
gromet_type="source_code_data_type",
provenance=generate_provenance(),
source_language=ref[0],
source_language_version=ref[1],
Expand Down
12 changes: 7 additions & 5 deletions skema/program_analysis/JSON2GroMEt/json2gromet.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ def json_to_gromet(path: str) -> GrometFNModuleCollection:
sys.modules["skema.gromet.metadata"], inspect.isclass
):
instance = metadata_object()
if "metadata_type" in instance.attribute_map:
gromet_metadata_map[instance.metadata_type] = metadata_object
if "is_metadatum" in instance.attribute_map and instance.is_metadatum:
gromet_metadata_map[metadata_name] = metadata_object
else:
gromet_fn_map[metadata_name] = metadata_object

def get_obj_type(obj: Dict) -> Any:
"""Given a dictionary representing a Gromet object (i.e. BoxFunction), return an instance of that object.
Expand All @@ -42,10 +44,10 @@ def get_obj_type(obj: Dict) -> Any:

# First check if we already have a mapping to a data-class memeber. All Gromet FN and most Gromet Metadata classes will fall into this category.
# There are a few Gromet Metadata fields such as Provenance that do not have a "metadata_type" field
if "gromet_type" in obj:
if "gromet_type" in obj and ("is_metadatum" not in obj or obj["is_metadatum"] != True):
return gromet_fn_map[obj["gromet_type"]]()
elif "metadata_type" in obj:
return gromet_metadata_map[obj["metadata_type"]]()
elif obj["is_metadatum"]:
return gromet_metadata_map[obj["gromet_type"]]()

# If there is not a mapping to an object, we will check the fields to see if they match an existing class in the data-model.
# For example: (id, box, metadata) would map to GrometPort
Expand Down
208 changes: 208 additions & 0 deletions skema/program_analysis/gromet_wire_diagnosis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import argparse
from skema.program_analysis.JSON2GroMEt import json2gromet
from skema.gromet.metadata import SourceCodeReference

# Ways to expand
# Check loop, condition FN indices
# Check bf call FN indices
# Boxes associated with ports

def disp_wire(wire):
return f"src:{wire.src}<-->tgt:{wire.tgt}"

def get_length(gromet_item):
# For any gromet object we can generically retrieve the length, since they all exist
# in lists
return len(gromet_item) if gromet_item != None else 0

def check_wire(gromet_wire, src_port_count, tgt_port_count, wire_type = "", metadata=None):
# The current wiring checks are
# Checking if the ports on both ends of the wire are below or over the bounds
error_detected = False
if gromet_wire.src < 0:
error_detected = True
print(f"Gromet Wire {wire_type} {disp_wire(gromet_wire)} has negative src port.")
if gromet_wire.src == 0:
error_detected = True
print(f"Gromet Wire {wire_type} {disp_wire(gromet_wire)} has zero src port.")
if gromet_wire.src > src_port_count:
error_detected = True
print(f"Gromet Wire {wire_type} {disp_wire(gromet_wire)} has a src port that goes over the boundary of {src_port_count} src ports.")

if gromet_wire.tgt < 0:
error_detected = True
print(f"Gromet Wire {wire_type} {disp_wire(gromet_wire)} has negative tgt port.")
if gromet_wire.tgt == 0:
error_detected = True
print(f"Gromet Wire {wire_type} {disp_wire(gromet_wire)} has zero tgt port.")
if gromet_wire.tgt > tgt_port_count:
error_detected = True
print(f"Gromet Wire {wire_type} {disp_wire(gromet_wire)} has a tgt port that goes over the boundary of {tgt_port_count} tgt ports.")


if error_detected:
if metadata == None:
print("No line number information exists for this particular wire!")
else:
print(f"Wire is associated with source code lines start:{metadata.line_begin} end:{metadata.line_end}")
print()

return error_detected

def find_metadata_idx(gromet_fn):
"""
Attempts to find a metadata associated with this fn
If it finds something, return it, otherwise return None
"""
if gromet_fn.b != None:
for b in gromet_fn.b:
if b.metadata != None:
return b.metadata

if gromet_fn.bf != None:
for bf in gromet_fn.bf:
if bf.metadata != None:
return bf.metadata

return None

def analyze_fn_wiring(gromet_fn, metadata_collection):
# Acquire information for all the ports, if they exist
pif_length = get_length(gromet_fn.pif)
pof_length = get_length(gromet_fn.pof)
opi_length = get_length(gromet_fn.opi)
opo_length = get_length(gromet_fn.opo)
pil_length = get_length(gromet_fn.pil)
pol_length = get_length(gromet_fn.pol)
pic_length = get_length(gromet_fn.pic)
poc_length = get_length(gromet_fn.poc)

# Find a SourceCodeReference metadata that we can extract line number information for
# so we can display some line number information about potential errors in the wiring
# NOTE: Can we make this extraction more accurate?
metadata_idx = find_metadata_idx(gromet_fn)
metadata = None
if metadata_idx != None:
for md in metadata_collection[metadata_idx - 1]:
if isinstance(md, SourceCodeReference):
metadata = md

wopio_length = get_length(gromet_fn.wopio)
if wopio_length > 0:
for wire in gromet_fn.wff:
check_wire(wire, opo_length, opi_length, "wff", metadata)

######################## loop (bl) wiring

wlopi_length = get_length(gromet_fn.wlopi)
if wlopi_length > 0:
for wire in gromet_fn.wlopi:
check_wire(wire, pil_length, opi_length, "wlopi", metadata)

wll_length = get_length(gromet_fn.wll)
if wll_length > 0:
for wire in gromet_fn.wll:
check_wire(wire, pil_length, pol_length, "wll", metadata)

wlf_length = get_length(gromet_fn.wlf)
if wlf_length > 0:
for wire in gromet_fn.wlf:
check_wire(wire, pif_length, pol_length, "wlf", metadata)

wlc_length = get_length(gromet_fn.wlc)
if wlc_length > 0:
for wire in gromet_fn.wlc:
check_wire(wire, pic_length, pol_length, "wlc", metadata)

wlopo_length = get_length(gromet_fn.wlopo)
if wlopo_length > 0:
for wire in gromet_fn.wlopo:
check_wire(wire, opo_length, pol_length, "wlopo", metadata)

######################## function (bf) wiring
wfopi_length = get_length(gromet_fn.wfopi)
if wfopi_length > 0:
for wire in gromet_fn.wfopi:
check_wire(wire, pif_length, opi_length, "wfopi", metadata)

wfl_length = get_length(gromet_fn.wfl)
if wfl_length > 0:
for wire in gromet_fn.wfl:
check_wire(wire, pil_length, pof_length, "wfl", metadata)

wff_length = get_length(gromet_fn.wff)
if wff_length > 0:
for wire in gromet_fn.wff:
check_wire(wire, pif_length, pof_length, "wff", metadata)

wfc_length = get_length(gromet_fn.wfc)
if wfc_length > 0:
for wire in gromet_fn.wfc:
check_wire(wire, pic_length, pof_length, "wfc", metadata)

wfopo_length = get_length(gromet_fn.wfopo)
if wfopo_length > 0:
for wire in gromet_fn.wfopo:
check_wire(wire, opo_length, pof_length, "wfopo", metadata)

######################## condition (bc) wiring
wcopi_length = get_length(gromet_fn.wcopi)
if wcopi_length > 0:
for wire in gromet_fn.wcopi:
check_wire(wire, pic_length, opi_length, "wcopi", metadata)

wcl_length = get_length(gromet_fn.wcl)
if wcl_length > 0:
for wire in gromet_fn.wcl:
check_wire(wire, pil_length, poc_length, "wcl", metadata)

wcf_length = get_length(gromet_fn.wcf)
if wcf_length > 0:
for wire in gromet_fn.wcf:
check_wire(wire, pif_length, poc_length, "wcf", metadata)

wcc_length = get_length(gromet_fn.wcc)
if wcc_length > 0:
for wire in gromet_fn.wcc:
check_wire(wire, pic_length, poc_length, "wcc", metadata)

wcopo_length = get_length(gromet_fn.wcopo)
if wcopo_length > 0:
for wire in gromet_fn.wcopo:
check_wire(wire, opo_length, poc_length, "wcopo", metadata)


def wiring_analyzer(gromet_obj):
# TODO: Multifiles

for module in gromet_obj.modules:
# first_module = gromet_obj.modules[0]
metadata = []
# Analyze base FN
print(f"Analyzing {module.name}")
analyze_fn_wiring(module.fn, module.metadata_collection)

# Analyze the rest of the FN_array
for fn in module.fn_array:
analyze_fn_wiring(fn, module.metadata_collection)

def get_args():
parser = argparse.ArgumentParser(
"Attempts to analyize GroMEt JSON for issues"
)
parser.add_argument(
"gromet_file_path",
help="input GroMEt JSON file"
)

options = parser.parse_args()
return options

if __name__ == "__main__":
args = get_args()
gromet_obj = json2gromet.json_to_gromet(args.gromet_file_path)

wiring_analyzer(gromet_obj)



28 changes: 28 additions & 0 deletions skema/program_analysis/tests/test_wiring_diagnosis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from skema.program_analysis.gromet_wire_diagnosis import check_wire
from skema.gromet.fn import GrometWire


def test_correct_wire():
correct_wire = GrometWire(src=1, tgt=1)
result = check_wire(correct_wire, 1, 1, "wff")
assert not result

correct_wire = GrometWire(src=3, tgt=4)
result = check_wire(correct_wire, 4, 5, "wlc")
assert not result

correct_wire = GrometWire(src=2, tgt=1)
result = check_wire(correct_wire, 2, 1, "wff")

def test_wrong_wire():
wrong_wire = GrometWire(src=0, tgt=-1)
result = check_wire(wrong_wire, 1, 1, "wff")
assert result

wrong_wire = GrometWire(src=20, tgt=2)
result = check_wire(wrong_wire, 19, 2, "wff")
assert result

wrong_wire = GrometWire(src=-1, tgt=2)
result = check_wire(wrong_wire, 1, 1, "wlc")
assert result

0 comments on commit 97f7e50

Please sign in to comment.