Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite pipeline selection to serially build cluster #266

Merged
merged 4 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 80 additions & 182 deletions flux_local/git_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import contextlib
from dataclasses import dataclass, field
import logging
import networkx
import os
import tempfile
from collections.abc import Callable, Awaitable, Iterable
Expand All @@ -47,7 +46,7 @@
from .exceptions import FluxException
from .manifest import (
CRD_KIND,
CLUSTER_KUSTOMIZE_DOMAIN,
FLUXTOMIZE_DOMAIN,
KUSTOMIZE_DOMAIN,
Cluster,
ClusterPolicy,
Expand Down Expand Up @@ -76,7 +75,10 @@
GIT_REPO_KIND = "GitRepository"
OCI_REPO_KIND = "OCIRepository"
DEFAULT_NAMESPACE = "flux-system"
ROOT_KUSTOMIZATION_NAME = "flux-system"
DEFAULT_NAME = "flux-system"
GREP_SOURCE_REF_KIND = f"spec.sourceRef.kind={GIT_REPO_KIND}|{OCI_REPO_KIND}"
ERROR_DETAIL_BAD_PATH = "Try specifying another path within the git repo?"
ERROR_DETAIL_BAD_KS = "Is a Kustomization pointing to a path that does not exist?"


@dataclass
Expand Down Expand Up @@ -161,7 +163,7 @@ def func(doc: dict[str, Any]) -> bool:
return func


CLUSTER_KUSTOMIZE_DOMAIN_FILTER = domain_filter(CLUSTER_KUSTOMIZE_DOMAIN)
FLUXTOMIZE_DOMAIN_FILTER = domain_filter(FLUXTOMIZE_DOMAIN)
KUSTOMIZE_DOMAIN_FILTER = domain_filter(KUSTOMIZE_DOMAIN)


Expand Down Expand Up @@ -271,7 +273,7 @@ def predicate(

def cluster_metadata_selector() -> MetadataSelector:
"""Create a new MetadataSelector for Kustomizations."""
return MetadataSelector(namespace=DEFAULT_NAMESPACE)
return MetadataSelector(name=DEFAULT_NAME, namespace=DEFAULT_NAMESPACE)


def ks_metadata_selector() -> MetadataSelector:
Expand Down Expand Up @@ -314,92 +316,74 @@ class ResourceSelector:
"""ClusterPolicy objects to return."""


async def get_flux_kustomizations(
root: Path, relative_path: Path
async def get_fluxtomizations(
root: Path, relative_path: Path, build: bool
) -> list[Kustomization]:
"""Find all flux Kustomizations in the specified path.

This may be called repeatedly with different paths to repeatedly collect
Kustomizations from the repo. Assumes that any flux Kustomization
for a GitRepository is pointed at this cluster, following normal conventions.
"""
cmd = kustomize.grep(f"kind={CLUSTER_KUSTOMIZE_KIND}", root / relative_path).grep(
f"spec.sourceRef.kind={GIT_REPO_KIND}|{OCI_REPO_KIND}"
)
cmd: kustomize.Kustomize
if build:
cmd = (
kustomize.build(root / relative_path)
.grep(f"kind={CLUSTER_KUSTOMIZE_KIND}")
.grep(GREP_SOURCE_REF_KIND)
)
else:
cmd = kustomize.grep(
f"kind={CLUSTER_KUSTOMIZE_KIND}", root / relative_path
).grep(GREP_SOURCE_REF_KIND)
docs = await cmd.objects()
return [
Kustomization.parse_doc(doc)
for doc in filter(CLUSTER_KUSTOMIZE_DOMAIN_FILTER, docs)
Kustomization.parse_doc(doc) for doc in filter(FLUXTOMIZE_DOMAIN_FILTER, docs)
]


def find_source_kustomization(
search: Path, node_map: dict[Path, Kustomization]
async def kustomization_traversal(
root_path_selector: PathSelector, path_selector: PathSelector, build: bool
) -> list[Kustomization]:
"""Return all source Kustomizations that might manage the specified path."""
results = []
set(node_map)
for parent in search.parents:
if node := node_map.get(parent):
results.append(node)
return results


def find_path_parent(search: Path, prefixes: set[Path]) -> Path | None:
"""Return a prefix path that is a parent of the search path."""
for parent in search.parents:
if parent in prefixes:
return parent
return None


async def kustomization_traversal(path_selector: PathSelector) -> list[Kustomization]:
"""Search for kustomizations in the specified path."""

kustomizations: list[Kustomization] = []
visited: set[Path] = set() # Relative paths within the cluster
nodes: set[str] = set()

path_queue: queue.Queue[Path] = queue.Queue()
path_queue.put(path_selector.relative_path)
root = path_selector.root
while not path_queue.empty():
path = path_queue.get()
_LOGGER.debug("Visiting path (%s) %s", root, path)
_LOGGER.debug("Visiting path (%s) %s", root_path_selector.path, path)
try:
docs = await get_flux_kustomizations(root, path)
docs = await get_fluxtomizations(root_path_selector.root, path, build=build)
except FluxException as err:
if visited:
raise FluxException(
f"Error building Fluxtomization in '{root}' path '{path}': {err}"
f"Is a Kustomization pointing to a path that does not exist?"
)
detail = ERROR_DETAIL_BAD_KS if visited else ERROR_DETAIL_BAD_PATH
raise FluxException(
f"Error building Fluxtomization in '{root}' path '{path}': {err}"
f"Try specifying another path within the git repo?"
)

# Source path is relative to the search path. Update to have the
# full prefix relative to the root.
for kustomization in docs:
if not kustomization.path:
_LOGGER.debug("Assigning implicit path %s", path_selector.relative_path)
kustomization.path = str(path_selector.relative_path)
if not kustomization.source_path:
continue
kustomization.source_path = str(
((root / path) / kustomization.source_path).relative_to(root)
)
_LOGGER.debug(
"Updated relative path: %s => %s",
node_name(kustomization),
kustomization.source_path,
f"Error building Fluxtomization in '{root_path_selector.root}' "
f"path '{path}': {err} - {detail}"
)

visited |= set({path})

_LOGGER.debug("Found %s Kustomizations", len(docs))
result_docs = []
for doc in docs:
if doc.namespaced_name in nodes:
_LOGGER.debug(
"Ignoring duplicate Kustomization %s", doc.namespaced_name
)
continue
nodes.add(doc.namespaced_name)
# Source path is relative to the search path. Update to have the
# full prefix relative to the root.
if not doc.path:
_LOGGER.debug(
"Assigning implicit path %s", root_path_selector.relative_path
)
doc.path = str(root_path_selector.relative_path)

found_path: Path | None = None
_LOGGER.debug(
"Kustomization '%s' has sourceRef.kind '%s' of '%s'",
Expand Down Expand Up @@ -428,7 +412,7 @@ async def kustomization_traversal(path_selector: PathSelector) -> list[Kustomiza
_LOGGER.debug("Skipping kustomization %s; not known source", doc.name)
continue

if not find_path_parent(found_path, visited) and found_path not in visited:
if found_path not in visited:
path_queue.put(found_path)
else:
_LOGGER.debug("Already visited %s", found_path)
Expand All @@ -446,125 +430,54 @@ def node_name(ks: Kustomization) -> str:
return f"{ks.namespaced_name} @ {ks.id_name}"


def make_clusters(
    kustomizations: list[Kustomization], sources: list[Source] | None = None
) -> list[Cluster]:
    """Convert the flat list of Kustomizations into a list of Clusters.

    The parent of each Kustomization is reverse engineered by matching the
    parent directories of its kustomize source path against the target paths
    of the other Kustomizations. A Kustomization that ends up without a parent
    becomes the root of a Cluster, and all of its descendants are flattened
    into that Cluster's list of Kustomizations.

    Args:
        kustomizations: Every Kustomization discovered so far. Each one must
            have ``source_path`` set.
        sources: Kept for backward compatibility with existing callers; it is
            not consulted when building the clusters.

    Returns:
        One Cluster per root Kustomization, ordered by root node name. Each
        Cluster holds the root plus its descendants, ordered by node name.

    Raises:
        InputException: If any Kustomization has no ``source_path``.
    """
    # Directed graph with an edge from a parent Kustomization to each
    # Kustomization whose source lives beneath the parent's target path.
    graph = networkx.DiGraph()
    node_path_map = {Path(ks.path): ks for ks in kustomizations if ks.source_path}
    for ks in kustomizations:
        if not ks.source_path:
            raise InputException(
                "Kustomization did not have source path; Old kustomize?"
            )

        ks_node = node_name(ks)
        graph.add_node(ks_node, ks=ks)
        if ks.name == ROOT_KUSTOMIZATION_NAME and ks.namespace == DEFAULT_NAMESPACE:
            # The well-known root Kustomization never gets a parent.
            continue

        # Find the parent Kustomizations that produced this one by matching
        # the kustomize source parent paths with a Kustomization target path.
        source = Path(ks.source_path)
        _LOGGER.debug("--- Examining candidate Kustomization ---")
        _LOGGER.debug("Ks : %s", ks.namespaced_name)
        _LOGGER.debug("Path : %s", Path(ks.path))
        _LOGGER.debug("Source path: %s", source)
        candidates = find_source_kustomization(source, node_path_map)
        _LOGGER.debug(
            "Possible sources: %s", [node_name(candidate) for candidate in candidates]
        )
        if not candidates:
            _LOGGER.debug("No parent for %s (source=%s)", ks_node, source)
            continue

        for candidate in candidates:
            # These names can be compared since they are within the scope of
            # the source path so within the same cluster.
            if candidate.namespaced_name == ks.namespaced_name:
                _LOGGER.debug(
                    "Skipping candidate source %s", candidate.namespaced_name
                )
                continue
            _LOGGER.debug(
                "Found parent %s => %s",
                ks.namespaced_name,
                candidate.namespaced_name,
            )
            candidate_node = node_name(candidate)
            if graph.has_edge(ks_node, candidate_node):
                # Adding the reverse edge would create a two-node cycle.
                _LOGGER.debug("Already has opposite edge; Skipping cycle")
            else:
                graph.add_edge(candidate_node, ks_node)

    # Clusters are subgraphs within the graph that are connected, with the root
    # node being the cluster itself. All child Kustomizations are flattened.
    _LOGGER.debug("Creating clusters based on connectivity")
    roots = []
    for node, degree in graph.in_degree():
        _LOGGER.debug("Node: %s, degree: %s", node, degree)
        if degree == 0:
            roots.append(node)
    roots.sort()

    clusters: list[Cluster] = []
    _LOGGER.debug("roots=%s", roots)
    for root in roots:
        root_ks = graph.nodes[root]["ks"]
        members = sorted([root, *networkx.descendants(graph, root)])
        clusters.append(
            Cluster(
                name=root_ks.name,
                namespace=root_ks.namespace,
                path=root_ks.path,
                kustomizations=[graph.nodes[member]["ks"] for member in members],
            )
        )
        _LOGGER.debug(
            "Created cluster %s with %s kustomizations", root_ks.name, len(members)
        )

    return clusters


async def get_clusters(
path_selector: PathSelector,
cluster_selector: MetadataSelector,
kustomization_selector: MetadataSelector,
) -> list[Cluster]:
"""Load Cluster objects from the specified path."""

kustomizations = await kustomization_traversal(path_selector)
clusters = list(
filter(
cluster_selector.predicate,
make_clusters(kustomizations, path_selector.sources or []),
try:
roots = await get_fluxtomizations(
path_selector.root, path_selector.relative_path, build=False
)
)
except FluxException as err:
raise FluxException(
f"Error building Fluxtomization in '{path_selector.root}' path "
f"'{path_selector.relative_path}': {err}"
f"Try specifying another path within the git repo?"
)
_LOGGER.debug("roots=%s", roots)
clusters = [
Cluster(name=ks.name, namespace=ks.namespace or "", path=ks.path)
for ks in roots
if cluster_selector.predicate(ks)
]
build = True
if not clusters:
# There are no flux-system Kustomizations within this path. Fall back to
# assuming everything in the current directory is part of a cluster.
_LOGGER.debug(
"No clusters found; Processing as a Kustomization: %s",
path_selector.relative_path,
)
clusters = [
Cluster(name="cluster", namespace="", path=str(path_selector.relative_path))
]
build = False

for cluster in clusters:
cluster.kustomizations = list(
filter(kustomization_selector.predicate, cluster.kustomizations)
_LOGGER.debug("Building cluster %s %s", cluster.name, cluster.path)
results = await kustomization_traversal(
path_selector,
PathSelector(path=Path(cluster.path), sources=path_selector.sources),
build=build,
)
results.sort(key=lambda x: (x.namespace, x.name))
cluster.kustomizations = list(filter(kustomization_selector.predicate, results))
clusters.sort(key=lambda x: (x.path, x.namespace, x.name))
return clusters


async def get_kustomizations(path: Path) -> list[dict[str, Any]]:
    """Return the kustomize Kustomization documents found under ``path``.

    Documents are selected by grepping for ``kind=KUSTOMIZE_KIND`` and then
    keeping only those accepted by ``KUSTOMIZE_DOMAIN_FILTER``.
    """
    documents = await kustomize.grep(f"kind={KUSTOMIZE_KIND}", path).objects()
    return [document for document in documents if KUSTOMIZE_DOMAIN_FILTER(document)]


async def build_kustomization(
kustomization: Kustomization,
cluster_path: Path,
Expand Down Expand Up @@ -669,21 +582,6 @@ async def build_manifest(
clusters = await get_clusters(
selector.path, selector.cluster, selector.kustomization
)
if not clusters and selector.path.path:
_LOGGER.debug("No clusters found; Processing as a Kustomization: %s", selector)
# The argument path may be a Kustomization inside a cluster. Create a synthetic
# cluster with any found Kustomizations
cluster = Cluster(
name="cluster", namespace="", path=str(selector.path.relative_path)
)
objects = await get_kustomizations(selector.path.path)
if objects:
cluster.kustomizations = [
Kustomization(
name="kustomization", path=str(selector.path.relative_path)
)
]
clusters.append(cluster)

async def update_kustomization(cluster: Cluster) -> None:
build_tasks = []
Expand Down
Loading