Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions lib/kubeclient/informer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ def initialize(client, resource_name, reconcile_timeout: 15 * 60, logger: nil)
@logger = logger
@cache = nil
@started = nil
@stopped = false
@watching = []
end

Expand All @@ -21,22 +22,34 @@ def watch(&block)

# not implicit so users know they have to `stop`
def start_worker
@stopped = false
@worker = Thread.new do
loop do
fill_cache
watch_to_update_cache
rescue StandardError => e
# need to keep retrying since we work in the background
@logger&.error("ignoring error during background work #{e}")
ensure
sleep(1) # do not overwhelm the api-server if we are somehow broken
begin
fill_cache
watch_to_update_cache
rescue StandardError => e
# need to keep retrying since we work in the background
@logger&.error("ignoring error during background work #{e}")
ensure
sleep(1) # do not overwhelm the api-server if we are somehow broken
end
break if @stopped
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not change the loop to until @stopped then?

end
end
sleep(0.01) until @cache
sleep(0.01) until @cache || @stopped
end

def stop_worker
@worker&.kill # TODO: be nicer ?
@stopped = true
[@waiter, @worker].compact.each do |thread|
begin
thread.run # cancel sleep so either the loop sleep or the timeout sleep are interrupted
rescue ThreadError
# thread was already dead
end
thread.join
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question about @waiter possibly blocking for many minutes.
Here it'd block the app calling stop_worker right?

  • I'm thinking there is a subordinate relationship between worker thread -> single watch_to_update_cache run -> waiter thread. 🤔
    What if waiter threads did not refer to current instance variable @watcher.finish but closed over a lexically scoped reference to their watcher? Would then it be safe to leak unfinished waiter threads without .join-ing them? Would it be a good idea?

  • Waiter threads don't do much — are they safe to .kill? Or is there another way to interrupt a sleep() early?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is there another way to interrupt a sleep() early?

You can use Concurrent::Event with a wait timeout to implement an interruptible sleep.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before we killed them, but that made the tests brittle, so I like the joining since that makes anything that goes wrong more obvious (and re-raises exceptions too)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my testing @watcher.each can get blocked on the response stream. Even closing the http_client will not unblock the response body stream.

In that case join on the waiter thread that runs the @watcher.each internally will never return

end
end

private
Expand Down Expand Up @@ -69,7 +82,7 @@ def watch_to_update_cache
stop_reason = 'disconnect'

# stop watcher without using timeout
Thread.new do
@waiter = Thread.new do
sleep(@reconcile_timeout)
stop_reason = 'reconcile'
@watcher.finish
Expand All @@ -88,6 +101,14 @@ def watch_to_update_cache
@watching.each { |q| q << notice }
end
@logger&.info("watch restarted: #{stop_reason}")

# wake the waiter unless it's dead so it does not hang around
begin
Thread.pass # make sure we get into the sleep state of the waiter
@waiter.run
rescue ThreadError # rubocop:disable Lint/SuppressedException
end
@waiter.join
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I think I see — previously, leftover waiter thread(s) could execute @watcher.finish at any time, possibly interrupting unrelated watchers from later tests, right?

But if we somehow (e.g. 'ERROR') come here early during sleep(@reconcile_timeout), we might block for a long time — default 15min right?
Here, what we're potentially blocking is the existing worker thread's loop, which won't be directly felt by the app but can cause significant gaps in the watching?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would have killed a random new waiter before (not really in tests since they all build their own informer)

added a Thread.pass to fix the race condition (don't think it really happens since the watch is a http request so it will take some time, but it's cheap so 🤷 )

end
end
end
17 changes: 10 additions & 7 deletions test/test_informer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
require 'stringio'
require 'logger'

# tests with_retries in kubeclient.rb
class RetryTest < MiniTest::Test
class TestInformer < MiniTest::Test
def setup
super
skip if RUBY_ENGINE == 'truffleruby' # TODO: race condition in truffle-ruby fails random tests
Expand Down Expand Up @@ -87,14 +86,14 @@ def test_restarts_on_error
status: 200
)
slept = []
informer.stubs(:sleep).with { |x| slept << x; sleep(0.01) }
informer.stubs(:sleep).with { |x| slept << x; sleep(0.02) }

with_worker do
assert_equal(['a'], informer.list.map { |p| p.metadata.name })
sleep(0.05)
sleep(0.2) # should give us 5+ restarts (each timeout is 1 sleep and 1 sleep before restart)
end

assert slept.size >= 2, slept
assert slept.size >= 4, slept
assert_requested(list, at_least_times: 2)
assert_requested(watch, at_least_times: 2)
end
Expand Down Expand Up @@ -131,10 +130,14 @@ def test_can_watch_watches
def test_timeout
timeout = 0.1
informer.instance_variable_set(:@reconcile_timeout, timeout)
stub_list
list = stub_list
Kubeclient::Common::WatchStream.any_instance.expects(:finish)
stub_request(:get, %r{/v1/watch/pods})
watch = stub_request(:get, %r{/v1/watch/pods})

with_worker { sleep(timeout * 1.9) }

assert_requested(list)
assert_requested(watch)
end

private
Expand Down