Introduce ability to ignore requests on initial "ADDED" events #217

Open · wants to merge 2 commits into base: master
1 change: 1 addition & 0 deletions README.md
@@ -68,6 +68,7 @@ If the filename ends with `.url` suffix, the content will be processed as a URL
| `REQ_RETRY_CONNECT` | How many connection-related errors to retry on for any http request (`*.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `10` | integer |
| `REQ_RETRY_READ` | How many times to retry on read errors for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `5` | integer |
| `REQ_RETRY_BACKOFF_FACTOR` | A backoff factor to apply between attempts after the second try for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `1.1` | float |
| `REQ_IGNORE_INITIAL_EVENT` | Set to `true` to skip the request triggered by the first `ADDED` event or the initial list for each configmap and secret. When listing configmaps/secrets, the request is skipped only if none of the items in the batch were previously known (see the sketch below this table). Only applicable when `IGNORE_ALREADY_PROCESSED` is enabled. | false | `false` | boolean |
| `REQ_TIMEOUT` | How many seconds to wait for the server to send data before giving up for `.url` triggered requests or requests to `REQ_URI` (does not apply to k8s api requests) | false | `10` | float |
| `REQ_USERNAME` | Username to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string |
| `REQ_PASSWORD` | Password to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string |
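For context, a minimal sketch of how the new flag is meant to combine with `IGNORE_ALREADY_PROCESSED`, assuming the value is parsed as a case-insensitive `true`/`false` string; the `_env_flag` helper is illustrative and not part of the PR or the project.

```python
import os

def _env_flag(name: str) -> bool:
    # Illustrative helper: only the literal string "true" (case-insensitive) enables a flag.
    return os.getenv(name, "false").lower() == "true"

ignore_already_processed = _env_flag("IGNORE_ALREADY_PROCESSED")

# REQ_IGNORE_INITIAL_EVENT only takes effect when IGNORE_ALREADY_PROCESSED is enabled,
# because deciding whether an event or list is "initial" relies on the resource version
# map that IGNORE_ALREADY_PROCESSED maintains.
request_ignore_initial_event = _env_flag("REQ_IGNORE_INITIAL_EVENT") and ignore_already_processed
```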
48 changes: 35 additions & 13 deletions src/resources.py
@@ -73,7 +73,7 @@ def _get_destination_folder(metadata, default_folder, folder_annotation):

def list_resources(label, label_value, target_folder, request_url, request_method, request_payload,
namespace, folder_annotation, resource, unique_filenames, script, enable_5xx,
ignore_already_processed):
ignore_already_processed, request_ignore_initial_event):
v1 = client.CoreV1Api()
# Filter resources based on label and value or just label
label_selector = f"{label}={label_value}" if label_value else label
@@ -87,6 +87,7 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
ret = getattr(v1, _list_namespace[namespace][resource])(**additional_args)

files_changed = False
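# should_do_request becomes True only if at least one item in this batch was already
# present in _resources_version_map, i.e. this is not the initial discovery of every item.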
should_do_request = False

# For all the found resources
for item in ret.items:
@@ -95,11 +96,16 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
# Ignore already processed resource
# Avoid numerous logs about useless resource processing each time the LIST loop reconnects
if ignore_already_processed:
if _resources_version_map.get(metadata.namespace + metadata.name) == metadata.resource_version:
logger.debug(f"Ignoring {resource} {metadata.namespace}/{metadata.name}")
resource_version_map_key = metadata.namespace + metadata.name
if _resources_version_map.get(resource_version_map_key) == metadata.resource_version:
logger.debug(f"Ignoring already processed resource version for {resource} {metadata.namespace}/{metadata.name}")
continue

_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version
if resource_version_map_key in _resources_version_map:
logger.debug(f"Item is already in the resource version map: {resource} {metadata.namespace}/{metadata.name}")
should_do_request = True

_resources_version_map[resource_version_map_key] = metadata.resource_version

logger.debug(f"Working on {resource}: {metadata.namespace}/{metadata.name}")

@@ -114,7 +120,12 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
if script and files_changed:
execute(script)

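# When REQ_IGNORE_INITIAL_EVENT is enabled, skip the request if every item in this
# list was discovered for the first time (nothing was previously in the version map).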
if request_ignore_initial_event and not should_do_request:
logger.debug(f"Ignoring sending request for initial list for all items")
return

if request_url and files_changed:
logger.debug("Doing request as files changed")
request(request_url, request_method, enable_5xx, request_payload)


@@ -206,7 +217,7 @@ def _update_file(data_key, data_content, dest_folder, metadata, resource,

def _watch_resource_iterator(label, label_value, target_folder, request_url, request_method, request_payload,
namespace, folder_annotation, resource, unique_filenames, script, enable_5xx,
ignore_already_processed):
ignore_already_processed, request_ignore_initial_event):
v1 = client.CoreV1Api()
# Filter resources based on label and value or just label
label_selector = f"{label}={label_value}" if label_value else label
@@ -223,22 +234,28 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req

# Process events
for event in stream:
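# Reset per event: set to True further down only when this resource was already
# present in _resources_version_map, i.e. the event is not its initial discovery.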
should_do_request = False
item = event['object']
metadata = item.metadata
event_type = event['type']

# Ignore already processed resource
# Avoid numerous logs about useless resource processing each time the WATCH loop reconnects
if ignore_already_processed:
if _resources_version_map.get(metadata.namespace + metadata.name) == metadata.resource_version:
resource_version_map_key = metadata.namespace + metadata.name
if _resources_version_map.get(resource_version_map_key) == metadata.resource_version:
if event_type == "ADDED" or event_type == "MODIFIED":
logger.debug(f"Ignoring {event_type} {resource} {metadata.namespace}/{metadata.name}")
logger.debug(f"Ignoring already processed resource version for {event_type} {resource} {metadata.namespace}/{metadata.name}")
continue
elif event_type == "DELETED":
_resources_version_map.pop(metadata.namespace + metadata.name)
_resources_version_map.pop(resource_version_map_key)

if resource_version_map_key in _resources_version_map:
logger.debug(f"Item is already in the resource version map: {resource} {metadata.namespace}/{metadata.name}")
should_do_request = True

if event_type == "ADDED" or event_type == "MODIFIED":
_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version
_resources_version_map[resource_version_map_key] = metadata.resource_version

logger.debug(f"Working on {event_type} {resource} {metadata.namespace}/{metadata.name}")

@@ -257,7 +274,12 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
if script and files_changed:
execute(script)

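# When REQ_IGNORE_INITIAL_EVENT is enabled, skip the request if this event is the
# first time the resource has been seen (it was not previously in the version map).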
if request_ignore_initial_event and not should_do_request:
logger.debug(f"Ignoring sending request for initial event for all items")
return

if request_url and files_changed:
logger.debug("Doing request as files changed")
request(request_url, request_method, enable_5xx, request_payload)


@@ -287,11 +309,11 @@ def _watch_resource_loop(mode, *args):

def watch_for_changes(mode, label, label_value, target_folder, request_url, request_method, request_payload,
current_namespace, folder_annotation, resources, unique_filenames, script, enable_5xx,
ignore_already_processed):
ignore_already_processed, request_ignore_initial_event):
processes = _start_watcher_processes(current_namespace, folder_annotation, label,
label_value, request_method, mode, request_payload, resources,
target_folder, unique_filenames, script, request_url, enable_5xx,
ignore_already_processed)
ignore_already_processed, request_ignore_initial_event)

while True:
died = False
@@ -311,14 +333,14 @@ def watch_for_changes(mode, label, label_value, target_folder, request_url, requ

def _start_watcher_processes(namespace, folder_annotation, label, label_value, request_method,
mode, request_payload, resources, target_folder, unique_filenames, script, request_url,
enable_5xx, ignore_already_processed):
enable_5xx, ignore_already_processed, request_ignore_initial_event):
processes = []
for resource in resources:
for ns in namespace.split(','):
proc = Process(target=_watch_resource_loop,
args=(mode, label, label_value, target_folder, request_url, request_method, request_payload,
ns, folder_annotation, resource, unique_filenames, script, enable_5xx,
ignore_already_processed)
ignore_already_processed, request_ignore_initial_event)
)
proc.daemon = True
proc.start()
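Taken together, both code paths end up sending the request under the same condition; a small sketch of the decision rule (the function name and parameters are illustrative, not part of the PR):

```python
def should_send_request(request_url: str, files_changed: bool,
                        request_ignore_initial_event: bool, already_known: bool) -> bool:
    # already_known corresponds to should_do_request above: the LIST batch contained at
    # least one previously known item, or the WATCH event's resource was already known.
    if request_ignore_initial_event and not already_known:
        return False  # purely initial discovery, suppress the request
    return bool(request_url) and files_changed
```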
8 changes: 7 additions & 1 deletion src/sidecar.py
@@ -23,6 +23,7 @@
REQ_PAYLOAD = "REQ_PAYLOAD"
REQ_URL = "REQ_URL"
REQ_METHOD = "REQ_METHOD"
REQ_IGNORE_INITIAL_EVENT = "REQ_IGNORE_INITIAL_EVENT"
SCRIPT = "SCRIPT"
ENABLE_5XX = "ENABLE_5XX"
IGNORE_ALREADY_PROCESSED = "IGNORE_ALREADY_PROCESSED"
@@ -103,6 +104,11 @@ def main():
if not ignore_already_processed:
logger.debug("Ignore already processed resource version will not be enabled.")

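# The new flag is only honoured when IGNORE_ALREADY_PROCESSED is enabled, since the
# "initial event / initial list" detection relies on the resource version map that
# IGNORE_ALREADY_PROCESSED maintains.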
request_ignore_initial_event = os.getenv(REQ_IGNORE_INITIAL_EVENT, "false").lower() == "true" and ignore_already_processed

if request_ignore_initial_event:
logger.debug("Requests for initial list or first event for every resource will be skipped.")

with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
namespace = os.getenv("NAMESPACE", f.read())

@@ -116,7 +122,7 @@
else:
watch_for_changes(method, label, label_value, target_folder, request_url, request_method, request_payload,
namespace, folder_annotation, resources, unique_filenames, script, enable_5xx,
ignore_already_processed)
ignore_already_processed, request_ignore_initial_event)


def _initialize_kubeclient_configuration():