Skip to content
46 changes: 42 additions & 4 deletions real_intent/deliver/neoworlder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Deliverer for NeoWorlder AI lead nurturing platform."""
import csv
import io
import json
import requests
Expand Down Expand Up @@ -67,7 +68,10 @@ class NeoworlderDeliverer(BaseOutputDeliverer):

# URL constants for reference - no default to force explicit choice
STAGING_BASE_URL = "https://public-api.staging.neoworlder.com"
# PRODUCTION_BASE_URL = "https://public-api.neoworlder.com" # Update when available
PRODUCTION_BASE_URL = "https://public-api.neoworlder.com"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep this out of the SDK, they should be supplied in keys.yaml from real-intent-deliveries.


# Valid campaign types for NeoWorlder persona routing
VALID_CAMPAIGN_TYPES = ("seller", "buyer")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a StrEnum for this, and reference that datatype below in the initializer. Serves both purposes of data validation and documenting available values.


def __init__(
self,
Expand All @@ -78,26 +82,40 @@ def __init__(
customer_phone: str = "",
company_name: str = "",
address: str = "",
campaign_type: str = "seller",
is_recovery: bool = False,
sms_optin: bool = False,
):
"""
Initialize the NeoWorlder deliverer.

Args:
api_key: NeoWorlder API key (neo-api-access-key).
base_url: NeoWorlder API base URL (use STAGING_BASE_URL or production URL).
base_url: NeoWorlder API base URL (use STAGING_BASE_URL or PRODUCTION_BASE_URL).
customer_name: Customer's full name (required).
customer_email: Customer's email address (required, also used as client identifier).
customer_phone: Customer's phone number (optional).
company_name: Company name (optional).
address: Customer address (optional).
campaign_type: Campaign type for NeoWorlder persona routing ("seller" or "buyer").
is_recovery: Whether leads are for the lead recovery campaign.
sms_optin: Whether SMS opt-in has been obtained for these leads.
"""
if campaign_type not in self.VALID_CAMPAIGN_TYPES:
raise ValueError(
f"Invalid campaign_type '{campaign_type}'. Must be one of: {self.VALID_CAMPAIGN_TYPES}"
)

self.api_key = api_key
self.base_url = base_url.rstrip("/")
self.customer_name = customer_name
self.customer_email = customer_email
self.customer_phone = customer_phone
self.company_name = company_name
self.address = address
self.campaign_type = campaign_type
self.is_recovery = is_recovery
self.sms_optin = sms_optin
# Use customer email as the unique client identifier
self.real_intent_client_id = customer_email

Expand Down Expand Up @@ -195,8 +213,9 @@ def _convert_leads_to_csv(self, pii_md5s: list[MD5WithPII]) -> io.BytesIO:
"""
Convert a list of MD5WithPII leads to a CSV file in memory.

Uses the standard CSVStringFormatter for consistent output format
with all emails, phones, and detailed PII fields.
Uses the standard CSVStringFormatter for consistent output format,
then appends NeoWorlder campaign columns (BUYER, RECOVERY, SMS_OPTIN)
based on the deliverer's campaign configuration.

Args:
pii_md5s: List of leads with PII data.
Expand All @@ -205,6 +224,25 @@ def _convert_leads_to_csv(self, pii_md5s: list[MD5WithPII]) -> io.BytesIO:
BytesIO: In-memory CSV file ready for upload.
"""
csv_string = CSVStringFormatter().deliver(pii_md5s)

if csv_string:
reader = csv.reader(io.StringIO(csv_string))
rows = list(reader)

# Append campaign columns to header and data rows
buyer_val = "BUYER" if self.campaign_type == "buyer" else ""
recovery_val = "YES" if self.is_recovery else ""
sms_val = "YES" if self.sms_optin else ""

rows[0].extend(["BUYER", "RECOVERY", "SMS_OPTIN"])
for row in rows[1:]:
row.extend([buyer_val, recovery_val, sms_val])

output = io.StringIO()
writer = csv.writer(output)
writer.writerows(rows)
csv_string = output.getvalue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a situation like this I think it makes more sense to use the pattern:

if not csv_string:
    # handle edge case

# main logic unindented


bytes_output = io.BytesIO(csv_string.encode("utf-8"))
bytes_output.seek(0)

Expand Down
152 changes: 152 additions & 0 deletions tests/test_neoworlder.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,44 @@ def test_deliverer_initialization():
assert deliverer.base_url == TEST_BASE_URL
assert deliverer.customer_name == TEST_CUSTOMER_NAME
assert deliverer.customer_email == TEST_CUSTOMER_EMAIL
# Default campaign params
assert deliverer.campaign_type == "seller"
assert deliverer.is_recovery is False
assert deliverer.sms_optin is False


def test_deliverer_initialization_with_campaign_params():
"""Test that campaign parameters are stored correctly."""
deliverer = NeoworlderDeliverer(
api_key=TEST_API_KEY,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
campaign_type="buyer",
is_recovery=True,
sms_optin=True,
)

assert deliverer.campaign_type == "buyer"
assert deliverer.is_recovery is True
assert deliverer.sms_optin is True


def test_deliverer_invalid_campaign_type():
"""Test that invalid campaign_type raises ValueError."""
with pytest.raises(ValueError, match="Invalid campaign_type"):
NeoworlderDeliverer(
api_key=TEST_API_KEY,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
campaign_type="invalid",
)


def test_deliverer_production_url():
"""Test that the production URL constant is available."""
assert NeoworlderDeliverer.PRODUCTION_BASE_URL == "https://public-api.neoworlder.com"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In accordance with prior comment about removing URLs from SDK, would remove this test entirely.



def test_deliverer_strips_trailing_slash():
Expand Down Expand Up @@ -199,6 +237,120 @@ def test_convert_empty_leads_to_csv(neoworlder_deliverer):
assert csv_content == ""


# ---- Campaign Column Tests ----

def test_csv_has_campaign_columns_seller_default(neoworlder_deliverer, sample_pii_md5s):
"""Test that CSV includes BUYER/RECOVERY/SMS_OPTIN columns with seller defaults."""
csv_file = neoworlder_deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

import pandas as pd
from io import StringIO
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would put these at the root of the test module, not within the function, to fail-fast on test collection vs. test execution. Imports within functions are useful for conditional imports, but io is stdlib and pandas is required by the package at large (I'm pretty sure).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would apply to all instances of imports within functions.

df = pd.read_csv(StringIO(csv_content))

assert "BUYER" in df.columns
assert "RECOVERY" in df.columns
assert "SMS_OPTIN" in df.columns

# Default seller campaign: BUYER column should be blank
assert all(v == "" for v in df["BUYER"].fillna(""))
assert all(v == "" for v in df["RECOVERY"].fillna(""))
assert all(v == "" for v in df["SMS_OPTIN"].fillna(""))


def test_csv_buyer_campaign(sample_pii_md5s, neoworlder_api_key):
"""Test that buyer campaign_type sets BUYER column to 'BUYER'."""
deliverer = NeoworlderDeliverer(
api_key=neoworlder_api_key,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
campaign_type="buyer",
)

csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

import pandas as pd
from io import StringIO
df = pd.read_csv(StringIO(csv_content))

assert all(df["BUYER"] == "BUYER")
assert all(v == "" for v in df["RECOVERY"].fillna(""))
assert all(v == "" for v in df["SMS_OPTIN"].fillna(""))


def test_csv_recovery_campaign(sample_pii_md5s, neoworlder_api_key):
"""Test that is_recovery=True sets RECOVERY column to 'YES'."""
deliverer = NeoworlderDeliverer(
api_key=neoworlder_api_key,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
is_recovery=True,
)

csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

import pandas as pd
from io import StringIO
df = pd.read_csv(StringIO(csv_content))

assert all(v == "" for v in df["BUYER"].fillna(""))
assert all(df["RECOVERY"] == "YES")
assert all(v == "" for v in df["SMS_OPTIN"].fillna(""))


def test_csv_sms_optin(sample_pii_md5s, neoworlder_api_key):
"""Test that sms_optin=True sets SMS_OPTIN column to 'YES'."""
deliverer = NeoworlderDeliverer(
api_key=neoworlder_api_key,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
sms_optin=True,
)

csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

import pandas as pd
from io import StringIO
df = pd.read_csv(StringIO(csv_content))

assert all(v == "" for v in df["BUYER"].fillna(""))
assert all(v == "" for v in df["RECOVERY"].fillna(""))
assert all(df["SMS_OPTIN"] == "YES")


def test_csv_all_campaign_params(sample_pii_md5s, neoworlder_api_key):
"""Test all campaign params set simultaneously."""
deliverer = NeoworlderDeliverer(
api_key=neoworlder_api_key,
base_url=TEST_BASE_URL,
customer_name=TEST_CUSTOMER_NAME,
customer_email=TEST_CUSTOMER_EMAIL,
campaign_type="buyer",
is_recovery=True,
sms_optin=True,
)

csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s)
csv_content = csv_file.read().decode("utf-8")

import pandas as pd
from io import StringIO
df = pd.read_csv(StringIO(csv_content))

assert all(df["BUYER"] == "BUYER")
assert all(df["RECOVERY"] == "YES")
assert all(df["SMS_OPTIN"] == "YES")

# Verify campaign columns are the last 3 columns
assert list(df.columns[-3:]) == ["BUYER", "RECOVERY", "SMS_OPTIN"]


# ---- Delivery Tests ----

def test_deliver_empty_leads(neoworlder_deliverer):
Expand Down
Loading