Skip to content

Commit

Permalink
Pdct 981/admin api if language idempotent language is removed from do…
Browse files Browse the repository at this point in the history
…cument in db (#100)
  • Loading branch information
katybaulch authored Mar 19, 2024
1 parent 6ce2761 commit f5d9b31
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 147 deletions.
14 changes: 11 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,17 @@ run:

start: build_dev run

restart:
docker stop navigator-admin-backend && docker rm navigator-admin-backend && make start && docker logs -f navigator-admin-backend

start_local: build
# - docker stop navigator-admin-backend
docker run -p 8888:8888 \
--name navigator-admin-backend \
--network=navigator-backend_default \
-e ADMIN_POSTGRES_HOST=backend_db \
-e SECRET_KEY="secret_test_key" \
-d navigator-admin-backend

restart: build
docker stop navigator-admin-backend && docker rm navigator-admin-backend && make start_local && docker logs -f navigator-admin-backend

show_logs:
- docker compose logs -f
120 changes: 73 additions & 47 deletions app/repository/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,9 @@ def update(db: Session, import_id: str, document: DocumentWriteDTO) -> bool:
return False

# User Language changed?
existing_language = (
db.query(PhysicalDocumentLanguage)
.filter(
PhysicalDocumentLanguage.document_id == original_fd.physical_document_id
)
.filter(PhysicalDocumentLanguage.source == LanguageSource.USER)
.one_or_none()
)
new_language = _get_new_language(db, new_values, existing_language)
existing_language = _get_existing_language(db, original_fd)
new_language = _get_requested_language(db, new_values)
has_language_changed = not _is_language_equal(existing_language, new_language)

update_slug = original_pd.title != new_values["title"]

Expand All @@ -245,9 +239,11 @@ def update(db: Session, import_id: str, document: DocumentWriteDTO) -> bool:
.where(PhysicalDocument.id == original_pd.id)
.values(
title=new_values["title"],
source_url=str(new_values["source_url"])
if new_values["source_url"] is not None
else None,
source_url=(
str(new_values["source_url"])
if new_values["source_url"] is not None
else None
),
),
db_update(FamilyDocument)
.where(FamilyDocument.import_id == original_fd.import_id)
Expand All @@ -258,33 +254,37 @@ def update(db: Session, import_id: str, document: DocumentWriteDTO) -> bool:
),
]

if new_language is not None:
if existing_language is not None:
command = (
db_update(PhysicalDocumentLanguage)
.where(
and_(
PhysicalDocumentLanguage.document_id
== original_fd.physical_document_id,
PhysicalDocumentLanguage.source == LanguageSource.USER,
# Update logic to only perform update if not idempotent.
if has_language_changed:
if new_language is not None:
if existing_language is not None:
command = (
db_update(PhysicalDocumentLanguage)
.where(
and_(
PhysicalDocumentLanguage.document_id
== original_fd.physical_document_id,
PhysicalDocumentLanguage.source == LanguageSource.USER,
)
)
.values(language_id=new_language.id)
)
else:
command = db_insert(PhysicalDocumentLanguage).values(
document_id=original_fd.physical_document_id,
language_id=new_language.id,
source=LanguageSource.USER,
)
.values(language_id=new_language.id)
)
else:
command = db_insert(PhysicalDocumentLanguage).values(
document_id=original_fd.physical_document_id,
language_id=new_language.id,
source=LanguageSource.USER,
)
commands.append(command)
else:
if existing_language is not None:
command = db_delete(PhysicalDocumentLanguage).where(
PhysicalDocumentLanguage.document_id == original_fd.physical_document_id
)
commands.append(command)

else:
if existing_language is not None:
command = db_delete(PhysicalDocumentLanguage).where(
PhysicalDocumentLanguage.document_id
== original_fd.physical_document_id
)
commands.append(command)

for c in commands:
result = db.execute(c)

Expand All @@ -303,24 +303,50 @@ def update(db: Session, import_id: str, document: DocumentWriteDTO) -> bool:
return True


def _get_new_language(
db: Session, new_values: dict, pdl: PhysicalDocumentLanguage
def _get_existing_language(
db: Session, original_fd: FamilyDocument
) -> Optional[Language]:
existing_language = (
db.query(PhysicalDocumentLanguage)
.filter(
PhysicalDocumentLanguage.document_id == original_fd.physical_document_id
)
.filter(PhysicalDocumentLanguage.source == LanguageSource.USER)
.one_or_none()
)
return existing_language


def _get_requested_language(db: Session, new_values: dict) -> Optional[Language]:
requested_language = new_values["user_language_name"]
if requested_language is None:
return None
else:
new_language = (
db.query(Language)
.filter(Language.name == new_values["user_language_name"])
.one()
)
update_language = (
pdl.language_id != new_language.id if pdl is not None else True

new_language = (
db.query(Language)
.filter(Language.name == new_values["user_language_name"])
.one()
)
return new_language


def _is_language_equal(
existing_language: Optional[Language],
requested_language: Optional[Language],
) -> bool:
"""Check whether the language is idempotent."""
if (existing_language == requested_language) is None:
return True

if requested_language is not None:
is_idempotent = bool(
existing_language.language_id == requested_language.id
if existing_language is not None
else False
)
return is_idempotent

if update_language:
return new_language
return False


def create(db: Session, document: DocumentCreateDTO) -> str:
Expand Down
Loading

0 comments on commit f5d9b31

Please sign in to comment.