diff --git a/src/eventloop.jl b/src/eventloop.jl index fc652497..5c2a367a 100644 --- a/src/eventloop.jl +++ b/src/eventloop.jl @@ -1,47 +1,90 @@ -function eventloop(socket) - task_local_storage(:IJulia_task, "write task") - try - while true - msg = recv_ipython(socket) - try - send_status("busy", msg) - invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, msg) - catch e - # Try to keep going if we get an exception, but - # send the exception traceback to the front-ends. - # (Ignore SIGINT since this may just be a user-requested - # kernel interruption to interrupt long calculations.) - if !isa(e, InterruptException) - content = error_content(e, msg="KERNEL EXCEPTION") - map(s -> println(orig_stderr[], s), content["traceback"]) - send_ipython(publish[], msg_pub(execute_msg, "error", content)) +function eventloop(socket::Socket, msgs::Channel, handlers) + while true + try + while true + msg = take!(msgs) + try + send_status("busy", msg) + invokelatest(get(handlers, msg.header["msg_type"], unknown_request), socket, msg) + catch e + # Try to keep going if we get an exception, but + # send the exception traceback to the front-ends. + # (Ignore SIGINT since this may just be a user-requested + # kernel interruption to interrupt long calculations.) + if !isa(e, InterruptException) + content = error_content(e, msg="KERNEL EXCEPTION") + map(s -> println(orig_stderr[], s), content["traceback"]) + send_ipython(publish[], msg_pub(execute_msg, "error", content)) + else + rethrow() + end + finally + flush_all() + send_status("idle", msg) end - finally - flush_all() - send_status("idle", msg) + yield() + end + catch e + # the Jupyter manager may send us a SIGINT if the user + # chooses to interrupt the kernel; don't crash on this + if !isa(e, InterruptException) + rethrow() end end - catch e - # the Jupyter manager may send us a SIGINT if the user - # chooses to interrupt the kernel; don't crash on this - if isa(e, InterruptException) - eventloop(socket) - else - rethrow() - end + yield() end end +const iopub_task = Ref{Task}() const requests_task = Ref{Task}() function waitloop() - @async eventloop(control[]) - requests_task[] = @async eventloop(requests[]) + control_msgs = Channel{Msg}(32) do ch + task_local_storage(:IJulia_task, "control_msgs task") + while isopen(control[]) + msg::Msg = recv_ipython(control[]) + put!(ch, msg) + yield() + end + end + + iopub_msgs = Channel{Msg}(32) + request_msgs = Channel{Msg}(32) do ch + task_local_storage(:IJulia_task, "request_msgs task") + while isopen(requests[]) + msg::Msg = recv_ipython(requests[]) + if haskey(iopub_handlers, msg.header["msg_type"]) + put!(iopub_msgs, msg) + else + put!(ch, msg) + end + yield() + end + end + + control_task = @async begin + task_local_storage(:IJulia_task, "control handle/write task") + eventloop(control[], control_msgs, handlers) + end + requests_task[] = @async begin + task_local_storage(:IJulia_task, "requests handle/write task") + eventloop(requests[], request_msgs, handlers) + end + iopub_task[] = @async begin + task_local_storage(:IJulia_task, "iopub handle/write task") + eventloop(requests[], iopub_msgs, iopub_handlers) + end + + bind(control_msgs, control_task) + bind(request_msgs, requests_task[]) + bind(iopub_msgs, iopub_task[]) + while true try wait() catch e # send interrupts (user SIGINT) to the code-execution task if isa(e, InterruptException) + @async Base.throwto(iopub_task[], e) @async Base.throwto(requests_task[], e) else rethrow() @@ -49,3 +92,4 @@ function waitloop() end end end + diff --git a/src/handlers.jl b/src/handlers.jl index 4e84f709..84ce7367 100644 --- a/src/handlers.jl +++ b/src/handlers.jl @@ -271,6 +271,7 @@ end function interrupt_request(socket, msg) @async Base.throwto(requests_task[], InterruptException()) + @async Base.throwto(iopub_task[], InterruptException()) send_ipython(requests[], msg_reply(msg, "interrupt_reply", Dict())) end @@ -291,5 +292,11 @@ const handlers = Dict{String,Function}( "comm_open" => comm_open, "comm_info_request" => comm_info_request, "comm_msg" => comm_msg, - "comm_close" => comm_close + "comm_close" => comm_close, +) + +const iopub_handlers = Dict{String,Function}( + "comm_open" => comm_open, + "comm_msg" => comm_msg, + "comm_close" => comm_close, )