Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
79e79e2
Improve incomplete type hints
disrupted Jul 24, 2024
5c7f0de
Improve incomplete type hints
disrupted Jul 24, 2024
05ee4bf
Improve incomplete type hints
disrupted Jul 24, 2024
3049f23
Fix Pyright complaint
disrupted Jul 24, 2024
3ff2ad8
Improve incomplete type hints
disrupted Jul 24, 2024
251c998
Improve incomplete type hints
disrupted Jul 24, 2024
b0ab9fe
Improve incomplete type hints
disrupted Jul 24, 2024
abea358
Improve incomplete type hints
disrupted Jul 24, 2024
5d272d2
Improve incomplete type hints
disrupted Jul 24, 2024
e3396d2
Improve incomplete type hints
disrupted Jul 24, 2024
fb35718
Improve incomplete type hints
disrupted Jul 24, 2024
bee89d2
Update Pydantic
disrupted Jul 29, 2024
0fa92cd
Add type stubs for cached_classproperty
disrupted Jul 29, 2024
4b9e0e4
Improve incomplete type hints
disrupted Jul 29, 2024
9fd16c9
Improve incomplete type hints
disrupted Jul 29, 2024
6a9b4b2
Improve incomplete type hints
disrupted Jul 29, 2024
054ba0c
Improve incomplete type hints
disrupted Jul 29, 2024
1c2ff0c
Improve incomplete type hints
disrupted Jul 29, 2024
0d0e684
Improve incomplete type hints
disrupted Jul 29, 2024
47f521b
Improve incomplete type hints
disrupted Jul 29, 2024
cef68a5
Improve incomplete type hints
disrupted Jul 29, 2024
eff083f
Improve incomplete type hints
disrupted Jul 29, 2024
78eef7e
Revert "Update Pydantic"
disrupted Jul 29, 2024
1f595d9
Fix
disrupted Jul 29, 2024
9b73dec
Rename TypeVars
disrupted Jul 29, 2024
ecff6b2
Improve incomplete type hints
disrupted Jul 29, 2024
fa4e21c
Improve incomplete type hints
disrupted Jul 29, 2024
725fd25
Remove noqa directive
disrupted Jul 29, 2024
776dcc4
Improve incomplete type hints
disrupted Jul 29, 2024
394c362
Update Ruff
disrupted Jul 29, 2024
2653378
Replace networkx with rustworkx
disrupted Jul 29, 2024
bc5d458
Remove networkx
disrupted Jul 29, 2024
c170b20
Merge remote-tracking branch 'origin/main' into refactor/rustworkx
disrupted Jul 29, 2024
e7d08fa
Remove Pyright directive
disrupted Jul 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions kpops/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, TypeAlias

import networkx as nx
import rustworkx as rx
import yaml
from pydantic import (
BaseModel,
Expand Down Expand Up @@ -38,7 +38,7 @@ class Pipeline(BaseModel):
"""Pipeline representation."""

_component_index: dict[str, PipelineComponent] = {}
_graph: nx.DiGraph = nx.DiGraph()
_graph: rx.PyDiGraph[str, None] = rx.PyDiGraph()

model_config = ConfigDict(arbitrary_types_allowed=True)

Expand Down Expand Up @@ -118,19 +118,19 @@ async def run_graph_tasks(pending_tasks: list[Awaitable[None]]) -> None:
for pending_task in pending_tasks:
await pending_task

graph: nx.DiGraph = self._graph.copy() # pyright: ignore[reportAssignmentType, reportGeneralTypeIssues] imprecise type hint in networkx
graph = self._graph.copy()

# We add an extra node to the graph, connecting all the leaf nodes to it
# in that way we make this node the root of the graph, avoiding backtracking
root_node = "root_node_bfs"
graph.add_node(root_node)
root_node = graph.add_node("root_node_bfs")

for node in graph:
for node in graph.node_indices():
predecessors = list(graph.predecessors(node))
if not predecessors:
graph.add_edge(root_node, node)
graph.add_edge(root_node, node, None)

layers_graph: list[list[str]] = list(nx.bfs_layers(graph, root_node))
# TODO: blocker, not implemented in rustworkx
layers_graph: list[list[str]] = list(rx.bfs_layers(graph, root_node))

sorted_tasks: list[Awaitable[None]] = []
for layer in layers_graph[1:]:
Expand Down Expand Up @@ -159,21 +159,21 @@ def __len__(self) -> int:
return len(self.components)

def __add_to_graph(self, component: PipelineComponent):
self._graph.add_node(component.id)
node = self._graph.add_node(component.id)

for input_topic in component.inputs:
self.__add_input(input_topic.id, component.id)
self.__add_input(input_topic.id, node)

for output_topic in component.outputs:
self.__add_output(output_topic.id, component.id)
self.__add_output(output_topic.id, node)

def __add_output(self, topic_id: str, source: str) -> None:
self._graph.add_node(topic_id)
self._graph.add_edge(source, topic_id)
def __add_output(self, topic_id: str, source: int) -> None:
topic = self._graph.add_node(topic_id)
self._graph.add_edge(source, topic, None)

def __add_input(self, topic_id: str, target: str) -> None:
self._graph.add_node(topic_id)
self._graph.add_edge(topic_id, target)
def __add_input(self, topic_id: str, target: int) -> None:
topic = self._graph.add_node(topic_id)
self._graph.add_edge(topic, target, None)

def __get_parallel_tasks_from(
self,
Expand All @@ -189,7 +189,7 @@ def gen_parallel_tasks():
return list(gen_parallel_tasks())

def __validate_graph(self) -> None:
if not nx.is_directed_acyclic_graph(self._graph):
if not rx.is_directed_acyclic_graph(self._graph):
msg = "Pipeline is not a valid DAG."
raise ValueError(msg)

Expand Down
103 changes: 84 additions & 19 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ cachetools = "^5.2.0"
dictdiffer = "^0.9.0"
python-schema-registry-client = "^2.4.1"
httpx = "^0 >=0.24.1"
networkx = "^3.1"
rustworkx = "^0.15.1"
kubernetes-asyncio = "^29.0.0"

[tool.poetry.group.dev.dependencies]
Expand Down