-
Notifications
You must be signed in to change notification settings - Fork 2
/
trade_history.coffee
363 lines (268 loc) · 11.2 KB
/
trade_history.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
require './shared'
exchange = require './exchange'
progress_bar = require('progress')
fs = require('fs')
module.exports = history =
trades: []
# Load candle ("price_data") history for the configured market, then verify it.
#
# Takes a single options object:
#   start, end - passed through unchanged to load_chart_history
#   callback   - invoked with no arguments once loading and verification finish
#
# Creates price_data/ and price_data/<config.exchange>/ if they are missing,
# loads the c1/c2 chart (stored as "c1xc2") and, when that succeeds and
# config.accounting_currency is neither c1 nor c2, also the
# accounting_currency/c1 ("c1") and accounting_currency/c2 ("c2") charts.
#
# NOTE(review): this copy of the file has lost its indentation, so the nesting
# of the statements below cannot be read off the text; the comments describe
# individual statements and should be checked against the repository.
load_price_data: ({start, end, callback}) ->
# Candle spacing; added to candle dates below, so it is in the same unit as candle.date.
# Falls back to 86400 (presumably seconds, i.e. daily candles -- TODO confirm).
granularity = config.price_data_granularity or 86400
fs.mkdirSync "price_data/" if !fs.existsSync("price_data/")
fs.mkdirSync "price_data/#{config.exchange}" if !fs.existsSync("price_data/#{config.exchange}")
# done is the final step of load_history below (it is also reached when a chart load reports an error).
done = =>
# now we're going to verify the integrity of the price data
price_data = fetch 'price_data'
# patch inserts a synthetic candle into price_data[pair] at idx.
# The candle is a clone of the one at idx - 1 (or of the one at idx itself when idx is 0).
# Its date becomes min_date when idx is 0, otherwise the cloned candle's date plus granularity.
patch = (idx, pair, min_date) ->
console.log "Patching #{pair} candle at index #{idx} of #{price_data[pair].length}" if config.log_level > 1
copy_from_idx = if idx == 0 then 0 else idx - 1
cpy = bus.clone(price_data[pair][copy_from_idx])
if idx == 0
cpy.date = min_date
else
cpy.date += granularity
price_data[pair].splice idx, 0, cpy
# first we'll look to see if there are the same number of candles for each currency pair
if price_data.c1 && price_data.c2
console.assert config.accounting_currency != config.c1
# equal_lengths is truthy only when every series in price_data (every key except
# 'key' and 'granularity') has the same number of candles as c1xc2.
# Because of `&=` the result is 1 or 0 rather than true or false.
equal_lengths = ->
lengths = {}
for k,v of price_data when k != 'key' && k != 'granularity'
lengths[k] = v.length
equal = true
for k,length of lengths
equal &= length == lengths.c1xc2
equal
if !equal_lengths()
console.log "Price data needs patching" if config.log_level > 1
# i is the candle index being compared across the three series; cnt counts loop iterations.
i = 0
cnt = 0
while true
c1_date = price_data.c1[i]?.date
c2_date = price_data.c2[i]?.date
cx_date = price_data.c1xc2[i]?.date
# stop once index i is past the end of all three series (or all three dates are falsy)
break if !c1_date && !c2_date && !cx_date
# CoffeeScript chains comparisons: a == b == c means (a == b) and (b == c)
patch_needed = !(c1_date == c2_date == cx_date)
if i == 0 && patch_needed
console.error "price data uneven at start. forward patching." if config.log_level > 1
# a series whose candle at i is later than the earliest of the other two gets a filler candle
if c1_date > Math.min(c2_date, cx_date)
patch i, 'c1', Math.min(c2_date, cx_date)
if c2_date > Math.min(c1_date, cx_date)
patch i, 'c2', Math.min(c1_date, cx_date)
if cx_date > Math.min(c1_date, c2_date)
patch i, 'c1xc2', Math.min(c1_date, c2_date)
# only move on once the three dates at i agree; otherwise index i is re-examined
if !patch_needed
i += 1
# safety valve: give up after more iterations than there are candles in total
break if cnt > price_data.c1.length + price_data.c2.length + price_data.c1xc2.length # there is a bug where infinite loop possible
cnt += 1
# console.error prints its arguments unconditionally, so this line logs whenever it is reached.
# NOTE(review): the message suggests this was meant to report only a failed patch
# (e.g. via console.assert) -- confirm.
console.error equal_lengths(),
msg: "patching price data failed"
c1_len: price_data.c1?.length
c2_len: price_data.c2?.length
c1xc2_len: price_data.c1xc2?.length
# price_data: price_data
# second we'll look to see if the candles are equally spaced by the desired granularity
# This pass only reports: it asserts that neighbouring candles do not share a date
# and logs any gap that differs from granularity. Nothing is repaired here.
for pair, data of price_data when pair != 'key' && pair != 'granularity'
for candle, idx in data
continue if idx == 0
console.assert candle.date != data[idx - 1].date, {
idx, cur: candle, prev: data[idx-1], cur2: data[idx]
}
if candle.date != data[idx - 1].date + granularity
console.log "#{pair} has a missing candle at #{idx}" if config.log_level > 1
callback()
# load_history chains the chart loads: the traded pair first, then (only if that
# load reported no error and the accounting currency is neither c1 nor c2) the
# two accounting-currency charts, and finally done.
load_history = =>
@load_chart_history "c1xc2", config.c1, config.c2, start, end, (error) =>
if !error && config.accounting_currency not in [config.c1, config.c2]
@load_chart_history "c1", config.accounting_currency, config.c1, start, end, (error) =>
return done() if error
@load_chart_history "c2", config.accounting_currency, config.c2, start, end, done
else
done()
load_history()
# Load one candle series and publish it on the shared 'price_data' state.
#
# name       - key under which the series is stored on price_data
# c1, c2     - the two currencies of the chart (must differ)
# start, end - requested range; also part of the cache file name
# cb         - called with no arguments once the series is stored (or straight
#              away when offline with no cache file), and with `true` when the
#              exchange returned unusable data and no retry is made
#
# A cache file under price_data/<exchange>/ is used when present; otherwise
# the series is requested from the exchange and written to that file once it
# covers the requested range.
load_chart_history: (name, c1, c2, start, end, cb) ->
  console.assert c1 != c2, {message: 'why you loading price history for the same coin traded off itself?', c1: c1, c2: c2}

  price_data = fetch('price_data')
  granularity = config.price_data_granularity or 86400
  cache_file = "price_data/#{config.exchange}/#{c1}-#{c2}-#{start}-#{end}-#{granularity}"
  attempts = 0

  # store the series on the shared state, save it, and report success
  publish = (candles) ->
    price_data[name] = candles
    price_data.granularity = granularity
    bus.save price_data
    cb()

  # cached on disk: no need to talk to the exchange
  if fs.existsSync cache_file
    fs.accessSync cache_file
    return publish JSON.parse(fs.readFileSync(cache_file, 'utf8'))

  return cb() if config.offline

  request_chart = ->
    exchange.get_chart_data {start, end, c1, c2, granularity}, on_chart

  on_chart = (candles) =>
    candles ||= []
    try
      candles.sort (a, b) -> a.date - b.date
    catch err
      console.assert false, {err, price_history: candles}

    oldest = candles[0]
    newest = candles[candles.length - 1]

    # unusable when empty, or when it does not span [start, end]
    incomplete = candles.length == 0 || oldest.date > start || newest.date + granularity < end

    unless incomplete
      unless fs.existsSync cache_file
        fs.writeFileSync cache_file, JSON.stringify(candles), 'utf8'
      return publish candles

    console.log "#{config.exchange} returned bad price history for #{c1}-#{c2}." if config.log_level > 1

    if candles.length > 0 && oldest.date > 0 && newest.date > 0
      if config.log_level > 2 || attempts > 10
        console.log
          first: oldest.date
          target_start: start
          last: newest.date + granularity
          target_end: end
          first_entry: oldest
          last_entry: newest

    if !config.simulation
      console.log 'not retrying.' if config.log_level > 1
      cb true
    else
      # retry with a delay that grows by 100ms per attempt
      console.log "Trying again." if config.log_level > 1 || attempts > 10
      attempts += 1
      setTimeout request_chart, attempts * 100

  request_chart()
# Reset @trades and fill it with the trade history between `start` and `end`.
#
# History is handled in hour-sized chunks (hour = time / 60 / 60). Each hour
# goes through @load_hour, which either loads it immediately or returns a
# function that still has to be run; those functions are run one at a time
# via setImmediate. When nothing is left, @trades is sorted newest first and
# `complete` is called with no arguments.
load: (start, end, complete) ->
  @trades = []

  # make sure each level of the cache directory exists
  cache_dirs = [
    "trade_history"
    "trade_history/#{config.exchange}"
    "trade_history/#{config.exchange}/#{config.c1}_#{config.c2}"
  ]
  for dir in cache_dirs
    fs.mkdirSync dir unless fs.existsSync(dir)

  first_hour = Math.floor start / 60 / 60
  last_hour = Math.ceil end / 60 / 60

  if config.log_level > 1
    bar_options =
      complete: '='
      incomplete: ' '
      width: 40
      total: last_hour - first_hour
    bar = new progress_bar ' loading history [:bar] :percent :etas :elapsed', bar_options

  # collect the hours that still need deferred work
  pending = []
  for hour in [first_hour..last_hour]
    loader = @load_hour hour, end
    if !loader || config.history_loaded
      bar.tick() if config.log_level > 1
    else
      pending.push loader

  step = =>
    bar.tick() if config.log_level > 1

    # more to do: run the next loader on a fresh tick and come back here
    if to_run_remaining(pending)
      return setImmediate ->
        next = pending.pop()
        next step

    @trades.sort (a, b) -> b.date - a.date
    console.log "Done loading history! #{@trades.length} trades." if config.log_level > 1

    # duplicate detection (disabled)
    if false
      prev = null
      for t in @trades
        if prev && prev.date == t.date && prev.rate == t.rate && t.amount == prev.amount && t.total == prev.total && t.tradeID == prev.tradeID
          console.log "DUPLICATES: ", prev, t
        prev = t

    # save all trades onto statebus for visualization. usually too big! (disabled)
    if false
      snapshot =
        key: 'all_trades'
        trades: @trades
      bus.save snapshot

    complete()

  to_run_remaining = (queue) -> queue.length != 0

  step()
# Record the longest stretch of history that will be requested; prune uses it
# as the cut-off width.
#
# The value is floored at 0. A floor of ten minutes (10 * 60) is left
# disabled here; the original note says short frames cause problems because
# they do not contain enough trades.
set_longest_requested_history: (max_length) ->
  lower_bound = 0 # 10 * 60
  @longest_requested_history = Math.max lower_bound, max_length
# Return the final element of @trades (undefined when @trades is empty).
# NOTE(review): elsewhere in this file @trades is kept newest first (load sorts
# descending by date and live trades are unshifted), so the final element is
# the oldest trade -- confirm that is what callers of `latest` expect.
latest: ->
  last_idx = @trades.length - 1
  @trades[last_idx]
# Remove trades from history that will never be used by any strategy.
#
# Scans @trades from start_idx towards the end of the array. At the first
# trade whose date is more than @longest_requested_history behind tick.time,
# @trades is cut down to the entries before it (the file keeps @trades sorted
# newest first, so everything from that point on is older still).
#
# start_idx - optional index at which to start scanning (defaults to 0)
#
# Returns the number of trades that were removed.
prune: (start_idx) ->
  console.assert @longest_requested_history, {message: 'requested history not set'}

  history_width = @longest_requested_history
  start_idx ||= 0
  previous_len = @trades.length

  # Exclusive upper bound with an explicit step. An inclusive
  # [start_idx..length - 1] range counts *downwards* when it is empty
  # (e.g. [0..-1] yields 0, -1), which would index outside @trades.
  for idx in [start_idx...@trades.length] by 1
    if tick.time - @trades[idx].date > history_width
      @trades = @trades.slice 0, idx
      break

  previous_len - @trades.length
# Tear down the live trade feed connection, if one is held in @ws_trades.
disconnect_from_exchange_feed: ->
  return unless @ws_trades

  @ws_trades.destroy()
  @ws_trades = null
# (Re)subscribe to the exchange's live trade feed.
#
# callback - optional; called with no arguments once the connection has been
#            handed back, or immediately when config.disabled is set
#
# Any existing connection is dropped first. Each incoming trade is passed
# through @process_new_trade (keeping sort order) and remembered as @last_trade.
subscribe_to_exchange_feed: (callback) ->
  if config.disabled
    return callback?()

  @disconnect_from_exchange_feed()

  feed_handlers =
    new_trade_callback: (new_trade) =>
      @last_trade = @process_new_trade new_trade, true

  on_connected = (conn) =>
    @ws_trades = conn
    callback?()

  exchange.subscribe_to_exchange_feed feed_handlers, on_connected
# Append the trades stored in a JSON cache file to @trades, skipping any trade
# dated after `end`.
#
# fname - path of the cache file (a JSON array of trades)
# hour  - not used by this method
# end   - latest trade date to accept (inclusive)
load_from_file: (fname, hour, end) ->
  cached = JSON.parse fs.readFileSync(fname, 'utf8')
  @trades.push trade for trade in cached when trade.date <= end
# Load the trades for one hour-sized chunk of history.
#
# hour - chunk number (time / 60 / 60); also the cache file name
# end  - latest trade date to accept when reading from the cache
#
# Returns null when the hour was read from its cache file, nothing when the
# file is missing and config.offline is set, and otherwise a function
# `(complete) ->` that fetches the hour from the exchange and calls `complete`
# when finished.
#
# NOTE(review): trades read from the cache are filtered by `end`
# (see load_from_file), trades fetched from the exchange are not -- confirm
# whether that difference is intended.
load_hour: (hour, end) ->
  fname = "trade_history/#{config.exchange}/#{config.c1}_#{config.c2}/#{hour}"

  if fs.existsSync fname
    @load_from_file fname, hour, end
    return null

  return if config.offline

  (complete) =>
    # the file may have been written since this loader was created
    if fs.existsSync fname
      @load_from_file fname, hour, end
      return complete()

    query =
      c1: config.c1
      c2: config.c2
      start: hour * 60 * 60
      end: (hour + 1) * 60 * 60 - 1

    exchange.get_trade_history query, (trades) =>
      console.assert trades, {message: "ABORT: #{config.exchange} returned empty trade history"}

      @process_new_trade trade for trade in trades

      # only cache hours that are already over, so a partial hour is never frozen on disk
      hour_is_over = hour + 1 <= now() / 60 / 60
      if hour_is_over && !fs.existsSync(fname)
        fs.writeFileSync fname, JSON.stringify(trades), 'utf8'

      complete()
# Normalise a trade in place and add it to @trades.
#
# trade         - trade object; its rate, amount and total are converted with parseFloat
# preserve_sort - when truthy the trade is inserted at the front of @trades and
#                 the gap between now() and trade.date is passed to lag_logger
#                 (if that function exists); otherwise the trade is appended,
#                 which is much faster
#
# Returns the same trade object.
process_new_trade: (trade, preserve_sort) ->
  for field in ['rate', 'amount', 'total']
    trade[field] = parseFloat trade[field]

  if preserve_sort
    @trades.unshift trade
    lag_logger?(now() - trade.date)
  else
    @trades.push trade

  trade
# Index into @trades of the most recent trade at or before the current time.
# null until advance is first called.
trade_idx: null

# Advance the current time pointer in history relative to time.
# @trades is sorted newest first, so the pointer starts at the last (oldest)
# index and moves towards index 0 as time passes.
#
# time - the new current time; trades dated after it are not yet visible
#
# Sets @last_trade to the newest trade dated at or before `time` and stores
# its index in @trade_idx. Returns that index.
advance: (time) ->
  if @trade_idx == null
    @trade_idx = @trades.length - 1

  end_idx = @trade_idx

  while end_idx >= 0
    if @trades[end_idx].date > time
      # went one trade too far into the future; step back
      end_idx += 1
      break
    end_idx -= 1

  # Every remaining trade is at or before `time`: the newest one (index 0) is
  # the current trade. Without this the pointer would be left at -1 and
  # @last_trade would become undefined.
  if end_idx < 0 && @trades.length > 0
    end_idx = 0

  console.assert end_idx < history.trades.length, {message: 'ending wrong!', end_idx: end_idx, trades: history.trades.length}

  @last_trade = history.trades[end_idx]
  @trade_idx = end_idx