Skip to content

Commit

Permalink
Rewrite pipeline selection to serially build cluster
Browse files Browse the repository at this point in the history
Update logic for handling flux-system
Revert test changes
Add valid repositories for cluster5
Include flux-system in kustomization output
  • Loading branch information
allenporter committed Jul 7, 2023
1 parent ad47093 commit 3cf8607
Show file tree
Hide file tree
Showing 26 changed files with 283 additions and 219 deletions.
246 changes: 83 additions & 163 deletions flux_local/git_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import contextlib
from dataclasses import dataclass, field
import logging
import networkx
import os
import tempfile
from collections.abc import Callable, Awaitable, Iterable
Expand All @@ -47,7 +46,7 @@
from .exceptions import FluxException
from .manifest import (
CRD_KIND,
CLUSTER_KUSTOMIZE_DOMAIN,
FLUXTOMIZE_DOMAIN,
KUSTOMIZE_DOMAIN,
Cluster,
ClusterPolicy,
Expand Down Expand Up @@ -76,7 +75,10 @@
GIT_REPO_KIND = "GitRepository"
OCI_REPO_KIND = "OCIRepository"
DEFAULT_NAMESPACE = "flux-system"
ROOT_KUSTOMIZATION_NAME = "flux-system"
DEFAULT_NAME = "flux-system"
GREP_SOURCE_REF_KIND = f"spec.sourceRef.kind={GIT_REPO_KIND}|{OCI_REPO_KIND}"
ERROR_DETAIL_BAD_PATH = "Try specifying another path within the git repo?"
ERROR_DETAIL_BAD_KS = "Is a Kustomization pointing to a path that does not exist?"


@dataclass
Expand Down Expand Up @@ -161,7 +163,7 @@ def func(doc: dict[str, Any]) -> bool:
return func


CLUSTER_KUSTOMIZE_DOMAIN_FILTER = domain_filter(CLUSTER_KUSTOMIZE_DOMAIN)
FLUXTOMIZE_DOMAIN_FILTER = domain_filter(FLUXTOMIZE_DOMAIN)
KUSTOMIZE_DOMAIN_FILTER = domain_filter(KUSTOMIZE_DOMAIN)


Expand Down Expand Up @@ -271,7 +273,7 @@ def predicate(

def cluster_metadata_selector() -> MetadataSelector:
"""Create a new MetadataSelector for Kustomizations."""
return MetadataSelector(namespace=DEFAULT_NAMESPACE)
return MetadataSelector(name=DEFAULT_NAME, namespace=DEFAULT_NAMESPACE)


def ks_metadata_selector() -> MetadataSelector:
Expand Down Expand Up @@ -314,22 +316,31 @@ class ResourceSelector:
"""ClusterPolicy objects to return."""


async def get_flux_kustomizations(
root: Path, relative_path: Path
async def get_fluxtomizations(
root: Path, relative_path: Path, build: bool
) -> list[Kustomization]:
"""Find all flux Kustomizations in the specified path.
This may be called repeatedly with different paths to repeatedly collect
Kustomizations from the repo. Assumes that any flux Kustomization
for a GitRepository is pointed at this cluster, following normal conventions.
"""
cmd = kustomize.grep(f"kind={CLUSTER_KUSTOMIZE_KIND}", root / relative_path).grep(
f"spec.sourceRef.kind={GIT_REPO_KIND}|{OCI_REPO_KIND}"
)
cmd: kustomize.Kustomize
if build:
_LOGGER.debug("kustomize.build")
cmd = (
kustomize.build(root / relative_path)
.grep(f"kind={CLUSTER_KUSTOMIZE_KIND}")
.grep(GREP_SOURCE_REF_KIND)
)
else:
_LOGGER.debug("kustomize.grep")
cmd = kustomize.grep(
f"kind={CLUSTER_KUSTOMIZE_KIND}", root / relative_path
).grep(GREP_SOURCE_REF_KIND)
docs = await cmd.objects()
return [
Kustomization.parse_doc(doc)
for doc in filter(CLUSTER_KUSTOMIZE_DOMAIN_FILTER, docs)
Kustomization.parse_doc(doc) for doc in filter(FLUXTOMIZE_DOMAIN_FILTER, docs)
]


Expand All @@ -353,53 +364,48 @@ def find_path_parent(search: Path, prefixes: set[Path]) -> Path | None:
return None


async def kustomization_traversal(path_selector: PathSelector) -> list[Kustomization]:
async def kustomization_traversal(
root_path_selector: PathSelector, path_selector: PathSelector, build: bool
) -> list[Kustomization]:
"""Search for kustomizations in the specified path."""

kustomizations: list[Kustomization] = []
visited: set[Path] = set() # Relative paths within the cluster
nodes: set[str] = set()

path_queue: queue.Queue[Path] = queue.Queue()
path_queue.put(path_selector.relative_path)
root = path_selector.root
while not path_queue.empty():
path = path_queue.get()
_LOGGER.debug("Visiting path (%s) %s", root, path)
_LOGGER.debug("Visiting path (%s) %s", root_path_selector.path, path)
try:
docs = await get_flux_kustomizations(root, path)
docs = await get_fluxtomizations(root_path_selector.root, path, build=build)
except FluxException as err:
if visited:
raise FluxException(
f"Error building Fluxtomization in '{root}' path '{path}': {err}"
f"Is a Kustomization pointing to a path that does not exist?"
)
detail = ERROR_DETAIL_BAD_KS if visited else ERROR_DETAIL_BAD_PATH
raise FluxException(
f"Error building Fluxtomization in '{root}' path '{path}': {err}"
f"Try specifying another path within the git repo?"
)

# Source path is relative to the search path. Update to have the
# full prefix relative to the root.
for kustomization in docs:
if not kustomization.path:
_LOGGER.debug("Assigning implicit path %s", path_selector.relative_path)
kustomization.path = str(path_selector.relative_path)
if not kustomization.source_path:
continue
kustomization.source_path = str(
((root / path) / kustomization.source_path).relative_to(root)
)
_LOGGER.debug(
"Updated relative path: %s => %s",
node_name(kustomization),
kustomization.source_path,
f"Error building Fluxtomization in '{root_path_selector.root}' "
f"path '{path}': {err} - {detail}"
)

visited |= set({path})

_LOGGER.debug("Found %s Kustomizations", len(docs))
result_docs = []
for doc in docs:
if doc.namespaced_name in nodes:
_LOGGER.debug(
"Ignoring duplicate Kustomization %s", doc.namespaced_name
)
continue
nodes.add(doc.namespaced_name)
# Source path is relative to the search path. Update to have the
# full prefix relative to the root.
if not doc.path:
_LOGGER.debug(
"Assigning implicit path %s", root_path_selector.relative_path
)
doc.path = str(root_path_selector.relative_path)

found_path: Path | None = None
_LOGGER.debug(
"Kustomization '%s' has sourceRef.kind '%s' of '%s'",
Expand Down Expand Up @@ -428,7 +434,7 @@ async def kustomization_traversal(path_selector: PathSelector) -> list[Kustomiza
_LOGGER.debug("Skipping kustomization %s; not known source", doc.name)
continue

if not find_path_parent(found_path, visited) and found_path not in visited:
if found_path not in visited:
path_queue.put(found_path)
else:
_LOGGER.debug("Already visited %s", found_path)
Expand All @@ -446,125 +452,54 @@ def node_name(ks: Kustomization) -> str:
return f"{ks.namespaced_name} @ {ks.id_name}"


def make_clusters(
kustomizations: list[Kustomization], sources: list[Source] | None = None
) -> list[Cluster]:
"""Convert the flat list of Kustomizations into a Cluster.
This will reverse engineer which Kustomizations are root nodes for the cluster
based on the parent paths. Root Kustomizations are made the cluster and everything
else is made a child.
"""
if not sources:
sources = []

# Build a directed graph from a kustomization path to the path
# of the kustomization that created it.
graph = networkx.DiGraph()
node_path_map = {Path(ks.path): ks for ks in kustomizations if ks.source_path}
for ks in kustomizations:
if not ks.source_path:
raise InputException(
"Kustomization did not have source path; Old kustomize?"
)

graph.add_node(node_name(ks), ks=ks)
if ks.name == ROOT_KUSTOMIZATION_NAME and ks.namespace == DEFAULT_NAMESPACE:
# Do not attempt parent search below
continue

# Find the parent Kustomization that produced this based on the
# matching the kustomize source parent paths with a Kustomization
# target path.
source = Path(ks.source_path)
_LOGGER.debug("--- Examining candidate Kustomization ---")
_LOGGER.debug("Ks : %s", ks.namespaced_name)
_LOGGER.debug("Path : %s", Path(ks.path))
_LOGGER.debug("Source path: %s", source)
source_kustomizations = find_source_kustomization(source, node_path_map)
_LOGGER.debug(
"Possible sources: %s", [f"{node_name(ks)}" for ks in source_kustomizations]
)
if source_kustomizations:
while source_kustomizations:
candidate = source_kustomizations.pop(0)
# These names can be compared since they are within the scope of
# the source path so within the same cluster.
if candidate.namespaced_name != ks.namespaced_name:
_LOGGER.debug(
"Found parent %s => %s",
ks.namespaced_name,
candidate.namespaced_name,
)
if graph.has_edge(node_name(ks), node_name(candidate)):
_LOGGER.debug("Already has opposite edge; Skipping cycle")
else:
graph.add_edge(node_name(candidate), node_name(ks))
else:
_LOGGER.debug(
"Skipping candidate source %s", candidate.namespaced_name
)
else:
_LOGGER.debug("No parent for %s (source=%s)", node_name(ks), source)

# Clusters are subgraphs within the graph that are connected, with the root
# node being the cluster itself. All children Kustomizations are flattended.
_LOGGER.debug("Creating clusters based on connectivity")
for node, degree in graph.in_degree():
_LOGGER.debug("Node: %s, degree: %s", node, degree)
roots = [node for node, degree in graph.in_degree() if degree == 0]
roots.sort()

clusters: list[Cluster] = []
_LOGGER.debug("roots=%s", roots)
for root in roots:
root_ks = graph.nodes[root]["ks"]
nodes = [root] + list(networkx.descendants(graph, root))
nodes.sort()
kustomizations = [graph.nodes[node]["ks"] for node in nodes]
clusters.append(
Cluster(
name=root_ks.name,
namespace=root_ks.namespace,
path=root_ks.path,
kustomizations=kustomizations,
)
)
_LOGGER.debug(
"Created cluster %s with %s kustomizations", root_ks.name, len(nodes)
)

return clusters


async def get_clusters(
path_selector: PathSelector,
cluster_selector: MetadataSelector,
kustomization_selector: MetadataSelector,
) -> list[Cluster]:
"""Load Cluster objects from the specified path."""

kustomizations = await kustomization_traversal(path_selector)
clusters = list(
filter(
cluster_selector.predicate,
make_clusters(kustomizations, path_selector.sources or []),
try:
roots = await get_fluxtomizations(
path_selector.root, path_selector.relative_path, build=False
)
)
except FluxException as err:
raise FluxException(
f"Error building Fluxtomization in '{path_selector.root}' path "
f"'{path_selector.relative_path}': {err}"
f"Try specifying another path within the git repo?"
)
_LOGGER.debug("roots=%s", roots)
clusters = [
Cluster(name=ks.name, namespace=ks.namespace or "", path=ks.path)
for ks in roots
if cluster_selector.predicate(ks)
]
build = True
if not clusters:
# There are no flux-system Kustomizations within this path. Fall back to
# assuming everything in the current directory is part of a cluster.
_LOGGER.debug(
"No clusters found; Processing as a Kustomization: %s",
path_selector.relative_path,
)
clusters = [
Cluster(name="cluster", namespace="", path=str(path_selector.relative_path))
]
build = False

for cluster in clusters:
cluster.kustomizations = list(
filter(kustomization_selector.predicate, cluster.kustomizations)
_LOGGER.debug("Building cluster %s %s", cluster.name, cluster.path)
results = await kustomization_traversal(
path_selector,
PathSelector(path=Path(cluster.path), sources=path_selector.sources),
build=build,
)
results.sort(key=lambda x: (x.namespace, x.name))
cluster.kustomizations = results
clusters.sort(key=lambda x: (x.path, x.namespace, x.name))
return clusters


async def get_kustomizations(path: Path) -> list[dict[str, Any]]:
"""Load Kustomization objects from the specified path."""
cmd = kustomize.grep(f"kind={KUSTOMIZE_KIND}", path)
docs = await cmd.objects()
return list(filter(KUSTOMIZE_DOMAIN_FILTER, docs))


async def build_kustomization(
kustomization: Kustomization,
cluster_path: Path,
Expand Down Expand Up @@ -669,21 +604,6 @@ async def build_manifest(
clusters = await get_clusters(
selector.path, selector.cluster, selector.kustomization
)
if not clusters and selector.path.path:
_LOGGER.debug("No clusters found; Processing as a Kustomization: %s", selector)
# The argument path may be a Kustomization inside a cluster. Create a synthetic
# cluster with any found Kustomizations
cluster = Cluster(
name="cluster", namespace="", path=str(selector.path.relative_path)
)
objects = await get_kustomizations(selector.path.path)
if objects:
cluster.kustomizations = [
Kustomization(
name="kustomization", path=str(selector.path.relative_path)
)
]
clusters.append(cluster)

async def update_kustomization(cluster: Cluster) -> None:
build_tasks = []
Expand Down
11 changes: 8 additions & 3 deletions flux_local/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

# Match a prefix of apiVersion to ensure we have the right type of object.
# We don't check specific versions for forward compatibility on upgrade.
CLUSTER_KUSTOMIZE_DOMAIN = "kustomize.toolkit.fluxcd.io"
FLUXTOMIZE_DOMAIN = "kustomize.toolkit.fluxcd.io"
KUSTOMIZE_DOMAIN = "kustomize.config.k8s.io"
HELM_REPO_DOMAIN = "source.toolkit.fluxcd.io"
HELM_RELEASE_DOMAIN = "helm.toolkit.fluxcd.io"
Expand Down Expand Up @@ -283,7 +283,7 @@ class Kustomization(BaseManifest):
@classmethod
def parse_doc(cls, doc: dict[str, Any]) -> "Kustomization":
"""Parse a partial Kustomization from a kubernetes resource."""
_check_version(doc, CLUSTER_KUSTOMIZE_DOMAIN)
_check_version(doc, FLUXTOMIZE_DOMAIN)
if not (metadata := doc.get("metadata")):
raise InputException(f"Invalid {cls} missing metadata: {doc}")
if not (name := metadata.get("name")):
Expand Down Expand Up @@ -349,7 +349,7 @@ class Cluster(BaseManifest):
@classmethod
def parse_doc(cls, doc: dict[str, Any]) -> "Cluster":
"""Parse a partial Kustomization from a kubernetes resource."""
_check_version(doc, CLUSTER_KUSTOMIZE_DOMAIN)
_check_version(doc, FLUXTOMIZE_DOMAIN)
if not (metadata := doc.get("metadata")):
raise InputException(f"Invalid {cls} missing metadata: {doc}")
if not (name := metadata.get("name")):
Expand All @@ -362,6 +362,11 @@ def parse_doc(cls, doc: dict[str, Any]) -> "Cluster":
raise InputException(f"Invalid {cls} missing spec.path: {doc}")
return Cluster(name=name, namespace=namespace, path=path)

@property
def namespaced_name(self, sep: str = "/") -> str:
"""Return the namespace and name concatenated as an id."""
return f"{self.namespace}{sep}{self.name}"

@property
def id_name(self) -> str:
"""Identifier for the Cluster in tests."""
Expand Down
Loading

0 comments on commit 3cf8607

Please sign in to comment.