Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Now message-box/bt class will restart thread if it was aborted because of a non-local exit from processing loop. #103

Merged
merged 3 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions bench.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@
(wait-if-queue-larger-than 10000 wait-if-queue-larger-than-given-p)
(duration 10)
(num-iterations 60)
(load-threads 8))
(load-threads 8)
(time-out nil))

(log:config :warn)

Expand Down Expand Up @@ -124,7 +125,8 @@
:async-ask-p async-ask-p
:num-shared-workers num-shared-workers
:queue-size queue-size
:wait-if-queue-larger-than wait-if-queue-larger-than))
:wait-if-queue-larger-than wait-if-queue-larger-than
:time-out time-out))
(force-output)

(with-timing (num-iterations
Expand Down Expand Up @@ -255,33 +257,72 @@


(defun run-all (&key
(num-iterations 10)
(duration 10))
(num-iterations 60)
(duration 10)
(queue-size 100)
(time-out 3)
&aux (started-at (get-internal-real-time)))
(run-benchmark :num-iterations num-iterations
:duration duration)
:duration duration
:with-reply-p nil
:async-ask-p nil)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p nil)
:with-reply-p t
:async-ask-p nil)

(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t
:async-ask-p t)


(format t "With queue size limited to ~A:~2%"
queue-size)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p t)
:with-reply-p nil
:async-ask-p nil
:queue-size queue-size)

(format t "Running ~A:~%" '(run-benchmark :queue-size 100))
(run-benchmark :num-iterations num-iterations
:duration duration
:queue-size 100)
:with-reply-p t
:async-ask-p nil
:queue-size queue-size)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil :queue-size 100))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p nil :queue-size 100)
:with-reply-p t
:async-ask-p t
:queue-size queue-size)


(format t "With time-out ~A:~2%"
time-out)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t :queue-size 100))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p t :queue-size 100))
:with-reply-p nil
:async-ask-p nil
;; This should not make sense for with-reply-p = nil
:time-out time-out)

(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t
:async-ask-p nil
:time-out time-out)

(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t
:async-ask-p t
:time-out time-out)

(format t "All tests are performed in ~,2f seconds.~%"
(/ (- (get-internal-real-time) started-at)
internal-time-units-per-second)))

5 changes: 4 additions & 1 deletion sento.asd
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@
:author "Manfred Bergmann"
:depends-on ("sento"
"fiveam"
"serapeum"
"lparallel"
"cl-mock")
:components ((:module "tests"
:serial t
:components
((:file "all-test")
(:file "miscutils-test")
Expand Down Expand Up @@ -95,7 +97,8 @@
(:file "actor-system-test")
(:file "actor-tree-test")
(:file "spawn-in-receive-test")
)))
(:file "test-utils")
(:file "message-box-test"))))
:description "Test system for sento"
:perform (test-op (op c) (symbol-call :fiveam :run!
(uiop:find-symbol* '#:test-suite
Expand Down
71 changes: 60 additions & 11 deletions src/mbox/message-box.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,41 @@ This is used to break the environment possibly captured as closure at 'submit' s
(defclass message-box/bt (message-box-base)
((queue-thread :initform nil
:documentation
"The thread that pops queue items."))
"The thread that pops queue items.")
(thread-is-running-p :initform nil
:type boolean
:documentation
"Will be set to NIL if processing loop will be broken because of an error or a restart invocation."))
(:documentation
"Bordeaux-Threads based message-box with a single thread operating on a message queue.
This is used when the actor is created using a `:pinned` dispatcher type.
There is a limit on the maximum number of actors/agents that can be created with
this kind of queue because each message-box (and with that each actor) requires exactly one thread."))


(declaim (ftype (function (message-box/bt &key (:thread-name (or null string)))
(values &optional))
start-thread))

(defun start-thread (msgbox &key thread-name)
(with-slots (name queue-thread thread-is-running-p)
msgbox
(flet ((run-processing-loop ()
(setf thread-is-running-p t)
(unwind-protect
(message-processing-loop msgbox)
(setf thread-is-running-p
nil))))
(setf queue-thread
(bt2:make-thread #'run-processing-loop
:name (or thread-name
(mkstr "message-thread-" name))))))
(values))


(defmethod initialize-instance :after ((self message-box/bt) &key)
(with-slots (name queue-thread) self
(setf queue-thread (bt2:make-thread
(lambda () (message-processing-loop self))
:name (mkstr "message-thread-" name))))
(start-thread self)

(when (next-method-p)
(call-next-method)))

Expand Down Expand Up @@ -179,6 +202,24 @@ This function sets the result as `handler-result' in `item'. The return of this
(bt2:condition-notify withreply-cvar)))
(handler-fun)))))


(declaim (ftype (function (message-box/bt)
(values &optional))
ensure-thread-is-running))

(defun ensure-thread-is-running (msgbox)
(with-slots (queue-thread thread-is-running-p should-run)
msgbox
(when (and (not thread-is-running-p)
should-run)
;; Just to be sure that thread is not alive:
(unless (bt2:thread-alive-p queue-thread)
(let ((thread-name (bt2:thread-name queue-thread)))
(log:warn "Restarting thread" thread-name)
(start-thread msgbox
:thread-name thread-name))))
(values)))

(defmethod submit ((self message-box/bt) message withreply-p time-out handler-fun-args)
"The `handler-fun-args` argument must contain a handler function as first list item.
It will be apply'ed with the rest of the args when the message was 'popped' from queue."
Expand All @@ -200,15 +241,21 @@ It will be apply'ed with the rest of the args when the message was 'popped' from
:time-out time-out
:handler-fun-args handler-fun-args
:handler-result 'no-result)))
(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox))

(bt2:with-lock-held (withreply-lock)
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
(queue:pushq queue push-item)

(if time-out
(wait-and-probe-for-msg-handler-result msgbox push-item)
(bt2:condition-wait withreply-cvar withreply-lock)))

(ensure-thread-is-running msgbox)

(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox))

(unless (bt2:condition-wait withreply-cvar withreply-lock
:timeout time-out)
(log:warn "~a: time-out elapsed but result not available yet!" (name msgbox))
(setf (slot-value push-item 'cancelled-p) t)
(error 'ask-timeout
:wait-time time-out)))

(with-slots (handler-result) push-item
(log:trace "~a: withreply: result should be available: ~a" (name msgbox) handler-result)
handler-result)))
Expand All @@ -222,6 +269,7 @@ The submitting code has to await the side-effect and possibly handle a timeout."
:handler-fun-args handler-fun-args)))
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
(queue:pushq queue push-item)
(ensure-thread-is-running msgbox)
t))

(defmethod stop ((self message-box/bt) &optional (wait nil))
Expand Down Expand Up @@ -301,6 +349,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i
processed-messages
dispatcher) self
(incf processed-messages)

(let ((push-item (make-message-item/dp
:message message
:handler-fun-args handler-fun-args
Expand Down
111 changes: 111 additions & 0 deletions tests/message-box-test.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
(defpackage :sento.message-box-test
(:use :cl :fiveam :cl-mock :sento.actor :sento.future)
(:shadow #:! #:?)
(:import-from #:miscutils
#:assert-cond
#:await-cond
#:filter)
(:import-from #:timeutils
#:ask-timeout)
(:import-from #:sento.messageb
#:message-box/bt
#:submit
#:no-result
#:queue-thread
#:stop)
(:import-from #:sento.test-utils
#:parametrized-test)
(:import-from #:ac
#:actor-of))

(in-package :sento.message-box-test)

(def-suite message-box-tests
:description "message-box tests"
:in sento.tests:test-suite)

(in-suite message-box-tests)


(defun wait-while-thread-will-die (msgbox &key (timeout 10))
(let ((wait-until (+ (get-internal-real-time) (* timeout
internal-time-units-per-second))))
(with-slots (queue-thread)
msgbox
(loop :while (bt2:thread-alive-p queue-thread)
:do (sleep 0.1)
(when (< wait-until
(get-internal-real-time))
(error "Thread didn't die in ~A seconds."
timeout))))))


(parametrized-test bt-box-resurrects-thread-after-abort-if-handler-catches-all-signals
((withreply-p timeout)
(nil nil)
(t 1)
(t nil))

"Simulates a situation when error has happened during message processing, and ABORT restart was invoked.
Usually this kill a thread, but here we ensure that by the thread is resurrected when we submit a
subsequent message."

(flet ((kill-by-restart-invoke (msg)
(declare (ignore msg))
(handler-case
;; This way we are simulating that the user choose
;; an ABORT restart in the IDE during debug session:
(handler-bind ((serious-condition #'abort))
(error "Die, thread, die!"))
;; This part the same as error handling code in the
;; SENTO.ACTOR-CELL:HANDLE-MESSAGE function:
;;
;; TODO: t was used to check if it is able to
;; catch stack unwinding because of INVOKE-RESTART,
;; but it can't.
(t (c)
(log:error "error condition was raised: ~%~a~%"
c)
(cons :handler-error c)))))

(let ((box (make-instance 'message-box/bt
:name "foo")))
(unwind-protect
(progn
(let ((first-reply
(submit box "The Message"
t
;; Don't wait for result here, because we are
;; intentionally raise error here and will never
;; return a result:
nil
(list #'kill-by-restart-invoke))))
(is (equal first-reply
'no-result)))

(wait-while-thread-will-die box)

(is (not
(bt2:thread-alive-p
(slot-value box 'queue-thread))))

(let ((result (handler-case
(submit box "The Message"
withreply-p
timeout
(list (lambda (msg)
(reverse msg))))
(ask-timeout ()
:timeout))))

(cond
(withreply-p
(is (string= "egasseM ehT" result)))
(t
(is (eql result t)))))

(is (bt2:thread-alive-p
(slot-value box 'queue-thread))))

;; Cleanup a thread:
(stop box t)))))
Loading
Loading