From 4b555d6e1bea1977a4a735f038bd1768259ed829 Mon Sep 17 00:00:00 2001 From: notarious Date: Tue, 20 Feb 2024 23:40:38 +0500 Subject: [PATCH 1/2] Add database indexes --- alembic/versions/0006_create_indexes.py | 143 ++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 alembic/versions/0006_create_indexes.py diff --git a/alembic/versions/0006_create_indexes.py b/alembic/versions/0006_create_indexes.py new file mode 100644 index 0000000..ff95d3d --- /dev/null +++ b/alembic/versions/0006_create_indexes.py @@ -0,0 +1,143 @@ +"""create indexes + +Revision ID: 0006 +Revises: 0005 +Create Date: 2024-02-20 23:11:06.364557 + +""" +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "0006" +down_revision: Union[str, None] = "0005" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute("COMMIT") + # chat participant + + # chat + op.create_index( + index_name="idx_chat_on_is_deleted_chat_type_updated_at", + table_name="chat", + schema="chat", + columns=["is_deleted", "chat_type", "updated_at"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + op.execute( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_partial_on_updated_at ON chat.chat (updated_at DESC) + WHERE is_deleted IS FALSE AND chat_type = 'DIRECT'; + """ + ) + + op.create_index( + index_name="idx_chat_on_guid", + table_name="chat", + schema="chat", + columns=["guid"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + op.create_index( + index_name="idx_chat_on_chat_type_is_deleted", + table_name="chat", + schema="chat", + columns=["chat_type", "is_deleted"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + # user + op.create_index( + index_name="idx_user_on_email_username", + table_name="user", + schema="chat", + columns=["email", "username"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + op.execute( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_user_partial_on_email_not_deleted ON chat.user (email) + WHERE is_deleted IS FALSE; + """ + ) + + # read status + op.create_index( + index_name="idx_read_status_on_chat_id", + table_name="read_status", + schema="chat", + columns=["chat_id"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + op.create_index( + index_name="idx_read_status_on_user_id", + table_name="read_status", + schema="chat", + columns=["user_id"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + op.create_index( + index_name="idx_read_status_on_user_id_chat_id", + table_name="read_status", + schema="chat", + columns=["user_id", "chat_id"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + # message + op.create_index( + index_name="idx_message_on_chat_id", + table_name="message", + schema="chat", + columns=["chat_id"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + op.create_index( + index_name="idx_message_on_user_id", + table_name="message", + schema="chat", + columns=["user_id"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + op.create_index( + index_name="idx_message_on_user_id_chat_id", + table_name="message", + schema="chat", + columns=["user_id", "chat_id"], + if_not_exists=True, + postgresql_concurrently=True, + ) + + +def downgrade() -> None: + op.drop_index("idx_chat_on_is_deleted_chat_type_updated_at", schema="chat", if_exists=True) + op.drop_index("idx_chat_on_guid", schema="chat", if_exists=True) + op.drop_index("idx_chat_on_chat_type_is_deleted", schema="chat", if_exists=True) + op.drop_index("idx_chat_partial_on_updated_at", schema="chat", if_exists=True) + op.drop_index("idx_message_on_chat_id", schema="chat", if_exists=True) + op.drop_index("idx_message_on_user_id", schema="chat", if_exists=True) + op.drop_index("idx_message_on_user_id_chat_id", schema="chat", if_exists=True) + op.drop_index("idx_read_status_on_chat_id", schema="chat", if_exists=True) + op.drop_index("idx_read_status_on_user_id_chat_id", schema="chat", if_exists=True) + op.drop_index("idx_user_on_email_username", schema="chat", if_exists=True) + op.drop_index("idx_user_partial_on_email_not_deleted", schema="chat", if_exists=True) From 9efdb1f1b84dc55e9f34cde9c94675f4b7b43bab Mon Sep 17 00:00:00 2001 From: notarious Date: Sun, 25 Feb 2024 09:45:39 +0500 Subject: [PATCH 2/2] Increase user_image character length Recreate indexes properly with definition in models Improve query efficiency for getting unread messages count --- alembic/versions/0006_create_indexes.py | 143 -------------- ...reate_indexes_increase_user_image_char_.py | 174 ++++++++++++++++++ src/chat/router.py | 15 +- src/chat/schemas.py | 1 - src/chat/services.py | 58 +++++- src/models.py | 34 +++- 6 files changed, 270 insertions(+), 155 deletions(-) delete mode 100644 alembic/versions/0006_create_indexes.py create mode 100644 alembic/versions/0006_create_indexes_increase_user_image_char_.py diff --git a/alembic/versions/0006_create_indexes.py b/alembic/versions/0006_create_indexes.py deleted file mode 100644 index ff95d3d..0000000 --- a/alembic/versions/0006_create_indexes.py +++ /dev/null @@ -1,143 +0,0 @@ -"""create indexes - -Revision ID: 0006 -Revises: 0005 -Create Date: 2024-02-20 23:11:06.364557 - -""" -from typing import Sequence, Union - -from alembic import op - -# revision identifiers, used by Alembic. -revision: str = "0006" -down_revision: Union[str, None] = "0005" -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - - -def upgrade() -> None: - op.execute("COMMIT") - # chat participant - - # chat - op.create_index( - index_name="idx_chat_on_is_deleted_chat_type_updated_at", - table_name="chat", - schema="chat", - columns=["is_deleted", "chat_type", "updated_at"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - op.execute( - """ - CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_partial_on_updated_at ON chat.chat (updated_at DESC) - WHERE is_deleted IS FALSE AND chat_type = 'DIRECT'; - """ - ) - - op.create_index( - index_name="idx_chat_on_guid", - table_name="chat", - schema="chat", - columns=["guid"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - op.create_index( - index_name="idx_chat_on_chat_type_is_deleted", - table_name="chat", - schema="chat", - columns=["chat_type", "is_deleted"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - # user - op.create_index( - index_name="idx_user_on_email_username", - table_name="user", - schema="chat", - columns=["email", "username"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - op.execute( - """ - CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_user_partial_on_email_not_deleted ON chat.user (email) - WHERE is_deleted IS FALSE; - """ - ) - - # read status - op.create_index( - index_name="idx_read_status_on_chat_id", - table_name="read_status", - schema="chat", - columns=["chat_id"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - op.create_index( - index_name="idx_read_status_on_user_id", - table_name="read_status", - schema="chat", - columns=["user_id"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - op.create_index( - index_name="idx_read_status_on_user_id_chat_id", - table_name="read_status", - schema="chat", - columns=["user_id", "chat_id"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - # message - op.create_index( - index_name="idx_message_on_chat_id", - table_name="message", - schema="chat", - columns=["chat_id"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - op.create_index( - index_name="idx_message_on_user_id", - table_name="message", - schema="chat", - columns=["user_id"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - op.create_index( - index_name="idx_message_on_user_id_chat_id", - table_name="message", - schema="chat", - columns=["user_id", "chat_id"], - if_not_exists=True, - postgresql_concurrently=True, - ) - - -def downgrade() -> None: - op.drop_index("idx_chat_on_is_deleted_chat_type_updated_at", schema="chat", if_exists=True) - op.drop_index("idx_chat_on_guid", schema="chat", if_exists=True) - op.drop_index("idx_chat_on_chat_type_is_deleted", schema="chat", if_exists=True) - op.drop_index("idx_chat_partial_on_updated_at", schema="chat", if_exists=True) - op.drop_index("idx_message_on_chat_id", schema="chat", if_exists=True) - op.drop_index("idx_message_on_user_id", schema="chat", if_exists=True) - op.drop_index("idx_message_on_user_id_chat_id", schema="chat", if_exists=True) - op.drop_index("idx_read_status_on_chat_id", schema="chat", if_exists=True) - op.drop_index("idx_read_status_on_user_id_chat_id", schema="chat", if_exists=True) - op.drop_index("idx_user_on_email_username", schema="chat", if_exists=True) - op.drop_index("idx_user_partial_on_email_not_deleted", schema="chat", if_exists=True) diff --git a/alembic/versions/0006_create_indexes_increase_user_image_char_.py b/alembic/versions/0006_create_indexes_increase_user_image_char_.py new file mode 100644 index 0000000..1933b1f --- /dev/null +++ b/alembic/versions/0006_create_indexes_increase_user_image_char_.py @@ -0,0 +1,174 @@ +"""Create indexes, Increase user_image char length + +Revision ID: 0006 +Revises: 0005 +Create Date: 2024-02-25 04:37:53.273850 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "0006" +down_revision: Union[str, None] = "0005" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute("COMMIT") # to run concurrently + # ### commands auto generated by Alembic - please adjust! ### + op.create_index( + "idx_chat_on_guid", + "chat", + ["guid"], + unique=False, + schema="chat", + postgresql_concurrently=True, + if_not_exists=True, + ) + op.create_index( + "idx_chat_on_is_deleted_chat_type", + "chat", + ["is_deleted", "chat_type"], + unique=False, + schema="chat", + postgresql_concurrently=True, + if_not_exists=True, + ) + op.create_index( + "idx_message_on_chat_id", + "message", + ["chat_id"], + unique=False, + schema="chat", + postgresql_concurrently=True, + if_not_exists=True, + ) + op.create_index( + "idx_message_on_user_id", + "message", + ["user_id"], + unique=False, + schema="chat", + postgresql_concurrently=True, + if_not_exists=True, + ) + op.create_index( + "idx_message_on_user_id_chat_id", + "message", + ["chat_id", "user_id"], + unique=False, + schema="chat", + postgresql_concurrently=True, + if_not_exists=True, + ) + op.create_index( + "idx_read_status_on_chat_id", + "read_status", + ["chat_id"], + unique=False, + schema="chat", + postgresql_concurrently=True, + if_not_exists=True, + ) + op.create_index( + "idx_read_status_on_user_id", + "read_status", + ["user_id"], + unique=False, + schema="chat", + postgresql_concurrently=True, + if_not_exists=True, + ) + op.alter_column( + "user", + "user_image", + existing_type=sa.VARCHAR(length=1000), + type_=sa.String(length=1048), + existing_nullable=True, + schema="chat", + ) + op.create_index( + "idx_user_on_email_username", + "user", + ["email", "username"], + unique=False, + schema="chat", + postgresql_concurrently=True, + if_not_exists=True, + ) + op.create_index( + "idx_user_partial_on_email_not_deleted", + "user", + ["email", "is_deleted"], + unique=False, + schema="chat", + postgresql_concurrently=True, + postgresql_where=sa.text("is_deleted IS false"), + if_not_exists=True, + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + op.execute("COMMIT") # to run concurrently + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index( + "idx_user_partial_on_email_not_deleted", + table_name="user", + schema="chat", + postgresql_concurrently=True, + postgresql_where=sa.text("is_deleted IS false"), + if_exists=True, + ) + op.drop_index( + "idx_user_on_email_username", table_name="user", schema="chat", postgresql_concurrently=True, if_exists=True + ) + op.alter_column( + "user", + "user_image", + existing_type=sa.String(length=1048), + type_=sa.VARCHAR(length=1000), + existing_nullable=True, + schema="chat", + ) + op.drop_index( + "idx_read_status_on_user_id", + table_name="read_status", + schema="chat", + postgresql_concurrently=True, + if_exists=True, + ) + op.drop_index( + "idx_read_status_on_chat_id", + table_name="read_status", + schema="chat", + postgresql_concurrently=True, + if_exists=True, + ) + op.drop_index( + "idx_message_on_user_id_chat_id", + table_name="message", + schema="chat", + postgresql_concurrently=True, + if_exists=True, + ) + op.drop_index( + "idx_message_on_user_id", table_name="message", schema="chat", postgresql_concurrently=True, if_exists=True + ) + op.drop_index( + "idx_message_on_chat_id", table_name="message", schema="chat", postgresql_concurrently=True, if_exists=True + ) + op.drop_index( + "idx_chat_on_is_deleted_chat_type", + table_name="chat", + schema="chat", + postgresql_concurrently=True, + if_exists=True, + ) + op.drop_index("idx_chat_on_guid", table_name="chat", schema="chat", postgresql_concurrently=True, if_exists=True) + # ### end Alembic commands ### diff --git a/src/chat/router.py b/src/chat/router.py index 695c798..1090a2a 100644 --- a/src/chat/router.py +++ b/src/chat/router.py @@ -16,12 +16,12 @@ GetOldMessagesSchema, ) from src.chat.services import ( - add_new_messages_stats_to_direct_chat, create_direct_chat, direct_chat_exists, get_active_message_by_guid_and_chat, get_chat_by_guid, get_chat_messages, + get_new_messages_per_chat, get_older_chat_messages, get_unread_messages_count, get_user_by_guid, @@ -131,15 +131,16 @@ async def get_user_chats_view( chats: list[Chat] = await get_user_direct_chats(db_session, current_user=current_user) - # Store response in the cache with a TTL - direct_chats: list[GetDirectChatSchema] = [ - await add_new_messages_stats_to_direct_chat(db_session, current_user=current_user, chat=chat) for chat in chats - ] + chats_with_new_messages_count: list[GetDirectChatSchema] = await get_new_messages_per_chat( + db_session, chats, current_user + ) # calculate total unread messages count for all user's chats - total_unread_messages_count = sum(direct_chat.new_messages_count for direct_chat in direct_chats) + total_unread_messages_count = sum(direct_chat.new_messages_count for direct_chat in chats_with_new_messages_count) - response = GetDirectChatsSchema(chats=direct_chats, total_unread_messages_count=total_unread_messages_count) + response = GetDirectChatsSchema( + chats=chats_with_new_messages_count, total_unread_messages_count=total_unread_messages_count + ) if cache_enabled: # Store response in the cache with a TTL diff --git a/src/chat/schemas.py b/src/chat/schemas.py index 7dfc2fd..d33a2e7 100644 --- a/src/chat/schemas.py +++ b/src/chat/schemas.py @@ -58,7 +58,6 @@ class GetDirectChatSchema(BaseModel): created_at: datetime updated_at: datetime users: list[UserSchema] - has_new_messages: bool new_messages_count: int class Config: diff --git a/src/chat/services.py b/src/chat/services.py index 63a20ca..d7dacbe 100644 --- a/src/chat/services.py +++ b/src/chat/services.py @@ -1,13 +1,16 @@ +import logging from datetime import datetime from uuid import UUID from sqlalchemy import and_, func, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import selectinload +from sqlalchemy.orm import aliased, selectinload from src.chat.schemas import GetDirectChatSchema, GetMessageSchema from src.models import Chat, ChatType, Message, ReadStatus, User +logger = logging.getLogger(__name__) + async def create_direct_chat(db_session: AsyncSession, *, initiator_user: User, recipient_user: User) -> Chat: try: @@ -66,6 +69,57 @@ async def get_user_by_guid(db_session: AsyncSession, *, user_guid: UUID) -> User return user +async def get_new_messages_per_chat( + db_session: AsyncSession, chats: list[Chat], current_user: User +) -> list[GetDirectChatSchema]: + """ + New message are those messages that: + - don't belong to current user + - are not yet read by current user + + """ + # Create a dictionary with default values of 0 + new_messages_count_per_chat = {chat.id: 0 for chat in chats} + + # Create an alias for the ReadStatus table + read_status_alias = aliased(ReadStatus) + + query = ( + select(Message.chat_id, func.count().label("message_count")) + .join( + read_status_alias, + and_(read_status_alias.user_id == current_user.id, read_status_alias.chat_id == Message.chat_id), + ) + .where( + and_( + Message.user_id != current_user.id, + Message.id > func.coalesce(read_status_alias.last_read_message_id, 0), + Message.is_deleted.is_(False), + Message.chat_id.in_(new_messages_count_per_chat), + ) + ) + .group_by(Message.chat_id) + ) + + result = await db_session.execute(query) + new_messages_count = result.fetchall() + + for messages_count in new_messages_count: + new_messages_count_per_chat[messages_count[0]] = messages_count[1] + + return [ + GetDirectChatSchema( + chat_guid=chat.guid, + chat_type=chat.chat_type, + created_at=chat.created_at, + updated_at=chat.updated_at, + users=chat.users, + new_messages_count=new_messages_count_per_chat[chat.id], + ) + for chat in chats + ] + + async def get_user_direct_chats(db_session: AsyncSession, *, current_user: User) -> list[Chat]: query = ( select(Chat) @@ -76,7 +130,7 @@ async def get_user_direct_chats(db_session: AsyncSession, *, current_user: User) Chat.chat_type == ChatType.DIRECT, ) ) - .options(selectinload(Chat.users), selectinload(Chat.read_statuses)) + .options(selectinload(Chat.users)) ).order_by(Chat.updated_at.desc()) result = await db_session.execute(query) diff --git a/src/models.py b/src/models.py index 5afed6a..14e4148 100644 --- a/src/models.py +++ b/src/models.py @@ -3,7 +3,7 @@ from datetime import datetime from typing import Any, List -from sqlalchemy import Boolean, Column, DateTime, Enum, ForeignKey, String, Table +from sqlalchemy import Boolean, Column, DateTime, Enum, ForeignKey, Index, String, Table from sqlalchemy.dialects.postgresql import JSONB, UUID from sqlalchemy.orm import Mapped, mapped_column, relationship @@ -35,8 +35,9 @@ class User(BaseModel): email: Mapped[str] = mapped_column(String(254), unique=True) last_login: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=True) is_superuser: Mapped[bool] = mapped_column(Boolean, default=False) - user_image: Mapped[str] = mapped_column(String(1000), nullable=True) + user_image: Mapped[str] = mapped_column(String(1048), nullable=True) settings: Mapped[dict[str, Any]] = mapped_column(JSONB, server_default="{}") + is_deleted: Mapped[bool] = mapped_column(Boolean, default=False) # redefine is_deleted column for index chats: Mapped[List["Chat"]] = relationship(secondary=chat_participant, back_populates="users") messages: Mapped[List["Message"]] = relationship(back_populates="user") @@ -45,6 +46,19 @@ class User(BaseModel): def __str__(self): return f"{self.username}" + # Indexes + + __table_args__ = ( + Index("idx_user_on_email_username", "email", "username", postgresql_concurrently=True), + Index( + "idx_user_partial_on_email_not_deleted", + "email", + "is_deleted", + postgresql_concurrently=True, + postgresql_where=(is_deleted.is_(False)), + ), + ) + class Chat(BaseModel): __tablename__ = "chat" @@ -60,6 +74,11 @@ class Chat(BaseModel): def __str__(self): return f"{self.chat_type.value.title()} {self.guid}" + __table_args__ = ( + Index("idx_chat_on_is_deleted_chat_type", "is_deleted", "chat_type", postgresql_concurrently=True), + Index("idx_chat_on_guid", "guid", postgresql_concurrently=True), + ) + class MessageType(enum.Enum): TEXT = "text" @@ -85,6 +104,12 @@ class Message(BaseModel): def __str__(self): return f"{self.content}" + __table_args__ = ( + Index("idx_message_on_chat_id", "chat_id", postgresql_concurrently=True), + Index("idx_message_on_user_id", "user_id", postgresql_concurrently=True), + Index("idx_message_on_user_id_chat_id", "chat_id", "user_id", postgresql_concurrently=True), + ) + class ReadStatus(RemoveBaseFieldsMixin, BaseModel): __tablename__ = "read_status" @@ -100,3 +125,8 @@ class ReadStatus(RemoveBaseFieldsMixin, BaseModel): def __str__(self): return f"User: {self.user_id}, Message: {self.last_read_message_id}" + + __table_args__ = ( + Index("idx_read_status_on_chat_id", "chat_id", postgresql_concurrently=True), + Index("idx_read_status_on_user_id", "user_id", postgresql_concurrently=True), + )