Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ void GIVEN_throttled_cloud_update_requests_WHEN_cloud_updates_THEN_cloud_updates
when(mockUpdateThingShadowResponse.payload()).thenReturn(SdkBytes.fromString("{\"version\": 1}", UTF_8));
when(iotDataPlaneClientFactory.getIotDataPlaneClient().updateThingShadow(any(software.amazon.awssdk.services.iotdataplane.model.UpdateThingShadowRequest.class)))
.thenReturn(mockUpdateThingShadowResponse);
ShadowDocument shadowDocument = new ShadowDocument(localShadowContentV1.getBytes());

// mock dao calls in cloud update
when(dao.getShadowThing(anyString(), anyString())).thenReturn(Optional.of(new ShadowDocument(localShadowContentV1.getBytes())));
when(dao.getShadowThing(anyString(), anyString())).thenReturn(Optional.of(shadowDocument));
when(dao.getShadowSyncInformation(anyString(), anyString())).thenReturn(
Optional.of(SyncInformation.builder()
.lastSyncedDocument(lastSyncedDocument.getBytes())
Expand All @@ -118,7 +119,7 @@ void GIVEN_throttled_cloud_update_requests_WHEN_cloud_updates_THEN_cloud_updates
// thingName has to be unique to prevent requests from being merged
final int totalRequestCalls = 10;
for (int i = 0; i < totalRequestCalls; i++) {
syncHandler.pushCloudUpdateSyncRequest(String.valueOf(i), CLASSIC_SHADOW_IDENTIFIER, updateDocument);
syncHandler.pushCloudUpdateSyncRequest(String.valueOf(i), CLASSIC_SHADOW_IDENTIFIER, updateDocument, shadowDocument);
}

// verify that some requests have been throttled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ void GIVEN_synced_shadow_WHEN_multiple_local_updates_THEN_cloud_updates(Class<?e

assertEmptySyncQueue(clazz);
verify(syncHandler, after(10000).atLeast(4))
.pushCloudUpdateSyncRequest(anyString(), anyString(), any(JsonNode.class));
.pushCloudUpdateSyncRequest(anyString(), anyString(), any(JsonNode.class), any(ShadowDocument.class));
assertThat("sync info exists", () -> syncInfo.get().isPresent(), eventuallyEval(is(true)));
assertThat("local shadow exists", localShadow.get().isPresent(), is(true));
ShadowDocument shadowDocument = localShadow.get().get();
Expand Down Expand Up @@ -607,7 +607,7 @@ void GIVEN_synced_shadow_WHEN_multiple_cloud_and_local_received_THEN_cloud_updat

assertEmptySyncQueue(clazz);
verify(syncHandler, after(10000).times(4))
.pushCloudUpdateSyncRequest(anyString(), anyString(), any(JsonNode.class));
.pushCloudUpdateSyncRequest(anyString(), anyString(), any(JsonNode.class), any(ShadowDocument.class));
assertThat("sync info exists", () -> syncInfo.get().isPresent(), eventuallyEval(is(true)));
assertThat("local shadow exists", localShadow.get().isPresent(), is(true));
ShadowDocument shadowDocument = localShadow.get().get();
Expand Down Expand Up @@ -890,7 +890,7 @@ void GIVEN_unsynced_shadow_WHEN_local_updates_THEN_no_cloud_update(Class<?extend
request.setPayload(localShadowContentV1.getBytes(UTF_8));
updateHandler.handleRequest(request, "DoAll");
verify(syncHandler, timeout(Duration.ofSeconds(10).toMillis()).times(1))
.pushCloudUpdateSyncRequest(any(), any(), any());
.pushCloudUpdateSyncRequest(any(), any(), any(), any());
verify(iotDataPlaneClientFactory.getIotDataPlaneClient(), after(Duration.ofSeconds(5).toMillis()).never())
.updateThingShadow(any(software.amazon.awssdk.services.iotdataplane.model.UpdateThingShadowRequest.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ public UpdateThingShadowHandlerResponse handleRequest(UpdateThingShadowRequest r
.kv("service-name", serviceName)
.log("Successfully updated shadow");
removeMetadataNode(updateDocumentRequest);
this.syncHandler.pushCloudUpdateSyncRequest(thingName, shadowName, updateDocumentRequest);
this.syncHandler.pushCloudUpdateSyncRequest(thingName, shadowName, updateDocumentRequest,
updatedDocument);

return new UpdateThingShadowHandlerResponse(updateThingShadowResponse, updateDocumentBytes);
} catch (InvalidRequestParametersException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.shadowmanager.model.ShadowDocument;
import com.aws.greengrass.shadowmanager.model.configuration.ThingShadowSyncConfiguration;
import com.aws.greengrass.shadowmanager.sync.model.BaseSyncRequest;
import com.aws.greengrass.shadowmanager.sync.model.CloudDeleteSyncRequest;
Expand Down Expand Up @@ -224,10 +225,13 @@ private boolean isShadowSynced(String thingName, String shadowName) {
* @param thingName The thing name associated with the sync shadow update
* @param shadowName The shadow name associated with the sync shadow update
* @param updateDocument The update shadow request
* @param localShadowDocument The local shadow document state at the time of the update
*/
public void pushCloudUpdateSyncRequest(String thingName, String shadowName, JsonNode updateDocument) {
public void pushCloudUpdateSyncRequest(String thingName, String shadowName, JsonNode updateDocument,
ShadowDocument localShadowDocument) {
if (isShadowSynced(thingName, shadowName) && !Direction.CLOUD_TO_DEVICE.equals(direction.get())) {
overallSyncStrategy.putSyncRequest(new CloudUpdateSyncRequest(thingName, shadowName, updateDocument));
overallSyncStrategy.putSyncRequest(new CloudUpdateSyncRequest(thingName, shadowName, updateDocument,
localShadowDocument));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import software.amazon.awssdk.services.iotdataplane.model.UpdateThingShadowResponse;

import java.io.IOException;
import java.util.Objects;
import java.util.Optional;

import static com.aws.greengrass.shadowmanager.model.Constants.LOG_CLOUD_VERSION_KEY;
Expand All @@ -46,6 +47,7 @@
*/
public class CloudUpdateSyncRequest extends BaseSyncRequest {
private static final Logger logger = LogManager.getLogger(CloudUpdateSyncRequest.class);
private ShadowDocument localShadowDocument;

@NonNull
JsonNode updateDocument;
Expand All @@ -56,21 +58,25 @@ public class CloudUpdateSyncRequest extends BaseSyncRequest {
* @param thingName The thing name associated with the sync shadow update
* @param shadowName The shadow name associated with the sync shadow update
* @param updateDocument The update request bytes.
* @param localShadowDocument The local shadow document state after the local update was made
*/
public CloudUpdateSyncRequest(String thingName,
String shadowName,
JsonNode updateDocument) {
JsonNode updateDocument,
ShadowDocument localShadowDocument) {
super(thingName, shadowName);
this.updateDocument = updateDocument;
this.localShadowDocument = localShadowDocument;
}

/**
* Merge the sync requests together.
*
* @param other the newer request to merge
* @param nextRequest the newer request to merge
*/
public void merge(CloudUpdateSyncRequest other) {
JsonMerger.merge(updateDocument, other.updateDocument);
public void merge(CloudUpdateSyncRequest nextRequest) {
JsonMerger.merge(updateDocument, nextRequest.updateDocument);
this.localShadowDocument = nextRequest.localShadowDocument;
}

/**
Expand All @@ -85,14 +91,12 @@ public void merge(CloudUpdateSyncRequest other) {
@Override
public void execute(SyncContext context) throws RetryableException, SkipSyncRequestException,
ConflictException, UnknownShadowException, InterruptedException {
Optional<ShadowDocument> shadowDocument = context.getDao().getShadowThing(getThingName(), getShadowName());

//TODO: store this information in a return object to avoid unnecessary calls to DAO.
SyncInformation currentSyncInformation = context.getDao()
.getShadowSyncInformation(getThingName(), getShadowName())
.orElseThrow(() -> new UnknownShadowException("Shadow not found in sync table"));

if (!isUpdateNecessary(shadowDocument, currentSyncInformation, context)) {
if (!isUpdateNecessary(currentSyncInformation, context)) {
return;
}

Expand Down Expand Up @@ -150,13 +154,13 @@ public void execute(SyncContext context) throws RetryableException, SkipSyncRequ

try {
context.getDao().updateSyncInformation(SyncInformation.builder()
.lastSyncedDocument(JsonUtil.getPayloadBytes(shadowDocument.get().toJson(false)))
.lastSyncedDocument(JsonUtil.getPayloadBytes(localShadowDocument.toJson(false)))
.cloudVersion(cloudUpdatedVersion)
.cloudDeleted(false)
.shadowName(getShadowName())
.thingName(getThingName())
.cloudUpdateTime(shadowDocument.get().getMetadata().getLatestUpdatedTimestamp())
.localVersion(shadowDocument.get().getVersion())
.cloudUpdateTime(localShadowDocument.getMetadata().getLatestUpdatedTimestamp())
.localVersion(localShadowDocument.getVersion())
.build());
} catch (JsonProcessingException | ShadowManagerDataException e) {
logger.atError()
Expand All @@ -178,20 +182,18 @@ public void execute(SyncContext context) throws RetryableException, SkipSyncRequ
*/
@Override
boolean isUpdateNecessary(SyncContext context) throws SkipSyncRequestException, UnknownShadowException {
Optional<ShadowDocument> shadowDocument = context.getDao().getShadowThing(getThingName(), getShadowName());

//TODO: store this information in a return object to avoid unnecessary calls to DAO.
SyncInformation currentSyncInformation = context.getDao()
.getShadowSyncInformation(getThingName(), getShadowName())
.orElseThrow(() -> new UnknownShadowException("Shadow not found in sync table"));

return isUpdateNecessary(shadowDocument, currentSyncInformation, context);
return isUpdateNecessary(currentSyncInformation, context);
}

private boolean isUpdateNecessary(Optional<ShadowDocument> shadowDocument, SyncInformation currentSyncInformation,
private boolean isUpdateNecessary(SyncInformation currentSyncInformation,
SyncContext context)
throws SkipSyncRequestException {
if (!shadowDocument.isPresent()) {
if (Objects.isNull(localShadowDocument)) {
logger.atDebug()
.kv(LOG_THING_NAME_KEY, getThingName())
.kv(LOG_SHADOW_NAME_KEY, getShadowName())
Expand All @@ -208,7 +210,7 @@ private boolean isUpdateNecessary(Optional<ShadowDocument> shadowDocument, SyncI
.kv(LOG_LOCAL_VERSION_KEY, currentSyncInformation.getLocalVersion())
.kv(LOG_CLOUD_VERSION_KEY, currentSyncInformation.getCloudVersion())
.log("Cloud shadow already contains update payload. No sync is necessary");
updateSyncInformationVersion(shadowDocument, currentSyncInformation, context);
updateSyncInformationVersion(Optional.of(localShadowDocument), currentSyncInformation, context);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ void GIVEN_update_document_with_delta_WHEN_handle_request_THEN_delta_is_dropped_
getJsonFromResource(RESOURCE_DIRECTORY_NAME + BAD_UPDATE_DOCUMENT_WITH_DELTA_FILE_NAME);
UpdateThingShadowRequestHandler updateThingShadowIPCHandler = new UpdateThingShadowRequestHandler(mockDao, mockAuthorizationHandlerWrapper, mockPubSubClientWrapper, mockSynchronizeHelper, mockSyncHandler);
ArgumentCaptor<JsonNode> documentCaptor = ArgumentCaptor.forClass(JsonNode.class);
doNothing().when(mockSyncHandler).pushCloudUpdateSyncRequest(any(), any(), documentCaptor.capture());
doNothing().when(mockSyncHandler).pushCloudUpdateSyncRequest(any(), any(), documentCaptor.capture(), any());
when(mockDao.updateShadowThing(any(), any(), any(), anyLong()))
.thenReturn(Optional.of(new byte[]{}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.aws.greengrass.shadowmanager.sync;


import com.aws.greengrass.shadowmanager.model.ShadowDocument;
import com.aws.greengrass.shadowmanager.model.configuration.ThingShadowSyncConfiguration;
import com.aws.greengrass.shadowmanager.sync.model.BaseSyncRequest;
import com.aws.greengrass.shadowmanager.sync.model.CloudDeleteSyncRequest;
Expand Down Expand Up @@ -137,7 +138,7 @@ void GIVEN_synced_shadows_WHEN_pushCloudUpdateSyncRequest_THEN_calls_overall_syn
syncHandler.setSyncConfigurations(syncConfigurations);

// WHEN
syncHandler.pushCloudUpdateSyncRequest("a", "1", mock(JsonNode.class));
syncHandler.pushCloudUpdateSyncRequest("a", "1", mock(JsonNode.class), mock(ShadowDocument.class));

// THEN
verify(mockSyncStrategy, times(1)).putSyncRequest(any());
Expand Down
Loading
Loading