Skip to content

Commit

Permalink
FIX: Prevent deadlocks while communicating with ClamAV. (#38)
Browse files Browse the repository at this point in the history
ClamAV wants us to read/write in a [non-blocking manner](https://manpages.debian.org/testing/clamav-daemon/clamd.8.en.html#IDSESSION,) to prevent deadlocks. If they think this happened, it closes the connection, which results in a broken pipe error on send.

We need to peek into the socket buffer to make sure we can write/read from it, using [IO#select](https://www.rubydoc.info/stdlib/core/IO.select) for that.
  • Loading branch information
romanrizzi authored Feb 17, 2023
1 parent ce42d9a commit a221a28
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 69 deletions.
79 changes: 35 additions & 44 deletions lib/discourse_antivirus/clamav.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ class ClamAV
PLUGIN_NAME = "discourse-antivirus"
STORE_KEY = "clamav-versions"
DOWNLOAD_FAILED = "Download failed"
SOCKET_READ_ERROR = "Timed out while reading from socket"
UNAVAILABLE = "unavailable"

SOCKET_READ_TIMEOUT = 5

def self.instance
new(Discourse.store, DiscourseAntivirus::ClamAVServicesPool.new)
end
Expand All @@ -28,10 +25,7 @@ def update_versions
antivirus_versions =
clamav_services_pool.all_tcp_sockets.map do |tcp_socket|
antivirus_version =
with_session(socket: tcp_socket) do |socket|
socket.send("zVERSION\0", 0)
read_until(socket, "\0")
end
with_session(socket: tcp_socket) { |socket| write_in_socket(socket, "zVERSION\0") }

antivirus_version = clean_msg(antivirus_version).split("/")

Expand Down Expand Up @@ -80,8 +74,6 @@ def scan_upload(upload)
def scan_file(file)
scan_response = with_session { |socket| stream_file(socket, file) }

return error_response(SOCKET_READ_ERROR) if scan_response.nil?

parse_response(scan_response)
end

Expand All @@ -100,11 +92,7 @@ def update_status(unavailable)
def target_online?(socket)
return false if socket.nil?

ping_result =
with_session(socket: socket) do |s|
s.send("zPING\0", 0)
read_until(s, "\0")
end
ping_result = with_session(socket: socket) { |s| write_in_socket(s, "zPING\0") }

clean_msg(ping_result) == "PONG"
end
Expand All @@ -114,39 +102,35 @@ def clean_msg(raw)
end

def with_session(socket: clamav_services_pool.tcp_socket)
open_session(socket)
yield(socket).tap { |_| close_socket(socket) }
write_in_socket(socket, "zIDSESSION\0")

yield(socket)

write_in_socket(socket, "zEND\0")

response = get_full_response_from(socket)
socket.close
response
end

def parse_response(scan_response)
{
message: scan_response.gsub("1: stream:", ""),
message: scan_response.gsub("1: stream:", "").gsub("\0", ""),
found: scan_response.include?("FOUND"),
error: scan_response.include?("ERROR"),
}
end

def open_session(socket)
socket.send("nIDSESSION\n", 0)
end

def close_socket(socket)
socket.send("nEND\0", 0)
socket.close
end

def stream_file(socket, file)
socket.send("zINSTREAM\0", 0)
write_in_socket(socket, "zINSTREAM\0")

while data = file.read(2048)
socket.send([data.length].pack("N"), 0)
socket.send(data, 0)
write_in_socket(socket, [data.length].pack("N"))
write_in_socket(socket, data)
end

socket.send([0].pack("N"), 0)
socket.send("", 0)

read_until(socket, "\0")
write_in_socket(socket, [0].pack("N"))
write_in_socket(socket, "")
end

def get_uploaded_file(upload)
Expand All @@ -160,21 +144,28 @@ def get_uploaded_file(upload)
end
end

def read_until(socket, delimiter)
# It monitors given arrays of IO objects,
# waits one or more of IO objects ready for reading, are ready for writing,
# and have pending exceptions respectively, and returns an array that contains
# arrays of those IO objects. It will return nil if optional timeout value is
# given and no IO object is ready in timeout seconds.
response_ready = IO.select([socket], nil, nil, SOCKET_READ_TIMEOUT)
# ClamAV wants us to read/write in a non-blocking manner to prevent deadlocks.
# Read more about this [here](https://manpages.debian.org/testing/clamav-daemon/clamd.8.en.html#IDSESSION,)
#
# We need to peek into the socket buffer to make sure we can write/read from it,
# or we risk ClamAV abruptly closing the connection.
# For that, we use [IO#select](https://www.rubydoc.info/stdlib/core/IO.select)
def write_in_socket(socket, msg)
IO.select(nil, [socket])
socket.sendmsg_nonblock(msg, 0, nil)
end

def read_from_socket(socket)
IO.select([socket])

return nil if !response_ready
# Returns an array with the chunk as the first element
socket.recvmsg_nonblock(25).to_a.first.to_s
end

def get_full_response_from(socket)
buffer = ""

while (char = socket.getc) != delimiter
buffer += char
end
buffer += read_from_socket(socket) until buffer.ends_with?("\0")

buffer
end
Expand Down
18 changes: 2 additions & 16 deletions spec/lib/discourse_antivirus/background_scan_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,6 @@
expect(scanned_upload.next_scan_at).to be_present
expect(scanned_upload.last_scan_failed).to eq(true)
end

it "will try again if we couldn't read the response from the socket" do
scanner = build_scanner(simulate_read_error: true)
scanned_upload = ScannedUpload.create_new!(upload)

scanner.scan([scanned_upload])
scanned_upload.reload

expect(scanned_upload.scans).to eq(0)
expect(scanned_upload.scan_result).to eq(DiscourseAntivirus::ClamAV::SOCKET_READ_ERROR)
expect(scanned_upload.next_scan_at).to be_present
expect(scanned_upload.last_scan_failed).to eq(true)
end
end

describe "#queue_batch" do
Expand Down Expand Up @@ -281,9 +268,8 @@ def build_fake_pool(socket:)
OpenStruct.new(tcp_socket: socket, all_tcp_sockets: [socket])
end

def build_scanner(quarantine_files: false, simulate_read_error: false)
select_response = simulate_read_error ? nil : true
IO.stubs(:select).returns(select_response)
def build_scanner(quarantine_files: false)
IO.stubs(:select)
socket = quarantine_files ? FakeTCPSocket.positive : FakeTCPSocket.negative
antivirus = DiscourseAntivirus::ClamAV.new(Discourse.store, build_fake_pool(socket: socket))
described_class.new(antivirus)
Expand Down
8 changes: 4 additions & 4 deletions spec/lib/discourse_antivirus/clamav_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

before do
file.rewind
IO.stubs(:select).returns(true)
IO.stubs(:select)
end

describe "#scan_upload" do
Expand Down Expand Up @@ -97,14 +97,14 @@
end

def assert_version_was_requested(fake_socket)
expected = ["nIDSESSION\n", "zVERSION\0", "nEND\0"]
expected = ["zIDSESSION\0", "zVERSION\0", "zEND\0"]

expect(fake_socket.received_before_close).to contain_exactly(*expected)
expect(fake_socket.received).to be_empty
end

def assert_file_was_sent_through(fake_socket, file)
expected = ["nIDSESSION\n", "zINSTREAM\0"]
expected = ["zIDSESSION\0", "zINSTREAM\0"]

file.rewind
while data = file.read(2048)
Expand All @@ -115,7 +115,7 @@ def assert_file_was_sent_through(fake_socket, file)
expected << [0].pack("N")
expected << ""

expected << "nEND\0"
expected << "zEND\0"

expect(fake_socket.received_before_close).to contain_exactly(*expected)
expect(fake_socket.received).to be_empty
Expand Down
18 changes: 13 additions & 5 deletions spec/support/fake_tcp_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,23 @@ def initialize(canned_responses)
attr_reader :received, :received_before_close
attr_accessor :canned_responses

def send(text, _)
def sendmsg_nonblock(text, _, _)
received << text
end

def getc
canned_responses[@current_response][@next_to_read].tap do |c|
@next_to_read += 1
@current_response += 1 if c == "\0"
def recvmsg_nonblock(maxlen)
interval_end = @next_to_read + maxlen

response_chunk = canned_responses[@current_response][@next_to_read..interval_end]

if response_chunk.ends_with?("\0")
@current_response += 1
@next_to_read = 0
else
@next_to_read += interval_end + 1
end

[response_chunk]
end

def close
Expand Down

0 comments on commit a221a28

Please sign in to comment.