diff --git a/lib/discourse_antivirus/clamav.rb b/lib/discourse_antivirus/clamav.rb index c37621c..bccad81 100644 --- a/lib/discourse_antivirus/clamav.rb +++ b/lib/discourse_antivirus/clamav.rb @@ -33,9 +33,8 @@ def versions def update_versions antivirus_versions = - clamav_services_pool.instances.map do |instance| - antivirus_version = with_session(instance) { |s| write_in_socket(s, "zVERSION\0") } - antivirus_version = clean_msg(antivirus_version).split("/") + clamav_services_pool.online_services.map do |service| + antivirus_version = service.version.split("/") { antivirus: antivirus_version[0], @@ -49,35 +48,33 @@ def update_versions end def accepting_connections? - available = online_target.present? + unavailable = clamav_services_pool.all_offline? - update_status(!available) + PluginStore.set(PLUGIN_NAME, UNAVAILABLE, unavailable) - available + !unavailable end def scan_upload(upload) - begin - file = get_uploaded_file(upload) + file = get_uploaded_file(upload) - return error_response(DOWNLOAD_FAILED) if file.nil? + return error_response(DOWNLOAD_FAILED) if file.nil? - scan_file(file) - rescue OpenURI::HTTPError - error_response(DOWNLOAD_FAILED) - rescue StandardError => e - Rails.logger.error("Could not scan upload #{upload.id}. Error: #{e.message}") - error_response(e.message) - end + scan_file(file) + rescue OpenURI::HTTPError + error_response(DOWNLOAD_FAILED) + rescue StandardError => e + Rails.logger.error("Could not scan upload #{upload.id}. Error: #{e.message}") + error_response(e.message) end def scan_file(file) - scan_response = - begin - with_session { |socket| stream_file(socket, file) } - rescue StandardError => e - e.message - end + online_service = clamav_services_pool.find_online_service + + # We open one connection to check if the service is online and another + # to scan the file. Code belows check if one of them failed with `&` and `unless`: + scan_response = online_service&.scan_file(file) + return error_response(UNAVAILABLE) unless scan_response parse_response(scan_response) end @@ -90,58 +87,14 @@ def error_response(error_message) { error: true, found: false, message: error_message } end - def update_status(unavailable) - PluginStore.set(PLUGIN_NAME, UNAVAILABLE, unavailable) - end - - def online_target - clamav_services_pool.instances.find do |instance| - ping_result = with_session(instance) { |s| write_in_socket(s, "zPING\0") } - - clean_msg(ping_result) == "PONG" - end - end - - def clean_msg(raw) - raw.gsub("1: ", "").strip - end - - def with_session(instance = online_target) - socket = instance&.connect! - - raise "ERROR: socket cannot be open" if !socket - - write_in_socket(socket, "zIDSESSION\0") - - yield(socket) - - write_in_socket(socket, "zEND\0") - - response = get_full_response_from(socket) - socket.close - response - end - def parse_response(scan_response) { - message: scan_response.gsub("1: stream:", "").gsub("\0", ""), + message: scan_response.gsub("stream:", "").gsub("\0", ""), found: scan_response.include?("FOUND"), error: scan_response.include?("ERROR"), } end - def stream_file(socket, file) - write_in_socket(socket, "zINSTREAM\0") - - while data = file.read(2048) - write_in_socket(socket, [data.length].pack("N")) - write_in_socket(socket, data) - end - - write_in_socket(socket, [0].pack("N")) - write_in_socket(socket, "") - end - def get_uploaded_file(upload) if store.external? # Upload#filesize could be approximate. @@ -152,31 +105,5 @@ def get_uploaded_file(upload) File.open(store.path_for(upload)) end end - - # ClamAV wants us to read/write in a non-blocking manner to prevent deadlocks. - # Read more about this [here](https://manpages.debian.org/testing/clamav-daemon/clamd.8.en.html#IDSESSION,) - # - # We need to peek into the socket buffer to make sure we can write/read from it, - # or we risk ClamAV abruptly closing the connection. - # For that, we use [IO#select](https://www.rubydoc.info/stdlib/core/IO.select) - def write_in_socket(socket, msg) - IO.select(nil, [socket]) - socket.sendmsg_nonblock(msg, 0, nil) - end - - def read_from_socket(socket) - IO.select([socket]) - - # Returns an array with the chunk as the first element - socket.recvmsg_nonblock(25).to_a.first.to_s - end - - def get_full_response_from(socket) - buffer = "" - - buffer += read_from_socket(socket) until buffer.ends_with?("\0") - - buffer - end end end diff --git a/lib/discourse_antivirus/clamav_service.rb b/lib/discourse_antivirus/clamav_service.rb index 0784945..ef4f9cf 100644 --- a/lib/discourse_antivirus/clamav_service.rb +++ b/lib/discourse_antivirus/clamav_service.rb @@ -2,17 +2,77 @@ module DiscourseAntivirus class ClamAVService - def initialize(hostname, port) + def initialize(connection_factory, hostname, port) + @connection_factory = connection_factory @hostname = hostname @port = port end - def connect! - begin - TCPSocket.new(@hostname, @port, connect_timeout: 3) - rescue StandardError - nil + def version + with_session { |s| write_in_socket(s, "zVERSION\0") } + end + + def online? + ping_result = with_session { |s| write_in_socket(s, "zPING\0") } + + ping_result == "PONG" + end + + def scan_file(file) + with_session do |socket| + write_in_socket(socket, "zINSTREAM\0") + + while data = file.read(2048) + write_in_socket(socket, [data.length].pack("N")) + write_in_socket(socket, data) + end + + write_in_socket(socket, [0].pack("N")) + write_in_socket(socket, "") end end + + private + + attr_reader :connection_factory, :hostname, :port, :connection + + def with_session + socket = connection_factory.call(hostname, port) + return if socket.nil? + + write_in_socket(socket, "zIDSESSION\0") + + yield(socket) + + write_in_socket(socket, "zEND\0") + + response = get_full_response_from(socket) + socket.close + response + end + + # ClamAV wants us to read/write in a non-blocking manner to prevent deadlocks. + # Read more about this [here](https://manpages.debian.org/testing/clamav-daemon/clamd.8.en.html#IDSESSION,) + # + # We need to peek into the socket buffer to make sure we can write/read from it, + # or we risk ClamAV abruptly closing the connection. + # For that, we use [IO#select](https://www.rubydoc.info/stdlib/core/IO.select) + def write_in_socket(socket, msg) + IO.select(nil, [socket]) + socket.sendmsg_nonblock(msg, 0, nil) + end + + def get_full_response_from(socket) + buffer = +"" + + until buffer.ends_with?("\0") + IO.select([socket]) + + # Returns an array with the chunk as the first element + buffer << socket.recvmsg_nonblock(25).to_a.first.to_s + end + + buffer.gsub("1: ", "").strip + end end end diff --git a/lib/discourse_antivirus/clamav_services_pool.rb b/lib/discourse_antivirus/clamav_services_pool.rb index ac438f6..506266a 100644 --- a/lib/discourse_antivirus/clamav_services_pool.rb +++ b/lib/discourse_antivirus/clamav_services_pool.rb @@ -2,15 +2,38 @@ module DiscourseAntivirus class ClamAVServicesPool + def online_services + instances.select(&:online?) + end + + def all_offline? + instances.none?(&:online?) + end + + def find_online_service + instances.detect(&:online?) + end + + private + + def connection_factory + @factory ||= + Proc.new do |hostname, port| + begin + TCPSocket.new(@hostname, @port, connect_timeout: 3) + rescue StandardError + nil + end + end + end + def instances @instances ||= servers .filter { |server| server&.hostname.present? && server&.port.present? } - .map { |server| ClamAVService.new(server.hostname, server.port) } + .map { |server| ClamAVService.new(connection_factory, server.hostname, server.port) } end - private - def servers @servers ||= if Rails.env.production? diff --git a/plugin.rb b/plugin.rb index 8a96a49..f266858 100644 --- a/plugin.rb +++ b/plugin.rb @@ -58,12 +58,10 @@ if validate && should_scan_file && upload.valid? antivirus = DiscourseAntivirus::ClamAV.instance - if antivirus.accepting_connections? - response = antivirus.scan_file(file) - is_positive = response[:found] + response = antivirus.scan_file(file) + is_positive = response[:found] - upload.errors.add(:base, I18n.t("scan.virus_found")) if is_positive - end + upload.errors.add(:base, I18n.t("scan.virus_found")) if is_positive end end diff --git a/spec/lib/discourse_antivirus/background_scan_spec.rb b/spec/lib/discourse_antivirus/background_scan_spec.rb index 3de5e37..40b3358 100644 --- a/spec/lib/discourse_antivirus/background_scan_spec.rb +++ b/spec/lib/discourse_antivirus/background_scan_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "rails_helper" +require_relative "../../support/fake_pool" require_relative "../../support/fake_tcp_socket" describe DiscourseAntivirus::BackgroundScan do @@ -54,7 +55,7 @@ .with(upload, max_file_size_kb: filesize) .raises(OpenURI::HTTPError.new("forbidden", nil)) - antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket: socket)) + antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket)) scanner = described_class.new(antivirus) scanned_upload = ScannedUpload.create_new!(upload) @@ -74,7 +75,7 @@ filesize = upload.filesize + 2.megabytes store.expects(:download).with(upload, max_file_size_kb: filesize).returns(nil) - antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket: socket)) + antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket)) scanner = described_class.new(antivirus) scanned_upload = ScannedUpload.create_new!(upload) @@ -264,14 +265,14 @@ def create_scanned_upload(updated_at: 6.hours.ago, quarantined: false, scans: 0) end end - def build_fake_pool(socket:) - OpenStruct.new(tcp_socket: socket, all_tcp_sockets: [socket]) + def build_fake_pool(socket) + FakePool.new([FakeTCPSocket.online, socket]) end def build_scanner(quarantine_files: false) IO.stubs(:select) socket = quarantine_files ? FakeTCPSocket.positive : FakeTCPSocket.negative - antivirus = DiscourseAntivirus::ClamAV.new(Discourse.store, build_fake_pool(socket: socket)) + antivirus = DiscourseAntivirus::ClamAV.new(Discourse.store, build_fake_pool(socket)) described_class.new(antivirus) end end diff --git a/spec/lib/discourse_antivirus/clamav_spec.rb b/spec/lib/discourse_antivirus/clamav_spec.rb index ad5d8e5..da037b5 100644 --- a/spec/lib/discourse_antivirus/clamav_spec.rb +++ b/spec/lib/discourse_antivirus/clamav_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "rails_helper" +require_relative "../../support/fake_pool" require_relative "../../support/fake_tcp_socket" describe DiscourseAntivirus::ClamAV do @@ -15,7 +16,7 @@ describe "#scan_upload" do it "returns false when the file is clear" do fake_socket = FakeTCPSocket.negative - pool = build_fake_pool(socket: fake_socket) + pool = build_fake_pool(fake_socket) antivirus = build_antivirus(pool) scan_result = antivirus.scan_upload(upload) @@ -26,7 +27,7 @@ it "returns true when the file has a virus" do fake_socket = FakeTCPSocket.positive - pool = build_fake_pool(socket: fake_socket) + pool = build_fake_pool(fake_socket) antivirus = build_antivirus(pool) scan_result = antivirus.scan_upload(upload) @@ -37,7 +38,7 @@ it "detects if ClamAV returned an error" do fake_socket = FakeTCPSocket.error - pool = build_fake_pool(socket: fake_socket) + pool = build_fake_pool(fake_socket) antivirus = build_antivirus(pool) scan_result = antivirus.scan_upload(upload) @@ -55,10 +56,10 @@ let(:last_update) { "Wed Jun 24 10:13:27 2020" } let(:socket) do - FakeTCPSocket.new(["1: #{antivirus_version}/#{database_version}/#{last_update}\0"]) + FakeTCPSocket.new("1: #{antivirus_version}/#{database_version}/#{last_update}\0") end - let(:antivirus) { build_antivirus(build_fake_pool(socket: socket)) } + let(:antivirus) { build_antivirus(build_fake_pool(socket)) } it "returns the version from the plugin store after fetching the last one" do antivirus.update_versions @@ -100,11 +101,10 @@ def assert_version_was_requested(fake_socket) expected = ["zIDSESSION\0", "zVERSION\0", "zEND\0"] expect(fake_socket.received_before_close).to contain_exactly(*expected) - expect(fake_socket.received).to be_empty end def assert_file_was_sent_through(fake_socket, file) - expected = ["zPING\0", "zIDSESSION\0", "zINSTREAM\0"] + expected = ["zIDSESSION\0", "zINSTREAM\0"] file.rewind while data = file.read(2048) @@ -118,11 +118,10 @@ def assert_file_was_sent_through(fake_socket, file) expected << "zEND\0" expect(fake_socket.received_before_close).to contain_exactly(*expected) - expect(fake_socket.received).to be_empty end - def build_fake_pool(socket:) - OpenStruct.new(tcp_socket: socket, all_tcp_sockets: [socket]) + def build_fake_pool(socket) + FakePool.new([FakeTCPSocket.online, socket]) end def build_antivirus(pool) diff --git a/spec/plugin_spec.rb b/spec/plugin_spec.rb index f384b84..cca8990 100644 --- a/spec/plugin_spec.rb +++ b/spec/plugin_spec.rb @@ -107,7 +107,7 @@ def mock_antivirus(socket) IO.stubs(:select).returns(true) - pool = FakePool.new([socket]) + pool = FakePool.new([FakeTCPSocket.online, socket]) antivirus = DiscourseAntivirus::ClamAV.new(Discourse.store, pool) DiscourseAntivirus::ClamAV.expects(:instance).returns(antivirus) end diff --git a/spec/support/fake_pool.rb b/spec/support/fake_pool.rb index 5b7bc37..5adf860 100644 --- a/spec/support/fake_pool.rb +++ b/spec/support/fake_pool.rb @@ -1,15 +1,18 @@ # frozen_string_literal: true -class FakePool +class FakePool < DiscourseAntivirus::ClamAVServicesPool def initialize(sockets) @sockets = sockets + @connections = 0 end - def tcp_socket - @sockets.first.dup + private + + def connection_factory + Proc.new { @sockets[@connections].tap { @connections += 1 } } end - def all_tcp_sockets - @sockets.map(&:dup) + def servers + [OpenStruct.new(hostname: "fake.hostname", port: "8080")] end end diff --git a/spec/support/fake_tcp_socket.rb b/spec/support/fake_tcp_socket.rb index f491cc7..265c257 100644 --- a/spec/support/fake_tcp_socket.rb +++ b/spec/support/fake_tcp_socket.rb @@ -1,28 +1,31 @@ # frozen_string_literal: true class FakeTCPSocket + def self.online + new("1: PONG\0") + end + def self.positive - new(["1: PONG\0", "1: stream: Win.Test.EICAR_HDB-1 FOUND\0"]) + new("1: stream: Win.Test.EICAR_HDB-1 FOUND\0") end def self.negative - new(["1: PONG\0", "1: stream: OK\0"]) + new("1: stream: OK\0") end def self.error - new(["1: PONG\0", "1: INSTREAM size limit exceeded. ERROR\0"]) + new("1: INSTREAM size limit exceeded. ERROR\0") end - def initialize(canned_responses) - @canned_responses = canned_responses + def initialize(canned_response) + @canned_response = canned_response @closed = false @received_before_close = [] @received = [] @next_to_read = 0 - @current_response = 0 end attr_reader :received, :received_before_close - attr_accessor :canned_responses + attr_accessor :canned_response def sendmsg_nonblock(text, _, _) raise if @closed @@ -35,22 +38,15 @@ def recvmsg_nonblock(maxlen) interval_end = @next_to_read + maxlen - response_chunk = canned_responses[@current_response][@next_to_read..interval_end] + response_chunk = canned_response[@next_to_read..interval_end] - if response_chunk.ends_with?("\0") - @current_response += 1 - @next_to_read = 0 - else - @next_to_read += interval_end + 1 - end + @next_to_read += interval_end + 1 unless response_chunk.ends_with?("\0") [response_chunk] end def close @closed = true - @next_to_read = 0 @received_before_close = @received - @received = [] end end