-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathstream.class.js
144 lines (118 loc) · 3.59 KB
/
stream.class.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
var Duplex = require('stream').Duplex;
var util = require("util");
var lob = require('lob-enc');
var log = require("../lib/util/log")("Stream")
module.exports = ChannelStream
/** ChannelStream impliments a Duplex stream API over Telehash channels.
* for Duplex stream usage, consult core node docs. for an idea of how you might
* expand upon streams within the Telehash ecosystem, see thtp
* @class ChannelStream
* @constructor
* @param {Channel} channel - a Telehash channel (generated by e3x)
* @param {stringa} encoding - 'binary' or 'json'
* @return {Stream}
*/
function ChannelStream(chan, encoding){
if(!encoding) encoding = 'binary';
if(typeof chan != 'object' || !chan.isChannel)
{
log.warn('invalid channel passed to streamize');
return false;
}
var allowHalfOpen = (chan.type === "thtp") ? true : false;
Duplex.call(this,{allowHalfOpen: allowHalfOpen, objectMode:true})
this.on('finish',function(){
chan.send({json:{end:true}});
});
this.on('error',function(err){
if(err == chan.err) return; // ignore our own generated errors
chan.send({json:{err:err.toString()}});
});
var stream = this
this.on('pipe', function(from){
from.on('end',function(){
stream.end()
})
})
chan.receiving = chan_to_stream(this);
this._chan = chan;
this._encoding = encoding;
return this;
}
util.inherits(ChannelStream, Duplex)
ChannelStream.prototype._read = function(size){
if(this._getNextPacket) this._getNextPacket();
this._getNextPacket = false;
};
ChannelStream.prototype._write = function(data,enc,cbWrite)
{
cbWrite = cbWrite || function(){};
if(this._chan.state == 'gone') return cbWrite('closed');
// switch to our default encoding syntax
// dynamically detect object streams and change encoding
if(!Buffer.isBuffer(data) && typeof data != 'string')
{
data = JSON.stringify(data);
this._encoding = 'json';
}
// fragment it
while(data.length)
{
var frag = data.slice(0,1000);
data = data.slice(1000);
var packet = {json:{},body:frag};
// last packet gets continuation callback
if(!data.length)
{
if(enc != 'binary') packet.json.enc = this._encoding;
packet.callback = cbWrite;
}else{
packet.json.frag = true;
}
this._chan.send(packet);
}
}
function chan_to_stream (stream){
var data = new Buffer(0);
return function receiving(err, packet, getNextPacket) {
// was a wait writing, let it through
if(err)
stream.emit('error',err);
if(packet.body.length || data.length)
{
data = Buffer.concat([data,packet.body]);
if(!packet.json.frag)
{
var body = data;
data = new Buffer(0);
if(packet.json.enc == 'json') try{
body = JSON.parse(body)
}catch(E){
low.warn('stream json frag parse error',E,body.toString());
err = E;
}
if(packet.json.enc == 'lob')
{
var packet = lob.decode(body);
if(!packet)
{
log.warn('stream lob frag decode error',body.toString('hex'));
err = 'lob decode failed';
}else{
body = packet;
}
}
// stream consumer is not ready for another packet yet, so hold on
// before getting more to send to readable...
if(!err && !stream.push(body))
stream._getNextPacket = getNextPacket;
}
}
//the packet has been read by stream consumer, so get the next one
if(!stream._getNextPacket)
getNextPacket();
//close the stream if this is the last packet
if(packet.json.end)
stream.push(null);
};
}