-
Notifications
You must be signed in to change notification settings - Fork 630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Keep separate per-pipeline operator counters. Error out when "stealing" subgraphs from other pipelines results in duplicate names. #5506
Conversation
def get_op_outputs_num(): | ||
# BSF traverse the graph first to learn, for each reachable operator in the graph, | ||
# how many data-nodes/edges the operator contributes to | ||
# (i.e. the number of outputs of the operator instance) | ||
op_outputs_num = {} | ||
edges = deque(output_nodes) | ||
while edges: | ||
current_edge = edges.popleft() | ||
source_op = get_source_op(current_edge) | ||
if source_op.id in op_outputs_num: | ||
op_outputs_num[source_op.id] += 1 | ||
else: | ||
op_outputs_num[source_op.id] = 1 | ||
source_op.check_args() | ||
edges.extend(get_op_input_edges(source_op)) | ||
return op_outputs_num | ||
|
||
visited = set() | ||
ops = [] | ||
edges = deque(output_nodes) | ||
op_total_outputs_num = get_op_outputs_num() | ||
op_visited_outputs_num = {op_id: 0 for op_id in op_total_outputs_num} | ||
while edges: | ||
current_edge = edges.popleft() | ||
source_op = get_source_op(current_edge) | ||
op_visited_outputs_num[source_op.id] += 1 | ||
# Actually visit the operator only when all the nodes it contributes to | ||
# were already processed | ||
if op_visited_outputs_num[source_op.id] == op_total_outputs_num[source_op.id]: | ||
ops.append(source_op) | ||
edges.extend(get_op_input_edges(source_op)) | ||
ops.reverse() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was unnecessarily complex. DFS is much better suited for discovering topology.
@@ -57,7 +57,8 @@ | |||
class _OpCounter(object): | |||
# pylint: disable=too-few-public-methods | |||
_lock = threading.Lock() | |||
_op_count = count(0) | |||
# start from something large to avoid confusion with (more common) per-pipeline numbering |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for reviewers: "avoid confusion" means only confusion while debugging. The renaming can deal with duplicate names.
CI MESSAGE: [15624008]: BUILD STARTED |
@@ -57,7 +57,8 @@ | |||
class _OpCounter(object): | |||
# pylint: disable=too-few-public-methods | |||
_lock = threading.Lock() | |||
_op_count = count(0) | |||
# start from something large to avoid confusion with (more common) per-pipeline numbering |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# start from something large to avoid confusion with (more common) per-pipeline numbering | |
# start from something large to avoid confusion while debugging with (more common) per-pipeline numbering |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...
CI MESSAGE: [15624354]: BUILD STARTED |
CI MESSAGE: [15624997]: BUILD STARTED |
CI MESSAGE: [15624997]: BUILD FAILED |
CI MESSAGE: [15633718]: BUILD STARTED |
CI MESSAGE: [15633718]: BUILD FAILED |
CI MESSAGE: [15654084]: BUILD STARTED |
CI MESSAGE: [15654084]: BUILD FAILED |
CI MESSAGE: [15654586]: BUILD STARTED |
6ac2b99
to
a291562
Compare
CI MESSAGE: [15709123]: BUILD FAILED |
…es from other pipelines (including None). Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
- handle renaming of duplicate nodes - handle renaming of pipeline inputs that are not direct outputs of any operator - improve error message Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
Signed-off-by: Michal Zientkiewicz <[email protected]>
…ends. Signed-off-by: Michal Zientkiewicz <[email protected]>
a291562
to
999c1cc
Compare
CI MESSAGE: [15734267]: BUILD STARTED |
CI MESSAGE: [15734267]: BUILD PASSED |
Category:
New feature (non-breaking change which adds functionality)
Refactoring (Redesign of existing code that doesn't affect functionality)
Description:
Prior to this change constructing exactly the same pipeline (e.g. by calling a function decorated with
@pipeline_def
) multiple times produced pipelines with different operator instance names and differently named operator instances and DataNodes. This PR changes that so that pipelines with the same structure have the same node names.This is achieved by:
Pipelines that are defined without a "current" pipeline have distinct operator instance names.
Additionally, there were some problems with operator discovery. I rewrote it to a much simpler DFS.
Additional information:
Affected modules and functionalities:
Key points relevant for the review:
Tests:
pipeline_test.py: test_dangling_subgraph
Checklist
Documentation
DALI team only
Requirements
REQ IDs: N/A
JIRA TASK: N/A