From 520f49e4c864af0d3b721eda3242344b9ec85856 Mon Sep 17 00:00:00 2001 From: tbains Date: Mon, 2 Mar 2026 14:13:20 -0800 Subject: [PATCH 1/6] feat: NeoWorlder integration to production --- real_intent/deliver/neoworlder/__init__.py | 39 +++++- tests/test_neoworlder.py | 152 +++++++++++++++++++++ 2 files changed, 187 insertions(+), 4 deletions(-) diff --git a/real_intent/deliver/neoworlder/__init__.py b/real_intent/deliver/neoworlder/__init__.py index f26a5f0..821bfd0 100644 --- a/real_intent/deliver/neoworlder/__init__.py +++ b/real_intent/deliver/neoworlder/__init__.py @@ -1,6 +1,7 @@ """Deliverer for NeoWorlder AI lead nurturing platform.""" import io import json +import pandas as pd import requests from typing import Any @@ -67,7 +68,10 @@ class NeoworlderDeliverer(BaseOutputDeliverer): # URL constants for reference - no default to force explicit choice STAGING_BASE_URL = "https://public-api.staging.neoworlder.com" - # PRODUCTION_BASE_URL = "https://public-api.neoworlder.com" # Update when available + PRODUCTION_BASE_URL = "https://public-api.neoworlder.com" + + # Valid campaign types for NeoWorlder persona routing + VALID_CAMPAIGN_TYPES = ("seller", "buyer") def __init__( self, @@ -78,19 +82,30 @@ def __init__( customer_phone: str = "", company_name: str = "", address: str = "", + campaign_type: str = "seller", + is_recovery: bool = False, + sms_optin: bool = False, ): """ Initialize the NeoWorlder deliverer. Args: api_key: NeoWorlder API key (neo-api-access-key). - base_url: NeoWorlder API base URL (use STAGING_BASE_URL or production URL). + base_url: NeoWorlder API base URL (use STAGING_BASE_URL or PRODUCTION_BASE_URL). customer_name: Customer's full name (required). customer_email: Customer's email address (required, also used as client identifier). customer_phone: Customer's phone number (optional). company_name: Company name (optional). address: Customer address (optional). + campaign_type: Campaign type for NeoWorlder persona routing ("seller" or "buyer"). + is_recovery: Whether leads are for the lead recovery campaign. + sms_optin: Whether SMS opt-in has been obtained for these leads. """ + if campaign_type not in self.VALID_CAMPAIGN_TYPES: + raise ValueError( + f"Invalid campaign_type '{campaign_type}'. Must be one of: {self.VALID_CAMPAIGN_TYPES}" + ) + self.api_key = api_key self.base_url = base_url.rstrip("/") self.customer_name = customer_name @@ -98,6 +113,9 @@ def __init__( self.customer_phone = customer_phone self.company_name = company_name self.address = address + self.campaign_type = campaign_type + self.is_recovery = is_recovery + self.sms_optin = sms_optin # Use customer email as the unique client identifier self.real_intent_client_id = customer_email @@ -195,8 +213,9 @@ def _convert_leads_to_csv(self, pii_md5s: list[MD5WithPII]) -> io.BytesIO: """ Convert a list of MD5WithPII leads to a CSV file in memory. - Uses the standard CSVStringFormatter for consistent output format - with all emails, phones, and detailed PII fields. + Uses the standard CSVStringFormatter for consistent output format, + then appends NeoWorlder campaign columns (BUYER, RECOVERY, SMS_OPTIN) + based on the deliverer's campaign configuration. Args: pii_md5s: List of leads with PII data. @@ -205,6 +224,18 @@ def _convert_leads_to_csv(self, pii_md5s: list[MD5WithPII]) -> io.BytesIO: BytesIO: In-memory CSV file ready for upload. """ csv_string = CSVStringFormatter().deliver(pii_md5s) + + if csv_string: + df = pd.read_csv(io.StringIO(csv_string)) + + df["BUYER"] = "BUYER" if self.campaign_type == "buyer" else "" + df["RECOVERY"] = "YES" if self.is_recovery else "" + df["SMS_OPTIN"] = "YES" if self.sms_optin else "" + + output = io.StringIO() + df.to_csv(output, index=False) + csv_string = output.getvalue() + bytes_output = io.BytesIO(csv_string.encode("utf-8")) bytes_output.seek(0) diff --git a/tests/test_neoworlder.py b/tests/test_neoworlder.py index abc472f..cd37ee6 100644 --- a/tests/test_neoworlder.py +++ b/tests/test_neoworlder.py @@ -54,6 +54,44 @@ def test_deliverer_initialization(): assert deliverer.base_url == TEST_BASE_URL assert deliverer.customer_name == TEST_CUSTOMER_NAME assert deliverer.customer_email == TEST_CUSTOMER_EMAIL + # Default campaign params + assert deliverer.campaign_type == "seller" + assert deliverer.is_recovery is False + assert deliverer.sms_optin is False + + +def test_deliverer_initialization_with_campaign_params(): + """Test that campaign parameters are stored correctly.""" + deliverer = NeoworlderDeliverer( + api_key=TEST_API_KEY, + base_url=TEST_BASE_URL, + customer_name=TEST_CUSTOMER_NAME, + customer_email=TEST_CUSTOMER_EMAIL, + campaign_type="buyer", + is_recovery=True, + sms_optin=True, + ) + + assert deliverer.campaign_type == "buyer" + assert deliverer.is_recovery is True + assert deliverer.sms_optin is True + + +def test_deliverer_invalid_campaign_type(): + """Test that invalid campaign_type raises ValueError.""" + with pytest.raises(ValueError, match="Invalid campaign_type"): + NeoworlderDeliverer( + api_key=TEST_API_KEY, + base_url=TEST_BASE_URL, + customer_name=TEST_CUSTOMER_NAME, + customer_email=TEST_CUSTOMER_EMAIL, + campaign_type="invalid", + ) + + +def test_deliverer_production_url(): + """Test that the production URL constant is available.""" + assert NeoworlderDeliverer.PRODUCTION_BASE_URL == "https://public-api.neoworlder.com" def test_deliverer_strips_trailing_slash(): @@ -199,6 +237,120 @@ def test_convert_empty_leads_to_csv(neoworlder_deliverer): assert csv_content == "" +# ---- Campaign Column Tests ---- + +def test_csv_has_campaign_columns_seller_default(neoworlder_deliverer, sample_pii_md5s): + """Test that CSV includes BUYER/RECOVERY/SMS_OPTIN columns with seller defaults.""" + csv_file = neoworlder_deliverer._convert_leads_to_csv(sample_pii_md5s) + csv_content = csv_file.read().decode("utf-8") + + import pandas as pd + from io import StringIO + df = pd.read_csv(StringIO(csv_content)) + + assert "BUYER" in df.columns + assert "RECOVERY" in df.columns + assert "SMS_OPTIN" in df.columns + + # Default seller campaign: BUYER column should be blank + assert all(v == "" for v in df["BUYER"].fillna("")) + assert all(v == "" for v in df["RECOVERY"].fillna("")) + assert all(v == "" for v in df["SMS_OPTIN"].fillna("")) + + +def test_csv_buyer_campaign(sample_pii_md5s, neoworlder_api_key): + """Test that buyer campaign_type sets BUYER column to 'BUYER'.""" + deliverer = NeoworlderDeliverer( + api_key=neoworlder_api_key, + base_url=TEST_BASE_URL, + customer_name=TEST_CUSTOMER_NAME, + customer_email=TEST_CUSTOMER_EMAIL, + campaign_type="buyer", + ) + + csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s) + csv_content = csv_file.read().decode("utf-8") + + import pandas as pd + from io import StringIO + df = pd.read_csv(StringIO(csv_content)) + + assert all(df["BUYER"] == "BUYER") + assert all(v == "" for v in df["RECOVERY"].fillna("")) + assert all(v == "" for v in df["SMS_OPTIN"].fillna("")) + + +def test_csv_recovery_campaign(sample_pii_md5s, neoworlder_api_key): + """Test that is_recovery=True sets RECOVERY column to 'YES'.""" + deliverer = NeoworlderDeliverer( + api_key=neoworlder_api_key, + base_url=TEST_BASE_URL, + customer_name=TEST_CUSTOMER_NAME, + customer_email=TEST_CUSTOMER_EMAIL, + is_recovery=True, + ) + + csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s) + csv_content = csv_file.read().decode("utf-8") + + import pandas as pd + from io import StringIO + df = pd.read_csv(StringIO(csv_content)) + + assert all(v == "" for v in df["BUYER"].fillna("")) + assert all(df["RECOVERY"] == "YES") + assert all(v == "" for v in df["SMS_OPTIN"].fillna("")) + + +def test_csv_sms_optin(sample_pii_md5s, neoworlder_api_key): + """Test that sms_optin=True sets SMS_OPTIN column to 'YES'.""" + deliverer = NeoworlderDeliverer( + api_key=neoworlder_api_key, + base_url=TEST_BASE_URL, + customer_name=TEST_CUSTOMER_NAME, + customer_email=TEST_CUSTOMER_EMAIL, + sms_optin=True, + ) + + csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s) + csv_content = csv_file.read().decode("utf-8") + + import pandas as pd + from io import StringIO + df = pd.read_csv(StringIO(csv_content)) + + assert all(v == "" for v in df["BUYER"].fillna("")) + assert all(v == "" for v in df["RECOVERY"].fillna("")) + assert all(df["SMS_OPTIN"] == "YES") + + +def test_csv_all_campaign_params(sample_pii_md5s, neoworlder_api_key): + """Test all campaign params set simultaneously.""" + deliverer = NeoworlderDeliverer( + api_key=neoworlder_api_key, + base_url=TEST_BASE_URL, + customer_name=TEST_CUSTOMER_NAME, + customer_email=TEST_CUSTOMER_EMAIL, + campaign_type="buyer", + is_recovery=True, + sms_optin=True, + ) + + csv_file = deliverer._convert_leads_to_csv(sample_pii_md5s) + csv_content = csv_file.read().decode("utf-8") + + import pandas as pd + from io import StringIO + df = pd.read_csv(StringIO(csv_content)) + + assert all(df["BUYER"] == "BUYER") + assert all(df["RECOVERY"] == "YES") + assert all(df["SMS_OPTIN"] == "YES") + + # Verify campaign columns are the last 3 columns + assert list(df.columns[-3:]) == ["BUYER", "RECOVERY", "SMS_OPTIN"] + + # ---- Delivery Tests ---- def test_deliver_empty_leads(neoworlder_deliverer): From 94b2491651a3afb4db3e300ddffa9801ceea78ae Mon Sep 17 00:00:00 2001 From: tbains Date: Mon, 2 Mar 2026 14:24:18 -0800 Subject: [PATCH 2/6] fix: replace pd.read_csv with csv module to prevent PII dtype coercion --- real_intent/deliver/neoworlder/__init__.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/real_intent/deliver/neoworlder/__init__.py b/real_intent/deliver/neoworlder/__init__.py index 821bfd0..dddc710 100644 --- a/real_intent/deliver/neoworlder/__init__.py +++ b/real_intent/deliver/neoworlder/__init__.py @@ -1,7 +1,7 @@ """Deliverer for NeoWorlder AI lead nurturing platform.""" +import csv import io import json -import pandas as pd import requests from typing import Any @@ -226,14 +226,21 @@ def _convert_leads_to_csv(self, pii_md5s: list[MD5WithPII]) -> io.BytesIO: csv_string = CSVStringFormatter().deliver(pii_md5s) if csv_string: - df = pd.read_csv(io.StringIO(csv_string)) + reader = csv.reader(io.StringIO(csv_string)) + rows = list(reader) - df["BUYER"] = "BUYER" if self.campaign_type == "buyer" else "" - df["RECOVERY"] = "YES" if self.is_recovery else "" - df["SMS_OPTIN"] = "YES" if self.sms_optin else "" + # Append campaign columns to header and data rows + buyer_val = "BUYER" if self.campaign_type == "buyer" else "" + recovery_val = "YES" if self.is_recovery else "" + sms_val = "YES" if self.sms_optin else "" + + rows[0].extend(["BUYER", "RECOVERY", "SMS_OPTIN"]) + for row in rows[1:]: + row.extend([buyer_val, recovery_val, sms_val]) output = io.StringIO() - df.to_csv(output, index=False) + writer = csv.writer(output) + writer.writerows(rows) csv_string = output.getvalue() bytes_output = io.BytesIO(csv_string.encode("utf-8")) From 51c45427212dcda1b035c65eaf1ce651bc7fee27 Mon Sep 17 00:00:00 2001 From: tbains Date: Tue, 3 Mar 2026 14:52:20 -0800 Subject: [PATCH 3/6] fix: addressed feedback --- real_intent/deliver/__init__.py | 1 + real_intent/deliver/neoworlder/__init__.py | 62 +++++++++++----------- tests/test_neoworlder.py | 20 +++---- 3 files changed, 39 insertions(+), 44 deletions(-) diff --git a/real_intent/deliver/__init__.py b/real_intent/deliver/__init__.py index ca9b612..1ed2ced 100644 --- a/real_intent/deliver/__init__.py +++ b/real_intent/deliver/__init__.py @@ -1,6 +1,7 @@ """Take a list of MD5WithPII (leads) and format/deliver them. Do something with them, essentially.""" from real_intent.deliver.csv import CSVStringFormatter from real_intent.deliver.neoworlder import ( + CampaignType, NeoworlderDeliverer, NeoworlderAPIError, NeoworlderAuthError, diff --git a/real_intent/deliver/neoworlder/__init__.py b/real_intent/deliver/neoworlder/__init__.py index dddc710..cf0a025 100644 --- a/real_intent/deliver/neoworlder/__init__.py +++ b/real_intent/deliver/neoworlder/__init__.py @@ -3,6 +3,7 @@ import io import json import requests +from enum import StrEnum from typing import Any from real_intent.deliver.base import BaseOutputDeliverer @@ -17,6 +18,12 @@ TIMEOUT_SECONDS = 30 +class CampaignType(StrEnum): + """NeoWorlder campaign types for persona routing.""" + SELLER = "seller" + BUYER = "buyer" + + # ---- Exceptions ---- class NeoworlderAPIError(Exception): @@ -55,7 +62,7 @@ class NeoworlderDeliverer(BaseOutputDeliverer): Example: deliverer = NeoworlderDeliverer( api_key="...", - base_url=NeoworlderDeliverer.STAGING_BASE_URL, + base_url="https://public-api.neoworlder.com", customer_name="John Doe", customer_email="john@example.com", ) @@ -66,13 +73,6 @@ class NeoworlderDeliverer(BaseOutputDeliverer): accidentally sending data to the wrong environment (staging vs production). """ - # URL constants for reference - no default to force explicit choice - STAGING_BASE_URL = "https://public-api.staging.neoworlder.com" - PRODUCTION_BASE_URL = "https://public-api.neoworlder.com" - - # Valid campaign types for NeoWorlder persona routing - VALID_CAMPAIGN_TYPES = ("seller", "buyer") - def __init__( self, api_key: str, @@ -82,7 +82,7 @@ def __init__( customer_phone: str = "", company_name: str = "", address: str = "", - campaign_type: str = "seller", + campaign_type: CampaignType = CampaignType.SELLER, is_recovery: bool = False, sms_optin: bool = False, ): @@ -91,21 +91,16 @@ def __init__( Args: api_key: NeoWorlder API key (neo-api-access-key). - base_url: NeoWorlder API base URL (use STAGING_BASE_URL or PRODUCTION_BASE_URL). + base_url: NeoWorlder API base URL (supplied via configuration, not hardcoded). customer_name: Customer's full name (required). customer_email: Customer's email address (required, also used as client identifier). customer_phone: Customer's phone number (optional). company_name: Company name (optional). address: Customer address (optional). - campaign_type: Campaign type for NeoWorlder persona routing ("seller" or "buyer"). + campaign_type: Campaign type for NeoWorlder persona routing. is_recovery: Whether leads are for the lead recovery campaign. sms_optin: Whether SMS opt-in has been obtained for these leads. """ - if campaign_type not in self.VALID_CAMPAIGN_TYPES: - raise ValueError( - f"Invalid campaign_type '{campaign_type}'. Must be one of: {self.VALID_CAMPAIGN_TYPES}" - ) - self.api_key = api_key self.base_url = base_url.rstrip("/") self.customer_name = customer_name @@ -113,10 +108,9 @@ def __init__( self.customer_phone = customer_phone self.company_name = company_name self.address = address - self.campaign_type = campaign_type + self.campaign_type = CampaignType(campaign_type) self.is_recovery = is_recovery self.sms_optin = sms_optin - # Use customer email as the unique client identifier self.real_intent_client_id = customer_email @property @@ -225,23 +219,27 @@ def _convert_leads_to_csv(self, pii_md5s: list[MD5WithPII]) -> io.BytesIO: """ csv_string = CSVStringFormatter().deliver(pii_md5s) - if csv_string: - reader = csv.reader(io.StringIO(csv_string)) - rows = list(reader) + if not csv_string: + bytes_output = io.BytesIO(b"") + bytes_output.seek(0) + log("debug", "No CSV content generated (empty lead list)") + return bytes_output + + reader = csv.reader(io.StringIO(csv_string)) + rows = list(reader) - # Append campaign columns to header and data rows - buyer_val = "BUYER" if self.campaign_type == "buyer" else "" - recovery_val = "YES" if self.is_recovery else "" - sms_val = "YES" if self.sms_optin else "" + buyer_val = "BUYER" if self.campaign_type == CampaignType.BUYER else "" + recovery_val = "YES" if self.is_recovery else "" + sms_val = "YES" if self.sms_optin else "" - rows[0].extend(["BUYER", "RECOVERY", "SMS_OPTIN"]) - for row in rows[1:]: - row.extend([buyer_val, recovery_val, sms_val]) + rows[0].extend(["BUYER", "RECOVERY", "SMS_OPTIN"]) + for row in rows[1:]: + row.extend([buyer_val, recovery_val, sms_val]) - output = io.StringIO() - writer = csv.writer(output) - writer.writerows(rows) - csv_string = output.getvalue() + output = io.StringIO() + writer = csv.writer(output) + writer.writerows(rows) + csv_string = output.getvalue() bytes_output = io.BytesIO(csv_string.encode("utf-8")) bytes_output.seek(0) diff --git a/tests/test_neoworlder.py b/tests/test_neoworlder.py index cd37ee6..ce1e065 100644 --- a/tests/test_neoworlder.py +++ b/tests/test_neoworlder.py @@ -3,8 +3,11 @@ import os from dotenv import load_dotenv from unittest.mock import patch, MagicMock +import pandas as pd +from io import StringIO from real_intent.deliver.neoworlder import ( + CampaignType, NeoworlderDeliverer, NeoworlderAPIError, NeoworlderAuthError, @@ -17,7 +20,7 @@ # Test constants TEST_API_KEY = "nk_test_dummy_key_for_unit_tests" -TEST_BASE_URL = NeoworlderDeliverer.STAGING_BASE_URL +TEST_BASE_URL = "https://public-api.staging.neoworlder.com" TEST_CUSTOMER_NAME = "Test Customer" TEST_CUSTOMER_EMAIL = "test@example.com" @@ -55,7 +58,7 @@ def test_deliverer_initialization(): assert deliverer.customer_name == TEST_CUSTOMER_NAME assert deliverer.customer_email == TEST_CUSTOMER_EMAIL # Default campaign params - assert deliverer.campaign_type == "seller" + assert deliverer.campaign_type == CampaignType.SELLER assert deliverer.is_recovery is False assert deliverer.sms_optin is False @@ -72,14 +75,14 @@ def test_deliverer_initialization_with_campaign_params(): sms_optin=True, ) - assert deliverer.campaign_type == "buyer" + assert deliverer.campaign_type == CampaignType.BUYER assert deliverer.is_recovery is True assert deliverer.sms_optin is True def test_deliverer_invalid_campaign_type(): """Test that invalid campaign_type raises ValueError.""" - with pytest.raises(ValueError, match="Invalid campaign_type"): + with pytest.raises(ValueError): NeoworlderDeliverer( api_key=TEST_API_KEY, base_url=TEST_BASE_URL, @@ -89,11 +92,6 @@ def test_deliverer_invalid_campaign_type(): ) -def test_deliverer_production_url(): - """Test that the production URL constant is available.""" - assert NeoworlderDeliverer.PRODUCTION_BASE_URL == "https://public-api.neoworlder.com" - - def test_deliverer_strips_trailing_slash(): """Test that trailing slashes are stripped from base_url.""" deliverer = NeoworlderDeliverer( @@ -244,8 +242,6 @@ def test_csv_has_campaign_columns_seller_default(neoworlder_deliverer, sample_pi csv_file = neoworlder_deliverer._convert_leads_to_csv(sample_pii_md5s) csv_content = csv_file.read().decode("utf-8") - import pandas as pd - from io import StringIO df = pd.read_csv(StringIO(csv_content)) assert "BUYER" in df.columns @@ -626,7 +622,7 @@ def test_integration_deliver(sample_pii_md5s): # Note: customer_email is used as the client identifier deliverer = NeoworlderDeliverer( api_key=api_key, - base_url=NeoworlderDeliverer.STAGING_BASE_URL, + base_url=TEST_BASE_URL, customer_name="Integration Test Customer", customer_email="integration-test@realintent.co", customer_phone="555-123-4567", From 6f2ad1b2606bce6dc441c731cbc53786f7ca698b Mon Sep 17 00:00:00 2001 From: Prerit Das Date: Wed, 4 Mar 2026 18:27:37 +0000 Subject: [PATCH 4/6] Separate imports. --- real_intent/deliver/neoworlder/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/real_intent/deliver/neoworlder/__init__.py b/real_intent/deliver/neoworlder/__init__.py index cf0a025..052e7a1 100644 --- a/real_intent/deliver/neoworlder/__init__.py +++ b/real_intent/deliver/neoworlder/__init__.py @@ -1,8 +1,9 @@ """Deliverer for NeoWorlder AI lead nurturing platform.""" +import requests + import csv import io import json -import requests from enum import StrEnum from typing import Any From e52ac02cc8a427afacaba476064617401b45ecdb Mon Sep 17 00:00:00 2001 From: Prerit Das Date: Wed, 4 Mar 2026 18:28:24 +0000 Subject: [PATCH 5/6] Remove base url. --- real_intent/deliver/neoworlder/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/real_intent/deliver/neoworlder/__init__.py b/real_intent/deliver/neoworlder/__init__.py index 052e7a1..0b9c247 100644 --- a/real_intent/deliver/neoworlder/__init__.py +++ b/real_intent/deliver/neoworlder/__init__.py @@ -63,7 +63,7 @@ class NeoworlderDeliverer(BaseOutputDeliverer): Example: deliverer = NeoworlderDeliverer( api_key="...", - base_url="https://public-api.neoworlder.com", + base_url="https://apiurl.com", customer_name="John Doe", customer_email="john@example.com", ) From d90efbd264142d6f135a88f2e167df64ba1b5fc5 Mon Sep 17 00:00:00 2001 From: Prerit Das Date: Wed, 4 Mar 2026 18:29:02 +0000 Subject: [PATCH 6/6] Code spacing. --- real_intent/deliver/neoworlder/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/real_intent/deliver/neoworlder/__init__.py b/real_intent/deliver/neoworlder/__init__.py index 0b9c247..f63c503 100644 --- a/real_intent/deliver/neoworlder/__init__.py +++ b/real_intent/deliver/neoworlder/__init__.py @@ -155,11 +155,9 @@ def _register_client(self) -> dict[str, Any]: } log("debug", f"Registering/updating NeoWorlder client: {self.real_intent_client_id}") - response = requests.post(url, json=payload, headers=headers, timeout=TIMEOUT_SECONDS) log("trace", f"Raw response: {response.text}, status_code: {response.status_code}") - self._handle_response_errors(response, "register_client") log("info", f"Successfully registered/updated NeoWorlder client: {self.real_intent_client_id}") @@ -303,7 +301,6 @@ def _deliver(self, pii_md5s: list[MD5WithPII]) -> dict[str, Any]: ) log("trace", f"Raw response: {response.text}, status_code: {response.status_code}") - self._handle_response_errors(response, "execute_inbound_flow") log("info", f"Successfully delivered {len(filtered_leads)} leads to NeoWorlder")