Skip to content

Commit

Permalink
e2e_add_k8s_native
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh committed Dec 30, 2024
1 parent 18f4ba6 commit ddcf890
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public void run(JobStatement jobStatement) throws Exception {
} else {
log.error(
"Only one pipeline job is executed. The statement has be skipped: " + jobStatement.getStatement());
return;
}
}

Expand Down
8 changes: 5 additions & 3 deletions e2e_test/tools/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ class FlinkRunMode(Enum):

@staticmethod
def getAllMode():
return [FlinkRunMode.LOCAL, FlinkRunMode.STANDALONE, FlinkRunMode.YARN_APPLICATION,FlinkRunMode.KUBERNETES_APPLICATION]
# todo 这里暂时剔除 local,因为并发场景下,会出现接口卡住问题
return [FlinkRunMode.STANDALONE, FlinkRunMode.YARN_APPLICATION, FlinkRunMode.KUBERNETES_APPLICATION]


class Task:
def __init__(self, session: requests.Session, cluster_id: int, yarn_cluster_id: int,k8s_native_cluster_id: int, parent_id: int, name: str,
def __init__(self, session: requests.Session, cluster_id: int, yarn_cluster_id: int, k8s_native_cluster_id: int,
parent_id: int, name: str,
statement):
self.session = session
self.cluster_id = cluster_id
Expand Down Expand Up @@ -125,7 +127,7 @@ def taskFunc(mode: FlinkRunMode):

if is_async:
with concurrent.futures.ThreadPoolExecutor() as executor:
results = [executor.submit(taskFunc, model ) for model in modes]
results = [executor.submit(taskFunc, model) for model in modes]
for result in results:
result.result()
else:
Expand Down

0 comments on commit ddcf890

Please sign in to comment.