diff --git a/lib/agent.js b/lib/agent.js index a07650c8a..699f2e8d7 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -5,6 +5,7 @@ var ACTIONS = require('./message-actions').ACTIONS; var types = require('./types'); var util = require('./util'); var protocol = require('./protocol'); +const Transaction = require('./transaction/transaction'); var ERROR_CODE = ShareDBError.CODES; @@ -70,6 +71,8 @@ function Agent(backend, stream) { this._firstReceivedMessage = null; this._handshakeReceived = false; + this._transactions = Object.create(null); + // Send the legacy message to initialize old clients with the random agent Id this.send(this._initMessage(ACTIONS.initLegacy)); } @@ -461,16 +464,7 @@ Agent.prototype._handleMessage = function(request, callback) { case ACTIONS.unsubscribe: return this._unsubscribe(request.c, request.d, callback); case ACTIONS.op: - // Normalize the properties submitted - var op = createClientOp(request, this._src()); - if (op.seq >= util.MAX_SAFE_INTEGER) { - return callback(new ShareDBError( - ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, - 'Connection seq has exceeded the max safe integer, maybe from being open for too long' - )); - } - if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); - return this._submit(request.c, request.d, op, callback); + return this._submit(request, callback); case ACTIONS.snapshotFetch: return this._fetchSnapshot(request.c, request.d, request.v, callback); case ACTIONS.snapshotFetchByTimestamp: @@ -494,6 +488,8 @@ Agent.prototype._handleMessage = function(request, callback) { return this._requestPresence(request.ch, callback); case ACTIONS.pingPong: return this._pingPong(callback); + case ACTIONS.transactionCommit: + return this._commitTransaction(request, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); } @@ -761,9 +757,24 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) { util.nextTick(callback); }; -Agent.prototype._submit = function(collection, id, op, callback) { +Agent.prototype._submit = function(request, callback) { + // Normalize the properties submitted + var op = createClientOp(request, this._src()); + if (op.seq >= util.MAX_SAFE_INTEGER) { + return callback(new ShareDBError( + ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, + 'Connection seq has exceeded the max safe integer, maybe from being open for too long' + )); + } + if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); + + var collection = request.c; + var id = request.d; + var options = {}; + if (request.t) options.transaction = request.t; + var agent = this; - this.backend.submit(this, collection, id, op, null, function(err, ops, request) { + this.backend.submit(this, collection, id, op, options, function(err, ops, request) { // Message to acknowledge the op was successfully submitted var ack = {src: op.src, seq: op.seq, v: op.v}; if (request._fixupOps.length) ack[ACTIONS.fixup] = request._fixupOps; @@ -982,6 +993,12 @@ Agent.prototype._setProtocol = function(request) { this.protocol.minor = request.protocolMinor; }; +Agent.prototype._commitTransaction = function(request, callback) { + var transaction = new Transaction(this, request.id, request.o); + this._transactions[transaction.id] = transaction; + transaction.submit(callback); +}; + function createClientOp(request, clientId) { // src can be provided if it is not the same as the current agent, // such as a resubmission after a reconnect, but it usually isn't needed diff --git a/lib/client/connection.js b/lib/client/connection.js index da89dfef5..b66f58eca 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -12,6 +12,7 @@ var util = require('../util'); var logger = require('../logger'); var DocPresenceEmitter = require('./presence/doc-presence-emitter'); var protocol = require('../protocol'); +var Transaction = require('./transaction'); var ERROR_CODE = ShareDBError.CODES; @@ -64,6 +65,8 @@ function Connection(socket) { // A unique message number for presence this._presenceSeq = 1; + this._transactions = Object.create(null); + // Equals agent.src on the server this.id = null; @@ -258,6 +261,8 @@ Connection.prototype.handleMessage = function(message) { return this._handlePresenceRequest(err, message); case ACTIONS.pingPong: return this._handlePingPong(err); + case ACTIONS.transactionCommit: + return this._handleTransactionCommit(err, message); default: logger.warn('Ignoring unrecognized message', message); @@ -454,6 +459,29 @@ Connection.prototype.sendUnsubscribe = function(doc) { Connection.prototype.sendOp = function(doc, op) { // Ensure the doc is registered so that it receives the reply message this._addDoc(doc); + var message = this._opMessage(doc, op); + this.send(message); +}; + +Connection.prototype._opMessage = function(doc, op) { + // The src + seq number is a unique ID representing this operation. This tuple + // is used on the server to detect when ops have been sent multiple times and + // on the client to match acknowledgement of an op back to the inflightOp. + // Note that the src could be different from this.connection.id after a + // reconnect, since an op may still be pending after the reconnection and + // this.connection.id will change. In case an op is sent multiple times, we + // also need to be careful not to override the original seq value. + if (op.seq == null) { + if (this.seq >= util.MAX_SAFE_INTEGER) { + return doc.emit('error', new ShareDBError( + ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, + 'Connection seq has exceeded the max safe integer, maybe from being open for too long' + )); + } + + op.seq = this.seq++; + } + var message = { a: ACTIONS.op, c: doc.collection, @@ -463,14 +491,16 @@ Connection.prototype.sendOp = function(doc, op) { seq: op.seq, x: {} }; + if ('op' in op) message.op = op.op; if (op.create) message.create = op.create; if (op.del) message.del = op.del; + if (doc.submitSource) message.x.source = op.source; - this.send(message); + if (op.transaction) message.t = op.transaction; + return message; }; - /** * Sends a message down the socket */ @@ -782,6 +812,18 @@ Connection.prototype._initialize = function(message) { this._setState('connected'); }; +Connection.prototype.startTransaction = function() { + var transaction = new Transaction(this); + return this._transactions[transaction.id] = transaction; +}; + +Connection.prototype._handleTransactionCommit = function(error, message) { + var transaction = this._transactions[message.id]; + if (!transaction) return; + transaction._handleCommit(error, message); + delete this._transactions[message.id]; +}; + Connection.prototype.getPresence = function(channel) { var connection = this; var presence = util.digOrCreate(this._presences, channel, function() { diff --git a/lib/client/doc.js b/lib/client/doc.js index acb457374..12f8d58cb 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -121,6 +121,8 @@ function Doc(connection, collection, id) { // Internal counter that gets incremented every time doc.data is updated. // Used as a cheap way to check if doc.data has changed. this._dataStateVersion = 0; + + this._transaction = null; } emitter.mixin(Doc); @@ -335,17 +337,20 @@ Doc.prototype._handleOp = function(err, message) { // committing the op to the database, and we should just clear the in-flight // op and call the callbacks. However, let's first catch ourselves up to // the remote, so that we're in a nice consistent state - return this.fetch(this._clearInflightOp.bind(this)); + return this.fetch(this._clearInflight.bind(this)); } - if (this.inflightOp) { + if (this._hasInflight()) { return this._rollback(err); } return this.emit('error', err); } - if (this.inflightOp && - message.src === this.inflightOp.src && - message.seq === this.inflightOp.seq) { + var isInflightAck = this.inflightOp && + message.src === this.inflightOp.src && + message.seq === this.inflightOp.seq; + var isTransactionAck = !!this._transaction && message.src === this.connection.id; + var shouldAck = isInflightAck || isTransactionAck; + if (shouldAck) { // The op has already been applied locally. Just update the version // and pending state appropriately this._opAcknowledged(message); @@ -519,6 +524,10 @@ Doc.prototype.flush = function() { // Ignore if we can't send or we are already sending an op if (!this.connection.canSend || this.inflightOp) return; + // TODO: Transactions should wait for all docs to only have transaction ops left in pendingOps + // Transactions will be flushed as one call separately + if (this._transaction) return; + // Send first pending op unless paused if (!this.paused && this.pendingOps.length) { this._sendOp(); @@ -704,24 +713,6 @@ Doc.prototype._sendOp = function() { op.sentAt = Date.now(); op.retries = (op.retries == null) ? 0 : op.retries + 1; - // The src + seq number is a unique ID representing this operation. This tuple - // is used on the server to detect when ops have been sent multiple times and - // on the client to match acknowledgement of an op back to the inflightOp. - // Note that the src could be different from this.connection.id after a - // reconnect, since an op may still be pending after the reconnection and - // this.connection.id will change. In case an op is sent multiple times, we - // also need to be careful not to override the original seq value. - if (op.seq == null) { - if (this.connection.seq >= util.MAX_SAFE_INTEGER) { - return this.emit('error', new ShareDBError( - ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, - 'Connection seq has exceeded the max safe integer, maybe from being open for too long' - )); - } - - op.seq = this.connection.seq++; - } - this.connection.sendOp(this, op); // src isn't needed on the first try, since the server session will have the @@ -759,10 +750,14 @@ Doc.prototype._submit = function(op, source, callback) { if (this.type.normalize) op.op = this.type.normalize(op.op); } + var transactionError = this._setTransaction(op.transaction); + if (transactionError) return this._callbackOrEmitError(error); + try { this._pushOp(op, source, callback); this._otApply(op, source); } catch (error) { + // TODO: Transactions should abort and rollback all docs in transaction return this._hardRollback(error); } @@ -865,8 +860,10 @@ Doc.prototype.submitOp = function(component, options, callback) { callback = options; options = null; } + options = options || {}; var op = {op: component}; - var source = options && options.source; + if (options.transaction) op.transaction = this._normalizeTransactionId(options.transaction); + var source = options.source; this._submit(op, source, callback); }; @@ -888,6 +885,7 @@ Doc.prototype.create = function(data, type, options, callback) { callback = options; options = null; } + options = options || {}; if (!type) { type = types.defaultType.uri; } @@ -897,7 +895,8 @@ Doc.prototype.create = function(data, type, options, callback) { return this.emit('error', err); } var op = {create: {type: type, data: data}}; - var source = options && options.source; + if (options.transaction) op.transaction = this._normalizeTransactionId(options.transaction); + var source = options.source; this._submit(op, source, callback); }; @@ -919,11 +918,11 @@ Doc.prototype.del = function(options, callback) { return this.emit('error', err); } var op = {del: true}; - var source = options && options.source; + if (options.transaction) op.transaction = this._normalizeTransactionId(options.transaction); + var source = options.source; this._submit(op, source, callback); }; - // Stops the document from sending any operations to the server. Doc.prototype.pause = function() { this.paused = true; @@ -948,7 +947,7 @@ Doc.prototype.toSnapshot = function() { // This is called when the server acknowledges an operation from the client. Doc.prototype._opAcknowledged = function(message) { - if (this.inflightOp.create) { + if (this.inflightOp && this.inflightOp.create) { this.version = message.v; } else if (message.v !== this.version) { // We should already be at the same version, because the server should @@ -979,7 +978,7 @@ Doc.prototype._opAcknowledged = function(message) { // The op was committed successfully. Increment the version number this.version++; - this._clearInflightOp(); + this._clearInflight(null, message.seq); }; Doc.prototype._rollback = function(err) { @@ -987,46 +986,57 @@ Doc.prototype._rollback = function(err) { // just the inflight op if possible. If not possible to invert, cancel all // pending ops and fetch the latest from the server to get us back into a // working state, then call back - var op = this.inflightOp; - - if (!('op' in op && op.type.invert)) { - return this._hardRollback(err); + var ops = []; + if (this.inflightOp) ops.push(this.inflightOp); + if (this._transaction) { + for (var op of this.pendingOps) { + if (op.transaction !== this._transaction.id) break; + ops.push(op); + } } - try { - op.op = op.type.invert(op.op); - } catch (error) { - // If the op doesn't support `.invert()`, we just reload the doc - // instead of trying to locally revert it. - return this._hardRollback(err); - } + for (var op of ops) { + if (!('op' in op && op.type.invert)) { + return this._hardRollback(err); + } - // Transform the undo operation by any pending ops. - for (var i = 0; i < this.pendingOps.length; i++) { - var transformErr = transformX(this.pendingOps[i], op); - if (transformErr) return this._hardRollback(transformErr); - } + try { + op.op = op.type.invert(op.op); + } catch (error) { + // If the op doesn't support `.invert()`, we just reload the doc + // instead of trying to locally revert it. + return this._hardRollback(err); + } - // ... and apply it locally, reverting the changes. - // - // This operation is applied to look like it comes from a remote source. - // I'm still not 100% sure about this functionality, because its really a - // local op. Basically, the problem is that if the client's op is rejected - // by the server, the editor window should update to reflect the undo. - try { - this._otApply(op, false); - } catch (error) { - return this._hardRollback(error); + if (!this._transaction) { + // Transform the undo operation by any pending ops. + for (var i = 0; i < this.pendingOps.length; i++) { + var transformErr = transformX(this.pendingOps[i], op); + if (transformErr) return this._hardRollback(transformErr); + } + } + + // ... and apply it locally, reverting the changes. + // + // This operation is applied to look like it comes from a remote source. + // I'm still not 100% sure about this functionality, because its really a + // local op. Basically, the problem is that if the client's op is rejected + // by the server, the editor window should update to reflect the undo. + try { + this._otApply(op, false); + } catch (error) { + return this._hardRollback(error); + } } // The server has rejected submission of the current operation. If we get // an "Op submit rejected" error, this was done intentionally // and we should roll back but not return an error to the user. if (err.code === ERROR_CODE.ERR_OP_SUBMIT_REJECTED) { - return this._clearInflightOp(null); + return this._clearInflight(null); } - this._clearInflightOp(err); + this._clearInflight(err); }; Doc.prototype._hardRollback = function(err) { @@ -1041,6 +1051,7 @@ Doc.prototype._hardRollback = function(err) { // Cancel all pending ops and reset if we can't invert this._setType(null); this.version = null; + // TODO: Clear transaction this.inflightOp = null; this.pendingOps = []; @@ -1093,17 +1104,78 @@ Doc.prototype._hardRollback = function(err) { }); }; -Doc.prototype._clearInflightOp = function(err) { - var inflightOp = this.inflightOp; +Doc.prototype._hasInflight = function() { + return !!(this.inflightOp || this._transaction); +}; - this.inflightOp = null; +Doc.prototype._clearInflight = function(err, seq) { + var callbacks = []; + if (this.inflightOp) callbacks = this._clearInflightOp(); + else if (this._transaction) { + if (!seq) { + callbacks = this._clearTransaction(); + } else { + var i = this.pendingOps.findIndex(function(pendingOp) { + return pendingOp.seq === seq; + }); + if (i >= 0) { + const op = this.pendingOps.splice(i, 1)[0]; + callbacks = callbacks.concat(op.callbacks); + } + } + } - var called = util.callEach(inflightOp.callbacks, err); + var called = util.callEach(callbacks, err); - this.flush(); - this._emitNothingPending(); + if (!this._transaction) { + this.flush(); + this._emitNothingPending(); + } if (err && !called) return this.emit('error', err); }; +Doc.prototype._clearInflightOp = function() { + var inflightOp = this.inflightOp; + this.inflightOp = null; + return inflightOp.callbacks; +}; + +Doc.prototype._clearTransaction = function() { + var transactionId = this._transaction.id; + var callbacks = []; + var i = 0; + for (; i < this.pendingOps.length; i++) { + var op = this.pendingOps[i]; + if (op.transaction !== transactionId) break; + callbacks = callbacks.concat(op.callbacks); + } + + this.pendingOps.splice(0, i); + this._transaction = null; + + return callbacks; +}; + +Doc.prototype._setTransaction = function(transactionId) { + if (transactionId == util.dig(this, '_transaction', 'id')) return; + if (this._transaction) { + return new Error('Transaction in progress. Must commit transaction before submitting any other ops'); + } + + var transaction = this.connection._transactions[transactionId]; + if (!transaction) throw new Error('Transaction not found'); + transaction._registerDoc(this); +}; + +Doc.prototype._normalizeTransactionId = function(transaction) { + if (!transaction) return null; + if (typeof transaction === 'string') return transaction; + return transaction.id; +}; + +Doc.prototype._callbackOrEmitError = function(error, callback) { + if (callback) return util.nextTick(callback, error); + this.emit('error', error); +}; diff --git a/lib/client/transaction.js b/lib/client/transaction.js new file mode 100644 index 000000000..2e0524dbb --- /dev/null +++ b/lib/client/transaction.js @@ -0,0 +1,70 @@ +var emitter = require('../emitter'); +var {ACTIONS} = require('../message-actions'); + +var idCounter = 1; + +module.exports = Transaction; +function Transaction(connection) { + emitter.EventEmitter.call(this); + + // TODO: UUIDs? + this.id = (idCounter++).toString(); + + this._connection = connection; + this._callback = null; + this._docs = Object.create(null); +} +emitter.mixin(Transaction); + +Transaction.prototype.commit = function(callback) { + // TODO: Catch multiple calls + // TODO: Handle network changes + this._callback = callback; + this._connection.send({ + a: ACTIONS.transactionCommit, + id: this.id, + o: this._getOps() + }); +}; + +Transaction.prototype._handleCommit = function(error, message) { + if (typeof this._callback === 'function') this._callback(error); + else if (error) this.emit('error', error); + + var acks = Object.create(null); + if (message.acks) { + for (var ack of message.acks) acks[ack.seq] = ack; + } + + for (var collection in this._docs) { + for (var id in this._docs[collection]) { + var doc = this._docs[collection][id]; + for (var op of doc.pendingOps) { + if (op.transaction !== this.id) break; + doc._handleOp(error, acks[op.seq]); + } + } + } +}; + +Transaction.prototype._registerDoc = function(doc) { + var collection = this._docs[doc.collection] = this._docs[doc.collection] || Object.create(null); + collection[doc.id] = doc; + doc._transaction = this; +}; + +Transaction.prototype._getOps = function() { + var ops = []; + + for (var collection in this._docs) { + for (var id in this._docs[collection]) { + var doc = this._docs[collection][id]; + for (var op of doc.pendingOps) { + if (op.transaction !== this.id) break; + ops.push(this._connection._opMessage(doc, op)); + } + } + } + + return ops; +}; diff --git a/lib/db/memory.js b/lib/db/memory.js index 2db274d6e..92725eefb 100644 --- a/lib/db/memory.js +++ b/lib/db/memory.js @@ -41,19 +41,43 @@ MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, call var db = this; if (typeof callback !== 'function') throw new Error('Callback required'); util.nextTick(function() { - var version = db._getVersionSync(collection, id); - if (snapshot.v !== version + 1) { - var succeeded = false; - return callback(null, succeeded); + var result = db._commitSync(collection, id, op, snapshot); + callback(result.error, result.succeeded); + }); +}; + +MemoryDB.prototype.commitTransaction = function(commits, options, callback) { + // TODO: Replace with rfdc + var docs = JSON.stringify(this.docs); + var ops = JSON.stringify(this.ops); + + var error; + var succeeded = true; + for (var commit of commits) { + var result = this._commitSync(commit.collection, commit.id, commit.op, commit.snapshot); + succeeded = succeeded && result.succeeded; + error = result.error; + + if (!succeeded) { + this.docs = JSON.parse(docs); + this.ops = JSON.parse(ops); + break; } - var err = db._writeOpSync(collection, id, op); - if (err) return callback(err); - err = db._writeSnapshotSync(collection, id, snapshot); - if (err) return callback(err); + } - var succeeded = true; - callback(null, succeeded); - }); + callback(error, succeeded); +}; + +MemoryDB.prototype._commitSync = function(collection, id, op, snapshot) { + var version = this._getVersionSync(collection, id); + if (snapshot.v !== version + 1) { + return {error: null, succeeded: false}; + } + var error = this._writeOpSync(collection, id, op); + if (error) return {error: error, succeeded: false}; + error = this._writeSnapshotSync(collection, id, snapshot); + if (error) return {error: error, succeeded: false}; + return {error: null, succeeded: true}; }; // Get the named document from the database. The callback is called with (err, diff --git a/lib/message-actions.js b/lib/message-actions.js index e1a8e9942..aa2f51204 100644 --- a/lib/message-actions.js +++ b/lib/message-actions.js @@ -19,5 +19,6 @@ exports.ACTIONS = { presence: 'p', presenceSubscribe: 'ps', presenceUnsubscribe: 'pu', - presenceRequest: 'pr' + presenceRequest: 'pr', + transactionCommit: 'tc' }; diff --git a/lib/submit-request.js b/lib/submit-request.js index 8e6715123..61deab891 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -16,7 +16,7 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.collection = (projection) ? projection.target : index; this.id = id; this.op = op; - this.options = options; + this.options = options || {}; this.extra = op.x; delete op.x; @@ -43,6 +43,7 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.ops = []; this.channels = null; this._fixupOps = []; + this._transaction = agent._transactions[this.options.transaction]; } module.exports = SubmitRequest; @@ -95,7 +96,30 @@ SubmitRequest.prototype.submit = function(callback) { var snapshotOptions = {}; snapshotOptions.agentCustom = request.agent.custom; - backend.db.getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) { + + var getSnapshotFromDb = backend.db.getSnapshot.bind(backend.db); + var getSnapshot = getSnapshotFromDb; + // If we're in a transaction, try getting the snapshot from the transaction first, + // since the DB won't have an up-to-date Snapshot. The Snapshot on the transaction + // will also include any modifications made in middleware, for example, which can't + // be reproduced by just transforming the existing snapshot by ops + if (this._transaction) { + var transaction = this._transaction; + getSnapshot = function(collection, id, fields, snapshotOptions, callback) { + transaction.getSnapshotAndOps(request, function(error, snapshot, ops) { + if (!snapshot) return getSnapshotFromDb(collection, id, fields, snapshotOptions, callback); + var type = snapshot.type; + // TODO: Use this._transformOp()? Get a version mismatch, and don't want ops on this.ops though + for (var op of ops) { + var error = ot.transform(type, request.op, op); + if (error) return callback(error); + } + callback(null, snapshot); + }); + }; + } + + getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) { if (err) return callback(err); request.snapshot = snapshot; @@ -217,6 +241,10 @@ SubmitRequest.prototype.commit = function(callback) { }; } + if (request._transaction) { + return request._transaction.ready(request, request._handleCommitted(callback)); + } + // Try committing the operation and snapshot to the database atomically backend.db.commit( request.collection, @@ -224,31 +252,38 @@ SubmitRequest.prototype.commit = function(callback) { request.op, request.snapshot, request.options, - function(err, succeeded) { - if (err) return callback(err); - if (!succeeded) { - // Between our fetch and our call to commit, another client committed an - // operation. We expect this to be relatively infrequent but normal. - return request.retry(callback); - } - if (!request.suppressPublish) { - var op = request.op; - op.c = request.collection; - op.d = request.id; - op.m = undefined; - // Needed for agent to detect if it can ignore sending the op back to - // the client that submitted it in subscriptions - if (request.collection !== request.index) op.i = request.index; - backend.pubsub.publish(request.channels, op); - } - if (request._shouldSaveMilestoneSnapshot(request.snapshot)) { - request.backend.milestoneDb.saveMilestoneSnapshot(request.collection, request.snapshot); - } - callback(); - }); + request._handleCommitted(callback) + ); }); }; +SubmitRequest.prototype._handleCommitted = function(callback) { + var request = this; + var backend = this.backend; + return function(err, succeeded) { + if (err) return callback(err); + if (!succeeded) { + // Between our fetch and our call to commit, another client committed an + // operation. We expect this to be relatively infrequent but normal. + return request.retry(callback); + } + if (!request.suppressPublish) { + var op = request.op; + op.c = request.collection; + op.d = request.id; + op.m = undefined; + // Needed for agent to detect if it can ignore sending the op back to + // the client that submitted it in subscriptions + if (request.collection !== request.index) op.i = request.index; + backend.pubsub.publish(request.channels, op); + } + if (request._shouldSaveMilestoneSnapshot(request.snapshot)) { + request.backend.milestoneDb.saveMilestoneSnapshot(request.collection, request.snapshot); + } + callback(); + } +}; + SubmitRequest.prototype.retry = function(callback) { this.retries++; if (this.maxRetries != null && this.retries > this.maxRetries) { diff --git a/lib/transaction/transaction.js b/lib/transaction/transaction.js new file mode 100644 index 000000000..596a0a9ba --- /dev/null +++ b/lib/transaction/transaction.js @@ -0,0 +1,155 @@ +var util = require('../util'); +var emitter = require('../emitter'); + +function Transaction(agent, id, ops) { + emitter.EventEmitter.call(this); + + this.id = id; + + this._agent = agent; + this._backend = agent.backend; + + this._ops = ops; + this._docOps = Object.create(null); + + for (var op of ops) { + var docOps = util.digOrCreate(this._docOps, op.c, op.d, function() { + return []; + }); + op.v = op.v + docOps.length; + docOps.push(op); + } + + this._finished = false; + this._callback = null; + + this._readyRequests = Object.create(null); + this._requestCallbacks = []; + this._acks = []; +} +emitter.mixin(Transaction); +module.exports = Transaction; + +Transaction.prototype.submit = function(callback) { + // TODO: Handle multiple calls? + this._callback = callback; + + for (var op of this._ops) { + this._submitOp(op); + } +}; + +Transaction.prototype.ready = function(request, callback) { + // TODO: Clear these on retry + var docRequests = util.digOrCreate(this._readyRequests, request.collection, request.id, function() { + return []; + }); + docRequests.push(request); + this._requestCallbacks.push(callback); + + this.emit('requestReady', request); + + if (this._isReady()) return this._commitTransaction(); +}; + +Transaction.prototype.getSnapshotAndOps = function(request, callback) { + // TODO: Support fields? + // TODO: Support options? + var op = {c: request.collection, d: request.id, seq: request.op.seq}; + this._waitForPreviousOpRequest(op, function(req) { + if (!req) return callback(); + var versionDiff = req.snapshot.v - request.op.v; + var ops = util.clone(req.ops.slice(-versionDiff)); + if (ops.length) { + var offset = request.op.v - ops[0].v; + for (var op of ops) { + op.v = op.v + offset; + } + } + callback(null, util.clone(req.snapshot), ops); + }); +}; + +Transaction.prototype._waitForPreviousOpRequest = function(op, callback) { + var collection = op.c; + var id = op.d; + + var previousOp; + var docOps = this._docOps[collection][id]; + for (var docOp of docOps) { + if (op.seq === docOp.seq) break; + previousOp = docOp; + } + + if (!previousOp) return callback(); + + var requests = util.dig(this._readyRequests, collection, id) || []; + for (var request of requests) { + if (request.op.seq === previousOp.seq) return callback(request); + } + + var transaction = this; + var handler = function(request) { + if (request.collection !== collection || request.id !== id || request.op.seq !== previousOp.seq) return; + transaction.off('requestReady', handler); + callback(request); + }; + + this.on('requestReady', handler); +}; + +Transaction.prototype._submitOp = function(op) { + var transaction = this; + var agent = this._agent; + this._waitForPreviousOpRequest(op, function() { + agent._submit(op, function(error, ack) { + if (error) transaction._finish(error); + transaction._acks.push(ack); + if (transaction._acks.length === transaction._ops.length) transaction._finish(); + }); + }); +}; + +Transaction.prototype._isReady = function() { + return this._requestCallbacks.length === this._ops.length; +}; + +Transaction.prototype._commitTransaction = function() { + var requests = this._flatReadyRequests(); + this._readyRequests = Object.create(null); + var commits = requests.map(function(request) { + return { + collection: request.collection, + id: request.id, + op: request.op, + snapshot: request.snapshot, + options: request.options, + }; + }); + var callbacks = this._requestCallbacks; + this._requestCallbacks = []; + + var transaction = this; + var options = null; + this._backend.db.commitTransaction(commits, options, function(error, succeeded) { + if (error) return transaction._finish(error); + util.callEach(callbacks, null, succeeded); + }); +}; + +Transaction.prototype._flatReadyRequests = function() { + var requests = []; + for (var collection in this._readyRequests) { + for (var id in this._readyRequests[collection]) { + requests = requests.concat(this._readyRequests[collection][id]); + } + } + return requests; +}; + +Transaction.prototype._finish = function(error) { + if (this._finished) return; + this._finished = true; + if (error) return this._callback(error); + this._callback(null, {acks: this._acks}); +}; diff --git a/lib/util.js b/lib/util.js index 0d613ed58..a66609e0d 100644 --- a/lib/util.js +++ b/lib/util.js @@ -33,7 +33,7 @@ exports.dig = function() { var obj = arguments[0]; for (var i = 1; i < arguments.length; i++) { var key = arguments[i]; - obj = hasOwn(obj, key) ? obj[key] : (i === arguments.length - 1 ? undefined : Object.create(null)); + obj = obj && hasOwn(obj, key) ? obj[key] : (i === arguments.length - 1 ? undefined : Object.create(null)); } return obj; }; @@ -71,11 +71,13 @@ exports.supportsPresence = function(type) { return type && typeof type.transformPresence === 'function'; }; -exports.callEach = function(callbacks, error) { +exports.callEach = function() { + var args = Array.from(arguments); + var callbacks = args.shift(); var called = false; callbacks.forEach(function(callback) { if (callback) { - callback(error); + callback.apply(null, args); called = true; } }); diff --git a/test/client/transaction.js b/test/client/transaction.js new file mode 100644 index 000000000..37490ff34 --- /dev/null +++ b/test/client/transaction.js @@ -0,0 +1,259 @@ +var async = require('async'); +var expect = require('chai').expect; +var errorHandler = require('../util').errorHandler; + +var idCounter = 0; + +module.exports = function() { + describe.only('transaction', function() { + var backend; + var connection; + + beforeEach(function() { + backend = this.backend; + connection = backend.connect(); + }); + + describe('single Doc', function() { + var id; + var doc; + var remoteDoc; + var transaction; + + beforeEach(function() { + id = (idCounter++).toString(); + doc = connection.get('dogs', id); + doc.preventCompose = true; + remoteDoc = backend.connect().get('dogs', id); + transaction = connection.startTransaction(); + + // TODO: Discuss if this is an acceptable API? Doc will always emit error on + // a failed transaction, since the ops may have been successfully acked for this Doc, and + // we force a hard rollback with no callback, which causes an 'error' event + doc.on('error', function() {}); + }); + + it('commits two ops as a transaction', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function (next) { + doc.submitOp([{p: ['age'], oi: 3}], {transaction: transaction}, errorHandler(next)); + doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, errorHandler(next)); + expect(doc.data).to.eql({ + name: 'Gaspode', + age: 3, + tricks: ['fetch'] + }); + next(); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({ + name: 'Gaspode', + age: 3, + tricks: ['fetch'] + }); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('fires the submitOp callback after a transaction commits', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function(next) { + doc.submitOp([{p: ['age'], oi: 3}], {transaction: transaction}, next); + transaction.commit(errorHandler(next)); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql(doc.data); + next(); + } + ], done); + }); + + it('does not commit the first op if the second op fails', function(done) { + backend.use('commit', function(request, next) { + if (!request.snapshot.data.tricks) return next(); + next(new Error('fail')); + }); + + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function(next) { + var count = 0; + function handler(error) { + count++; + expect(error.message).to.equal('fail'); + if (count === 3) next(); + }; + + doc.submitOp([{p: ['age'], oi: 3}], {transaction: transaction}, handler); + doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, handler); + transaction.commit(handler); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); + expect(remoteDoc.version).to.equal(doc.version); + next(); + } + ], done); + }); + + it('deletes and creates as part of a transaction', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function(next) { + doc.del({transaction: transaction}, errorHandler(next)); + doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, errorHandler(next)); + next(); + }, + remoteDoc.fetch.bind(remoteDoc), + function (next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Recreated'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('does not delete if the following create fails', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function(next) { + backend.use('commit', function(request, next) { + var error = request.op.create ? new Error('Create not allowed') : null; + next(error); + }); + next(); + }, + function(next) { + function handler(error) { + expect(error.message).to.equal('Create not allowed'); + } + + doc.del({transaction: transaction}, handler); + doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, handler); + transaction.commit(handler); + + // Should trigger hard rollback + doc.once('load', next); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('transaction is behind remote', function(done) { + async.series([ + doc.create.bind(doc, {tricks: ['fetch']}), + remoteDoc.fetch.bind(remoteDoc), + remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks', 0], ld: 'fetch'}]), + function(next) { + doc.submitOp([{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}, errorHandler(next)); + doc.submitOp([{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}, errorHandler(next)); + transaction.commit(next); + }, + doc.fetch.bind(doc), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('remote submits after but commits first', function(done) { + var hasSubmittedRemote = false; + backend.use('commit', function(request, next) { + if (!request._transaction || hasSubmittedRemote) return next(); + hasSubmittedRemote = true; + remoteDoc.submitOp([{p: ['tricks', 0], ld: 'fetch'}], next); + }); + + async.series([ + doc.create.bind(doc, {tricks: ['fetch']}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + doc.submitOp([{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}, errorHandler(next)); + doc.submitOp([{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}, errorHandler(next)); + transaction.commit(next); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + }); + + describe('multiple Docs', function() { + it('rolls back multiple Docs if one commit fails', function(done) { + var id1 = (idCounter++).toString(); + var id2 = (idCounter++).toString(); + var doc1 = connection.get('dogs', id1); + var doc2 = connection.get('dogs', id2); + var remoteDoc1 = backend.connect().get('dogs', id1); + var remoteDoc2 = backend.connect().get('dogs', id2); + + var transaction = connection.startTransaction(); + + // Doc1 will throw even though its op is accepted, since the + // whole transaction is rejected + doc1.on('error', function() {}); + doc2.on('error', function() {}); + + async.series([ + doc1.create.bind(doc1, {name: 'Gaspode'}), + doc2.create.bind(doc2, {name: 'Snoopy'}), + function(next) { + backend.use('commit', function(request, next) { + var error = request.id === id2 ? new Error('fail') : null; + next(error); + }); + next(); + }, + doc1.submitOp.bind(doc1, [{p: ['age'], oi: 3}], {transaction: transaction}), + function(next) { + doc2.submitOp([{p: ['age'], oi: 4}], {transaction: transaction}, function(error) { + expect(error.message).to.equal('fail'); + }); + doc2.once('load', next); + }, + remoteDoc1.fetch.bind(remoteDoc1), + remoteDoc2.fetch.bind(remoteDoc2), + function(next) { + expect(remoteDoc1.data).to.eql({name: 'Gaspode'}); + expect(remoteDoc2.data).to.eql({name: 'Snoopy'}); + expect(doc1.data).to.eql(remoteDoc1.data); + expect(doc2.data).to.eql(remoteDoc2.data); + next(); + } + ], done); + }); + }); + }); +} diff --git a/test/db-memory.js b/test/db-memory.js index 8c4774418..2a6febc58 100644 --- a/test/db-memory.js +++ b/test/db-memory.js @@ -69,7 +69,8 @@ describe('MemoryDB', function() { }, getQuery: function(options) { return {filter: options.query, sort: options.sort}; - } + }, + transactions: true }); describe('deleteOps', function() { diff --git a/test/db.js b/test/db.js index 5fc8a293d..16135a0e6 100644 --- a/test/db.js +++ b/test/db.js @@ -64,6 +64,10 @@ module.exports = function(options) { require('./client/projections')({getQuery: options.getQuery}); } + if (options.transactions) { + require('./client/transaction')(); + } + require('./client/submit')(); require('./client/submit-json1')(); require('./client/subscribe')();