Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nifi monitoring script #66

Merged
merged 10 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions tools/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ SHELL ["/bin/bash", "-o", "pipefail", "-c"]
RUN microdnf update && \
microdnf install java-11-openjdk-headless --nodocs && \
microdnf install tar gzip zip shadow-utils curl openssl && \
microdnf install python3-devel python3-pip python3-setuptools && \
microdnf clean all

# The bcrypt tool is needed by NiFi to locally encrypt the admin password that is mounted as a secret in cleartext
COPY tools/bin/stackable-bcrypt-1.0-SNAPSHOT-jar-with-dependencies.jar /bin/stackable-bcrypt.jar
# add all python scripts
COPY tools/python /stackable/python

RUN groupadd -r stackable --gid=1000 && \
RUN pip3 install --no-cache-dir -r /stackable/python/requirements.txt && \
groupadd -r stackable --gid=1000 && \
useradd -r -g stackable --uid=1000 stackable && \
mkdir /stackable && \
chown -R stackable:stackable /stackable

USER stackable
Expand Down
151 changes: 151 additions & 0 deletions tools/python/create_nifi_reporting_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/env python3
"""This is a script to create a ReportingTask in NiFi via the REST API"""
import sys
import argparse
import nipyapi

# no stack trace
sys.tracebacklimit = 0


def init(url: str, username: str, password: str, ca_file: str):
"""Initialize authenticated connection to NiFi"""
nipyapi.config.nifi_config.host = url
nipyapi.security.set_service_ssl_context(service='nifi', ca_file=ca_file)

try:
nipyapi.security.service_login(service='nifi', username=username, password=password)
print("Successfully authenticated and established connection with [%s]" % url)
except Exception as ex:
raise Exception("Failed to connect to {}: {}".format(url, str(ex))) from None


def find_reporting_task(name: str, port: str):
"""Find a ReportingTask via its name and port"""
flow_api = nipyapi.nifi.apis.flow_api.FlowApi()

try:
reporting_tasks = flow_api.get_reporting_tasks().reporting_tasks
except Exception as ex:
raise Exception("Failed to retrieve ReportingTask[{}/{}]: {}"
.format(name, port, str(ex))) from None

for task in reporting_tasks:
maltesander marked this conversation as resolved.
Show resolved Hide resolved
task_dict = task.to_dict()
task_component = task_dict["component"]
task_name = task_component["name"]
task_port = task_component["properties"]["prometheus-reporting-task-metrics-endpoint-port"]
if task_name == name and task_port == port:
return task

return None


def create_reporting_task(name: str, port: str, version: str):
"""Create a ReportingTask"""
task = nipyapi.nifi.models.reporting_task_entity.ReportingTaskEntity(
revision=nipyapi.nifi.models.revision_dto.RevisionDTO(version=0),
disconnected_node_acknowledged=False,
component=nipyapi.nifi.models.reporting_task_dto.ReportingTaskDTO(
name=name,
type="org.apache.nifi.reporting.prometheus.PrometheusReportingTask",
bundle=nipyapi.nifi.models.bundle_dto.BundleDTO(
group="org.apache.nifi",
artifact="nifi-prometheus-nar",
version=version
maltesander marked this conversation as resolved.
Show resolved Hide resolved
),
properties={
"prometheus-reporting-task-metrics-endpoint-port": port,
"prometheus-reporting-task-metrics-send-jvm": True
}
)
)

controller_api = nipyapi.nifi.apis.controller_api.ControllerApi()

try:
return controller_api.create_reporting_task(body=task)
except Exception as ex:
raise Exception("Failed to create reporting task: {}".format(str(ex))) from None


def get_reporting_task_name(task):
"""Return the ReportingTask name"""
task_dict = task.to_dict()
return task_dict["component"]["name"]


def get_revision_version(task):
"""Return the ReportingTask revision version"""
task_dict = task.to_dict()
return task_dict["revision"]["version"]


def is_reporting_task_running(task):
"""Check if the the ReportingTask is already running"""
task_dict = task.to_dict()
return task_dict["component"]["state"] == "RUNNING"


def set_reporting_task_running(task):
"""Set ReportingTask to RUNNING"""
reporting_task_api = nipyapi.nifi.apis.reporting_tasks_api.ReportingTasksApi()

state = {
"revision": {
"version": get_revision_version(task)
maltesander marked this conversation as resolved.
Show resolved Hide resolved
},
"disconnected_node_acknowledged": False,
"state": "RUNNING"
}

try:
return reporting_task_api.update_run_status(id=task.id, body=state)
except Exception as ex:
raise Exception("Failed to set ReportingTask [{}] to RUNNING: {}"
.format(task.id, str(ex))) from None


def main():
"""Main method with cli argument parsing and ReportingTask logic"""
# Construct an argument parser
all_args = argparse.ArgumentParser()
# Add arguments to the parser
all_args.add_argument("-n", "--nifi_api_url", required=True,
help="The NiFi node url to connect to.")
all_args.add_argument("-u", "--username", required=True,
help="Username to connect as.")
all_args.add_argument("-p", "--password", required=True,
help="Password for the user.")
all_args.add_argument("-v", "--nifi_version", required=True,
help="The NiFi product version.")
all_args.add_argument("-c", "--cert", required=True,
maltesander marked this conversation as resolved.
Show resolved Hide resolved
help="The path to the trusted certificate authority that "
"signed our expected certificates.")
all_args.add_argument("-m", "--metrics_port", required=True,
help="Metrics port to be set in the ReportingTask.")
all_args.add_argument("-t", "--task_name", required=False,
default="StackablePrometheusReportingTask",
help="The name of ReportingTask to create or activate.")
args = vars(all_args.parse_args())

task_name = args["task_name"]
port = args["metrics_port"]

init(args["nifi_api_url"], args["username"], args["password"], args["cert"])

reporting_task = find_reporting_task(name=task_name, port=port)

if reporting_task is None:
reporting_task = create_reporting_task(name=task_name, port=port,
version=args["nifi_version"])
print(get_reporting_task_name(task=reporting_task) + " [%s] -> CREATED" % reporting_task.id)

if not is_reporting_task_running(task=reporting_task):
reporting_task = set_reporting_task_running(task=reporting_task)
maltesander marked this conversation as resolved.
Show resolved Hide resolved

print(get_reporting_task_name(task=reporting_task) + " [%s] -> RUNNING" % reporting_task.id)


if __name__ == '__main__':
main()
1 change: 1 addition & 0 deletions tools/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
nipyapi==0.15.0