Skip to content

Commit c018e00

Browse files
committed
AIP-81 Add Overwrite for Bulk Insert Pool API
1 parent 8a3d0f4 commit c018e00

File tree

9 files changed

+238
-76
lines changed

9 files changed

+238
-76
lines changed

airflow/api_fastapi/core_api/datamodels/pools.py

+1
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,4 @@ class PoolPostBulkBody(BaseModel):
8383
"""Pools serializer for post bodies."""
8484

8585
pools: list[PoolPostBody]
86+
overwrite: bool | None = Field(default=False)

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

+16-4
Original file line numberDiff line numberDiff line change
@@ -3999,12 +3999,12 @@ paths:
39993999
schema:
40004000
$ref: '#/components/schemas/HTTPValidationError'
40014001
/public/pools/bulk:
4002-
post:
4002+
put:
40034003
tags:
40044004
- Pool
4005-
summary: Post Pools
4005+
summary: Put Pools
40064006
description: Create multiple pools.
4007-
operationId: post_pools
4007+
operationId: put_pools
40084008
requestBody:
40094009
content:
40104010
application/json:
@@ -4013,7 +4013,7 @@ paths:
40134013
required: true
40144014
responses:
40154015
'201':
4016-
description: Successful Response
4016+
description: Created
40174017
content:
40184018
application/json:
40194019
schema:
@@ -4036,6 +4036,12 @@ paths:
40364036
application/json:
40374037
schema:
40384038
$ref: '#/components/schemas/HTTPExceptionResponse'
4039+
'200':
4040+
description: Created with overwriting
4041+
content:
4042+
application/json:
4043+
schema:
4044+
$ref: '#/components/schemas/PoolCollectionResponse'
40394045
'422':
40404046
description: Validation Error
40414047
content:
@@ -8645,6 +8651,12 @@ components:
86458651
$ref: '#/components/schemas/PoolPostBody'
86468652
type: array
86478653
title: Pools
8654+
overwrite:
8655+
anyOf:
8656+
- type: boolean
8657+
- type: 'null'
8658+
title: Overwrite
8659+
default: false
86488660
type: object
86498661
required:
86508662
- pools

airflow/api_fastapi/core_api/routes/public/pools.py

+38-10
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from typing import Annotated, cast
2020

21-
from fastapi import Depends, HTTPException, Query, status
21+
from fastapi import Depends, HTTPException, Query, Response, status
2222
from fastapi.exceptions import RequestValidationError
2323
from pydantic import ValidationError
2424
from sqlalchemy import delete, select
@@ -176,21 +176,49 @@ def post_pool(
176176
return pool
177177

178178

179-
@pools_router.post(
179+
@pools_router.put(
180180
"/bulk",
181181
status_code=status.HTTP_201_CREATED,
182-
responses=create_openapi_http_exception_doc(
183-
[
184-
status.HTTP_409_CONFLICT, # handled by global exception handler
185-
]
186-
),
182+
responses={
183+
**create_openapi_http_exception_doc(
184+
[
185+
status.HTTP_409_CONFLICT, # handled by global exception handler
186+
]
187+
),
188+
status.HTTP_201_CREATED: {
189+
"description": "Created",
190+
"model": PoolCollectionResponse,
191+
},
192+
status.HTTP_200_OK: {
193+
"description": "Created with overwriting",
194+
"model": PoolCollectionResponse,
195+
},
196+
},
187197
)
188-
def post_pools(
189-
body: PoolPostBulkBody,
198+
def put_pools(
199+
response: Response,
200+
put_body: PoolPostBulkBody,
190201
session: SessionDep,
191202
) -> PoolCollectionResponse:
192203
"""Create multiple pools."""
193-
pools = [Pool(**body.model_dump()) for body in body.pools]
204+
response.status_code = status.HTTP_201_CREATED if not put_body.overwrite else status.HTTP_200_OK
205+
pools: list[Pool]
206+
if not put_body.overwrite:
207+
pools = [Pool(**body.model_dump()) for body in put_body.pools]
208+
else:
209+
pool_names = [pool.pool for pool in put_body.pools]
210+
existed_pools = session.execute(select(Pool).filter(Pool.pool.in_(pool_names))).scalars()
211+
existed_pools_dict = {pool.pool: pool for pool in existed_pools}
212+
pools = []
213+
# if pool already exists, update the corresponding pool, else add a new pool
214+
for body in put_body.pools:
215+
if body.pool in existed_pools_dict:
216+
pool = existed_pools_dict[body.pool]
217+
for key, val in body.model_dump().items():
218+
setattr(pool, key, val)
219+
pools.append(pool)
220+
else:
221+
pools.append(Pool(**body.model_dump()))
194222
session.add_all(pools)
195223
return PoolCollectionResponse(
196224
pools=cast(list[PoolResponse], pools),

airflow/ui/openapi-gen/queries/common.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1616,7 +1616,6 @@ export type TaskInstanceServicePostClearTaskInstancesMutationResult = Awaited<
16161616
ReturnType<typeof TaskInstanceService.postClearTaskInstances>
16171617
>;
16181618
export type PoolServicePostPoolMutationResult = Awaited<ReturnType<typeof PoolService.postPool>>;
1619-
export type PoolServicePostPoolsMutationResult = Awaited<ReturnType<typeof PoolService.postPools>>;
16201619
export type VariableServicePostVariableMutationResult = Awaited<
16211620
ReturnType<typeof VariableService.postVariable>
16221621
>;
@@ -1635,6 +1634,7 @@ export type BackfillServiceCancelBackfillMutationResult = Awaited<
16351634
export type ConnectionServicePutConnectionsMutationResult = Awaited<
16361635
ReturnType<typeof ConnectionService.putConnections>
16371636
>;
1637+
export type PoolServicePutPoolsMutationResult = Awaited<ReturnType<typeof PoolService.putPools>>;
16381638
export type DagParsingServiceReparseDagFileMutationResult = Awaited<
16391639
ReturnType<typeof DagParsingService.reparseDagFile>
16401640
>;

airflow/ui/openapi-gen/queries/queries.ts

+37-36
Original file line numberDiff line numberDiff line change
@@ -3033,42 +3033,6 @@ export const usePoolServicePostPool = <
30333033
mutationFn: ({ requestBody }) => PoolService.postPool({ requestBody }) as unknown as Promise<TData>,
30343034
...options,
30353035
});
3036-
/**
3037-
* Post Pools
3038-
* Create multiple pools.
3039-
* @param data The data for the request.
3040-
* @param data.requestBody
3041-
* @returns PoolCollectionResponse Successful Response
3042-
* @throws ApiError
3043-
*/
3044-
export const usePoolServicePostPools = <
3045-
TData = Common.PoolServicePostPoolsMutationResult,
3046-
TError = unknown,
3047-
TContext = unknown,
3048-
>(
3049-
options?: Omit<
3050-
UseMutationOptions<
3051-
TData,
3052-
TError,
3053-
{
3054-
requestBody: PoolPostBulkBody;
3055-
},
3056-
TContext
3057-
>,
3058-
"mutationFn"
3059-
>,
3060-
) =>
3061-
useMutation<
3062-
TData,
3063-
TError,
3064-
{
3065-
requestBody: PoolPostBulkBody;
3066-
},
3067-
TContext
3068-
>({
3069-
mutationFn: ({ requestBody }) => PoolService.postPools({ requestBody }) as unknown as Promise<TData>,
3070-
...options,
3071-
});
30723036
/**
30733037
* Post Variable
30743038
* Create a variable.
@@ -3292,6 +3256,43 @@ export const useConnectionServicePutConnections = <
32923256
ConnectionService.putConnections({ requestBody }) as unknown as Promise<TData>,
32933257
...options,
32943258
});
3259+
/**
3260+
* Put Pools
3261+
* Create multiple pools.
3262+
* @param data The data for the request.
3263+
* @param data.requestBody
3264+
* @returns PoolCollectionResponse Created with overwriting
3265+
* @returns PoolCollectionResponse Created
3266+
* @throws ApiError
3267+
*/
3268+
export const usePoolServicePutPools = <
3269+
TData = Common.PoolServicePutPoolsMutationResult,
3270+
TError = unknown,
3271+
TContext = unknown,
3272+
>(
3273+
options?: Omit<
3274+
UseMutationOptions<
3275+
TData,
3276+
TError,
3277+
{
3278+
requestBody: PoolPostBulkBody;
3279+
},
3280+
TContext
3281+
>,
3282+
"mutationFn"
3283+
>,
3284+
) =>
3285+
useMutation<
3286+
TData,
3287+
TError,
3288+
{
3289+
requestBody: PoolPostBulkBody;
3290+
},
3291+
TContext
3292+
>({
3293+
mutationFn: ({ requestBody }) => PoolService.putPools({ requestBody }) as unknown as Promise<TData>,
3294+
...options,
3295+
});
32953296
/**
32963297
* Reparse Dag File
32973298
* Request re-parsing a DAG file.

airflow/ui/openapi-gen/requests/schemas.gen.ts

+12
Original file line numberDiff line numberDiff line change
@@ -3767,6 +3767,18 @@ export const $PoolPostBulkBody = {
37673767
type: "array",
37683768
title: "Pools",
37693769
},
3770+
overwrite: {
3771+
anyOf: [
3772+
{
3773+
type: "boolean",
3774+
},
3775+
{
3776+
type: "null",
3777+
},
3778+
],
3779+
title: "Overwrite",
3780+
default: false,
3781+
},
37703782
},
37713783
type: "object",
37723784
required: ["pools"],

airflow/ui/openapi-gen/requests/services.gen.ts

+7-6
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ import type {
160160
GetPoolsResponse,
161161
PostPoolData,
162162
PostPoolResponse,
163-
PostPoolsData,
164-
PostPoolsResponse,
163+
PutPoolsData,
164+
PutPoolsResponse,
165165
GetProvidersData,
166166
GetProvidersResponse,
167167
GetXcomEntryData,
@@ -2701,16 +2701,17 @@ export class PoolService {
27012701
}
27022702

27032703
/**
2704-
* Post Pools
2704+
* Put Pools
27052705
* Create multiple pools.
27062706
* @param data The data for the request.
27072707
* @param data.requestBody
2708-
* @returns PoolCollectionResponse Successful Response
2708+
* @returns PoolCollectionResponse Created with overwriting
2709+
* @returns PoolCollectionResponse Created
27092710
* @throws ApiError
27102711
*/
2711-
public static postPools(data: PostPoolsData): CancelablePromise<PostPoolsResponse> {
2712+
public static putPools(data: PutPoolsData): CancelablePromise<PutPoolsResponse> {
27122713
return __request(OpenAPI, {
2713-
method: "POST",
2714+
method: "PUT",
27142715
url: "/public/pools/bulk",
27152716
body: data.requestBody,
27162717
mediaType: "application/json",

airflow/ui/openapi-gen/requests/types.gen.ts

+10-5
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,7 @@ export type PoolPostBody = {
904904
*/
905905
export type PoolPostBulkBody = {
906906
pools: Array<PoolPostBody>;
907+
overwrite?: boolean | null;
907908
};
908909

909910
/**
@@ -2068,11 +2069,11 @@ export type PostPoolData = {
20682069

20692070
export type PostPoolResponse = PoolResponse;
20702071

2071-
export type PostPoolsData = {
2072+
export type PutPoolsData = {
20722073
requestBody: PoolPostBulkBody;
20732074
};
20742075

2075-
export type PostPoolsResponse = PoolCollectionResponse;
2076+
export type PutPoolsResponse = PoolCollectionResponse;
20762077

20772078
export type GetProvidersData = {
20782079
limit?: number;
@@ -4277,11 +4278,15 @@ export type $OpenApiTs = {
42774278
};
42784279
};
42794280
"/public/pools/bulk": {
4280-
post: {
4281-
req: PostPoolsData;
4281+
put: {
4282+
req: PutPoolsData;
42824283
res: {
42834284
/**
4284-
* Successful Response
4285+
* Created with overwriting
4286+
*/
4287+
200: PoolCollectionResponse;
4288+
/**
4289+
* Created
42854290
*/
42864291
201: PoolCollectionResponse;
42874292
/**

0 commit comments

Comments
 (0)