diff --git a/Makefile b/Makefile index 28257ce5..789bdc18 100644 --- a/Makefile +++ b/Makefile @@ -40,9 +40,17 @@ run: start: build_dev run -restart: - docker stop navigator-admin-backend && docker rm navigator-admin-backend && make start && docker logs -f navigator-admin-backend - +start_local: build + # - docker stop navigator-admin-backend + docker run -p 8888:8888 \ + --name navigator-admin-backend \ + --network=navigator-backend_default \ + -e ADMIN_POSTGRES_HOST=backend_db \ + -e SECRET_KEY="secret_test_key" \ + -d navigator-admin-backend + +restart: build + docker stop navigator-admin-backend && docker rm navigator-admin-backend && make start_local && docker logs -f navigator-admin-backend show_logs: - docker compose logs -f diff --git a/app/repository/document.py b/app/repository/document.py index 39dfb949..3c6bea3a 100644 --- a/app/repository/document.py +++ b/app/repository/document.py @@ -228,15 +228,9 @@ def update(db: Session, import_id: str, document: DocumentWriteDTO) -> bool: return False # User Language changed? - existing_language = ( - db.query(PhysicalDocumentLanguage) - .filter( - PhysicalDocumentLanguage.document_id == original_fd.physical_document_id - ) - .filter(PhysicalDocumentLanguage.source == LanguageSource.USER) - .one_or_none() - ) - new_language = _get_new_language(db, new_values, existing_language) + existing_language = _get_existing_language(db, original_fd) + new_language = _get_requested_language(db, new_values) + has_language_changed = not _is_language_equal(existing_language, new_language) update_slug = original_pd.title != new_values["title"] @@ -245,9 +239,11 @@ def update(db: Session, import_id: str, document: DocumentWriteDTO) -> bool: .where(PhysicalDocument.id == original_pd.id) .values( title=new_values["title"], - source_url=str(new_values["source_url"]) - if new_values["source_url"] is not None - else None, + source_url=( + str(new_values["source_url"]) + if new_values["source_url"] is not None + else None + ), ), db_update(FamilyDocument) .where(FamilyDocument.import_id == original_fd.import_id) @@ -258,33 +254,37 @@ def update(db: Session, import_id: str, document: DocumentWriteDTO) -> bool: ), ] - if new_language is not None: - if existing_language is not None: - command = ( - db_update(PhysicalDocumentLanguage) - .where( - and_( - PhysicalDocumentLanguage.document_id - == original_fd.physical_document_id, - PhysicalDocumentLanguage.source == LanguageSource.USER, + # Update logic to only perform update if not idempotent. + if has_language_changed: + if new_language is not None: + if existing_language is not None: + command = ( + db_update(PhysicalDocumentLanguage) + .where( + and_( + PhysicalDocumentLanguage.document_id + == original_fd.physical_document_id, + PhysicalDocumentLanguage.source == LanguageSource.USER, + ) ) + .values(language_id=new_language.id) + ) + else: + command = db_insert(PhysicalDocumentLanguage).values( + document_id=original_fd.physical_document_id, + language_id=new_language.id, + source=LanguageSource.USER, ) - .values(language_id=new_language.id) - ) - else: - command = db_insert(PhysicalDocumentLanguage).values( - document_id=original_fd.physical_document_id, - language_id=new_language.id, - source=LanguageSource.USER, - ) - commands.append(command) - else: - if existing_language is not None: - command = db_delete(PhysicalDocumentLanguage).where( - PhysicalDocumentLanguage.document_id == original_fd.physical_document_id - ) commands.append(command) + else: + if existing_language is not None: + command = db_delete(PhysicalDocumentLanguage).where( + PhysicalDocumentLanguage.document_id + == original_fd.physical_document_id + ) + commands.append(command) + for c in commands: result = db.execute(c) @@ -303,24 +303,50 @@ def update(db: Session, import_id: str, document: DocumentWriteDTO) -> bool: return True -def _get_new_language( - db: Session, new_values: dict, pdl: PhysicalDocumentLanguage +def _get_existing_language( + db: Session, original_fd: FamilyDocument ) -> Optional[Language]: + existing_language = ( + db.query(PhysicalDocumentLanguage) + .filter( + PhysicalDocumentLanguage.document_id == original_fd.physical_document_id + ) + .filter(PhysicalDocumentLanguage.source == LanguageSource.USER) + .one_or_none() + ) + return existing_language + + +def _get_requested_language(db: Session, new_values: dict) -> Optional[Language]: requested_language = new_values["user_language_name"] if requested_language is None: return None - else: - new_language = ( - db.query(Language) - .filter(Language.name == new_values["user_language_name"]) - .one() - ) - update_language = ( - pdl.language_id != new_language.id if pdl is not None else True + + new_language = ( + db.query(Language) + .filter(Language.name == new_values["user_language_name"]) + .one() + ) + return new_language + + +def _is_language_equal( + existing_language: Optional[Language], + requested_language: Optional[Language], +) -> bool: + """Check whether the language is idempotent.""" + if (existing_language == requested_language) is None: + return True + + if requested_language is not None: + is_idempotent = bool( + existing_language.language_id == requested_language.id + if existing_language is not None + else False ) + return is_idempotent - if update_language: - return new_language + return False def create(db: Session, document: DocumentCreateDTO) -> str: diff --git a/integration_tests/document/test_update.py b/integration_tests/document/test_update.py index e200b53d..529f2c8f 100644 --- a/integration_tests/document/test_update.py +++ b/integration_tests/document/test_update.py @@ -1,6 +1,7 @@ from typing import Tuple, cast from db_client.models.document.physical_document import ( + Language, LanguageSource, PhysicalDocument, PhysicalDocumentLanguage, @@ -91,66 +92,64 @@ def test_update_document_no_source_url( client: TestClient, test_db: Session, user_header_token ): setup_db(test_db) + + # Keep all values apart from the Source URL the same. new_document = DocumentWriteDTO( - variant_name="Translation", - role="SUMMARY", - type="Annex", - title="Updated Title No Source URL", + variant_name="Original Language", + role="MAIN", + type="Law", + title="big title1", source_url=None, - user_language_name="Ghotuo", + user_language_name="English", ) + response = client.put( - "/api/v1/documents/D.0.0.2", + "/api/v1/documents/D.0.0.1", json=new_document.model_dump(mode="json"), headers=user_header_token, ) assert response.status_code == status.HTTP_200_OK data = response.json() - assert data["import_id"] == "D.0.0.2" - assert data["variant_name"] == "Translation" - assert data["role"] == "SUMMARY" - assert data["type"] == "Annex" - assert data["title"] == new_document.title + assert data["import_id"] == "D.0.0.1" + assert data["variant_name"] == "Original Language" + assert data["role"] == "MAIN" + assert data["type"] == "Law" + assert data["title"] == "big title1" assert data["source_url"] == new_document.source_url - assert data["slug"].startswith("updated-title") - assert data["user_language_name"] == "Ghotuo" - fd, pd = _get_doc_tuple(test_db, "D.0.0.2") - assert fd.import_id == "D.0.0.2" - assert fd.variant_name == "Translation" - assert fd.document_role == "SUMMARY" - assert fd.document_type == "Annex" - assert pd.title == new_document.title + fd, pd = _get_doc_tuple(test_db, "D.0.0.1") + assert fd.import_id == "D.0.0.1" + assert fd.variant_name == "Original Language" + assert fd.document_role == "MAIN" + assert fd.document_type == "Law" + assert pd.title == "big title1" assert pd.source_url == new_document.source_url # Check the user language in the db - lang = ( - test_db.query(PhysicalDocumentLanguage) + _, lang = ( + test_db.query(PhysicalDocumentLanguage, Language) + .join(Language, PhysicalDocumentLanguage.language_id == Language.id) .filter(PhysicalDocumentLanguage.document_id == data["physical_id"]) .filter(PhysicalDocumentLanguage.source == LanguageSource.USER) .one() ) - assert lang.language_id == 1 - - # Check slug is updated too - slugs = ( - test_db.query(Slug).filter(Slug.family_document_import_id == "D.0.0.2").all() - ) - last_slug = slugs[-1].name - assert last_slug.startswith("updated-title") + assert lang.id == 1826 + assert data["user_language_name"] == lang.name def test_update_document_remove_variant( client: TestClient, test_db: Session, user_header_token ): setup_db(test_db) + + # Keep all values apart from the variant the same. new_document = DocumentWriteDTO( variant_name=None, - role="SUMMARY", - type="Annex", - title="Updated Title", + role="MAIN", + type="Law", + title="title2", source_url=cast(AnyHttpUrl, "http://update_source"), - user_language_name="Ghotuo", + user_language_name=None, ) response = client.put( "/api/v1/documents/D.0.0.2", @@ -161,47 +160,41 @@ def test_update_document_remove_variant( data = response.json() assert data["import_id"] == "D.0.0.2" assert data["variant_name"] is None - assert data["role"] == "SUMMARY" - assert data["type"] == "Annex" - assert data["title"] == "Updated Title" + assert data["role"] == "MAIN" + assert data["type"] == "Law" + assert data["title"] == "title2" assert data["source_url"] == "http://update_source/" - assert data["slug"].startswith("updated-title") - assert data["user_language_name"] == "Ghotuo" fd, pd = _get_doc_tuple(test_db, "D.0.0.2") assert fd.import_id == "D.0.0.2" assert fd.variant_name is None - assert fd.document_role == "SUMMARY" - assert fd.document_type == "Annex" - assert pd.title == "Updated Title" + assert fd.document_role == "MAIN" + assert fd.document_type == "Law" + assert pd.title == "title2" assert pd.source_url == "http://update_source/" # Check the user language in the db lang = ( - test_db.query(PhysicalDocumentLanguage) + test_db.query(PhysicalDocumentLanguage, Language) + .join(Language, PhysicalDocumentLanguage.language_id == Language.id) .filter(PhysicalDocumentLanguage.document_id == data["physical_id"]) .filter(PhysicalDocumentLanguage.source == LanguageSource.USER) - .one() - ) - assert lang.language_id == 1 - - # Check slug is updated too - slugs = ( - test_db.query(Slug).filter(Slug.family_document_import_id == "D.0.0.2").all() + .one_or_none() ) - last_slug = slugs[-1].name - assert last_slug.startswith("updated-title") + assert lang is None def test_update_document_remove_user_language( client: TestClient, test_db: Session, user_header_token ): setup_db(test_db) + + # Keep all values apart from the language the same as in setup_db.py. new_document = DocumentWriteDTO( - variant_name=None, - role="SUMMARY", - type="Annex", - title="Updated Title", + variant_name="Original Language", + role="MAIN", + type="Law", + title="big title1", source_url=cast(AnyHttpUrl, "http://update_source"), user_language_name=None, ) @@ -213,37 +206,29 @@ def test_update_document_remove_user_language( assert response.status_code == status.HTTP_200_OK data = response.json() assert data["import_id"] == "D.0.0.1" - assert data["variant_name"] is None - assert data["role"] == "SUMMARY" - assert data["type"] == "Annex" - assert data["title"] == "Updated Title" + assert data["variant_name"] == "Original Language" + assert data["role"] == "MAIN" + assert data["type"] == "Law" + assert data["title"] == "big title1" assert data["source_url"] == "http://update_source/" - assert data["slug"].startswith("updated-title") - assert data["user_language_name"] is None fd, pd = _get_doc_tuple(test_db, "D.0.0.1") assert fd.import_id == "D.0.0.1" - assert fd.variant_name is None - assert fd.document_role == "SUMMARY" - assert fd.document_type == "Annex" - assert pd.title == "Updated Title" - assert pd.source_url == "http://update_source/" + assert fd.variant_name == "Original Language" + assert fd.document_role == "MAIN" + assert fd.document_type == "Law" + assert pd.title == "big title1" # Check the user language in the db lang = ( - test_db.query(PhysicalDocumentLanguage) + test_db.query(PhysicalDocumentLanguage, Language) + .join(Language, PhysicalDocumentLanguage.language_id == Language.id) .filter(PhysicalDocumentLanguage.document_id == data["physical_id"]) .filter(PhysicalDocumentLanguage.source == LanguageSource.USER) .one_or_none() ) assert lang is None - - # Check slug is updated too - slugs = ( - test_db.query(Slug).filter(Slug.family_document_import_id == "D.0.0.1").all() - ) - last_slug = slugs[-1].name - assert last_slug.startswith("updated-title") + assert data["user_language_name"] is None def test_update_document_when_not_authorised(client: TestClient, test_db: Session): @@ -368,14 +353,13 @@ def test_update_document_idempotent_user_language( ): setup_db(test_db) new_document = DocumentWriteDTO( - variant_name="Translation", - role="SUMMARY", - type="Annex", - title="Updated Title", + variant_name="Original Language", + role="MAIN", + type="Law", + title="title2", source_url=cast(AnyHttpUrl, "http://update_source"), user_language_name=None, ) - print(new_document.model_dump(mode="json")) response = client.put( "/api/v1/documents/D.0.0.2", json=new_document.model_dump(mode="json"), @@ -384,34 +368,27 @@ def test_update_document_idempotent_user_language( assert response.status_code == status.HTTP_200_OK data = response.json() assert data["import_id"] == "D.0.0.2" - assert data["variant_name"] == "Translation" - assert data["role"] == "SUMMARY" - assert data["type"] == "Annex" - assert data["title"] == "Updated Title" + assert data["variant_name"] == "Original Language" + assert data["role"] == "MAIN" + assert data["type"] == "Law" + assert data["title"] == "title2" assert data["source_url"] == "http://update_source/" - assert data["slug"].startswith("updated-title") - assert data["user_language_name"] is None fd, pd = _get_doc_tuple(test_db, "D.0.0.2") assert fd.import_id == "D.0.0.2" - assert fd.variant_name == "Translation" - assert fd.document_role == "SUMMARY" - assert fd.document_type == "Annex" - assert pd.title == "Updated Title" + assert fd.variant_name == "Original Language" + assert fd.document_role == "MAIN" + assert fd.document_type == "Law" + assert pd.title == "title2" assert pd.source_url == "http://update_source/" # Check the user language in the db lang = ( - test_db.query(PhysicalDocumentLanguage) + test_db.query(PhysicalDocumentLanguage, Language) + .join(Language, PhysicalDocumentLanguage.language_id == Language.id) .filter(PhysicalDocumentLanguage.document_id == data["physical_id"]) .filter(PhysicalDocumentLanguage.source == LanguageSource.USER) .one_or_none() ) assert lang is None - - # Check slug is updated too - slugs = ( - test_db.query(Slug).filter(Slug.family_document_import_id == "D.0.0.2").all() - ) - last_slug = slugs[-1].name - assert last_slug.startswith("updated-title") + assert data["user_language_name"] is None