-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathhttp_output.clj
175 lines (149 loc) · 6.29 KB
/
http_output.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
(ns onyx.plugin.http-output
(:require [onyx.static.util :refer [kw->fn]]
[onyx.plugin.protocols :as p]
[taoensso.timbre :as log]
[aleph.http :as http]
[manifold.deferred :as d])
(:import [java.util Random]))
(defn next-backoff [attempt params]
(when (:allow-retry? params)
(let [fuzzy-multiplier (min (max (+ 1.0 (* 0.2 (.nextGaussian (Random.)))) 0) 2)
next-backoff-ms (int (min (:max-sleep-ms params)
(* (:base-sleep-ms params)
(Math/pow 2 attempt)
fuzzy-multiplier)))]
(when (< (+ (System/currentTimeMillis) next-backoff-ms)
(+ (:initial-request-time params) (:max-total-sleep-ms params)))
next-backoff-ms))))
(defn http-request [method url args success? async-exception-fn]
(-> (http/request (assoc args
:request-method method
:url url))
(d/chain
(fn [response]
(if (success? response)
[true response]
[false {:method method :url url :args args
:response response}])))
(d/catch Exception
(fn [e]
[false {:method method :url url :args args
:exception (pr-str e)}]))))
(defn process-message [message success? post-process ack-fn async-exception-fn retry-params run-state]
"Retry params:
- allow-retry? - if we will retry or not
- initial-request-time - time of first request
- base-sleep-ms - ...
- max-sleep-ms - ...
- max-total-sleep-ms - ..."
(let [{:keys [method url args]
:or {method :post}} message]
(d/loop [attempt 0]
(log/infof "Making HTTP request: %S %s %.30s attempt %d"
(name method) url args attempt)
(d/chain
(http-request method url args success? async-exception-fn)
(fn [[is-successful response]]
(let [next-backoff-ms (next-backoff attempt retry-params)]
(cond
is-successful
(do
(when post-process
(post-process message response))
(when ack-fn
(ack-fn))
response)
next-backoff-ms
(if @run-state
(do
(log/debugf "Backing off HTTP request: %S %s %.30s next retry in %d ms"
(name method) url args next-backoff-ms)
(Thread/sleep next-backoff-ms)
(d/recur (inc attempt)))
(do
(log/warnf "Aborting HTTP %s request to %s due to stopped peer" (name method) url)
(async-exception-fn response)))
:else
(async-exception-fn response))))))))
(defn send-request
"Use send-request to execute HTTP requests in a :function task type. Requires function-lifecycle on the task."
[{:keys [success? post-process retry-params]} run-state message]
(let [request (or (:request message) (throw (ex-info "No :request in message" message)))
send-retry-params (assoc retry-params :initial-request-time (System/currentTimeMillis))
result (deref (process-message request success? post-process nil #(throw %) send-retry-params run-state))]
(if (instance? Throwable result)
(throw result)
(assoc message :response result))))
(defn check-exception! [async-exception-info]
(when (not-empty @async-exception-info)
(throw (ex-info "HTTP request failed!" @async-exception-info))))
(deftype HttpOutput [success? post-process retry-params
^:unsynchronized-mutable async-exception-info
^:unsynchronized-mutable in-flight-writes
run-state]
p/Plugin
(start [this event]
(reset! run-state true)
this)
(stop [this event]
(reset! run-state false)
this)
p/BarrierSynchronization
(synced? [this epoch]
(check-exception! async-exception-info)
(zero? @in-flight-writes))
(completed? [this]
(check-exception! async-exception-info)
(zero? @in-flight-writes))
p/Checkpointed
(recover! [this replica-version checkpointed]
;; need a whole new atom so async writes from before the recover
;; don't alter the counter
(set! in-flight-writes (atom 0))
(set! async-exception-info (atom nil))
this)
(checkpoint [this])
(checkpointed! [this epoch])
p/Output
(prepare-batch [this event _ _] true)
(write-batch [this {:keys [onyx.core/write-batch onyx.core/params] :as event} _ _]
(check-exception! async-exception-info)
(let [ack-fn #(swap! in-flight-writes dec)
async-exception-fn #(reset! async-exception-info %)
retry (assoc retry-params :initial-request-time
(System/currentTimeMillis))
post-process (apply partial post-process params)]
(run! (fn [message]
(swap! in-flight-writes inc)
(process-message message
success? post-process ack-fn async-exception-fn retry run-state))
write-batch))
true))
(defn success?-default [{:keys [status]}]
(< status 500))
(defn post-process-default [& args]
nil)
(defn- prepare-task-map
[task-map]
(let [success? (kw->fn (or (:http-output/success-fn task-map)
::success?-default))
post-process (kw->fn (or (:http-output/post-process-fn task-map)
::post-process-default))
retry-params (:http-output/retry-params task-map)
retry-params (assoc retry-params :allow-retry? (some? retry-params))]
{:success? success?
:post-process post-process
:retry-params retry-params}))
(defn start-function
[{:keys [onyx.core/task-map onyx.core/params]} lifecycle]
{:onyx.core/params (conj params (prepare-task-map task-map) (atom true))})
(defn stop-function
[{:keys [onyx.core/params]} lifecycle]
(reset! (last params) false)
{})
(def function-lifecycle
{:lifecycle/before-task-start start-function
:lifecycle/after-task-stop stop-function})
(defn output [{:keys [onyx.core/task-map] :as pipeline-data}]
(let [{:keys [success? post-process retry-params]} (prepare-task-map task-map)]
(->HttpOutput success? post-process retry-params (atom nil) (atom 0) (atom false))))