Skip to content

Commit

Permalink
[#2434] Fix instagram ingestion race condition (#2625)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismatix authored Nov 30, 2021
1 parent 804d2f0 commit 31686c1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void startStream() {
.payload(messaging.toString()).build()
);
} catch (Exception e) {
log.warn("Skipping facebook error for record " + entry.toString(), e);
log.warn("Skipping facebook error for record " + entry, e);
return null;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -53,7 +54,7 @@ String getSourceConversationId(final JsonNode webhookMessaging) throws NullPoint
: webhookMessaging.get("sender").get("id").asText();
}

public List<ProducerRecord<String, SpecificRecordBase>> getRecords(Event event, Function<String, Optional<String>> getMessageId) throws Exception {
public List<ProducerRecord<String, SpecificRecordBase>> getRecords(Event event, Function<String, Optional<String>> getMessageIdFn) throws Exception {
final String payload = event.getPayload();
final JsonNode rootNode = objectMapper.readTree(payload);

Expand All @@ -74,13 +75,16 @@ public List<ProducerRecord<String, SpecificRecordBase>> getRecords(Event event,
// Third party app
senderId = appId;
} else if (appId == null) {
// Sent by Facebook moderator via Facebook inbox
// Sent by moderator via Facebook inbox
senderId = getSourceConversationId(rootNode);
} else {
// Filter out echoes coming from this app
return List.of();
}


final Function<String, Optional<String>> getMessageId = getMessageIdFunctor(getMessageIdFn, channel.getSource().equals("instagram") && isEcho);

if (rootNode.has("reaction")) {
// In case that this is an existing message, try retrieving its id
final String facebookMessageId = rootNode.get("reaction").get("mid").textValue();
Expand Down Expand Up @@ -130,6 +134,32 @@ public List<ProducerRecord<String, SpecificRecordBase>> getRecords(Event event,
.build()));
}

// Instagram does not send an app id in its echoes. Therefore, we use the metadata Facebook mid for de-duplication
// The Facebook echo however can arrive before the metadata record is even written to the metadata topic
// Therefore we implement a retry mechanism only for Instagram echoes
// See: https://github.com/airyhq/airy/issues/2434
private Function<String, Optional<String>> getMessageIdFunctor(Function<String, Optional<String>> getMessageId, boolean isInstagramEcho) {
if (!isInstagramEcho) {
return getMessageId;
}

return (String facebookMessagId) -> {
for (int i = 3; i > 0; i--) {
final Optional<String> maybeMessagId = getMessageId.apply(facebookMessagId);
if (maybeMessagId.isPresent()) {
return maybeMessagId;
}
try {
TimeUnit.MILLISECONDS.sleep((4 - i) * 500L); // wait 0.5, 1.0, 1.5, 2.0 seconds
} catch (InterruptedException e) {
e.printStackTrace();
}
}

return getMessageId.apply(facebookMessagId);
};
}

private List<ProducerRecord<String, SpecificRecordBase>> getReaction(String messageId, JsonNode rootNode) throws Exception {
final JsonNode reaction = rootNode.get("reaction");

Expand Down

0 comments on commit 31686c1

Please sign in to comment.