Skip to content

Commit

Permalink
Merge branch 'release/0.36.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
Pascal Holy committed Dec 1, 2021
2 parents 85c4aeb + 4e737d9 commit ad9f007
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 132 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.36.0
0.36.1
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void startStream() {
.payload(messaging.toString()).build()
);
} catch (Exception e) {
log.warn("Skipping facebook error for record " + entry.toString(), e);
log.warn("Skipping facebook error for record " + entry, e);
return null;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -53,7 +54,7 @@ String getSourceConversationId(final JsonNode webhookMessaging) throws NullPoint
: webhookMessaging.get("sender").get("id").asText();
}

public List<ProducerRecord<String, SpecificRecordBase>> getRecords(Event event, Function<String, Optional<String>> getMessageId) throws Exception {
public List<ProducerRecord<String, SpecificRecordBase>> getRecords(Event event, Function<String, Optional<String>> getMessageIdFn) throws Exception {
final String payload = event.getPayload();
final JsonNode rootNode = objectMapper.readTree(payload);

Expand All @@ -74,13 +75,16 @@ public List<ProducerRecord<String, SpecificRecordBase>> getRecords(Event event,
// Third party app
senderId = appId;
} else if (appId == null) {
// Sent by Facebook moderator via Facebook inbox
// Sent by moderator via Facebook inbox
senderId = getSourceConversationId(rootNode);
} else {
// Filter out echoes coming from this app
return List.of();
}


final Function<String, Optional<String>> getMessageId = getMessageIdFunctor(getMessageIdFn, channel.getSource().equals("instagram") && isEcho);

if (rootNode.has("reaction")) {
// In case that this is an existing message, try retrieving its id
final String facebookMessageId = rootNode.get("reaction").get("mid").textValue();
Expand Down Expand Up @@ -130,6 +134,32 @@ public List<ProducerRecord<String, SpecificRecordBase>> getRecords(Event event,
.build()));
}

// Instagram does not send an app id in its echoes. Therefore, we use the metadata Facebook mid for de-duplication
// The Facebook echo however can arrive before the metadata record is even written to the metadata topic
// Therefore we implement a retry mechanism only for Instagram echoes
// See: https://github.com/airyhq/airy/issues/2434
private Function<String, Optional<String>> getMessageIdFunctor(Function<String, Optional<String>> getMessageId, boolean isInstagramEcho) {
if (!isInstagramEcho) {
return getMessageId;
}

return (String facebookMessagId) -> {
for (int i = 3; i > 0; i--) {
final Optional<String> maybeMessagId = getMessageId.apply(facebookMessagId);
if (maybeMessagId.isPresent()) {
return maybeMessagId;
}
try {
TimeUnit.MILLISECONDS.sleep((4 - i) * 500L); // wait 0.5, 1.0, 1.5, 2.0 seconds
} catch (InterruptedException e) {
e.printStackTrace();
}
}

return getMessageId.apply(facebookMessagId);
};
}

private List<ProducerRecord<String, SpecificRecordBase>> getReaction(String messageId, JsonNode rootNode) throws Exception {
final JsonNode reaction = rootNode.get("reaction");

Expand Down
Loading

0 comments on commit ad9f007

Please sign in to comment.