Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#349: Generalize graph write queries #1038

Merged
merged 31 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
45ec55e
Build ingest query
achantavy Aug 15, 2022
a641ad0
Linter
achantavy Aug 15, 2022
5b3c83c
Save cleanup query for another PR
achantavy Aug 15, 2022
71edc8b
Implement schema
achantavy Nov 23, 2022
41c9a24
Merge latest master
achantavy Nov 23, 2022
519cce9
bump mypy to 0.981 for python/mypy#13398
achantavy Nov 23, 2022
b900146
linter
achantavy Nov 23, 2022
05973fb
make load_graph_data interface make more sense
achantavy Nov 23, 2022
b6f3faf
fix comment
achantavy Nov 23, 2022
aafa38d
Docs and some better names
achantavy Nov 24, 2022
1293b2e
add a todo
achantavy Nov 24, 2022
b810d75
Doc updates, rename some fields
achantavy Nov 28, 2022
be30b3a
Fix pre-commit
achantavy Nov 28, 2022
7458af7
Merge branch 'master' into generalizequerywrites
achantavy Nov 28, 2022
8021520
Code commment suggestions
achantavy Dec 9, 2022
2e3dd8e
Merge branch 'master' into generalizequerywrites
achantavy Dec 9, 2022
d2e4794
Merge branch 'generalizequerywrites' of github.com:lyft/cartography i…
achantavy Dec 9, 2022
5bcb915
Stackoverflow comment for clarity)
achantavy Dec 9, 2022
52a1d29
Support ingesting only parts of a schema without breaking the others
achantavy Dec 12, 2022
b5875a9
Doc comment
achantavy Dec 12, 2022
fa40b09
Linter
achantavy Dec 12, 2022
80b6ff5
Support matching on one or more properties
achantavy Dec 13, 2022
346ad50
Correctly name test
achantavy Dec 13, 2022
2e65877
Change key_refs to TargetNodeMatcher to enforce it as a mandatory field
achantavy Dec 14, 2022
cb233c3
Remove use of hacky default_field()
achantavy Dec 14, 2022
3be472b
Support subset of schema relationships for query generation, test mul…
achantavy Dec 14, 2022
5dd60a2
Docstrings
achantavy Dec 14, 2022
f6fea75
Comments in tests
achantavy Dec 15, 2022
fea0308
Better comments
achantavy Dec 15, 2022
7b3a268
Test for exception conditions
achantavy Dec 15, 2022
ab0af7b
Remove irrelevant comment
achantavy Dec 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repos:
- id: debug-statements
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://gitlab.com/pycqa/flake8
- repo: https://github.com/pycqa/flake8
rev: 3.9.2
hooks:
- id: flake8
Expand All @@ -39,7 +39,7 @@ repos:
- id: reorder-python-imports
args: [--py3-plus]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.950
rev: v0.981
achantavy marked this conversation as resolved.
Show resolved Hide resolved
hooks:
- id: mypy
additional_dependencies:
Expand Down
Empty file removed cartography/client/__init__py
Empty file.
64 changes: 64 additions & 0 deletions cartography/client/core/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import neo4j

from cartography.util import batch


def read_list_of_values_tx(tx: neo4j.Transaction, query: str, **kwargs) -> List[Union[str, int]]:
"""
Expand Down Expand Up @@ -146,3 +148,65 @@ def read_single_dict_tx(tx: neo4j.Transaction, query: str, **kwargs) -> Dict[str

result.consume()
return value


def write_list_of_dicts_tx(
tx: neo4j.Transaction,
query: str,
**kwargs,
) -> None:
"""
Writes a list of dicts to Neo4j.

Example usage:
import neo4j
dict_list: List[Dict[Any, Any]] = [{...}, ...]

neo4j_driver = neo4j.driver(... args ...)
neo4j_session = neo4j_driver.Session(... args ...)

neo4j_session.write_transaction(
write_list_of_dicts_tx,
'''
UNWIND $DictList as data
MERGE (a:SomeNode{id: data.id})
SET
a.other_field = $other_field,
a.yet_another_kwarg_field = $yet_another_kwarg_field
...
''',
DictList=dict_list,
other_field='some extra value',
yet_another_kwarg_field=1234
)

:param tx: The neo4j write transaction.
:param query: The Neo4j write query to run.
:param kwargs: Keyword args to be supplied to the Neo4j query.
:return: None
"""
tx.run(query, kwargs)


def load_graph_data(
neo4j_session: neo4j.Session,
query: str,
dict_list: List[Dict[str, Any]],
**kwargs,
) -> None:
"""
Writes data to the graph.
:param neo4j_session: The Neo4j session
:param query: The Neo4j write query to run. This query is not meant to be handwritten, rather it should be generated
with cartography.graph.querybuilder.build_ingestion_query().
:param dict_list: The data to load to the graph represented as a list of dicts.
:param kwargs: Allows additional keyword args to be supplied to the Neo4j query.
:return: None
"""
for data_batch in batch(dict_list, size=10000):
neo4j_session.write_transaction(
write_list_of_dicts_tx,
query,
DictList=data_batch,
**kwargs,
)
234 changes: 234 additions & 0 deletions cartography/graph/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import abc
from dataclasses import dataclass
from dataclasses import field
from enum import auto
from enum import Enum
from typing import List
from typing import Optional


class LinkDirection(Enum):
"""
Each CartographyRelSchema has a LinkDirection that determines whether the relationship points toward the original
node ("INWARD") or away from the original node ("OUTWARD").

For example the following code creates the path `(:EMRCluster)<-[:RESOURCE]-(:AWSAccount)`:

class EMRCluster(CartographyNodeSchema):
label: str = "EMRCluster"
sub_resource_relationship: CartographyRelSchema = EMRClusterToAWSAccount()
# ...

class EMRClusterToAWSAccount(CartographyRelSchema):
target_node_label: str = "AWSAccount"
rel_label: str = "RESOURCE"
direction: LinkDirection = LinkDirection.INWARD
# ...

If `EMRClusterToAWSAccount.direction` was LinkDirection.OUTWARD, then the directionality of the relationship would
be `(:EMRCluster)-[:RESOURCE]->(:AWSAccount)` instead.
"""
INWARD = auto()
OUTWARD = auto()


class PropertyRef:
"""
PropertyRefs represent properties on cartography nodes and relationships.

cartography takes lists of Python dicts and loads them to Neo4j. PropertyRefs allow our dynamically generated Neo4j
ingestion queries to set values for a given node or relationship property from (A) a field on the dict being
processed (PropertyRef.override=False, default), or (B) from a single variable provided by a keyword argument
(PropertyRef.override=True).
achantavy marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(self, name: str, set_in_kwargs=False):
"""
:param name: The name of the property
:param set_in_kwargs: Optional. If True, the property is not defined on the data dict, and we expect to find the
property in the kwargs.
If False, looks for the property in the data dict.
Defaults to False.
"""
self.name = name
self.set_in_kwargs = set_in_kwargs

def _parameterize_name(self) -> str:
return f"${self.name}"

def __repr__(self) -> str:
achantavy marked this conversation as resolved.
Show resolved Hide resolved
achantavy marked this conversation as resolved.
Show resolved Hide resolved
return f"item.{self.name}" if not self.set_in_kwargs else self._parameterize_name()


@dataclass
class CartographyNodeProperties(abc.ABC):
"""
Abstract base dataclass that represents the properties on a CartographyNodeSchema. This is intended to enforce that
all subclasses will have an id and a lastupdated field defined on their resulting nodes.
"""
id: PropertyRef = field(init=False)
lastupdated: PropertyRef = field(init=False)

def __post_init__(self):
achantavy marked this conversation as resolved.
Show resolved Hide resolved
"""
Designed to prevent direct instantiation. This workaround is needed since this is both an abstract class and a
dataclass.
"""
if self.__class__ == CartographyNodeProperties:
raise TypeError("Cannot instantiate abstract class.")


@dataclass
class CartographyRelProperties(abc.ABC):
"""
Abstract class that represents the properties on a CartographyRelSchema. This is intended to enforce that all
subclasses will have a lastupdated field defined on their resulting relationships.
"""
lastupdated: PropertyRef = field(init=False)

def __post_init__(self):
"""
Designed to prevent direct instantiation. This workaround is needed since this is both an abstract class and a
dataclass.
"""
if self.__class__ == CartographyRelProperties:
raise TypeError("Cannot instantiate abstract class.")


@dataclass
class CartographyRelSchema(abc.ABC):
"""
Abstract base dataclass that represents a cartography relationship.

The CartographyRelSchema contains properties that make it possible to connect the CartographyNodeSchema to other
existing nodes in the graph.
"""
@property
@abc.abstractmethod
def properties(self) -> CartographyRelProperties:
"""
:return: The properties of the relationship.
"""
pass

@property
@abc.abstractmethod
def target_node_label(self) -> str:
"""
:return: The target node label that this relationship will connect to.
"""
pass

@property
@abc.abstractmethod
def target_node_key(self) -> str:
"""
:return: The attribute name on the target_node_label used to uniquely identify what node to connect to.
"""
pass

@property
@abc.abstractmethod
def target_node_key_property_ref(self) -> PropertyRef:
"""
:return: The value of the target_node_key used to uniquely identify what node to connect to. This is given as a
PropertyRef.
"""
pass

@property
@abc.abstractmethod
def rel_label(self) -> str:
"""
:return: The str label of the relationship.
"""
pass

@property
@abc.abstractmethod
def direction(self) -> LinkDirection:
"""
:return: The LinkDirection of the query. Please see the `LinkDirection` docs for a detailed explanation.
"""
pass


@dataclass
class CartographyNodeSchema(abc.ABC):
"""
Abstract base dataclass that represents a graph node in cartography. This is used to dynamically generate graph
ingestion queries.

A CartographyNodeSchema is composed of:

- CartographyNodeProperties: contains the properties on the node and where to find their values with PropertyRef
objects.
- [Optional] A CartographyRelSchema pointing to the node's sub-resource (see the docstring on
`sub_resource_relationship` for details.
- [Optional] One or more other CartographyRelSchemas to other nodes.
"""
_extra_labels: Optional[List[str]] = field(init=False, default=None)
_other_relationships: Optional[List[CartographyRelSchema]] = field(init=False, default=None)

@property
@abc.abstractmethod
def label(self) -> str:
"""
:return: The primary str label of the node.
"""
pass

@property
@abc.abstractmethod
def properties(self) -> CartographyNodeProperties:
"""
:return: The properties of the node.
"""
pass

@property
def sub_resource_relationship(self) -> Optional[CartographyRelSchema]:
"""
Optional.
Allows subclasses to specify a subresource relationship for the given node. "Sub resource" is a term we made up
best defined by examples:
- In the AWS module, the subresource is an AWSAccount
- In Azure, the subresource is a Subscription
- In GCP, the subresource is a GCPProject
- In Okta, the subresource is an OktaOrganization
... and so on and so forth.
:return:
"""
return None

@property
def other_relationships(self) -> Optional[List[CartographyRelSchema]]:
"""
Optional.
Allows subclasses to specify additional cartography relationships on the node.
:return: None of not overriden. Else return a list of CartographyRelSchema associated with the node.
"""
return self._other_relationships

@other_relationships.setter
def other_relationships(self, other_rels: List[CartographyRelSchema]) -> None:
"""
Boilerplate setter function used to keep typehints happy.
"""
self._other_relationships = other_rels

@property
def extra_labels(self) -> Optional[List[str]]:
"""
Optional.
Allows specifying extra labels on the node.
:return: None if not overriden. Else return a str list of the extra labels specified on the node.
"""
return self._extra_labels

@extra_labels.setter
def extra_labels(self, labels: List[str]) -> None:
"""
Boilerplate setter function used to keep typehints happy.
"""
self._extra_labels = labels
Loading