This repository was archived by the owner on Aug 16, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathHTTPC.jl
739 lines (556 loc) · 19.7 KB
/
HTTPC.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
module HTTPC
using Compat
using LibCURL
using LibCURL.Mime_ext
import Base.convert, Base.show, Base.get, Base.trace
export init, cleanup, get, put, post, trace, delete, head, options
export RequestOptions, Response
def_rto = 0.0
##############################
# Struct definitions
##############################
type RequestOptions
blocking::Bool
query_params::Vector{Tuple}
request_timeout::Float64
callback::Union{Function,Bool}
content_type::AbstractString
headers::Vector{Tuple}
ostream::Union{IO, AbstractString, Void}
auto_content_type::Bool
RequestOptions(; blocking=true, query_params=Array(Tuple,0), request_timeout=def_rto, callback=null_cb, content_type="", headers=Array(Tuple,0), ostream=nothing, auto_content_type=true) =
new(blocking, query_params, request_timeout, callback, content_type, headers, ostream, auto_content_type)
end
type Response
body
headers :: Dict{AbstractString, Vector{AbstractString}}
http_code
total_time
bytes_recd::Integer
Response() = new(nothing, Dict{AbstractString, Vector{AbstractString}}(), 0, 0.0, 0)
end
function show(io::IO, o::Response)
println(io, "HTTP Code :", o.http_code)
println(io, "RequestTime :", o.total_time)
println(io, "Headers :")
for (k,vs) in o.headers
for v in vs
println(io, " $k : $v")
end
end
println(io, "Length of body : ", o.bytes_recd)
end
type ReadData
typ::Symbol
src::Any
str::AbstractString
offset::Csize_t
sz::Csize_t
ReadData() = new(:undefined, false, "", 0, 0)
end
type ConnContext
curl::Ptr{CURL}
url::AbstractString
slist::Ptr{Void}
rd::ReadData
resp::Response
options::RequestOptions
close_ostream::Bool
ConnContext(options::RequestOptions) = new(C_NULL, "", C_NULL, ReadData(), Response(), options, false)
end
immutable CURLMsg2
msg::CURLMSG
easy_handle::Ptr{CURL}
data::Ptr{Any}
end
type MultiCtxt
s::curl_socket_t # Socket
chk_read::Bool
chk_write::Bool
timeout::Float64
MultiCtxt() = new(0,false,false,0.0)
end
##############################
# Callbacks
##############################
function write_cb(buff::Ptr{UInt8}, sz::Csize_t, n::Csize_t, p_ctxt::Ptr{Void})
# println("@write_cb")
ctxt = unsafe_pointer_to_objref(p_ctxt)
nbytes = sz * n
unsafe_write(ctxt.resp.body, buff, nbytes)
ctxt.resp.bytes_recd = ctxt.resp.bytes_recd + nbytes
nbytes::Csize_t
end
c_write_cb = cfunction(write_cb, Csize_t, (Ptr{UInt8}, Csize_t, Csize_t, Ptr{Void}))
function header_cb(buff::Ptr{UInt8}, sz::Csize_t, n::Csize_t, p_ctxt::Ptr{Void})
# println("@header_cb")
ctxt = unsafe_pointer_to_objref(p_ctxt)
hdrlines = split(unsafe_string(buff, convert(Int, sz * n)), "\r\n")
# println(hdrlines)
for e in hdrlines
m = match(r"^\s*([\w\-\_]+)\s*\:(.+)", e)
if (m != nothing)
k = strip(m.captures[1])
v = strip(m.captures[2])
if haskey(ctxt.resp.headers, k)
push!(ctxt.resp.headers[k], v)
else
ctxt.resp.headers[k] = (AbstractString)[v]
end
end
end
(sz*n)::Csize_t
end
c_header_cb = cfunction(header_cb, Csize_t, (Ptr{UInt8}, Csize_t, Csize_t, Ptr{Void}))
function curl_read_cb(out::Ptr{Void}, s::Csize_t, n::Csize_t, p_ctxt::Ptr{Void})
# println("@curl_read_cb")
ctxt = unsafe_pointer_to_objref(p_ctxt)
bavail::Csize_t = s * n
breq::Csize_t = ctxt.rd.sz - ctxt.rd.offset
b2copy = bavail > breq ? breq : bavail
if (ctxt.rd.typ == :buffer)
ccall(:memcpy, Ptr{Void}, (Ptr{Void}, Ptr{Void}, UInt64),
out, convert(Ptr{UInt8}, pointer(ctxt.rd.str)) + ctxt.rd.offset, b2copy)
elseif (ctxt.rd.typ == :io)
b_read = read(ctxt.rd.src, UInt8, b2copy)
ccall(:memcpy, Ptr{Void}, (Ptr{Void}, Ptr{Void}, UInt64), out, b_read, b2copy)
end
ctxt.rd.offset = ctxt.rd.offset + b2copy
r = convert(Csize_t, b2copy)
r::Csize_t
end
c_curl_read_cb = cfunction(curl_read_cb, Csize_t, (Ptr{Void}, Csize_t, Csize_t, Ptr{Void}))
function curl_socket_cb(curl::Ptr{Void}, s::Cint, action::Cint, p_muctxt::Ptr{Void}, sctxt::Ptr{Void})
if action != CURL_POLL_REMOVE
muctxt = unsafe_pointer_to_objref(p_muctxt)
muctxt.s = s
muctxt.chk_read = false
muctxt.chk_write = false
if action == CURL_POLL_IN
muctxt.chk_read = true
elseif action == CURL_POLL_OUT
muctxt.chk_write = true
elseif action == CURL_POLL_INOUT
muctxt.chk_read = true
muctxt.chk_write = true
end
end
# NOTE: Out-of-order socket fds cause problems in the case of HTTP redirects, hence ignoring CURL_POLL_REMOVE
ret = convert(Cint, 0)
ret::Cint
end
c_curl_socket_cb = cfunction(curl_socket_cb, Cint, (Ptr{Void}, Cint, Cint, Ptr{Void}, Ptr{Void}))
function curl_multi_timer_cb(curlm::Ptr{Void}, timeout_ms::Clong, p_muctxt::Ptr{Void})
muctxt = unsafe_pointer_to_objref(p_muctxt)
muctxt.timeout = timeout_ms / 1000.0
# println("Requested timeout value : " * string(muctxt.timeout))
ret = convert(Cint, 0)
ret::Cint
end
c_curl_multi_timer_cb = cfunction(curl_multi_timer_cb, Cint, (Ptr{Void}, Clong, Ptr{Void}))
##############################
# Utility functions
##############################
macro ce_curl(f, args...)
quote
cc = CURLE_OK
cc = $(esc(f))(ctxt.curl, $(args...))
if(cc != CURLE_OK)
error(string($f) * "() failed: " * unsafe_string(curl_easy_strerror(cc)))
end
end
end
macro ce_curlm(f, args...)
quote
cc = CURLM_OK
cc = $(esc(f))(curlm, $(args...))
if(cc != CURLM_OK)
error(string($f) * "() failed: " * unsafe_string(curl_multi_strerror(cc)))
end
end
end
null_cb(curl) = return nothing
function set_opt_blocking(options::RequestOptions)
o2 = RequestOptions()
for n in filter(x -> !(x in [:ostream, :blocking]),fieldnames(o2))
setfield!(o2, n, deepcopy(getfield(options, n)))
end
o2.blocking = true
o2.ostream = options.ostream
return o2
end
function get_ct_from_ext(filename)
fparts = split(basename(filename), ".")
if (length(fparts) > 1)
if haskey(MimeExt, fparts[end]) return MimeExt[fparts[end]] end
end
return false
end
function setup_easy_handle(url, options::RequestOptions)
ctxt = ConnContext(options)
curl = curl_easy_init()
if (curl == C_NULL) throw("curl_easy_init() failed") end
ctxt.curl = curl
@ce_curl curl_easy_setopt CURLOPT_FOLLOWLOCATION 1
@ce_curl curl_easy_setopt CURLOPT_MAXREDIRS 5
if length(options.query_params) > 0
qp = urlencode_query_params(curl, options.query_params)
url = url * "?" * qp
end
ctxt.url = url
@ce_curl curl_easy_setopt CURLOPT_URL url
@ce_curl curl_easy_setopt CURLOPT_WRITEFUNCTION c_write_cb
p_ctxt = pointer_from_objref(ctxt)
@ce_curl curl_easy_setopt CURLOPT_WRITEDATA p_ctxt
@ce_curl curl_easy_setopt CURLOPT_HEADERFUNCTION c_header_cb
@ce_curl curl_easy_setopt CURLOPT_HEADERDATA p_ctxt
if options.content_type != ""
ct = "Content-Type: " * options.content_type
ctxt.slist = curl_slist_append(ctxt.slist, ct)
else
# Disable libCURL automatically setting the content type
ctxt.slist = curl_slist_append(ctxt.slist, "Content-Type:")
end
for hdr in options.headers
hdr_str = hdr[1] * ":" * hdr[2]
ctxt.slist = curl_slist_append(ctxt.slist, hdr_str)
end
# Disabling the Expect header since some webservers don't handle this properly
ctxt.slist = curl_slist_append(ctxt.slist, "Expect:")
@ce_curl curl_easy_setopt CURLOPT_HTTPHEADER ctxt.slist
if isa(options.ostream, AbstractString)
ctxt.resp.body = open(options.ostream, "w+")
ctxt.close_ostream = true
elseif isa(options.ostream, IO)
ctxt.resp.body = options.ostream
else
ctxt.resp.body = IOBuffer()
end
ctxt
end
function cleanup_easy_context(ctxt::Union{ConnContext,Bool})
if isa(ctxt, ConnContext)
if (ctxt.slist != C_NULL)
curl_slist_free_all(ctxt.slist)
ctxt.slist = C_NULL
end
if (ctxt.curl != C_NULL)
curl_easy_cleanup(ctxt.curl)
ctxt.curl = C_NULL
end
if ctxt.close_ostream
close(ctxt.resp.body)
ctxt.resp.body = nothing
ctxt.close_ostream = false
end
end
end
function process_response(ctxt)
http_code = Array(Clong,1)
@ce_curl curl_easy_getinfo CURLINFO_RESPONSE_CODE http_code
total_time = Array(Cdouble,1)
@ce_curl curl_easy_getinfo CURLINFO_TOTAL_TIME total_time
ctxt.resp.http_code = http_code[1]
ctxt.resp.total_time = total_time[1]
end
# function blocking_get (url)
# try
# ctxt=nothing
# ctxt = setup_easy_handle(url)
# curl = ctxt.curl
#
# @ce_curl curl_easy_perform
#
# process_response(ctxt)
#
# return ctxt.resp
# finally
# if isa(ctxt, ConnContext) && (ctxt.curl != 0)
# curl_easy_cleanup(ctxt.curl)
# end
# end
# end
##############################
# Library initializations
##############################
init() = curl_global_init(CURL_GLOBAL_ALL)
cleanup() = curl_global_cleanup()
##############################
# GET
##############################
function get(url::AbstractString, options::RequestOptions)
if (options.blocking)
ctxt = false
try
ctxt = setup_easy_handle(url, options)
@ce_curl curl_easy_setopt CURLOPT_HTTPGET 1
return exec_as_multi(ctxt)
finally
cleanup_easy_context(ctxt)
end
else
return remotecall(get, myid(), url, set_opt_blocking(options))
end
end
##############################
# POST & PUT
##############################
function post(url::AbstractString, data, options::RequestOptions=RequestOptions())
if (options.blocking)
return put_post(url, data, :post, options)
else
return remotecall(post, myid(), url, data, set_opt_blocking(options))
end
end
function put(url::AbstractString, data, options::RequestOptions=RequestOptions())
if (options.blocking)
return put_post(url, data, :put, options)
else
return remotecall(put, myid(), url, data, set_opt_blocking(options))
end
end
function put_post(url::AbstractString, data, putorpost::Symbol, options::RequestOptions)
rd::ReadData = ReadData()
if isa(data, AbstractString)
rd.typ = :buffer
rd.src = false
rd.str = data
rd.sz = length(data)
elseif isa(data, Dict) || (isa(data, Vector) && issubtype(eltype(data), Tuple))
arr_data = isa(data, Dict) ? Array{Tuple,1}(map((d) -> (d[1], d[2]), collect(data))) : data
rd.str = urlencode_query_params(arr_data) # Not very optimal since it creates another curl handle, but it is clean...
rd.sz = length(rd.str)
rd.typ = :buffer
rd.src = arr_data
if ((options.content_type == "") && (options.auto_content_type))
options.content_type = "application/x-www-form-urlencoded"
end
elseif isa(data, IO)
rd.typ = :io
rd.src = data
seekend(data)
rd.sz = position(data)
seekstart(data)
if ((options.content_type == "") && (options.auto_content_type))
options.content_type = "application/octet-stream"
end
elseif isa(data, Tuple)
(typsym, filename) = data
if (typsym != :file) error("Unsupported data datatype") end
rd.typ = :io
rd.src = open(filename)
rd.sz = filesize(filename)
try
if ((options.content_type == "") && (options.auto_content_type))
options.content_type = get_ct_from_ext(filename)
end
return _put_post(url, putorpost, options, rd)
finally
close(rd.src)
end
else
error("Unsupported data datatype")
end
return _put_post(url, putorpost, options, rd)
end
function _put_post(url::AbstractString, putorpost::Symbol, options::RequestOptions, rd::ReadData)
ctxt = false
try
ctxt = setup_easy_handle(url, options)
ctxt.rd = rd
if (putorpost == :post)
@ce_curl curl_easy_setopt CURLOPT_POST 1
@ce_curl curl_easy_setopt CURLOPT_POSTFIELDSIZE rd.sz
elseif (putorpost == :put)
@ce_curl curl_easy_setopt CURLOPT_UPLOAD 1
@ce_curl curl_easy_setopt CURLOPT_INFILESIZE rd.sz
end
if (rd.typ == :io) || (putorpost == :put)
p_ctxt = pointer_from_objref(ctxt)
@ce_curl curl_easy_setopt CURLOPT_READDATA p_ctxt
@ce_curl curl_easy_setopt CURLOPT_READFUNCTION c_curl_read_cb
else
ppostdata = pointer(rd.str)
@ce_curl curl_easy_setopt CURLOPT_COPYPOSTFIELDS ppostdata
end
return exec_as_multi(ctxt)
finally
cleanup_easy_context(ctxt)
end
end
##############################
# HEAD, DELETE and TRACE
##############################
function head(url::AbstractString, options::RequestOptions)
if (options.blocking)
ctxt = false
try
ctxt = setup_easy_handle(url, options)
@ce_curl curl_easy_setopt CURLOPT_NOBODY 1
return exec_as_multi(ctxt)
finally
cleanup_easy_context(ctxt)
end
else
return remotecall(head, myid(), url, set_opt_blocking(options))
end
end
delete(url::AbstractString, options::RequestOptions) = custom(url, "DELETE", options)
trace(url::AbstractString, options::RequestOptions) = custom(url, "TRACE", options)
options(url::AbstractString, options::RequestOptions) = custom(url, "OPTIONS", options)
for f in (:get, :head, :delete, :trace, :options)
@eval $(f)(url::AbstractString; kwargs...) = $(f)(url, RequestOptions(; kwargs...))
end
# put(url::AbstractString, data::AbstractString; kwargs...) = put(url, data, options=RequestOptions(; kwargs...))
# post(url::AbstractString, data::AbstractString; kwargs...) = post(url, data, options=RequestOptions(; kwargs...))
for f in (:put, :post)
@eval $(f)(url::AbstractString, data::AbstractString; kwargs...) = $(f)(url, data, RequestOptions(; kwargs...))
end
function custom(url::AbstractString, verb::AbstractString, options::RequestOptions)
if (options.blocking)
ctxt = false
try
ctxt = setup_easy_handle(url, options)
@ce_curl curl_easy_setopt CURLOPT_CUSTOMREQUEST verb
return exec_as_multi(ctxt)
finally
cleanup_easy_context(ctxt)
end
else
return remotecall(custom, myid(), url, verb, set_opt_blocking(options))
end
end
##############################
# EXPORTED UTILS
##############################
function urlencode_query_params{T<:Tuple}(params::Vector{T})
curl = curl_easy_init()
if (curl == C_NULL) throw("curl_easy_init() failed") end
querystr = urlencode_query_params(curl, params)
curl_easy_cleanup(curl)
return querystr
end
export urlencode_query_params
function urlencode_query_params{T<:Tuple}(curl, params::Vector{T})
querystr = ""
for x in params
k,v = x
if (v != "")
ep = urlencode(curl, string(k)) * "=" * urlencode(curl, string(v))
else
ep = urlencode(curl, string(k))
end
if querystr == ""
querystr = ep
else
querystr = querystr * "&" * ep
end
end
return querystr
end
function urlencode(curl, s::AbstractString)
b_arr = curl_easy_escape(curl, s, sizeof(s))
esc_s = unsafe_string(b_arr)
curl_free(b_arr)
return esc_s
end
function urlencode(s::AbstractString)
curl = curl_easy_init()
if (curl == C_NULL) throw("curl_easy_init() failed") end
esc_s = urlencode(curl, s)
curl_easy_cleanup(curl)
return esc_s
end
urlencode(s::SubString) = urlencode(unsafe_string(s))
export urlencode
function exec_as_multi(ctxt)
curl = ctxt.curl
curlm = curl_multi_init()
if (curlm == C_NULL) error("Unable to initialize curl_multi_init()") end
try
if isa(ctxt.options.callback, Function) ctxt.options.callback(curl) end
@ce_curlm curl_multi_add_handle curl
n_active = Array(Cint,1)
n_active[1] = 1
no_to = 30 * 24 * 3600.0
request_timeout = 0.001 + (ctxt.options.request_timeout == 0.0 ? no_to : ctxt.options.request_timeout)
started_at = time()
time_left = request_timeout
# poll_fd is unreliable when multiple parallel fds are active, hence using curl_multi_perform
# START curl_multi_socket_action mode
# @ce_curlm curl_multi_setopt CURLMOPT_SOCKETFUNCTION c_curl_socket_cb
# @ce_curlm curl_multi_setopt CURLMOPT_TIMERFUNCTION c_curl_multi_timer_cb
#
# muctxt = MultiCtxt()
# p_muctxt = pointer_from_objref(muctxt)
#
# @ce_curlm curl_multi_setopt CURLMOPT_SOCKETDATA p_muctxt
# @ce_curlm curl_multi_setopt CURLMOPT_TIMERDATA p_muctxt
#
#
# @ce_curlm curl_multi_socket_action CURL_SOCKET_TIMEOUT 0 n_active
#
# while (n_active[1] > 0) && (time_left > 0)
# evt_got = 0
# if (muctxt.chk_read || muctxt.chk_write)
# t1 = int64(time() * 1000)
#
# poll_to = min(muctxt.timeout < 0.0 ? no_to : muctxt.timeout, time_left)
# pfd_ret = poll_fd(RawFD(muctxt.s), poll_to, readable=muctxt.chk_read, writable=muctxt.chk_write)
#
# evt_got = (isreadable(pfd_ret) ? CURL_CSELECT_IN : 0) | (iswritable(pfd_ret) ? CURL_CSELECT_OUT : 0)
# else
# break
# end
#
# if (evt_got == 0)
# @ce_curlm curl_multi_socket_action CURL_SOCKET_TIMEOUT 0 n_active
# else
# @ce_curlm curl_multi_socket_action muctxt.s evt_got n_active
# end
#
# time_left = request_timeout - (time() - started_at)
# end
# END curl_multi_socket_action mode
# START curl_multi_perform mode
cmc = curl_multi_perform(curlm, n_active);
while (n_active[1] > 0) && (time_left > 0)
nb1 = ctxt.resp.bytes_recd
cmc = curl_multi_perform(curlm, n_active);
if(cmc != CURLM_OK) error("curl_multi_perform() failed: " * unsafe_string(curl_multi_strerror(cmc))) end
nb2 = ctxt.resp.bytes_recd
if (nb2 > nb1)
yield() # Just yield to other tasks
else
sleep(0.005) # Just to prevent unnecessary CPU spinning
end
time_left = request_timeout - (time() - started_at)
end
# END OF curl_multi_perform
if (n_active[1] == 0)
msgs_in_queue = Array(Cint,1)
p_msg::Ptr{CURLMsg2} = curl_multi_info_read(curlm, msgs_in_queue)
while (p_msg != C_NULL)
# println("Messages left in Q : " * string(msgs_in_queue[1]))
msg = unsafe_load(p_msg)
if (msg.msg == CURLMSG_DONE)
ec = convert(Int, msg.data)
if (ec != CURLE_OK)
# println("Result of transfer: " * string(msg.data))
throw("Error executing request : " * unsafe_string(curl_easy_strerror(ec)))
else
process_response(ctxt)
end
end
p_msg = curl_multi_info_read(curlm, msgs_in_queue)
end
else
error("request timed out")
end
finally
curl_multi_remove_handle(curlm, curl)
curl_multi_cleanup(curlm)
end
ctxt.resp
end
end