From fe24bc368ad20a72f0c507626184f43efe50dfb7 Mon Sep 17 00:00:00 2001 From: Phil Budne Date: Wed, 13 Mar 2024 15:06:20 -0400 Subject: [PATCH 1/6] Take another page from scrapy scheduling internals Override _on_input_message, which runs in the Pika thread when a new message is received from RabbitMQ, and rather than just queuing the message to the internal work queue (for consumption by worker threads), decode it, and see when it could next be started (using the Slot.issue_interval calculated from avg_seconds for request completion to keep "next_issue" time). If the delay is less than the fast delay queue time (set with --busy-delay-minutes), use the Pika connection "call_later" method to delay putting the message on the work queue until it's ripe to be issued. If the delay is longer than busy-delay-minutes, requeue the message to the -fast queue. This GREATLY reduces use of the -fast queue (lower CPU load) AND means that requests can be started as soon as possible, without waiting for the message to come around through RabbitMQ (better thruput). Also: default worker count to the number of available CPU cores. --- indexer/storyapp.py | 26 +- indexer/worker.py | 54 ++-- indexer/workers/fetcher/sched.py | 277 ++++++++++++-------- indexer/workers/fetcher/tqfetcher.py | 368 ++++++++++++++++++--------- 4 files changed, 472 insertions(+), 253 deletions(-) diff --git a/indexer/storyapp.py b/indexer/storyapp.py index d870b4f3..324336f3 100644 --- a/indexer/storyapp.py +++ b/indexer/storyapp.py @@ -10,6 +10,7 @@ import argparse import logging +import multiprocessing import os import queue import sys @@ -206,15 +207,22 @@ def __init__(self, process_name: str, descr: str): # and avoid possible (if unlikely) surprise later. self.senders: Dict[BlockingChannel, StorySender] = {} - def process_message(self, im: InputMessage) -> None: - chan = im.channel - if chan in self.senders: - sender = self.senders[chan] - else: + def decode_story(self, im: InputMessage) -> BaseStory: + story = BaseStory.load(im.body) + assert isinstance(story, BaseStory) + return story + + def _story_sender(self, chan: BlockingChannel) -> StorySender: + sender = self.senders.get(chan) + if not sender: sender = self.senders[chan] = StorySender(self, chan) + return sender + + def process_message(self, im: InputMessage) -> None: + sender = self._story_sender(im.channel) # raised exceptions will cause retry; quarantine immediately? - story = BaseStory.load(im.body) + story = self.decode_story(im) self.process_story(sender, story) @@ -369,10 +377,8 @@ def end_of_batch(self) -> None: class MultiThreadStoryWorker(IntervalMixin, StoryWorker): # include thread name in log message format LOG_FORMAT = "thread" - - # subclass must set value! - # (else will see AttributeError) - WORKER_THREADS_DEFAULT: int + CPU_COUNT = multiprocessing.cpu_count() + WORKER_THREADS_DEFAULT = CPU_COUNT def __init__(self, process_name: str, descr: str): super().__init__(process_name, descr) diff --git a/indexer/worker.py b/indexer/worker.py index 22fc2134..b49afa11 100644 --- a/indexer/worker.py +++ b/indexer/worker.py @@ -349,7 +349,7 @@ def start_pika_thread(self) -> None: return self._pika_thread = threading.Thread( - target=self._pika_thread_body, name="Pika-thread", daemon=True + target=self._pika_thread_body, name="Pika", daemon=True ) self._pika_thread.start() @@ -410,11 +410,8 @@ def _pika_thread_body(self) -> None: logger.info("Pika thread exiting") def _call_in_pika_thread(self, cb: Callable[[], None]) -> None: - assert self.connection - - # XXX this will need a lock if app runs in multiple threads if self._pika_thread is None: - # here from a QApp + # here from a QApp in Main thread # transactions will NOT be enabled # (unless _subscribe is overridden) self.start_pika_thread() @@ -428,6 +425,7 @@ def _call_in_pika_thread(self, cb: Callable[[], None]) -> None: # NOTE! add_callback_threadsafe is documented (in the Pika # 1.3.2 comments) as the ONLY thread-safe connection method!!! + assert self.connection # checked above self.connection.add_callback_threadsafe(cb) def _stop_pika_thread(self) -> None: @@ -565,8 +563,17 @@ def _on_message( """ im = InputMessage(chan, method, properties, body, time.monotonic()) msglogger.debug("on_message tag #%s", method.delivery_tag) + self._on_input_message(im) + + def _on_input_message(self, im: InputMessage) -> None: + """ + called in Pika thread. + override to interrupt direct delivery of messages to _message_queue + """ self._message_queue.put(im) + _put_message_queue = _on_input_message + def _subscribe(self) -> None: """ Called from Pika thread with newly opened connection. @@ -623,9 +630,9 @@ def _process_one_message(self, im: InputMessage) -> bool: except QuarantineException as e: status = "error" self._quarantine(im, e) - except RequeueException as e: + except RequeueException: status = "requeue" - self._requeue(im, e) + self._requeue(im) except Exception as e: if self._crash_on_exception: raise # for debug @@ -655,18 +662,25 @@ def _ack_and_commit(self, im: InputMessage, multiple: bool = False) -> None: This avoids using functools.partial, which I find less illustrative of a function call with captured values. -phil """ - tag = im.method.delivery_tag # tag from last message - assert tag is not None def acker() -> None: - msglogger.debug("ack and commit #%s", tag) + self._pika_ack_and_commit(im, multiple) - im.channel.basic_ack(delivery_tag=tag, multiple=multiple) + self._call_in_pika_thread(acker) - # AFTER basic_ack! - im.channel.tx_commit() # commit sent messages and ack atomically! + def _pika_ack_and_commit(self, im: InputMessage, multiple: bool = False) -> None: + """ + call ONLY from pika thread!! + """ + tag = im.method.delivery_tag # tag from last message + assert tag is not None - self._call_in_pika_thread(acker) + chan = im.channel + + msglogger.debug("ack and commit #%s", tag) + chan.basic_ack(delivery_tag=tag, multiple=multiple) + # AFTER basic_ack! + chan.tx_commit() # commit sent messages and ack atomically! def _exc_headers(self, e: Exception) -> Dict: """ @@ -774,7 +788,7 @@ def _retry(self, im: InputMessage, e: Exception) -> bool: def set_requeue_delay_ms(self, ms: int) -> None: self.requeue_delay_str = str(int(ms)) - def _requeue(self, im: InputMessage, e: Exception) -> bool: + def _requeue(self, im: InputMessage) -> bool: """ Requeue message to -fast queue, which has no consumers, with an expiration/TTL; when messages expire, they are routed @@ -797,6 +811,16 @@ def _requeue(self, im: InputMessage, e: Exception) -> bool: ) return True # requeued + def message_queue_len(self) -> int: + """ + NOTE! the underlying qsize method is described as "unreliable" + USE ONLY FOR LOGGING/STATS!! + """ + if self._message_queue: + return self._message_queue.qsize() + else: + return 0 + def process_message(self, im: InputMessage) -> None: raise NotImplementedError("Worker.process_message not overridden") diff --git a/indexer/workers/fetcher/sched.py b/indexer/workers/fetcher/sched.py index 95919970..4589f9cc 100644 --- a/indexer/workers/fetcher/sched.py +++ b/indexer/workers/fetcher/sched.py @@ -14,10 +14,8 @@ from enum import Enum from typing import Any, Callable, Dict, List, NamedTuple, NoReturn, Optional, Type -from indexer.app import App - # number of seconds after start of last request to keep idle slot around -# (maintains request RTT) +# if no active/delayed requests and no errors (maintains request RTT) SLOT_RECENT_MINUTES = 5 # exponential moving average coefficient for avg_seconds. @@ -28,21 +26,6 @@ logger = logging.getLogger(__name__) -# _could_ try and map Slots by IP address(es), since THAT gets us closer -# to the point (of not hammering a particular server); -# -# BUT: Would have to deal with: -# 1. A particular FQDN may map to multiple IP addrs -# 2. The order of the IP addreses often changes between queries -# (so doing a lookup first, then connecting by name -# will likely give different first answers) -# 3. The ENTIRE SET might change if a CDN is involved! -# 4. Whether or not we're using IPv6 (if not, can ignore IPv6) -# 5. IP addresses can serve multiple domains -# 6. But domains served by shared servers (see #5) -# might have disjoint IP addr sets. - - class LockError(RuntimeError): """ base class for locking exceptions @@ -158,14 +141,39 @@ def __str__(self) -> str: return f"{self.elapsed():.3f}" -class IssueStatus(Enum): +class Alarm: """ - return value from Slot._issue + time until a future event (born expired) """ - OK = 0 # slot assigned - BUSY = 1 # too many fetches active or too soon - SKIPPED = 2 # recent connection error + def __init__(self) -> None: + self.time = 0.0 + + def set(self, delay: float) -> None: + """ + if unexpired, extend expiration by delay seconds; + else set expiration to delay seconds in future + """ + now = time.monotonic() + if self.time > now: + self.time += delay + else: + self.time = now + delay + + def delay(self) -> float: + """ + return seconds until alarm expires + """ + return self.time - time.monotonic() + + def __str__(self) -> str: + """ + used in log messages! + """ + delay = self.delay() + if delay >= 0: + return "%.2f" % delay + return "READY" class ConnStatus(Enum): @@ -179,6 +187,20 @@ class ConnStatus(Enum): DATA = 1 +class StartStatus(Enum): + """ + status from start() + """ + + OK = 1 # ok to start + SKIP = 2 # recently reported down + BUSY = 3 # currently busy + + +DELAY_SKIP = -1.0 # recent attempt failed +DELAY_LONG = -2.0 # delay to long to hold + + class Slot: """ A slot for a single id (eg domain) within a ScoreBoard @@ -188,59 +210,83 @@ def __init__(self, slot_id: str, sb: "ScoreBoard"): self.slot_id = slot_id # ie; domain self.sb = sb - self.active_count = 0 - self.last_issue = Timer(SLOT_RECENT_MINUTES * 60) - # time since last error at this workplace + # time since last error at this workplace: self.last_conn_error = Timer(sb.conn_retry_seconds) + self.avg_seconds = 0.0 # smoothed average - self.issue_interval = 0.0 + self.issue_interval = sb.min_interval_seconds + + self.last_start = Timer(SLOT_RECENT_MINUTES) + + self.next_issue = Alarm() + self.delayed = 0 + self.active = 0 # O(n) removal, only used for debug_info # unclear if using a Set would be better or not... self.active_threads: List[str] = [] - def _issue(self) -> IssueStatus: + def _get_delay(self) -> float: """ - return True if safe to issue (must call "retire" after) - return False if cannot be issued now + return delay in seconds until fetch can begin. + or value < 0 (DELAY_XXX) """ + # NOTE! Lock held: avoid logging! self.sb.big_lock.assert_held() - # scrapy issue interval is avg_latency / concurrency - # goal is to keep "concurrency" requests active - if self.avg_seconds == 0: # no running average yet. - # issue up to concurrency limit requests: - if self.active_count >= self.sb.target_concurrency: - return IssueStatus.BUSY - else: # have running average of request times. - elapsed = self.last_issue.elapsed() - if elapsed < self.issue_interval: - # WISH: return delta, for second try sleep time?? - return IssueStatus.BUSY + # see if connection to domain failed "recently". + if not self.last_conn_error.expired(): + return DELAY_SKIP + + delay = self.next_issue.delay() + if delay > self.sb.max_delay_seconds: + return DELAY_LONG + + if delay < 0: + delay = 0.0 # never issued or past due + + self.next_issue.set(self.issue_interval) + self.delayed += 1 + self.sb.delayed += 1 + return delay + + def _start(self) -> StartStatus: + """ + Here in a worker thread, convert from delayed to active + returns False if connection failed recently + """ + # NOTE! Lock held: avoid logging! + self.sb.big_lock.assert_held() + + self.delayed -= 1 + self.sb.delayed -= 1 # see if connection to domain failed "recently". # last test so that preference is short delay # (and hope an active fetch succeeds). if not self.last_conn_error.expired(): - return IssueStatus.SKIPPED + return StartStatus.SKIP - self.active_count += 1 - self.last_issue.reset() + if self.active >= self.sb.target_concurrency: + return StartStatus.BUSY + + self.active += 1 + self.last_start.reset() self.active_threads.append(threading.current_thread().name) - return IssueStatus.OK + return StartStatus.OK - def retire(self, conn_status: ConnStatus, sec: float) -> None: + def finish(self, conn_status: ConnStatus, sec: float) -> None: """ called when a fetch attempt has ended. """ with self.sb.big_lock: # NOTE! Avoid logging while holding lock!!! - assert self.active_count > 0 - self.active_count -= 1 - # remove on list is O(n), but n is small (concurrency limit) + assert self.active > 0 + self.active -= 1 + # remove on list is O(n), but n is small (concurrency target) self.active_threads.remove(threading.current_thread().name) oavg = self.avg_seconds if conn_status == ConnStatus.NOCONN: @@ -258,27 +304,25 @@ def retire(self, conn_status: ConnStatus, sec: float) -> None: self.avg_seconds = sec if self.avg_seconds != oavg: - self.issue_interval = self.avg_seconds / self.sb.target_concurrency + interval = self.avg_seconds / self.sb.target_concurrency + if interval < self.sb.min_interval_seconds: + interval = self.sb.min_interval_seconds + self.issue_interval = interval # adjust scoreboard counters - self.sb._slot_retired(self.active_count == 0) + self.sb._slot_finished(self.active == 0) def _consider_removing(self) -> None: self.sb.big_lock.assert_held() # PARANOIA if ( - self.active_count == 0 - and self.last_issue.expired() + self.active == 0 + and self.delayed == 0 + and self.last_start.expired() and self.last_conn_error.expired() ): self.sb._remove_slot(self.slot_id) -# status/value tuple: popular in GoLang -class IssueReturn(NamedTuple): - status: IssueStatus - slot: Optional[Slot] # if status == OK - - TS_IDLE = "idle" @@ -287,19 +331,41 @@ class ThreadStatus: ts: float # time.monotonic +class StartRet(NamedTuple): + """ + return value from start() + """ + + status: StartStatus + slot: Optional[Slot] + + +class Stats(NamedTuple): + """ + statistics returned by periodic() + """ + + slots: int + active_fetches: int + active_slots: int + delayed: int + + class ScoreBoard: """ keep score of active requests by "id" (str) """ + # arguments changed often in development, + # so all must be passed by keyword def __init__( self, - app: App, # for stats - max_active: int, # total concurrent active limit + *, target_concurrency: int, # max active with same id (domain) + max_delay_seconds: float, # max time to hold conn_retry_seconds: float, # seconds before connect retry + min_interval_seconds: float, ): - self.app = app # single lock, rather than one each for scoreboard, active count, # and each slot. Time spent with lock held should be small, # and lock ordering issues likely to make code complex and fragile! @@ -307,12 +373,15 @@ def __init__( self.big_lock = Lock( "big_lock", self.debug_info_nolock ) # covers ALL variables! - self.max_active = max_active self.target_concurrency = target_concurrency + self.max_delay_seconds = max_delay_seconds self.conn_retry_seconds = conn_retry_seconds + self.min_interval_seconds = min_interval_seconds + self.slots: Dict[str, Slot] = {} # map "id" (domain) to Slot - self.active_fetches = 0 self.active_slots = 0 + self.active_fetches = 0 + self.delayed = 0 # map thread name to ThreadStatus self.thread_status: Dict[str, ThreadStatus] = {} @@ -330,25 +399,35 @@ def _get_slot(self, slot_id: str) -> Slot: def _remove_slot(self, slot_id: str) -> None: del self.slots[slot_id] - def issue(self, slot_id: str, note: str) -> IssueReturn: + def get_delay(self, slot_id: str) -> float: + """ + called when story first picked up from message queue. + return time to hold message before starting (delayed counts incremented) + if too long (more than max_delay_seconds), returns -1 + """ with self.big_lock: - if self.active_fetches < self.max_active: - slot = self._get_slot(slot_id) - status = slot._issue() - if status == IssueStatus.OK: - # *MUST* call slot.retire() when done - if slot.active_count == 1: # was idle - self.active_slots += 1 - self.active_fetches += 1 - self._set_thread_status(note) # full URL - return IssueReturn(status, slot) - else: - status = IssueStatus.BUSY - return IssueReturn(status, None) + slot = self._get_slot(slot_id) + return slot._get_delay() - def _slot_retired(self, slot_idle: bool) -> None: + def start(self, slot_id: str, note: str) -> StartRet: + """ + here from worker thread, after delay (if any) + """ + with self.big_lock: + slot = self._get_slot(slot_id) + status = slot._start() + if status == StartStatus.OK: + # *MUST* call slot.finished() when done + if slot.active == 1: # was idle + self.active_slots += 1 + self.active_fetches += 1 + self._set_thread_status(note) # full URL + return StartRet(status, slot) + return StartRet(status, None) + + def _slot_finished(self, slot_idle: bool) -> None: """ - here from slot.retired() + here from slot.finished() """ # NOTE! lock held: avoid logging self.big_lock.assert_held() @@ -357,7 +436,6 @@ def _slot_retired(self, slot_idle: bool) -> None: if slot_idle: assert self.active_slots > 0 self.active_slots -= 1 - # XXX _consider_removing self._set_thread_status(TS_IDLE) def _set_thread_status(self, info: str) -> None: @@ -372,33 +450,26 @@ def _set_thread_status(self, info: str) -> None: ts.info = info ts.ts = time.monotonic() - def periodic(self, dump_slots: bool = False) -> None: + def periodic(self, dump_slots: bool = False) -> Stats: """ - called periodically from main thread + called periodically from main thread. + NOTE!! dump_slots logs with lock held!!!! + Use only for debug! """ with self.big_lock: # do this less frequently? for slot in list(self.slots.values()): slot._consider_removing() - # avoid stats, logging with lock held!!! - recent = len(self.slots) - active_fetches = self.active_fetches - active_slots = self.active_slots - - if dump_slots: + if dump_slots: # NOTE!!! logs with lock held!!! self.debug_info_nolock() - logger.info( - "%d recently active; %d URLs in %d domains active", - recent, - active_fetches, - active_slots, - ) - - self.app.gauge("active.recent", recent) - self.app.gauge("active.fetches", active_fetches) - self.app.gauge("active.slots", active_slots) + return Stats( + slots=len(self.slots), + active_fetches=self.active_fetches, + active_slots=self.active_slots, + delayed=self.delayed, + ) def debug_info_nolock(self) -> None: """ @@ -407,11 +478,15 @@ def debug_info_nolock(self) -> None: """ for domain, slot in list(self.slots.items()): logger.info( - "%s: %s last issue: %s last err: %s", + "%s: %s avg %.3f, %da, %dd, last issue: %s last err: %s next: %s", domain, ",".join(slot.active_threads), - slot.last_issue, + slot.avg_seconds, + slot.active, + slot.delayed, + slot.last_start, slot.last_conn_error, + slot.next_issue, ) # here without lock, so grab before examining: diff --git a/indexer/workers/fetcher/tqfetcher.py b/indexer/workers/fetcher/tqfetcher.py index 9f18a2e8..e3a51bd2 100644 --- a/indexer/workers/fetcher/tqfetcher.py +++ b/indexer/workers/fetcher/tqfetcher.py @@ -11,33 +11,29 @@ be waiting for I/O (due to network/server latency), or CPU bound in SSL processing, neither of which requires holding the GIL. -When a Story can't be fetched because of connect rate or concurrency -limits, the Story is queued to a "fast delay" queue to avoid -bookkeeping complexity (and having an API that allows not ACKing a -message immediately). - -In theory we have thirty minutes to ACK a message before RabbitMQ has -a fit (closes connection), so holding on to Stories that can't be -processed immediately is POSSIBLE, *BUT* the existing framework acks -the story after process_message/story (in _process_messages) and -handles exceptions for retry and quarantine. - -Some thoughts on holding messages (2023-02-05): -* keep absolute "next_issue_time" in Slot -* assign incomming stories an absolute issue time based on next_issue_time -* if next_issue_time too far in the future (over 20 minutes), requeue. -* else call pika_connection.call_later w/ the entire InputMessage & callback -* increment Slot.next_issue_time by Slot.issue_interval -* callback queues InputMessage to work queue -* how to prevent starvation/stalling (hitting prefetch limit) caused by big sources/long delays? - + limit held stories per slot (calculate via (next_issue - now)/issue_interval???)? - + calculate limit as fraction of worker pool/prefetch size??? +We have thirty minutes to ACK a message before RabbitMQ has a fit +(closes connection), so: + * All scheduling done in Pika thread, as messages delivered by Pika + * As messages come to _on_input_message, the next time a fetch could + be issued is assigned by calling scoreboard.get_delay + * If the delay would mean the fetch would start more than BUSY_DELAY_MINUTES + in the future, the message is requeued to the "-fast" delay queue + (and will return in BUSY_DELAY_MINUTES). + * If connections to the server have failed "recently", behave as if + this connection failed, and requeue the story for retry. + * Else call pika_connection.call_later w/ the entire InputMessage and + a callback to queue the InputMessage to the work queue (_message_queue) + and the InputMessage will be picked up by a worker thread and passed + to process_story() """ +# To find all stories_incr label names: +# egrep 'FetchReturn\(|GetIdReturn\(|incr_stor' tqfetcher.py + import argparse import logging -import random +import os import signal import sys import time @@ -46,6 +42,7 @@ import requests from mcmetadata.webpages import MEDIA_CLOUD_USER_AGENT +from pika.adapters.blocking_connection import BlockingChannel from requests.exceptions import RequestException from indexer.app import run @@ -56,27 +53,31 @@ non_news_fqdn, url_fqdn, ) -from indexer.worker import QuarantineException, RequeueException +from indexer.worker import ( + CONSUMER_TIMEOUT_SECONDS, + InputMessage, + QuarantineException, + RequeueException, +) from indexer.workers.fetcher.sched import ( + DELAY_LONG, + DELAY_SKIP, ConnStatus, - IssueReturn, - IssueStatus, ScoreBoard, + StartStatus, ) TARGET_CONCURRENCY = 10 # scrapy fetcher AUTOTHROTTLE_TARGET_CONCURRENCY -# Unless the input RSS entries are well mixed (and this would not be -# the case if the rss-fetcher queued RSS entries to us directly), RSS -# entries for the same domain will travel in packs/clumps/trains. If -# the "fast" delay is too long, that allows only one URL to be issued -# each time the train passes. So set the "fast" delay JUST long -# enough so they come back when intra-request issue interval has -# passed. Note: the intra-request interval is based on response time -# divided by SLOT_REQUESTS, so is not a constant. -BUSY_DELAY_SECONDS = 30.0 - -# time cache server as bad after a connection failure +# minimum interval between initiation of requests to a site +# lower values increase chance of concurrent connections to +# sites that respond quickly. +MIN_INTERVAL_SECONDS = 0.5 + +# default delay time for "fast" queue, and max time to delay stories w/ call_later +BUSY_DELAY_MINUTES = 10 + +# time to cache server as down after a connection failure CONN_RETRY_MINUTES = 10 # requests timeouts: @@ -127,9 +128,13 @@ class FetchReturn(NamedTuple): quarantine: bool -class Fetcher(MultiThreadStoryWorker): - WORKER_THREADS_DEFAULT = 200 # equiv to 20 fetchers, with 10 active fetches +class GetIdReturn(NamedTuple): + status: str # counter name if != "ok" + url: str + id: str + +class Fetcher(MultiThreadStoryWorker): # Exceptions to discard instead of quarantine after repeated retries: # RequestException hierarchy includes bad URLs NO_QUARANTINE = (Retry, RequestException) @@ -138,15 +143,15 @@ def __init__(self, process_name: str, descr: str): super().__init__(process_name, descr) self.scoreboard: Optional[ScoreBoard] = None - self.previous_fragment = "" + self.prefetch = 0 def define_options(self, ap: argparse.ArgumentParser) -> None: super().define_options(ap) ap.add_argument( - "--busy-delay-seconds", + "--busy-delay-minutes", type=float, - default=BUSY_DELAY_SECONDS, - help=f"busy (fast) queue delay in seconds (default: {BUSY_DELAY_SECONDS})", + default=BUSY_DELAY_MINUTES, + help=f"busy (fast) queue delay in minutes (default: {BUSY_DELAY_MINUTES})", ) ap.add_argument( @@ -156,6 +161,13 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help=f"minutes to cache connection failure (default: {CONN_RETRY_MINUTES})", ) + ap.add_argument( + "--min-interval-seconds", + type=float, + default=MIN_INTERVAL_SECONDS, + help=f"minimum connection interval in seconds (default: {MIN_INTERVAL_SECONDS})", + ) + ap.add_argument( "--target-concurrency", type=int, @@ -163,13 +175,6 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help=f"goal for concurrent requests/fqdn (default: {TARGET_CONCURRENCY})", ) - ap.add_argument( - "--no-second-try", - default=False, - action="store_true", - help="don't try sleep/retry after busy", - ) - ap.add_argument( "--dump-slots", default=False, @@ -188,16 +193,15 @@ def process_args(self) -> None: ) sys.exit(1) + self.busy_delay_seconds = self.args.busy_delay_minutes * 60 + self.scoreboard = ScoreBoard( - self, - self.workers, - self.args.target_concurrency, - self.args.conn_retry_minutes * 60, + target_concurrency=self.args.target_concurrency, + max_delay_seconds=self.busy_delay_seconds, + conn_retry_seconds=self.args.conn_retry_minutes * 60, + min_interval_seconds=MIN_INTERVAL_SECONDS, ) - self.busy_delay_seconds = self.args.busy_delay_seconds - self.second_try = not self.args.no_second_try - self.set_requeue_delay_ms(1000 * self.busy_delay_seconds) # enable debug dump on SIGUSR1 @@ -207,6 +211,38 @@ def usr1_handler(sig: int, frame: Optional[FrameType]) -> None: signal.signal(signal.SIGUSR1, usr1_handler) + def _qos(self, chan: BlockingChannel) -> None: + """ + set "prefetch" limit, the number of unacked messages + RabbitMQ will send us at any time. + + Active requests, ready messages waiting in _message_queue, + and delayed (call_later) should total to the prefetch limit. + + NOTE!!! Failure to send an ACK to RabbitMQ for + CONSUMER_TIMEOUT_SECONDS for ANY message will cause + RabbitMQ to close the connection!!! So the estimate + MUST be prssimistic!! + """ + # time available to handle a request (in a worker thread) + # after maximum call_later delay + work_time = CONSUMER_TIMEOUT_SECONDS - self.busy_delay_seconds + + # Maximum time handling a request. This is both low (read + # timeout applies to EACH network read) but also possibly + # high. Would have to have working DNS but no TCP connectivity + # to have ALL requests fail (UNLESS Internet unreachable!!!). + max_request_seconds = CONNECT_SECONDS + READ_SECONDS + + # only hand us as many as we KNOW it's possible to handle + # given multiple worker threads: + prefetch = int(self.workers * work_time / max_request_seconds) + if prefetch > 65535: # 16-bit field + prefetch = 65535 + logger.info("prefetch %d", prefetch) + chan.basic_qos(prefetch_count=prefetch) + self.prefetch = prefetch + def periodic(self) -> None: """ called from main_loop @@ -215,9 +251,43 @@ def periodic(self) -> None: assert self.args with self.timer("status"): - self.scoreboard.periodic(self.args.dump_slots) + stats = self.scoreboard.periodic(self.args.dump_slots) + + ready = self.message_queue_len() # ready for workers + # delayed counts not adjusted until "start" called, + # so subtract messages in message_queue: + delayed = stats.delayed - ready + + load_avgs = os.getloadavg() + + # when input queue non-empty, first three should total to self.prefetch + logger.info( + "%d active, %d ready, %d delayed, for %d sites, %d recent; lavg %.2f", + stats.active_fetches, + ready, + delayed, + stats.active_slots, + stats.slots, + load_avgs[0], + ) + + def requests(label: str, count: int) -> None: + self.gauge("requests", count, labels=[("status", label)]) + + requests("active", stats.active_fetches) + requests("ready", ready) + requests("delayed", delayed) - def fetch(self, sess: requests.Session, fqdn: str, url: str) -> FetchReturn: + # above three should total to prefetch: + self.gauge("prefetch", self.prefetch) + + def slots(label: str, count: int) -> None: + self.gauge("slots", count, labels=[("status", label)]) + + slots("recent", stats.slots) + slots("active", stats.active_slots) + + def fetch(self, sess: requests.Session, url: str) -> FetchReturn: """ perform HTTP get, tracking redirects looking for non-news domains @@ -275,88 +345,132 @@ def fetch(self, sess: requests.Session, fqdn: str, url: str) -> FetchReturn: # end infinite redirect loop - def try_issue(self, id: str, url: str) -> IssueReturn: - # report time to issue: if this jumps up, it's - # likely due to lock contention! - assert self.scoreboard - with self.timer("issue"): - return self.scoreboard.issue(id, url) - - # OOF! flake8 complains "C901 'Fetcher.process_story' is too complex (19)" - # flake8: noqa: C901 - def process_story(self, sender: StorySender, story: BaseStory) -> None: + def get_id(self, story: BaseStory) -> GetIdReturn: """ - called from multiple worker threads!! - - This routine should call incr_stories EXACTLY once! + This function determines what stories are treated as from + the same "server". + + NOT using "domain" from RSS file because I originally + was planning to move the "issue" call inside the + redirect loop (getting clearance for each FQDN along the + chain), but if we ended up with a "busy", we'd have to + retry and start ALL over, or add a field to the Story + indicating the "next URL" to attempt to fetch, along + with a count of followed redirects. AND, using + "canonical" domain means EVERYTHING inside a domain + looks to be one server (when that may not be the case). + + *COULD* look up addresses, sort them, and pick the lowest or + highest?! this would avoid hitting single servers that handle + many thing.dom.ain names hard, but incurrs overhead (and + unless the id is stashed in the story object would require + multiple DNS lookups: initial Pika thread dispatch, in worker + thread for "start" call, and again for actual connection. + Hopefully the result is cached nearby, but it would still incurr + latency for due to system calls, network delay etc. """ rss = story.rss_entry() url = rss.link if not url: - return self.incr_stories("no-url", repr(url)) - assert isinstance(url, str) - assert self.scoreboard is not None + return GetIdReturn("no-url", repr(url), "bad") - # NOTE!! fqdn isn't QUITE right: it means every - # foobar.blogspot.com is treated as a separate server. - # Really want to use IP address (see sched.py for more - # gory details), but that would require resolving the - # FQDN, and *THEN* using that address to make the HTTP - # connection *AND* subsequent redirect fetches (if the - # FQDN stays the same). - - # NOT using "domain" from RSS file because I originally - # was planning to move the "issue" call inside the - # redirect loop (getting clearance for each FQDN along the - # chain), but if we ended up with a "busy", we'd have to - # retry and start ALL over, or add a field to the Story - # indicating the "next URL" to attempt to fetch, along - # with a count of followed redirects. AND, using - # "canonical" domain means EVERYTHING inside a domain - # looks to be one server (when that may not be the case). + assert isinstance(url, str) # BEFORE issue (discard without locking/delay) try: fqdn = url_fqdn(url) except (TypeError, ValueError): - return self.incr_stories("badurl1", url) + return GetIdReturn("badurl1", url, fqdn) if non_news_fqdn(fqdn): # unlikely, if queuer does their job! - return self.incr_stories("non-news", url) - - ir = self.try_issue(fqdn, url) - if ir.status == IssueStatus.BUSY and self.second_try: - # Not inside "issue" to avoid messing up timing stats. - # All messages in the fast queue get the same delay, so - # introduce some randomness here, AND check a second time - # (improves output rate). With low RTT and high - # target_concurreny, most slots have an issue_interval - # under a second. - time.sleep(random.random()) - ir = self.try_issue(fqdn, url) - - if ir.slot is None: # could not be issued - if ir.status == IssueStatus.SKIPPED: - # Skipped due to recent connection error: Treat as if - # we saw an error as well (incrementing retry count on - # the Story) rather than possibly waiting 30 seconds - # for connection to time out again. After a failure - # the scheduler remembers the slot as failing for - # conn_retry_minutes - self.incr_stories("skipped", url) + return GetIdReturn("non-news", url, fqdn) + + return GetIdReturn("ok", url, fqdn) + + def _on_input_message(self, im: InputMessage) -> None: + """ + YIKES!! override a basic Worker method!!! + Performs an additional decode of serialized Story! + NOTE! Not covered by exception catching for retry!!! + MUST ack and commit before returning!!! + + pre-processes incomming stories, delaying them + (using the Pika "channel.call_later" method) + so that they're queued to the worker pool + with suitable inter-request delays for each server. + + DOES NOT INCREMENT STORY COUNTER!!! + (perhaps have a different counter??) + """ + assert self.scoreboard is not None + assert self.connection is not None + + try: + story = self.decode_story(im) + + status, url, id = self.get_id(story) + if status != "ok": + self.incr_stories(status, url) + self._pika_ack_and_commit(im) # drop (ack without requeuing) + return + + with self.timer("get_delay"): + delay = self.scoreboard.get_delay(id) + + logger.info("%s: delay %.3f", url, delay) + if delay >= 0: + # NOTE! Using pika connection.call_later because it's available. + # "put" does not need to be run in the Pika thread, and the + # delay _could_ be managed in another thread. + def put() -> None: + # _put_message queue is the normal "_on_input_message" handler + logger.debug("put #%s", im.method.delivery_tag) + self._put_message_queue(im) + + # holding message, will be acked when processed + if delay == 0: + put() + else: + logger.debug("delay #%s", im.method.delivery_tag) + self.connection.call_later(delay, put) + return + elif delay == DELAY_SKIP: raise Retry("skipped due to recent connection failure") + elif delay == DELAY_LONG: + self._requeue(im) else: - # here when "busy", due to one of (in order of likelihood): - # 1. per-fqdn connect interval not reached - # 2. per-fqdn currency limit reached - # 3. total concurrecy limit reached. - # requeue in short-delay queue, without counting as retry. - self.incr_stories("busy", url, log_level=logging.DEBUG) - raise RequeueException("busy") # does not increment retry count - - # ***NOTE*** here with slot marked active *MUST* call ir.slot.retire!!!! + raise Retry(f"unknown delay {delay}") + except Exception as exc: + self._retry(im, exc) + self._pika_ack_and_commit(im) + + def process_story(self, sender: StorySender, story: BaseStory) -> None: + """ + called in a worker thread + retry/quarantine exceptions handled normally + """ + istatus, url, id = self.get_id(story) + if istatus != "ok": + logger.warning("get_id returned ('%s', '%s')", istatus, id) + self.incr_stories(istatus, id) + return + + assert self.scoreboard is not None + start_status, slot = self.scoreboard.start(id, url) + if start_status == StartStatus.SKIP: + self.incr_stories("skipped2", url) + raise Retry("skipped due to recent connection failure") + elif start_status == StartStatus.BUSY: + self.incr_stories("busy", url) + raise RequeueException("busy") + elif start_status != StartStatus.OK: + logger.warning("start status %s: %s", start_status, url) + raise Retry(f"start status {start_status}") + assert slot is not None + + # ***NOTE*** here with slot marked active *MUST* call slot.finish!!!! t0 = time.monotonic() with self.timer("fetch"): # log starting URL @@ -364,7 +478,7 @@ def process_story(self, sender: StorySender, story: BaseStory) -> None: sess = requests.Session() try: # call retire on exit - fret = self.fetch(sess, fqdn, url) + fret = self.fetch(sess, url) if fret.resp and fret.resp.status_code == 200: conn_status = ConnStatus.DATA else: @@ -385,9 +499,10 @@ def process_story(self, sender: StorySender, story: BaseStory) -> None: raise # re-raised for retry counting finally: # ALWAYS: report slot now idle!! - with self.timer("retire"): + # jumps in timing indicate lock contention!! + with self.timer("finish"): # keep track of connection success, latency - ir.slot.retire(conn_status, time.monotonic() - t0) + slot.finish(conn_status, time.monotonic() - t0) sess.close() resp = fret.resp # requests.Response @@ -410,7 +525,6 @@ def process_story(self, sender: StorySender, story: BaseStory) -> None: raise Retry(msg) else: return self.incr_stories(counter, msg) - # here with status == 200 content = resp.content # bytes lcontent = len(content) From 6c2a0d7acbce4571b76c124d139c1c9b70dd8272 Mon Sep 17 00:00:00 2001 From: Phil Budne Date: Thu, 14 Mar 2024 14:08:29 -0400 Subject: [PATCH 2/6] Convert indexer/workers/fetcher/rss-queuer.py to lxml iterparse xml.sax parser cannot ignore non-conforming XML (control characters in titles) * added lxml-stubs package * ran make upgrade --- .pre-commit-config.yaml | 2 +- indexer/workers/fetcher/rss-queuer.py | 202 ++++++++++++-------------- pyproject.toml | 4 +- requirements-dev.txt | 18 ++- requirements.txt | 8 +- 5 files changed, 113 insertions(+), 121 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 05668f8f..dc566769 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,7 +16,7 @@ repos: - id: black language_version: python3.10 - repo: http://github.com/pre-commit/mirrors-mypy - rev: v1.8.0 + rev: v1.9.0 hooks: - id: mypy entry: bin/pre-commit-wrapper.py mypy diff --git a/indexer/workers/fetcher/rss-queuer.py b/indexer/workers/fetcher/rss-queuer.py index 387a050b..121335c5 100644 --- a/indexer/workers/fetcher/rss-queuer.py +++ b/indexer/workers/fetcher/rss-queuer.py @@ -8,7 +8,8 @@ Queuer framework handles local files, http/https/s3 URLs, transparently un-gzips. -On-the-fly XML parsing using "SAX" parser. +On-the-fly XML parsing using iterparse +(w/o reading whole file into memory) NOTE! --yesterday only valid after 01:00 GMT (before that, gets the day before) @@ -18,8 +19,11 @@ import html import logging import time -import xml.sax -from typing import BinaryIO, List, Optional +from typing import BinaryIO + +# xml.etree.ElementTree.iterparse doesn't have recover argument +# (fatal error on control characters in element text), so using lxml +from lxml.etree import iterparse from indexer.app import run from indexer.queuer import Queuer @@ -27,120 +31,34 @@ S3_URL_BASE = "https://mediacloud-public.s3.amazonaws.com/backup-daily-rss" -Attrs = xml.sax.xmlreader.AttributesImpl - Story = StoryFactory() logger = logging.getLogger("queue-rss") -def optional_int(input: Optional[str]) -> Optional[int]: +def optional_int(input: str | None) -> int | None: if not input or not input.isdigit(): return None return int(input) -class RSSHandler(xml.sax.ContentHandler): - link: str # required! - domain: Optional[str] - pub_date: Optional[str] - title: Optional[str] - source_url: Optional[str] - source_feed_id: Optional[int] - source_source_id: Optional[int] - content: List[str] - - def __init__(self, app: "RSSQueuer", fname: str): - self.app = app - self.parsed = self.bad = 0 - self.in_item = False - self.file_name = fname - self.reset_item() - self.content = [] - - def reset_item(self) -> None: - self.link = "" - self.domain = "" - self.pub_date = "" - self.title = "" - self.source_url = None - self.source_feed_id = None - self.source_source_id = None - - def startElement(self, name: str, attrs: Attrs) -> None: - if name == "item": - # error if in_item is True (missing end element?) - self.in_item = True - elif self.in_item: - if name == "source": - self.source_url = attrs.get("url") - self.source_feed_id = optional_int(attrs.get("mcFeedId")) - self.source_source_id = optional_int(attrs.get("mcSourceId")) - self.content = [] # DOES NOT WORK FOR NESTED TAGS! - - def characters(self, content: str) -> None: - """ - handle text content inside current tag; - may come in multiple calls!! - DOES NOT WORK FOR NESTED TAGS!! - (would need a stack pushed by startElement, popped by endElement) - """ - self.content.append(content) - - def endElement(self, name: str): # type: ignore[no-untyped-def] - if not self.in_item: - return - - # here at end of a tag inside an - - # join bits of tag content together - # DOES NOT WORK FOR NESTED TAGS!! - content = "".join(self.content) - - if name == "link": - # undo HTML entity escapes: - self.link = html.unescape(content).strip() - elif name == "domain": - self.domain = content.strip() or None - elif name == "pubDate": - self.pub_date = content.strip() or None - elif name == "title": - self.title = content.strip() or None - elif name == "item": - # domain not required by queue-based fetcher - if self.link: - s = Story() - # mypy reval_type(rss) in "with s.rss_entry() as rss" gives Any!! - rss = s.rss_entry() - with rss: - rss.link = self.link - rss.domain = self.domain - rss.pub_date = self.pub_date - rss.title = self.title - rss.source_url = self.source_url - rss.source_feed_id = self.source_feed_id - rss.source_source_id = self.source_source_id - rss.via = self.file_name # instead of fetch_date - self.app.send_story(s) - self.reset_item() - self.parsed += 1 - else: - assert self.app.args - # don't muddy the water if just a dry-run: - if not self.app.args.dry_run: - self.app.incr_stories("bad", self.link) - self.bad += 1 - self.in_item = False - - class RSSQueuer(Queuer): HANDLE_GZIP = True # transparently handle .gz files def __init__(self, process_name: str, descr: str): super().__init__(process_name, descr) - - self.sample_size: Optional[int] = None - self.dry_run = False + self.reset_item(True) + + def reset_item(self, first: bool = False) -> None: + if first: + self.ok = self.bad = 0 + self.link: str | None = "" + self.domain: str | None = "" + self.pub_date: str | None = "" + self.title: str | None = "" + self.source_url: str | None = None + self.source_feed_id: int | None = None + self.source_source_id: int | None = None def define_options(self, ap: argparse.ArgumentParser) -> None: super().define_options(ap) @@ -209,15 +127,85 @@ def add_previous(days: int) -> None: for days in range(1, args.days + 1): add_previous(days) + def end_item(self, fname: str) -> None: + # domain not required by queue-based fetcher + if self.link: + s = Story() + # mypy reval_type(rss) in "with s.rss_entry() as rss" gives Any!! + rss = s.rss_entry() + with rss: + rss.link = self.link + rss.domain = self.domain + rss.pub_date = self.pub_date + rss.title = self.title + rss.source_url = self.source_url + rss.source_feed_id = self.source_feed_id + rss.source_source_id = self.source_source_id + rss.via = fname # instead of fetch_date + self.send_story(s) + self.ok += 1 + else: + assert self.args + # don't muddy the water if just a dry-run: + if not self.args.dry_run: + self.incr_stories("bad", self.link or "no-link") + self.bad += 1 + self.reset_item() + def process_file(self, fname: str, fobj: BinaryIO) -> None: """ - called for each file/url on command line, and those - implied by --fetch-date, --days and --yesterday + called for each file/url on command line, + each file in a directory on the command line, + each S3 object matching an s3 URL prefix, + and URLs implied by --fetch-date, --days and --yesterday with an uncompressed (binary) byte stream """ - handler = RSSHandler(self, fname) - xml.sax.parse(fobj, handler) - logger.info("processed %s: %d ok, %d bad", fname, handler.parsed, handler.bad) + path: list[str] = [] + self.reset_item(True) + + # recover=True avoids fatal error when control characters seen in title + for event, element in iterparse(fobj, events=("start", "end"), recover=True): + name = element.tag + logger.debug("%s tag %s level %d", event, name, len(path)) + if event == "start": + path.append(name) + else: # event == "end" + path.pop() + if name == "item": + self.end_item(fname) + continue + + lpath = len(path) + if ( + (lpath > 0 and path[0] != "rss") + or (lpath > 1 and path[1] != "channel") + or (lpath > 2 and path[2] != "item") + or lpath > 3 + ): + logger.warning( + "%s: unexpected tag %s path %s", + name, + "/".join(path), + ) + continue + + # here at end of a tag inside an + content = element.text or "" + assert isinstance(content, str) + if name == "link": + # undo HTML entity escapes: + self.link = html.unescape(content).strip() + elif name == "domain": + self.domain = content.strip() or None + elif name == "pubDate": + self.pub_date = content.strip() or None + elif name == "title": + self.title = content.strip() or None + elif name == "source": + self.source_url = element.get("url") + self.source_feed_id = optional_int(element.get("mcFeedId")) + self.source_source_id = optional_int(element.get("mcSourceId")) + logger.info("processed %s: %d ok, %d bad", fname, self.ok, self.bad) if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 204ebf59..2754e197 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [ "boto3 ~= 1.28.44", "docker ~= 6.1.0", "elasticsearch ~= 8.12.0", + # lxml installed by some other package? "mediacloud-metadata ~= 0.12.0", "pika ~= 1.3.2", "rabbitmq-admin ~= 0.2", @@ -23,8 +24,9 @@ dependencies = [ [project.optional-dependencies] dev = [ "boto3-stubs[s3] ~= 1.34.13", - "mypy ~= 1.5.1", "jinja2-cli ~= 0.8.2", + "lxml-stubs ~= 0.5.1", + "mypy ~= 1.5.1", "pre-commit ~= 3.4.0", "pytest ~= 7.4.2", "types-beautifulsoup4 ~= 4.12.0.20240106", diff --git a/requirements-dev.txt b/requirements-dev.txt index fe81bf62..9e1b0399 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -22,13 +22,13 @@ boilerpy3==1.0.7 # via mediacloud-metadata boto3==1.28.85 # via story-indexer (pyproject.toml) -boto3-stubs==1.34.54 +boto3-stubs==1.34.62 # via story-indexer (pyproject.toml) botocore==1.31.85 # via # boto3 # s3transfer -botocore-stubs==1.34.54 +botocore-stubs==1.34.62 # via boto3-stubs certifi==2024.2.2 # via @@ -146,13 +146,15 @@ lxml==4.9.4 # readability-lxml # scrapy # trafilatura +lxml-stubs==0.5.1 + # via story-indexer (pyproject.toml) markupsafe==2.1.5 # via jinja2 mediacloud-metadata==0.12.0 # via story-indexer (pyproject.toml) mypy==1.5.1 # via story-indexer (pyproject.toml) -mypy-boto3-s3==1.34.14 +mypy-boto3-s3==1.34.62 # via boto3-stubs mypy-extensions==1.0.0 # via mypy @@ -166,13 +168,13 @@ numpy==1.26.4 # via py3langid orderedmultidict==1.0.1 # via furl -packaging==23.2 +packaging==24.0 # via # docker # parsel # pytest # scrapy -parsel==1.8.1 +parsel==1.9.0 # via # itemloaders # scrapy @@ -204,7 +206,7 @@ pycparser==2.21 # via cffi pydispatcher==2.0.7 # via scrapy -pyopenssl==24.0.0 +pyopenssl==24.1.0 # via scrapy pytest==7.4.4 # via story-indexer (pyproject.toml) @@ -301,7 +303,7 @@ types-html5lib==1.1.11.20240228 # via types-beautifulsoup4 types-pika==1.2.0b1 # via story-indexer (pyproject.toml) -types-requests==2.31.0.20240218 +types-requests==2.31.0.20240311 # via story-indexer (pyproject.toml) types-s3transfer==0.10.0 # via boto3-stubs @@ -343,7 +345,7 @@ zope-interface==6.2 # twisted # The following packages are considered to be unsafe in a requirements file: -setuptools==69.1.1 +setuptools==69.2.0 # via # nodeenv # scrapy diff --git a/requirements.txt b/requirements.txt index 1394cb14..63fdf301 100644 --- a/requirements.txt +++ b/requirements.txt @@ -136,12 +136,12 @@ numpy==1.26.4 # via py3langid orderedmultidict==1.0.1 # via furl -packaging==23.2 +packaging==24.0 # via # docker # parsel # scrapy -parsel==1.8.1 +parsel==1.9.0 # via # itemloaders # scrapy @@ -167,7 +167,7 @@ pycparser==2.21 # via cffi pydispatcher==2.0.7 # via scrapy -pyopenssl==24.0.0 +pyopenssl==24.1.0 # via scrapy python-dateutil==2.9.0.post0 # via @@ -279,7 +279,7 @@ zope-interface==6.2 # twisted # The following packages are considered to be unsafe in a requirements file: -setuptools==69.1.1 +setuptools==69.2.0 # via # scrapy # supervisor From 866e8894e511dfc3920acf524d3f4d7c0564d872 Mon Sep 17 00:00:00 2001 From: Phil Budne Date: Thu, 14 Mar 2024 22:19:00 -0400 Subject: [PATCH 3/6] indexer/workers/fetcher/tqfetcher.py: fix post redirect logging/tests --- indexer/workers/fetcher/tqfetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/workers/fetcher/tqfetcher.py b/indexer/workers/fetcher/tqfetcher.py index e3a51bd2..e84a319e 100644 --- a/indexer/workers/fetcher/tqfetcher.py +++ b/indexer/workers/fetcher/tqfetcher.py @@ -320,8 +320,8 @@ def fetch(self, sess: requests.Session, url: str) -> FetchReturn: # here with redirect: nextreq = resp.next # PreparedRequest | None if nextreq: - url = prepreq.url or "" prepreq = nextreq + url = prepreq.url or "" else: url = "" From 16f3b92cfa4b9faeccce519a74f3c1d64406ef45 Mon Sep 17 00:00:00 2001 From: Phil Budne Date: Fri, 15 Mar 2024 23:05:40 -0400 Subject: [PATCH 4/6] Comments and parameter tuning --- indexer/workers/fetcher/sched.py | 37 ++++++++++++++++++++------ indexer/workers/fetcher/tqfetcher.py | 39 ++++++++++------------------ 2 files changed, 42 insertions(+), 34 deletions(-) diff --git a/indexer/workers/fetcher/sched.py b/indexer/workers/fetcher/sched.py index 4589f9cc..0f7c0d3a 100644 --- a/indexer/workers/fetcher/sched.py +++ b/indexer/workers/fetcher/sched.py @@ -18,10 +18,11 @@ # if no active/delayed requests and no errors (maintains request RTT) SLOT_RECENT_MINUTES = 5 -# exponential moving average coefficient for avg_seconds. -# (typ. used by TCP for RTT est) -# https://en.wikipedia.org/wiki/Exponential_smoothing -ALPHA = 0.25 +# exponentially decayed moving average coefficient for avg_seconds. +# (used in TCP for RTT averaging, and in system load averages) +# see https://en.wikipedia.org/wiki/Exponential_smoothing +# Scrapy essentially uses 0.5: (old_avg + sample) / 2 +ALPHA = 0.25 # applied to new samples logger = logging.getLogger(__name__) @@ -229,7 +230,7 @@ def __init__(self, slot_id: str, sb: "ScoreBoard"): def _get_delay(self) -> float: """ return delay in seconds until fetch can begin. - or value < 0 (DELAY_XXX) + or value < 0 (DELAY_{SKIP,LONG}) """ # NOTE! Lock held: avoid logging! @@ -244,6 +245,11 @@ def _get_delay(self) -> float: return DELAY_LONG if delay < 0: + # this site/slot clear to issue: consider some sort of + # rate limit here to avoid filling up _message_queue when + # incomming requests are well mixed (100 requests to 100 + # sites will go right to the message queue)! Could have a + # scoreboard wide "next_issue" clock?? delay = 0.0 # never issued or past due self.next_issue.set(self.issue_interval) @@ -291,19 +297,34 @@ def finish(self, conn_status: ConnStatus, sec: float) -> None: oavg = self.avg_seconds if conn_status == ConnStatus.NOCONN: self.last_conn_error.reset() + # discard avg connection time estimate: + self.avg_seconds = 0 elif conn_status == ConnStatus.DATA: if self.avg_seconds == 0: self.avg_seconds = sec else: - # exponentially moving average (typ. used by TCP for RTT est) - # https://en.wikipedia.org/wiki/Exponential_smoothing - self.avg_seconds += (sec - self.avg_seconds) * ALPHA + if sec > self.avg_seconds: + # per scrapy: adopt larger values directly + self.avg_seconds = sec + else: + # exponentially decayed moving average + # see comments on ALPHA declaration above. + self.avg_seconds += (sec - self.avg_seconds) * ALPHA + + # note: the above is a simplification of: + # avg = ALPHA * new + BETA * avg + # where ALPHA + BETA == 1.0, or BETA = 1.0 - ALPHA + # => avg = new * ALPHA + (1 - ALPHA) * avg + # => avg = new * ALPHA + avg - avg * ALPHA + # => avg = (new - avg) * ALPHA + avg + # => avg += (new - avg) * ALPHA elif conn_status == ConnStatus.NODATA: # got connection but no data # better to have some estimate of connection average time than none if self.avg_seconds == 0: self.avg_seconds = sec if self.avg_seconds != oavg: + # average success time has changed, update issue interval interval = self.avg_seconds / self.sb.target_concurrency if interval < self.sb.min_interval_seconds: interval = self.sb.min_interval_seconds diff --git a/indexer/workers/fetcher/tqfetcher.py b/indexer/workers/fetcher/tqfetcher.py index e84a319e..04d5821e 100644 --- a/indexer/workers/fetcher/tqfetcher.py +++ b/indexer/workers/fetcher/tqfetcher.py @@ -53,12 +53,7 @@ non_news_fqdn, url_fqdn, ) -from indexer.worker import ( - CONSUMER_TIMEOUT_SECONDS, - InputMessage, - QuarantineException, - RequeueException, -) +from indexer.worker import InputMessage, QuarantineException, RequeueException from indexer.workers.fetcher.sched import ( DELAY_LONG, DELAY_SKIP, @@ -74,8 +69,10 @@ # sites that respond quickly. MIN_INTERVAL_SECONDS = 0.5 -# default delay time for "fast" queue, and max time to delay stories w/ call_later -BUSY_DELAY_MINUTES = 10 +# default delay time for "fast" queue, and max time to delay stories +# w/ call_later. Large values allow more requests to be delayed, so +# keeping it small, hopefully breaking up clumps. +BUSY_DELAY_MINUTES = 2 # time to cache server as down after a connection failure CONN_RETRY_MINUTES = 10 @@ -224,24 +221,12 @@ def _qos(self, chan: BlockingChannel) -> None: RabbitMQ to close the connection!!! So the estimate MUST be prssimistic!! """ - # time available to handle a request (in a worker thread) - # after maximum call_later delay - work_time = CONSUMER_TIMEOUT_SECONDS - self.busy_delay_seconds - - # Maximum time handling a request. This is both low (read - # timeout applies to EACH network read) but also possibly - # high. Would have to have working DNS but no TCP connectivity - # to have ALL requests fail (UNLESS Internet unreachable!!!). - max_request_seconds = CONNECT_SECONDS + READ_SECONDS - - # only hand us as many as we KNOW it's possible to handle - # given multiple worker threads: - prefetch = int(self.workers * work_time / max_request_seconds) - if prefetch > 65535: # 16-bit field - prefetch = 65535 - logger.info("prefetch %d", prefetch) - chan.basic_qos(prefetch_count=prefetch) - self.prefetch = prefetch + # Want to avoid very large numbers of requests in the "ready" + # state (in message queue), since there is no inter-request + # delay enforced once requests land there. + self.prefetch = self.workers * 2 + logger.info("prefetch %d", self.prefetch) + chan.basic_qos(prefetch_count=self.prefetch) def periodic(self) -> None: """ @@ -431,6 +416,8 @@ def put() -> None: # holding message, will be acked when processed if delay == 0: + # enforce SOME kind of rate limit? + # see comments in Slot._get_delay() put() else: logger.debug("delay #%s", im.method.delivery_tag) From 6b5f12421b89a361a47bc5f808a52fd8251e7703 Mon Sep 17 00:00:00 2001 From: Phil Budne Date: Sun, 17 Mar 2024 14:02:57 -0400 Subject: [PATCH 5/6] slots gauges --- indexer/workers/fetcher/tqfetcher.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/indexer/workers/fetcher/tqfetcher.py b/indexer/workers/fetcher/tqfetcher.py index 04d5821e..d3e55da3 100644 --- a/indexer/workers/fetcher/tqfetcher.py +++ b/indexer/workers/fetcher/tqfetcher.py @@ -266,11 +266,8 @@ def requests(label: str, count: int) -> None: # above three should total to prefetch: self.gauge("prefetch", self.prefetch) - def slots(label: str, count: int) -> None: - self.gauge("slots", count, labels=[("status", label)]) - - slots("recent", stats.slots) - slots("active", stats.active_slots) + self.gauge("slots.recent", stats.slots) + self.gauge("slots.active", stats.active_slots) def fetch(self, sess: requests.Session, url: str) -> FetchReturn: """ From fa552023792c319e20cb5a08c2a8d3faf86a63ee Mon Sep 17 00:00:00 2001 From: Phil Budne Date: Wed, 20 Mar 2024 00:34:05 -0400 Subject: [PATCH 6/6] workers/fetcher/tqfetcher.py: use args.min_interval_seconds! caught in review by @kilemensi --- indexer/workers/fetcher/tqfetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/workers/fetcher/tqfetcher.py b/indexer/workers/fetcher/tqfetcher.py index d3e55da3..c779133f 100644 --- a/indexer/workers/fetcher/tqfetcher.py +++ b/indexer/workers/fetcher/tqfetcher.py @@ -196,7 +196,7 @@ def process_args(self) -> None: target_concurrency=self.args.target_concurrency, max_delay_seconds=self.busy_delay_seconds, conn_retry_seconds=self.args.conn_retry_minutes * 60, - min_interval_seconds=MIN_INTERVAL_SECONDS, + min_interval_seconds=self.args.min_interval_seconds, ) self.set_requeue_delay_ms(1000 * self.busy_delay_seconds)