Skip to content

Commit

Permalink
Introduce ability to ignore requests on initial list and first event
Browse files Browse the repository at this point in the history
  • Loading branch information
pkosiec committed Sep 27, 2022
1 parent bdedf16 commit 7ac1ac8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ If the filename ends with `.url` suffix, the content will be processed as a URL
| `REQ_RETRY_CONNECT` | How many connection-related errors to retry on for any http request (`*.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `10` | integer |
| `REQ_RETRY_READ` | How many times to retry on read errors for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `5` | integer |
| `REQ_RETRY_BACKOFF_FACTOR` | A backoff factor to apply between attempts after the second try for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `1.1` | float |
| `REQ_IGNORE_INITIAL_EVENT` | Set to `true` to ignore requests for first events or initial list for configmaps and secrets events. Applicable only when `IGNORE_ALREADY_PROCESSED` is enabled. | false | `false` | boolean |
| `REQ_TIMEOUT` | How many seconds to wait for the server to send data before giving up for `.url` triggered requests or requests to `REQ_URI` (does not apply to k8s api requests) | false | `10` | float |
| `REQ_USERNAME` | Username to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string |
| `REQ_PASSWORD` | Password to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string |
Expand Down
28 changes: 22 additions & 6 deletions src/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _get_destination_folder(metadata, default_folder, folder_annotation):

def list_resources(label, label_value, target_folder, request_url, request_method, request_payload,
namespace, folder_annotation, resource, unique_filenames, script, enable_5xx,
ignore_already_processed):
ignore_already_processed, request_ignore_initial_event):
v1 = client.CoreV1Api()
# Filter resources based on label and value or just label
label_selector = f"{label}={label_value}" if label_value else label
Expand All @@ -87,6 +87,7 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
ret = getattr(v1, _list_namespace[namespace][resource])(**additional_args)

files_changed = False
ignore_request = False

# For all the found resources
for item in ret.items:
Expand All @@ -99,6 +100,8 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
logger.debug(f"Ignoring {resource} {metadata.namespace}/{metadata.name}")
continue

logger.debug(f"Initial list for {resource} {metadata.namespace}/{metadata.name}")
ignore_request = True
_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version

logger.debug(f"Working on {resource}: {metadata.namespace}/{metadata.name}")
Expand All @@ -114,6 +117,10 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
if script and files_changed:
execute(script)

if request_ignore_initial_event and ignore_request:
logger.debug(f"Ignoring sending request for initial list {resource} {metadata.namespace}/{metadata.name}")
return

if request_url and files_changed:
request(request_url, request_method, enable_5xx, request_payload)

Expand Down Expand Up @@ -206,7 +213,7 @@ def _update_file(data_key, data_content, dest_folder, metadata, resource,

def _watch_resource_iterator(label, label_value, target_folder, request_url, request_method, request_payload,
namespace, folder_annotation, resource, unique_filenames, script, enable_5xx,
ignore_already_processed):
ignore_already_processed, request_ignore_initial_event):
v1 = client.CoreV1Api()
# Filter resources based on label and value or just label
label_selector = f"{label}={label_value}" if label_value else label
Expand All @@ -219,6 +226,8 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
if namespace != "ALL":
additional_args['namespace'] = namespace

ignore_request = False

stream = watch.Watch().stream(getattr(v1, _list_namespace[namespace][resource]), **additional_args)

# Process events
Expand All @@ -238,6 +247,9 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
_resources_version_map.pop(metadata.namespace + metadata.name)

if event_type == "ADDED" or event_type == "MODIFIED":
if request_ignore_initial_event and _resources_version_map.get(metadata.namespace + metadata.name) is None:
logger.debug(f"Initial event for {event_type} {resource} {metadata.namespace}/{metadata.name}")
ignore_request = True
_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version

logger.debug(f"Working on {event_type} {resource} {metadata.namespace}/{metadata.name}")
Expand All @@ -257,6 +269,10 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
if script and files_changed:
execute(script)

if request_ignore_initial_event and ignore_request:
logger.debug(f"Ignoring sending request for initial {event_type} {resource} {metadata.namespace}/{metadata.name}")
return

if request_url and files_changed:
request(request_url, request_method, enable_5xx, request_payload)

Expand Down Expand Up @@ -287,11 +303,11 @@ def _watch_resource_loop(mode, *args):

def watch_for_changes(mode, label, label_value, target_folder, request_url, request_method, request_payload,
current_namespace, folder_annotation, resources, unique_filenames, script, enable_5xx,
ignore_already_processed):
ignore_already_processed, request_ignore_initial_event):
processes = _start_watcher_processes(current_namespace, folder_annotation, label,
label_value, request_method, mode, request_payload, resources,
target_folder, unique_filenames, script, request_url, enable_5xx,
ignore_already_processed)
ignore_already_processed, request_ignore_initial_event)

while True:
died = False
Expand All @@ -311,14 +327,14 @@ def watch_for_changes(mode, label, label_value, target_folder, request_url, requ

def _start_watcher_processes(namespace, folder_annotation, label, label_value, request_method,
mode, request_payload, resources, target_folder, unique_filenames, script, request_url,
enable_5xx, ignore_already_processed):
enable_5xx, ignore_already_processed, request_ignore_initial_event):
processes = []
for resource in resources:
for ns in namespace.split(','):
proc = Process(target=_watch_resource_loop,
args=(mode, label, label_value, target_folder, request_url, request_method, request_payload,
ns, folder_annotation, resource, unique_filenames, script, enable_5xx,
ignore_already_processed)
ignore_already_processed, request_ignore_initial_event)
)
proc.daemon = True
proc.start()
Expand Down
8 changes: 7 additions & 1 deletion src/sidecar.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
REQ_PAYLOAD = "REQ_PAYLOAD"
REQ_URL = "REQ_URL"
REQ_METHOD = "REQ_METHOD"
REQ_IGNORE_INITIAL_EVENT = "REQ_IGNORE_INITIAL_EVENT"
SCRIPT = "SCRIPT"
ENABLE_5XX = "ENABLE_5XX"
IGNORE_ALREADY_PROCESSED = "IGNORE_ALREADY_PROCESSED"
Expand Down Expand Up @@ -103,6 +104,11 @@ def main():
if not ignore_already_processed:
logger.debug("Ignore already processed resource version will not be enabled.")

request_ignore_initial_event = os.getenv(REQ_IGNORE_INITIAL_EVENT) and ignore_already_processed

if request_ignore_initial_event:
logger.debug("Initial list or first event for a given resource will skip requests to a given URL.")

with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
namespace = os.getenv("NAMESPACE", f.read())

Expand All @@ -116,7 +122,7 @@ def main():
else:
watch_for_changes(method, label, label_value, target_folder, request_url, request_method, request_payload,
namespace, folder_annotation, resources, unique_filenames, script, enable_5xx,
ignore_already_processed)
ignore_already_processed, request_ignore_initial_event)


def _initialize_kubeclient_configuration():
Expand Down

0 comments on commit 7ac1ac8

Please sign in to comment.