Skip to content

Commit

Permalink
Fix some generic bugs in EventStore API
Browse files Browse the repository at this point in the history
  • Loading branch information
albe committed Dec 18, 2016
1 parent 22d990b commit 0f2e362
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 20 deletions.
53 changes: 33 additions & 20 deletions src/EventStore.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const EventStream = require('./EventStream');
const uuid = require('uuid').v4;
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;
const EventEmitter = require('events');
const Storage = require('./Storage');

Expand All @@ -19,24 +18,30 @@ class OptimisticConcurrencyError extends Error {}
*/
class EventStore extends EventEmitter {

constructor(storeName, config = {}) {
/**
* @param {string} storeName
* @param {Object} config
*/
constructor(storeName = 'eventstore', config = {}) {
super();
if (typeof storeName === 'object') {
config = storeName;
storeName = undefined;
}
this.streams = {};
this.storeName = storeName || 'eventstore';
this.storageDirectory = config.storageDirectory || './data';
let storageConfig = Object.assign({ storageDirectory: this.storageDirectory }, config.storage);
let storageConfig = Object.assign({ dataDirectory: this.storageDirectory, partitioner: (event) => event.stream }, config.storage);
this.storage = new Storage(this.storeName, storageConfig);
if (!fs.existsSync(this.storageDirectory)) {
mkdirpSync(this.storageDirectory);
}

// Find existing streams by scanning dir for filenames starting with 'stream-'
fs.readdir(this.storageDirectory, (err, files) => {
if (err) throw err;
let matches;
for (let file of files) {
if (matches = file.match(/^(stream-.*)\.index$/)) {
let streamName = matches[1];
let index = this.storage.ensureIndex(streamName);
if (matches = file.match(/(stream-(.*))\.index$/)) {
let streamName = matches[2];
let index = this.storage.ensureIndex(matches[1]);
this.streams[streamName] = { index };
this.emit('stream-available', streamName);
}
Expand All @@ -61,6 +66,7 @@ class EventStore extends EventEmitter {
* @param {Array<Object>|Object} events The events to commit or a single event.
* @param {number} [expectedVersion] One of ExpectedVersion constants or a positive version number that the stream is supposed to be at before commit.
* @param {function} [callback] A function that will be executed when all events have been committed.
* @throws {OptimisticConcurrencyError} if the stream is not at the expected version.
*/
commit(streamName, events, expectedVersion = ExpectedVersion.Any, callback) {
if (!streamName) {
Expand All @@ -70,6 +76,9 @@ class EventStore extends EventEmitter {
throw new Error('No events specified for commit.');
}
if (!(events instanceof Array)) {
if (typeof events !== 'object') {
throw new Error('Event must be an object.');
}
events = [events];
}
if (typeof expectedVersion === 'function' && typeof callback === 'undefined') {
Expand All @@ -78,7 +87,7 @@ class EventStore extends EventEmitter {
}

if (!(streamName in this.streams)) {
this.createEventStream(streamName, { metadata: { streamName } });
this.createEventStream(streamName, { stream: streamName });
}
if (expectedVersion !== ExpectedVersion.Any) {
let streamVersion = this.streams[streamName].index.length;
Expand All @@ -90,7 +99,7 @@ class EventStore extends EventEmitter {
let commitId = uuid();
let commitVersion = 0;
for (let event of events) {
let storedEvent = { id: uuid(), payload: event, metadata: { committedAt: Date.now(), commitVersion, commitId } };
let storedEvent = { id: uuid(), stream: streamName, payload: event, metadata: { committedAt: Date.now(), commitVersion, commitId } };
commitVersion++;
this.storage.write(storedEvent, commitVersion === events.length ? callback : undefined);
}
Expand All @@ -100,9 +109,9 @@ class EventStore extends EventEmitter {
/**
* Get an event stream for the given stream name within the revision boundaries.
*
* @param {string} streamName
* @param {number} [minRevision]
* @param {number} [maxRevision]
* @param {string} streamName The name of the stream to get.
* @param {number} [minRevision] The minimum revision to include in the events (inclusive).
* @param {number} [maxRevision] The maximum revision to include in the events (inclusive).
* @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
*/
getEventStream(streamName, minRevision = 0, maxRevision = -1) {
Expand Down Expand Up @@ -140,13 +149,14 @@ class EventStore extends EventEmitter {
// TODO: Create iterator over joinStream
}

// EventStream = new index, but needs one stream per event type?! -> No, one stream per Projection!
/**
* Create a new stream with the given matcher.
*
* @param streamName
* @param matcher
* @returns {EventStream}
* @param {string} streamName The name of the stream to create.
* @param {Object|function(event)} matcher A matcher object, denoting the properties that need to match on an event a function that takes the event and returns true if the event should be added.
* @returns {EventStream} The EventStream with all existing events matching the matcher.
* @throws {Error} If a stream with that name already exists.
* @throws {Error} If the stream could not be created.
*/
createEventStream(streamName, matcher) {
if (streamName in this.streams) {
Expand All @@ -155,7 +165,7 @@ class EventStore extends EventEmitter {
let streamIndexName = 'stream-' + streamName;
let index = this.storage.ensureIndex(streamIndexName, matcher);
if (!index) {
throw new Error('Error creating stream index', streamName);
throw new Error('Error creating stream index ' + streamName);
}
this.streams[streamName] = { index, matcher };
return new EventStream(this.storage.readRange(1, 0, index));
Expand All @@ -164,7 +174,10 @@ class EventStore extends EventEmitter {
/**
* Delete an event stream. Will do nothing if the stream with the name doesn't exist.
*
* @param {string} streamName
* Note that you can delete a write stream, but that will not delete the events written to it.
* Also, on next write, that stream will be rebuilt from all existing events, which might take some time.
*
* @param {string} streamName The name of the stream to delete.
* @returns void
*/
deleteEventStream(streamName) {
Expand Down
14 changes: 14 additions & 0 deletions src/EventStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,25 @@ class EventStream extends Readable {
}
}
*/
/**
* Iterator implementation. Iterate over the stream in a `for ... of` loop.
*/
*[Symbol.iterator]() {
let next;
while ((next = this.next()) !== false) {
yield next;
}
}

next() {
let next = this.iterator.next();
return next.done ? false : next.value.payload;
}

/**
* Readable stream implementation.
* @private
*/
_read() {
let next = this.next();
this.push(next ? next : null);
Expand Down
18 changes: 18 additions & 0 deletions test/EventStore.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,24 @@ describe('EventStore', function() {
eventstore = undefined;
});

it('basically works', function(done) {
eventstore = new EventStore({
storageDirectory: 'test/data'
});

let events = [{foo: 'bar'}, {foo: 'baz'}, {foo: 'quux'}];
eventstore.on('ready', () => {
eventstore.commit('foo-bar', events, () => {
let stream = eventstore.getEventStream('foo-bar');
let i = 0;
for (let event of stream) {
expect(event).to.eql(events[i++]);
}
done();
});
});
});

it('needs to be tested.');

});

0 comments on commit 0f2e362

Please sign in to comment.