Skip to content

Commit 83864e0

Browse files
Send one heartbeat on scheduling service startup anyway
1 parent a92df70 commit 83864e0

File tree

3 files changed

+31
-3
lines changed

3 files changed

+31
-3
lines changed

connectors/protocol/connectors.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -734,9 +734,10 @@ def sync_cursor(self):
734734
def api_key_secret_id(self):
735735
return self.get("api_key_secret_id")
736736

737-
async def heartbeat(self, interval):
737+
async def heartbeat(self, interval, force=False):
738738
if (
739-
self.last_seen is None
739+
force
740+
or self.last_seen is None
740741
or (datetime.now(timezone.utc) - self.last_seen).total_seconds() > interval
741742
):
742743
self.log_debug("Sending heartbeat")

connectors/services/job_scheduling.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,14 @@ def __init__(self, config):
3737
self.idling = self.service_config["idling"]
3838
self.heartbeat_interval = self.service_config["heartbeat"]
3939
self.source_list = config["sources"]
40+
self.first_run = True
4041
self.last_wake_up_time = datetime.now(timezone.utc)
4142

4243
async def _schedule(self, connector):
44+
# To do some first-time stuff
45+
just_started = self.first_run
46+
self.first_run = False
47+
4348
if self.running is False:
4449
connector.log_debug("Skipping run because service is terminating")
4550
return
@@ -66,7 +71,7 @@ async def _schedule(self, connector):
6671
raise
6772

6873
# the heartbeat is always triggered
69-
await connector.heartbeat(self.heartbeat_interval)
74+
await connector.heartbeat(self.heartbeat_interval, force=just_started)
7075

7176
connector.log_debug(f"Status is {connector.status}")
7277

tests/services/test_job_scheduling.py

+22
Original file line numberDiff line numberDiff line change
@@ -438,3 +438,25 @@ def _source_klass(config):
438438
data_source_mock.close.assert_awaited_once()
439439

440440
connector.error.assert_awaited_with(error)
441+
442+
443+
@pytest.mark.asyncio
444+
async def test_initial_loop_run_heartbeat_only_once(
445+
connector_index_mock, sync_job_index_mock, set_env
446+
):
447+
connector = mock_connector(next_sync=None)
448+
connector_index_mock.supported_connectors.return_value = AsyncIterator(
449+
[connector, connector, connector, connector]
450+
) # to emulate 4 runs
451+
await create_and_run_service(JobSchedulingService)
452+
453+
connector.prepare.assert_awaited()
454+
connector.heartbeat.assert_awaited()
455+
assert connector.heartbeat.call_count == 4
456+
# Only the first call to heartbeat should pass "True"
457+
assert connector.heartbeat.call_args_list[0].kwargs["force"] is True
458+
assert connector.heartbeat.call_args_list[1].kwargs["force"] is False
459+
assert connector.heartbeat.call_args_list[2].kwargs["force"] is False
460+
assert connector.heartbeat.call_args_list[3].kwargs["force"] is False
461+
connector.update_last_sync_scheduled_at_by_job_type.assert_not_awaited()
462+
sync_job_index_mock.create.assert_not_awaited()

0 commit comments

Comments
 (0)