Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support running HTTP requests as :onyx/type :function #8

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 55 additions & 15 deletions src/onyx/plugin/http_output.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
:exception (pr-str e)}]))))


(defn process-message [message success? post-process ack-fn async-exception-fn retry-params]
(defn process-message [message success? post-process ack-fn async-exception-fn retry-params run-state]
"Retry params:
- allow-retry? - if we will retry or not
- initial-request-time - time of first request
Expand All @@ -59,28 +59,50 @@
(do
(when post-process
(post-process message response))
(ack-fn))
(when ack-fn
(ack-fn))
response)

next-backoff-ms
(do
(log/debugf "Backing off HTTP request: %S %s %.30s next retry in %d ms"
(name method) url args next-backoff-ms)
(Thread/sleep next-backoff-ms)
(d/recur (inc attempt)))
(if @run-state
(do
(log/debugf "Backing off HTTP request: %S %s %.30s next retry in %d ms"
(name method) url args next-backoff-ms)
(Thread/sleep next-backoff-ms)
(d/recur (inc attempt)))
(do
(log/warnf "Aborting HTTP %s request to %s due to stopped peer" (name method) url)
(async-exception-fn response)))

:else
(async-exception-fn response))))))))

(defn send-request
"Use send-request to execute HTTP requests in a :function task type. Requires function-lifecycle on the task."
[{:keys [success? post-process retry-params]} run-state message]
(let [request (or (:request message) (throw (ex-info "No :request in message" message)))
send-retry-params (assoc retry-params :initial-request-time (System/currentTimeMillis))
result (deref (process-message request success? post-process nil #(throw %) send-retry-params run-state))]
(if (instance? Throwable result)
(throw result)
(assoc message :response result))))

(defn check-exception! [async-exception-info]
(when (not-empty @async-exception-info)
(throw (ex-info "HTTP request failed!" @async-exception-info))))

(deftype HttpOutput [success? post-process retry-params
^:unsynchronized-mutable async-exception-info
^:unsynchronized-mutable in-flight-writes]
^:unsynchronized-mutable in-flight-writes
run-state]
p/Plugin
(start [this event] this)
(stop [this event] this)
(start [this event]
(reset! run-state true)
this)

(stop [this event]
(reset! run-state false)
this)

p/BarrierSynchronization
(synced? [this epoch]
Expand Down Expand Up @@ -112,24 +134,42 @@
(run! (fn [message]
(swap! in-flight-writes inc)
(process-message message
success? post-process ack-fn async-exception-fn retry))
success? post-process ack-fn async-exception-fn retry run-state))
write-batch))
true))


(defn success?-default [{:keys [status]}]
(< status 500))


(defn post-process-default [& args]
nil)


(defn output [{:keys [onyx.core/task-map] :as pipeline-data}]
(defn- prepare-task-map
[task-map]
(let [success? (kw->fn (or (:http-output/success-fn task-map)
::success?-default))
post-process (kw->fn (or (:http-output/post-process-fn task-map)
::post-process-default))
retry-params (:http-output/retry-params task-map)
retry-params (assoc retry-params :allow-retry? (some? retry-params))]
(->HttpOutput success? post-process retry-params (atom nil) (atom 0))))
{:success? success?
:post-process post-process
:retry-params retry-params}))

(defn start-function
[{:keys [onyx.core/task-map onyx.core/params]} lifecycle]
{:onyx.core/params (conj params (prepare-task-map task-map) (atom true))})

(defn stop-function
[{:keys [onyx.core/params]} lifecycle]
(reset! (last params) false)
{})

(def function-lifecycle
{:lifecycle/before-task-start start-function
:lifecycle/after-task-stop stop-function})

(defn output [{:keys [onyx.core/task-map] :as pipeline-data}]
(let [{:keys [success? post-process retry-params]} (prepare-task-map task-map)]
(->HttpOutput success? post-process retry-params (atom nil) (atom 0) (atom false))))