Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions modules/channels/widget/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Widget channel domain events."""

from dataclasses import dataclass, field

from modules.core.events import BaseEvent


@dataclass
class WidgetSessionCreated(BaseEvent):
"""Emitted on first user message in a widget chat session.

CRM domain subscribes to auto-create a lead in amoCRM.
"""

session_id: str = ""
first_message: str = ""
visitor_metadata: dict = field(default_factory=dict)


@dataclass
class WidgetMessageSent(BaseEvent):
"""Emitted after each conversation turn in a widget chat session.

CRM domain subscribes to append notes to an amoCRM lead.
"""

session_id: str = ""
lead_id: int = 0
user_message: str = ""
assistant_response: str = ""


@dataclass
class WidgetContactSubmitted(BaseEvent):
"""Emitted when a visitor submits contact info via widget lead form.

CRM domain subscribes to create an amoCRM contact and link to a lead.
"""

session_id: str = ""
contact_name: str = ""
phone: str = ""
email: str = ""
visitor_metadata: dict = field(default_factory=dict)
249 changes: 55 additions & 194 deletions modules/channels/widget/router_public.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Public widget endpoints — no authentication required.

Serves the embeddable widget JS, chat sessions, streaming responses,
and contact form submissions with amoCRM integration.
and contact form submissions. CRM integration (amoCRM lead/contact
creation) is handled reactively via EventBus — see modules/crm/startup.py.
"""

import json
Expand All @@ -14,7 +15,6 @@
from fastapi.responses import Response, StreamingResponse

from cloud_llm_service import CloudLLMService
from db.retry import retry_on_busy
from modules.channels.widget.service import widget_instance_service
from modules.chat.service import chat_service
from modules.core.service import config_service
Expand All @@ -34,103 +34,6 @@ def _escape_js_string(s: str) -> str:
return s.replace("\\", "\\\\").replace("'", "\\'").replace("\n", "\\n").replace("\r", "\\r")


@retry_on_busy()
async def _save_amocrm_lead_id(session_id: str, lead_id: int) -> None:
"""Persist amoCRM lead_id on a chat session (retryable)."""
from sqlalchemy import update as sa_update

from db.database import AsyncSessionLocal
from db.models import ChatSession as ChatSessionModel

async with AsyncSessionLocal() as db_session:
await db_session.execute(
sa_update(ChatSessionModel)
.where(ChatSessionModel.id == session_id)
.values(amocrm_lead_id=lead_id)
)
await db_session.commit()


async def _widget_create_amocrm_lead(session_id: str, session_data: dict, content: str):
"""Fire-and-forget: create amoCRM lead for a widget session."""
try:
from app.services import amocrm_service
from modules.crm.service import amocrm_service as amocrm_db_service

config = await amocrm_db_service.get_config_with_secrets()
if not config or not config.get("access_token") or not config.get("auto_create_lead"):
return

subdomain = config["subdomain"]
access_token = config["access_token"]
pipeline_id = config.get("lead_pipeline_id")
status_id = config.get("lead_status_id")

# Build lead name from visitor metadata
metadata = session_data.get("visitor_metadata") or {}
if isinstance(metadata, str):
metadata = json.loads(metadata)
page_title = metadata.get("page_title", "")
page_url = metadata.get("page_url", "")
lead_name = f"Виджет: {page_title or page_url or session_id[:8]}"

result = await amocrm_service.create_lead(
subdomain, access_token, lead_name, pipeline_id, status_id
)

lead_id = None
if result and "_embedded" in result and "leads" in result["_embedded"]:
lead_id = result["_embedded"]["leads"][0].get("id")

if not lead_id:
logger.warning("amoCRM create_lead returned no lead ID: %s", result)
return

# Save lead_id to session
await _save_amocrm_lead_id(session_id, lead_id)

# Add first message + metadata as note
note_parts = [f"Первое сообщение: {content}"]
if metadata.get("page_url"):
note_parts.append(f"Страница: {metadata['page_url']}")
if metadata.get("ip"):
note_parts.append(f"IP: {metadata['ip']}")
utm_parts = []
for key in ("utm_source", "utm_medium", "utm_campaign"):
if metadata.get(key):
utm_parts.append(f"{key}={metadata[key]}")
if utm_parts:
note_parts.append(f"UTM: {', '.join(utm_parts)}")
if metadata.get("referrer"):
note_parts.append(f"Referrer: {metadata['referrer']}")

await amocrm_service.add_note_to_lead(
subdomain, access_token, lead_id, "\n".join(note_parts)
)
logger.info("Created amoCRM lead %s for widget session %s", lead_id, session_id)

except Exception:
logger.debug("Failed to create amoCRM lead for session %s", session_id, exc_info=True)


async def _widget_add_note_to_lead(session_id: str, lead_id: int, user_text: str, ai_text: str):
"""Fire-and-forget: append conversation turn to amoCRM lead notes."""
try:
from app.services import amocrm_service
from modules.crm.service import amocrm_service as amocrm_db_service

config = await amocrm_db_service.get_config_with_secrets()
if not config or not config.get("access_token"):
return

note = f"Пользователь: {user_text}\nAI: {ai_text}"
await amocrm_service.add_note_to_lead(
config["subdomain"], config["access_token"], lead_id, note
)
except Exception:
logger.debug("Failed to add note to lead %s", lead_id, exc_info=True)


# ============== Endpoints ==============


Expand Down Expand Up @@ -385,11 +288,25 @@ async def widget_stream_message(request: Request, session_id: str):

user_msg = await chat_service.add_message(session_id, "user", content)

# Auto-create amoCRM lead on first user message (fire-and-forget)
# Publish WidgetSessionCreated on first message (fire-and-forget → CRM creates lead)
if not session.get("amocrm_lead_id"):
import asyncio

asyncio.create_task(_widget_create_amocrm_lead(session_id, session, content))
from app.dependencies import get_container
from modules.channels.widget.events import WidgetSessionCreated

metadata = session.get("visitor_metadata") or {}
if isinstance(metadata, str):
metadata = json.loads(metadata)
asyncio.create_task(
get_container().event_bus.publish(
WidgetSessionCreated(
session_id=session_id,
first_message=content,
visitor_metadata=metadata,
)
)
)

default_prompt = custom_prompt
if not default_prompt and hasattr(active_llm, "get_system_prompt"):
Expand All @@ -411,12 +328,22 @@ async def generate_stream():
yield f"data: {json.dumps({'type': 'assistant_message', 'message': assistant_msg}, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"

# Append conversation turn to amoCRM lead notes (fire-and-forget)
# Publish WidgetMessageSent (fire-and-forget → CRM appends note)
if lead_id and response_text:
import asyncio

from app.dependencies import get_container
from modules.channels.widget.events import WidgetMessageSent

asyncio.create_task(
_widget_add_note_to_lead(session_id, lead_id, content, response_text)
get_container().event_bus.publish(
WidgetMessageSent(
session_id=session_id,
lead_id=lead_id,
user_message=content,
assistant_response=response_text,
)
)
)
except Exception as e:
logger.error(f"❌ Widget chat stream error: {e}")
Expand All @@ -431,7 +358,11 @@ async def generate_stream():

@router.post("/widget/chat/session/{session_id}/contacts")
async def widget_submit_contacts(request: Request, session_id: str):
"""Public: submit contact info from widget lead form → create/update amoCRM contact."""
"""Public: submit contact info from widget lead form.

Publishes WidgetContactSubmitted event; CRM domain handles amoCRM
contact/lead creation reactively.
"""
body = await request.json()
name = body.get("name", "").strip()
phone = body.get("phone", "").strip()
Expand All @@ -446,98 +377,28 @@ async def widget_submit_contacts(request: Request, session_id: str):
if not session or session.get("source") != "widget":
raise HTTPException(status_code=404, detail="Session not found")

try:
from app.services import amocrm_service
from modules.crm.service import amocrm_service as amocrm_db_service

config = await amocrm_db_service.get_config_with_secrets()
if not config or not config.get("access_token"):
return {"ok": True, "crm": False, "reason": "amoCRM not connected"}

subdomain = config["subdomain"]
access_token = config["access_token"]

# Build custom_fields for phone/email
custom_fields = []
if phone:
custom_fields.append({"field_code": "PHONE", "values": [{"value": phone}]})
if email:
custom_fields.append({"field_code": "EMAIL", "values": [{"value": email}]})

# Create contact
result = await amocrm_service.create_contact(subdomain, access_token, name, custom_fields)

contact_id = None
if result and "_embedded" in result and "contacts" in result["_embedded"]:
contact_id = result["_embedded"]["contacts"][0].get("id")

if not contact_id:
logger.warning("amoCRM create_contact returned no contact ID: %s", result)
return {"ok": True, "crm": False, "reason": "Contact creation failed"}

# Save contact_id to session
from sqlalchemy import update as sa_update

from db.database import AsyncSessionLocal
from db.models import ChatSession as ChatSessionModel
from app.dependencies import get_container
from modules.channels.widget.events import WidgetContactSubmitted

lead_id = session.get("amocrm_lead_id")
metadata = session.get("visitor_metadata") or {}
if isinstance(metadata, str):
metadata = json.loads(metadata)

async with AsyncSessionLocal() as db_session:
await db_session.execute(
sa_update(ChatSessionModel)
.where(ChatSessionModel.id == session_id)
.values(amocrm_contact_id=contact_id)
)
await db_session.commit()

# Link contact to existing lead
if lead_id:
await amocrm_service.link_contact_to_lead(subdomain, access_token, lead_id, contact_id)
# Add note about contact info
note_parts = [f"Контакт оставлен: {name}"]
if phone:
note_parts.append(f"Телефон: {phone}")
if email:
note_parts.append(f"Email: {email}")
await amocrm_service.add_note_to_lead(
subdomain, access_token, lead_id, "\n".join(note_parts)
)
else:
# No lead yet — create one with this contact
pipeline_id = config.get("lead_pipeline_id")
status_id = config.get("lead_status_id")
metadata = session.get("visitor_metadata") or {}
if isinstance(metadata, str):
metadata = json.loads(metadata)
page_title = metadata.get("page_title", "")
page_url = metadata.get("page_url", "")
lead_name = f"Виджет: {name} ({page_title or page_url or session_id[:8]})"

lead_result = await amocrm_service.create_lead(
subdomain, access_token, lead_name, pipeline_id, status_id, contact_id
try:
await get_container().event_bus.publish(
WidgetContactSubmitted(
session_id=session_id,
contact_name=name,
phone=phone,
email=email,
visitor_metadata=metadata,
)
new_lead_id = None
if lead_result and "_embedded" in lead_result and "leads" in lead_result["_embedded"]:
new_lead_id = lead_result["_embedded"]["leads"][0].get("id")

if new_lead_id:
async with AsyncSessionLocal() as db_session:
await db_session.execute(
sa_update(ChatSessionModel)
.where(ChatSessionModel.id == session_id)
.values(amocrm_lead_id=new_lead_id)
)
await db_session.commit()

logger.info(
"Widget contact submitted: %s (contact=%s, lead=%s)",
name,
contact_id,
lead_id,
)
return {"ok": True, "crm": True}

except Exception:
logger.error("Failed to create amoCRM contact for session %s", session_id, exc_info=True)
return {"ok": True, "crm": False, "reason": "CRM error"}
logger.error(
"Failed to publish WidgetContactSubmitted for session %s",
session_id,
exc_info=True,
)

return {"ok": True}
2 changes: 2 additions & 0 deletions modules/core/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,11 @@ async def on_session_revoked(event: SessionRevoked) -> None:
event_bus.subscribe(SessionRevoked, on_session_revoked)

# Domain-specific subscriptions
from modules.crm.startup import setup_crm_event_subscriptions
from modules.knowledge.startup import setup_knowledge_event_subscriptions
from modules.llm.startup import setup_llm_event_subscriptions

await setup_crm_event_subscriptions(event_bus)
await setup_knowledge_event_subscriptions(event_bus)
await setup_llm_event_subscriptions(event_bus)

Expand Down
Loading
Loading