Skip to content

Commit

Permalink
Merge pull request #652 from OpenSRP/651-p2p-sync-by-locations
Browse files Browse the repository at this point in the history
Support P2P sync by locations
  • Loading branch information
githengi authored Sep 23, 2020
2 parents f177cd7 + 20b0336 commit 1da0ccb
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 60 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION_NAME=2.1.2-SNAPSHOT
VERSION_NAME=2.1.3-SNAPSHOT
VERSION_CODE=1
GROUP=org.smartregister
POM_SETTING_DESCRIPTION=OpenSRP Client Core Application
Expand Down
9 changes: 9 additions & 0 deletions opensrp-app/src/main/java/org/smartregister/P2POptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class P2POptions {
private SyncFinishedCallback syncFinishedCallback;
@Nullable
private RecalledIdentifier recalledIdentifier;
private String[] locationsFilter;

private boolean enableP2PLibrary;
private int batchSize = AllConstants.PeerToPeer.P2P_LIBRARY_DEFAULT_BATCH_SIZE;
Expand Down Expand Up @@ -81,4 +82,12 @@ public RecalledIdentifier getRecalledIdentifier() {
public void setRecalledIdentifier(@Nullable RecalledIdentifier recalledIdentifier) {
this.recalledIdentifier = recalledIdentifier;
}

public String[] getLocationsFilter() {
return locationsFilter;
}

public void setLocationsFilter(String[] locationsFilter) {
this.locationsFilter = locationsFilter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ public static void createAdditionalColumns(SQLiteDatabase db) {
createAdditionalColumns(db, Table.event.name(), Table.client.name());
}

/**
* add locationId column on event table
*
* @param db the database being upgraded
*/
public static void addEventLocationId(SQLiteDatabase db) {
DatabaseMigrationUtils.addColumnIfNotExists(db, Table.event.name(), event_column.locationId.name(), VARCHAR);
DatabaseMigrationUtils.addIndexIfNotExists(db, Table.event.name(), event_column.locationId.name());
db.execSQL(String.format("UPDATE %s set %s = %s WHERE %s >0", Table.event.name(), event_column.locationId.name(), "substr(json,instr(json, '\"locationId\":')+14,36)", "instr(json, '\"locationId\":')"));
}

public static void dropIndexes(SQLiteDatabase db, BaseTable table) {
Cursor cursor = null;
try {
Expand Down Expand Up @@ -304,6 +315,7 @@ private boolean populateStatement(SQLiteStatement statement, Table table, JSONOb
statement.bindString(columnOrder.get(event_column.syncStatus.name()), syncStatus);
statement.bindString(columnOrder.get(event_column.validationStatus.name()), BaseRepository.TYPE_Valid);
statement.bindString(columnOrder.get(event_column.baseEntityId.name()), jsonObject.getString(event_column.baseEntityId.name()));
statement.bindString(columnOrder.get(event_column.locationId.name()), jsonObject.optString(event_column.locationId.name()));
if (jsonObject.has(EVENT_ID))
statement.bindString(columnOrder.get(event_column.eventId.name()), jsonObject.getString(EVENT_ID));
else if (jsonObject.has(_ID))
Expand Down Expand Up @@ -1494,15 +1506,17 @@ public List<EventClient> getEventsByBaseEntityIdsAndSyncStatus(String syncStatus
* default properties as the one fetched from the DB with an additional property that holds the {@code syncStatus}
* and {@code rowid} which are used for peer-to-peer sync.
*
* @param lastRowId
* @param lastRowId the last rowId sent
* @param locationId optional locationId filter
* @return JsonData which contains a {@link JSONArray} and the maximum row id in the array
* of {@link Event}s returned. This enables this method to be called again for the consequent batches
*/
@Nullable
public JsonData getEvents(long lastRowId, int limit) {
public JsonData getEvents(long lastRowId, int limit, @Nullable String locationId) {
JsonData jsonData = null;
JSONArray jsonArray = new JSONArray();
long maxRowId = 0;
String locationFilter = locationId != null ? String.format(" %s =? AND ", event_column.locationId.name()) : "";

String query = "SELECT "
+ event_column.json
Expand All @@ -1513,13 +1527,14 @@ public JsonData getEvents(long lastRowId, int limit) {
+ " FROM "
+ eventTable.name()
+ " WHERE "
+ locationFilter
+ ROWID
+ " > ? "
+ " ORDER BY " + ROWID + " ASC LIMIT ?";
Cursor cursor = null;

try {
cursor = getWritableDatabase().rawQuery(query, new Object[]{lastRowId, limit});
cursor = getWritableDatabase().rawQuery(query, locationId != null ? new Object[]{locationId, lastRowId, limit} : new Object[]{lastRowId, limit});

while (cursor.moveToNext()) {
long rowId = cursor.getLong(2);
Expand Down Expand Up @@ -1642,33 +1657,43 @@ public JsonData getClientsWithLastLocationID(long lastRowId, int limit) {
/**
* Fetches {@link Client}s whose rowid > #lastRowId up to the #limit provided.
*
* @param lastRowId
* @param lastRowId the last row Id queries
* @param limit the number of rows to the pulled
* @param locationId an optional locationId filter for getting the data
* @return JsonData which contains a {@link JSONArray} and the maximum row id in the array
* of {@link Client}s returned or {@code null} if no records match the conditions or an exception occurred.
* This enables this method to be called again for the consequent batches
*/
@Nullable
public JsonData getClients(long lastRowId, int limit) {
public JsonData getClients(long lastRowId, int limit, @Nullable String locationId) {
JsonData jsonData = null;
JSONArray jsonArray = new JSONArray();
long maxRowId = 0;

String locationFilter = locationId != null ? String.format(" %s =? AND ", client_column.locationId.name()) : "";
String query = "SELECT "
+ event_column.json
+ client_column.json
+ ","
+ event_column.syncStatus
+ client_column.syncStatus
+ ","
+ ROWID
+ " FROM "
+ clientTable.name()
+ " WHERE "
+ locationFilter
+ ROWID
+ " > ? "
+ " ORDER BY " + ROWID + " ASC LIMIT ?";
Cursor cursor = null;

try {
cursor = getWritableDatabase().rawQuery(query, new Object[]{lastRowId, limit});
Object[] params;
if (locationId == null) {
params = new Object[]{lastRowId, limit};
} else {
params = new Object[]{locationId, lastRowId, limit};
}
cursor = getWritableDatabase().rawQuery(query, params);

while (cursor.moveToNext()) {
long rowId = cursor.getLong(2);
Expand Down Expand Up @@ -1775,6 +1800,7 @@ public void addEvent(String baseEntityId, JSONObject jsonObject, String syncStat
values.put(event_column.updatedAt.name(), dateFormat.format(new Date()));
values.put(event_column.baseEntityId.name(), baseEntityId);
values.put(event_column.syncStatus.name(), syncStatus);
values.put(event_column.locationId.name(), jsonObject.optString(event_column.locationId.name()));
JSONObject details = jsonObject.optJSONObject(AllConstants.DETAILS);
if (details != null)
values.put(event_column.planId.name(), details.optString(AllConstants.PLAN_IDENTIFIER));
Expand Down Expand Up @@ -2058,7 +2084,8 @@ public enum event_column implements Column {
formSubmissionId(ColumnAttribute.Type.text, false, true),
updatedAt(ColumnAttribute.Type.date, false, true),
serverVersion(ColumnAttribute.Type.longnum, false, true),
planId(ColumnAttribute.Type.text, false, true);
planId(ColumnAttribute.Type.text, false, true),
locationId(ColumnAttribute.Type.text, false, true);

private ColumnAttribute column;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public long receiveJson(@NonNull DataType dataType, @NonNull JSONArray jsonArray
return 0;
}

if (dataType.getName().equals(event.getName())) {
if (dataType.getName().startsWith(event.getName())) {

Timber.e("Received %s total events", String.valueOf(jsonArray.length()));

Expand All @@ -92,7 +92,7 @@ public long receiveJson(@NonNull DataType dataType, @NonNull JSONArray jsonArray
if (foreignData.length() > 0)
foreignEventClientRepository.batchInsertEvents(foreignData, 0);

} else if (dataType.getName().equals(client.getName())) {
} else if (dataType.getName().startsWith(client.getName())) {

Timber.e("Received %s clients", String.valueOf(jsonArray.length()));

Expand All @@ -104,13 +104,13 @@ public long receiveJson(@NonNull DataType dataType, @NonNull JSONArray jsonArray
if (foreignData.length() > 0)
foreignEventClientRepository.batchInsertClients(foreignData);

} else if (dataType.getName().equals(structure.getName())) {
} else if (dataType.getName().startsWith(structure.getName())) {
Timber.e("Received %s structures", String.valueOf(jsonArray.length()));
structureRepository.batchInsertStructures(jsonArray);
} else if (dataType.getName().equals(task.getName())) {
} else if (dataType.getName().startsWith(task.getName())) {
Timber.e("Received %s tasks", String.valueOf(jsonArray.length()));
taskRepository.batchInsertTasks(jsonArray);
} else if (dataType.getName().equals(foreignClient.getName())) {
} else if (dataType.getName().startsWith(foreignClient.getName())) {

Timber.e("Received %s foreign clients", String.valueOf(jsonArray.length()));

Expand All @@ -122,7 +122,7 @@ public long receiveJson(@NonNull DataType dataType, @NonNull JSONArray jsonArray
if (foreignData.length() > 0)
foreignEventClientRepository.batchInsertClients(foreignData);

} else if (dataType.getName().equals(foreignEvent.getName())) {
} else if (dataType.getName().startsWith(foreignEvent.getName())) {

Timber.e("Received %s foreign events", String.valueOf(jsonArray.length()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;

import org.apache.commons.lang3.ArrayUtils;
import org.smartregister.AllConstants;
import org.smartregister.CoreLibrary;
import org.smartregister.P2POptions;
import org.smartregister.R;
import org.smartregister.p2p.P2PLibrary;
import org.smartregister.p2p.model.DataType;
Expand All @@ -25,40 +27,62 @@

public class P2PSenderTransferDao extends BaseP2PTransferDao implements SenderTransferDao {

private static final String SEPARATOR = "-";

@Nullable
@Override
public TreeSet<DataType> getDataTypes() {
return (TreeSet<DataType>) dataTypes.clone();
TreeSet<DataType> dataTypeTreeSet = new TreeSet<>();

if (locationFilterEnabled()) {
for (String location : getP2POptions().getLocationsFilter()) {
for (DataType dataType : dataTypes) {
dataTypeTreeSet.add(new DataType(dataType.getName() + SEPARATOR + location, dataType.getType(), dataTypeTreeSet.size()));
}
}
return dataTypeTreeSet;
} else {
return new TreeSet<>(dataTypes);
}
}

private boolean locationFilterEnabled() {
return getP2POptions() != null && !ArrayUtils.isEmpty(getP2POptions().getLocationsFilter());
}

@Nullable
@Override
public JsonData getJsonData(@NonNull DataType dataType, long lastRecordId, int batchSize) {
if (dataType.getName().equals(event.getName())) {
String locationId = null;
if (locationFilterEnabled()) {
String[] dataTypeParams = dataType.getName().split(SEPARATOR);
locationId = dataTypeParams.length == 1 ? null : dataTypeParams[1];
}
if (dataType.getName().startsWith(event.getName())) {
return CoreLibrary.getInstance().context()
.getEventClientRepository().getEvents(lastRecordId, batchSize);
} else if (dataType.getName().equals(client.getName())) {
.getEventClientRepository().getEvents(lastRecordId, batchSize, locationId);
} else if (dataType.getName().startsWith(client.getName())) {

if (DrishtiApplication.getInstance().getP2PClassifier() == null) {
return CoreLibrary.getInstance().context()
.getEventClientRepository().getClients(lastRecordId, batchSize);
.getEventClientRepository().getClients(lastRecordId, batchSize, locationId);
} else {
return CoreLibrary.getInstance().context()
.getEventClientRepository().getClientsWithLastLocationID(lastRecordId, batchSize);
}

} else if (dataType.getName().equals(structure.getName())) {
} else if (dataType.getName().startsWith(structure.getName())) {
return CoreLibrary.getInstance().context()
.getStructureRepository().getStructures(lastRecordId, batchSize);
} else if (dataType.getName().equals(task.getName())) {
.getStructureRepository().getStructures(lastRecordId, batchSize, locationId);
} else if (dataType.getName().startsWith(task.getName())) {
return CoreLibrary.getInstance().context()
.getTaskRepository().getTasks(lastRecordId, batchSize);
} else if (CoreLibrary.getInstance().context().hasForeignEvents() && dataType.getName().equals(foreignClient.getName())) {
.getTaskRepository().getTasks(lastRecordId, batchSize, locationId);
} else if (CoreLibrary.getInstance().context().hasForeignEvents() && dataType.getName().startsWith(foreignClient.getName())) {
return CoreLibrary.getInstance().context()
.getForeignEventClientRepository().getClients(lastRecordId, batchSize);
} else if (CoreLibrary.getInstance().context().hasForeignEvents() && dataType.getName().equals(foreignEvent.getName())) {
.getForeignEventClientRepository().getClients(lastRecordId, batchSize, locationId);
} else if (CoreLibrary.getInstance().context().hasForeignEvents() && dataType.getName().startsWith(foreignEvent.getName())) {
return CoreLibrary.getInstance().context()
.getForeignEventClientRepository().getEvents(lastRecordId, batchSize);
.getForeignEventClientRepository().getEvents(lastRecordId, batchSize, locationId);
} else {
Timber.e(P2PLibrary.getInstance().getContext().getString(R.string.log_data_type_provided_does_not_exist_in_the_sender)
, dataType.getName());
Expand Down Expand Up @@ -107,4 +131,7 @@ public MultiMediaData getMultiMediaData(@NonNull DataType dataType, long lastRec
}
}

public P2POptions getP2POptions() {
return CoreLibrary.getInstance().getP2POptions();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,20 +156,22 @@ public boolean batchInsertStructures(JSONArray array) {
* Fetches {@link Location}s whose rowid > #lastRowId up to the #limit provided.
*
* @param lastRowId
* @param parentLocationId
* @return JsonData which contains a {@link JSONArray} and the maximum row id in the array
* of {@link Client}s returned or {@code null} if no records match the conditions or an exception occurred.
* of {@link Location}s returned or {@code null} if no records match the conditions or an exception occurred.
* This enables this method to be called again for the consequent batches
*/
@Nullable
public JsonData getStructures(long lastRowId, int limit) {
public JsonData getStructures(long lastRowId, int limit, String parentLocationId) {
JsonData jsonData = null;
long maxRowId = 0;

String locationFilter = parentLocationId != null ? String.format(" %s =? AND ", PARENT_ID) : "";
String query = "SELECT "
+ ROWID
+",* FROM "
+ ",* FROM "
+ STRUCTURE_TABLE
+ " WHERE "
+ locationFilter
+ ROWID
+ " > ? "
+ " ORDER BY " + ROWID + " ASC LIMIT ?";
Expand All @@ -178,7 +180,7 @@ public JsonData getStructures(long lastRowId, int limit) {
JSONArray jsonArray = new JSONArray();

try {
cursor = getWritableDatabase().rawQuery(query, new Object[]{lastRowId, limit});
cursor = getWritableDatabase().rawQuery(query, parentLocationId != null ? new Object[]{parentLocationId, lastRowId, limit} : new Object[]{lastRowId, limit});


while (cursor.moveToNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,23 +502,25 @@ public boolean batchInsertTasks(JSONArray array) {
}

/**
* Fetches {@link Location}s whose rowid > #lastRowId up to the #limit provided.
* Fetches {@link Task}s whose rowid > #lastRowId up to the #limit provided.
*
* @param lastRowId
* @param jurisdictionId
* @return JsonData which contains a {@link JSONArray} and the maximum row id in the array
* of {@link Client}s returned or {@code null} if no records match the conditions or an exception occurred.
* of {@link Task}s returned or {@code null} if no records match the conditions or an exception occurred.
* This enables this method to be called again for the consequent batches
*/
@Nullable
public JsonData getTasks(long lastRowId, int limit) {
public JsonData getTasks(long lastRowId, int limit, String jurisdictionId) {
JsonData jsonData = null;
long maxRowId = 0;

String locationFilter = jurisdictionId != null ? String.format(" %s =? AND ", GROUP_ID) : "";
String query = "SELECT "
+ ROWID
+ ",* FROM "
+ TASK_TABLE
+ " WHERE "
+ locationFilter
+ ROWID
+ " > ? "
+ " ORDER BY " + ROWID + " ASC LIMIT ?";
Expand All @@ -527,7 +529,7 @@ public JsonData getTasks(long lastRowId, int limit) {
JSONArray jsonArray = new JSONArray();

try {
cursor = getWritableDatabase().rawQuery(query, new Object[]{lastRowId, limit});
cursor = getWritableDatabase().rawQuery(query, jurisdictionId != null ? new Object[]{jurisdictionId, lastRowId, limit} : new Object[]{lastRowId, limit});

while (cursor.moveToNext()) {
long rowId = cursor.getLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public void getEventsShouldReturnGenerateMaxRowIdAndIncludeRowIdAndSyncStatusInJ
}
Mockito.doReturn(matrixCursor).when(sqliteDatabase).rawQuery(Mockito.eq("SELECT json,syncStatus,rowid FROM event WHERE rowid > ? ORDER BY rowid ASC LIMIT ?"), Mockito.any(Object[].class));

JsonData jsonData = eventClientRepository.getEvents(0, 20);
JsonData jsonData = eventClientRepository.getEvents(0, 20, null);

Assert.assertEquals(30L, jsonData.getHighestRecordId());
JSONObject jsonObject = jsonData.getJsonArray().getJSONObject(0);
Expand All @@ -396,7 +396,7 @@ public void getClientShouldReturnGenerateMaxRowIdAndIncludeRowIdAndSyncStatusInJ

Mockito.doReturn(matrixCursor).when(sqliteDatabase).rawQuery(Mockito.eq("SELECT json,syncStatus,rowid FROM client WHERE rowid > ? ORDER BY rowid ASC LIMIT ?"), Mockito.any(Object[].class));

JsonData jsonData = eventClientRepository.getClients(0, 20);
JsonData jsonData = eventClientRepository.getClients(0, 20,null);

Assert.assertEquals(30L, jsonData.getHighestRecordId());
JSONObject jsonObject = jsonData.getJsonArray().getJSONObject(0);
Expand Down
Loading

0 comments on commit 1da0ccb

Please sign in to comment.