Skip to content

Commit 5c9e45b

Browse files
bisgaard-itis, GitHK, giancarloromeo
authored
1848 add permission rights to async jobs (#7262)
Co-authored-by: Andrei Neagu <[email protected]> Co-authored-by: Giancarlo Romeo <[email protected]>
1 parent 315de3a commit 5c9e45b

File tree

13 files changed

+244
-117
lines changed

13 files changed

+244
-117
lines changed

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,15 @@ class AsyncJobResult(BaseModel):
3636

3737
class AsyncJobGet(BaseModel):
3838
job_id: AsyncJobId
39-
job_name: str
4039

4140

4241
class AsyncJobAbort(BaseModel):
4342
result: bool
4443
job_id: AsyncJobId
4544

4645

47-
class AsyncJobAccessData(BaseModel):
46+
class AsyncJobNameData(BaseModel):
4847
"""Data for controlling access to an async job"""
4948

50-
user_id: UserID | None
49+
user_id: UserID
5150
product_name: str
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,33 @@
11
# pylint: disable=R6301
2-
from pathlib import Path
32

43
from common_library.errors_classes import OsparcErrorMixin
5-
from models_library.projects_nodes_io import LocationID
6-
from models_library.users import UserID
4+
from models_library.projects_nodes_io import LocationID, StorageFileID
75
from pydantic import BaseModel, Field
86

97

108
class DataExportTaskStartInput(BaseModel):
11-
user_id: UserID
12-
product_name: str
139
location_id: LocationID
14-
paths: list[Path] = Field(..., min_length=1)
10+
file_and_folder_ids: list[StorageFileID] = Field(..., min_length=1)
1511

1612

1713
### Exceptions
1814

1915

20-
class StorageRpcError(OsparcErrorMixin, RuntimeError):
16+
class StorageRpcBaseError(OsparcErrorMixin, RuntimeError):
2117
pass
2218

2319

24-
class InvalidFileIdentifierError(StorageRpcError):
20+
class InvalidLocationIdError(StorageRpcBaseError):
21+
msg_template: str = "Invalid location_id {location_id}"
22+
23+
24+
class InvalidFileIdentifierError(StorageRpcBaseError):
2525
msg_template: str = "Could not find the file {file_id}"
2626

2727

28-
class AccessRightError(StorageRpcError):
29-
msg_template: str = "User {user_id} does not have access to file {file_id}"
28+
class AccessRightError(StorageRpcBaseError):
29+
msg_template: str = "User {user_id} does not have access to file {file_id} with location {location_id}"
3030

3131

32-
class DataExportError(StorageRpcError):
32+
class DataExportError(StorageRpcBaseError):
3333
msg_template: str = "Could not complete data export job with id {job_id}"

packages/models-library/src/models_library/api_schemas_webserver/storage.py

+4-9
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
)
1313
from ..api_schemas_storage.data_export_async_jobs import DataExportTaskStartInput
1414
from ..progress_bar import ProgressReport
15-
from ..projects_nodes_io import LocationID
15+
from ..projects_nodes_io import LocationID, StorageFileID
1616
from ..rest_pagination import CursorQueryParameters
17-
from ..users import UserID
1817
from ._base import InputSchema, OutputSchema
1918

2019

@@ -27,15 +26,11 @@ class ListPathsQueryParams(InputSchema, CursorQueryParameters):
2726

2827

2928
class DataExportPost(InputSchema):
30-
paths: list[Path]
29+
paths: list[StorageFileID]
3130

32-
def to_rpc_schema(
33-
self, user_id: UserID, product_name: str, location_id: LocationID
34-
) -> DataExportTaskStartInput:
31+
def to_rpc_schema(self, location_id: LocationID) -> DataExportTaskStartInput:
3532
return DataExportTaskStartInput(
36-
paths=self.paths,
37-
user_id=user_id,
38-
product_name=product_name,
33+
file_and_folder_ids=self.paths,
3934
location_id=location_id,
4035
)
4136

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py

+17-10
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
44
AsyncJobAbort,
5-
AsyncJobAccessData,
65
AsyncJobGet,
76
AsyncJobId,
7+
AsyncJobNameData,
88
AsyncJobResult,
99
AsyncJobStatus,
1010
)
@@ -23,13 +23,13 @@ async def abort(
2323
*,
2424
rpc_namespace: RPCNamespace,
2525
job_id: AsyncJobId,
26-
access_data: AsyncJobAccessData | None
26+
job_id_data: AsyncJobNameData
2727
) -> AsyncJobAbort:
2828
result = await rabbitmq_rpc_client.request(
2929
rpc_namespace,
3030
_RPC_METHOD_NAME_ADAPTER.validate_python("abort"),
3131
job_id=job_id,
32-
access_data=access_data,
32+
job_id_data=job_id_data,
3333
timeout_s=_DEFAULT_TIMEOUT_S,
3434
)
3535
assert isinstance(result, AsyncJobAbort)
@@ -41,13 +41,13 @@ async def get_status(
4141
*,
4242
rpc_namespace: RPCNamespace,
4343
job_id: AsyncJobId,
44-
access_data: AsyncJobAccessData | None
44+
job_id_data: AsyncJobNameData
4545
) -> AsyncJobStatus:
4646
result = await rabbitmq_rpc_client.request(
4747
rpc_namespace,
4848
_RPC_METHOD_NAME_ADAPTER.validate_python("get_status"),
4949
job_id=job_id,
50-
access_data=access_data,
50+
job_id_data=job_id_data,
5151
timeout_s=_DEFAULT_TIMEOUT_S,
5252
)
5353
assert isinstance(result, AsyncJobStatus)
@@ -59,26 +59,31 @@ async def get_result(
5959
*,
6060
rpc_namespace: RPCNamespace,
6161
job_id: AsyncJobId,
62-
access_data: AsyncJobAccessData | None
62+
job_id_data: AsyncJobNameData
6363
) -> AsyncJobResult:
6464
result = await rabbitmq_rpc_client.request(
6565
rpc_namespace,
6666
_RPC_METHOD_NAME_ADAPTER.validate_python("get_result"),
6767
job_id=job_id,
68-
access_data=access_data,
68+
job_id_data=job_id_data,
6969
timeout_s=_DEFAULT_TIMEOUT_S,
7070
)
7171
assert isinstance(result, AsyncJobResult)
7272
return result
7373

7474

7575
async def list_jobs(
76-
rabbitmq_rpc_client: RabbitMQRPCClient, *, rpc_namespace: RPCNamespace, filter_: str
76+
rabbitmq_rpc_client: RabbitMQRPCClient,
77+
*,
78+
rpc_namespace: RPCNamespace,
79+
filter_: str,
80+
job_id_data: AsyncJobNameData
7781
) -> list[AsyncJobGet]:
7882
result: list[AsyncJobGet] = await rabbitmq_rpc_client.request(
7983
rpc_namespace,
8084
_RPC_METHOD_NAME_ADAPTER.validate_python("list_jobs"),
8185
filter_=filter_,
86+
job_id_data=job_id_data,
8287
timeout_s=_DEFAULT_TIMEOUT_S,
8388
)
8489
return result
@@ -88,12 +93,14 @@ async def submit_job(
8893
rabbitmq_rpc_client: RabbitMQRPCClient,
8994
*,
9095
rpc_namespace: RPCNamespace,
91-
job_name: str,
96+
method_name: str,
97+
job_id_data: AsyncJobNameData,
9298
**kwargs
9399
) -> AsyncJobGet:
94100
result = await rabbitmq_rpc_client.request(
95101
rpc_namespace,
96-
_RPC_METHOD_NAME_ADAPTER.validate_python(job_name),
102+
_RPC_METHOD_NAME_ADAPTER.validate_python(method_name),
103+
job_id_data=job_id_data,
97104
**kwargs,
98105
timeout_s=_DEFAULT_TIMEOUT_S,
99106
)

services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py

+11-6
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
from fastapi import FastAPI
66
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
77
AsyncJobAbort,
8-
AsyncJobAccessData,
98
AsyncJobGet,
109
AsyncJobId,
10+
AsyncJobNameData,
1111
AsyncJobResult,
1212
AsyncJobStatus,
1313
)
@@ -23,17 +23,19 @@
2323

2424
@router.expose()
2525
async def abort(
26-
app: FastAPI, job_id: AsyncJobId, access_data: AsyncJobAccessData | None
26+
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
2727
) -> AsyncJobAbort:
2828
assert app # nosec
29+
assert job_id_data # nosec
2930
return AsyncJobAbort(result=True, job_id=job_id)
3031

3132

3233
@router.expose(reraise_if_error_type=(StatusError,))
3334
async def get_status(
34-
app: FastAPI, job_id: AsyncJobId, access_data: AsyncJobAccessData | None
35+
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
3536
) -> AsyncJobStatus:
3637
assert app # nosec
38+
assert job_id_data # nosec
3739
progress_report = ProgressReport(actual_value=0.5, total=1.0, attempt=1)
3840
return AsyncJobStatus(
3941
job_id=job_id,
@@ -46,14 +48,17 @@ async def get_status(
4648

4749
@router.expose(reraise_if_error_type=(ResultError,))
4850
async def get_result(
49-
app: FastAPI, job_id: AsyncJobId, access_data: AsyncJobAccessData | None
51+
app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
5052
) -> AsyncJobResult:
5153
assert app # nosec
5254
assert job_id # nosec
55+
assert job_id_data # nosec
5356
return AsyncJobResult(result="Here's your result.", error=None)
5457

5558

5659
@router.expose()
57-
async def list_jobs(app: FastAPI, filter_: str) -> list[AsyncJobGet]:
60+
async def list_jobs(
61+
app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
62+
) -> list[AsyncJobGet]:
5863
assert app # nosec
59-
return [AsyncJobGet(job_id=AsyncJobId(f"{uuid4()}"), job_name="myjob")]
64+
return [AsyncJobGet(job_id=AsyncJobId(f"{uuid4()}"))]

services/storage/src/simcore_service_storage/api/rpc/_data_export.py

+31-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
from uuid import uuid4
22

33
from fastapi import FastAPI
4-
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet, AsyncJobId
4+
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
5+
AsyncJobGet,
6+
AsyncJobId,
7+
AsyncJobNameData,
8+
)
59
from models_library.api_schemas_storage.data_export_async_jobs import (
610
AccessRightError,
711
DataExportError,
@@ -10,6 +14,12 @@
1014
)
1115
from servicelib.rabbitmq import RPCRouter
1216

17+
from ...datcore_dsm import DatCoreDataManager
18+
from ...dsm import get_dsm_provider
19+
from ...exceptions.errors import FileAccessRightError
20+
from ...modules.datcore_adapter.datcore_adapter_exceptions import DatcoreAdapterError
21+
from ...simcore_s3_dsm import SimcoreS3DataManager
22+
1323
router = RPCRouter()
1424

1525

@@ -21,10 +31,28 @@
2131
)
2232
)
2333
async def start_data_export(
24-
app: FastAPI, paths: DataExportTaskStartInput
34+
app: FastAPI,
35+
data_export_start: DataExportTaskStartInput,
36+
job_id_data: AsyncJobNameData,
2537
) -> AsyncJobGet:
2638
assert app # nosec
39+
40+
dsm = get_dsm_provider(app).get(data_export_start.location_id)
41+
42+
try:
43+
for _id in data_export_start.file_and_folder_ids:
44+
if isinstance(dsm, DatCoreDataManager):
45+
_ = await dsm.get_file(user_id=job_id_data.user_id, file_id=_id)
46+
elif isinstance(dsm, SimcoreS3DataManager):
47+
await dsm.can_read_file(user_id=job_id_data.user_id, file_id=_id)
48+
49+
except (FileAccessRightError, DatcoreAdapterError) as err:
50+
raise AccessRightError(
51+
user_id=job_id_data.user_id,
52+
file_id=_id,
53+
location_id=data_export_start.location_id,
54+
) from err
55+
2756
return AsyncJobGet(
2857
job_id=AsyncJobId(f"{uuid4()}"),
29-
job_name=", ".join(str(p) for p in paths.paths),
3058
)

services/storage/src/simcore_service_storage/simcore_s3_dsm.py

+6
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,12 @@ async def get_file(self, user_id: UserID, file_id: StorageFileID) -> FileMetaDat
355355
fmd = await self._update_database_from_storage(fmd)
356356
return convert_db_to_model(fmd)
357357

358+
async def can_read_file(self, user_id: UserID, file_id: StorageFileID):
359+
async with self.engine.connect() as conn:
360+
can = await get_file_access_rights(conn, int(user_id), file_id)
361+
if not can.read:
362+
raise FileAccessRightError(access_right="read", file_id=file_id)
363+
358364
async def create_file_upload_links(
359365
self,
360366
user_id: UserID,

services/storage/tests/conftest.py

+39-1
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,13 @@
6161
from simcore_service_storage.core.application import create_app
6262
from simcore_service_storage.core.settings import ApplicationSettings
6363
from simcore_service_storage.dsm import get_dsm_provider
64-
from simcore_service_storage.models import S3BucketName
64+
from simcore_service_storage.models import FileMetaData, FileMetaDataAtDB, S3BucketName
6565
from simcore_service_storage.modules.long_running_tasks import (
6666
get_completed_upload_tasks,
6767
)
6868
from simcore_service_storage.modules.s3 import get_s3_client
6969
from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
70+
from sqlalchemy import literal_column
7071
from sqlalchemy.ext.asyncio import AsyncEngine
7172
from tenacity.asyncio import AsyncRetrying
7273
from tenacity.retry import retry_if_exception_type
@@ -849,3 +850,40 @@ async def with_random_project_with_files(
849850
faker: Faker,
850851
) -> tuple[dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],]:
851852
return await random_project_with_files(project_params)
853+
854+
855+
@pytest.fixture()
856+
async def output_file(
857+
user_id: UserID, project_id: str, sqlalchemy_async_engine: AsyncEngine, faker: Faker
858+
) -> AsyncIterator[FileMetaData]:
859+
node_id = "fd6f9737-1988-341b-b4ac-0614b646fa82"
860+
861+
# pylint: disable=no-value-for-parameter
862+
863+
file = FileMetaData.from_simcore_node(
864+
user_id=user_id,
865+
file_id=f"{project_id}/{node_id}/filename.txt",
866+
bucket=TypeAdapter(S3BucketName).validate_python("master-simcore"),
867+
location_id=SimcoreS3DataManager.get_location_id(),
868+
location_name=SimcoreS3DataManager.get_location_name(),
869+
sha256_checksum=faker.sha256(),
870+
)
871+
file.entity_tag = "df9d868b94e53d18009066ca5cd90e9f"
872+
file.file_size = ByteSize(12)
873+
file.user_id = user_id
874+
async with sqlalchemy_async_engine.begin() as conn:
875+
stmt = (
876+
file_meta_data.insert()
877+
.values(jsonable_encoder(FileMetaDataAtDB.model_validate(file)))
878+
.returning(literal_column("*"))
879+
)
880+
result = await conn.execute(stmt)
881+
row = result.one()
882+
assert row
883+
884+
yield file
885+
886+
async with sqlalchemy_async_engine.begin() as conn:
887+
result = await conn.execute(
888+
file_meta_data.delete().where(file_meta_data.c.file_id == row.file_id)
889+
)

0 commit comments

Comments (0)