From 0e006ad149569f530e34f0a5487406dccb4a6372 Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 13:18:18 -0400 Subject: [PATCH 01/14] update should be updateMany in second example. added "../lib" path to requiring buffer_ieee754, was breaking otherwise. --- README.md | 2 +- src/hoard.coffee | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5cf3cd0..7a22b5e 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ hoard.update('users.hoard', 1337, 1311169605, function(err) { // Update multiple values at once in an existing Hoard file. // This function is much faster when dealing with multiple values // that need to be written at once. -hoard.update('users.hoard', [[1312490305, 4976], [1312492105, 3742]], function(err) { +hoard.updateMany('users.hoard', [[1312490305, 4976], [1312492105, 3742]], function(err) { if (err) throw err; console.log('Hoard file updated!'); }); diff --git a/src/hoard.coffee b/src/hoard.coffee index d156b88..fe6ef42 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -42,7 +42,7 @@ create = (filename, archives, xFilesFactor, cb) -> # Using 'buffer_ieee754' from node 0.5.x # as no libraries had a working IEEE754 encoder buffer = new Buffer(4) - require('buffer_ieee754').writeIEEE754(buffer, 0.5, 0, 'big', 23, 4); + require('../lib/buffer_ieee754').writeIEEE754(buffer, 0.5, 0, 'big', 23, 4); buffer buffer = Put() From 9ef9c844054fbb42db7afdf90b8a18f0c77a5c61 Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 14:02:41 -0400 Subject: [PATCH 02/14] gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..61c1c20 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +node_modules +lib/hoard.js +test/*.hoard From 8da4c7e5097b3275cdbb4287d2361b83a43d0aa7 Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 14:06:55 -0400 Subject: [PATCH 03/14] failing when propagateLowerArchives is defined after it is called --- src/hoard.coffee | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index fe6ef42..ffa98c7 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -204,6 +204,20 @@ update = (filename, value, timestamp, cb) -> myPackedPoint = new Buffer(pack.Pack(pointFormat, [myInterval, value])) packedPoint = new Buffer(pointSize) + + propagateLowerArchives = -> + # Propagate the update to lower-precision archives + #higher = archive + #for lower in lowerArchives: + # if not __propagate(fd, myInterval, header.xFilesFactor, higher, lower): + # break + # higher = lower + + #__changeLastUpdate(fh) + + # FIXME: Also fsync here? + fs.close fd, cb + fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> cb(err) if err [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) @@ -224,18 +238,6 @@ update = (filename, value, timestamp, cb) -> cb(err) if err propagateLowerArchives() - propagateLowerArchives = -> - # Propagate the update to lower-precision archives - #higher = archive - #for lower in lowerArchives: - # if not __propagate(fd, myInterval, header.xFilesFactor, higher, lower): - # break - # higher = lower - - #__changeLastUpdate(fh) - - # FIXME: Also fsync here? 
- fs.close fd, cb return updateMany = (filename, points, cb) -> @@ -357,10 +359,6 @@ updateManyArchive = (fd, header, archive, points, cb) -> fs.write fd, packedString, 0, packedString.length, myOffset, (err) -> callback() - async.forEachSeries packedStrings, writePackedString, (err) -> - throw err if err - propagateLowerArchives() - propagateLowerArchives = -> # Now we propagate the updates to lower-precision archives higher = archive @@ -389,6 +387,10 @@ updateManyArchive = (fd, header, archive, points, cb) -> else cb null + async.forEachSeries packedStrings, writePackedString, (err) -> + throw err if err + propagateLowerArchives() + info = (path, cb) -> # FIXME: Close this stream? # FIXME: Signal errors to callback? From 56bc9075fa051c50a76d913753c7160d00bbdf8a Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 14:07:08 -0400 Subject: [PATCH 04/14] updated tests --- README.md | 8 ++++++++ test/hoard.test.coffee | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 7a22b5e..c88b09c 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,14 @@ maximum compatibility. They don't require the Python version to be installed but files generated by it. The tests were implemented using Expresso after some experimentation with Vows. Ran into some issues with Vows and decided to use the much simpler (and dumber) Expresso instead. +Testing +------- + +# [ cake setup ] +# cake [ install | build ] +# cake test + + Authors ------- diff --git a/test/hoard.test.coffee b/test/hoard.test.coffee index acf4d7d..a4c1a73 100644 --- a/test/hoard.test.coffee +++ b/test/hoard.test.coffee @@ -1,7 +1,7 @@ assert = require 'assert' fs = require 'fs' path = require 'path' -hoard = require "hoard" +hoard = require "../lib/hoard" equal = assert.equal FILENAME = 'test/large.whisper' From fe0805e9f398b948e593039940662efd7a9ea84e Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 14:11:25 -0400 Subject: [PATCH 05/14] styling --- README.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c88b09c..d89f43e 100644 --- a/README.md +++ b/README.md @@ -105,11 +105,9 @@ Ran into some issues with Vows and decided to use the much simpler (and dumber) Testing ------- -# [ cake setup ] -# cake [ install | build ] -# cake test - - + - [ cake setup ] + - cake [ install | build ] + - cake test Authors ------- From 645c674992929a5f645fae18965d66ba66456984 Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 16:53:22 -0400 Subject: [PATCH 06/14] copied propagateLowerArchives from updateManyArchive to update [complete hack, but works] --- src/hoard.coffee | 41 +++++++++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index ffa98c7..ba8a7c5 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -206,18 +206,35 @@ update = (filename, value, timestamp, cb) -> packedPoint = new Buffer(pointSize) propagateLowerArchives = -> - # Propagate the update to lower-precision archives - #higher = archive - #for lower in lowerArchives: - # if not __propagate(fd, myInterval, header.xFilesFactor, higher, lower): - # break - # higher = lower - - #__changeLastUpdate(fh) - - # FIXME: Also fsync here? - fs.close fd, cb - + # complete hack (not proud of this), copied updateManyArchive's code for just one update. 
+ alignedPoints = [ [ timestamp, value ] ] + # Now we propagate the updates to lower-precision archives + higher = archive + lowerArchives = (arc for arc in header.archives when arc.secondsPerPoint > archive.secondsPerPoint) + + if lowerArchives.length > 0 + # Collect a list of propagation calls to make + # This is easier than doing async looping + propagateCalls = [] + for lower in lowerArchives + fit = (i) -> i - i.mod(lower.secondsPerPoint) + lowerIntervals = (fit(p[0]) for p in alignedPoints) + uniqueLowerIntervals = _.uniq(lowerIntervals) + for interval in uniqueLowerIntervals + propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} + higher = lower + + callPropagate = (args, callback) -> + propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> + cb err if err + callback err, result + + async.forEachSeries propagateCalls, callPropagate, (err, result) -> + throw err if err + fs.close fd, cb + else + fs.close fd, cb + fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> cb(err) if err [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) From e773735555d8b5a5afa87d4cf357611517d62a50 Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 17:19:06 -0400 Subject: [PATCH 07/14] hitting a race condition with reading from the buffer - "Error: Offset is out of bounds" try/catch on all fs.read handles this, but does not solve it. --- src/hoard.coffee | 317 +++++++++++++++++++++++++---------------------- 1 file changed, 169 insertions(+), 148 deletions(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index ba8a7c5..a67210c 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -73,49 +73,52 @@ propagate = (fd, timestamp, xff, higher, lower, cb) -> lowerIntervalEnd = lowerIntervalStart + lower.secondsPerPoint packedPoint = new Buffer(pointSize) - fs.read fd, packedPoint, 0, pointSize, higher.offset, (err, written, buffer) -> - cb(err) if err - [higherBaseInterval, higherBaseValue] = pack.Unpack(pointFormat, packedPoint) - - if higherBaseInterval == 0 - higherFirstOffset = higher.offset - else - timeDistance = lowerIntervalStart - higherBaseInterval - pointDistance = timeDistance / higher.secondsPerPoint - byteDistance = pointDistance * pointSize - higherFirstOffset = higher.offset + byteDistance.mod(higher.size) - - higherPoints = lower.secondsPerPoint / higher.secondsPerPoint - higherSize = higherPoints * pointSize - relativeFirstOffset = higherFirstOffset - higher.offset - relativeLastOffset = (relativeFirstOffset + higherSize).mod(higher.size) - higherLastOffset = relativeLastOffset + higher.offset - - if higherFirstOffset < higherLastOffset - # We don't wrap the archive - seriesSize = higherLastOffset - higherFirstOffset - seriesString = new Buffer(seriesSize) - - fs.read fd, seriesString, 0, seriesSize, higherFirstOffset, (err, written, buffer) -> - parseSeries(seriesString) - else - # We do wrap the archive - higherEnd = higher.offset + higher.size - firstSeriesSize = higherEnd - higherFirstOffset - secondSeriesSize = higherLastOffset - higher.offset - - seriesString = new Buffer(firstSeriesSize + secondSeriesSize) - - fs.read fd, seriesString, 0, firstSeriesSize, higherFirstOffset, (err, written, buffer) -> - cb(err) if err - if secondSeriesSize > 0 - fs.read fd, seriesString, firstSeriesSize, secondSeriesSize, higher.offset, (err, written, buffer) -> - cb(err) if err - parseSeries(seriesString) - else - ret = new Buffer(firstSeriesSize) - 
seriesString.copy(ret, 0, 0, firstSeriesSize) - parseSeries(ret) + try + fs.read fd, packedPoint, 0, pointSize, higher.offset, (err, written, buffer) -> + cb(err) if err + [higherBaseInterval, higherBaseValue] = pack.Unpack(pointFormat, packedPoint) + + if higherBaseInterval == 0 + higherFirstOffset = higher.offset + else + timeDistance = lowerIntervalStart - higherBaseInterval + pointDistance = timeDistance / higher.secondsPerPoint + byteDistance = pointDistance * pointSize + higherFirstOffset = higher.offset + byteDistance.mod(higher.size) + + higherPoints = lower.secondsPerPoint / higher.secondsPerPoint + higherSize = higherPoints * pointSize + relativeFirstOffset = higherFirstOffset - higher.offset + relativeLastOffset = (relativeFirstOffset + higherSize).mod(higher.size) + higherLastOffset = relativeLastOffset + higher.offset + + if higherFirstOffset < higherLastOffset + # We don't wrap the archive + seriesSize = higherLastOffset - higherFirstOffset + seriesString = new Buffer(seriesSize) + + fs.read fd, seriesString, 0, seriesSize, higherFirstOffset, (err, written, buffer) -> + parseSeries(seriesString) + else + # We do wrap the archive + higherEnd = higher.offset + higher.size + firstSeriesSize = higherEnd - higherFirstOffset + secondSeriesSize = higherLastOffset - higher.offset + + seriesString = new Buffer(firstSeriesSize + secondSeriesSize) + + fs.read fd, seriesString, 0, firstSeriesSize, higherFirstOffset, (err, written, buffer) -> + cb(err) if err + if secondSeriesSize > 0 + fs.read fd, seriesString, firstSeriesSize, secondSeriesSize, higher.offset, (err, written, buffer) -> + cb(err) if err + parseSeries(seriesString) + else + ret = new Buffer(firstSeriesSize) + seriesString.copy(ret, 0, 0, firstSeriesSize) + parseSeries(ret) + catch err + cb(err) parseSeries = (seriesString) -> # Now we unpack the series data we just read @@ -158,22 +161,25 @@ propagate = (fd, timestamp, xff, higher, lower, cb) -> # !!!!!!!!!!!!!!!!! 
packedPoint = new Buffer(pointSize) - fs.read fd, packedPoint, 0, pointSize, lower.offset, (err) -> - [lowerBaseInterval, lowerBaseValue] = pack.Unpack(pointFormat, packedPoint) - - if lowerBaseInterval == 0 - # First propagated update to this lower archive - offset = lower.offset - else - # Not our first propagated update to this lower archive - timeDistance = lowerIntervalStart - lowerBaseInterval - pointDistance = timeDistance / lower.secondsPerPoint - byteDistance = pointDistance * pointSize - offset = lower.offset + byteDistance.mod(lower.size) - - mypp = new Buffer(myPackedPoint) - fs.write fd, mypp, 0, pointSize, offset, (err) -> - cb(null, true) + try + fs.read fd, packedPoint, 0, pointSize, lower.offset, (err) -> + [lowerBaseInterval, lowerBaseValue] = pack.Unpack(pointFormat, packedPoint) + + if lowerBaseInterval == 0 + # First propagated update to this lower archive + offset = lower.offset + else + # Not our first propagated update to this lower archive + timeDistance = lowerIntervalStart - lowerBaseInterval + pointDistance = timeDistance / lower.secondsPerPoint + byteDistance = pointDistance * pointSize + offset = lower.offset + byteDistance.mod(lower.size) + + mypp = new Buffer(myPackedPoint) + fs.write fd, mypp, 0, pointSize, offset, (err) -> + cb(null, true) + catch err + cb(err) else cb(null, false) @@ -235,25 +241,28 @@ update = (filename, value, timestamp, cb) -> else fs.close fd, cb - fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> - cb(err) if err - [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) - - if baseInterval == 0 - # This file's first update - fs.write fd, myPackedPoint, 0, pointSize, archive.offset, (err, written, buffer) -> - cb(err) if err - [baseInterval, baseValue] = [myInterval, value] - propagateLowerArchives() - else - # File has been updated before - timeDistance = myInterval - baseInterval - pointDistance = timeDistance / archive.secondsPerPoint - byteDistance = pointDistance * pointSize - myOffset = archive.offset + byteDistance.mod(archive.size) - fs.write fd, myPackedPoint, 0, pointSize, myOffset, (err, written, buffer) -> - cb(err) if err - propagateLowerArchives() + try + fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> + cb(err) if err + [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) + + if baseInterval == 0 + # This file's first update + fs.write fd, myPackedPoint, 0, pointSize, archive.offset, (err, written, buffer) -> + cb(err) if err + [baseInterval, baseValue] = [myInterval, value] + propagateLowerArchives() + else + # File has been updated before + timeDistance = myInterval - baseInterval + pointDistance = timeDistance / archive.secondsPerPoint + byteDistance = pointDistance * pointSize + myOffset = archive.offset + byteDistance.mod(archive.size) + fs.write fd, myPackedPoint, 0, pointSize, myOffset, (err, written, buffer) -> + cb(err) if err + propagateLowerArchives() + catch err + cb(err) return @@ -343,70 +352,73 @@ updateManyArchive = (fd, header, archive, points, cb) -> # Read base point and determine where our writes will start packedBasePoint = new Buffer(pointSize) - fs.read fd, packedBasePoint, 0, pointSize, archive.offset, (err) -> - cb err if err - [baseInterval, baseValue] = pack.Unpack(pointFormat, packedBasePoint) - - if baseInterval == 0 - # This file's first update - # Use our first string as the base, so we start at the start - baseInterval = packedStrings[0][0] - - # Write all of our packed strings in locations 
determined by the baseInterval - - writePackedString = (ps, callback) -> - [interval, packedString] = ps - timeDistance = interval - baseInterval - pointDistance = timeDistance / step - byteDistance = pointDistance * pointSize - myOffset = archive.offset + byteDistance.mod(archive.size) - archiveEnd = archive.offset + archive.size - bytesBeyond = (myOffset + packedString.length) - archiveEnd - - if bytesBeyond > 0 - fs.write fd, packedString, 0, packedString.length - bytesBeyond, myOffset, (err) -> - cb err if err - assert.equal archiveEnd, myOffset + packedString.length - bytesBeyond - #assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString)) - # Safe because it can't exceed the archive (retention checking logic above) - fs.write fd, packedString, packedString.length - bytesBeyond, bytesBeyond, archive.offset, (err) -> + try + fs.read fd, packedBasePoint, 0, pointSize, archive.offset, (err) -> + cb err if err + [baseInterval, baseValue] = pack.Unpack(pointFormat, packedBasePoint) + + if baseInterval == 0 + # This file's first update + # Use our first string as the base, so we start at the start + baseInterval = packedStrings[0][0] + + # Write all of our packed strings in locations determined by the baseInterval + + writePackedString = (ps, callback) -> + [interval, packedString] = ps + timeDistance = interval - baseInterval + pointDistance = timeDistance / step + byteDistance = pointDistance * pointSize + myOffset = archive.offset + byteDistance.mod(archive.size) + archiveEnd = archive.offset + archive.size + bytesBeyond = (myOffset + packedString.length) - archiveEnd + + if bytesBeyond > 0 + fs.write fd, packedString, 0, packedString.length - bytesBeyond, myOffset, (err) -> cb err if err + assert.equal archiveEnd, myOffset + packedString.length - bytesBeyond + #assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString)) + # Safe because it can't exceed the archive (retention checking logic above) + fs.write fd, packedString, packedString.length - bytesBeyond, bytesBeyond, archive.offset, (err) -> + cb err if err + callback() + else + fs.write fd, packedString, 0, packedString.length, myOffset, (err) -> callback() - else - fs.write fd, packedString, 0, packedString.length, myOffset, (err) -> - callback() - - propagateLowerArchives = -> - # Now we propagate the updates to lower-precision archives - higher = archive - lowerArchives = (arc for arc in header.archives when arc.secondsPerPoint > archive.secondsPerPoint) - - if lowerArchives.length > 0 - # Collect a list of propagation calls to make - # This is easier than doing async looping - propagateCalls = [] - for lower in lowerArchives - fit = (i) -> i - i.mod(lower.secondsPerPoint) - lowerIntervals = (fit(p[0]) for p in alignedPoints) - uniqueLowerIntervals = _.uniq(lowerIntervals) - for interval in uniqueLowerIntervals - propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} - higher = lower - - callPropagate = (args, callback) -> - propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> - cb err if err - callback err, result - - async.forEachSeries propagateCalls, callPropagate, (err, result) -> - throw err if err + + propagateLowerArchives = -> + # Now we propagate the updates to lower-precision archives + higher = archive + lowerArchives = (arc for arc in header.archives when 
arc.secondsPerPoint > archive.secondsPerPoint) + + if lowerArchives.length > 0 + # Collect a list of propagation calls to make + # This is easier than doing async looping + propagateCalls = [] + for lower in lowerArchives + fit = (i) -> i - i.mod(lower.secondsPerPoint) + lowerIntervals = (fit(p[0]) for p in alignedPoints) + uniqueLowerIntervals = _.uniq(lowerIntervals) + for interval in uniqueLowerIntervals + propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} + higher = lower + + callPropagate = (args, callback) -> + propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> + cb err if err + callback err, result + + async.forEachSeries propagateCalls, callPropagate, (err, result) -> + throw err if err + cb null + else cb null - else - cb null - - async.forEachSeries packedStrings, writePackedString, (err) -> - throw err if err - propagateLowerArchives() + + async.forEachSeries packedStrings, writePackedString, (err) -> + throw err if err + propagateLowerArchives() + catch err + cb(err) info = (path, cb) -> # FIXME: Close this stream? @@ -491,23 +503,32 @@ fetch = (path, from, to, cb) -> # We don't wrap around, can everything in a single read size = toOffset - fromOffset seriesBuffer = new Buffer(size) - fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> - cb(err) if err - fs.close fd, (err) -> + try + fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! + fs.close fd, (err) -> + cb(err) if err + unpack(seriesBuffer) # We have read it, go unpack! + catch err + cb(err) else # We wrap around the archive, we need two reads archiveEnd = archive.offset + archive.size size1 = archiveEnd - fromOffset size2 = toOffset - archive.offset seriesBuffer = new Buffer(size1 + size2) - fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> - cb(err) if err - fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> + try + fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! - fs.close(fd) + try + fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> + cb(err) if err + unpack(seriesBuffer) # We have read it, go unpack! + fs.close(fd) + catch err + cb(err) + catch err + cb(err) unpack = (seriesData) -> # Optmize this? From 989e3c0240e515469a64773260f0157bc0b05104 Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 17:42:15 -0400 Subject: [PATCH 08/14] horrible implementation of last, works but needs optimization. 
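
The new last(path, cb) helper reuses the same archive-selection and read logic as fetch, but only for the most recent interval, and is exported alongside the existing API. A rough usage sketch, assuming the callback mirrors fetch's (err, timeInfo, values) signature as the code in this patch suggests:

    hoard = require 'hoard'
    # Read the latest stored point from an existing hoard file
    hoard.last 'users.hoard', (err, timeInfo, values) ->
      throw err if err
      console.log timeInfo, values
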
--- src/hoard.coffee | 99 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 98 insertions(+), 1 deletion(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index a67210c..d500616 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -454,6 +454,102 @@ info = (path, cb) -> archives: archives return +last = (path, cb) -> + info path, (err, header) -> + now = unixTime() + from = now-1 + to = now + diff = now - from + fd = null + + # Find closest archive to look in, that will contain our information + for archive in header.archives + break if archive.retention >= diff + + fromInterval = parseInt(from - from.mod(archive.secondsPerPoint)) + archive.secondsPerPoint + toInterval = parseInt(to - to.mod(archive.secondsPerPoint)) + archive.secondsPerPoint + + file = fs.createReadStream(path) + + Binary.stream(file) + .skip(archive.offset) + .word32bu('baseInterval') + .word32bu('baseValue') + .tap (vars) -> + if vars.baseInterval == 0 + # Nothing has been written to this hoard + step = archive.secondsPerPoint + points = (toInterval - fromInterval) / step + timeInfo = [fromInterval, toInterval, step] + values = (null for n in [0...points]) + cb(null, timeInfo, values) + else + # We have data in this hoard, let's read it + getOffset = (interval) -> + timeDistance = interval - vars.baseInterval + pointDistance = timeDistance / archive.secondsPerPoint + byteDistance = pointDistance * pointSize + a = archive.offset + byteDistance.mod(archive.size) + a + + fromOffset = getOffset(fromInterval) + toOffset = getOffset(toInterval) + + fs.open path, 'r', (err, fd) -> + if err then throw err + if fromOffset < toOffset + # We don't wrap around, can everything in a single read + size = toOffset - fromOffset + seriesBuffer = new Buffer(size) + try + fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> + cb(err) if err + fs.close fd, (err) -> + cb(err) if err + unpack(seriesBuffer) # We have read it, go unpack! + catch err + cb(err) + else + # We wrap around the archive, we need two reads + archiveEnd = archive.offset + archive.size + size1 = archiveEnd - fromOffset + size2 = toOffset - archive.offset + seriesBuffer = new Buffer(size1 + size2) + try + fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> + cb(err) if err + try + fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> + cb(err) if err + unpack(seriesBuffer) # We have read it, go unpack! + fs.close(fd) + catch err + cb(err) + catch err + cb(err) + + unpack = (seriesData) -> + # Optmize this? + numPoints = seriesData.length / pointSize + seriesFormat = "!" + ('Ld' for f in [0...numPoints]).join("") + unpackedSeries = pack.Unpack(seriesFormat, seriesData) + + # Use buffer/pre-allocate? 
+ valueList = (null for f in [0...numPoints]) + currentInterval = fromInterval + step = archive.secondsPerPoint + + for i in [0...unpackedSeries.length] by 2 + pointTime = unpackedSeries[i] + if pointTime == currentInterval + pointValue = unpackedSeries[i + 1] + valueList[i / 2] = pointValue + currentInterval += step + + timeInfo = [fromInterval, toInterval, step] + cb(null, timeInfo, valueList) + return + fetch = (path, from, to, cb) -> info path, (err, header) -> now = unixTime() @@ -464,7 +560,7 @@ fetch = (path, from, to, cb) -> diff = now - from fd = null - # Find closest archive to look in, that iwll contain our information + # Find closest archive to look in, that will contain our information for archive in header.archives break if archive.retention >= diff @@ -556,5 +652,6 @@ exports.create = create exports.update = update exports.updateMany = updateMany exports.info = info +exports.last = last exports.fetch = fetch From ea2181b912cfe7444198ef486510aaa7260094b3 Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 17:49:15 -0400 Subject: [PATCH 09/14] specify time at --- src/hoard.coffee | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index d500616..983c3b7 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -454,12 +454,14 @@ info = (path, cb) -> archives: archives return -last = (path, cb) -> +last = (path, at, cb) -> info path, (err, header) -> now = unixTime() - from = now-1 - to = now - diff = now - from + oldestTime = now - header.maxRetention + throw new Error('Invalid time interval') unless at > oldestTime + from = at-1 + to = at + diff = at - from fd = null # Find closest archive to look in, that will contain our information From 879972d7ff929bab33f82da0f276019447c1f3cd Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 18:12:26 -0400 Subject: [PATCH 10/14] Revert "specify time at" This reverts commit ea2181b912cfe7444198ef486510aaa7260094b3. 
--- src/hoard.coffee | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index 983c3b7..d500616 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -454,14 +454,12 @@ info = (path, cb) -> archives: archives return -last = (path, at, cb) -> +last = (path, cb) -> info path, (err, header) -> now = unixTime() - oldestTime = now - header.maxRetention - throw new Error('Invalid time interval') unless at > oldestTime - from = at-1 - to = at - diff = at - from + from = now-1 + to = now + diff = now - from fd = null # Find closest archive to look in, that will contain our information From 9c2dfd846a34eb682bdfa2f33b203d43ca4345a0 Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 18:12:53 -0400 Subject: [PATCH 11/14] Revert e773735555d8b5a5afa87d4cf357611517d62a50^..HEAD --- src/hoard.coffee | 416 +++++++++++++++++------------------------------ 1 file changed, 149 insertions(+), 267 deletions(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index d500616..ba8a7c5 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -73,52 +73,49 @@ propagate = (fd, timestamp, xff, higher, lower, cb) -> lowerIntervalEnd = lowerIntervalStart + lower.secondsPerPoint packedPoint = new Buffer(pointSize) - try - fs.read fd, packedPoint, 0, pointSize, higher.offset, (err, written, buffer) -> - cb(err) if err - [higherBaseInterval, higherBaseValue] = pack.Unpack(pointFormat, packedPoint) - - if higherBaseInterval == 0 - higherFirstOffset = higher.offset - else - timeDistance = lowerIntervalStart - higherBaseInterval - pointDistance = timeDistance / higher.secondsPerPoint - byteDistance = pointDistance * pointSize - higherFirstOffset = higher.offset + byteDistance.mod(higher.size) - - higherPoints = lower.secondsPerPoint / higher.secondsPerPoint - higherSize = higherPoints * pointSize - relativeFirstOffset = higherFirstOffset - higher.offset - relativeLastOffset = (relativeFirstOffset + higherSize).mod(higher.size) - higherLastOffset = relativeLastOffset + higher.offset - - if higherFirstOffset < higherLastOffset - # We don't wrap the archive - seriesSize = higherLastOffset - higherFirstOffset - seriesString = new Buffer(seriesSize) - - fs.read fd, seriesString, 0, seriesSize, higherFirstOffset, (err, written, buffer) -> - parseSeries(seriesString) - else - # We do wrap the archive - higherEnd = higher.offset + higher.size - firstSeriesSize = higherEnd - higherFirstOffset - secondSeriesSize = higherLastOffset - higher.offset - - seriesString = new Buffer(firstSeriesSize + secondSeriesSize) - - fs.read fd, seriesString, 0, firstSeriesSize, higherFirstOffset, (err, written, buffer) -> - cb(err) if err - if secondSeriesSize > 0 - fs.read fd, seriesString, firstSeriesSize, secondSeriesSize, higher.offset, (err, written, buffer) -> - cb(err) if err - parseSeries(seriesString) - else - ret = new Buffer(firstSeriesSize) - seriesString.copy(ret, 0, 0, firstSeriesSize) - parseSeries(ret) - catch err - cb(err) + fs.read fd, packedPoint, 0, pointSize, higher.offset, (err, written, buffer) -> + cb(err) if err + [higherBaseInterval, higherBaseValue] = pack.Unpack(pointFormat, packedPoint) + + if higherBaseInterval == 0 + higherFirstOffset = higher.offset + else + timeDistance = lowerIntervalStart - higherBaseInterval + pointDistance = timeDistance / higher.secondsPerPoint + byteDistance = pointDistance * pointSize + higherFirstOffset = higher.offset + byteDistance.mod(higher.size) + + higherPoints = lower.secondsPerPoint / higher.secondsPerPoint + 
higherSize = higherPoints * pointSize + relativeFirstOffset = higherFirstOffset - higher.offset + relativeLastOffset = (relativeFirstOffset + higherSize).mod(higher.size) + higherLastOffset = relativeLastOffset + higher.offset + + if higherFirstOffset < higherLastOffset + # We don't wrap the archive + seriesSize = higherLastOffset - higherFirstOffset + seriesString = new Buffer(seriesSize) + + fs.read fd, seriesString, 0, seriesSize, higherFirstOffset, (err, written, buffer) -> + parseSeries(seriesString) + else + # We do wrap the archive + higherEnd = higher.offset + higher.size + firstSeriesSize = higherEnd - higherFirstOffset + secondSeriesSize = higherLastOffset - higher.offset + + seriesString = new Buffer(firstSeriesSize + secondSeriesSize) + + fs.read fd, seriesString, 0, firstSeriesSize, higherFirstOffset, (err, written, buffer) -> + cb(err) if err + if secondSeriesSize > 0 + fs.read fd, seriesString, firstSeriesSize, secondSeriesSize, higher.offset, (err, written, buffer) -> + cb(err) if err + parseSeries(seriesString) + else + ret = new Buffer(firstSeriesSize) + seriesString.copy(ret, 0, 0, firstSeriesSize) + parseSeries(ret) parseSeries = (seriesString) -> # Now we unpack the series data we just read @@ -161,25 +158,22 @@ propagate = (fd, timestamp, xff, higher, lower, cb) -> # !!!!!!!!!!!!!!!!! packedPoint = new Buffer(pointSize) - try - fs.read fd, packedPoint, 0, pointSize, lower.offset, (err) -> - [lowerBaseInterval, lowerBaseValue] = pack.Unpack(pointFormat, packedPoint) - - if lowerBaseInterval == 0 - # First propagated update to this lower archive - offset = lower.offset - else - # Not our first propagated update to this lower archive - timeDistance = lowerIntervalStart - lowerBaseInterval - pointDistance = timeDistance / lower.secondsPerPoint - byteDistance = pointDistance * pointSize - offset = lower.offset + byteDistance.mod(lower.size) - - mypp = new Buffer(myPackedPoint) - fs.write fd, mypp, 0, pointSize, offset, (err) -> - cb(null, true) - catch err - cb(err) + fs.read fd, packedPoint, 0, pointSize, lower.offset, (err) -> + [lowerBaseInterval, lowerBaseValue] = pack.Unpack(pointFormat, packedPoint) + + if lowerBaseInterval == 0 + # First propagated update to this lower archive + offset = lower.offset + else + # Not our first propagated update to this lower archive + timeDistance = lowerIntervalStart - lowerBaseInterval + pointDistance = timeDistance / lower.secondsPerPoint + byteDistance = pointDistance * pointSize + offset = lower.offset + byteDistance.mod(lower.size) + + mypp = new Buffer(myPackedPoint) + fs.write fd, mypp, 0, pointSize, offset, (err) -> + cb(null, true) else cb(null, false) @@ -241,28 +235,25 @@ update = (filename, value, timestamp, cb) -> else fs.close fd, cb - try - fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> - cb(err) if err - [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) - - if baseInterval == 0 - # This file's first update - fs.write fd, myPackedPoint, 0, pointSize, archive.offset, (err, written, buffer) -> - cb(err) if err - [baseInterval, baseValue] = [myInterval, value] - propagateLowerArchives() - else - # File has been updated before - timeDistance = myInterval - baseInterval - pointDistance = timeDistance / archive.secondsPerPoint - byteDistance = pointDistance * pointSize - myOffset = archive.offset + byteDistance.mod(archive.size) - fs.write fd, myPackedPoint, 0, pointSize, myOffset, (err, written, buffer) -> - cb(err) if err - propagateLowerArchives() - catch err - cb(err) + 
fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> + cb(err) if err + [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) + + if baseInterval == 0 + # This file's first update + fs.write fd, myPackedPoint, 0, pointSize, archive.offset, (err, written, buffer) -> + cb(err) if err + [baseInterval, baseValue] = [myInterval, value] + propagateLowerArchives() + else + # File has been updated before + timeDistance = myInterval - baseInterval + pointDistance = timeDistance / archive.secondsPerPoint + byteDistance = pointDistance * pointSize + myOffset = archive.offset + byteDistance.mod(archive.size) + fs.write fd, myPackedPoint, 0, pointSize, myOffset, (err, written, buffer) -> + cb(err) if err + propagateLowerArchives() return @@ -352,73 +343,70 @@ updateManyArchive = (fd, header, archive, points, cb) -> # Read base point and determine where our writes will start packedBasePoint = new Buffer(pointSize) - try - fs.read fd, packedBasePoint, 0, pointSize, archive.offset, (err) -> - cb err if err - [baseInterval, baseValue] = pack.Unpack(pointFormat, packedBasePoint) - - if baseInterval == 0 - # This file's first update - # Use our first string as the base, so we start at the start - baseInterval = packedStrings[0][0] - - # Write all of our packed strings in locations determined by the baseInterval - - writePackedString = (ps, callback) -> - [interval, packedString] = ps - timeDistance = interval - baseInterval - pointDistance = timeDistance / step - byteDistance = pointDistance * pointSize - myOffset = archive.offset + byteDistance.mod(archive.size) - archiveEnd = archive.offset + archive.size - bytesBeyond = (myOffset + packedString.length) - archiveEnd - - if bytesBeyond > 0 - fs.write fd, packedString, 0, packedString.length - bytesBeyond, myOffset, (err) -> + fs.read fd, packedBasePoint, 0, pointSize, archive.offset, (err) -> + cb err if err + [baseInterval, baseValue] = pack.Unpack(pointFormat, packedBasePoint) + + if baseInterval == 0 + # This file's first update + # Use our first string as the base, so we start at the start + baseInterval = packedStrings[0][0] + + # Write all of our packed strings in locations determined by the baseInterval + + writePackedString = (ps, callback) -> + [interval, packedString] = ps + timeDistance = interval - baseInterval + pointDistance = timeDistance / step + byteDistance = pointDistance * pointSize + myOffset = archive.offset + byteDistance.mod(archive.size) + archiveEnd = archive.offset + archive.size + bytesBeyond = (myOffset + packedString.length) - archiveEnd + + if bytesBeyond > 0 + fs.write fd, packedString, 0, packedString.length - bytesBeyond, myOffset, (err) -> + cb err if err + assert.equal archiveEnd, myOffset + packedString.length - bytesBeyond + #assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString)) + # Safe because it can't exceed the archive (retention checking logic above) + fs.write fd, packedString, packedString.length - bytesBeyond, bytesBeyond, archive.offset, (err) -> cb err if err - assert.equal archiveEnd, myOffset + packedString.length - bytesBeyond - #assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString)) - # Safe because it can't exceed the archive (retention checking logic above) - fs.write fd, packedString, packedString.length - bytesBeyond, bytesBeyond, archive.offset, (err) -> - cb err if err - 
callback() - else - fs.write fd, packedString, 0, packedString.length, myOffset, (err) -> callback() - - propagateLowerArchives = -> - # Now we propagate the updates to lower-precision archives - higher = archive - lowerArchives = (arc for arc in header.archives when arc.secondsPerPoint > archive.secondsPerPoint) - - if lowerArchives.length > 0 - # Collect a list of propagation calls to make - # This is easier than doing async looping - propagateCalls = [] - for lower in lowerArchives - fit = (i) -> i - i.mod(lower.secondsPerPoint) - lowerIntervals = (fit(p[0]) for p in alignedPoints) - uniqueLowerIntervals = _.uniq(lowerIntervals) - for interval in uniqueLowerIntervals - propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} - higher = lower - - callPropagate = (args, callback) -> - propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> - cb err if err - callback err, result - - async.forEachSeries propagateCalls, callPropagate, (err, result) -> - throw err if err - cb null - else + else + fs.write fd, packedString, 0, packedString.length, myOffset, (err) -> + callback() + + propagateLowerArchives = -> + # Now we propagate the updates to lower-precision archives + higher = archive + lowerArchives = (arc for arc in header.archives when arc.secondsPerPoint > archive.secondsPerPoint) + + if lowerArchives.length > 0 + # Collect a list of propagation calls to make + # This is easier than doing async looping + propagateCalls = [] + for lower in lowerArchives + fit = (i) -> i - i.mod(lower.secondsPerPoint) + lowerIntervals = (fit(p[0]) for p in alignedPoints) + uniqueLowerIntervals = _.uniq(lowerIntervals) + for interval in uniqueLowerIntervals + propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} + higher = lower + + callPropagate = (args, callback) -> + propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> + cb err if err + callback err, result + + async.forEachSeries propagateCalls, callPropagate, (err, result) -> + throw err if err cb null - - async.forEachSeries packedStrings, writePackedString, (err) -> - throw err if err - propagateLowerArchives() - catch err - cb(err) + else + cb null + + async.forEachSeries packedStrings, writePackedString, (err) -> + throw err if err + propagateLowerArchives() info = (path, cb) -> # FIXME: Close this stream? 
@@ -454,102 +442,6 @@ info = (path, cb) -> archives: archives return -last = (path, cb) -> - info path, (err, header) -> - now = unixTime() - from = now-1 - to = now - diff = now - from - fd = null - - # Find closest archive to look in, that will contain our information - for archive in header.archives - break if archive.retention >= diff - - fromInterval = parseInt(from - from.mod(archive.secondsPerPoint)) + archive.secondsPerPoint - toInterval = parseInt(to - to.mod(archive.secondsPerPoint)) + archive.secondsPerPoint - - file = fs.createReadStream(path) - - Binary.stream(file) - .skip(archive.offset) - .word32bu('baseInterval') - .word32bu('baseValue') - .tap (vars) -> - if vars.baseInterval == 0 - # Nothing has been written to this hoard - step = archive.secondsPerPoint - points = (toInterval - fromInterval) / step - timeInfo = [fromInterval, toInterval, step] - values = (null for n in [0...points]) - cb(null, timeInfo, values) - else - # We have data in this hoard, let's read it - getOffset = (interval) -> - timeDistance = interval - vars.baseInterval - pointDistance = timeDistance / archive.secondsPerPoint - byteDistance = pointDistance * pointSize - a = archive.offset + byteDistance.mod(archive.size) - a - - fromOffset = getOffset(fromInterval) - toOffset = getOffset(toInterval) - - fs.open path, 'r', (err, fd) -> - if err then throw err - if fromOffset < toOffset - # We don't wrap around, can everything in a single read - size = toOffset - fromOffset - seriesBuffer = new Buffer(size) - try - fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> - cb(err) if err - fs.close fd, (err) -> - cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! - catch err - cb(err) - else - # We wrap around the archive, we need two reads - archiveEnd = archive.offset + archive.size - size1 = archiveEnd - fromOffset - size2 = toOffset - archive.offset - seriesBuffer = new Buffer(size1 + size2) - try - fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> - cb(err) if err - try - fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> - cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! - fs.close(fd) - catch err - cb(err) - catch err - cb(err) - - unpack = (seriesData) -> - # Optmize this? - numPoints = seriesData.length / pointSize - seriesFormat = "!" + ('Ld' for f in [0...numPoints]).join("") - unpackedSeries = pack.Unpack(seriesFormat, seriesData) - - # Use buffer/pre-allocate? 
- valueList = (null for f in [0...numPoints]) - currentInterval = fromInterval - step = archive.secondsPerPoint - - for i in [0...unpackedSeries.length] by 2 - pointTime = unpackedSeries[i] - if pointTime == currentInterval - pointValue = unpackedSeries[i + 1] - valueList[i / 2] = pointValue - currentInterval += step - - timeInfo = [fromInterval, toInterval, step] - cb(null, timeInfo, valueList) - return - fetch = (path, from, to, cb) -> info path, (err, header) -> now = unixTime() @@ -560,7 +452,7 @@ fetch = (path, from, to, cb) -> diff = now - from fd = null - # Find closest archive to look in, that will contain our information + # Find closest archive to look in, that iwll contain our information for archive in header.archives break if archive.retention >= diff @@ -599,32 +491,23 @@ fetch = (path, from, to, cb) -> # We don't wrap around, can everything in a single read size = toOffset - fromOffset seriesBuffer = new Buffer(size) - try - fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> + fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> + cb(err) if err + fs.close fd, (err) -> cb(err) if err - fs.close fd, (err) -> - cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! - catch err - cb(err) + unpack(seriesBuffer) # We have read it, go unpack! else # We wrap around the archive, we need two reads archiveEnd = archive.offset + archive.size size1 = archiveEnd - fromOffset size2 = toOffset - archive.offset seriesBuffer = new Buffer(size1 + size2) - try - fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> + fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> + cb(err) if err + fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> cb(err) if err - try - fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> - cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! - fs.close(fd) - catch err - cb(err) - catch err - cb(err) + unpack(seriesBuffer) # We have read it, go unpack! + fs.close(fd) unpack = (seriesData) -> # Optmize this? 
@@ -652,6 +535,5 @@ exports.create = create exports.update = update exports.updateMany = updateMany exports.info = info -exports.last = last exports.fetch = fetch From 05171dfaf852ae327dbd80f8370728d14360d84f Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 17 Oct 2011 18:22:59 -0400 Subject: [PATCH 12/14] one too many reverts, added try/catch for error handling out of bounds offsets on fs.read --- src/hoard.coffee | 324 ++++++++++++++++++++++++++--------------------- 1 file changed, 177 insertions(+), 147 deletions(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index ba8a7c5..9fc787a 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -73,49 +73,61 @@ propagate = (fd, timestamp, xff, higher, lower, cb) -> lowerIntervalEnd = lowerIntervalStart + lower.secondsPerPoint packedPoint = new Buffer(pointSize) - fs.read fd, packedPoint, 0, pointSize, higher.offset, (err, written, buffer) -> - cb(err) if err - [higherBaseInterval, higherBaseValue] = pack.Unpack(pointFormat, packedPoint) - - if higherBaseInterval == 0 - higherFirstOffset = higher.offset - else - timeDistance = lowerIntervalStart - higherBaseInterval - pointDistance = timeDistance / higher.secondsPerPoint - byteDistance = pointDistance * pointSize - higherFirstOffset = higher.offset + byteDistance.mod(higher.size) - - higherPoints = lower.secondsPerPoint / higher.secondsPerPoint - higherSize = higherPoints * pointSize - relativeFirstOffset = higherFirstOffset - higher.offset - relativeLastOffset = (relativeFirstOffset + higherSize).mod(higher.size) - higherLastOffset = relativeLastOffset + higher.offset - - if higherFirstOffset < higherLastOffset - # We don't wrap the archive - seriesSize = higherLastOffset - higherFirstOffset - seriesString = new Buffer(seriesSize) - - fs.read fd, seriesString, 0, seriesSize, higherFirstOffset, (err, written, buffer) -> - parseSeries(seriesString) - else - # We do wrap the archive - higherEnd = higher.offset + higher.size - firstSeriesSize = higherEnd - higherFirstOffset - secondSeriesSize = higherLastOffset - higher.offset - - seriesString = new Buffer(firstSeriesSize + secondSeriesSize) - - fs.read fd, seriesString, 0, firstSeriesSize, higherFirstOffset, (err, written, buffer) -> - cb(err) if err - if secondSeriesSize > 0 - fs.read fd, seriesString, firstSeriesSize, secondSeriesSize, higher.offset, (err, written, buffer) -> - cb(err) if err + try + fs.read fd, packedPoint, 0, pointSize, higher.offset, (err, written, buffer) -> + cb(err) if err + [higherBaseInterval, higherBaseValue] = pack.Unpack(pointFormat, packedPoint) + + if higherBaseInterval == 0 + higherFirstOffset = higher.offset + else + timeDistance = lowerIntervalStart - higherBaseInterval + pointDistance = timeDistance / higher.secondsPerPoint + byteDistance = pointDistance * pointSize + higherFirstOffset = higher.offset + byteDistance.mod(higher.size) + + higherPoints = lower.secondsPerPoint / higher.secondsPerPoint + higherSize = higherPoints * pointSize + relativeFirstOffset = higherFirstOffset - higher.offset + relativeLastOffset = (relativeFirstOffset + higherSize).mod(higher.size) + higherLastOffset = relativeLastOffset + higher.offset + + if higherFirstOffset < higherLastOffset + # We don't wrap the archive + seriesSize = higherLastOffset - higherFirstOffset + seriesString = new Buffer(seriesSize) + + try + fs.read fd, seriesString, 0, seriesSize, higherFirstOffset, (err, written, buffer) -> parseSeries(seriesString) - else - ret = new Buffer(firstSeriesSize) - seriesString.copy(ret, 0, 0, firstSeriesSize) - 
parseSeries(ret) + catch err + cb(err) + else + # We do wrap the archive + higherEnd = higher.offset + higher.size + firstSeriesSize = higherEnd - higherFirstOffset + secondSeriesSize = higherLastOffset - higher.offset + + seriesString = new Buffer(firstSeriesSize + secondSeriesSize) + + try + fs.read fd, seriesString, 0, firstSeriesSize, higherFirstOffset, (err, written, buffer) -> + cb(err) if err + if secondSeriesSize > 0 + try + fs.read fd, seriesString, firstSeriesSize, secondSeriesSize, higher.offset, (err, written, buffer) -> + cb(err) if err + parseSeries(seriesString) + catch err + cb(err) + else + ret = new Buffer(firstSeriesSize) + seriesString.copy(ret, 0, 0, firstSeriesSize) + parseSeries(ret) + catch err + cb(err) + catch err + cb(err) parseSeries = (seriesString) -> # Now we unpack the series data we just read @@ -158,22 +170,25 @@ propagate = (fd, timestamp, xff, higher, lower, cb) -> # !!!!!!!!!!!!!!!!! packedPoint = new Buffer(pointSize) - fs.read fd, packedPoint, 0, pointSize, lower.offset, (err) -> - [lowerBaseInterval, lowerBaseValue] = pack.Unpack(pointFormat, packedPoint) - - if lowerBaseInterval == 0 - # First propagated update to this lower archive - offset = lower.offset - else - # Not our first propagated update to this lower archive - timeDistance = lowerIntervalStart - lowerBaseInterval - pointDistance = timeDistance / lower.secondsPerPoint - byteDistance = pointDistance * pointSize - offset = lower.offset + byteDistance.mod(lower.size) - - mypp = new Buffer(myPackedPoint) - fs.write fd, mypp, 0, pointSize, offset, (err) -> - cb(null, true) + try + fs.read fd, packedPoint, 0, pointSize, lower.offset, (err) -> + [lowerBaseInterval, lowerBaseValue] = pack.Unpack(pointFormat, packedPoint) + + if lowerBaseInterval == 0 + # First propagated update to this lower archive + offset = lower.offset + else + # Not our first propagated update to this lower archive + timeDistance = lowerIntervalStart - lowerBaseInterval + pointDistance = timeDistance / lower.secondsPerPoint + byteDistance = pointDistance * pointSize + offset = lower.offset + byteDistance.mod(lower.size) + + mypp = new Buffer(myPackedPoint) + fs.write fd, mypp, 0, pointSize, offset, (err) -> + cb(null, true) + catch err + cb(err) else cb(null, false) @@ -235,25 +250,28 @@ update = (filename, value, timestamp, cb) -> else fs.close fd, cb - fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> - cb(err) if err - [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) - - if baseInterval == 0 - # This file's first update - fs.write fd, myPackedPoint, 0, pointSize, archive.offset, (err, written, buffer) -> - cb(err) if err - [baseInterval, baseValue] = [myInterval, value] - propagateLowerArchives() - else - # File has been updated before - timeDistance = myInterval - baseInterval - pointDistance = timeDistance / archive.secondsPerPoint - byteDistance = pointDistance * pointSize - myOffset = archive.offset + byteDistance.mod(archive.size) - fs.write fd, myPackedPoint, 0, pointSize, myOffset, (err, written, buffer) -> - cb(err) if err - propagateLowerArchives() + try + fs.read fd, packedPoint, 0, pointSize, archive.offset, (err, bytesRead, buffer) -> + cb(err) if err + [baseInterval, baseValue] = pack.Unpack(pointFormat, packedPoint) + + if baseInterval == 0 + # This file's first update + fs.write fd, myPackedPoint, 0, pointSize, archive.offset, (err, written, buffer) -> + cb(err) if err + [baseInterval, baseValue] = [myInterval, value] + propagateLowerArchives() + else + # 
File has been updated before + timeDistance = myInterval - baseInterval + pointDistance = timeDistance / archive.secondsPerPoint + byteDistance = pointDistance * pointSize + myOffset = archive.offset + byteDistance.mod(archive.size) + fs.write fd, myPackedPoint, 0, pointSize, myOffset, (err, written, buffer) -> + cb(err) if err + propagateLowerArchives() + catch err + cb(err) return @@ -343,70 +361,73 @@ updateManyArchive = (fd, header, archive, points, cb) -> # Read base point and determine where our writes will start packedBasePoint = new Buffer(pointSize) - fs.read fd, packedBasePoint, 0, pointSize, archive.offset, (err) -> - cb err if err - [baseInterval, baseValue] = pack.Unpack(pointFormat, packedBasePoint) - - if baseInterval == 0 - # This file's first update - # Use our first string as the base, so we start at the start - baseInterval = packedStrings[0][0] - - # Write all of our packed strings in locations determined by the baseInterval - - writePackedString = (ps, callback) -> - [interval, packedString] = ps - timeDistance = interval - baseInterval - pointDistance = timeDistance / step - byteDistance = pointDistance * pointSize - myOffset = archive.offset + byteDistance.mod(archive.size) - archiveEnd = archive.offset + archive.size - bytesBeyond = (myOffset + packedString.length) - archiveEnd - - if bytesBeyond > 0 - fs.write fd, packedString, 0, packedString.length - bytesBeyond, myOffset, (err) -> - cb err if err - assert.equal archiveEnd, myOffset + packedString.length - bytesBeyond - #assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString)) - # Safe because it can't exceed the archive (retention checking logic above) - fs.write fd, packedString, packedString.length - bytesBeyond, bytesBeyond, archive.offset, (err) -> + try + fs.read fd, packedBasePoint, 0, pointSize, archive.offset, (err) -> + cb err if err + [baseInterval, baseValue] = pack.Unpack(pointFormat, packedBasePoint) + + if baseInterval == 0 + # This file's first update + # Use our first string as the base, so we start at the start + baseInterval = packedStrings[0][0] + + # Write all of our packed strings in locations determined by the baseInterval + + writePackedString = (ps, callback) -> + [interval, packedString] = ps + timeDistance = interval - baseInterval + pointDistance = timeDistance / step + byteDistance = pointDistance * pointSize + myOffset = archive.offset + byteDistance.mod(archive.size) + archiveEnd = archive.offset + archive.size + bytesBeyond = (myOffset + packedString.length) - archiveEnd + + if bytesBeyond > 0 + fs.write fd, packedString, 0, packedString.length - bytesBeyond, myOffset, (err) -> cb err if err + assert.equal archiveEnd, myOffset + packedString.length - bytesBeyond + #assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString)) + # Safe because it can't exceed the archive (retention checking logic above) + fs.write fd, packedString, packedString.length - bytesBeyond, bytesBeyond, archive.offset, (err) -> + cb err if err + callback() + else + fs.write fd, packedString, 0, packedString.length, myOffset, (err) -> callback() - else - fs.write fd, packedString, 0, packedString.length, myOffset, (err) -> - callback() - - propagateLowerArchives = -> - # Now we propagate the updates to lower-precision archives - higher = archive - lowerArchives = (arc for arc in header.archives when arc.secondsPerPoint > 
archive.secondsPerPoint) - - if lowerArchives.length > 0 - # Collect a list of propagation calls to make - # This is easier than doing async looping - propagateCalls = [] - for lower in lowerArchives - fit = (i) -> i - i.mod(lower.secondsPerPoint) - lowerIntervals = (fit(p[0]) for p in alignedPoints) - uniqueLowerIntervals = _.uniq(lowerIntervals) - for interval in uniqueLowerIntervals - propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} - higher = lower - - callPropagate = (args, callback) -> - propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> - cb err if err - callback err, result - - async.forEachSeries propagateCalls, callPropagate, (err, result) -> - throw err if err + + propagateLowerArchives = -> + # Now we propagate the updates to lower-precision archives + higher = archive + lowerArchives = (arc for arc in header.archives when arc.secondsPerPoint > archive.secondsPerPoint) + + if lowerArchives.length > 0 + # Collect a list of propagation calls to make + # This is easier than doing async looping + propagateCalls = [] + for lower in lowerArchives + fit = (i) -> i - i.mod(lower.secondsPerPoint) + lowerIntervals = (fit(p[0]) for p in alignedPoints) + uniqueLowerIntervals = _.uniq(lowerIntervals) + for interval in uniqueLowerIntervals + propagateCalls.push {interval: interval, header: header, higher: higher, lower: lower} + higher = lower + + callPropagate = (args, callback) -> + propagate fd, args.interval, args.header.xFilesFactor, args.higher, args.lower, (err, result) -> + cb err if err + callback err, result + + async.forEachSeries propagateCalls, callPropagate, (err, result) -> + throw err if err + cb null + else cb null - else - cb null - - async.forEachSeries packedStrings, writePackedString, (err) -> - throw err if err - propagateLowerArchives() + + async.forEachSeries packedStrings, writePackedString, (err) -> + throw err if err + propagateLowerArchives() + catch err + cb(err) info = (path, cb) -> # FIXME: Close this stream? @@ -491,23 +512,32 @@ fetch = (path, from, to, cb) -> # We don't wrap around, can everything in a single read size = toOffset - fromOffset seriesBuffer = new Buffer(size) - fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> - cb(err) if err - fs.close fd, (err) -> + try + fs.read fd, seriesBuffer, 0, size, fromOffset, (err, num) -> cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! + fs.close fd, (err) -> + cb(err) if err + unpack(seriesBuffer) # We have read it, go unpack! + catch err + cb(err) else # We wrap around the archive, we need two reads archiveEnd = archive.offset + archive.size size1 = archiveEnd - fromOffset size2 = toOffset - archive.offset seriesBuffer = new Buffer(size1 + size2) - fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> - cb(err) if err - fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> + try + fs.read fd, seriesBuffer, 0, size1, fromOffset, (err, num) -> cb(err) if err - unpack(seriesBuffer) # We have read it, go unpack! - fs.close(fd) + try + fs.read fd, seriesBuffer, size1, size2, archive.offset, (err, num) -> + cb(err) if err + unpack(seriesBuffer) # We have read it, go unpack! + fs.close(fd) + catch err + cb(err) + catch err + cb(err) unpack = (seriesData) -> # Optmize this? 
From c39b7a46081c86bff2f5c59f219d97c4b27a653b Mon Sep 17 00:00:00 2001 From: christopher holt Date: Fri, 21 Oct 2011 17:18:53 -0400 Subject: [PATCH 13/14] fix sort method, was not setting maxRetention properly --- src/hoard.coffee | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index 9fc787a..e7a675f 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -35,7 +35,7 @@ create = (filename, archives, xFilesFactor, cb) -> if path.existsSync(filename) cb new Error('File ' + filename + ' already exists') - oldest = (a[0] * a[1] for a in archives).sort().reverse()[0] + oldest = (a[0] * a[1] for a in archives).sort((a) -> Number(a))[0] encodeFloat = (value) -> # Dirty hack. From 060a54eec7d33ce63eee3ff6f49c188b72cc280d Mon Sep 17 00:00:00 2001 From: christopher holt Date: Mon, 24 Oct 2011 15:58:51 -0400 Subject: [PATCH 14/14] pass errors back through to callback --- src/hoard.coffee | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/hoard.coffee b/src/hoard.coffee index e7a675f..d38acac 100644 --- a/src/hoard.coffee +++ b/src/hoard.coffee @@ -245,7 +245,7 @@ update = (filename, value, timestamp, cb) -> callback err, result async.forEachSeries propagateCalls, callPropagate, (err, result) -> - throw err if err + cb err if err fs.close fd, cb else fs.close fd, cb @@ -314,12 +314,12 @@ updateMany = (filename, points, cb) -> currentPoints.push(point) async.series updateArchiveCalls, (err, results) -> - throw err if err + cb err if err if currentArchive and currentPoints.length > 0 # Don't forget to commit after we've checked all the archives currentPoints.reverse() updateManyArchive fd, header, currentArchive, currentPoints, (err) -> - throw err if err + cb err if err fs.close fd, cb else fs.close fd, cb @@ -418,13 +418,13 @@ updateManyArchive = (fd, header, archive, points, cb) -> callback err, result async.forEachSeries propagateCalls, callPropagate, (err, result) -> - throw err if err + cb err if err cb null else cb null async.forEachSeries packedStrings, writePackedString, (err) -> - throw err if err + cb err if err propagateLowerArchives() catch err cb(err) @@ -468,7 +468,7 @@ fetch = (path, from, to, cb) -> now = unixTime() oldestTime = now - header.maxRetention from = oldestTime if from < oldestTime - throw new Error('Invalid time interval') unless from < to + cb(new Error('Invalid time interval')) unless from < to to = now if to > now or to < from diff = now - from fd = null @@ -507,7 +507,7 @@ fetch = (path, from, to, cb) -> toOffset = getOffset(toInterval) fs.open path, 'r', (err, fd) -> - if err then throw err + cb err if err if fromOffset < toOffset # We don't wrap around, can everything in a single read size = toOffset - fromOffset