Skip to content

Commit

Permalink
Merge branch 'master' of github.com:notarious2/fastapi-chat
Browse files Browse the repository at this point in the history
  • Loading branch information
notarious2 committed Mar 2, 2024
2 parents 69fa09c + 9efdb1f commit d71b706
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 12 deletions.
174 changes: 174 additions & 0 deletions alembic/versions/0006_create_indexes_increase_user_image_char_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""Create indexes, Increase user_image char length
Revision ID: 0006
Revises: 0005
Create Date: 2024-02-25 04:37:53.273850
"""
from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "0006"
down_revision: Union[str, None] = "0005"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.execute("COMMIT") # to run concurrently
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(
"idx_chat_on_guid",
"chat",
["guid"],
unique=False,
schema="chat",
postgresql_concurrently=True,
if_not_exists=True,
)
op.create_index(
"idx_chat_on_is_deleted_chat_type",
"chat",
["is_deleted", "chat_type"],
unique=False,
schema="chat",
postgresql_concurrently=True,
if_not_exists=True,
)
op.create_index(
"idx_message_on_chat_id",
"message",
["chat_id"],
unique=False,
schema="chat",
postgresql_concurrently=True,
if_not_exists=True,
)
op.create_index(
"idx_message_on_user_id",
"message",
["user_id"],
unique=False,
schema="chat",
postgresql_concurrently=True,
if_not_exists=True,
)
op.create_index(
"idx_message_on_user_id_chat_id",
"message",
["chat_id", "user_id"],
unique=False,
schema="chat",
postgresql_concurrently=True,
if_not_exists=True,
)
op.create_index(
"idx_read_status_on_chat_id",
"read_status",
["chat_id"],
unique=False,
schema="chat",
postgresql_concurrently=True,
if_not_exists=True,
)
op.create_index(
"idx_read_status_on_user_id",
"read_status",
["user_id"],
unique=False,
schema="chat",
postgresql_concurrently=True,
if_not_exists=True,
)
op.alter_column(
"user",
"user_image",
existing_type=sa.VARCHAR(length=1000),
type_=sa.String(length=1048),
existing_nullable=True,
schema="chat",
)
op.create_index(
"idx_user_on_email_username",
"user",
["email", "username"],
unique=False,
schema="chat",
postgresql_concurrently=True,
if_not_exists=True,
)
op.create_index(
"idx_user_partial_on_email_not_deleted",
"user",
["email", "is_deleted"],
unique=False,
schema="chat",
postgresql_concurrently=True,
postgresql_where=sa.text("is_deleted IS false"),
if_not_exists=True,
)
# ### end Alembic commands ###


def downgrade() -> None:
op.execute("COMMIT") # to run concurrently
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"idx_user_partial_on_email_not_deleted",
table_name="user",
schema="chat",
postgresql_concurrently=True,
postgresql_where=sa.text("is_deleted IS false"),
if_exists=True,
)
op.drop_index(
"idx_user_on_email_username", table_name="user", schema="chat", postgresql_concurrently=True, if_exists=True
)
op.alter_column(
"user",
"user_image",
existing_type=sa.String(length=1048),
type_=sa.VARCHAR(length=1000),
existing_nullable=True,
schema="chat",
)
op.drop_index(
"idx_read_status_on_user_id",
table_name="read_status",
schema="chat",
postgresql_concurrently=True,
if_exists=True,
)
op.drop_index(
"idx_read_status_on_chat_id",
table_name="read_status",
schema="chat",
postgresql_concurrently=True,
if_exists=True,
)
op.drop_index(
"idx_message_on_user_id_chat_id",
table_name="message",
schema="chat",
postgresql_concurrently=True,
if_exists=True,
)
op.drop_index(
"idx_message_on_user_id", table_name="message", schema="chat", postgresql_concurrently=True, if_exists=True
)
op.drop_index(
"idx_message_on_chat_id", table_name="message", schema="chat", postgresql_concurrently=True, if_exists=True
)
op.drop_index(
"idx_chat_on_is_deleted_chat_type",
table_name="chat",
schema="chat",
postgresql_concurrently=True,
if_exists=True,
)
op.drop_index("idx_chat_on_guid", table_name="chat", schema="chat", postgresql_concurrently=True, if_exists=True)
# ### end Alembic commands ###
15 changes: 8 additions & 7 deletions src/chat/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
GetOldMessagesSchema,
)
from src.chat.services import (
add_new_messages_stats_to_direct_chat,
create_direct_chat,
direct_chat_exists,
get_active_message_by_guid_and_chat,
get_chat_by_guid,
get_chat_messages,
get_new_messages_per_chat,
get_older_chat_messages,
get_unread_messages_count,
get_user_by_guid,
Expand Down Expand Up @@ -131,15 +131,16 @@ async def get_user_chats_view(

chats: list[Chat] = await get_user_direct_chats(db_session, current_user=current_user)

# Store response in the cache with a TTL
direct_chats: list[GetDirectChatSchema] = [
await add_new_messages_stats_to_direct_chat(db_session, current_user=current_user, chat=chat) for chat in chats
]
chats_with_new_messages_count: list[GetDirectChatSchema] = await get_new_messages_per_chat(
db_session, chats, current_user
)

# calculate total unread messages count for all user's chats
total_unread_messages_count = sum(direct_chat.new_messages_count for direct_chat in direct_chats)
total_unread_messages_count = sum(direct_chat.new_messages_count for direct_chat in chats_with_new_messages_count)

response = GetDirectChatsSchema(chats=direct_chats, total_unread_messages_count=total_unread_messages_count)
response = GetDirectChatsSchema(
chats=chats_with_new_messages_count, total_unread_messages_count=total_unread_messages_count
)

if cache_enabled:
# Store response in the cache with a TTL
Expand Down
1 change: 0 additions & 1 deletion src/chat/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class GetDirectChatSchema(BaseModel):
created_at: datetime
updated_at: datetime
users: list[UserSchema]
has_new_messages: bool
new_messages_count: int

class Config:
Expand Down
58 changes: 56 additions & 2 deletions src/chat/services.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import logging
from datetime import datetime
from uuid import UUID

from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import aliased, selectinload

from src.chat.schemas import GetDirectChatSchema, GetMessageSchema
from src.models import Chat, ChatType, Message, ReadStatus, User

logger = logging.getLogger(__name__)


async def create_direct_chat(db_session: AsyncSession, *, initiator_user: User, recipient_user: User) -> Chat:
try:
Expand Down Expand Up @@ -66,6 +69,57 @@ async def get_user_by_guid(db_session: AsyncSession, *, user_guid: UUID) -> User
return user


async def get_new_messages_per_chat(
db_session: AsyncSession, chats: list[Chat], current_user: User
) -> list[GetDirectChatSchema]:
"""
New message are those messages that:
- don't belong to current user
- are not yet read by current user
"""
# Create a dictionary with default values of 0
new_messages_count_per_chat = {chat.id: 0 for chat in chats}

# Create an alias for the ReadStatus table
read_status_alias = aliased(ReadStatus)

query = (
select(Message.chat_id, func.count().label("message_count"))
.join(
read_status_alias,
and_(read_status_alias.user_id == current_user.id, read_status_alias.chat_id == Message.chat_id),
)
.where(
and_(
Message.user_id != current_user.id,
Message.id > func.coalesce(read_status_alias.last_read_message_id, 0),
Message.is_deleted.is_(False),
Message.chat_id.in_(new_messages_count_per_chat),
)
)
.group_by(Message.chat_id)
)

result = await db_session.execute(query)
new_messages_count = result.fetchall()

for messages_count in new_messages_count:
new_messages_count_per_chat[messages_count[0]] = messages_count[1]

return [
GetDirectChatSchema(
chat_guid=chat.guid,
chat_type=chat.chat_type,
created_at=chat.created_at,
updated_at=chat.updated_at,
users=chat.users,
new_messages_count=new_messages_count_per_chat[chat.id],
)
for chat in chats
]


async def get_user_direct_chats(db_session: AsyncSession, *, current_user: User) -> list[Chat]:
query = (
select(Chat)
Expand All @@ -76,7 +130,7 @@ async def get_user_direct_chats(db_session: AsyncSession, *, current_user: User)
Chat.chat_type == ChatType.DIRECT,
)
)
.options(selectinload(Chat.users), selectinload(Chat.read_statuses))
.options(selectinload(Chat.users))
).order_by(Chat.updated_at.desc())
result = await db_session.execute(query)

Expand Down
34 changes: 32 additions & 2 deletions src/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime
from typing import Any, List

from sqlalchemy import Boolean, Column, DateTime, Enum, ForeignKey, String, Table
from sqlalchemy import Boolean, Column, DateTime, Enum, ForeignKey, Index, String, Table
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship

Expand Down Expand Up @@ -35,8 +35,9 @@ class User(BaseModel):
email: Mapped[str] = mapped_column(String(254), unique=True)
last_login: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=True)
is_superuser: Mapped[bool] = mapped_column(Boolean, default=False)
user_image: Mapped[str] = mapped_column(String(1000), nullable=True)
user_image: Mapped[str] = mapped_column(String(1048), nullable=True)
settings: Mapped[dict[str, Any]] = mapped_column(JSONB, server_default="{}")
is_deleted: Mapped[bool] = mapped_column(Boolean, default=False) # redefine is_deleted column for index

chats: Mapped[List["Chat"]] = relationship(secondary=chat_participant, back_populates="users")
messages: Mapped[List["Message"]] = relationship(back_populates="user")
Expand All @@ -45,6 +46,19 @@ class User(BaseModel):
def __str__(self):
return f"{self.username}"

# Indexes

__table_args__ = (
Index("idx_user_on_email_username", "email", "username", postgresql_concurrently=True),
Index(
"idx_user_partial_on_email_not_deleted",
"email",
"is_deleted",
postgresql_concurrently=True,
postgresql_where=(is_deleted.is_(False)),
),
)


class Chat(BaseModel):
__tablename__ = "chat"
Expand All @@ -60,6 +74,11 @@ class Chat(BaseModel):
def __str__(self):
return f"{self.chat_type.value.title()} {self.guid}"

__table_args__ = (
Index("idx_chat_on_is_deleted_chat_type", "is_deleted", "chat_type", postgresql_concurrently=True),
Index("idx_chat_on_guid", "guid", postgresql_concurrently=True),
)


class MessageType(enum.Enum):
TEXT = "text"
Expand All @@ -85,6 +104,12 @@ class Message(BaseModel):
def __str__(self):
return f"{self.content}"

__table_args__ = (
Index("idx_message_on_chat_id", "chat_id", postgresql_concurrently=True),
Index("idx_message_on_user_id", "user_id", postgresql_concurrently=True),
Index("idx_message_on_user_id_chat_id", "chat_id", "user_id", postgresql_concurrently=True),
)


class ReadStatus(RemoveBaseFieldsMixin, BaseModel):
__tablename__ = "read_status"
Expand All @@ -100,3 +125,8 @@ class ReadStatus(RemoveBaseFieldsMixin, BaseModel):

def __str__(self):
return f"User: {self.user_id}, Message: {self.last_read_message_id}"

__table_args__ = (
Index("idx_read_status_on_chat_id", "chat_id", postgresql_concurrently=True),
Index("idx_read_status_on_user_id", "user_id", postgresql_concurrently=True),
)

0 comments on commit d71b706

Please sign in to comment.