fix(scrapy): async-thread startup race, shutdown lifecycle, and timeout setting#979
vdusek wants to merge 13 commits into
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #979 +/- ##
==========================================
+ Coverage 89.90% 91.60% +1.69%
==========================================
Files 49 49
Lines 3091 3168 +77
==========================================
+ Hits 2779 2902 +123
+ Misses 312 266 -46
…lure or shutdown error
- value = self._async_thread.run_coro(self._kvs.get_value(key))
+ try:
+     value = self._async_thread.run_coro(self._kvs.get_value(key))
+ except Exception:
This pattern is hard to understand without more context. I think it deserves an inline comment, because the natural reaction is to remove the try/except as redundant.
It is used everywhere. IIRC, there were some issues with exception propagation from a separate thread and its event loop. It was extremely painful to get working, so I definitely wouldn't change it.
Catching all exceptions just to add a log entry and then re-raise is an intentional, non-standard choice, made because of external circumstances. Those circumstances are not obvious at this level, which is why I am asking for a comment, so that a reader without the context can understand the intent of the code.
I am not asking to change this, but to help protect it by adding a comment that explains it.
Ah, okay, I just thought you wanted an explanation, sorry, adding it.
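For readers without the thread's context, here is a minimal sketch of the commented log-and-re-raise pattern being discussed. It is an illustration only: the logger name, the `read_value` helper, and the `fetch` callable are hypothetical and are not taken from the PR's code, and the comment wording is an assumption based on the reasons given in this thread and in the description.

```python
import logging
from typing import Any, Callable

# Hypothetical logger name, used only for this sketch.
logger = logging.getLogger('scrapy_integration_sketch')


def read_value(fetch: Callable[[str], Any], key: str) -> Any:
    """Call `fetch(key)`, logging local context before re-raising any failure."""
    try:
        return fetch(key)
    except Exception:
        # Intentional, not redundant: the real call crosses into a background
        # event-loop thread, and the caller may report the failure only
        # generically. Log the local context (traceback attached) here, then
        # re-raise the original exception unchanged.
        logger.exception('Failed to read key %r', key)
        raise
```

The bare `raise` keeps the original exception and traceback intact, so the caller sees exactly what it would have seen without the `try`/`except`; the only added effect is one ERROR-level log record.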
aba09c6 to 3c4a3ec
Description
Fixes several defects in the Scrapy integration's background event-loop thread (AsyncThread), the scheduler, and the HTTP cache storage, and makes the loop timeout configurable.
Fixes
- run_coro startup race: the is_running() guard fired spuriously when a coroutine was submitted before the loop thread reached run_forever() (observed ~122/500 in scheduler.open()). It now guards on is_closed(). A coroutine queued on a not-yet-running loop runs once the loop starts; only a closed loop raises.
- close() thread leak: if task cancellation timed out or raised, the loop was never stopped or joined. Stop, join, and the forced-shutdown fallback now run in a finally block, and the original error still propagates.
- close() second call: a repeated close raised "RuntimeError: Event loop is closed". An is_closed() early return makes it a no-op.
- close() ignored its timeout for the cancellation step (it used the constructor default). It now passes the caller's timeout through.
- run_coro timeout left the coroutine running. It now cancels the future on timeout.
- open_spider now closes the thread if opening the key-value store fails (matching ApifyScheduler.open). The expiration sweep runs inside a try block with close() in a finally block.
- APIFY_ASYNC_THREAD_TIMEOUT_SECS setting, wired into the scheduler (via from_crawler) and the cache storage.
Error logging
The integration now follows consistent conventions for caught exceptions:
- except … as exc: → logger.warning(f'… {exc}'), swallowed. Used for expected, recoverable conditions handled locally: a malformed or legacy stored payload skipped as a cache/queue miss, or non-UTF-8 headers preserved in the serialized request. A short message plus the exception text, with no traceback, because it is not a bug.
- except Exception: → logger.exception('…'), swallowed. Used for unexpected failures handled at a terminal point: the cleanup sweep, shutdown, or skip-and-continue. logger.exception attaches the full traceback, and nothing re-raises because the error is handled here.
- except …: → raise (no logging). Used when the error is re-raised and the caller or Scrapy logs it with a traceback anyway. run_coro's timeout path cancels the future and re-raises without logging, so the failure is reported once.
- except Exception: → logger.exception('…'); raise. This is the boundary log, used only where local context materially helps and the propagated error would otherwise be logged only generically or not at all. The scheduler's next_request / enqueue_request / has_pending_requests are called synchronously by the Scrapy engine (not inside a Deferred), so without this log the Apify-specific context would be lost.
Why logger.exception replaced traceback.print_exc(): traceback.print_exc() writes a bare traceback straight to stderr, bypassing logging entirely. It has no level, no logger name, no message, and ignores Scrapy's and the SDK's log configuration and handlers. logger.exception(msg) logs at ERROR through the configured logging, so it is routed, formatted, and filterable like every other log line. It adds a message explaining what failed and still attaches the full traceback automatically, which makes including the exception object in the message ({exc}) redundant (ruff TRY401).
Tests
New tests/unit/scrapy/test_async_thread.py covers the startup race, run-after-close, timeout cancellation, idempotent close, the caller timeout reaching the shutdown step, and stop/join when task cancellation fails. The scheduler and HTTP cache test modules gain coverage for the timeout setting, closing the thread on open failure, and the cleanup-failure path still closing the thread.
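The behaviours these tests exercise can be illustrated with a small, self-contained sketch of a background event-loop thread. This is not the SDK's AsyncThread: the class name `SketchAsyncThread`, its constructor argument, and the `_cancel_tasks` helper are assumptions made for illustration, and the forced-shutdown fallback mentioned in the description is left out.

```python
from __future__ import annotations

import asyncio
import threading
from collections.abc import Coroutine
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Any, TypeVar

T = TypeVar('T')


class SketchAsyncThread:
    """Hypothetical stand-in for a background event-loop thread."""

    def __init__(self, default_timeout: float = 60.0) -> None:
        self._default_timeout = default_timeout
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def run_coro(self, coro: Coroutine[Any, Any, T], timeout: float | None = None) -> T:
        # Guard on is_closed(), not is_running(): a coroutine submitted before
        # the thread reaches run_forever() is queued and runs once the loop starts.
        if self._loop.is_closed():
            coro.close()  # avoid a "coroutine was never awaited" warning
            raise RuntimeError('Event loop is closed')
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return future.result(self._default_timeout if timeout is None else timeout)
        except FutureTimeoutError:
            # Cancel so the coroutine does not keep running, then re-raise
            # without logging so the failure is reported once by the caller.
            future.cancel()
            raise

    async def _cancel_tasks(self) -> None:
        current = asyncio.current_task()
        tasks = [t for t in asyncio.all_tasks() if t is not current]
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    def close(self, timeout: float | None = None) -> None:
        if self._loop.is_closed():
            return  # a repeated close is a no-op
        effective = self._default_timeout if timeout is None else timeout
        try:
            # Pass the caller's timeout through to the cancellation step.
            self.run_coro(self._cancel_tasks(), timeout=effective)
        finally:
            # Always stop and join, even if cancellation timed out or raised;
            # the original error still propagates after this block.
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._thread.join(timeout=effective)
            if not self._thread.is_alive():
                self._loop.close()
```

A coroutine can be submitted immediately after construction, for example `SketchAsyncThread().run_coro(some_coro())`, because the guard only rejects a closed loop. Calling `close()` twice is harmless, and `run_coro` after `close()` raises `RuntimeError`.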