Skip to content
1 change: 1 addition & 0 deletions real_intent/deliver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Take a list of MD5WithPII (leads) and format/deliver them. Do something with them, essentially."""
from real_intent.deliver.csv import CSVStringFormatter
from real_intent.deliver.neoworlder import (
CampaignType,
NeoworlderDeliverer,
NeoworlderAPIError,
NeoworlderAuthError,
Expand Down
60 changes: 47 additions & 13 deletions real_intent/deliver/neoworlder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Deliverer for NeoWorlder AI lead nurturing platform."""
import requests

import csv
import io
import json
import requests
from enum import StrEnum
from typing import Any

from real_intent.deliver.base import BaseOutputDeliverer
Expand All @@ -16,6 +19,12 @@
TIMEOUT_SECONDS = 30


class CampaignType(StrEnum):
"""NeoWorlder campaign types for persona routing."""
SELLER = "seller"
BUYER = "buyer"


# ---- Exceptions ----

class NeoworlderAPIError(Exception):
Expand Down Expand Up @@ -54,7 +63,7 @@ class NeoworlderDeliverer(BaseOutputDeliverer):
Example:
deliverer = NeoworlderDeliverer(
api_key="...",
base_url=NeoworlderDeliverer.STAGING_BASE_URL,
base_url="https://apiurl.com",
customer_name="John Doe",
customer_email="john@example.com",
)
Expand All @@ -65,10 +74,6 @@ class NeoworlderDeliverer(BaseOutputDeliverer):
accidentally sending data to the wrong environment (staging vs production).
"""

# URL constants for reference - no default to force explicit choice
STAGING_BASE_URL = "https://public-api.staging.neoworlder.com"
# PRODUCTION_BASE_URL = "https://public-api.neoworlder.com" # Update when available

def __init__(
self,
api_key: str,
Expand All @@ -78,18 +83,24 @@ def __init__(
customer_phone: str = "",
company_name: str = "",
address: str = "",
campaign_type: CampaignType = CampaignType.SELLER,
is_recovery: bool = False,
sms_optin: bool = False,
):
"""
Initialize the NeoWorlder deliverer.

Args:
api_key: NeoWorlder API key (neo-api-access-key).
base_url: NeoWorlder API base URL (use STAGING_BASE_URL or production URL).
base_url: NeoWorlder API base URL (supplied via configuration, not hardcoded).
customer_name: Customer's full name (required).
customer_email: Customer's email address (required, also used as client identifier).
customer_phone: Customer's phone number (optional).
company_name: Company name (optional).
address: Customer address (optional).
campaign_type: Campaign type for NeoWorlder persona routing.
is_recovery: Whether leads are for the lead recovery campaign.
sms_optin: Whether SMS opt-in has been obtained for these leads.
"""
self.api_key = api_key
self.base_url = base_url.rstrip("/")
Expand All @@ -98,7 +109,9 @@ def __init__(
self.customer_phone = customer_phone
self.company_name = company_name
self.address = address
# Use customer email as the unique client identifier
self.campaign_type = CampaignType(campaign_type)
self.is_recovery = is_recovery
self.sms_optin = sms_optin
self.real_intent_client_id = customer_email

@property
Expand Down Expand Up @@ -142,11 +155,9 @@ def _register_client(self) -> dict[str, Any]:
}

log("debug", f"Registering/updating NeoWorlder client: {self.real_intent_client_id}")

response = requests.post(url, json=payload, headers=headers, timeout=TIMEOUT_SECONDS)

log("trace", f"Raw response: {response.text}, status_code: {response.status_code}")

self._handle_response_errors(response, "register_client")

log("info", f"Successfully registered/updated NeoWorlder client: {self.real_intent_client_id}")
Expand Down Expand Up @@ -195,8 +206,9 @@ def _convert_leads_to_csv(self, pii_md5s: list[MD5WithPII]) -> io.BytesIO:
"""
Convert a list of MD5WithPII leads to a CSV file in memory.

Uses the standard CSVStringFormatter for consistent output format
with all emails, phones, and detailed PII fields.
Uses the standard CSVStringFormatter for consistent output format,
then appends NeoWorlder campaign columns (BUYER, RECOVERY, SMS_OPTIN)
based on the deliverer's campaign configuration.

Args:
pii_md5s: List of leads with PII data.
Expand All @@ -205,6 +217,29 @@ def _convert_leads_to_csv(self, pii_md5s: list[MD5WithPII]) -> io.BytesIO:
BytesIO: In-memory CSV file ready for upload.
"""
csv_string = CSVStringFormatter().deliver(pii_md5s)

if not csv_string:
bytes_output = io.BytesIO(b"")
bytes_output.seek(0)
log("debug", "No CSV content generated (empty lead list)")
return bytes_output

reader = csv.reader(io.StringIO(csv_string))
rows = list(reader)

buyer_val = "BUYER" if self.campaign_type == CampaignType.BUYER else ""
recovery_val = "YES" if self.is_recovery else ""
sms_val = "YES" if self.sms_optin else ""

rows[0].extend(["BUYER", "RECOVERY", "SMS_OPTIN"])
for row in rows[1:]:
row.extend([buyer_val, recovery_val, sms_val])

output = io.StringIO()
writer = csv.writer(output)
writer.writerows(rows)
csv_string = output.getvalue()

bytes_output = io.BytesIO(csv_string.encode("utf-8"))
bytes_output.seek(0)

Expand Down Expand Up @@ -266,7 +301,6 @@ def _deliver(self, pii_md5s: list[MD5WithPII]) -> dict[str, Any]:
)

log("trace", f"Raw response: {response.text}, status_code: {response.status_code}")

self._handle_response_errors(response, "execute_inbound_flow")

log("info", f"Successfully delivered {len(filtered_leads)} leads to NeoWorlder")
Expand Down
152 changes: 150 additions & 2 deletions tests/test_neoworlder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import os
from dotenv import load_dotenv
from unittest.mock import patch, MagicMock
import pandas as pd
from io import StringIO

from real_intent.deliver.neoworlder import (
CampaignType,
NeoworlderDeliverer,
NeoworlderAPIError,
NeoworlderAuthError,
Expand All @@ -17,7 +20,7 @@

# Test constants
TEST_API_KEY = "nk_test_dummy_key_for_unit_tests"
TEST_BASE_URL = NeoworlderDeliverer.STAGING_BASE_URL
TEST_BASE_URL = "https://public-api.staging.neoworlder.com"
TEST_CUSTOMER_NAME = "Test Customer"
TEST_CUSTOMER_EMAIL = "test@example.com"

Expand Down Expand Up @@ -54,6 +57,39 @@ def test_deliverer_initialization():
assert deliverer.base_url == TEST_BASE_URL
assert deliverer.customer_name == TEST_CUSTOMER_NAME
assert deliverer.customer_email == TEST_CUSTOMER_EMAIL
# Default campaign params
assert deliverer.campaign_type == CampaignType.SELLER
assert deliverer.is_recovery is False
assert deliverer.sms_optin is False


def test_deliverer_initialization_with_campaign_params():
"""Test that campaign parameters are stored correctly."""
deliverer = NeoworlderDeliverer(
api_key=TEST_API_KEY,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
campaign_type="buyer",
is_recovery=True,
sms_optin=True,
)

assert deliverer.campaign_type == CampaignType.BUYER
assert deliverer.is_recovery is True
assert deliverer.sms_optin is True


def test_deliverer_invalid_campaign_type():
"""Test that invalid campaign_type raises ValueError."""
with pytest.raises(ValueError):
NeoworlderDeliverer(
api_key=TEST_API_KEY,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
campaign_type="invalid",
)


def test_deliverer_strips_trailing_slash():
Expand Down Expand Up @@ -199,6 +235,118 @@ def test_convert_empty_leads_to_csv(neoworlder_deliverer):
assert csv_content == ""


# ---- Campaign Column Tests ----

def test_csv_has_campaign_columns_seller_default(neoworlder_deliverer, sample_pii_md5s):
"""Test that CSV includes BUYER/RECOVERY/SMS_OPTIN columns with seller defaults."""
csv_file = neoworlder_deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

df = pd.read_csv(StringIO(csv_content))

assert "BUYER" in df.columns
assert "RECOVERY" in df.columns
assert "SMS_OPTIN" in df.columns

# Default seller campaign: BUYER column should be blank
assert all(v == "" for v in df["BUYER"].fillna(""))
assert all(v == "" for v in df["RECOVERY"].fillna(""))
assert all(v == "" for v in df["SMS_OPTIN"].fillna(""))


def test_csv_buyer_campaign(sample_pii_md5s, neoworlder_api_key):
"""Test that buyer campaign_type sets BUYER column to 'BUYER'."""
deliverer = NeoworlderDeliverer(
api_key=neoworlder_api_key,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
campaign_type="buyer",
)

csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

import pandas as pd
from io import StringIO
df = pd.read_csv(StringIO(csv_content))

assert all(df["BUYER"] == "BUYER")
assert all(v == "" for v in df["RECOVERY"].fillna(""))
assert all(v == "" for v in df["SMS_OPTIN"].fillna(""))


def test_csv_recovery_campaign(sample_pii_md5s, neoworlder_api_key):
"""Test that is_recovery=True sets RECOVERY column to 'YES'."""
deliverer = NeoworlderDeliverer(
api_key=neoworlder_api_key,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
is_recovery=True,
)

csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

import pandas as pd
from io import StringIO
df = pd.read_csv(StringIO(csv_content))

assert all(v == "" for v in df["BUYER"].fillna(""))
assert all(df["RECOVERY"] == "YES")
assert all(v == "" for v in df["SMS_OPTIN"].fillna(""))


def test_csv_sms_optin(sample_pii_md5s, neoworlder_api_key):
"""Test that sms_optin=True sets SMS_OPTIN column to 'YES'."""
deliverer = NeoworlderDeliverer(
api_key=neoworlder_api_key,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
sms_optin=True,
)

csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

import pandas as pd
from io import StringIO
df = pd.read_csv(StringIO(csv_content))

assert all(v == "" for v in df["BUYER"].fillna(""))
assert all(v == "" for v in df["RECOVERY"].fillna(""))
assert all(df["SMS_OPTIN"] == "YES")


def test_csv_all_campaign_params(sample_pii_md5s, neoworlder_api_key):
"""Test all campaign params set simultaneously."""
deliverer = NeoworlderDeliverer(
api_key=neoworlder_api_key,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
campaign_type="buyer",
is_recovery=True,
sms_optin=True,
)

csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

import pandas as pd
from io import StringIO
df = pd.read_csv(StringIO(csv_content))

assert all(df["BUYER"] == "BUYER")
assert all(df["RECOVERY"] == "YES")
assert all(df["SMS_OPTIN"] == "YES")

# Verify campaign columns are the last 3 columns
assert list(df.columns[-3:]) == ["BUYER", "RECOVERY", "SMS_OPTIN"]


# ---- Delivery Tests ----

def test_deliver_empty_leads(neoworlder_deliverer):
Expand Down Expand Up @@ -474,7 +622,7 @@ def test_integration_deliver(sample_pii_md5s):
# Note: customer_email is used as the client identifier
deliverer = NeoworlderDeliverer(
api_key=api_key,
base_url=NeoworlderDeliverer.STAGING_BASE_URL,
base_url=TEST_BASE_URL,
customer_name="Integration Test Customer",
customer_email="integration-test@realintent.co",
customer_phone="555-123-4567",
Expand Down