Skip to content

Commit

Permalink
XRay service graph (aws#362)
Browse files Browse the repository at this point in the history
* implementation of XRay service graph

* update xray service graph file mapper

* update comments

* add unit tests and addressed commments

* refactor: rename class Service to XRayGraphServiceInfo
  • Loading branch information
mingkun2020 authored Jul 9, 2021
1 parent cfa5bbb commit 163eddc
Show file tree
Hide file tree
Showing 8 changed files with 648 additions and 21 deletions.
75 changes: 65 additions & 10 deletions samcli/commands/traces/traces_puller_factory.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
"""
Factory methods which generates puller and consumer instances for XRay events
"""
from typing import Any, Optional
from typing import Any, Optional, List

from samcli.commands.traces.trace_console_consumers import XRayTraceConsoleConsumer
from samcli.lib.observability.observability_info_puller import (
ObservabilityPuller,
ObservabilityEventConsumer,
ObservabilityEventConsumerDecorator,
ObservabilityCombinedPuller,
)
from samcli.lib.observability.xray_traces.xray_event_consumers import XRayEventFileConsumer
from samcli.lib.observability.xray_traces.xray_event_mappers import XRayTraceConsoleMapper, XRayTraceFileMapper
from samcli.lib.observability.xray_traces.xray_event_mappers import (
XRayTraceConsoleMapper,
XRayTraceFileMapper,
XRayServiceGraphConsoleMapper,
XRayServiceGraphFileMapper,
)
from samcli.lib.observability.xray_traces.xray_event_puller import XRayTracePuller
from samcli.lib.observability.xray_traces.xray_service_graph_event_puller import XRayServiceGraphPuller


def generate_trace_puller(
xray_client: Any,
output_dir: Optional[str],
output_dir: Optional[str] = None,
) -> ObservabilityPuller:
"""
Generates puller instance with correct consumer and/or mapper configuration
Expand All @@ -33,15 +40,14 @@ def generate_trace_puller(
-------
Puller instance with desired configuration
"""
if output_dir:
consumer = generate_file_consumer(output_dir)
else:
consumer = generate_console_consumer()
pullers: List[ObservabilityPuller] = []
pullers.append(XRayTracePuller(xray_client, generate_xray_event_consumer(output_dir)))
pullers.append(XRayServiceGraphPuller(xray_client, generate_xray_service_graph_consumer(output_dir)))

return XRayTracePuller(xray_client, consumer)
return ObservabilityCombinedPuller(pullers)


def generate_file_consumer(output_dir: str) -> ObservabilityEventConsumer:
def generate_xray_event_file_consumer(output_dir: str) -> ObservabilityEventConsumer:
"""
Generates file consumer, which will store XRay events into a file in given folder
Expand All @@ -57,7 +63,7 @@ def generate_file_consumer(output_dir: str) -> ObservabilityEventConsumer:
return ObservabilityEventConsumerDecorator([XRayTraceFileMapper()], XRayEventFileConsumer(output_dir))


def generate_console_consumer() -> ObservabilityEventConsumer:
def generate_xray_event_console_consumer() -> ObservabilityEventConsumer:
"""
Generates an instance of event consumer which will print events into console
Expand All @@ -66,3 +72,52 @@ def generate_console_consumer() -> ObservabilityEventConsumer:
Console consumer instance with desired mapper configuration
"""
return ObservabilityEventConsumerDecorator([XRayTraceConsoleMapper()], XRayTraceConsoleConsumer())


def generate_xray_event_consumer(output_dir: Optional[str] = None) -> ObservabilityEventConsumer:
"""
Generates consumer instance with the given variables.
If output directory have been provided, then it will return file consumer.
If not, it will return console consumer
"""
if output_dir:
return generate_xray_event_file_consumer(output_dir)
return generate_xray_event_console_consumer()


def generate_xray_service_graph_file_consumer(output_dir: str) -> ObservabilityEventConsumer:
"""
Generates file consumer, which will store XRay events into a file in given folder
Parameters
----------
output_dir : str
Location of the output directory where events and file will be stored.
Returns
-------
File consumer instance with desired mapper configuration
"""
return ObservabilityEventConsumerDecorator([XRayServiceGraphFileMapper()], XRayEventFileConsumer(output_dir))


def generate_xray_service_graph_console_consumer() -> ObservabilityEventConsumer:
"""
Generates an instance of event consumer which will print events into console
Returns
-------
Console consumer instance with desired mapper configuration
"""
return ObservabilityEventConsumerDecorator([XRayServiceGraphConsoleMapper()], XRayTraceConsoleConsumer())


def generate_xray_service_graph_consumer(output_dir: Optional[str] = None) -> ObservabilityEventConsumer:
"""
Generates consumer instance with the given variables.
If output directory have been provided, then it will return file consumer.
If not, it will return console consumer
"""
if output_dir:
return generate_xray_service_graph_file_consumer(output_dir)
return generate_xray_service_graph_console_consumer()
100 changes: 99 additions & 1 deletion samcli/lib/observability/xray_traces/xray_event_mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
from typing import List

from samcli.lib.observability.observability_info_puller import ObservabilityEventMapper
from samcli.lib.observability.xray_traces.xray_events import XRayTraceEvent, XRayTraceSegment
from samcli.lib.observability.xray_traces.xray_events import (
XRayTraceEvent,
XRayTraceSegment,
XRayServiceGraphEvent,
XRayGraphServiceInfo,
)
from samcli.lib.utils.time import to_utc, utc_to_timestamp, timestamp_to_iso


class XRayTraceConsoleMapper(ObservabilityEventMapper[XRayTraceEvent]):
Expand Down Expand Up @@ -55,3 +61,95 @@ def map(self, event: XRayTraceEvent) -> XRayTraceEvent:
mapped_event["Segments"] = segments
event.event = mapped_event
return event


class XRayServiceGraphConsoleMapper(ObservabilityEventMapper[XRayServiceGraphEvent]):
"""
Maps given XRayServiceGraphEvent.message field into printable format to use it in the console consumer
"""

def map(self, event: XRayServiceGraphEvent) -> XRayServiceGraphEvent:
formatted_services = self.format_services(event.services)
mapped_message = f"\nNew XRay Service Graph" f"{formatted_services}"
event.message = mapped_message

return event

def format_services(self, services: List[XRayGraphServiceInfo]) -> str:
"""
Prints given services information back to console.
Parameters
----------
services : List[XRayGraphServiceInfo]
List of services which will be printed into console
"""
formatted_str = ""
for service in services:
formatted_str += f"\n Reference Id: {service.id}"
formatted_str += f"{ ' - (Root)' if service.is_root else ' -'}"
formatted_str += f" {service.type} - {service.name}"
formatted_str += f" - Edges: {self.format_edges(service)}"
formatted_str += self.format_summary_statistics(service, 1)

return formatted_str

@staticmethod
def format_edges(service: XRayGraphServiceInfo) -> str:
edge_ids = service.edge_ids
return str(edge_ids)

@staticmethod
def format_summary_statistics(service: XRayGraphServiceInfo, level) -> str:
"""
Prints given summary statistics information back to console.
Parameters
----------
service: XRayGraphServiceInfo
summary statistics of the service which will be printed into console
level : int
Optional level value which will be used to make the indentation of each segment. Default value is 0
"""
formatted_str = f"\n{' ' * level} Summary_statistics:"
formatted_str += f"\n{' ' * (level + 1)} - total requests: {service.total_count}"
formatted_str += f"\n{' ' * (level + 1)} - ok count(2XX): {service.ok_count}"
formatted_str += f"\n{' ' * (level + 1)} - error count(4XX): {service.error_count}"
formatted_str += f"\n{' ' * (level + 1)} - fault count(5XX): {service.fault_count}"
formatted_str += f"\n{' ' * (level + 1)} - total response time: {service.response_time}"
return formatted_str


class XRayServiceGraphFileMapper(ObservabilityEventMapper[XRayServiceGraphEvent]):
"""
Original response from xray client contains datetime object. This mapper convert datetime object to iso string
"""

def map(self, event: XRayServiceGraphEvent) -> XRayServiceGraphEvent:
mapped_event = deepcopy(event.event)

self._convert_start_and_end_time_to_iso(mapped_event)
services = mapped_event.get("Services", [])
for service in services:
self._convert_start_and_end_time_to_iso(service)
edges = service.get("Edges", [])
for edge in edges:
self._convert_start_and_end_time_to_iso(edge)

event.event = mapped_event
return event

def _convert_start_and_end_time_to_iso(self, event):
self.convert_event_datetime_to_iso(event, "StartTime")
self.convert_event_datetime_to_iso(event, "EndTime")

def convert_event_datetime_to_iso(self, event, datetime_key):
datetime = event.get(datetime_key, None)
if datetime:
event[datetime_key] = self.convert_local_datetime_to_iso(datetime)

@staticmethod
def convert_local_datetime_to_iso(local_datetime):
utc_datetime = to_utc(local_datetime)
time_stamp = utc_to_timestamp(utc_datetime)
return timestamp_to_iso(time_stamp)
64 changes: 64 additions & 0 deletions samcli/lib/observability/xray_traces/xray_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,67 @@ def get_latest_event_time(self):
latest_event_time = sub_segment_latest_time

return latest_event_time


class XRayServiceGraphEvent(ObservabilityEvent[dict]):
"""
Represents a result of each XRay service graph event, which is returned by boto3 client by calling
'get_service_graph' See XRayServiceGraphPuller
"""

def __init__(self, event: dict):
self.services: List[XRayGraphServiceInfo] = []
self.message = str(event)
self._construct_service(event)
super().__init__(event, 0)

def _construct_service(self, event_dict):
services = event_dict.get("Services", [])
for service in services:
self.services.append(XRayGraphServiceInfo(service))


class XRayGraphServiceInfo:
"""
Represents each services information for a XRayServiceGraphEvent
"""

def __init__(self, service: dict):
self.id = service.get("ReferenceId", "")
self.document = service
self.name = service.get("Name", "")
self.is_root = service.get("Root", False)
self.type = service.get("Type")
self.edge_ids: List[int] = []
self.ok_count = 0
self.error_count = 0
self.fault_count = 0
self.total_count = 0
self.response_time = 0
self._construct_edge_ids(service.get("Edges", []))
self._set_summary_statistics(service.get("SummaryStatistics", None))

def _construct_edge_ids(self, edges):
"""
covert the edges information to a list of edge reference ids
"""
edge_ids: List[int] = []
for edge in edges:
edge_ids.append(edge.get("ReferenceId", -1))
self.edge_ids = edge_ids

def _set_summary_statistics(self, summary_statistics):
"""
get some useful information from summary statistics
"""
if not summary_statistics:
return
self.ok_count = summary_statistics.get("OkCount", 0)
error_statistics = summary_statistics.get("ErrorStatistics", None)
if error_statistics:
self.error_count = error_statistics.get("TotalCount", 0)
fault_statistics = summary_statistics.get("FaultStatistics", None)
if fault_statistics:
self.fault_count = fault_statistics.get("TotalCount", 0)
self.total_count = summary_statistics.get("TotalCount", 0)
self.response_time = summary_statistics.get("TotalResponseTime", 0)
Loading

0 comments on commit 163eddc

Please sign in to comment.