Skip to content

Commit

Permalink
Redistribute responsibilities.
Browse files Browse the repository at this point in the history
This change builds upon Bianca's idea of create the `ClamAVService` class and reorganized the code based on the following rationale:

`ClamAV` is what glues everything together. It knows how to talk to the S3 store, the plugin store and the `ClamAVService` to display the version, check availability, and scan files. It gets the raw response from ClamAV and translates to a different format which the rest of the plugin relies on.
`ClamAVService` defines an interface of which operations are possible (`online?`, `scan_file`, `version`) and takes care of the socket and session managemente, as well as writting/reading from the socket.
`ClamAVServicePool` resolves the SRV record and instantiates existing services. It tells them how to open a connection.

While working on this, I tried to remove the latter and do everything using only the first two, but I decided to keep it in the end because this design is handy for testing. We rely on the `FakeTCPSocket` for spying on the communication and returning specific responses, and this design let us use without relying on a mocking framework, just dependency injection.
  • Loading branch information
romanrizzi committed Aug 13, 2023
1 parent aabb12a commit 4865f9b
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 144 deletions.
113 changes: 20 additions & 93 deletions lib/discourse_antivirus/clamav.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ def versions

def update_versions
antivirus_versions =
clamav_services_pool.instances.map do |instance|
antivirus_version = with_session(instance) { |s| write_in_socket(s, "zVERSION\0") }
antivirus_version = clean_msg(antivirus_version).split("/")
clamav_services_pool.online_services.map do |service|
antivirus_version = service.version.split("/")

{
antivirus: antivirus_version[0],
Expand All @@ -49,35 +48,33 @@ def update_versions
end

def accepting_connections?
available = online_target.present?
unavailable = clamav_services_pool.all_offline?

update_status(!available)
PluginStore.set(PLUGIN_NAME, UNAVAILABLE, unavailable)

available
!unavailable
end

def scan_upload(upload)
begin
file = get_uploaded_file(upload)
file = get_uploaded_file(upload)

return error_response(DOWNLOAD_FAILED) if file.nil?
return error_response(DOWNLOAD_FAILED) if file.nil?

scan_file(file)
rescue OpenURI::HTTPError
error_response(DOWNLOAD_FAILED)
rescue StandardError => e
Rails.logger.error("Could not scan upload #{upload.id}. Error: #{e.message}")
error_response(e.message)
end
scan_file(file)
rescue OpenURI::HTTPError
error_response(DOWNLOAD_FAILED)
rescue StandardError => e
Rails.logger.error("Could not scan upload #{upload.id}. Error: #{e.message}")
error_response(e.message)
end

def scan_file(file)
scan_response =
begin
with_session { |socket| stream_file(socket, file) }
rescue StandardError => e
e.message
end
online_service = clamav_services_pool.find_online_service

# We open one connection to check if the service is online and another
# to scan the file. Code belows check if one of them failed with `&` and `unless`:
scan_response = online_service&.scan_file(file)
return error_response(UNAVAILABLE) unless scan_response

parse_response(scan_response)
end
Expand All @@ -90,58 +87,14 @@ def error_response(error_message)
{ error: true, found: false, message: error_message }
end

def update_status(unavailable)
PluginStore.set(PLUGIN_NAME, UNAVAILABLE, unavailable)
end

def online_target
clamav_services_pool.instances.find do |instance|
ping_result = with_session(instance) { |s| write_in_socket(s, "zPING\0") }

clean_msg(ping_result) == "PONG"
end
end

def clean_msg(raw)
raw.gsub("1: ", "").strip
end

def with_session(instance = online_target)
socket = instance&.connect!

raise "ERROR: socket cannot be open" if !socket

write_in_socket(socket, "zIDSESSION\0")

yield(socket)

write_in_socket(socket, "zEND\0")

response = get_full_response_from(socket)
socket.close
response
end

def parse_response(scan_response)
{
message: scan_response.gsub("1: stream:", "").gsub("\0", ""),
message: scan_response.gsub("stream:", "").gsub("\0", ""),
found: scan_response.include?("FOUND"),
error: scan_response.include?("ERROR"),
}
end

def stream_file(socket, file)
write_in_socket(socket, "zINSTREAM\0")

while data = file.read(2048)
write_in_socket(socket, [data.length].pack("N"))
write_in_socket(socket, data)
end

write_in_socket(socket, [0].pack("N"))
write_in_socket(socket, "")
end

def get_uploaded_file(upload)
if store.external?
# Upload#filesize could be approximate.
Expand All @@ -152,31 +105,5 @@ def get_uploaded_file(upload)
File.open(store.path_for(upload))
end
end

# ClamAV wants us to read/write in a non-blocking manner to prevent deadlocks.
# Read more about this [here](https://manpages.debian.org/testing/clamav-daemon/clamd.8.en.html#IDSESSION,)
#
# We need to peek into the socket buffer to make sure we can write/read from it,
# or we risk ClamAV abruptly closing the connection.
# For that, we use [IO#select](https://www.rubydoc.info/stdlib/core/IO.select)
def write_in_socket(socket, msg)
IO.select(nil, [socket])
socket.sendmsg_nonblock(msg, 0, nil)
end

def read_from_socket(socket)
IO.select([socket])

# Returns an array with the chunk as the first element
socket.recvmsg_nonblock(25).to_a.first.to_s
end

def get_full_response_from(socket)
buffer = ""

buffer += read_from_socket(socket) until buffer.ends_with?("\0")

buffer
end
end
end
72 changes: 66 additions & 6 deletions lib/discourse_antivirus/clamav_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,77 @@

module DiscourseAntivirus
class ClamAVService
def initialize(hostname, port)
def initialize(connection_factory, hostname, port)
@connection_factory = connection_factory
@hostname = hostname
@port = port
end

def connect!
begin
TCPSocket.new(@hostname, @port, connect_timeout: 3)
rescue StandardError
nil
def version
with_session { |s| write_in_socket(s, "zVERSION\0") }
end

def online?
ping_result = with_session { |s| write_in_socket(s, "zPING\0") }

ping_result == "PONG"
end

def scan_file(file)
with_session do |socket|
write_in_socket(socket, "zINSTREAM\0")

while data = file.read(2048)
write_in_socket(socket, [data.length].pack("N"))
write_in_socket(socket, data)
end

write_in_socket(socket, [0].pack("N"))
write_in_socket(socket, "")
end
end

private

attr_reader :connection_factory, :hostname, :port, :connection

def with_session
socket = connection_factory.call(hostname, port)
return if socket.nil?

write_in_socket(socket, "zIDSESSION\0")

yield(socket)

write_in_socket(socket, "zEND\0")

response = get_full_response_from(socket)
socket.close
response
end

# ClamAV wants us to read/write in a non-blocking manner to prevent deadlocks.
# Read more about this [here](https://manpages.debian.org/testing/clamav-daemon/clamd.8.en.html#IDSESSION,)
#
# We need to peek into the socket buffer to make sure we can write/read from it,
# or we risk ClamAV abruptly closing the connection.
# For that, we use [IO#select](https://www.rubydoc.info/stdlib/core/IO.select)
def write_in_socket(socket, msg)
IO.select(nil, [socket])
socket.sendmsg_nonblock(msg, 0, nil)
end

def get_full_response_from(socket)
buffer = +""

until buffer.ends_with?("\0")
IO.select([socket])

# Returns an array with the chunk as the first element
buffer << socket.recvmsg_nonblock(25).to_a.first.to_s
end

buffer.gsub("1: ", "").strip
end
end
end
29 changes: 26 additions & 3 deletions lib/discourse_antivirus/clamav_services_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,38 @@

module DiscourseAntivirus
class ClamAVServicesPool
def online_services
instances.select(&:online?)
end

def all_offline?
instances.none?(&:online?)
end

def find_online_service
instances.detect(&:online?)
end

private

def connection_factory
@factory ||=
Proc.new do |hostname, port|
begin
TCPSocket.new(@hostname, @port, connect_timeout: 3)
rescue StandardError
nil
end
end
end

def instances
@instances ||=
servers
.filter { |server| server&.hostname.present? && server&.port.present? }
.map { |server| ClamAVService.new(server.hostname, server.port) }
.map { |server| ClamAVService.new(connection_factory, server.hostname, server.port) }
end

private

def servers
@servers ||=
if Rails.env.production?
Expand Down
8 changes: 3 additions & 5 deletions plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,10 @@
if validate && should_scan_file && upload.valid?
antivirus = DiscourseAntivirus::ClamAV.instance

if antivirus.accepting_connections?
response = antivirus.scan_file(file)
is_positive = response[:found]
response = antivirus.scan_file(file)
is_positive = response[:found]

upload.errors.add(:base, I18n.t("scan.virus_found")) if is_positive
end
upload.errors.add(:base, I18n.t("scan.virus_found")) if is_positive
end
end

Expand Down
11 changes: 6 additions & 5 deletions spec/lib/discourse_antivirus/background_scan_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "rails_helper"
require_relative "../../support/fake_pool"
require_relative "../../support/fake_tcp_socket"

describe DiscourseAntivirus::BackgroundScan do
Expand Down Expand Up @@ -54,7 +55,7 @@
.with(upload, max_file_size_kb: filesize)
.raises(OpenURI::HTTPError.new("forbidden", nil))

antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket: socket))
antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket))
scanner = described_class.new(antivirus)
scanned_upload = ScannedUpload.create_new!(upload)

Expand All @@ -74,7 +75,7 @@
filesize = upload.filesize + 2.megabytes
store.expects(:download).with(upload, max_file_size_kb: filesize).returns(nil)

antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket: socket))
antivirus = DiscourseAntivirus::ClamAV.new(store, build_fake_pool(socket))
scanner = described_class.new(antivirus)
scanned_upload = ScannedUpload.create_new!(upload)

Expand Down Expand Up @@ -264,14 +265,14 @@ def create_scanned_upload(updated_at: 6.hours.ago, quarantined: false, scans: 0)
end
end

def build_fake_pool(socket:)
OpenStruct.new(tcp_socket: socket, all_tcp_sockets: [socket])
def build_fake_pool(socket)
FakePool.new([FakeTCPSocket.online, socket])
end

def build_scanner(quarantine_files: false)
IO.stubs(:select)
socket = quarantine_files ? FakeTCPSocket.positive : FakeTCPSocket.negative
antivirus = DiscourseAntivirus::ClamAV.new(Discourse.store, build_fake_pool(socket: socket))
antivirus = DiscourseAntivirus::ClamAV.new(Discourse.store, build_fake_pool(socket))
described_class.new(antivirus)
end
end
Loading

0 comments on commit 4865f9b

Please sign in to comment.