Skip to content

Commit 801492e

Browse files
mrehan27claude
andcommitted
chore: wire three-layer geofence transition delivery
GeofenceEventTracker drives the three-layer delivery flow: cooldown deduplication → persist to PendingGeofenceMetricStore → direct HTTP via GeofenceDeliveryTracker, with flushPending() replay on init and on every ProfileIdentifiedEvent. Each delivery attempt picks one path: - No HttpClient → EventBus, drain row - No userId → retain row for the next flush - HTTP attempt → success drains; failure retains for retry GeofenceDeliveryTrackerImpl reads apiHost from BackgroundDeliveryContextStore so customer-configured overrides apply. Delivery fails if no host is persisted yet (cold-start before any foreground SDK init). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5e11ba0 commit 801492e

5 files changed

Lines changed: 438 additions & 90 deletions

File tree

Sources/Location/Geofence/Event/GeofenceDeliveryTracker.swift

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ protocol GeofenceDeliveryTracker: AutoMockable {
1616

1717
final class GeofenceDeliveryTrackerImpl: GeofenceDeliveryTracker {
1818
private let httpClient: HttpClient
19-
private let region: Region
19+
private let contextStore: BackgroundDeliveryContextStore
2020
private let logger: Logger
2121

22-
init(httpClient: HttpClient, region: Region, logger: Logger) {
22+
init(httpClient: HttpClient, contextStore: BackgroundDeliveryContextStore, logger: Logger) {
2323
self.httpClient = httpClient
24-
self.region = region
24+
self.contextStore = contextStore
2525
self.logger = logger
2626
}
2727

@@ -34,6 +34,10 @@ final class GeofenceDeliveryTrackerImpl: GeofenceDeliveryTracker {
3434
logger.error("cannot deliver geofence metric without a userId")
3535
return onComplete(.failure(.noRequestMade(nil)))
3636
}
37+
guard let apiHost = contextStore.currentApiHost, !apiHost.isEmpty else {
38+
logger.error("cannot deliver geofence metric without a persisted apiHost")
39+
return onComplete(.failure(.noRequestMade(nil)))
40+
}
3741

3842
var properties: [String: Any] = [
3943
"geofence_id": metric.geofenceId,
@@ -52,7 +56,7 @@ final class GeofenceDeliveryTrackerImpl: GeofenceDeliveryTracker {
5256
let endpoint: CIOApiEndpoint = .trackPushMetricsCdp
5357
guard let httpParams = HttpRequestParams(
5458
endpoint: endpoint,
55-
baseUrl: Self.apiHost(for: region),
59+
baseUrl: Self.absoluteUrl(host: apiHost),
5660
headers: nil,
5761
body: try? JSONSerialization.data(withJSONObject: body)
5862
) else {
@@ -70,10 +74,13 @@ final class GeofenceDeliveryTrackerImpl: GeofenceDeliveryTracker {
7074
}
7175
}
7276

73-
private static func apiHost(for region: Region) -> String {
74-
switch region {
75-
case .US: return "https://cdp.customer.io/v1"
76-
case .EU: return "https://cdp-eu.customer.io/v1"
77+
/// `BackgroundDeliveryContextStore.currentApiHost` is host-only (no scheme); the request
78+
/// builder needs the full base URL, so prepend `https://` unless the caller has already
79+
/// qualified it.
80+
private static func absoluteUrl(host: String) -> String {
81+
if host.hasPrefix("http://") || host.hasPrefix("https://") {
82+
return host
7783
}
84+
return "https://" + host
7885
}
7986
}
Lines changed: 97 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,56 @@
11
import CioInternalCommon
22
import Foundation
33

4-
/// Sends geofence transition events to the data pipeline with cooldown-based deduplication.
4+
/// Sends geofence transition events through a three-layer delivery flow:
5+
/// 1. Cooldown-based deduplication (keyed by "geofenceId:transitionType")
6+
/// 2. Persist to `PendingGeofenceMetricStore` before any send attempt
7+
/// 3. Dispatch in `deliver`:
8+
/// - No HttpClient → EventBus, drain row
9+
/// - No userId yet → retain row for the next flush
10+
/// - HTTP attempt → success drains; failure retains for retry
511
///
6-
/// Events are keyed by "geofenceId:transitionType" and suppressed if the same event
7-
/// was emitted within the cooldown interval. Cooldowns are persisted to survive app restarts
8-
/// via the `GeofenceStorage` actor.
9-
///
10-
/// Communicates with DataPipeline via `EventBusHandler` (matching the in-app and push
11-
/// metric pattern) so the geofence module has no direct dependency on the DataPipeline module.
12+
/// `flushPending()` replays queued rows on module init and on every `ProfileIdentifiedEvent`.
13+
/// Concurrent deliveries for the same row are deduplicated in-process via the active-delivery
14+
/// ID set; on app kill the row stays on disk and is retried in the next process.
1215
final class GeofenceEventTracker {
1316
private let storage: GeofenceStorage
17+
private let pendingStore: PendingGeofenceMetricStore
18+
private let deliveryTracker: GeofenceDeliveryTracker?
19+
private let contextStore: BackgroundDeliveryContextStore
1420
private let eventBusHandler: EventBusHandler
1521
private let dateUtil: DateUtil
1622
private let logger: Logger
1723
private let cooldownInterval: TimeInterval
24+
private let activeDeliveryIds: Synchronized<Set<UUID>> = Synchronized([])
1825

1926
init(
2027
storage: GeofenceStorage,
28+
pendingStore: PendingGeofenceMetricStore,
29+
deliveryTracker: GeofenceDeliveryTracker?,
30+
contextStore: BackgroundDeliveryContextStore,
2131
eventBusHandler: EventBusHandler,
2232
dateUtil: DateUtil,
2333
logger: Logger,
2434
cooldownInterval: TimeInterval = GeofenceConstants.eventCooldownInterval
2535
) {
2636
self.storage = storage
37+
self.pendingStore = pendingStore
38+
self.deliveryTracker = deliveryTracker
39+
self.contextStore = contextStore
2740
self.eventBusHandler = eventBusHandler
2841
self.dateUtil = dateUtil
2942
self.logger = logger
3043
self.cooldownInterval = cooldownInterval
3144
}
3245

3346
/// Tracks a geofence transition event, suppressing duplicates within the cooldown window.
34-
/// Also purges expired cooldown entries opportunistically.
35-
func trackTransition(geofenceId: String, transition: GeofenceTransition) async {
47+
/// Persists the metric, then dispatches to one of the three delivery paths
48+
/// (EventBus drain / queue-retain / direct HTTP) per the rules in `deliver`.
49+
func trackTransition(
50+
geofenceId: String,
51+
transition: GeofenceTransition,
52+
location: LocationData? = nil
53+
) async {
3654
let cooldownKey = "\(geofenceId):\(transition.rawValue)"
3755
let now = dateUtil.now
3856

@@ -41,12 +59,78 @@ final class GeofenceEventTracker {
4159
return
4260
}
4361

44-
eventBusHandler.postEvent(TrackGeofenceMetricEvent(
62+
let metric = PendingGeofenceMetric(
4563
geofenceId: geofenceId,
46-
transition: transition
47-
))
48-
logger.geofenceEventTracked(geofenceId: geofenceId, transition: transition)
64+
transition: transition,
65+
latitude: location?.latitude,
66+
longitude: location?.longitude,
67+
timestamp: now
68+
)
69+
_ = await pendingStore.append(metric)
4970

71+
await deliver(metric: metric)
5072
await storage.purgeExpiredCooldowns(now: now, interval: cooldownInterval)
5173
}
74+
75+
/// Replays every queued metric through `deliver`.
76+
func flushPending() async {
77+
let pending = await pendingStore.loadAll()
78+
for metric in pending {
79+
await deliver(metric: metric)
80+
}
81+
}
82+
83+
// MARK: - Private
84+
85+
private func deliver(metric: PendingGeofenceMetric) async {
86+
// Claim before delivering so two concurrent callers (e.g. trackTransition and a
87+
// ProfileIdentifiedEvent-triggered flush, or two flushes) can't both send the
88+
// same row. The claim set is in-memory only — on app kill the row stays on
89+
// disk and is retried by flushPending in the next process.
90+
guard activeDeliveryIds.mutating({ $0.insert(metric.id).inserted }) else { return }
91+
defer { activeDeliveryIds.mutating { _ = $0.remove(metric.id) } }
92+
93+
guard let deliveryTracker else {
94+
// No HTTP path will ever be available in this process (MessagingPush not
95+
// initialized). Deliver via EventBus and drain; nothing would recover this
96+
// row otherwise.
97+
postEventBus(metric: metric)
98+
_ = await pendingStore.remove(id: metric.id)
99+
return
100+
}
101+
102+
guard let userId = contextStore.currentUserId, !userId.isEmpty else {
103+
// No userId yet (signed out / not yet identified). Leave the row so a later
104+
// ProfileIdentifiedEvent → flushPending can deliver it via direct HTTP with
105+
// the right userId. No EventBus — that would attribute the event to the
106+
// anonymous profile and duplicate when the flush succeeds.
107+
return
108+
}
109+
110+
let success = await withCheckedContinuation { (continuation: CheckedContinuation<Bool, Never>) in
111+
deliveryTracker.deliver(metric: metric, userId: userId) { result in
112+
switch result {
113+
case .success:
114+
continuation.resume(returning: true)
115+
case .failure:
116+
continuation.resume(returning: false)
117+
}
118+
}
119+
}
120+
121+
if success {
122+
_ = await pendingStore.remove(id: metric.id)
123+
logger.geofenceEventTracked(geofenceId: metric.geofenceId, transition: metric.transition)
124+
}
125+
// HTTP failure: row stays for next flush. No EventBus — same duplicate
126+
// risk if a later flush succeeds.
127+
}
128+
129+
private func postEventBus(metric: PendingGeofenceMetric) {
130+
eventBusHandler.postEvent(TrackGeofenceMetricEvent(
131+
geofenceId: metric.geofenceId,
132+
transition: metric.transition
133+
))
134+
logger.geofenceEventTracked(geofenceId: metric.geofenceId, transition: metric.transition)
135+
}
52136
}

Sources/Location/LocationModuleState.swift

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,13 @@ final class LocationModuleState {
3535
)
3636
let locationEnrichmentProvider = LocationProfileEnrichmentProvider(storage: storage, config: config)
3737
di.profileEnrichmentRegistry.register(locationEnrichmentProvider)
38-
registerEventSubscriptions(coordinator: coordinator, eventBusHandler: di.eventBusHandler)
38+
let geofenceEventTracker = makeGeofenceEventTracker(di: di)
39+
registerEventSubscriptions(
40+
coordinator: coordinator,
41+
geofenceEventTracker: geofenceEventTracker,
42+
eventBusHandler: di.eventBusHandler
43+
)
44+
Task { await geofenceEventTracker.flushPending() }
3945
let locationProvider = CoreLocationProvider(logger: di.logger)
4046
let implementation = LocationServicesImplementation(
4147
config: config,
@@ -49,10 +55,31 @@ final class LocationModuleState {
4955
Task { await implementation.setUpLifecycleObserver() }
5056
}
5157

52-
private func registerEventSubscriptions(coordinator: LocationSyncCoordinator, eventBusHandler: EventBusHandler) {
58+
private func registerEventSubscriptions(
59+
coordinator: LocationSyncCoordinator,
60+
geofenceEventTracker: GeofenceEventTracker,
61+
eventBusHandler: EventBusHandler
62+
) {
5363
eventBusHandler.addObserver(ProfileIdentifiedEvent.self) { _ in
5464
Task { await coordinator.syncCachedLocationIfNeeded() }
65+
Task { await geofenceEventTracker.flushPending() }
66+
}
67+
}
68+
69+
private func makeGeofenceEventTracker(di: DIGraphShared) -> GeofenceEventTracker {
70+
let contextStore = di.backgroundDeliveryContextStore
71+
let deliveryTracker: GeofenceDeliveryTracker? = di.getOptional(HttpClient.self).map {
72+
GeofenceDeliveryTrackerImpl(httpClient: $0, contextStore: contextStore, logger: di.logger)
5573
}
74+
return GeofenceEventTracker(
75+
storage: GeofenceStorage(),
76+
pendingStore: PendingGeofenceMetricStore(),
77+
deliveryTracker: deliveryTracker,
78+
contextStore: contextStore,
79+
eventBusHandler: di.eventBusHandler,
80+
dateUtil: di.dateUtil,
81+
logger: di.logger
82+
)
5683
}
5784

5885
/// The current Location services instance. Before initialization, returns an implementation that logs an error when used.

Tests/Location/Geofence/Event/GeofenceDeliveryTrackerTests.swift

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,22 @@ import Testing
66

77
@Suite("GeofenceDeliveryTracker")
88
struct GeofenceDeliveryTrackerTests {
9+
private func makeContextStore(host: String? = "cdp.customer.io/v1") -> BackgroundDeliveryContextStore {
10+
let store = BackgroundDeliveryContextStore(
11+
fileManager: .default,
12+
directoryURL: FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)
13+
)
14+
if let host { store.setApiHost(host) }
15+
return store
16+
}
17+
918
private func makeTracker(
10-
region: Region = .US,
19+
contextStore: BackgroundDeliveryContextStore? = nil,
1120
httpClient: HttpClientMock = HttpClientMock()
1221
) -> (tracker: GeofenceDeliveryTrackerImpl, httpClient: HttpClientMock) {
1322
let tracker = GeofenceDeliveryTrackerImpl(
1423
httpClient: httpClient,
15-
region: region,
24+
contextStore: contextStore ?? makeContextStore(),
1625
logger: LoggerMock()
1726
)
1827
return (tracker, httpClient)
@@ -44,7 +53,7 @@ struct GeofenceDeliveryTrackerTests {
4453

4554
@Test
4655
func deliver_givenEnterTransition_expectAndroidWireFormat() async {
47-
let (tracker, httpClient) = makeTracker(region: .US)
56+
let (tracker, httpClient) = makeTracker()
4857
httpClient.requestClosure = { _, onComplete in onComplete(.success(Data())) }
4958

5059
await withCheckedContinuation { continuation in
@@ -99,8 +108,8 @@ struct GeofenceDeliveryTrackerTests {
99108
}
100109

101110
@Test
102-
func deliver_givenEURegion_expectEUHost() async {
103-
let (tracker, httpClient) = makeTracker(region: .EU)
111+
func deliver_givenEUApiHost_expectEUUrl() async {
112+
let (tracker, httpClient) = makeTracker(contextStore: makeContextStore(host: "cdp-eu.customer.io/v1"))
104113
httpClient.requestClosure = { _, onComplete in onComplete(.success(Data())) }
105114

106115
await withCheckedContinuation { continuation in
@@ -113,6 +122,21 @@ struct GeofenceDeliveryTrackerTests {
113122
#expect(url == "https://cdp-eu.customer.io/v1/track")
114123
}
115124

125+
@Test
126+
func deliver_givenSchemeQualifiedHost_expectSchemeNotDuplicated() async {
127+
let (tracker, httpClient) = makeTracker(contextStore: makeContextStore(host: "https://cdp.customer.io/v1"))
128+
httpClient.requestClosure = { _, onComplete in onComplete(.success(Data())) }
129+
130+
await withCheckedContinuation { continuation in
131+
tracker.deliver(metric: makeMetric(), userId: "user_42") { _ in
132+
continuation.resume()
133+
}
134+
}
135+
136+
let url = httpClient.requestReceivedArguments?.params.url.absoluteString
137+
#expect(url == "https://cdp.customer.io/v1/track")
138+
}
139+
116140
// MARK: - Guard clauses
117141

118142
@Test
@@ -129,6 +153,20 @@ struct GeofenceDeliveryTrackerTests {
129153
if case .success = result { Issue.record("expected failure for empty userId") }
130154
}
131155

156+
@Test
157+
func deliver_givenNoPersistedApiHost_expectFailureAndNoHttpCall() async {
158+
let (tracker, httpClient) = makeTracker(contextStore: makeContextStore(host: nil))
159+
160+
let result: Result<Void, HttpRequestError> = await withCheckedContinuation { continuation in
161+
tracker.deliver(metric: makeMetric(), userId: "user_42") { result in
162+
continuation.resume(returning: result)
163+
}
164+
}
165+
166+
#expect(httpClient.requestCallsCount == 0)
167+
if case .success = result { Issue.record("expected failure for missing apiHost") }
168+
}
169+
132170
// MARK: - Result propagation
133171

134172
@Test

0 commit comments

Comments
 (0)