Skip to content

Commit cbb5f54

Browse files
authored
Merge branch 'develop' into TASK-7645
2 parents c19e138 + ce78093 commit cbb5f54

File tree

3 files changed

+435
-1
lines changed

3 files changed

+435
-1
lines changed

commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBCollection.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,123 @@ public DataResult update(ClientSession clientSession, List<? extends Bson> queri
444444
start);
445445
}
446446

447+
/**
448+
* Update documents using an aggregation pipeline.
449+
*
450+
* @param query the filter to select the documents to update
451+
* @param pipeline the aggregation pipeline to apply for the update
452+
* @param options additional options for the query and update operation
453+
* @return a DataResult containing the result of the update operation
454+
*/
455+
public DataResult updateWithPipeline(Bson query, List<? extends Bson> pipeline, QueryOptions options) {
456+
return updateWithPipeline(null, query, pipeline, options);
457+
}
458+
459+
/**
460+
* Update documents using an aggregation pipeline.
461+
*
462+
* @param clientSession the client session to use for the operation, or null if not using sessions
463+
* @param query the filter to select the documents to update
464+
* @param pipeline the aggregation pipeline to apply for the update
465+
* @param options additional options for the query and update operation
466+
* @return a DataResult containing the result of the update operation
467+
*/
468+
public DataResult updateWithPipeline(ClientSession clientSession, Bson query, List<? extends Bson> pipeline, QueryOptions options) {
469+
long start = startQuery();
470+
471+
boolean upsert = false;
472+
boolean multi = false;
473+
if (options != null) {
474+
upsert = options.getBoolean(UPSERT);
475+
multi = options.getBoolean(MULTI);
476+
}
477+
478+
UpdateResult updateResult = mongoDBNativeQuery.updateWithPipeline(clientSession, query, pipeline, upsert, multi);
479+
return endWrite(updateResult.getMatchedCount(), updateResult.getUpsertedId() != null ? 1 : 0,
480+
updateResult.getUpsertedId() == null ? updateResult.getModifiedCount() : 0, 0, 0, start);
481+
}
482+
483+
/**
484+
* Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document.
485+
*
486+
* @param query the filter to select the document to update
487+
* @param projection the fields to return in the resulting document
488+
* @param sort the sort criteria to apply before finding the document
489+
* @param pipeline the aggregation pipeline to apply for the update
490+
* @param options additional options for the query and update operation
491+
* @return a DataResult containing the updated document, or an empty result if no document matched
492+
*/
493+
public DataResult<Document> findAndUpdateWithPipeline(Bson query, Bson projection, Bson sort,
494+
List<? extends Bson> pipeline, QueryOptions options) {
495+
return privateFindAndUpdateWithPipeline(null, query, projection, sort, pipeline, options, null, null);
496+
}
497+
498+
/**
499+
* Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document.
500+
*
501+
* @param clientSession the client session to use for the operation, or null if not using sessions
502+
* @param query the filter to select the document to update
503+
* @param projection the fields to return in the resulting document
504+
* @param sort the sort criteria to apply before finding the document
505+
* @param pipeline the aggregation pipeline to apply for the update
506+
* @param options additional options for the query and update operation
507+
* @return a DataResult containing the updated document, or an empty result if no document matched
508+
*/
509+
public DataResult<Document> findAndUpdateWithPipeline(ClientSession clientSession, Bson query, Bson projection, Bson sort,
510+
List<? extends Bson> pipeline, QueryOptions options) {
511+
return privateFindAndUpdateWithPipeline(clientSession, query, projection, sort, pipeline, options, null, null);
512+
}
513+
514+
/**
515+
* Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document.
516+
*
517+
* @param query the filter to select the document to update
518+
* @param projection the fields to return in the resulting document
519+
* @param sort the sort criteria to apply before finding the document
520+
* @param pipeline the aggregation pipeline to apply for the update
521+
* @param clazz the class type to convert the result to; if null or Document.class, returns a Document
522+
* @param options additional options for the query and update operation
523+
* @param <T> the type of the returned result
524+
* @return a DataResult containing the updated document, or an empty result if no document matched
525+
*/
526+
public <T> DataResult<T> findAndUpdateWithPipeline(Bson query, Bson projection, Bson sort,
527+
List<? extends Bson> pipeline, Class<T> clazz, QueryOptions options) {
528+
return privateFindAndUpdateWithPipeline(null, query, projection, sort, pipeline, options, clazz, null);
529+
}
530+
531+
/**
532+
* Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document.
533+
*
534+
* @param clientSession the client session to use for the operation, or null if not using sessions
535+
* @param query the filter to select the document to update
536+
* @param projection the fields to return in the resulting document
537+
* @param sort the sort criteria to apply before finding the document
538+
* @param pipeline the aggregation pipeline to apply for the update
539+
* @param clazz the class type to convert the result to; if null or Document.class, returns a Document
540+
* @param options additional options for the query and update operation
541+
* @param <T> the type of the returned result
542+
* @return a DataResult containing the updated document, or an empty result if no document matched
543+
*/
544+
public <T> DataResult<T> findAndUpdateWithPipeline(ClientSession clientSession, Bson query, Bson projection, Bson sort,
545+
List<? extends Bson> pipeline, Class<T> clazz, QueryOptions options) {
546+
return privateFindAndUpdateWithPipeline(clientSession, query, projection, sort, pipeline, options, clazz, null);
547+
}
548+
549+
private <T> DataResult<T> privateFindAndUpdateWithPipeline(ClientSession clientSession, Bson query, Bson projection, Bson sort,
550+
List<? extends Bson> pipeline, QueryOptions options, Class<T> clazz,
551+
ComplexTypeConverter<T, Document> converter) {
552+
long start = startQuery();
553+
Document result = mongoDBNativeQuery.findAndUpdateWithPipeline(clientSession, query, projection, sort, pipeline, options);
554+
if (clazz != null && !clazz.equals(Document.class)) {
555+
try {
556+
return endQuery(Collections.singletonList(objectMapper.readValue(objectWriter.writeValueAsString(result), clazz)), start);
557+
} catch (IOException e) {
558+
logger.error("Error deserializing result: " + e.getMessage(), e);
559+
}
560+
}
561+
return endQuery(Collections.singletonList(result), start);
562+
}
563+
447564
public DataResult remove(Bson query, QueryOptions options) {
448565
return remove(null, query, options);
449566
}

commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBNativeQuery.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,99 @@ public BulkWriteResult update(ClientSession clientSession, List<? extends Bson>
453453
}
454454
}
455455

456+
/**
457+
* Update documents using an aggregation pipeline.
458+
*
459+
* @param query The filter to select the documents to update.
460+
* @param pipeline The aggregation pipeline specifying the update operations.
461+
* @param upsert Whether to insert a new document if no documents match the query.
462+
* @param multi Whether to update multiple documents or just one.
463+
* @return The result of the update operation.
464+
*/
465+
public UpdateResult updateWithPipeline(Bson query, List<? extends Bson> pipeline, boolean upsert, boolean multi) {
466+
return updateWithPipeline(null, query, pipeline, upsert, multi);
467+
}
468+
469+
/**
470+
* Update documents using an aggregation pipeline.
471+
*
472+
* @param clientSession Session in which the operation will be performed. Can be null.
473+
* @param query The filter to select the documents to update.
474+
* @param pipeline The aggregation pipeline specifying the update operations.
475+
* @param upsert Whether to insert a new document if no documents match the query.
476+
* @param multi Whether to update multiple documents or just one.
477+
* @return The result of the update operation.
478+
*/
479+
public UpdateResult updateWithPipeline(ClientSession clientSession, Bson query, List<? extends Bson> pipeline,
480+
boolean upsert, boolean multi) {
481+
UpdateOptions updateOptions = new UpdateOptions().upsert(upsert);
482+
if (multi) {
483+
if (clientSession != null) {
484+
return dbCollection.updateMany(clientSession, query, pipeline, updateOptions);
485+
} else {
486+
return dbCollection.updateMany(query, pipeline, updateOptions);
487+
}
488+
} else {
489+
if (clientSession != null) {
490+
return dbCollection.updateOne(clientSession, query, pipeline, updateOptions);
491+
} else {
492+
return dbCollection.updateOne(query, pipeline, updateOptions);
493+
}
494+
}
495+
}
496+
497+
/**
498+
* Finds and updates a single document using an aggregation pipeline.
499+
*
500+
* @param query The filter to select the document to update.
501+
* @param projection The fields to return in the resulting document.
502+
* @param sort The sort criteria to apply before updating.
503+
* @param pipeline The aggregation pipeline specifying the update operations.
504+
* @param options Additional options such as upsert and returnNew.
505+
* @return The updated document, or null if no document matched the query.
506+
*/
507+
public Document findAndUpdateWithPipeline(Bson query, Bson projection, Bson sort,
508+
List<? extends Bson> pipeline, QueryOptions options) {
509+
return findAndUpdateWithPipeline(null, query, projection, sort, pipeline, options);
510+
}
511+
512+
/**
513+
* Finds and updates a single document using an aggregation pipeline.
514+
*
515+
* @param clientSession Session in which the operation will be performed. Can be null.
516+
* @param query The filter to select the document to update.
517+
* @param projection The fields to return in the resulting document.
518+
* @param sort The sort criteria to apply before updating.
519+
* @param pipeline The aggregation pipeline specifying the update operations.
520+
* @param options Additional options such as upsert and returnNew.
521+
* @return The updated document, or null if no document matched the query.
522+
*/
523+
public Document findAndUpdateWithPipeline(ClientSession clientSession, Bson query, Bson projection, Bson sort,
524+
List<? extends Bson> pipeline, QueryOptions options) {
525+
boolean upsert = false;
526+
boolean returnNew = false;
527+
528+
if (options != null) {
529+
if (projection == null) {
530+
projection = getProjection(projection, options);
531+
}
532+
upsert = options.getBoolean("upsert", false);
533+
returnNew = options.getBoolean("returnNew", false);
534+
}
535+
536+
FindOneAndUpdateOptions findOneAndUpdateOptions = new FindOneAndUpdateOptions()
537+
.sort(sort)
538+
.projection(projection)
539+
.upsert(upsert)
540+
.returnDocument(returnNew ? ReturnDocument.AFTER : ReturnDocument.BEFORE);
541+
542+
if (clientSession != null) {
543+
return dbCollection.findOneAndUpdate(clientSession, query, pipeline, findOneAndUpdateOptions);
544+
} else {
545+
return dbCollection.findOneAndUpdate(query, pipeline, findOneAndUpdateOptions);
546+
}
547+
}
548+
456549
private IndexOutOfBoundsException wrongQueryUpdateSize(List<? extends Bson> queries, List<? extends Bson> updates) {
457550
return new IndexOutOfBoundsException("QueryList.size=" + queries.size()
458551
+ " and UpdatesList.size=" + updates.size() + " must be the same size.");

0 commit comments

Comments
 (0)