Skip to content

Commit 26d9edc

Browse files
committed
enh(MongoDB): Improve stability of the OpMsgCursor.
1 parent 041e7fe commit 26d9edc

File tree

5 files changed

+59
-30
lines changed

5 files changed

+59
-30
lines changed

MongoDB/include/Poco/MongoDB/OpMsgCursor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace MongoDB {
2828

2929
class MongoDB_API OpMsgCursor: public Document
3030
/// OpMsgCursor is an helper class for querying multiple documents using OpMsgMessage.
31+
/// Once all of the data is read with the cursor (see isActive()) it can't be reused.
3132
{
3233
public:
3334
OpMsgCursor(const std::string& dbname, const std::string& collectionName);
@@ -49,6 +50,9 @@ class MongoDB_API OpMsgCursor: public Document
4950

5051
Int64 cursorID() const;
5152

53+
bool isActive() const;
54+
/// Is there more data to acquire with this cursor?
55+
5256
OpMsgMessage& next(Connection& connection);
5357
/// Tries to get the next documents. As long as response message has a
5458
/// cursor ID next can be called to retrieve the next bunch of documents.

MongoDB/include/Poco/MongoDB/OpMsgMessage.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ class MongoDB_API OpMsgMessage: public Message
4242
static const std::string CMD_UPDATE;
4343
static const std::string CMD_FIND;
4444
static const std::string CMD_FIND_AND_MODIFY;
45-
static const std::string CMD_GET_MORE;
4645

4746
// Aggregation
4847
static const std::string CMD_AGGREGATE;
@@ -98,9 +97,6 @@ class MongoDB_API OpMsgMessage: public Message
9897
void setCommandName(const std::string& command);
9998
/// Sets the command name and clears the command document
10099

101-
void setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize = -1);
102-
/// Sets the command "getMore" for the cursor id with batch size (if it is not negative).
103-
104100
const std::string& commandName() const;
105101
/// Current command name.
106102

@@ -139,6 +135,14 @@ class MongoDB_API OpMsgMessage: public Message
139135

140136
private:
141137

138+
// Only used by the cursor
139+
static const std::string CMD_GET_MORE;
140+
141+
friend class OpMsgCursor;
142+
143+
void setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize = -1);
144+
/// Sets the command "getMore" for the cursor id with batch size (if it is not negative).
145+
142146
std::string _databaseName;
143147
std::string _collectionName;
144148
UInt32 _flags { MSG_FLAGS_DEFAULT };

MongoDB/src/OpMsgCursor.cpp

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ namespace Poco {
4343
namespace MongoDB {
4444

4545

46-
static const std::string keyCursor {"cursor"};
47-
static const std::string keyFirstBatch {"firstBatch"};
48-
static const std::string keyNextBatch {"nextBatch"};
46+
static const std::string keyCursor {"cursor"s};
47+
static const std::string keyCursors {"cursors"s};
48+
static const std::string keyBatchSize {"batchSize"s};
49+
static const std::string keyId {"id"s};
50+
static const std::string keyCursorsKilled {"cursorsKilled"s};
4951

5052
static Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc);
5153

@@ -95,23 +97,43 @@ Int32 OpMsgCursor::batchSize() const
9597
}
9698

9799

100+
bool OpMsgCursor::isActive() const
101+
{
102+
const auto& cmd {_query.commandName()};
103+
return ( _cursorID > 0 || (!cmd.empty() && cmd != OpMsgMessage::CMD_GET_MORE) );
104+
}
105+
106+
98107
OpMsgMessage& OpMsgCursor::next(Connection& connection)
99108
{
100109
if (_cursorID == 0)
101110
{
102111
_response.clear();
103112

113+
if (!isActive())
114+
{
115+
// Cursor reached the end of data. Nothing to provide.
116+
return _response;
117+
}
118+
104119
if (_emptyFirstBatch || _batchSize > 0)
105120
{
106121
Int32 bsize = _emptyFirstBatch ? 0 : _batchSize;
122+
auto& body { _query.body() };
107123
if (_query.commandName() == OpMsgMessage::CMD_FIND)
108124
{
109-
_query.body().add("batchSize", bsize);
125+
// Prevent duplicated fields if next() fails due to communication
126+
// issues and is the used again.
127+
body.remove(keyBatchSize);
128+
body.add(keyBatchSize, bsize);
110129
}
111130
else if (_query.commandName() == OpMsgMessage::CMD_AGGREGATE)
112131
{
113-
auto& cursorDoc = _query.body().addNewDocument("cursor");
114-
cursorDoc.add("batchSize", bsize);
132+
// Prevent duplicated fields if next() fails due to communication
133+
// issues and is the used again.
134+
body.remove(keyCursor);
135+
auto& cursorDoc = body.addNewDocument(keyCursor);
136+
cursorDoc.add(keyBatchSize, bsize);
115137
}
116138
}
117139

@@ -155,11 +177,11 @@ void OpMsgCursor::kill(Connection& connection)
155177

156178
MongoDB::Array::Ptr cursors = new MongoDB::Array();
157179
cursors->add<Poco::Int64>(_cursorID);
158-
_query.body().add("cursors", cursors);
180+
_query.body().add(keyCursors, cursors);
159181

160182
connection.sendRequest(_query, _response);
161183

162-
const auto killed = _response.body().get<MongoDB::Array::Ptr>("cursorsKilled", nullptr);
184+
const auto killed = _response.body().get<MongoDB::Array::Ptr>(keyCursorsKilled, nullptr);
163185
if (!killed || killed->size() != 1 || killed->get<Poco::Int64>(0, -1) != _cursorID)
164186
{
165187
throw Poco::ProtocolException("Cursor not killed as expected: " + std::to_string(_cursorID));
@@ -178,7 +200,7 @@ Poco::Int64 cursorIdFromResponse(const MongoDB::Document& doc)
178200
auto cursorDoc = doc.get<Document::Ptr>(keyCursor, nullptr);
179201
if(cursorDoc)
180202
{
181-
id = cursorDoc->get<Poco::Int64>("id", 0);
203+
id = cursorDoc->get<Poco::Int64>(keyId, 0);
182204
}
183205
return id;
184206
}

MongoDB/src/OpMsgMessage.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,13 @@ static const std::string& commandIdentifier(const std::string& command);
6060
/// Commands have different names for the payload that is sent in a separate section
6161

6262

63+
static const std::string keyDb { "$db"s };
64+
static const std::string keyCollection { "collection"s };
6365
static const std::string keyCursor { "cursor"s };
66+
static const std::string keyOk { "ok"s };
6467
static const std::string keyFirstBatch { "firstBatch"s };
6568
static const std::string keyNextBatch { "nextBatch"s };
69+
static const std::string keyBatchSize { "batchSize"s };
6670

6771
constexpr static Poco::UInt8 PAYLOAD_TYPE_0 { 0 };
6872
constexpr static Poco::UInt8 PAYLOAD_TYPE_1 { 1 };
@@ -114,7 +118,7 @@ void OpMsgMessage::setCommandName(const std::string& command)
114118
{
115119
_body.add(_commandName, _collectionName);
116120
}
117-
_body.add("$db"s, _databaseName);
121+
_body.add(keyDb, _databaseName);
118122
}
119123

120124

@@ -125,11 +129,11 @@ void OpMsgMessage::setCursor(Poco::Int64 cursorID, Poco::Int32 batchSize)
125129

126130
// IMPORTANT: Command name must be first
127131
_body.add(_commandName, cursorID);
128-
_body.add("$db"s, _databaseName);
129-
_body.add("collection"s, _collectionName);
132+
_body.add(keyDb, _databaseName);
133+
_body.add(keyCollection, _collectionName);
130134
if (batchSize > 0)
131135
{
132-
_body.add("batchSize"s, batchSize);
136+
_body.add(keyBatchSize, batchSize);
133137
}
134138
}
135139

@@ -207,9 +211,9 @@ const Document::Vector& OpMsgMessage::documents() const
207211
bool OpMsgMessage::responseOk() const
208212
{
209213
Poco::Int64 ok {false};
210-
if (_body.exists("ok"s))
214+
if (_body.exists(keyOk))
211215
{
212-
ok = _body.getInteger("ok"s);
216+
ok = _body.getInteger(keyOk);
213217
}
214218
return (ok != 0);
215219
}

MongoDB/testsuite/src/MongoDBTestOpMsg.cpp

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,13 @@ void MongoDBTest::testOpCmdCursor()
281281

282282
int n = 0;
283283
auto cresponse = cursor.next(*_mongo);
284-
while(true)
284+
while(cursor.isActive())
285285
{
286286
n += static_cast<int>(cresponse.documents().size());
287-
if ( cursor.cursorID() == 0 )
288-
break;
289287
cresponse = cursor.next(*_mongo);
290288
}
291289
assertEquals (10000, n);
290+
assertFalse(cursor.isActive());
292291

293292
request->setCommandName(OpMsgMessage::CMD_DROP);
294293
_mongo->sendRequest(*request, response);
@@ -325,26 +324,24 @@ void MongoDBTest::testOpCmdCursorAggregate()
325324

326325
int n = 0;
327326
auto cresponse = cursor->next(*_mongo);
328-
while(true)
327+
while(cursor->isActive())
329328
{
330329
int batchDocSize = cresponse.documents().size();
331330
if (cursor->cursorID() != 0)
332331
assertEquals (1000, batchDocSize);
333332

334333
n += batchDocSize;
335-
if ( cursor->cursorID() == 0 )
336-
break;
337334
cresponse = cursor->next(*_mongo);
338335
}
339336
assertEquals (10000, n);
337+
assertFalse(cursor->isActive());
340338

341339
request->setCommandName(OpMsgMessage::CMD_DROP);
342340
_mongo->sendRequest(*request, response);
343341
assertTrue(response.responseOk());
344342
}
345343

346344

347-
348345
void MongoDBTest::testOpCmdKillCursor()
349346
{
350347
Database db("team");
@@ -371,13 +368,11 @@ void MongoDBTest::testOpCmdKillCursor()
371368

372369
int n = 0;
373370
auto cresponse = cursor.next(*_mongo);
374-
while(true)
371+
while(cursor.isActive())
375372
{
376373
n += static_cast<int>(cresponse.documents().size());
377-
if ( cursor.cursorID() == 0 )
378-
break;
379-
380374
cursor.kill(*_mongo);
375+
assertFalse(cursor.isActive());
381376
cresponse = cursor.next(*_mongo);
382377
}
383378
assertEquals (1000, n);

0 commit comments

Comments
 (0)