From c4f3e3324aa0c9534deab55de48abd679b800096 Mon Sep 17 00:00:00 2001 From: Bianca Nenciu Date: Fri, 18 Aug 2023 16:08:22 +0300 Subject: [PATCH] FIX: Scan files when a single server is online (#42) * FIX: Scan files when a single server is online This commits makes sure that even when a single server is online, the plugin is available. This commit also makes sure that a random and online server is chosen when using with_session without any specified socket. It used to use the first one. * FIX: Ensure used sockets are not closed * FIX: Add a 3 second timeout * Fix lint * Refactor ClamAV service pool * Redistribute responsibilities. This change builds upon Bianca's idea of create the `ClamAVService` class and reorganized the code based on the following rationale: `ClamAV` is what glues everything together. It knows how to talk to the S3 store, the plugin store and the `ClamAVService` to display the version, check availability, and scan files. It gets the raw response from ClamAV and translates to a different format which the rest of the plugin relies on. `ClamAVService` defines an interface of which operations are possible (`online?`, `scan_file`, `version`) and takes care of the socket and session managemente, as well as writting/reading from the socket. `ClamAVServicePool` resolves the SRV record and instantiates existing services. It tells them how to open a connection. While working on this, I tried to remove the latter and do everything using only the first two, but I decided to keep it in the end because this design is handy for testing. We rely on the `FakeTCPSocket` for spying on the communication and returning specific responses, and this design let us use without relying on a mocking framework, just dependency injection. --------- Co-authored-by: Roman Rizzi --- lib/discourse_antivirus/clamav.rb | 124 +++++------------- lib/discourse_antivirus/clamav_service.rb | 78 +++++++++++ .../clamav_services_pool.rb | 61 ++++----- .../enable_discourse_antivirus_validator.rb | 2 +- plugin.rb | 47 +++---- .../background_scan_spec.rb | 11 +- spec/lib/discourse_antivirus/clamav_spec.rb | 17 ++- spec/plugin_spec.rb | 9 +- spec/support/fake_pool.rb | 18 +++ spec/support/fake_tcp_socket.rb | 42 +++--- 10 files changed, 211 insertions(+), 198 deletions(-) create mode 100644 lib/discourse_antivirus/clamav_service.rb create mode 100644 spec/support/fake_pool.rb diff --git a/lib/discourse_antivirus/clamav.rb b/lib/discourse_antivirus/clamav.rb index af2f4cf..797b013 100644 --- a/lib/discourse_antivirus/clamav.rb +++ b/lib/discourse_antivirus/clamav.rb @@ -12,6 +12,16 @@ def self.instance new(Discourse.store, DiscourseAntivirus::ClamAVServicesPool.new) end + def self.correctly_configured? + return true if Rails.env.test? + + if Rails.env.production? + SiteSetting.antivirus_srv_record.present? + else + GlobalSetting.respond_to?(:clamav_hostname) && GlobalSetting.respond_to?(:clamav_port) + end + end + def initialize(store, clamav_services_pool) @store = store @clamav_services_pool = clamav_services_pool @@ -23,11 +33,8 @@ def versions def update_versions antivirus_versions = - clamav_services_pool.all_tcp_sockets.map do |tcp_socket| - antivirus_version = - with_session(socket: tcp_socket) { |socket| write_in_socket(socket, "zVERSION\0") } - - antivirus_version = clean_msg(antivirus_version).split("/") + clamav_services_pool.online_services.map do |service| + antivirus_version = service.version.split("/") { antivirus: antivirus_version[0], @@ -41,38 +48,33 @@ def update_versions end def accepting_connections? - sockets = clamav_services_pool.all_tcp_sockets - - if sockets.empty? - update_status(true) - return false - end + unavailable = clamav_services_pool.all_offline? - available = sockets.reduce(true) { |memo, socket| memo && target_online?(socket) } + PluginStore.set(PLUGIN_NAME, UNAVAILABLE, unavailable) - available.tap do |status| - unavailable = !status - update_status(unavailable) - end + !unavailable end def scan_upload(upload) - begin - file = get_uploaded_file(upload) + file = get_uploaded_file(upload) - return error_response(DOWNLOAD_FAILED) if file.nil? + return error_response(DOWNLOAD_FAILED) if file.nil? - scan_file(file) - rescue OpenURI::HTTPError - error_response(DOWNLOAD_FAILED) - rescue StandardError => e - Rails.logger.error("Could not scan upload #{upload.id}. Error: #{e.message}") - error_response(e.message) - end + scan_file(file) + rescue OpenURI::HTTPError + error_response(DOWNLOAD_FAILED) + rescue StandardError => e + Rails.logger.error("Could not scan upload #{upload.id}. Error: #{e.message}") + error_response(e.message) end def scan_file(file) - scan_response = with_session { |socket| stream_file(socket, file) } + online_service = clamav_services_pool.find_online_service + + # We open one connection to check if the service is online and another + # to scan the file. + scan_response = online_service&.scan_file(file) + return error_response(UNAVAILABLE) unless scan_response parse_response(scan_response) end @@ -85,54 +87,14 @@ def error_response(error_message) { error: true, found: false, message: error_message } end - def update_status(unavailable) - PluginStore.set(PLUGIN_NAME, UNAVAILABLE, unavailable) - end - - def target_online?(socket) - return false if socket.nil? - - ping_result = with_session(socket: socket) { |s| write_in_socket(s, "zPING\0") } - - clean_msg(ping_result) == "PONG" - end - - def clean_msg(raw) - raw.gsub("1: ", "").strip - end - - def with_session(socket: clamav_services_pool.tcp_socket) - write_in_socket(socket, "zIDSESSION\0") - - yield(socket) - - write_in_socket(socket, "zEND\0") - - response = get_full_response_from(socket) - socket.close - response - end - def parse_response(scan_response) { - message: scan_response.gsub("1: stream:", "").gsub("\0", ""), + message: scan_response.gsub("stream:", "").gsub("\0", ""), found: scan_response.include?("FOUND"), error: scan_response.include?("ERROR"), } end - def stream_file(socket, file) - write_in_socket(socket, "zINSTREAM\0") - - while data = file.read(2048) - write_in_socket(socket, [data.length].pack("N")) - write_in_socket(socket, data) - end - - write_in_socket(socket, [0].pack("N")) - write_in_socket(socket, "") - end - def get_uploaded_file(upload) if store.external? # Upload#filesize could be approximate. @@ -143,31 +105,5 @@ def get_uploaded_file(upload) File.open(store.path_for(upload)) end end - - # ClamAV wants us to read/write in a non-blocking manner to prevent deadlocks. - # Read more about this [here](https://manpages.debian.org/testing/clamav-daemon/clamd.8.en.html#IDSESSION,) - # - # We need to peek into the socket buffer to make sure we can write/read from it, - # or we risk ClamAV abruptly closing the connection. - # For that, we use [IO#select](https://www.rubydoc.info/stdlib/core/IO.select) - def write_in_socket(socket, msg) - IO.select(nil, [socket]) - socket.sendmsg_nonblock(msg, 0, nil) - end - - def read_from_socket(socket) - IO.select([socket]) - - # Returns an array with the chunk as the first element - socket.recvmsg_nonblock(25).to_a.first.to_s - end - - def get_full_response_from(socket) - buffer = "" - - buffer += read_from_socket(socket) until buffer.ends_with?("\0") - - buffer - end end end diff --git a/lib/discourse_antivirus/clamav_service.rb b/lib/discourse_antivirus/clamav_service.rb new file mode 100644 index 0000000..ef4f9cf --- /dev/null +++ b/lib/discourse_antivirus/clamav_service.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +module DiscourseAntivirus + class ClamAVService + def initialize(connection_factory, hostname, port) + @connection_factory = connection_factory + @hostname = hostname + @port = port + end + + def version + with_session { |s| write_in_socket(s, "zVERSION\0") } + end + + def online? + ping_result = with_session { |s| write_in_socket(s, "zPING\0") } + + ping_result == "PONG" + end + + def scan_file(file) + with_session do |socket| + write_in_socket(socket, "zINSTREAM\0") + + while data = file.read(2048) + write_in_socket(socket, [data.length].pack("N")) + write_in_socket(socket, data) + end + + write_in_socket(socket, [0].pack("N")) + write_in_socket(socket, "") + end + end + + private + + attr_reader :connection_factory, :hostname, :port, :connection + + def with_session + socket = connection_factory.call(hostname, port) + return if socket.nil? + + write_in_socket(socket, "zIDSESSION\0") + + yield(socket) + + write_in_socket(socket, "zEND\0") + + response = get_full_response_from(socket) + socket.close + response + end + + # ClamAV wants us to read/write in a non-blocking manner to prevent deadlocks. + # Read more about this [here](https://manpages.debian.org/testing/clamav-daemon/clamd.8.en.html#IDSESSION,) + # + # We need to peek into the socket buffer to make sure we can write/read from it, + # or we risk ClamAV abruptly closing the connection. + # For that, we use [IO#select](https://www.rubydoc.info/stdlib/core/IO.select) + def write_in_socket(socket, msg) + IO.select(nil, [socket]) + socket.sendmsg_nonblock(msg, 0, nil) + end + + def get_full_response_from(socket) + buffer = +"" + + until buffer.ends_with?("\0") + IO.select([socket]) + + # Returns an array with the chunk as the first element + buffer << socket.recvmsg_nonblock(25).to_a.first.to_s + end + + buffer.gsub("1: ", "").strip + end + end +end diff --git a/lib/discourse_antivirus/clamav_services_pool.rb b/lib/discourse_antivirus/clamav_services_pool.rb index d25f578..f4cd0a1 100644 --- a/lib/discourse_antivirus/clamav_services_pool.rb +++ b/lib/discourse_antivirus/clamav_services_pool.rb @@ -2,53 +2,46 @@ module DiscourseAntivirus class ClamAVServicesPool - UNAVAILABLE = "unavailable" - - def self.correctly_configured? - return true if Rails.env.test? - - if Rails.env.production? - SiteSetting.antivirus_srv_record.present? - else - GlobalSetting.respond_to?(:clamav_hostname) && GlobalSetting.respond_to?(:clamav_port) - end + def online_services + instances.select(&:online?) end - def tcp_socket - build_socket(service_instance.targets.first) + def all_offline? + instances.none?(&:online?) end - def all_tcp_sockets - service_instance.targets.map { |target| build_socket(target) } + def find_online_service + instances.find(&:online?) end private - def build_socket(target) - return if target.nil? - return if target.hostname.blank? - return if target.port.blank? + def connection_factory + @factory ||= + Proc.new do |hostname, port| + begin + TCPSocket.new(@hostname, @port, connect_timeout: 3) + rescue StandardError + nil + end + end + end - begin - TCPSocket.new(target.hostname, target.port) - rescue StandardError - nil - end + def instances + @instances ||= + servers + .filter { |server| server&.hostname.present? && server&.port.present? } + .map { |server| ClamAVService.new(connection_factory, server.hostname, server.port) } end - def service_instance - @instance ||= + def servers + @servers ||= if Rails.env.production? - DNSSD::ServiceInstance.new(Resolv::DNS::Name.create(SiteSetting.antivirus_srv_record)) + DNSSD::ServiceInstance.new( + Resolv::DNS::Name.create(SiteSetting.antivirus_srv_record), + ).targets else - OpenStruct.new( - targets: [ - OpenStruct.new( - hostname: GlobalSetting.clamav_hostname, - port: GlobalSetting.clamav_port, - ), - ], - ) + [OpenStruct.new(hostname: GlobalSetting.clamav_hostname, port: GlobalSetting.clamav_port)] end end end diff --git a/lib/validators/enable_discourse_antivirus_validator.rb b/lib/validators/enable_discourse_antivirus_validator.rb index 4b08d7d..95cfff4 100644 --- a/lib/validators/enable_discourse_antivirus_validator.rb +++ b/lib/validators/enable_discourse_antivirus_validator.rb @@ -8,7 +8,7 @@ def initialize(opts = {}) def valid_value?(val) return true if val == "f" - DiscourseAntivirus::ClamAVServicesPool.correctly_configured? + DiscourseAntivirus::ClamAV.correctly_configured? end def error_message diff --git a/plugin.rb b/plugin.rb index 1594929..f266858 100644 --- a/plugin.rb +++ b/plugin.rb @@ -19,27 +19,19 @@ add_admin_route "antivirus.title", "antivirus" after_initialize do - require_dependency File.expand_path( - "../app/controllers/discourse_antivirus/antivirus_controller.rb", - __FILE__, - ) - require_dependency File.expand_path( - "../lib/discourse_antivirus/clamav_services_pool.rb", - __FILE__, - ) - require_dependency File.expand_path("../lib/discourse_antivirus/clamav.rb", __FILE__) - require_dependency File.expand_path("../lib/discourse_antivirus/background_scan.rb", __FILE__) - require_dependency File.expand_path("../models/scanned_upload.rb", __FILE__) - require_dependency File.expand_path("../models/reviewable_upload.rb", __FILE__) - require_dependency File.expand_path("../serializers/reviewable_upload_serializer.rb", __FILE__) - require_dependency File.expand_path("../jobs/scheduled/scan_batch.rb", __FILE__) - require_dependency File.expand_path("../jobs/scheduled/create_scanned_uploads.rb", __FILE__) - require_dependency File.expand_path("../jobs/scheduled/fetch_antivirus_version.rb", __FILE__) - require_dependency File.expand_path( - "../jobs/scheduled/remove_orphaned_scanned_uploads.rb", - __FILE__, - ) - require_dependency File.expand_path("../jobs/scheduled/flag_quarantined_uploads.rb", __FILE__) + require_relative "app/controllers/discourse_antivirus/antivirus_controller.rb" + require_relative "lib/discourse_antivirus/clamav_services_pool.rb" + require_relative "lib/discourse_antivirus/clamav_service.rb" + require_relative "lib/discourse_antivirus/clamav.rb" + require_relative "lib/discourse_antivirus/background_scan.rb" + require_relative "models/scanned_upload.rb" + require_relative "models/reviewable_upload.rb" + require_relative "serializers/reviewable_upload_serializer.rb" + require_relative "jobs/scheduled/scan_batch.rb" + require_relative "jobs/scheduled/create_scanned_uploads.rb" + require_relative "jobs/scheduled/fetch_antivirus_version.rb" + require_relative "jobs/scheduled/remove_orphaned_scanned_uploads.rb" + require_relative "jobs/scheduled/flag_quarantined_uploads.rb" register_reviewable_type ReviewableUpload @@ -66,19 +58,16 @@ if validate && should_scan_file && upload.valid? antivirus = DiscourseAntivirus::ClamAV.instance - if antivirus.accepting_connections? - is_positive = antivirus.scan_file(file)[:found] + response = antivirus.scan_file(file) + is_positive = response[:found] - upload.errors.add(:base, I18n.t("scan.virus_found")) if is_positive - end + upload.errors.add(:base, I18n.t("scan.virus_found")) if is_positive end end if defined?(::DiscoursePrometheus) - require_dependency File.expand_path( - "../lib/discourse_antivirus/clamav_health_metric.rb", - __FILE__, - ) + require_relative "lib/discourse_antivirus/clamav_health_metric.rb" + DiscoursePluginRegistry.register_global_collector(DiscourseAntivirus::ClamAVHealthMetric, self) end diff --git a/spec/lib/discourse_antivirus/background_scan_spec.rb b/spec/lib/discourse_antivirus/background_scan_spec.rb index 3de5e37..40b3358 100644 --- a/spec/lib/discourse_antivirus/background_scan_spec.rb +++ b/spec/lib/discourse_antivirus/background_scan_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "rails_helper" +require_relative "../../support/fake_pool" require_relative "../../support/fake_tcp_socket" describe DiscourseAntivirus::BackgroundScan do @@ -54,7 +55,7 @@ .with(upload, max_file_size_kb: filesize) .raises(OpenURI::HTTPError.new("forbidden", nil)) - antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket: socket)) + antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket)) scanner = described_class.new(antivirus) scanned_upload = ScannedUpload.create_new!(upload) @@ -74,7 +75,7 @@ filesize = upload.filesize + 2.megabytes store.expects(:download).with(upload, max_file_size_kb: filesize).returns(nil) - antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket: socket)) + antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket)) scanner = described_class.new(antivirus) scanned_upload = ScannedUpload.create_new!(upload) @@ -264,14 +265,14 @@ def create_scanned_upload(updated_at: 6.hours.ago, quarantined: false, scans: 0) end end - def build_fake_pool(socket:) - OpenStruct.new(tcp_socket: socket, all_tcp_sockets: [socket]) + def build_fake_pool(socket) + FakePool.new([FakeTCPSocket.online, socket]) end def build_scanner(quarantine_files: false) IO.stubs(:select) socket = quarantine_files ? FakeTCPSocket.positive : FakeTCPSocket.negative - antivirus = DiscourseAntivirus::ClamAV.new(Discourse.store, build_fake_pool(socket: socket)) + antivirus = DiscourseAntivirus::ClamAV.new(Discourse.store, build_fake_pool(socket)) described_class.new(antivirus) end end diff --git a/spec/lib/discourse_antivirus/clamav_spec.rb b/spec/lib/discourse_antivirus/clamav_spec.rb index 4f86fd1..da037b5 100644 --- a/spec/lib/discourse_antivirus/clamav_spec.rb +++ b/spec/lib/discourse_antivirus/clamav_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "rails_helper" +require_relative "../../support/fake_pool" require_relative "../../support/fake_tcp_socket" describe DiscourseAntivirus::ClamAV do @@ -15,7 +16,7 @@ describe "#scan_upload" do it "returns false when the file is clear" do fake_socket = FakeTCPSocket.negative - pool = build_fake_pool(socket: fake_socket) + pool = build_fake_pool(fake_socket) antivirus = build_antivirus(pool) scan_result = antivirus.scan_upload(upload) @@ -26,7 +27,7 @@ it "returns true when the file has a virus" do fake_socket = FakeTCPSocket.positive - pool = build_fake_pool(socket: fake_socket) + pool = build_fake_pool(fake_socket) antivirus = build_antivirus(pool) scan_result = antivirus.scan_upload(upload) @@ -37,7 +38,7 @@ it "detects if ClamAV returned an error" do fake_socket = FakeTCPSocket.error - pool = build_fake_pool(socket: fake_socket) + pool = build_fake_pool(fake_socket) antivirus = build_antivirus(pool) scan_result = antivirus.scan_upload(upload) @@ -55,10 +56,10 @@ let(:last_update) { "Wed Jun 24 10:13:27 2020" } let(:socket) do - FakeTCPSocket.new(["1: #{antivirus_version}/#{database_version}/#{last_update}\0"]) + FakeTCPSocket.new("1: #{antivirus_version}/#{database_version}/#{last_update}\0") end - let(:antivirus) { build_antivirus(build_fake_pool(socket: socket)) } + let(:antivirus) { build_antivirus(build_fake_pool(socket)) } it "returns the version from the plugin store after fetching the last one" do antivirus.update_versions @@ -100,7 +101,6 @@ def assert_version_was_requested(fake_socket) expected = ["zIDSESSION\0", "zVERSION\0", "zEND\0"] expect(fake_socket.received_before_close).to contain_exactly(*expected) - expect(fake_socket.received).to be_empty end def assert_file_was_sent_through(fake_socket, file) @@ -118,11 +118,10 @@ def assert_file_was_sent_through(fake_socket, file) expected << "zEND\0" expect(fake_socket.received_before_close).to contain_exactly(*expected) - expect(fake_socket.received).to be_empty end - def build_fake_pool(socket:) - OpenStruct.new(tcp_socket: socket, all_tcp_sockets: [socket]) + def build_fake_pool(socket) + FakePool.new([FakeTCPSocket.online, socket]) end def build_antivirus(pool) diff --git a/spec/plugin_spec.rb b/spec/plugin_spec.rb index 4796d81..cca8990 100644 --- a/spec/plugin_spec.rb +++ b/spec/plugin_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "rails_helper" +require_relative "support/fake_pool" require_relative "support/fake_tcp_socket" describe DiscourseAntivirus do @@ -15,7 +16,7 @@ let(:file) { file_from_fixtures(filename, "pdf") } it "scans regular files and adds an error if the scan result is positive" do - mock_antivirus(FakeTCPSocket.positive(include_pong: true)) + mock_antivirus(FakeTCPSocket.positive) scanned_upload = UploadCreator.new(file, filename).create_for(user.id) @@ -23,7 +24,7 @@ end it "scans regular files but does nothing if the scan result is negative" do - mock_antivirus(FakeTCPSocket.negative(include_pong: true)) + mock_antivirus(FakeTCPSocket.negative) scanned_upload = UploadCreator.new(file, filename).create_for(user.id) @@ -74,7 +75,7 @@ it "scans the image if the live scan images setting is enabled" do SiteSetting.antivirus_live_scan_images = true - mock_antivirus(FakeTCPSocket.positive(include_pong: true)) + mock_antivirus(FakeTCPSocket.positive) scanned_upload = UploadCreator.new(file, filename).create_for(user.id) @@ -106,7 +107,7 @@ def mock_antivirus(socket) IO.stubs(:select).returns(true) - pool = OpenStruct.new(tcp_socket: socket, all_tcp_sockets: [socket]) + pool = FakePool.new([FakeTCPSocket.online, socket]) antivirus = DiscourseAntivirus::ClamAV.new(Discourse.store, pool) DiscourseAntivirus::ClamAV.expects(:instance).returns(antivirus) end diff --git a/spec/support/fake_pool.rb b/spec/support/fake_pool.rb new file mode 100644 index 0000000..5adf860 --- /dev/null +++ b/spec/support/fake_pool.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +class FakePool < DiscourseAntivirus::ClamAVServicesPool + def initialize(sockets) + @sockets = sockets + @connections = 0 + end + + private + + def connection_factory + Proc.new { @sockets[@connections].tap { @connections += 1 } } + end + + def servers + [OpenStruct.new(hostname: "fake.hostname", port: "8080")] + end +end diff --git a/spec/support/fake_tcp_socket.rb b/spec/support/fake_tcp_socket.rb index 9e311a4..265c257 100644 --- a/spec/support/fake_tcp_socket.rb +++ b/spec/support/fake_tcp_socket.rb @@ -1,54 +1,52 @@ # frozen_string_literal: true class FakeTCPSocket - def self.positive(include_pong: false) - responses = include_pong ? ["1: PONG\0"] : [] - responses << "1: stream: Win.Test.EICAR_HDB-1 FOUND\0" - new(responses) + def self.online + new("1: PONG\0") end - def self.negative(include_pong: false) - responses = include_pong ? ["1: PONG\0"] : [] - responses << "1: stream: OK\0" - new(responses) + def self.positive + new("1: stream: Win.Test.EICAR_HDB-1 FOUND\0") + end + + def self.negative + new("1: stream: OK\0") end def self.error - new(["1: INSTREAM size limit exceeded. ERROR\0"]) + new("1: INSTREAM size limit exceeded. ERROR\0") end - def initialize(canned_responses) - @canned_responses = canned_responses + def initialize(canned_response) + @canned_response = canned_response + @closed = false @received_before_close = [] @received = [] @next_to_read = 0 - @current_response = 0 end attr_reader :received, :received_before_close - attr_accessor :canned_responses + attr_accessor :canned_response def sendmsg_nonblock(text, _, _) + raise if @closed + received << text end def recvmsg_nonblock(maxlen) + raise if @closed + interval_end = @next_to_read + maxlen - response_chunk = canned_responses[@current_response][@next_to_read..interval_end] + response_chunk = canned_response[@next_to_read..interval_end] - if response_chunk.ends_with?("\0") - @current_response += 1 - @next_to_read = 0 - else - @next_to_read += interval_end + 1 - end + @next_to_read += interval_end + 1 unless response_chunk.ends_with?("\0") [response_chunk] end def close - @next_to_read = 0 + @closed = true @received_before_close = @received - @received = [] end end