-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
de7e635
commit c69c6a6
Showing
8 changed files
with
232 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
from typing import Annotated | ||
from uuid import UUID | ||
|
||
from fastapi import APIRouter, Depends, UploadFile, status | ||
from fastapi.responses import FileResponse | ||
from sqlalchemy.orm import Session | ||
|
||
from src.database import get_db | ||
from src.documents import models as doc_models | ||
from src.documents import schemas as doc_schemas | ||
from src.documents import service as doc_service | ||
from src.documents.dependencies import get_doc_by_id, valid_file | ||
from src.users import schemas as user_schemas | ||
|
||
# from src.projects import service as project_service | ||
from src.users.dependencies import get_curr_user | ||
|
||
router = APIRouter(prefix="/documents") | ||
|
||
|
||
@router.get("/{doc_id}", status_code=status.HTTP_200_OK) | ||
def download( | ||
document: Annotated[doc_models.Document, Depends(get_doc_by_id)], | ||
user: Annotated[user_schemas.User, Depends(get_curr_user)], | ||
db: Annotated[Session, Depends(get_db)], | ||
) -> FileResponse: | ||
doc = doc_service.read(document, user.id, db) | ||
return FileResponse(doc.url, filename=doc.name) | ||
|
||
|
||
@router.put("/{doc_id}", status_code=status.HTTP_200_OK) | ||
def update( | ||
doc_id: UUID, | ||
document: Annotated[doc_models.Document, Depends(get_doc_by_id)], | ||
file: Annotated[UploadFile, Depends(valid_file)], | ||
user: Annotated[user_schemas.User, Depends(get_curr_user)], | ||
db: Annotated[Session, Depends(get_db)], | ||
) -> doc_schemas.Document: | ||
doc = doc_service.update(document, file, user.id, db) | ||
return doc | ||
|
||
|
||
@router.delete("/{doc_id}", status_code=status.HTTP_204_NO_CONTENT) | ||
def delete( | ||
doc_id: UUID, | ||
document: Annotated[doc_models.Document, Depends(get_doc_by_id)], | ||
user: Annotated[user_schemas.User, Depends(get_curr_user)], | ||
db: Annotated[Session, Depends(get_db)], | ||
) -> None: | ||
doc_service.delete(document, user.id, db) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,101 @@ | ||
import os | ||
import pathlib | ||
from uuid import UUID | ||
|
||
from fastapi import UploadFile | ||
from fastapi import HTTPException, UploadFile, status | ||
from sqlalchemy.orm import Session | ||
from sqlalchemy.sql import exists | ||
|
||
from src.documents import models | ||
from src.documents import models as doc_models | ||
from src.documents import schemas as doc_schemas | ||
from src.models import ProjectUser | ||
from src.projects.dependencies import get_proj_by_id | ||
from src.users import schemas as user_schemas | ||
|
||
|
||
def read( | ||
document: doc_models.Document, user_id: UUID, db: Session | ||
) -> doc_schemas.Document: | ||
project_user = db.query( | ||
exists().where( | ||
ProjectUser.user_id == user_id, | ||
ProjectUser.project_id == document.project_id, | ||
) | ||
).scalar() | ||
|
||
if not project_user: | ||
raise HTTPException( | ||
status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden access" | ||
) | ||
|
||
return doc_schemas.Document.model_validate(document) | ||
|
||
|
||
def create( | ||
name: str | None, url: str, proj_id: UUID, user: user_schemas.User, db: Session | ||
) -> doc_schemas.Document: | ||
document = models.Document(name=name, url=url, owner_id=user.id, project_id=proj_id) | ||
document = doc_models.Document( | ||
name=name, url=url, owner_id=user.id, project_id=proj_id | ||
) | ||
db.add(document) | ||
db.commit() | ||
return doc_schemas.Document.model_validate(document) | ||
|
||
|
||
async def file_upload(file: UploadFile, proj_id: UUID) -> str: | ||
def update( | ||
document: doc_models.Document, file: UploadFile, user_id: UUID, db: Session | ||
) -> doc_schemas.Document: | ||
project_user = db.query( | ||
exists().where( | ||
ProjectUser.user_id == user_id, | ||
ProjectUser.project_id == document.project_id, | ||
) | ||
).scalar() | ||
|
||
if not project_user: | ||
raise HTTPException( | ||
status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden access" | ||
) | ||
if file.filename is not None: | ||
document.name = file.filename | ||
|
||
old_url = document.url | ||
url = file_upload(file, document.project_id) | ||
|
||
document.url = url | ||
db.commit() | ||
|
||
file_delete(old_url) | ||
|
||
return doc_schemas.Document.model_validate(document) | ||
|
||
|
||
def delete( | ||
document: doc_models.Document, | ||
user_id: UUID, | ||
db: Session, | ||
) -> None: | ||
project = get_proj_by_id(document.project_id, db) | ||
if project.owner_id != user_id: | ||
raise HTTPException( | ||
status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden access" | ||
) | ||
|
||
file_delete(document.url) | ||
db.delete(document) | ||
db.commit() | ||
|
||
|
||
def file_upload(file: UploadFile, proj_id: UUID) -> str: | ||
filename = f"{proj_id}_{file.filename}" | ||
path = ( | ||
pathlib.Path(__file__).parent.parent.parent / "bucket" / "documents" / filename | ||
) | ||
with open(path, "wb+") as f: | ||
contents = await file.read() | ||
f.write(contents) | ||
f.write(file.file.read()) | ||
|
||
return str(path) | ||
|
||
|
||
def file_delete(url: str) -> None: | ||
os.remove(url) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
import os | ||
|
||
from fastapi import UploadFile, status | ||
from fastapi.testclient import TestClient | ||
from sqlalchemy.orm import Session | ||
|
||
from src.documents.schemas import Document | ||
from src.projects.schemas import Project | ||
from src.users.schemas import User | ||
|
||
|
||
def test_download_document( | ||
client: TestClient, | ||
db: Session, | ||
test_user: User, | ||
test_projects: list[Project], | ||
test_token: str, | ||
mock_upload_file: UploadFile, | ||
test_documents: list[Document], | ||
) -> None: | ||
document = test_documents[0] | ||
|
||
res = client.get( | ||
f"/documents/{document.id}/", | ||
headers={"Authorization": f"Bearer {test_token}"}, | ||
) | ||
|
||
assert res.status_code == 200 | ||
|
||
|
||
def test_update_document( | ||
client: TestClient, | ||
db: Session, | ||
test_user: User, | ||
test_projects: list[Project], | ||
test_token: str, | ||
mock_upload_file: UploadFile, | ||
test_documents: list[Document], | ||
) -> None: | ||
project = test_projects[0] | ||
document = test_documents[0] | ||
|
||
data = { | ||
"file": ("mock_file.pdf", mock_upload_file.file, mock_upload_file.content_type) | ||
} | ||
|
||
res = client.put( | ||
f"/documents/{document.id}/", | ||
headers={"Authorization": f"Bearer {test_token}"}, | ||
files=data, | ||
) | ||
|
||
expected_end_of_path = os.path.join( | ||
"bucket", "documents", f"{project.id}_mock_file.pdf" | ||
) | ||
|
||
assert res.json()["url"].endswith(expected_end_of_path) | ||
|
||
|
||
def test_delete_document( | ||
client: TestClient, | ||
db: Session, | ||
test_user: User, | ||
test_projects: list[Project], | ||
test_token: str, | ||
mock_upload_file: UploadFile, | ||
test_documents: list[Document], | ||
) -> None: | ||
document = test_documents[0] | ||
|
||
res = client.delete( | ||
f"/documents/{document.id}", headers={"Authorization": f"Bearer {test_token}"} | ||
) | ||
|
||
assert res.status_code == status.HTTP_204_NO_CONTENT |