diff --git a/test/asynchronous/helpers.py b/test/asynchronous/helpers.py index 7758f281e1..a35c71b107 100644 --- a/test/asynchronous/helpers.py +++ b/test/asynchronous/helpers.py @@ -381,14 +381,14 @@ def disable(self): class ConcurrentRunner(PARENT): - def __init__(self, name, *args, **kwargs): + def __init__(self, **kwargs): if _IS_SYNC: - super().__init__(*args, **kwargs) - self.name = name + super().__init__(**kwargs) + self.name = kwargs.get("name", "ConcurrentRunner") self.stopped = False self.task = None - if "target" in kwargs: - self.target = kwargs["target"] + self.target = kwargs.get("target", None) + self.args = kwargs.get("args", []) if not _IS_SYNC: @@ -403,6 +403,7 @@ def is_alive(self): return not self.stopped async def run(self): - if self.target: - await self.target() - self.stopped = True + try: + await self.target(*self.args) + finally: + self.stopped = True diff --git a/test/asynchronous/test_examples.py b/test/asynchronous/test_examples.py new file mode 100644 index 0000000000..7fea9d41af --- /dev/null +++ b/test/asynchronous/test_examples.py @@ -0,0 +1,1461 @@ +# Copyright 2017 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""MongoDB documentation examples in Python.""" +from __future__ import annotations + +import asyncio +import datetime +import functools +import sys +import threading +import time +from test.asynchronous.helpers import ConcurrentRunner + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest +from test.utils import async_wait_until + +import pymongo +from pymongo.asynchronous.helpers import anext +from pymongo.errors import ConnectionFailure, OperationFailure +from pymongo.read_concern import ReadConcern +from pymongo.read_preferences import ReadPreference +from pymongo.server_api import ServerApi +from pymongo.write_concern import WriteConcern + +_IS_SYNC = False + + +class TestSampleShellCommands(AsyncIntegrationTest): + async def asyncSetUp(self): + await super().asyncSetUp() + await self.db.inventory.drop() + + async def asyncTearDown(self): + # Run after every test. + await self.db.inventory.drop() + await self.client.drop_database("pymongo_test") + + async def test_first_three_examples(self): + db = self.db + + # Start Example 1 + await db.inventory.insert_one( + { + "item": "canvas", + "qty": 100, + "tags": ["cotton"], + "size": {"h": 28, "w": 35.5, "uom": "cm"}, + } + ) + # End Example 1 + + self.assertEqual(await db.inventory.count_documents({}), 1) + + # Start Example 2 + cursor = db.inventory.find({"item": "canvas"}) + # End Example 2 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 3 + await db.inventory.insert_many( + [ + { + "item": "journal", + "qty": 25, + "tags": ["blank", "red"], + "size": {"h": 14, "w": 21, "uom": "cm"}, + }, + { + "item": "mat", + "qty": 85, + "tags": ["gray"], + "size": {"h": 27.9, "w": 35.5, "uom": "cm"}, + }, + { + "item": "mousepad", + "qty": 25, + "tags": ["gel", "blue"], + "size": {"h": 19, "w": 22.85, "uom": "cm"}, + }, + ] + ) + # End Example 3 + + self.assertEqual(await db.inventory.count_documents({}), 4) + + async def test_query_top_level_fields(self): + db = self.db + + # Start Example 6 + await db.inventory.insert_many( + [ + { + "item": "journal", + "qty": 25, + "size": {"h": 14, "w": 21, "uom": "cm"}, + "status": "A", + }, + { + "item": "notebook", + "qty": 50, + "size": {"h": 8.5, "w": 11, "uom": "in"}, + "status": "A", + }, + { + "item": "paper", + "qty": 100, + "size": {"h": 8.5, "w": 11, "uom": "in"}, + "status": "D", + }, + { + "item": "planner", + "qty": 75, + "size": {"h": 22.85, "w": 30, "uom": "cm"}, + "status": "D", + }, + { + "item": "postcard", + "qty": 45, + "size": {"h": 10, "w": 15.25, "uom": "cm"}, + "status": "A", + }, + ] + ) + # End Example 6 + + self.assertEqual(await db.inventory.count_documents({}), 5) + + # Start Example 7 + cursor = db.inventory.find({}) + # End Example 7 + + self.assertEqual(len(await cursor.to_list()), 5) + + # Start Example 9 + cursor = db.inventory.find({"status": "D"}) + # End Example 9 + + self.assertEqual(len(await cursor.to_list()), 2) + + # Start Example 10 + cursor = db.inventory.find({"status": {"$in": ["A", "D"]}}) + # End Example 10 + + self.assertEqual(len(await cursor.to_list()), 5) + + # Start Example 11 + cursor = db.inventory.find({"status": "A", "qty": {"$lt": 30}}) + # End Example 11 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 12 + cursor = db.inventory.find({"$or": [{"status": "A"}, {"qty": {"$lt": 30}}]}) + # End Example 12 + + self.assertEqual(len(await cursor.to_list()), 3) + + # Start Example 13 + cursor = db.inventory.find( + {"status": "A", "$or": [{"qty": {"$lt": 30}}, {"item": {"$regex": "^p"}}]} + ) + # End Example 13 + + self.assertEqual(len(await cursor.to_list()), 2) + + async def test_query_embedded_documents(self): + db = self.db + + # Start Example 14 + # Subdocument key order matters in a few of these examples so we have + # to use bson.son.SON instead of a Python dict. + from bson.son import SON + + await db.inventory.insert_many( + [ + { + "item": "journal", + "qty": 25, + "size": SON([("h", 14), ("w", 21), ("uom", "cm")]), + "status": "A", + }, + { + "item": "notebook", + "qty": 50, + "size": SON([("h", 8.5), ("w", 11), ("uom", "in")]), + "status": "A", + }, + { + "item": "paper", + "qty": 100, + "size": SON([("h", 8.5), ("w", 11), ("uom", "in")]), + "status": "D", + }, + { + "item": "planner", + "qty": 75, + "size": SON([("h", 22.85), ("w", 30), ("uom", "cm")]), + "status": "D", + }, + { + "item": "postcard", + "qty": 45, + "size": SON([("h", 10), ("w", 15.25), ("uom", "cm")]), + "status": "A", + }, + ] + ) + # End Example 14 + + # Start Example 15 + cursor = db.inventory.find({"size": SON([("h", 14), ("w", 21), ("uom", "cm")])}) + # End Example 15 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 16 + cursor = db.inventory.find({"size": SON([("w", 21), ("h", 14), ("uom", "cm")])}) + # End Example 16 + + self.assertEqual(len(await cursor.to_list()), 0) + + # Start Example 17 + cursor = db.inventory.find({"size.uom": "in"}) + # End Example 17 + + self.assertEqual(len(await cursor.to_list()), 2) + + # Start Example 18 + cursor = db.inventory.find({"size.h": {"$lt": 15}}) + # End Example 18 + + self.assertEqual(len(await cursor.to_list()), 4) + + # Start Example 19 + cursor = db.inventory.find({"size.h": {"$lt": 15}, "size.uom": "in", "status": "D"}) + # End Example 19 + + self.assertEqual(len(await cursor.to_list()), 1) + + async def test_query_arrays(self): + db = self.db + + # Start Example 20 + await db.inventory.insert_many( + [ + {"item": "journal", "qty": 25, "tags": ["blank", "red"], "dim_cm": [14, 21]}, + {"item": "notebook", "qty": 50, "tags": ["red", "blank"], "dim_cm": [14, 21]}, + { + "item": "paper", + "qty": 100, + "tags": ["red", "blank", "plain"], + "dim_cm": [14, 21], + }, + {"item": "planner", "qty": 75, "tags": ["blank", "red"], "dim_cm": [22.85, 30]}, + {"item": "postcard", "qty": 45, "tags": ["blue"], "dim_cm": [10, 15.25]}, + ] + ) + # End Example 20 + + # Start Example 21 + cursor = db.inventory.find({"tags": ["red", "blank"]}) + # End Example 21 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 22 + cursor = db.inventory.find({"tags": {"$all": ["red", "blank"]}}) + # End Example 22 + + self.assertEqual(len(await cursor.to_list()), 4) + + # Start Example 23 + cursor = db.inventory.find({"tags": "red"}) + # End Example 23 + + self.assertEqual(len(await cursor.to_list()), 4) + + # Start Example 24 + cursor = db.inventory.find({"dim_cm": {"$gt": 25}}) + # End Example 24 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 25 + cursor = db.inventory.find({"dim_cm": {"$gt": 15, "$lt": 20}}) + # End Example 25 + + self.assertEqual(len(await cursor.to_list()), 4) + + # Start Example 26 + cursor = db.inventory.find({"dim_cm": {"$elemMatch": {"$gt": 22, "$lt": 30}}}) + # End Example 26 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 27 + cursor = db.inventory.find({"dim_cm.1": {"$gt": 25}}) + # End Example 27 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 28 + cursor = db.inventory.find({"tags": {"$size": 3}}) + # End Example 28 + + self.assertEqual(len(await cursor.to_list()), 1) + + async def test_query_array_of_documents(self): + db = self.db + + # Start Example 29 + # Subdocument key order matters in a few of these examples so we have + # to use bson.son.SON instead of a Python dict. + from bson.son import SON + + await db.inventory.insert_many( + [ + { + "item": "journal", + "instock": [ + SON([("warehouse", "A"), ("qty", 5)]), + SON([("warehouse", "C"), ("qty", 15)]), + ], + }, + {"item": "notebook", "instock": [SON([("warehouse", "C"), ("qty", 5)])]}, + { + "item": "paper", + "instock": [ + SON([("warehouse", "A"), ("qty", 60)]), + SON([("warehouse", "B"), ("qty", 15)]), + ], + }, + { + "item": "planner", + "instock": [ + SON([("warehouse", "A"), ("qty", 40)]), + SON([("warehouse", "B"), ("qty", 5)]), + ], + }, + { + "item": "postcard", + "instock": [ + SON([("warehouse", "B"), ("qty", 15)]), + SON([("warehouse", "C"), ("qty", 35)]), + ], + }, + ] + ) + # End Example 29 + + # Start Example 30 + cursor = db.inventory.find({"instock": SON([("warehouse", "A"), ("qty", 5)])}) + # End Example 30 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 31 + cursor = db.inventory.find({"instock": SON([("qty", 5), ("warehouse", "A")])}) + # End Example 31 + + self.assertEqual(len(await cursor.to_list()), 0) + + # Start Example 32 + cursor = db.inventory.find({"instock.0.qty": {"$lte": 20}}) + # End Example 32 + + self.assertEqual(len(await cursor.to_list()), 3) + + # Start Example 33 + cursor = db.inventory.find({"instock.qty": {"$lte": 20}}) + # End Example 33 + + self.assertEqual(len(await cursor.to_list()), 5) + + # Start Example 34 + cursor = db.inventory.find({"instock": {"$elemMatch": {"qty": 5, "warehouse": "A"}}}) + # End Example 34 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 35 + cursor = db.inventory.find({"instock": {"$elemMatch": {"qty": {"$gt": 10, "$lte": 20}}}}) + # End Example 35 + + self.assertEqual(len(await cursor.to_list()), 3) + + # Start Example 36 + cursor = db.inventory.find({"instock.qty": {"$gt": 10, "$lte": 20}}) + # End Example 36 + + self.assertEqual(len(await cursor.to_list()), 4) + + # Start Example 37 + cursor = db.inventory.find({"instock.qty": 5, "instock.warehouse": "A"}) + # End Example 37 + + self.assertEqual(len(await cursor.to_list()), 2) + + async def test_query_null(self): + db = self.db + + # Start Example 38 + await db.inventory.insert_many([{"_id": 1, "item": None}, {"_id": 2}]) + # End Example 38 + + # Start Example 39 + cursor = db.inventory.find({"item": None}) + # End Example 39 + + self.assertEqual(len(await cursor.to_list()), 2) + + # Start Example 40 + cursor = db.inventory.find({"item": {"$type": 10}}) + # End Example 40 + + self.assertEqual(len(await cursor.to_list()), 1) + + # Start Example 41 + cursor = db.inventory.find({"item": {"$exists": False}}) + # End Example 41 + + self.assertEqual(len(await cursor.to_list()), 1) + + async def test_projection(self): + db = self.db + + # Start Example 42 + await db.inventory.insert_many( + [ + { + "item": "journal", + "status": "A", + "size": {"h": 14, "w": 21, "uom": "cm"}, + "instock": [{"warehouse": "A", "qty": 5}], + }, + { + "item": "notebook", + "status": "A", + "size": {"h": 8.5, "w": 11, "uom": "in"}, + "instock": [{"warehouse": "C", "qty": 5}], + }, + { + "item": "paper", + "status": "D", + "size": {"h": 8.5, "w": 11, "uom": "in"}, + "instock": [{"warehouse": "A", "qty": 60}], + }, + { + "item": "planner", + "status": "D", + "size": {"h": 22.85, "w": 30, "uom": "cm"}, + "instock": [{"warehouse": "A", "qty": 40}], + }, + { + "item": "postcard", + "status": "A", + "size": {"h": 10, "w": 15.25, "uom": "cm"}, + "instock": [{"warehouse": "B", "qty": 15}, {"warehouse": "C", "qty": 35}], + }, + ] + ) + # End Example 42 + + # Start Example 43 + cursor = db.inventory.find({"status": "A"}) + # End Example 43 + + self.assertEqual(len(await cursor.to_list()), 3) + + # Start Example 44 + cursor = db.inventory.find({"status": "A"}, {"item": 1, "status": 1}) + # End Example 44 + + async for doc in cursor: + self.assertTrue("_id" in doc) + self.assertTrue("item" in doc) + self.assertTrue("status" in doc) + self.assertFalse("size" in doc) + self.assertFalse("instock" in doc) + + # Start Example 45 + cursor = db.inventory.find({"status": "A"}, {"item": 1, "status": 1, "_id": 0}) + # End Example 45 + + async for doc in cursor: + self.assertFalse("_id" in doc) + self.assertTrue("item" in doc) + self.assertTrue("status" in doc) + self.assertFalse("size" in doc) + self.assertFalse("instock" in doc) + + # Start Example 46 + cursor = db.inventory.find({"status": "A"}, {"status": 0, "instock": 0}) + # End Example 46 + + async for doc in cursor: + self.assertTrue("_id" in doc) + self.assertTrue("item" in doc) + self.assertFalse("status" in doc) + self.assertTrue("size" in doc) + self.assertFalse("instock" in doc) + + # Start Example 47 + cursor = db.inventory.find({"status": "A"}, {"item": 1, "status": 1, "size.uom": 1}) + # End Example 47 + + async for doc in cursor: + self.assertTrue("_id" in doc) + self.assertTrue("item" in doc) + self.assertTrue("status" in doc) + self.assertTrue("size" in doc) + self.assertFalse("instock" in doc) + size = doc["size"] + self.assertTrue("uom" in size) + self.assertFalse("h" in size) + self.assertFalse("w" in size) + + # Start Example 48 + cursor = db.inventory.find({"status": "A"}, {"size.uom": 0}) + # End Example 48 + + async for doc in cursor: + self.assertTrue("_id" in doc) + self.assertTrue("item" in doc) + self.assertTrue("status" in doc) + self.assertTrue("size" in doc) + self.assertTrue("instock" in doc) + size = doc["size"] + self.assertFalse("uom" in size) + self.assertTrue("h" in size) + self.assertTrue("w" in size) + + # Start Example 49 + cursor = db.inventory.find({"status": "A"}, {"item": 1, "status": 1, "instock.qty": 1}) + # End Example 49 + + async for doc in cursor: + self.assertTrue("_id" in doc) + self.assertTrue("item" in doc) + self.assertTrue("status" in doc) + self.assertFalse("size" in doc) + self.assertTrue("instock" in doc) + for subdoc in doc["instock"]: + self.assertFalse("warehouse" in subdoc) + self.assertTrue("qty" in subdoc) + + # Start Example 50 + cursor = db.inventory.find( + {"status": "A"}, {"item": 1, "status": 1, "instock": {"$slice": -1}} + ) + # End Example 50 + + async for doc in cursor: + self.assertTrue("_id" in doc) + self.assertTrue("item" in doc) + self.assertTrue("status" in doc) + self.assertFalse("size" in doc) + self.assertTrue("instock" in doc) + self.assertEqual(len(doc["instock"]), 1) + + async def test_update_and_replace(self): + db = self.db + + # Start Example 51 + await db.inventory.insert_many( + [ + { + "item": "canvas", + "qty": 100, + "size": {"h": 28, "w": 35.5, "uom": "cm"}, + "status": "A", + }, + { + "item": "journal", + "qty": 25, + "size": {"h": 14, "w": 21, "uom": "cm"}, + "status": "A", + }, + { + "item": "mat", + "qty": 85, + "size": {"h": 27.9, "w": 35.5, "uom": "cm"}, + "status": "A", + }, + { + "item": "mousepad", + "qty": 25, + "size": {"h": 19, "w": 22.85, "uom": "cm"}, + "status": "P", + }, + { + "item": "notebook", + "qty": 50, + "size": {"h": 8.5, "w": 11, "uom": "in"}, + "status": "P", + }, + { + "item": "paper", + "qty": 100, + "size": {"h": 8.5, "w": 11, "uom": "in"}, + "status": "D", + }, + { + "item": "planner", + "qty": 75, + "size": {"h": 22.85, "w": 30, "uom": "cm"}, + "status": "D", + }, + { + "item": "postcard", + "qty": 45, + "size": {"h": 10, "w": 15.25, "uom": "cm"}, + "status": "A", + }, + { + "item": "sketchbook", + "qty": 80, + "size": {"h": 14, "w": 21, "uom": "cm"}, + "status": "A", + }, + { + "item": "sketch pad", + "qty": 95, + "size": {"h": 22.85, "w": 30.5, "uom": "cm"}, + "status": "A", + }, + ] + ) + # End Example 51 + + # Start Example 52 + await db.inventory.update_one( + {"item": "paper"}, + {"$set": {"size.uom": "cm", "status": "P"}, "$currentDate": {"lastModified": True}}, + ) + # End Example 52 + + async for doc in db.inventory.find({"item": "paper"}): + self.assertEqual(doc["size"]["uom"], "cm") + self.assertEqual(doc["status"], "P") + self.assertTrue("lastModified" in doc) + + # Start Example 53 + await db.inventory.update_many( + {"qty": {"$lt": 50}}, + {"$set": {"size.uom": "in", "status": "P"}, "$currentDate": {"lastModified": True}}, + ) + # End Example 53 + + async for doc in db.inventory.find({"qty": {"$lt": 50}}): + self.assertEqual(doc["size"]["uom"], "in") + self.assertEqual(doc["status"], "P") + self.assertTrue("lastModified" in doc) + + # Start Example 54 + await db.inventory.replace_one( + {"item": "paper"}, + { + "item": "paper", + "instock": [{"warehouse": "A", "qty": 60}, {"warehouse": "B", "qty": 40}], + }, + ) + # End Example 54 + + async for doc in db.inventory.find({"item": "paper"}, {"_id": 0}): + self.assertEqual(len(doc.keys()), 2) + self.assertTrue("item" in doc) + self.assertTrue("instock" in doc) + self.assertEqual(len(doc["instock"]), 2) + + async def test_delete(self): + db = self.db + + # Start Example 55 + await db.inventory.insert_many( + [ + { + "item": "journal", + "qty": 25, + "size": {"h": 14, "w": 21, "uom": "cm"}, + "status": "A", + }, + { + "item": "notebook", + "qty": 50, + "size": {"h": 8.5, "w": 11, "uom": "in"}, + "status": "P", + }, + { + "item": "paper", + "qty": 100, + "size": {"h": 8.5, "w": 11, "uom": "in"}, + "status": "D", + }, + { + "item": "planner", + "qty": 75, + "size": {"h": 22.85, "w": 30, "uom": "cm"}, + "status": "D", + }, + { + "item": "postcard", + "qty": 45, + "size": {"h": 10, "w": 15.25, "uom": "cm"}, + "status": "A", + }, + ] + ) + # End Example 55 + + self.assertEqual(await db.inventory.count_documents({}), 5) + + # Start Example 57 + await db.inventory.delete_many({"status": "A"}) + # End Example 57 + + self.assertEqual(await db.inventory.count_documents({}), 3) + + # Start Example 58 + await db.inventory.delete_one({"status": "D"}) + # End Example 58 + + self.assertEqual(await db.inventory.count_documents({}), 2) + + # Start Example 56 + await db.inventory.delete_many({}) + # End Example 56 + + self.assertEqual(await db.inventory.count_documents({}), 0) + + @async_client_context.require_change_streams + async def test_change_streams(self): + db = self.db + done = False + + async def insert_docs(): + nonlocal done + while not done: + await db.inventory.insert_one({"username": "alice"}) + await db.inventory.delete_one({"username": "alice"}) + await asyncio.sleep(0.005) + + t = ConcurrentRunner(target=insert_docs) + await t.start() + + try: + # 1. The database for reactive, real-time applications + # Start Changestream Example 1 + cursor = await db.inventory.watch() + await anext(cursor) + # End Changestream Example 1 + await cursor.close() + + # Start Changestream Example 2 + cursor = await db.inventory.watch(full_document="updateLookup") + await anext(cursor) + # End Changestream Example 2 + await cursor.close() + + # Start Changestream Example 3 + resume_token = cursor.resume_token + cursor = await db.inventory.watch(resume_after=resume_token) + await anext(cursor) + # End Changestream Example 3 + await cursor.close() + + # Start Changestream Example 4 + pipeline = [ + {"$match": {"fullDocument.username": "alice"}}, + {"$addFields": {"newField": "this is an added field!"}}, + ] + cursor = await db.inventory.watch(pipeline=pipeline) + await anext(cursor) + # End Changestream Example 4 + await cursor.close() + finally: + done = True + await t.join() + + async def test_aggregate_examples(self): + db = self.db + + # Start Aggregation Example 1 + await db.sales.aggregate([{"$match": {"items.fruit": "banana"}}, {"$sort": {"date": 1}}]) + # End Aggregation Example 1 + + # Start Aggregation Example 2 + await db.sales.aggregate( + [ + {"$unwind": "$items"}, + {"$match": {"items.fruit": "banana"}}, + { + "$group": { + "_id": {"day": {"$dayOfWeek": "$date"}}, + "count": {"$sum": "$items.quantity"}, + } + }, + {"$project": {"dayOfWeek": "$_id.day", "numberSold": "$count", "_id": 0}}, + {"$sort": {"numberSold": 1}}, + ] + ) + # End Aggregation Example 2 + + # Start Aggregation Example 3 + await db.sales.aggregate( + [ + {"$unwind": "$items"}, + { + "$group": { + "_id": {"day": {"$dayOfWeek": "$date"}}, + "items_sold": {"$sum": "$items.quantity"}, + "revenue": {"$sum": {"$multiply": ["$items.quantity", "$items.price"]}}, + } + }, + { + "$project": { + "day": "$_id.day", + "revenue": 1, + "items_sold": 1, + "discount": { + "$cond": {"if": {"$lte": ["$revenue", 250]}, "then": 25, "else": 0} + }, + } + }, + ] + ) + # End Aggregation Example 3 + + # Start Aggregation Example 4 + await db.air_alliances.aggregate( + [ + { + "$lookup": { + "from": "air_airlines", + "let": {"constituents": "$airlines"}, + "pipeline": [{"$match": {"$expr": {"$in": ["$name", "$$constituents"]}}}], + "as": "airlines", + } + }, + { + "$project": { + "_id": 0, + "name": 1, + "airlines": { + "$filter": { + "input": "$airlines", + "as": "airline", + "cond": {"$eq": ["$$airline.country", "Canada"]}, + } + }, + } + }, + ] + ) + # End Aggregation Example 4 + + @async_client_context.require_version_min(4, 4) + async def test_aggregate_projection_example(self): + db = self.db + + # Start Aggregation Projection Example 1 + db.inventory.find( + {}, + { + "_id": 0, + "item": 1, + "status": { + "$switch": { + "branches": [ + {"case": {"$eq": ["$status", "A"]}, "then": "Available"}, + {"case": {"$eq": ["$status", "D"]}, "then": "Discontinued"}, + ], + "default": "No status found", + } + }, + "area": { + "$concat": [ + {"$toString": {"$multiply": ["$size.h", "$size.w"]}}, + " ", + "$size.uom", + ] + }, + "reportNumber": {"$literal": 1}, + }, + ) + + # End Aggregation Projection Example 1 + + async def test_commands(self): + db = self.db + await db.restaurants.insert_one({}) + + # Start runCommand Example 1 + await db.command("buildInfo") + # End runCommand Example 1 + + # Start runCommand Example 2 + await db.command("count", "restaurants") + # End runCommand Example 2 + + async def test_index_management(self): + db = self.db + + # Start Index Example 1 + await db.records.create_index("score") + # End Index Example 1 + + # Start Index Example 1 + await db.restaurants.create_index( + [("cuisine", pymongo.ASCENDING), ("name", pymongo.ASCENDING)], + partialFilterExpression={"rating": {"$gt": 5}}, + ) + # End Index Example 1 + + @async_client_context.require_replica_set + async def test_misc(self): + # Marketing examples + client = self.client + self.addAsyncCleanup(client.drop_database, "test") + self.addAsyncCleanup(client.drop_database, "my_database") + + # 2. Tunable consistency controls + collection = client.my_database.my_collection + async with client.start_session() as session: + await collection.insert_one({"_id": 1}, session=session) + await collection.update_one({"_id": 1}, {"$set": {"a": 1}}, session=session) + async for _doc in collection.find({}, session=session): + pass + + # 3. Exploiting the power of arrays + collection = client.test.array_updates_test + await collection.update_one( + {"_id": 1}, {"$set": {"a.$[i].b": 2}}, array_filters=[{"i.b": 0}] + ) + + +class TestTransactionExamples(AsyncIntegrationTest): + @async_client_context.require_transactions + async def test_transactions(self): + # Transaction examples + client = self.client + self.addAsyncCleanup(client.drop_database, "hr") + self.addAsyncCleanup(client.drop_database, "reporting") + + employees = client.hr.employees + events = client.reporting.events + await employees.insert_one({"employee": 3, "status": "Active"}) + await events.insert_one({"employee": 3, "status": {"new": "Active", "old": None}}) + + # Start Transactions Intro Example 1 + + async def update_employee_info(session): + employees_coll = session.client.hr.employees + events_coll = session.client.reporting.events + + async with await session.start_transaction( + read_concern=ReadConcern("snapshot"), write_concern=WriteConcern(w="majority") + ): + await employees_coll.update_one( + {"employee": 3}, {"$set": {"status": "Inactive"}}, session=session + ) + await events_coll.insert_one( + {"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session + ) + + while True: + try: + # Commit uses write concern set at transaction start. + await session.commit_transaction() + print("Transaction committed.") + break + except (ConnectionFailure, OperationFailure) as exc: + # Can retry commit + if exc.has_error_label("UnknownTransactionCommitResult"): + print("UnknownTransactionCommitResult, retrying commit operation ...") + continue + else: + print("Error during commit ...") + raise + + # End Transactions Intro Example 1 + + async with client.start_session() as session: + await update_employee_info(session) + + employee = await employees.find_one({"employee": 3}) + assert employee is not None + self.assertIsNotNone(employee) + self.assertEqual(employee["status"], "Inactive") + + # Start Transactions Retry Example 1 + async def run_transaction_with_retry(txn_func, session): + while True: + try: + await txn_func(session) # performs transaction + break + except (ConnectionFailure, OperationFailure) as exc: + print("Transaction aborted. Caught exception during transaction.") + + # If transient error, retry the whole transaction + if exc.has_error_label("TransientTransactionError"): + print("TransientTransactionError, retrying transaction ...") + continue + else: + raise + + # End Transactions Retry Example 1 + + async with client.start_session() as session: + await run_transaction_with_retry(update_employee_info, session) + + employee = await employees.find_one({"employee": 3}) + assert employee is not None + self.assertIsNotNone(employee) + self.assertEqual(employee["status"], "Inactive") + + # Start Transactions Retry Example 2 + async def commit_with_retry(session): + while True: + try: + # Commit uses write concern set at transaction start. + await session.commit_transaction() + print("Transaction committed.") + break + except (ConnectionFailure, OperationFailure) as exc: + # Can retry commit + if exc.has_error_label("UnknownTransactionCommitResult"): + print("UnknownTransactionCommitResult, retrying commit operation ...") + continue + else: + print("Error during commit ...") + raise + + # End Transactions Retry Example 2 + + # Test commit_with_retry from the previous examples + async def _insert_employee_retry_commit(session): + async with await session.start_transaction(): + await employees.insert_one({"employee": 4, "status": "Active"}, session=session) + await events.insert_one( + {"employee": 4, "status": {"new": "Active", "old": None}}, session=session + ) + + await commit_with_retry(session) + + async with client.start_session() as session: + await run_transaction_with_retry(_insert_employee_retry_commit, session) + + employee = await employees.find_one({"employee": 4}) + assert employee is not None + self.assertIsNotNone(employee) + self.assertEqual(employee["status"], "Active") + + # Start Transactions Retry Example 3 + + async def run_transaction_with_retry(txn_func, session): + while True: + try: + await txn_func(session) # performs transaction + break + except (ConnectionFailure, OperationFailure) as exc: + # If transient error, retry the whole transaction + if exc.has_error_label("TransientTransactionError"): + print("TransientTransactionError, retrying transaction ...") + continue + else: + raise + + async def commit_with_retry(session): + while True: + try: + # Commit uses write concern set at transaction start. + await session.commit_transaction() + print("Transaction committed.") + break + except (ConnectionFailure, OperationFailure) as exc: + # Can retry commit + if exc.has_error_label("UnknownTransactionCommitResult"): + print("UnknownTransactionCommitResult, retrying commit operation ...") + continue + else: + print("Error during commit ...") + raise + + # Updates two collections in a transactions + + async def update_employee_info(session): + employees_coll = session.client.hr.employees + events_coll = session.client.reporting.events + + async with await session.start_transaction( + read_concern=ReadConcern("snapshot"), + write_concern=WriteConcern(w="majority"), + read_preference=ReadPreference.PRIMARY, + ): + await employees_coll.update_one( + {"employee": 3}, {"$set": {"status": "Inactive"}}, session=session + ) + await events_coll.insert_one( + {"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session + ) + + await commit_with_retry(session) + + # Start a session. + async with client.start_session() as session: + try: + await run_transaction_with_retry(update_employee_info, session) + except Exception: + # Do something with error. + raise + + # End Transactions Retry Example 3 + + employee = await employees.find_one({"employee": 3}) + assert employee is not None + self.assertIsNotNone(employee) + self.assertEqual(employee["status"], "Inactive") + + async def MongoClient(_): + return await self.async_rs_client() + + uriString = None + + # Start Transactions withTxn API Example 1 + + # For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. + # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' + # For a sharded cluster, connect to the mongos instances; e.g. + # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' + + client = await MongoClient(uriString) + wc_majority = WriteConcern("majority", wtimeout=1000) + + # Prereq: Create collections. + await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) + await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) + + # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. + async def callback(session): + collection_one = session.client.mydb1.foo + collection_two = session.client.mydb2.bar + + # Important:: You must pass the session to the operations. + await collection_one.insert_one({"abc": 1}, session=session) + await collection_two.insert_one({"xyz": 999}, session=session) + + # Step 2: Start a client session. + async with client.start_session() as session: + # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). + await session.with_transaction( + callback, + read_concern=ReadConcern("local"), + write_concern=wc_majority, + read_preference=ReadPreference.PRIMARY, + ) + + # End Transactions withTxn API Example 1 + + +class TestCausalConsistencyExamples(AsyncIntegrationTest): + @async_client_context.require_secondaries_count(1) + @async_client_context.require_no_mmap + async def test_causal_consistency(self): + # Causal consistency examples + client = self.client + self.addAsyncCleanup(client.drop_database, "test") + await client.test.drop_collection("items") + await client.test.items.insert_one( + {"sku": "111", "name": "Peanuts", "start": datetime.datetime.today()} + ) + + # Start Causal Consistency Example 1 + async with client.start_session(causal_consistency=True) as s1: + current_date = datetime.datetime.today() + items = client.get_database( + "test", + read_concern=ReadConcern("majority"), + write_concern=WriteConcern("majority", wtimeout=1000), + ).items + await items.update_one( + {"sku": "111", "end": None}, {"$set": {"end": current_date}}, session=s1 + ) + await items.insert_one( + {"sku": "nuts-111", "name": "Pecans", "start": current_date}, session=s1 + ) + # End Causal Consistency Example 1 + + assert s1.cluster_time is not None + assert s1.operation_time is not None + + # Start Causal Consistency Example 2 + async with client.start_session(causal_consistency=True) as s2: + s2.advance_cluster_time(s1.cluster_time) + s2.advance_operation_time(s1.operation_time) + + items = client.get_database( + "test", + read_preference=ReadPreference.SECONDARY, + read_concern=ReadConcern("majority"), + write_concern=WriteConcern("majority", wtimeout=1000), + ).items + async for item in items.find({"end": None}, session=s2): + print(item) + # End Causal Consistency Example 2 + + +class TestVersionedApiExamples(AsyncIntegrationTest): + @async_client_context.require_version_min(4, 7) + async def test_versioned_api(self): + # Versioned API examples + async def MongoClient(_, server_api): + return await self.async_rs_client(server_api=server_api, connect=False) + + uri = None + + # Start Versioned API Example 1 + from pymongo.server_api import ServerApi + + await MongoClient(uri, server_api=ServerApi("1")) + # End Versioned API Example 1 + + # Start Versioned API Example 2 + await MongoClient(uri, server_api=ServerApi("1", strict=True)) + # End Versioned API Example 2 + + # Start Versioned API Example 3 + await MongoClient(uri, server_api=ServerApi("1", strict=False)) + # End Versioned API Example 3 + + # Start Versioned API Example 4 + await MongoClient(uri, server_api=ServerApi("1", deprecation_errors=True)) + # End Versioned API Example 4 + + @unittest.skip("PYTHON-3167 count has been added to API version 1") + @async_client_context.require_version_min(4, 7) + async def test_versioned_api_migration(self): + # SERVER-58785 + if await async_client_context.is_topology_type( + ["sharded"] + ) and not async_client_context.version.at_least(5, 0, 2): + self.skipTest("This test needs MongoDB 5.0.2 or newer") + + client = await self.async_rs_client(server_api=ServerApi("1", strict=True)) + await client.db.sales.drop() + + # Start Versioned API Example 5 + def strptime(s): + return datetime.datetime.strptime(s, "%Y-%m-%dT%H:%M:%SZ") + + await client.db.sales.insert_many( + [ + { + "_id": 1, + "item": "abc", + "price": 10, + "quantity": 2, + "date": strptime("2021-01-01T08:00:00Z"), + }, + { + "_id": 2, + "item": "jkl", + "price": 20, + "quantity": 1, + "date": strptime("2021-02-03T09:00:00Z"), + }, + { + "_id": 3, + "item": "xyz", + "price": 5, + "quantity": 5, + "date": strptime("2021-02-03T09:05:00Z"), + }, + { + "_id": 4, + "item": "abc", + "price": 10, + "quantity": 10, + "date": strptime("2021-02-15T08:00:00Z"), + }, + { + "_id": 5, + "item": "xyz", + "price": 5, + "quantity": 10, + "date": strptime("2021-02-15T09:05:00Z"), + }, + { + "_id": 6, + "item": "xyz", + "price": 5, + "quantity": 5, + "date": strptime("2021-02-15T12:05:10Z"), + }, + { + "_id": 7, + "item": "xyz", + "price": 5, + "quantity": 10, + "date": strptime("2021-02-15T14:12:12Z"), + }, + { + "_id": 8, + "item": "abc", + "price": 10, + "quantity": 5, + "date": strptime("2021-03-16T20:20:13Z"), + }, + ] + ) + # End Versioned API Example 5 + + with self.assertRaisesRegex( + OperationFailure, + "Provided apiStrict:true, but the command count is not in API Version 1", + ): + await client.db.command("count", "sales", query={}) + # Start Versioned API Example 6 + # pymongo.errors.OperationFailure: Provided apiStrict:true, but the command count is not in API Version 1, full error: {'ok': 0.0, 'errmsg': 'Provided apiStrict:true, but the command count is not in API Version 1', 'code': 323, 'codeName': 'APIStrictError'} + # End Versioned API Example 6 + + # Start Versioned API Example 7 + await client.db.sales.count_documents({}) + # End Versioned API Example 7 + + # Start Versioned API Example 8 + # 8 + # End Versioned API Example 8 + + +class TestSnapshotQueryExamples(AsyncIntegrationTest): + @async_client_context.require_version_min(5, 0) + async def test_snapshot_query(self): + client = self.client + + if not await async_client_context.is_topology_type(["replicaset", "sharded"]): + self.skipTest("Must be a sharded or replicaset") + + self.addAsyncCleanup(client.drop_database, "pets") + db = client.pets + await db.drop_collection("cats") + await db.drop_collection("dogs") + await db.cats.insert_one( + {"name": "Whiskers", "color": "white", "age": 10, "adoptable": True} + ) + await db.dogs.insert_one( + {"name": "Pebbles", "color": "Brown", "age": 10, "adoptable": True} + ) + + async def predicate_one(): + return await self.check_for_snapshot(db.cats) + + async def predicate_two(): + return await self.check_for_snapshot(db.dogs) + + await async_wait_until(predicate_two, "success") + await async_wait_until(predicate_one, "success") + + # Start Snapshot Query Example 1 + + db = client.pets + async with client.start_session(snapshot=True) as s: + adoptablePetsCount = ( + await ( + await db.cats.aggregate( + [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], + session=s, + ) + ).next() + )["adoptableCatsCount"] + + adoptablePetsCount += ( + await ( + await db.dogs.aggregate( + [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], + session=s, + ) + ).next() + )["adoptableDogsCount"] + + print(adoptablePetsCount) + + # End Snapshot Query Example 1 + db = client.retail + self.addAsyncCleanup(client.drop_database, "retail") + await db.drop_collection("sales") + + saleDate = datetime.datetime.now() + await db.sales.insert_one({"shoeType": "boot", "price": 30, "saleDate": saleDate}) + + async def predicate_three(): + return await self.check_for_snapshot(db.sales) + + await async_wait_until(predicate_three, "success") + + # Start Snapshot Query Example 2 + db = client.retail + async with client.start_session(snapshot=True) as s: + _ = ( + await ( + await db.sales.aggregate( + [ + { + "$match": { + "$expr": { + "$gt": [ + "$saleDate", + { + "$dateSubtract": { + "startDate": "$$NOW", + "unit": "day", + "amount": 1, + } + }, + ] + } + } + }, + {"$count": "totalDailySales"}, + ], + session=s, + ) + ).next() + )["totalDailySales"] + + # End Snapshot Query Example 2 + + async def check_for_snapshot(self, collection): + """Wait for snapshot reads to become available to prevent this error: + [246:SnapshotUnavailable]: Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1646666892, 4). Collection minimum is Timestamp(1646666892, 5) (on localhost:27017, modern retry, attempt 1) + From https://github.com/mongodb/mongo-ruby-driver/commit/7c4117b58e3d12e237f7536f7521e18fc15f79ac + """ + async with self.client.start_session(snapshot=True) as s: + try: + if await collection.find_one(session=s): + return True + return False + except OperationFailure as e: + # Retry them as the server demands... + if e.code == 246: # SnapshotUnavailable + return False + raise + + +if __name__ == "__main__": + unittest.main() diff --git a/test/asynchronous/utils_spec_runner.py b/test/asynchronous/utils_spec_runner.py index d103374313..d433f1a7e6 100644 --- a/test/asynchronous/utils_spec_runner.py +++ b/test/asynchronous/utils_spec_runner.py @@ -58,7 +58,7 @@ class SpecRunnerTask(ConcurrentRunner): def __init__(self, name): - super().__init__(name) + super().__init__(name=name) self.exc = None self.daemon = True self.cond = _async_create_condition(_async_create_lock()) diff --git a/test/helpers.py b/test/helpers.py index bd9e23bba4..705843efcd 100644 --- a/test/helpers.py +++ b/test/helpers.py @@ -381,14 +381,14 @@ def disable(self): class ConcurrentRunner(PARENT): - def __init__(self, name, *args, **kwargs): + def __init__(self, **kwargs): if _IS_SYNC: - super().__init__(*args, **kwargs) - self.name = name + super().__init__(**kwargs) + self.name = kwargs.get("name", "ConcurrentRunner") self.stopped = False self.task = None - if "target" in kwargs: - self.target = kwargs["target"] + self.target = kwargs.get("target", None) + self.args = kwargs.get("args", []) if not _IS_SYNC: @@ -403,6 +403,7 @@ def is_alive(self): return not self.stopped def run(self): - if self.target: - self.target() - self.stopped = True + try: + self.target(*self.args) + finally: + self.stopped = True diff --git a/test/test_examples.py b/test/test_examples.py index 7f98226e7a..9bcc276248 100644 --- a/test/test_examples.py +++ b/test/test_examples.py @@ -15,9 +15,13 @@ """MongoDB documentation examples in Python.""" from __future__ import annotations +import asyncio import datetime +import functools import sys import threading +import time +from test.helpers import ConcurrentRunner sys.path[0:0] = [""] @@ -29,8 +33,11 @@ from pymongo.read_concern import ReadConcern from pymongo.read_preferences import ReadPreference from pymongo.server_api import ServerApi +from pymongo.synchronous.helpers import next from pymongo.write_concern import WriteConcern +_IS_SYNC = True + class TestSampleShellCommands(IntegrationTest): def setUp(self): @@ -62,7 +69,7 @@ def test_first_three_examples(self): cursor = db.inventory.find({"item": "canvas"}) # End Example 2 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 3 db.inventory.insert_many( @@ -137,31 +144,31 @@ def test_query_top_level_fields(self): cursor = db.inventory.find({}) # End Example 7 - self.assertEqual(len(list(cursor)), 5) + self.assertEqual(len(cursor.to_list()), 5) # Start Example 9 cursor = db.inventory.find({"status": "D"}) # End Example 9 - self.assertEqual(len(list(cursor)), 2) + self.assertEqual(len(cursor.to_list()), 2) # Start Example 10 cursor = db.inventory.find({"status": {"$in": ["A", "D"]}}) # End Example 10 - self.assertEqual(len(list(cursor)), 5) + self.assertEqual(len(cursor.to_list()), 5) # Start Example 11 cursor = db.inventory.find({"status": "A", "qty": {"$lt": 30}}) # End Example 11 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 12 cursor = db.inventory.find({"$or": [{"status": "A"}, {"qty": {"$lt": 30}}]}) # End Example 12 - self.assertEqual(len(list(cursor)), 3) + self.assertEqual(len(cursor.to_list()), 3) # Start Example 13 cursor = db.inventory.find( @@ -169,7 +176,7 @@ def test_query_top_level_fields(self): ) # End Example 13 - self.assertEqual(len(list(cursor)), 2) + self.assertEqual(len(cursor.to_list()), 2) def test_query_embedded_documents(self): db = self.db @@ -219,31 +226,31 @@ def test_query_embedded_documents(self): cursor = db.inventory.find({"size": SON([("h", 14), ("w", 21), ("uom", "cm")])}) # End Example 15 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 16 cursor = db.inventory.find({"size": SON([("w", 21), ("h", 14), ("uom", "cm")])}) # End Example 16 - self.assertEqual(len(list(cursor)), 0) + self.assertEqual(len(cursor.to_list()), 0) # Start Example 17 cursor = db.inventory.find({"size.uom": "in"}) # End Example 17 - self.assertEqual(len(list(cursor)), 2) + self.assertEqual(len(cursor.to_list()), 2) # Start Example 18 cursor = db.inventory.find({"size.h": {"$lt": 15}}) # End Example 18 - self.assertEqual(len(list(cursor)), 4) + self.assertEqual(len(cursor.to_list()), 4) # Start Example 19 cursor = db.inventory.find({"size.h": {"$lt": 15}, "size.uom": "in", "status": "D"}) # End Example 19 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) def test_query_arrays(self): db = self.db @@ -269,49 +276,49 @@ def test_query_arrays(self): cursor = db.inventory.find({"tags": ["red", "blank"]}) # End Example 21 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 22 cursor = db.inventory.find({"tags": {"$all": ["red", "blank"]}}) # End Example 22 - self.assertEqual(len(list(cursor)), 4) + self.assertEqual(len(cursor.to_list()), 4) # Start Example 23 cursor = db.inventory.find({"tags": "red"}) # End Example 23 - self.assertEqual(len(list(cursor)), 4) + self.assertEqual(len(cursor.to_list()), 4) # Start Example 24 cursor = db.inventory.find({"dim_cm": {"$gt": 25}}) # End Example 24 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 25 cursor = db.inventory.find({"dim_cm": {"$gt": 15, "$lt": 20}}) # End Example 25 - self.assertEqual(len(list(cursor)), 4) + self.assertEqual(len(cursor.to_list()), 4) # Start Example 26 cursor = db.inventory.find({"dim_cm": {"$elemMatch": {"$gt": 22, "$lt": 30}}}) # End Example 26 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 27 cursor = db.inventory.find({"dim_cm.1": {"$gt": 25}}) # End Example 27 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 28 cursor = db.inventory.find({"tags": {"$size": 3}}) # End Example 28 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) def test_query_array_of_documents(self): db = self.db @@ -360,49 +367,49 @@ def test_query_array_of_documents(self): cursor = db.inventory.find({"instock": SON([("warehouse", "A"), ("qty", 5)])}) # End Example 30 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 31 cursor = db.inventory.find({"instock": SON([("qty", 5), ("warehouse", "A")])}) # End Example 31 - self.assertEqual(len(list(cursor)), 0) + self.assertEqual(len(cursor.to_list()), 0) # Start Example 32 cursor = db.inventory.find({"instock.0.qty": {"$lte": 20}}) # End Example 32 - self.assertEqual(len(list(cursor)), 3) + self.assertEqual(len(cursor.to_list()), 3) # Start Example 33 cursor = db.inventory.find({"instock.qty": {"$lte": 20}}) # End Example 33 - self.assertEqual(len(list(cursor)), 5) + self.assertEqual(len(cursor.to_list()), 5) # Start Example 34 cursor = db.inventory.find({"instock": {"$elemMatch": {"qty": 5, "warehouse": "A"}}}) # End Example 34 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 35 cursor = db.inventory.find({"instock": {"$elemMatch": {"qty": {"$gt": 10, "$lte": 20}}}}) # End Example 35 - self.assertEqual(len(list(cursor)), 3) + self.assertEqual(len(cursor.to_list()), 3) # Start Example 36 cursor = db.inventory.find({"instock.qty": {"$gt": 10, "$lte": 20}}) # End Example 36 - self.assertEqual(len(list(cursor)), 4) + self.assertEqual(len(cursor.to_list()), 4) # Start Example 37 cursor = db.inventory.find({"instock.qty": 5, "instock.warehouse": "A"}) # End Example 37 - self.assertEqual(len(list(cursor)), 2) + self.assertEqual(len(cursor.to_list()), 2) def test_query_null(self): db = self.db @@ -415,19 +422,19 @@ def test_query_null(self): cursor = db.inventory.find({"item": None}) # End Example 39 - self.assertEqual(len(list(cursor)), 2) + self.assertEqual(len(cursor.to_list()), 2) # Start Example 40 cursor = db.inventory.find({"item": {"$type": 10}}) # End Example 40 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) # Start Example 41 cursor = db.inventory.find({"item": {"$exists": False}}) # End Example 41 - self.assertEqual(len(list(cursor)), 1) + self.assertEqual(len(cursor.to_list()), 1) def test_projection(self): db = self.db @@ -473,7 +480,7 @@ def test_projection(self): cursor = db.inventory.find({"status": "A"}) # End Example 43 - self.assertEqual(len(list(cursor)), 3) + self.assertEqual(len(cursor.to_list()), 3) # Start Example 44 cursor = db.inventory.find({"status": "A"}, {"item": 1, "status": 1}) @@ -746,8 +753,9 @@ def insert_docs(): while not done: db.inventory.insert_one({"username": "alice"}) db.inventory.delete_one({"username": "alice"}) + time.sleep(0.005) - t = threading.Thread(target=insert_docs) + t = ConcurrentRunner(target=insert_docs) t.start() try: @@ -1347,20 +1355,37 @@ def test_snapshot_query(self): db.drop_collection("dogs") db.cats.insert_one({"name": "Whiskers", "color": "white", "age": 10, "adoptable": True}) db.dogs.insert_one({"name": "Pebbles", "color": "Brown", "age": 10, "adoptable": True}) - wait_until(lambda: self.check_for_snapshot(db.cats), "success") - wait_until(lambda: self.check_for_snapshot(db.dogs), "success") + + def predicate_one(): + return self.check_for_snapshot(db.cats) + + def predicate_two(): + return self.check_for_snapshot(db.dogs) + + wait_until(predicate_two, "success") + wait_until(predicate_one, "success") # Start Snapshot Query Example 1 db = client.pets with client.start_session(snapshot=True) as s: - adoptablePetsCount = db.cats.aggregate( - [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s - ).next()["adoptableCatsCount"] - - adoptablePetsCount += db.dogs.aggregate( - [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s - ).next()["adoptableDogsCount"] + adoptablePetsCount = ( + ( + db.cats.aggregate( + [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], + session=s, + ) + ).next() + )["adoptableCatsCount"] + + adoptablePetsCount += ( + ( + db.dogs.aggregate( + [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], + session=s, + ) + ).next() + )["adoptableDogsCount"] print(adoptablePetsCount) @@ -1371,33 +1396,41 @@ def test_snapshot_query(self): saleDate = datetime.datetime.now() db.sales.insert_one({"shoeType": "boot", "price": 30, "saleDate": saleDate}) - wait_until(lambda: self.check_for_snapshot(db.sales), "success") + + def predicate_three(): + return self.check_for_snapshot(db.sales) + + wait_until(predicate_three, "success") # Start Snapshot Query Example 2 db = client.retail with client.start_session(snapshot=True) as s: - db.sales.aggregate( - [ - { - "$match": { - "$expr": { - "$gt": [ - "$saleDate", - { - "$dateSubtract": { - "startDate": "$$NOW", - "unit": "day", - "amount": 1, - } - }, - ] - } - } - }, - {"$count": "totalDailySales"}, - ], - session=s, - ).next()["totalDailySales"] + _ = ( + ( + db.sales.aggregate( + [ + { + "$match": { + "$expr": { + "$gt": [ + "$saleDate", + { + "$dateSubtract": { + "startDate": "$$NOW", + "unit": "day", + "amount": 1, + } + }, + ] + } + } + }, + {"$count": "totalDailySales"}, + ], + session=s, + ) + ).next() + )["totalDailySales"] # End Snapshot Query Example 2 diff --git a/test/utils_spec_runner.py b/test/utils_spec_runner.py index 6a62112afb..98949431d0 100644 --- a/test/utils_spec_runner.py +++ b/test/utils_spec_runner.py @@ -58,7 +58,7 @@ class SpecRunnerThread(ConcurrentRunner): def __init__(self, name): - super().__init__(name) + super().__init__(name=name) self.exc = None self.daemon = True self.cond = _create_condition(_create_lock()) diff --git a/tools/synchro.py b/tools/synchro.py index 9f59448bb7..8f28791367 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -210,6 +210,7 @@ def async_only_test(f: str) -> bool: "test_data_lake.py", "test_dns.py", "test_encryption.py", + "test_examples.py", "test_heartbeat_monitoring.py", "test_index_management.py", "test_grid_file.py",