diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..61c1c20 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +node_modules +lib/hoard.js +test/*.hoard diff --git a/README.md b/README.md index 5cf3cd0..d89f43e 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ hoard.update('users.hoard', 1337, 1311169605, function(err) { // Update multiple values at once in an existing Hoard file. // This function is much faster when dealing with multiple values // that need to be written at once. -hoard.update('users.hoard', [[1312490305, 4976], [1312492105, 3742]], function(err) { +hoard.updateMany('users.hoard', [[1312490305, 4976], [1312492105, 3742]], function(err) { if (err) throw err; console.log('Hoard file updated!'); }); @@ -102,6 +102,12 @@ maximum compatibility. They don't require the Python version to be installed but files generated by it. The tests were implemented using Expresso after some experimentation with Vows. Ran into some issues with Vows and decided to use the much simpler (and dumber) Expresso instead. +Testing +------- + + - cake setup (optional) + - cake install (or cake build) + - cake test Authors ------- diff --git a/src/hoard.coffee b/src/hoard.coffee index d156b88..d38acac 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -35,14 +35,14 @@ create = (filename, archives, xFilesFactor, cb) -> if path.existsSync(filename) cb new Error('File ' + filename + ' already exists') - oldest = (a[0] * a[1] for a in archives).sort().reverse()[0] + oldest = (a[0] * a[1] for a in archives).sort((a, b) -> b - a)[0] encodeFloat = (value) -> # Dirty hack. # Using 'buffer_ieee754' from node 0.5.x # as no libraries had a working IEEE754 encoder buffer = new Buffer(4) - require('buffer_ieee754').writeIEEE754(buffer, 0.5, 0, 'big', 23, 4); + require('../lib/buffer_ieee754').writeIEEE754(buffer, value, 0, 'big', 23, 4); buffer buffer = Put() @@ -73,49 +73,61 @@ propagate = (fd, timestamp, xff, higher, lower, cb) -> lowerIntervalEnd = lowerIntervalStart + lower.secondsPerPoint packedPoint = new Buffer(pointSize) - fs.read fd, packedPoint, 0, pointSize, higher.offset, (err, written, buffer) -> - cb(err) if err - [higherBaseInterval, higherBaseValue] = pack.Unpack(pointFormat, packedPoint) - - if higherBaseInterval == 0 - higherFirstOffset = higher.offset - else - timeDistance = lowerIntervalStart - higherBaseInterval - pointDistance = timeDistance / higher.secondsPerPoint - byteDistance = pointDistance * pointSize - higherFirstOffset = higher.offset + byteDistance.mod(higher.size) - - higherPoints = lower.secondsPerPoint / higher.secondsPerPoint - higherSize = higherPoints * pointSize - relativeFirstOffset = higherFirstOffset - higher.offset - relativeLastOffset = (relativeFirstOffset + higherSize).mod(higher.size) - higherLastOffset = relativeLastOffset + higher.offset - - if higherFirstOffset < higherLastOffset - # We don't wrap the archive - seriesSize = higherLastOffset - higherFirstOffset - seriesString = new Buffer(seriesSize) - - fs.read fd, seriesString, 0, seriesSize, higherFirstOffset, (err, written, buffer) -> - parseSeries(seriesString) - else - # We do wrap the archive - higherEnd = higher.offset + higher.size - firstSeriesSize = higherEnd - higherFirstOffset - secondSeriesSize = higherLastOffset - higher.offset - - seriesString = new Buffer(firstSeriesSize + secondSeriesSize) - - fs.read fd, seriesString, 0, firstSeriesSize, higherFirstOffset, (err, written, buffer) -> - cb(err) if err - if secondSeriesSize > 0 - fs.read fd, seriesString, firstSeriesSize, 
secondSeriesSize, higher.offset, (err, written, buffer) -> - cb(err) if err + try + fs.read fd, packedPoint, 0, pointSize, higher.offset, (err, written, buffer) -> + cb(err) if err + [higherBaseInterval, higherBaseValue] = pack.Unpack(pointFormat, packedPoint) + + if higherBaseInterval == 0 + higherFirstOffset = higher.offset + else + timeDistance = lowerIntervalStart - higherBaseInterval + pointDistance = timeDistance / higher.secondsPerPoint + byteDistance = pointDistance * pointSize + higherFirstOffset = higher.offset + byteDistance.mod(higher.size) + + higherPoints = lower.secondsPerPoint / higher.secondsPerPoint + higherSize = higherPoints * pointSize + relativeFirstOffset = higherFirstOffset - higher.offset + relativeLastOffset = (relativeFirstOffset + higherSize).mod(higher.size) + higherLastOffset = relativeLastOffset + higher.offset + + if higherFirstOffset < higherLastOffset + # We don't wrap the archive + seriesSize = higherLastOffset - higherFirstOffset + seriesString = new Buffer(seriesSize) + + try + fs.read fd, seriesString, 0, seriesSize, higherFirstOffset, (err, written, buffer) -> parseSeries(seriesString) - else - ret = new Buffer(firstSeriesSize) - seriesString.copy(ret, 0, 0, firstSeriesSize) - parseSeries(ret) + catch err + cb(err) + else + # We do wrap the archive + higherEnd = higher.offset + higher.size + firstSeriesSize = higherEnd - higherFirstOffset + secondSeriesSize = higherLastOffset - higher.offset + + seriesString = new Buffer(firstSeriesSize + secondSeriesSize) + + try + fs.read fd, seriesString, 0, firstSeriesSize, higherFirstOffset, (err, written, buffer) -> + cb(err) if err + if secondSeriesSize > 0 + try + fs.read fd, seriesString, firstSeriesSize, secondSeriesSize, higher.offset, (err, written, buffer) -> + cb(err) if err + parseSeries(seriesString) + catch err + cb(err) + else + ret = new Buffer(firstSeriesSize) + seriesString.copy(ret, 0, 0, firstSeriesSize) + parseSeries(ret) + catch err + cb(err) + catch err + cb(err) parseSeries = (seriesString) -> # Now we unpack the series data we just read @@ -158,22 +170,25 @@ propagate = (fd, timestamp, xff, higher, lower, cb) -> # !!!!!!!!!!!!!!!!! 
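# --- Editor's sketch (not part of the patch): the offset arithmetic used above maps
# an interval to a byte offset inside the circular archive. It assumes the `.mod`
# seen throughout hoard.coffee is a positive-modulo helper on Number, and that
# pointSize is the size of one packed point.
intervalToOffset = (interval, baseInterval, archive, pointSize) ->
  timeDistance = interval - baseInterval
  pointDistance = timeDistance / archive.secondsPerPoint
  byteDistance = pointDistance * pointSize
  # Wrap so the result stays inside [archive.offset, archive.offset + archive.size)
  archive.offset + byteDistance.mod(archive.size)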
packedPoint = new Buffer(pointSize) - fs.read fd, packedPoint, 0, pointSize, lower.offset, (err) -> - [lowerBaseInterval, lowerBaseValue] = pack.Unpack(pointFormat, packedPoint) - - if lowerBaseInterval == 0 - # First propagated update to this lower archive - offset = lower.offset - else - # Not our first propagated update to this lower archive - timeDistance = lowerIntervalStart - lowerBaseInterval - pointDistance = timeDistance / lower.secondsPerPoint - byteDistance = pointDistance * pointSize - offset = lower.offset + byteDistance.mod(lower.size) - - mypp = new Buffer(myPackedPoint) - fs.write fd, mypp, 0, pointSize, offset, (err) -> - cb(null, true) + try + fs.read fd, packedPoint, 0, pointSize, lower.offset, (err) -> + [lowerBaseInterval, lowerBaseValue] = pack.Unpack(pointFormat, packedPoint) + + if lowerBaseInterval == 0 + # First propagated update to this lower archive + offset = lower.offset + else + # Not our first propagated update to this lower archive + timeDistance = lowerIntervalStart - lowerBaseInterval + pointDistance = timeDistance / lower.secondsPerPoint + byteDistance = pointDistance * pointSize + offset = lower.offset + byteDistance.mod(lower.size) + + mypp = new Buffer(myPackedPoint) + fs.write fd, mypp, 0, pointSize, offset, (err) -> + cb(null, true) + catch err + cb(err) else cb(null, false) @@ -204,38 +219,60 @@ update = (filename, value, timestamp, cb) -> myPackedPoint = new Buffer(pack.Pack(pointFormat, [myInterval, value])) packedPoint = new Buffer(pointSize) - fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> - cb(err) if err - [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) - - if baseInterval == 0 - # This file's first update - fs.write fd, myPackedPoint, 0, pointSize, archive.offset, (err, written, buffer) -> - cb(err) if err - [baseInterval, baseValue] = [myInterval, value] - propagateLowerArchives() - else - # File has been updated before - timeDistance = myInterval - baseInterval - pointDistance = timeDistance / archive.secondsPerPoint - byteDistance = pointDistance * pointSize - myOffset = archive.offset + byteDistance.mod(archive.size) - fs.write fd, myPackedPoint, 0, pointSize, myOffset, (err, written, buffer) -> - cb(err) if err - propagateLowerArchives() - + propagateLowerArchives = -> - # Propagate the update to lower-precision archives - #higher = archive - #for lower in lowerArchives: - # if not __propagate(fd, myInterval, header.xFilesFactor, higher, lower): - # break - # higher = lower - - #__changeLastUpdate(fh) + # complete hack (not proud of this), copied updateManyArchive's code for just one update. 
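# --- Editor's sketch (not part of the patch): since the block below duplicates
# updateManyArchive's propagation logic for a single point, one alternative is to
# delegate one-off updates to updateMany, which already owns that logic.
# Hypothetical refactor only; the signatures match the README examples above.
updateViaMany = (filename, value, timestamp, cb) ->
  updateMany filename, [[timestamp, value]], cb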
+ alignedPoints = [ [ timestamp, value ] ] + # Now we propagate the updates to lower-precision archives + higher = archive + lowerArchives = (arc for arc in header.archives when arc.secondsPerPoint > archive.secondsPerPoint) + + if lowerArchives.length > 0 + # Collect a list of propagation calls to make + # This is easier than doing async looping + propagateCalls = [] + for lower in lowerArchives + fit = (i) -> i - i.mod(lower.secondsPerPoint) + lowerIntervals = (fit(p[0]) for p in alignedPoints) + uniqueLowerIntervals = _.uniq(lowerIntervals) + for interval in uniqueLowerIntervals + propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} + higher = lower + + callPropagate = (args, callback) -> + propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> + cb err if err + callback err, result + + async.forEachSeries propagateCalls, callPropagate, (err, result) -> + cb err if err + fs.close fd, cb + else + fs.close fd, cb + + try + fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> + cb(err) if err + [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) + + if baseInterval == 0 + # This file's first update + fs.write fd, myPackedPoint, 0, pointSize, archive.offset, (err, written, buffer) -> + cb(err) if err + [baseInterval, baseValue] = [myInterval, value] + propagateLowerArchives() + else + # File has been updated before + timeDistance = myInterval - baseInterval + pointDistance = timeDistance / archive.secondsPerPoint + byteDistance = pointDistance * pointSize + myOffset = archive.offset + byteDistance.mod(archive.size) + fs.write fd, myPackedPoint, 0, pointSize, myOffset, (err, written, buffer) -> + cb(err) if err + propagateLowerArchives() + catch err + cb(err) - # FIXME: Also fsync here? 
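# --- Editor's sketch (not part of the patch): callPropagate above invokes cb on
# error and then still falls through to callback with the same error; a variant
# that stops at the first failure returns right after reporting it.
callPropagateOnce = (args, callback) ->
  propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) ->
    return callback err if err
    callback null, result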
- fs.close fd, cb return updateMany = (filename, points, cb) -> @@ -277,12 +314,12 @@ updateMany = (filename, points, cb) -> currentPoints.push(point) async.series updateArchiveCalls, (err, results) -> - throw err if err + cb err if err if currentArchive and currentPoints.length > 0 # Don't forget to commit after we've checked all the archives currentPoints.reverse() updateManyArchive fd, header, currentArchive, currentPoints, (err) -> - throw err if err + cb err if err fs.close fd, cb else fs.close fd, cb @@ -324,70 +361,73 @@ updateManyArchive = (fd, header, archive, points, cb) -> # Read base point and determine where our writes will start packedBasePoint = new Buffer(pointSize) - fs.read fd, packedBasePoint, 0, pointSize, archive.offset, (err) -> - cb err if err - [baseInterval, baseValue] = pack.Unpack(pointFormat, packedBasePoint) - - if baseInterval == 0 - # This file's first update - # Use our first string as the base, so we start at the start - baseInterval = packedStrings[0][0] - - # Write all of our packed strings in locations determined by the baseInterval - - writePackedString = (ps, callback) -> - [interval, packedString] = ps - timeDistance = interval - baseInterval - pointDistance = timeDistance / step - byteDistance = pointDistance * pointSize - myOffset = archive.offset + byteDistance.mod(archive.size) - archiveEnd = archive.offset + archive.size - bytesBeyond = (myOffset + packedString.length) - archiveEnd - - if bytesBeyond > 0 - fs.write fd, packedString, 0, packedString.length - bytesBeyond, myOffset, (err) -> - cb err if err - assert.equal archiveEnd, myOffset + packedString.length - bytesBeyond - #assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString)) - # Safe because it can't exceed the archive (retention checking logic above) - fs.write fd, packedString, packedString.length - bytesBeyond, bytesBeyond, archive.offset, (err) -> + try + fs.read fd, packedBasePoint, 0, pointSize, archive.offset, (err) -> + cb err if err + [baseInterval, baseValue] = pack.Unpack(pointFormat, packedBasePoint) + + if baseInterval == 0 + # This file's first update + # Use our first string as the base, so we start at the start + baseInterval = packedStrings[0][0] + + # Write all of our packed strings in locations determined by the baseInterval + + writePackedString = (ps, callback) -> + [interval, packedString] = ps + timeDistance = interval - baseInterval + pointDistance = timeDistance / step + byteDistance = pointDistance * pointSize + myOffset = archive.offset + byteDistance.mod(archive.size) + archiveEnd = archive.offset + archive.size + bytesBeyond = (myOffset + packedString.length) - archiveEnd + + if bytesBeyond > 0 + fs.write fd, packedString, 0, packedString.length - bytesBeyond, myOffset, (err) -> cb err if err + assert.equal archiveEnd, myOffset + packedString.length - bytesBeyond + #assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString)) + # Safe because it can't exceed the archive (retention checking logic above) + fs.write fd, packedString, packedString.length - bytesBeyond, bytesBeyond, archive.offset, (err) -> + cb err if err + callback() + else + fs.write fd, packedString, 0, packedString.length, myOffset, (err) -> callback() - else - fs.write fd, packedString, 0, packedString.length, myOffset, (err) -> - callback() - - async.forEachSeries packedStrings, writePackedString, (err) -> - 
throw err if err - propagateLowerArchives() - - propagateLowerArchives = -> - # Now we propagate the updates to lower-precision archives - higher = archive - lowerArchives = (arc for arc in header.archives when arc.secondsPerPoint > archive.secondsPerPoint) - - if lowerArchives.length > 0 - # Collect a list of propagation calls to make - # This is easier than doing async looping - propagateCalls = [] - for lower in lowerArchives - fit = (i) -> i - i.mod(lower.secondsPerPoint) - lowerIntervals = (fit(p[0]) for p in alignedPoints) - uniqueLowerIntervals = _.uniq(lowerIntervals) - for interval in uniqueLowerIntervals - propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} - higher = lower - - callPropagate = (args, callback) -> - propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> + + propagateLowerArchives = -> + # Now we propagate the updates to lower-precision archives + higher = archive + lowerArchives = (arc for arc in header.archives when arc.secondsPerPoint > archive.secondsPerPoint) + + if lowerArchives.length > 0 + # Collect a list of propagation calls to make + # This is easier than doing async looping + propagateCalls = [] + for lower in lowerArchives + fit = (i) -> i - i.mod(lower.secondsPerPoint) + lowerIntervals = (fit(p[0]) for p in alignedPoints) + uniqueLowerIntervals = _.uniq(lowerIntervals) + for interval in uniqueLowerIntervals + propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} + higher = lower + + callPropagate = (args, callback) -> + propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> + cb err if err + callback err, result + + async.forEachSeries propagateCalls, callPropagate, (err, result) -> cb err if err - callback err, result - - async.forEachSeries propagateCalls, callPropagate, (err, result) -> - throw err if err + cb null + else cb null - else - cb null + + async.forEachSeries packedStrings, writePackedString, (err) -> + cb err if err + propagateLowerArchives() + catch err + cb(err) info = (path, cb) -> # FIXME: Close this stream? @@ -428,7 +468,7 @@ fetch = (path, from, to, cb) -> now = unixTime() oldestTime = now - header.maxRetention from = oldestTime if from < oldestTime - throw new Error('Invalid time interval') unless from < to + cb(new Error('Invalid time interval')) unless from < to to = now if to > now or to < from diff = now - from fd = null @@ -467,28 +507,37 @@ fetch = (path, from, to, cb) -> toOffset = getOffset(toInterval) fs.open path, 'r', (err, fd) -> - if err then throw err + cb err if err if fromOffset < toOffset # We don't wrap around, can everything in a single read size = toOffset - fromOffset seriesBuffer = new Buffer(size) - fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> - cb(err) if err - fs.close fd, (err) -> + try + fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! + fs.close fd, (err) -> + cb(err) if err + unpack(seriesBuffer) # We have read it, go unpack! 
+ catch err + cb(err) else # We wrap around the archive, we need two reads archiveEnd = archive.offset + archive.size size1 = archiveEnd - fromOffset size2 = toOffset - archive.offset seriesBuffer = new Buffer(size1 + size2) - fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> - cb(err) if err - fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> + try + fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! - fs.close(fd) + try + fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> + cb(err) if err + unpack(seriesBuffer) # We have read it, go unpack! + fs.close(fd) + catch err + cb(err) + catch err + cb(err) unpack = (seriesData) -> # Optmize this? diff --git a/test/hoard.test.coffee b/test/hoard.test.coffee index acf4d7d..a4c1a73 100644 --- a/test/hoard.test.coffee +++ b/test/hoard.test.coffee @@ -1,7 +1,7 @@ assert = require 'assert' fs = require 'fs' path = require 'path' -hoard = require "hoard" +hoard = require "../lib/hoard" equal = assert.equal FILENAME = 'test/large.whisper'
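A minimal sketch of the renamed API from the test suite's side, combining the README's updateMany example with the test's relative require path; the target file and timestamps are illustrative only, and it assumes the compiled lib/hoard.js exists (see the Testing notes above).

hoard = require '../lib/hoard'

# Batch-write two points, mirroring the README's updateMany example
hoard.updateMany 'test/large.whisper', [[1312490305, 4976], [1312492105, 3742]], (err) ->
  throw err if err
  console.log 'Hoard file updated!'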
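The README's new Testing section refers to cake tasks, but the Cakefile itself is not part of this patch; the sketch below only illustrates, under that assumption, how build and test could map onto the CoffeeScript compiler and the Expresso runner mentioned above.

{exec} = require 'child_process'

task 'build', 'Compile src/ into lib/ (the .gitignore above excludes the output)', ->
  exec 'coffee --compile --output lib src', (err, stdout, stderr) ->
    throw err if err

task 'test', 'Run the Expresso test suite', ->
  exec 'expresso test/hoard.test.coffee', (err, stdout, stderr) ->
    console.log stdout + stderr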