Skip to content

Commit

Permalink
Introduce json2sandesh proxy server to support analytics collector wi…
Browse files Browse the repository at this point in the history
…th new Intent API.

Change-Id: I506b9d57c66fb214e5744264587a1bea7c84c012
closes-jira-bug: JBE-873
  • Loading branch information
Dmitry-Eremeev committed Mar 5, 2019
1 parent 0abd877 commit e81ee8a
Show file tree
Hide file tree
Showing 14 changed files with 882 additions and 1 deletion.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ contrail.gpg
contrail.gpg.md5
contrailtpc.repo
contrailtpc.repo.md5
__*
.project
.pydevproject
*.pyc
19 changes: 19 additions & 0 deletions containers/analytics/json2sandesh/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
ARG CONTRAIL_REGISTRY
ARG CONTRAIL_CONTAINER_TAG
FROM ${CONTRAIL_REGISTRY}/contrail-analytics-base:${CONTRAIL_CONTAINER_TAG}

ARG CONTAINER_NAME

COPY entrypoint.sh /
COPY setup /setup/

WORKDIR /setup/
RUN python setup.py install
RUN python setup.py install_lib

ENTRYPOINT ["/entrypoint.sh"]
CMD ["/usr/bin/python", "/usr/bin/contrail-json2sandesh", "--config", "/etc/contrail/contrail-json2sandesh.conf"]

ENV SERVICE_NAME=contrail-json2sandesh
LABEL net.juniper.contrail.service=$SERVICE_NAME
LABEL net.juniper.contrail.container.name=$CONTAINER_NAME
25 changes: 25 additions & 0 deletions containers/analytics/json2sandesh/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

source /common.sh

pre_start_init

host_ip=$(get_listen_ip_for_node ANALYTICS)

cat > /etc/contrail/contrail-json2sandesh.conf << EOM
[INIT_GENERATOR]
instance_id=99
collectors=$COLLECTOR_SERVERS
log_level=$LOG_LEVEL
log_file=$LOG_DIR/contrail-json2sandesh.log
sandesh_send_rate_limit=100
[INTERFACE_CONFIG]
api_host=${host_ip}
api_port=8113
api_debug=False
EOM

add_ini_params_from_env ANALYTICS_JSON2SANDESH /etc/contrail/contrail-json2sandesh.conf

exec "$@"
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from bottle import route, request, response, run, install
from traceback import format_exc
from functools import wraps
from datetime import datetime
from socket import error as socket_error
from simplejson import JSONDecodeError


class ApiServer(object):
def __init__(self, generator):
self._generator = generator
self.logger = generator.logger
install(self.logging)
route('/', "GET", self.test_route)
route('/json2sandesh', "POST", self.parse_json)

@property
def generator(self):
return self._generator

def logging(self, fn):
@wraps(fn)
def _logging(*args, **kwargs):
actual_response = fn(*args, **kwargs)
self.logger.warning("%s request %s" % (datetime.now(), request))
self.logger.warning("%s response %s" % (datetime.now(), response))
return actual_response
return _logging

def run(self, interface_config):
try:
run(**interface_config)
except socket_error:
self.logger.error("Api host or port are already used."
" Please change it.")

def test_route(self):
return {"api_version": "0.5"}

def parse_json(self):
try:
raw_payload = request.json
sandesh_type = raw_payload["sandesh_type"]
response = self._generator.send_trace(raw_payload)
except JSONDecodeError as e:
sandesh_type = "Unknown"
response = "request %s. request body is not valid json: %s" % (str(
request),
str(e))
self.logger.error(response)
except:
response = format_exc()
self.logger.error(str(response))
return {"sandesh_type": sandesh_type, "response": response}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
from traceback import format_exc

from cfgm_common.uve.vnc_api.ttypes import ContrailConfig,\
ContrailConfigTrace,\
VncApiStatsLog,\
VncApiStats, \
VncApiConfigLog, \
VncApiCommon, \
FabricJobExecution, \
FabricJobUve, \
PhysicalRouterJobUve, \
PhysicalRouterJobExecution, \
VncApiLatencyStatsLog, \
VncApiLatencyStats, \
VncApiDebug, \
VncApiInfo, \
VncApiNotice, \
VncApiError
from cfgm_common.uve.cfgm_cpuinfo.ttypes import ModuleCpuStateTrace, \
ModuleCpuState

from pysandesh.Thrift import TType


class Converter(object):
def __init__(self, sandesh):
self._thrift = {
"BOOL": bool,
"BYTE": int,
"I16": int,
"I32": int,
"I64": int,
"DOUBLE": float,
"STRING": str,
"LIST": list,
"U64": int,
"MAP": self.thrift_dict}

# classes are arranged in specific order to initialize objects and insert them
# in the next class instance.
# args stand for arguments for last class in list
# args are arranged:
# 1st - instance of previous class or main trace data
# 2nd - sandesh
# others - additional arguments
self._converters_map = {
"ContrailConfigTrace": {
"sandesh_classes": [ContrailConfig,
ContrailConfigTrace],
"args": ["data", "sandesh", "table"]},
"VncApiStatsLog": {
"sandesh_classes": [VncApiStats,
VncApiStatsLog],
"args": ["api_stats", "sandesh"]},
"VncApiConfigLog": {
"sandesh_classes": [VncApiCommon,
VncApiConfigLog],
"args": ["api_log", "sandesh"]},
"FabricJobUve":
{"sandesh_classes": [FabricJobExecution,
FabricJobUve],
"args": ["data", "sandesh", "table"]},
"PhysicalRouterJobUve":
{"sandesh_classes": [PhysicalRouterJobExecution,
PhysicalRouterJobUve],
"args": ["data", "sandesh", "table"]},
"ModuleCpuStateTrace":
{"sandesh_classes": [ModuleCpuState,
ModuleCpuStateTrace],
"args": ["data", "sandesh", "table"]},
"VncApiLatencyStatsLog":
{"sandesh_classes": [VncApiLatencyStats,
VncApiLatencyStatsLog],
"args": ["api_latency_stats", "sandesh", "node_name"]},
"VncApiDebug":
{"sandesh_classes": [VncApiDebug],
"args": ["api_msg", "sandesh"]},
"VncApiInfo":
{"sandesh_classes": [VncApiInfo],
"args": ["api_msg", "sandesh"]},
"VncApiNotice":
{"sandesh_classes": [VncApiNotice],
"args": ["api_msg", "sandesh"]},
"VncApiError":
{"sandesh_classes": [VncApiError],
"args": ["api_msg", "sandesh"]},
}

self._sandesh = sandesh
self._logger = self._sandesh.logger()

@property
def logger(self):
return self._sandesh.logger()

def thrift_dict(self, attr_data):
thrift_dict = dict(attr_data)
return {key: str(thrift_dict[key]) for key in thrift_dict}

def convert(self, sandesh_type, raw_sandesh_data):
error = ""
trace = None
send_method = None

sandesh_classes = self._converters_map[sandesh_type]["sandesh_classes"]
self.logger.debug("sandesh classes %s" % str(sandesh_classes))
try:
result = self.verify_data(sandesh_classes[0],
raw_sandesh_data)
if result["error"]:
return {"trace": trace,
"send_method": send_method,
"error": result["error"]}
checked_data = result["data"]
args = self._converters_map[sandesh_type]["args"]
kwargs = dict()
kwargs[args[1]] = self._sandesh
if len(sandesh_classes) > 1:
sandesh_data = sandesh_classes[0](**checked_data)
kwargs[args[0]] = sandesh_data
if len(args) > 2:
for arg in args[2:]:
kwargs[arg] = raw_sandesh_data[arg]
else:
kwargs[args[0]] = checked_data[args[0]]
self.logger.debug("trace args %s" % str(kwargs))
trace = sandesh_classes[-1](**kwargs)
send_method = getattr(sandesh_classes[-1], "send")
except KeyError:
error = "json field '%s' was not provided" % str(arg)
self.logger.error(error)
except:
error = format_exc()
return {"trace": trace, "send_method": send_method, "error": error}

def verify_data(self, checked_class, raw_sandesh_data):
try:
error = str()
self.logger.debug("checked class %s" % str(checked_class))
self.logger.debug("raw sandesh data %s" % str(raw_sandesh_data))
thrift_format = checked_class.thrift_spec
data = dict()

if thrift_format is not None:
for thrift_field in thrift_format:
self.logger.debug("thrift_field %s" % str(thrift_field))
if thrift_field is not None:
thrift_type = TType._VALUES_TO_NAMES[thrift_field[1]]
self.logger.debug("thrift_type %s" % str(
thrift_type))
convert_method = self._thrift[thrift_type]
self.logger.debug("convert_method %s" % str(
convert_method))
attr_name = thrift_field[2]
self.logger.debug("attr name %s" % str(attr_name))
data[attr_name] = convert_method(
raw_sandesh_data[attr_name])
self.logger.debug("data[attr] %s" % str(
data[attr_name]))
else:
data = raw_sandesh_data
except KeyError:
error = "json field '%s' was not provided" % str(thrift_field)
data = None
self.logger.error(error)
return {"data": data, "error": error}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from gevent import monkey
monkey.patch_all()
from gevent import spawn, joinall
from time import sleep
from socket import gethostname

from pysandesh.sandesh_base import Sandesh, SandeshConfig
from cfgm_common.vnc_greenlets import VncGreenlet
from converter import Converter


class Generator(object):
def __init__(self, generator_config):
self._sandesh = Sandesh()
self._converter = Converter(self._sandesh)
self._sandesh_config = SandeshConfig(
system_logs_rate_limit=generator_config["sandesh_send_rate_limit"])
self._sandesh.init_generator(
module="contrail-api",
source=gethostname(),
node_type="Config",
instance_id=generator_config["instance_id"],
collectors=generator_config["collectors"],
client_context="json2sandesh_context",
http_port=int("-1"),
sandesh_req_uve_pkg_list=["cfgm_common"],
connect_to_collector=True,
logger_class=None,
logger_config_file=None,
host_ip="127.0.0.1",
alarm_ack_callback=None,
config=self._sandesh_config)

self._sandesh.set_logging_params(
enable_local_log=True,
category=None,
level=generator_config["level"],
file=generator_config["file"],
enable_syslog=None,
syslog_facility=None,
enable_trace_print=None,
enable_flow_log=None)

VncGreenlet.register_sandesh_handler()
self._client = self._sandesh.client()
self.logger.warning("sandesh client %s" % self._client)
self._connection = self._client.connection()
self.logger.warning(
"sandesh connection %s" % self._connection)
self._con_state = self._connection.state()
self.logger.warning(
"sandesh client connection state %s" % self.con_state)

@property
def logger(self):
return self._sandesh.logger()

@property
def con_state(self):
return self._connection.state()

def connect(self):
while self.con_state is not "Established":
self.logger.warning("connection state %s" % self.con_state)
sleep(1)

def send_trace(self, raw_payload):
sandesh_type = raw_payload['sandesh_type']
converting = self._converter.convert(sandesh_type,
raw_payload['payload'])
if not converting["error"]:
trace = converting["trace"]
self.logger.warning("trace %s" % trace)
send_task = spawn(
converting["send_method"], trace, sandesh=self._sandesh)
joinall([send_task],)
value = send_task.value
self.logger.warning(
"send method return code %s" % value)
exception = send_task.exception
self.logger.warning(
"exception in send method %s" % exception)
return {'send_method_return_code': value,
'send_method_exception': exception}
else:
self.logger.error(converting["error"])
return {"error": converting["error"]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from utils import parse_args

from api_server import ApiServer
from generator import Generator


def main():
config = parse_args()
generator = Generator(config["generator_config"])
generator.connect()

app = ApiServer(generator)
app.run(config["interface_config"])


if __name__ == '__main__':
main()
Loading

0 comments on commit e81ee8a

Please sign in to comment.