feat(controller): Add ingress processing
SQuent committed Sep 14, 2024
1 parent 7eb0dc9 commit 30b58bc
Showing 15 changed files with 534 additions and 151 deletions.
22 changes: 22 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,22 @@
repos:
  - repo: local
    hooks:
      - id: poetry-check
        name: poetry-check
        description: run poetry check to validate config
        entry: poetry check
        language: python
        pass_filenames: false
        files: ^(.*/)?(poetry\.lock|pyproject\.toml)$
      - id: poetry-flake8
        name: poetry-flake8
        description: run linter
        entry: poetry run flake8
        pass_filenames: false
        language: python
      - id: poetry-pytest
        name: poetry-pytest
        description: run pytest
        entry: poetry run pytest
        language: python
        pass_filenames: false
22 changes: 17 additions & 5 deletions README.md
@@ -2,13 +2,13 @@

## Overview

Kuma ingress watcher is a Kubernetes controller designed to automatically monitor `Kubernetes Ingress` and `Traefik IngressRoute` resources in a Kubernetes cluster and create corresponding monitors in `Uptime Kuma`. It provides seamless integration between Kubernetes ingress resources and Uptime Kuma monitoring, allowing for easy and efficient monitoring of web services deployed on Kubernetes.

## Features

- Automatically creates, updates and deletes monitors in Uptime Kuma for `Kubernetes Ingress` and `Traefik IngressRoute` resources.
- Supports both single and multiple routes per Ingress resource.
- Customizable monitors via annotations on `Ingress` and `IngressRoute` resources.

## Installation

@@ -27,10 +27,13 @@ Before running the controller, make sure to configure the following environment
- `UPTIME_KUMA_URL`: The URL of your Uptime Kuma instance.
- `UPTIME_KUMA_USER`: The username for authenticating with Uptime Kuma.
- `UPTIME_KUMA_PASSWORD`: The password for authenticating with Uptime Kuma.
- `WATCH_INGRESSROUTES`: Set to `True` to enable monitoring of Traefik IngressRoutes.
- `WATCH_INGRESS`: Set to `True` to enable monitoring of Kubernetes Ingress resources.
- `WATCH_INTERVAL`: Interval in seconds between each check for changes in Ingress or IngressRoutes (default is `10` seconds).
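The boolean flags above are parsed leniently by the controller. A minimal sketch of how these variables are read (mirroring the helper defined in `controller.py`; defaults as listed above):

```python
import os


def str_to_bool(value):
    # Accept common truthy spellings: "true", "1", "t", "y", "yes" (case-insensitive).
    return str(value).lower() in ['true', '1', 't', 'y', 'yes']


# Defaults mirror the README: IngressRoute watching is on, Ingress watching is off.
WATCH_INGRESSROUTES = str_to_bool(os.getenv('WATCH_INGRESSROUTES', 'True'))
WATCH_INGRESS = str_to_bool(os.getenv('WATCH_INGRESS', 'False'))
WATCH_INTERVAL = int(os.getenv('WATCH_INTERVAL', '10'))
```

Any value outside the accepted list (for example `no` or an empty string) is treated as `False`.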

### Annotations for Uptime Kuma Autodiscovery

These annotations apply to both Kubernetes Ingress and Traefik IngressRoute resources, allowing you to customize the behavior of the Uptime Kuma monitors created for each:

1. **`uptime-kuma.autodiscovery.probe.interval`**
- Sets the probing interval in seconds for the monitor.
@@ -109,7 +112,16 @@ Currently, the addition of tags to monitors is not supported due to limitations

### Custom Watcher for IngressRoutes

The Kubernetes event watcher (`watch`) does not provide specific details on creation, modification, or deletion events for IngressRoutes. To overcome this limitation, this controller implements a custom watcher mechanism that continuously polls IngressRoutes and triggers the appropriate actions based on detected changes. The same custom watcher is used for Ingress objects. This ensures accurate monitoring and synchronization with Uptime Kuma configurations.
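The polling approach can be sketched as follows. This is a simplified illustration, not the controller's exact code: `fetch_items` and the `on_*` callbacks are placeholders standing in for the Kubernetes API calls and monitor operations.

```python
import time


def diff_snapshots(previous, current):
    """Compare two {name: object} snapshots and report what changed."""
    added = current.keys() - previous.keys()
    deleted = previous.keys() - current.keys()
    modified = {n for n in current.keys() & previous.keys() if previous[n] != current[n]}
    return added, deleted, modified


def poll_loop(fetch_items, on_added, on_deleted, on_modified, interval=10, cycles=None):
    """Poll fetch_items() and fire a callback for each detected change."""
    previous = {}
    ran = 0
    while cycles is None or ran < cycles:
        current = fetch_items()
        added, deleted, modified = diff_snapshots(previous, current)
        for name in added:
            on_added(name, current[name])
        for name in deleted:
            on_deleted(name, previous[name])
        for name in modified:
            on_modified(name, current[name])
        previous = current
        ran += 1
        time.sleep(interval)
```

Comparing the whole object (`previous[n] != current[n]`) is a deliberately blunt change check: any difference in spec, annotations, or metadata triggers reprocessing.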

## Improvements

- **IngressRoute Version Selection**: Allow selecting which API version of IngressRoutes the controller watches, so its behavior can match the Traefik version in use.
- **Tag Addition for Monitors**: Adding tags to monitors is currently not supported due to limitations in the Uptime Kuma API. Future updates may add this capability, allowing monitors to be tagged directly through the controller.


## Contributing
167 changes: 110 additions & 57 deletions kuma_ingress_watcher/controller.py
@@ -7,10 +7,17 @@
from kubernetes import client, config


def str_to_bool(value):
    return str(value).lower() in ['true', '1', 't', 'y', 'yes']


# Configuration
UPTIME_KUMA_URL = os.getenv('UPTIME_KUMA_URL')
UPTIME_KUMA_USER = os.getenv('UPTIME_KUMA_USER')
UPTIME_KUMA_PASSWORD = os.getenv('UPTIME_KUMA_PASSWORD')
WATCH_INTERVAL = int(os.getenv('WATCH_INTERVAL', '10') or 10)
WATCH_INGRESSROUTES = str_to_bool(os.getenv('WATCH_INGRESSROUTES', 'True'))
WATCH_INGRESS = str_to_bool(os.getenv('WATCH_INGRESS', 'False'))
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()

LOG_LEVELS = {
@@ -23,14 +30,15 @@

# Logging configuration
logging.basicConfig(
    level=LOG_LEVELS.get(LOG_LEVEL, logging.INFO),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Global variables for Kubernetes and Uptime Kuma
kuma = None
custom_api_instance = None
networking_api_instance = None


def check_config():
@@ -84,19 +92,45 @@ def delete_monitor(name):
        logger.error(f"Failed to delete monitor {name}: {e}")


def extract_hosts_from_match(match):
    host_pattern = re.compile(r'Host\(`([^`]*)`\)')
    return host_pattern.findall(match)
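For instance, the regex pulls every backtick-quoted host out of a Traefik match expression, ignoring the other matchers around it:

```python
import re


def extract_hosts_from_match(match):
    # Same pattern as above: capture whatever sits between Host(` and `).
    host_pattern = re.compile(r'Host\(`([^`]*)`\)')
    return host_pattern.findall(match)


print(extract_hosts_from_match('Host(`example.com`) && PathPrefix(`/api`)'))  # ['example.com']
print(extract_hosts_from_match('Host(`a.io`) || Host(`b.io`)'))               # ['a.io', 'b.io']
```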


def extract_hosts_from_ingress_rule(rule):
    hosts = []
    if 'host' in rule:
        hosts.append(rule['host'])
    return hosts


def extract_hosts(route_or_rule, type_obj):
    if type_obj == 'IngressRoute':
        match = route_or_rule.get('match')
        return extract_hosts_from_match(match) if match else []
    elif type_obj == 'Ingress':
        return extract_hosts_from_ingress_rule(route_or_rule)
    else:
        return []


def get_routes_or_rules(spec, type_obj):
    if type_obj == 'IngressRoute':
        return spec.get('routes', [])
    elif type_obj == 'Ingress':
        return spec.get('rules', [])
    else:
        return []
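Together, this dispatch lets the rest of the controller treat both resource kinds uniformly. A self-contained illustration (condensed re-statements of the helpers above, with made-up specs):

```python
import re


def extract_hosts_from_match(match):
    return re.findall(r'Host\(`([^`]*)`\)', match)


def extract_hosts(route_or_rule, type_obj):
    # Traefik routes carry a `match` expression; Ingress rules carry a `host` key.
    if type_obj == 'IngressRoute':
        match = route_or_rule.get('match')
        return extract_hosts_from_match(match) if match else []
    elif type_obj == 'Ingress':
        return [route_or_rule['host']] if 'host' in route_or_rule else []
    return []


def get_routes_or_rules(spec, type_obj):
    return spec.get('routes', []) if type_obj == 'IngressRoute' else spec.get('rules', [])


# One IngressRoute spec and one Ingress spec, handled by the same code path:
ingressroute_spec = {'routes': [{'match': 'Host(`a.example.com`)'}]}
ingress_spec = {'rules': [{'host': 'b.example.com'}]}

for spec, kind in [(ingressroute_spec, 'IngressRoute'), (ingress_spec, 'Ingress')]:
    for entry in get_routes_or_rules(spec, kind):
        print(kind, extract_hosts(entry, kind))
```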


def process_routing_object(item, type_obj):
    metadata = item['metadata']
    annotations = metadata.get('annotations', {})

    name = metadata['name']
    namespace = metadata['namespace']
    spec = item['spec']
    routes_or_rules = get_routes_or_rules(spec, type_obj)
    interval = int(annotations.get('uptime-kuma.autodiscovery.probe.interval', 60))
    monitor_name = annotations.get('uptime-kuma.autodiscovery.probe.name', f"{name}-{namespace}")
    enabled = annotations.get('uptime-kuma.autodiscovery.probe.enabled', 'true').lower() == 'true'
@@ -110,51 +144,44 @@ def process_ingressroutes(item):
        delete_monitor(monitor_name)
        return

    process_routes(monitor_name, routes_or_rules, interval, probe_type, headers, port, method, type_obj)


def process_routes(monitor_name, routes_or_rules, interval, probe_type, headers, port, method, type_obj):
    index = 1
    for route_or_rule in routes_or_rules:
        hosts = extract_hosts(route_or_rule, type_obj)
        for host in hosts:
            url = f"https://{host}"
            if port:
                url = f"{url}:{port}"
            monitor_name_with_index = f"{monitor_name}-{index}" if len(routes_or_rules) > 1 else monitor_name
            create_or_update_monitor(monitor_name_with_index, url, interval, probe_type, headers, method)
            index += 1
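The naming scheme can be seen with a stubbed monitor call. This is a simplified variant for illustration only: the signature is reduced, and `create_or_update_monitor` is a stand-in that records what the real Uptime Kuma call would create.

```python
created = []


def create_or_update_monitor(name, url, interval, probe_type, headers, method):
    # Stand-in for the real Uptime Kuma call; just records what would be created.
    created.append((name, url))


def process_routes(monitor_name, hosts_per_entry, port=None):
    # Simplified: hosts_per_entry is a list of host lists, one per route/rule.
    index = 1
    for hosts in hosts_per_entry:
        for host in hosts:
            url = f"https://{host}" + (f":{port}" if port else "")
            # Only suffix the name when there is more than one route/rule.
            name = f"{monitor_name}-{index}" if len(hosts_per_entry) > 1 else monitor_name
            create_or_update_monitor(name, url, 60, 'http', None, 'GET')
            index += 1


process_routes('myapp-prod', [['a.example.com'], ['b.example.com']])
print(created)  # [('myapp-prod-1', 'https://a.example.com'), ('myapp-prod-2', 'https://b.example.com')]
```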


def init_kubernetes_client():
    try:
        config.load_incluster_config()

        if WATCH_INGRESS:
            global networking_api_instance
            networking_api_instance = client.NetworkingV1Api()

        if WATCH_INGRESSROUTES:
            global custom_api_instance
            custom_api_instance = client.CustomObjectsApi()
    except Exception as e:
        logger.error(f"Failed to initialize Kubernetes client: {e}")
        sys.exit(1)


def get_ingressroutes(custom_api_instance):
    try:
        return custom_api_instance.list_cluster_custom_object(
            group="traefik.containo.us",
            version="v1alpha1",
            plural="ingressroutes"
@@ -164,46 +191,72 @@ def get_ingressroutes(api_inst):
        return {'items': []}


def get_ingress(networking_api_instance):
    try:
        ingress_list = networking_api_instance.list_ingress_for_all_namespaces()
        ingress_dict_list = [ingress.to_dict() for ingress in ingress_list.items]
        return {'items': ingress_dict_list}
    except Exception as e:
        logger.error(f"Failed to get Ingress: {e}")
        return {'items': []}


def handle_changes(previous_items, current_items, resource_type):
    current_names = set(current_items.keys())
    previous_names = set(previous_items.keys())

    added = current_names - previous_names
    for name in added:
        logger.info(f"{resource_type} {name} added.")
        process_routing_object(current_items[name], resource_type)

    deleted = previous_names - current_names
    for name in deleted:
        namespace = previous_items[name]['metadata']['namespace']
        monitor_name = f"{name}-{namespace}"
        logger.info(f"{resource_type} {name} deleted.")
        delete_monitor(monitor_name)

    modified = current_names & previous_names
    for name in modified:
        if ingressroute_changed(previous_items[name], current_items[name]):
            logger.info(f"{resource_type} {name} modified.")
            process_routing_object(current_items[name], resource_type)

    return current_items


def ingressroute_changed(old, new):
    return old != new
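The set arithmetic used by `handle_changes` can be exercised in isolation, with plain dicts standing in for Kubernetes objects (names and specs below are made up):

```python
previous = {'app-a': {'spec': 1}, 'app-b': {'spec': 2}}
current = {'app-b': {'spec': 9}, 'app-c': {'spec': 3}}

added = set(current) - set(previous)       # 'app-c': a new object, create its monitor
deleted = set(previous) - set(current)     # 'app-a': gone, delete its monitor
modified = {name for name in set(current) & set(previous)
            if previous[name] != current[name]}  # 'app-b': spec changed, reprocess

print(sorted(added), sorted(deleted), sorted(modified))
```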


def watch_ingress_resources():
    if WATCH_INGRESSROUTES:
        logger.info("Start watching Traefik Ingress Routes")
        previous_ingressroutes = {}
    if WATCH_INGRESS:
        logger.info("Start watching Kubernetes Ingress Object")
        previous_ingress = {}

    while True:
        if WATCH_INGRESSROUTES:
            current_ingressroutes = get_ingressroutes(custom_api_instance)
            current_items = {item['metadata']['name']: item for item in current_ingressroutes['items']}
            previous_ingressroutes = handle_changes(previous_ingressroutes, current_items, "IngressRoute")

        if WATCH_INGRESS:
            current_ingress = get_ingress(networking_api_instance)
            current_items = {item['metadata']['name']: item for item in current_ingress['items']}
            previous_ingress = handle_changes(previous_ingress, current_items, "Ingress")

        time.sleep(WATCH_INTERVAL)


def main():
    check_config()
    init_kuma_api()
    init_kubernetes_client()
    watch_ingress_resources()


if __name__ == "__main__":
20 changes: 10 additions & 10 deletions tests/test_extract_hosts.py
@@ -3,19 +3,19 @@


class TestExtractHosts(unittest.TestCase):
    def test_extract_hosts_ingressroute(self):
        route_or_rule = {'match': 'Host(`example.com`) && Host(`example.org`)'}
        hosts = extract_hosts(route_or_rule, 'IngressRoute')
        self.assertEqual(hosts, ['example.com', 'example.org'])

    def test_extract_hosts_ingress(self):
        route_or_rule = {'host': 'example.com'}
        hosts = extract_hosts(route_or_rule, 'Ingress')
        self.assertEqual(hosts, ['example.com'])

    def test_extract_hosts_none(self):
        route_or_rule = {'path': '/test'}
        hosts = extract_hosts(route_or_rule, 'IngressRoute')
        self.assertEqual(hosts, [])


18 changes: 18 additions & 0 deletions tests/test_extract_hosts_from_ingress_rule.py
@@ -0,0 +1,18 @@
import unittest
from kuma_ingress_watcher.controller import extract_hosts_from_ingress_rule


class TestExtractHostsFromIngressRule(unittest.TestCase):
    def test_extract_hosts_single(self):
        rule = {'host': 'example.com'}
        hosts = extract_hosts_from_ingress_rule(rule)
        self.assertEqual(hosts, ['example.com'])

    def test_extract_hosts_none(self):
        rule = {'path': '/test'}
        hosts = extract_hosts_from_ingress_rule(rule)
        self.assertEqual(hosts, [])


if __name__ == '__main__':
    unittest.main()
