Skip to content

Commit

Permalink
Aggregate signal instance messages in Case thread (#3290)
Browse files Browse the repository at this point in the history
* add database tenant revision for signal_thread_ts column addition to case model

* aggregate signal instances in case thread and update entity and case counter

* rebase master and fix revision sequence for db migration

* use validated email in user engagement

* abandon dynamic spacing for correlation messages and just newline it out

* only show case correlation message
  • Loading branch information
wssheldon authored Apr 21, 2023
1 parent b6fc587 commit e18ec21
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 94 deletions.
2 changes: 2 additions & 0 deletions src/dispatch/case/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class Case(Base, TimeStampMixin, ProjectMixin):
related_id = Column(Integer, ForeignKey("case.id"))
related = relationship("Case", remote_side=[id], uselist=True, foreign_keys=[related_id])

signal_thread_ts = Column(String, nullable=True)

storage = relationship("Storage", uselist=False, backref="case", cascade="all, delete-orphan")

tags = relationship(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Adds a thread_ts column for a signal message in a case so it can be updated by conversation plugin
Revision ID: 930eb80028d2
Revises: 56eb1c0a3a92
Create Date: 2023-04-20 16:55:33.901449
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "930eb80028d2"
down_revision = "56eb1c0a3a92"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("case", sa.Column("signal_thread_ts", sa.String(), nullable=True))
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("case", "signal_thread_ts")
# ### end Alembic commands ###
152 changes: 72 additions & 80 deletions src/dispatch/plugins/dispatch_slack/case/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,114 +144,106 @@ def create_case_message(case: Case, channel_id: str) -> list[Block]:

def create_signal_messages(case: Case, channel_id: str, db_session: Session) -> List[Message]:
"""Creates the signal instance message."""
messages = []
num_of_instances = len(case.signal_instances)
first_instance = case.signal_instances[0]

for instance in case.signal_instances:
button_metadata = SubjectMetadata(
type=SignalSubjects.signal_instance,
organization_slug=case.project.organization.slug,
id=str(instance.id),
project_id=case.project.id,
channel_id=channel_id,
).json()

signal_metadata_blocks = [
Section(
text=f"*{instance.signal.name}* - {instance.signal.variant}",
),
Actions(
elements=[
Button(
text="Raw Data",
action_id=SignalNotificationActions.view,
value=button_metadata,
),
Button(
text="Snooze",
action_id=SignalNotificationActions.snooze,
value=button_metadata,
),
Button(
text="Response Plan",
action_id="button-link",
url=instance.signal.external_url,
),
]
),
Section(text="*Entities*"),
Divider(),
]
button_metadata = SubjectMetadata(
type=SignalSubjects.signal_instance,
organization_slug=case.project.organization.slug,
id=str(first_instance.id),
project_id=case.project.id,
channel_id=channel_id,
).json()

if not instance.entities:
signal_metadata_blocks.append(
Section(
text="No entities found.",
signal_metadata_blocks = [
Section(
text=f"*{first_instance.signal.name}* - {first_instance.signal.variant}",
),
Actions(
elements=[
Button(
text="Raw Data",
action_id=SignalNotificationActions.view,
value=button_metadata,
),
)
Button(
text="Snooze",
action_id=SignalNotificationActions.snooze,
value=button_metadata,
),
Button(
text="Response Plan",
action_id="button-link",
url=first_instance.signal.external_url,
),
]
),
Section(text=f"Total instances in this case: *{num_of_instances}*\n"),
Section(text="\n*Entities*"),
Divider(),
]

processed_entities = set()

for instance in case.signal_instances:
EntityGroup = namedtuple(
"EntityGroup", ["value", "related_instance_count", "related_case_count"]
"EntityGroup",
["value", "related_case_count"],
)
entity_groups = defaultdict(list)

for e in instance.entities:
related_instances = entity_service.get_signal_instances_with_entity(
db_session=db_session, entity_id=e.id, days_back=14
)
related_instance_count = len(related_instances)
# prevent duplicate entities from appearing in the message
if e.value in processed_entities:
continue

processed_entities.add(e.value)

related_cases = entity_service.get_cases_with_entity(
db_session=db_session, entity_id=e.id, days_back=14
related_case_count = len(
entity_service.get_cases_with_entity(
db_session=db_session, entity_id=e.id, days_back=14
)
)
related_case_count = len(related_cases)

# Deduplicate the entity_groups by checking if the value is already in the set
entity_groups[e.entity_type.name].append(
EntityGroup(
value=e.value,
related_instance_count=related_instance_count,
related_case_count=related_case_count,
)
)

for k, v in entity_groups.items():
if v:
related_instance_count = v[0].related_instance_count
match related_instance_count:
case 0:
signal_message = "First time this entity has been seen in a signal."
case 1:
signal_message = f"Seen in *{related_instance_count}* other signal."
case _:
signal_message = f"Seen in *{related_instance_count}* other signals."

related_case_count = v[0].related_case_count
match related_case_count:
case 0:
case_message = "First time this entity has been seen in a case."
case 1:
case_message = f"Seen in *{related_instance_count}* other case."
case _:
case_message = f"Seen in *{related_instance_count}* other cases."

# dynamically allocate space for the entity type name and entity type values
entity_type_name_length = len(k)
entity_type_value_length = len(", ".join(item.value for item in v))
entity_type_name_spaces = " " * (55 - entity_type_name_length)
entity_type_value_spaces = " " * (50 - entity_type_value_length)
entity_group = v[0]
case_message = (
"First time this entity has been seen in a case."
if entity_group.related_case_count == 0
else f"Seen in *{entity_group.related_case_count}* other case(s)."
)

# Threaded messages do not overflow text fields, so we hack together the same UI with spaces
signal_metadata_blocks.append(
Context(
elements=[
f"*{k}*{entity_type_name_spaces}{signal_message}\n`{', '.join(item.value for item in v)}`{entity_type_value_spaces}{case_message}"
f"*{k}*\n`{', '.join(item.value for item in v)}`\n\n{case_message}"
]
),
)
signal_metadata_blocks.append(Divider())

if instance.entities:
signal_metadata_blocks.append(
Context(elements=["Correlation is based on two weeks of signal data."]),
)
if any(instance.entities for instance in case.signal_instances):
signal_metadata_blocks.append(
Context(elements=["Correlation is based on two weeks of signal data."]),
)
else:
signal_metadata_blocks.append(
Section(
text="No entities found.",
),
)

messages.append(Message(blocks=signal_metadata_blocks[:50]).build()["blocks"])
return messages
return Message(blocks=signal_metadata_blocks).build()["blocks"]


def create_signal_engagement_message(
Expand Down
33 changes: 25 additions & 8 deletions src/dispatch/plugins/dispatch_slack/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,17 @@ def create_threaded(self, case: Case, conversation_id: str, db_session: Session)
ts=response["timestamp"],
)
if case.signal_instances:
messages = create_signal_messages(
message = create_signal_messages(
case=case, channel_id=conversation_id, db_session=db_session
)
for m in messages:
send_message(
client=client,
conversation_id=conversation_id,
ts=response["timestamp"],
blocks=m,
)
signal_response = send_message(
client=client,
conversation_id=conversation_id,
ts=response["timestamp"],
blocks=message,
)
case.signal_thread_ts = signal_response.get("timestamp")
db_session.commit()
return response

def create_engagement_threaded(
Expand Down Expand Up @@ -126,6 +127,22 @@ def update_thread(self, case: Case, conversation_id: str, ts: str):
blocks = create_case_message(case=case, channel_id=conversation_id)
return update_message(client=client, conversation_id=conversation_id, ts=ts, blocks=blocks)

def update_signal_message(
self,
case: Case,
conversation_id: str,
db_session: Session,
thread_id: str,
):
"""Updates the signal message."""
client = create_slack_client(self.configuration)
blocks = create_signal_messages(
case=case, channel_id=conversation_id, db_session=db_session
)
return update_message(
client=client, conversation_id=conversation_id, blocks=blocks, ts=thread_id
)

def send(
self,
conversation_id: str,
Expand Down
39 changes: 33 additions & 6 deletions src/dispatch/signal/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dispatch.signal import service as signal_service
from dispatch.signal import flows as signal_flows
from dispatch.signal.enums import SignalEngagementStatus
from dispatch.signal.models import SignalInstance, SignalInstanceCreate
from dispatch.signal.models import SignalInstance, SignalInstanceCreate, SignalFilterAction
from dispatch.workflow import flows as workflow_flows

log = logging.getLogger(__name__)
Expand All @@ -40,12 +40,21 @@ def signal_instance_create_flow(

# we don't need to continue if a filter action took place
if signal_service.filter_signal(db_session=db_session, signal_instance=signal_instance):
# If the signal was deduplicated, we can assume a case exists,
# and we need to update the corresponding signal message
if (
signal_instance.filter_action == SignalFilterAction.deduplicate
and signal_instance.case.signal_thread_ts # noqa
):
update_signal_message(
db_session=db_session,
signal_instance=signal_instance,
)
return signal_instance

if not signal_instance.signal.create_case:
return signal_instance

# create a case if not duplicate or snoozed
if signal_instance.case_priority:
case_priority = signal_instance.case_priority
else:
Expand All @@ -56,6 +65,7 @@ def signal_instance_create_flow(
else:
case_type = signal_instance.signal.case_type

# create a case if not duplicate or snoozed and case creation is enabled
case_in = CaseCreate(
title=signal_instance.signal.name,
description=signal_instance.signal.description,
Expand Down Expand Up @@ -142,16 +152,15 @@ def engage_signal_identity(db_session: Session, signal_instance: SignalInstance)
for entity in signal_instance.entities:
if engagement.entity_type_id == entity.entity_type_id:
try:
email = validate_email(entity.value, check_deliverability=False)
email = email.normalized
validated_email = validate_email(entity.value, check_deliverability=False)
except EmailNotValidError as e:
log.warning(
f"Discovered entity value in Signal {signal_instance.signal.id} that did not appear to be a valid email: {e}"
)
else:
users_to_engage.append(
{
"user": entity.value,
"user": validated_email.email,
"engagement": engagement,
}
)
Expand All @@ -168,7 +177,7 @@ def engage_signal_identity(db_session: Session, signal_instance: SignalInstance)
plugin_type="conversation",
)
if not plugin:
log.warning("No contact plugin is active.")
log.warning("No conversation plugin is active.")
return

for reachout in users_to_engage:
Expand Down Expand Up @@ -197,3 +206,21 @@ def engage_signal_identity(db_session: Session, signal_instance: SignalInstance)
)
signal_instance.engagement_thread_ts = response.get("timestamp")
db_session.commit()


def update_signal_message(db_session: Session, signal_instance: SignalInstance) -> None:
plugin = plugin_service.get_active_instance(
db_session=db_session,
project_id=signal_instance.case.project.id,
plugin_type="conversation",
)
if not plugin:
log.warning("No conversation plugin is active.")
return

plugin.instance.update_signal_message(
case=signal_instance.case,
conversation_id=signal_instance.case.conversation.channel_id,
db_session=db_session,
thread_id=signal_instance.case.signal_thread_ts,
)

0 comments on commit e18ec21

Please sign in to comment.