-
Notifications
You must be signed in to change notification settings - Fork 1
/
jericho.ml
341 lines (311 loc) · 11.7 KB
/
jericho.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
open Printf
module J = Yojson.Safe
(** Severity of a log message. The constructors are declared from least to
    most severe; [log_from] compares severities with the polymorphic [>=],
    so this declaration order is what defines the threshold ordering. *)
type log_severity = Debug | Info | Warn | Error
(** Type of a printf-style logging method: takes an optional exception, a
    format string producing a string, and then the arguments required by
    that format. *)
type 'a log_print = ?exn:exn -> ('a, unit, string, unit) format4 -> 'a
(** Global logging threshold. [None] (the initial value) disables all output;
    [Some s] lets through messages whose severity is at least [s]. *)
let log_level : log_severity option ref = ref None
(** [log_from facility] builds a logger object exposing the methods [debug],
    [info], [warn] and [error]. Each method is printf-like: the message is
    always formatted, and is then written to stderr as
    [[facility:severity] message] (and flushed) only when [!log_level] is
    [Some threshold] with the message severity at least [threshold].
    The optional [?exn] argument is accepted but currently ignored. *)
let log_from facility =
  (* Textual tag printed for each severity. *)
  let label_of = function
    | Debug -> "debug"
    | Info -> "info"
    | Warn -> "warn"
    | Error -> "error"
  in
  (* Write an already formatted message if the global threshold allows it. *)
  let emit severity message =
    match !log_level with
    | None -> ()
    | Some threshold ->
      if severity >= threshold then
        (* the trailing %! flushes stderr right after the line is written *)
        eprintf "[%s:%s] %s\n%!" facility (label_of severity) message
  in
  let print severity ?exn:_ (* FIXME *) fmt = ksprintf (emit severity) fmt in
  object
    method debug : 'a. 'a log_print = print Debug
    method info : 'a. 'a log_print = print Info
    method warn : 'a. 'a log_print = print Warn
    method error : 'a. 'a log_print = print Error
  end
(** Logger used throughout this file; its facility tag is the string jericho. *)
let log = log_from "jericho"
(** JSON object with the single key [.sv] bound to the string [timestamp].
    NOTE(review): presumably the placeholder the remote database replaces
    with its own clock when the value is written - confirm against the
    server API documentation. *)
let server_timestamp = `Assoc [ ".sv", `String "timestamp"; ]
(** [priority x] is the JSON association-list entry binding the key
    [.priority] to the float [x], ready to be placed inside an [`Assoc]. *)
let priority x = ".priority", `Float x
(** Values accepted for the [print] query argument (see [string_of_print]). *)
type print = [ `Pretty | `Silent ]
(** Ordering criteria accepted by the [get] method (see [string_of_order]):
    by key, by value, by priority, or by the named child field. *)
type order = [ `Key | `Value | `Priority | `Field of string ]
(** [string_of_print mode] is the query-string spelling of a print mode:
    [pretty] for [`Pretty] and [silent] for [`Silent]. *)
let string_of_print mode =
  match mode with
  | `Silent -> "silent"
  | `Pretty -> "pretty"
(** [string_of_order order] is the textual form of an ordering criterion:
    the built-in criteria map to [$key], [$value] and [$priority], while
    [`Field name] maps to [name] itself, unchanged. *)
let string_of_order order =
  match order with
  | `Field name -> name
  | `Priority -> "$priority"
  | `Value -> "$value"
  | `Key -> "$key"
(** [curl_setup h ?connecttimeout ?timeout url] applies the common transfer
    options to the curl handle [h] before a request to [url].

    @param connecttimeout value passed to [Curl.set_connecttimeout]
           (default 30). It used to be accepted but ignored, with 30 always
           applied; it is now honoured. Callers relying on the default see
           no change.
    @param timeout value passed to [Curl.set_timeout] (default 30).

    Redirect following is disabled, any content encoding is accepted, and
    the low-speed limit/time options are set to 1 and 42 respectively. *)
let curl_setup h ?(connecttimeout=30) ?(timeout=30) url =
  let open Curl in
  set_url h url;
  (* nosignal is enabled on every handle configured here *)
  set_nosignal h true;
  set_connecttimeout h connecttimeout;
  (* redirects are not followed by curl itself; callers that care inspect
     the response status and headers on their own *)
  set_followlocation h false;
  set_encoding h CURL_ENCODING_ANY;
  set_timeout h timeout;
  set_lowspeedlimit h 1;
  set_lowspeedtime h 42;
  ()
(** HTTP methods used by the client. *)
type http_action = [ `GET | `POST | `DELETE | `PUT | `PATCH ]

(** [string_of_http_action action] is the upper-case method name of
    [action], e.g. [GET] for [`GET]. *)
let string_of_http_action action =
  match action with
  | `PATCH -> "PATCH"
  | `PUT -> "PUT"
  | `DELETE -> "DELETE"
  | `POST -> "POST"
  | `GET -> "GET"
(** [make_url_args args] renders the [(key, value)] pairs of [args] as a
    query string: every pair becomes [key=value] with the value URL-encoded
    through [Netencoding.Url.encode ~plus:true] (keys are emitted as given),
    and the pairs are joined with an ampersand. An empty list gives the
    empty string. *)
let make_url_args args =
  let render_pair (key, value) =
    key ^ "=" ^ Netencoding.Url.encode ~plus:true value
  in
  args
  |> List.map render_pair
  |> String.concat "&"
(** [event_stream url] starts a background HTTP request to [url] with the
    header [Accept: text/event-stream] and returns a stream of decoded
    events: [`Put (path, data)], [`Patch (path, data)], [`KeepAlive],
    [`Cancel] and [`AuthRevoked].

    The transfer runs in a detached Lwt thread that reconnects forever:
    immediately after a successful transfer, and after a growing delay
    (1s, then multiplied by 1.3, capped at 10s) after a failure.

    NOTE(review): on a 3xx response without a [location] header the
    [List.assoc] lookup fails with [Not_found]; nothing in this function
    handles that failure, so it escapes from the thread started with
    [Lwt.async]. Confirm whether this is intended. *)
let event_stream url =
(* Raw body chunks handed over by the curl write callback. Each element is
   (bytes, read offset, bytes remaining); the two refs are advanced by
   [read] when a chunk is only partially consumed. *)
let (chunks, push) = Lwt_stream.create () in
(* One connection attempt, optionally preceded by a pause of [wait] seconds,
   followed by a recursive call for the next attempt. *)
let rec curl ?wait () =
let%lwt () =
match wait with
| None -> Lwt.return_unit
| Some wait -> log #info "wait for %.1fs" wait; Lwt_unix.sleep wait
in
let h = Curl.init () in
(* Perform the request against [url] on handle [h], re-issuing it against
   the redirect target when the server answers with a 3xx status. *)
let rec loop url =
(* Response headers of the current request, as (lowercased name, trimmed
   value) pairs, most recent first. *)
let headers = ref [] in
(* Header callback: split the raw line at its first colon (lines without a
   colon get an empty name) and record it; returns the line length. *)
let header s =
let k, v =
match String.index s ':' with
| i -> String.sub s 0 i, String.sub s (i + 1) (String.length s - i - 1)
| exception Not_found -> "", s
in
headers := (String.lowercase_ascii k, String.trim v) :: !headers;
String.length s
in
(* Body callback: queue the received data as a fresh chunk and report it as
   fully consumed. *)
let write s =
let len = String.length s in
push (Some (Bytes.of_string s, ref 0, ref len));
len
in
let open Curl in
reset h;
(* timeout 0 is passed for the streaming request.
   NOTE(review): in libcurl a zero timeout presumably means no overall
   transfer time limit - confirm. *)
curl_setup h ~timeout:0 url;
set_httpheader h [ "Accept: text/event-stream"; ];
set_headerfunction h header;
set_writefunction h write;
match%lwt Curl_lwt.perform h with
| CURLE_OK ->
begin match get_httpcode h with
| code when code / 100 = 2 ->
if !log_level = Some Debug then log #debug "http %d" code;
Lwt.return `Ok
| code when code / 100 = 3 ->
(* curl_setup disables automatic redirect following, so the location
   header is looked up and followed by hand *)
let%lwt url = Lwt.wrap2 List.assoc "location" !headers in
log #info "http %d location %s" code url;
loop url
| code ->
log #error "http %d" code;
Lwt.return (`Error (sprintf "http: %d" code))
end
| code ->
let msg = sprintf "curl (%d) %s" (Curl.errno code) (Curl.strerror code) in
log #error "curl error: %s" msg;
Lwt.return (`Error msg)
in
(* Decide the delay before the next attempt: none after success, otherwise
   start at 1s and grow by a factor 1.3 up to 10s. The handle is released
   whether the attempt succeeded or failed. *)
let%lwt wait =
begin match%lwt loop url with
| `Ok -> log #info "ok"; Lwt.return_none
| `Error error ->
log #error "error %s" error;
let wait = match wait with None -> 1. | Some wait -> min (wait *. 1.3) 10. in
Lwt.return_some wait
end [%finally
Curl.cleanup h;
Lwt.return_unit;
]
in
curl ?wait ()
in
(* Copy up to [size] bytes from the queued chunks into [buf] starting at
   [ofs]; [written] counts the bytes copied so far and is the result.
   Blocks only while nothing has been copied yet: once some bytes were
   written and the next chunk is not yet available, it returns early. *)
let rec read buf written ofs size =
match size with
| 0 -> Lwt.return written
| _ ->
let thread = Lwt_stream.peek chunks in
match Lwt.is_sleeping thread with
| true when written > 0 -> Lwt.return written
| _ ->
match%lwt thread with
(* end of the chunk stream; note that [push] is only ever called with
   [Some _] in this function *)
| None -> Lwt.return written
| Some (chunk, start, remaining) ->
let write = min size !remaining in
Lwt_bytes.blit_from_bytes chunk !start buf ofs write;
let%lwt () =
match write < !remaining with
(* chunk only partially consumed: advance its cursor and keep it queued *)
| true -> start := !start + write; remaining := !remaining - write; Lwt.return_unit
(* chunk exhausted: drop it from the queue *)
| false -> Lwt_stream.junk chunks
in
read buf (written + write) (ofs + write) (size - write)
in
let read buf ofs size = read buf 0 ofs size in
(* Expose the chunk queue as an input channel so it can be read line by line. *)
let chan = Lwt_io.make ~mode:Lwt_io.Input read in
let lines = Lwt_io.read_lines chan in
(* Start the connection loop in the background. *)
Lwt.async curl;
(* Split a field line at the colon found at index [i] into (name, value),
   skipping a single space directly after the colon when present. *)
let split_string s i =
let len = String.length s in
let j = if i < len - 1 && s.[i + 1] = ' ' then i + 2 else i + 1 in
String.sub s 0 i, String.sub s j (len - j)
in
(* Parser state for an event being assembled: (event name, data lines in
   reverse order of arrival). [zero] is the empty state. *)
let zero = "", [] in
let string_of_data data = String.concat "\n" (List.rev data) in
(* Decode the payload of a put/patch event: a JSON object with exactly the
   two members [path] (a string) and [data], in either order. Anything else,
   including a JSON parse failure, is logged and yields the pair of the
   empty string and [`Null]. *)
let get_kv data =
let s = string_of_data data in
let error ?exn s = log #error ?exn "get_kv %s" s; "", `Null in
match J.from_string s with
| `Assoc [ "path", `String k; "data", v; ]
| `Assoc [ "data", v; "path", `String k; ] -> k, v
| _ -> error s
| exception exn -> error ~exn s
in
(* Events without a payload are expected to carry the single data line
   [null]; anything else is only reported with a warning. *)
let check_null data =
if data <> ["null"] then log #warn "keep-alive data %s" (string_of_data data)
in
Lwt_stream.from begin fun () ->
(* Read lines until one complete event can be returned. An empty line
   terminates the current event, a line starting with a colon is a comment,
   and any other line is a field of the form name:value (or a bare name). *)
let rec process_line e =
match%lwt Lwt_stream.get lines with
| None -> Lwt.return_none
| Some "" -> dispatch_event e
| Some s when s.[0] = ':' ->
if !log_level = Some Debug then log #debug "stream comment %s" s;
process_line e
| Some s ->
match String.index s ':' with
| i -> process_event e (split_string s i)
| exception Not_found -> process_event e (s, "")
(* Fold one field into the state: [event] replaces the event name, [data]
   appends a data line, any other field is ignored with a warning. *)
and process_event (event, data as e) (k, v) =
match k with
| "event" -> process_line (v, data)
| "data" -> process_line (event, v :: data)
| _ -> log #warn "ignored event %s" k; process_line e
(* Turn a completed state into a stream element. States without data or
   without an event name, and unsupported event names, are skipped and
   parsing resumes from the empty state. *)
and dispatch_event = function
| _, [] -> process_line zero
| "", _ -> log #warn "no event"; process_line zero
| "put", data -> Lwt.return_some (`Put (get_kv data))
| "patch", data -> Lwt.return_some (`Patch (get_kv data))
| "keep-alive", data -> check_null data; Lwt.return_some `KeepAlive
| "cancel", data -> check_null data; Lwt.return_some `Cancel
| "auth_revoked", data -> check_null data; Lwt.return_some `AuthRevoked
| event, data -> log #warn "unsupported event %s %s" event (string_of_data data); process_line zero
in
process_line zero
end
(** Elements of the stream returned by [event_stream]. [`Put] and [`Patch]
    carry the [path] string and the [data] JSON value decoded from the event
    payload; the other constructors carry no payload. *)
type event = [ `AuthRevoked | `Cancel | `KeepAlive | `Patch of string * J.json | `Put of string * J.json ]
(** Client object returned by [make]. Every method takes the path of the
    node it operates on as its string argument. *)
type t = <
(* GET the node, with optional query and print settings; resolves to the
   decoded JSON body *)
get : ?shallow:bool -> ?export:bool -> ?order_by:order ->
?start_at:string -> ?end_at:string -> ?equal_to:string ->
?limit_to_first:int -> ?limit_to_last:int -> ?print:print ->
string -> J.json Lwt.t;
(* PUT the given JSON at the node *)
set : ?pretty:bool -> string -> J.json -> unit Lwt.t;
(* PATCH the node with the given JSON *)
update : ?pretty:bool -> string -> J.json -> unit Lwt.t;
(* PATCH the node with a JSON object built from the (key, value) list *)
update_multi : string -> (string * J.json) list -> unit Lwt.t;
(* POST the JSON under the node; resolves to the [name] string found in the
   response *)
push : string -> J.json -> string Lwt.t;
(* DELETE the node *)
delete : string -> unit Lwt.t;
(* stream of events for the node, see [event_stream] *)
event_stream : string -> event Lwt_stream.t
>
(** Failure reasons reported by the client: a curl error (errno, message),
    an unexpected HTTP status (code, response body), or a response body
    that could not be interpreted (the body itself). *)
type error = [ `Curl of int * string | `HTTP of int * string | `Response of string ]
(** [string_of_error e] is a one-line description of [e]: the error family
    ([curl], [http] or [response]), followed by the numeric code when there
    is one, a colon, and the accompanying text. *)
let string_of_error err =
  match err with
  | `Response body -> sprintf "response : %s" body
  | `HTTP (status, body) -> sprintf "http %d : %s" status body
  | `Curl (errno, message) -> sprintf "curl %d : %s" errno message
(** Exception with which the promises returned by the client fail; carries
    the reason as an [error]. Note that from this point on the constructor
    name [Error] is shared with the [Error] case of [log_severity]. *)
exception Error of error
(** [option_map f x] applies [f] to the contents of [x] when there are any:
    [Some v] becomes [Some (f v)] and [None] stays [None]. *)
let option_map f = function
  | None -> None
  | Some v -> Some (f v)
(** [make ~auth base_url] returns a client object of type [t] whose methods
    issue HTTP requests against [base_url ^ path ^ ".json"], always sending
    [auth] as the [auth] query argument.

    All methods return Lwt promises. A promise fails with [Error e] when the
    transfer fails ([`Curl]), when the HTTP status is not the expected one
    ([`HTTP]), or when the response body cannot be interpreted
    ([`Response]).

    The curl handle created for a request is released on every exit path,
    including failed transfers and exceptions raised while configuring the
    handle (it used to be released only after a successful transfer). *)
let make ~auth base_url =
  let log_error ?exn action path error =
    log #error ?exn "%s %s : %s" (string_of_http_action action) path error
  in
  (* Log [error] for the given request and fail with [Error error]. *)
  let return_error ?exn action k error =
    log_error ?exn action k (string_of_error error);
    Lwt.fail (Error error)
  in
  (* Perform one request and resolve to (http status, response body).
     [data], when present, is sent as the JSON request body, pretty-printed
     if [pretty] is set. [args] are extra optional query arguments. *)
  let query action ?(pretty=false) ?(args=[]) ?print path data =
    let body =
      match data with
      | None -> None
      | Some data ->
        match pretty with
        | true -> Some (J.pretty_to_string data)
        | false -> Some (J.to_string data)
    in
    let args = ("auth", Some auth) :: ("print", option_map string_of_print print) :: args in
    (* Keep only the arguments that carry a value, preserving their order. *)
    let args =
      List.fold_right
        (fun (k, v) acc -> match v with Some v -> (k, v) :: acc | None -> acc)
        args []
    in
    let url = sprintf "%s%s.json?%s" base_url path (make_url_args args) in
    let open Curl in
    let h = init () in
    (* Everything that uses the handle runs inside [Lwt.finalize] so that
       the handle is cleaned up whether the request succeeds, the transfer
       fails, or an exception is raised along the way. *)
    Lwt.finalize begin fun () ->
      curl_setup h url;
      begin match action with
      | `GET -> ()
      | `DELETE -> set_customrequest h "DELETE"
      | `POST -> set_post h true
      | `PUT -> set_post h true; set_customrequest h "PUT"
      | `PATCH -> set_post h true; set_customrequest h "PATCH"
      end;
      begin match body with
      | None -> ()
      | Some body ->
        set_httpheader h [ "Content-Type: application/json"; ];
        set_postfields h body;
        set_postfieldsize h (String.length body)
      end;
      (* Accumulate the response body as it arrives. *)
      let b = Buffer.create 10 in
      set_writefunction h (fun s -> Buffer.add_string b s; String.length s);
      log #info "%s %s%s" (string_of_http_action action) path (match body with Some body -> sprintf " : %s" body | None -> "");
      match%lwt Curl_lwt.perform h with
      | CURLE_OK ->
        let code = get_httpcode h in
        let s = Buffer.contents b in
        log #info "http %d : %d bytes" code (String.length s);
        if !log_level = Some Debug then log #debug "http %d : %s" code s;
        Lwt.return (code, s)
      | code ->
        let errno = Curl.errno code in
        let error = Curl.strerror code in
        log #error "curl (%d) %s" errno error;
        Lwt.fail (Error (`Curl (errno, error)))
    end begin fun () ->
      Curl.cleanup h;
      Lwt.return_unit
    end
  in
  (* Request whose only acceptable outcome is status 204; it is sent with
     the silent print mode. *)
  let simple_query action ?pretty k v =
    match%lwt query action ?pretty ~print:`Silent k v with
    | 204, _ -> Lwt.return_unit
    | code, s -> return_error action k (`HTTP (code, s))
  in
  (* Query values for these arguments are sent as JSON string literals. *)
  let json_string s = J.to_string (`String s) in
  (object
    method get ?(shallow=false) ?(export=false) ?order_by ?start_at ?end_at ?equal_to ?limit_to_first ?limit_to_last ?print k =
      let args = [
        "shallow", (if shallow then Some "true" else None);
        "orderBy", option_map (fun x -> json_string (string_of_order x)) order_by;
        "startAt", option_map json_string start_at;
        "endAt", option_map json_string end_at;
        "equalTo", option_map json_string equal_to;
        "limitToFirst", option_map string_of_int limit_to_first;
        "limitToLast", option_map string_of_int limit_to_last;
        "format", (if export then Some "export" else None);
      ] in
      match%lwt query `GET ~args ?print k None with
      | 200, s ->
        begin match J.from_string s with
        | json -> Lwt.return json
        | exception exn -> return_error ~exn `GET k (`Response s)
        end
      (* an empty 204 answer is reported as JSON null *)
      | 204, "" -> Lwt.return `Null
      | code, s -> return_error `GET k (`HTTP (code, s))
    method set ?pretty k v = simple_query `PUT ?pretty k (Some v)
    method update ?pretty k v = simple_query `PATCH ?pretty k (Some v)
    method update_multi k l = simple_query `PATCH k (Some (`Assoc l))
    method push k v =
      match%lwt query `POST k (Some v) with
      | 200, s ->
        (* the response must be a JSON object holding only the [name] string *)
        begin match J.from_string s with
        | exception exn -> return_error ~exn `POST k (`Response s)
        | `Assoc [ "name", `String name; ] -> Lwt.return name
        | _ -> return_error `POST k (`Response s)
        end
      | code, s -> return_error `POST k (`HTTP (code, s))
    method delete k = simple_query `DELETE k None
    method event_stream k = event_stream (sprintf "%s%s.json?%s" base_url k (make_url_args [ "auth", auth; ]))
  end : t)