-
Notifications
You must be signed in to change notification settings - Fork 24
fix(scrapy): async-thread startup race, shutdown lifecycle, and timeout setting #979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5cca584
c025745
37aa4cb
2275ad3
8540470
50aa539
98edce2
a60a55c
29ec5ff
8b7b8c4
5e56424
4a5a77e
3c4a3ec
83d50ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| import io | ||
| import re | ||
| import struct | ||
| from datetime import timedelta | ||
| from logging import getLogger | ||
| from time import time | ||
| from typing import TYPE_CHECKING | ||
|
|
@@ -35,16 +36,36 @@ class ApifyCacheStorage: | |
| """ | ||
|
|
||
| def __init__(self, settings: BaseSettings) -> None: | ||
| # Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`). | ||
| self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100) | ||
| """Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`).""" | ||
|
|
||
| self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60)) | ||
| """Caps how long each coroutine run on the background event loop may take.""" | ||
|
|
||
| self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') | ||
| """Seconds a cached entry stays fresh; older entries are treated as expired, and `0` disables expiration.""" | ||
|
|
||
| self._spider: Spider | None = None | ||
| """The Scrapy `Spider` this cache storage is bound to; set in `open_spider`.""" | ||
|
|
||
| self._kvs: KeyValueStore | None = None | ||
| """The Apify `KeyValueStore` backing the cache; opened in `open_spider`.""" | ||
|
|
||
| self._fingerprinter: RequestFingerprinterProtocol | None = None | ||
| """Scrapy's request fingerprinter, used to derive the cache key for each request.""" | ||
|
|
||
| self._async_thread: AsyncThread | None = None | ||
| """Background event-loop thread that runs the storage coroutines from Scrapy's synchronous callbacks.""" | ||
|
|
||
| def open_spider(self, spider: Spider) -> None: | ||
| """Open the cache storage for a spider.""" | ||
| """Open the cache storage for a spider. | ||
|
|
||
| Starts the background event-loop thread and opens the spider's key-value store. If opening the store | ||
| fails, the freshly started thread is closed so it is not leaked. | ||
|
|
||
| Args: | ||
| spider: The spider the cache storage is being opened for. | ||
| """ | ||
| logger.debug('Using Apify key value cache storage', extra={'spider': spider}) | ||
| self._spider = spider | ||
| self._fingerprinter = spider.crawler.request_fingerprinter | ||
|
|
@@ -62,58 +83,75 @@ async def open_kvs() -> KeyValueStore: | |
| return await KeyValueStore.open(name=kvs_name) | ||
|
|
||
| logger.debug("Starting background thread for cache storage's event loop") | ||
| self._async_thread = AsyncThread() | ||
| self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout) | ||
| logger.debug(f"Opening cache storage's {kvs_name!r} key value store") | ||
| self._kvs = self._async_thread.run_coro(open_kvs()) | ||
|
|
||
| try: | ||
| self._kvs = self._async_thread.run_coro(open_kvs()) | ||
| except Exception: | ||
| logger.exception('Failed to open the cache key-value store.') | ||
| # Opening the key-value store failed, so close the freshly started async thread instead of | ||
| # leaking its event-loop thread (`close_spider` may never run if `open_spider` fails). Guard | ||
| # the close so a secondary failure here cannot mask the original error. | ||
| try: | ||
| self._async_thread.close() | ||
| except Exception: | ||
| logger.exception('Failed to close the async thread after a failed cache storage open.') | ||
| raise | ||
|
|
||
| def close_spider(self, _: Spider, current_time: int | None = None) -> None: | ||
| """Close the cache storage for a spider.""" | ||
| """Close the cache storage for a spider. | ||
|
|
||
| Runs a best-effort cleanup sweep that deletes expired entries when expiration is enabled, then shuts | ||
| down the background event-loop thread. The thread is always closed, even if the sweep fails. | ||
|
|
||
| Args: | ||
| _: The spider being closed. Part of Scrapy's storage interface, unused here. | ||
| current_time: Unix time in seconds used as the current time when deciding which entries have | ||
| expired. Defaults to the current time. | ||
| """ | ||
| if self._async_thread is None: | ||
| raise ValueError('Async thread not initialized') | ||
|
|
||
| if current_time is None: | ||
| current_time = int(time()) | ||
|
|
||
| logger.info(f'Cleaning up cache items (max {self._expiration_max_items})') | ||
| if self._expiration_secs > 0: | ||
| if current_time is None: | ||
| current_time = int(time()) | ||
|
|
||
| async def expire_kvs() -> None: | ||
| if self._kvs is None: | ||
| raise ValueError('Key value store not initialized') | ||
| # Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, | ||
| # so stale entries may linger. This only reclaims storage; `retrieve_response` already treats | ||
| # an expired entry as a cache miss. | ||
| processed = 0 | ||
| async for item in self._kvs.iterate_keys(): | ||
| if processed >= self._expiration_max_items: | ||
| break | ||
| processed += 1 | ||
| value = await self._kvs.get_value(item.key) | ||
| try: | ||
| gzip_time = read_gzip_time(value) | ||
| except Exception as e: | ||
| logger.warning(f'Malformed cache item {item.key}: {e}') | ||
| await self._kvs.delete_value(item.key) | ||
| else: | ||
| if self._expiration_secs < current_time - gzip_time: | ||
| logger.debug(f'Expired cache item {item.key}') | ||
| await self._kvs.delete_value(item.key) | ||
| else: | ||
| logger.debug(f'Valid cache item {item.key}') | ||
|
|
||
| self._async_thread.run_coro(expire_kvs()) | ||
|
|
||
| logger.debug('Closing cache storage') | ||
|
|
||
| # Best-effort: a cleanup failure is logged and swallowed (the sweep only reclaims storage, so failing it | ||
| # must not turn a normal spider close into an error), and `close` always runs in the `finally`, so | ||
| # neither the failure nor an early return can leak the event-loop thread. | ||
| try: | ||
| self._async_thread.close() | ||
| except KeyboardInterrupt: | ||
| logger.warning('Shutdown interrupted by KeyboardInterrupt!') | ||
| if self._expiration_secs > 0: | ||
| self._async_thread.run_coro(self._expire_kvs(current_time)) | ||
| except Exception: | ||
| logger.exception('Exception occurred while shutting down cache storage') | ||
| logger.exception('Failed to clean up expired cache items.') | ||
| finally: | ||
| logger.debug('Cache storage closed') | ||
| logger.debug('Closing cache storage') | ||
| try: | ||
| self._async_thread.close() | ||
| except KeyboardInterrupt: | ||
| logger.warning('Shutdown interrupted by KeyboardInterrupt!') | ||
| except Exception: | ||
| logger.exception('Exception occurred while shutting down cache storage') | ||
| finally: | ||
| logger.debug('Cache storage closed') | ||
|
|
||
| def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None: | ||
| """Retrieve a response from the cache storage.""" | ||
| """Retrieve a cached response for a request. | ||
|
|
||
| A malformed, legacy, or expired cache entry is treated as a miss, so Scrapy re-fetches the request and | ||
| re-stores it in the current format. | ||
|
|
||
| Args: | ||
| _: The spider making the request. Part of Scrapy's storage interface, unused here. | ||
| request: The request to look up in the cache. | ||
| current_time: Unix time in seconds used as the current time when checking whether the entry has | ||
| expired. Defaults to the current time. | ||
|
|
||
| Returns: | ||
| The cached response on a hit, or `None` on a miss, an expired entry, or an unreadable entry. | ||
| """ | ||
| if self._async_thread is None: | ||
| raise ValueError('Async thread not initialized') | ||
| if self._kvs is None: | ||
|
|
@@ -122,7 +160,13 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non | |
| raise ValueError('Request fingerprinter not initialized') | ||
|
|
||
| key = self._fingerprinter.fingerprint(request).hex() | ||
| value = self._async_thread.run_coro(self._kvs.get_value(key)) | ||
| # Log here before re-raising: this coroutine ran on a separate event-loop thread, and the failure is | ||
| # otherwise easy to lose as it crosses that thread boundary back into Scrapy's synchronous machinery. | ||
| try: | ||
| value = self._async_thread.run_coro(self._kvs.get_value(key)) | ||
| except Exception: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This pattern is really hard to understand without more context. I think this deserves an inline comment, as the natural thing to do with this is to remove the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is used everywhere. IIRC, there were some issues with the exception propagation from a separate thread and its event loop. This was extreme pain to make it work, so I definitely wouldn't change it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Swallowing all exceptions to add additional log and re-raise the exception is an intentional non-standard action, which is here due to some external context circumstances. Those external circumstances are not obvious on this level, so that is why I am asking for some comment, so that the reader without context can understand the intention of the code. I am not asking to change this, but to help protect it by adding a comment explaining it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, okay, I just thought you wanted an explanation, sorry, adding it. |
||
| logger.exception('Failed to retrieve a response from the cache.') | ||
| raise | ||
|
|
||
| if value is None: | ||
| logger.debug('Cache miss', extra={'request': request}) | ||
|
|
@@ -139,6 +183,7 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non | |
| if 0 < self._expiration_secs < current_time - read_gzip_time(value): | ||
| logger.debug('Cache expired', extra={'request': request}) | ||
| return None | ||
|
|
||
| data = from_gzip(value) | ||
| url = data['url'] | ||
| status = data['status'] | ||
|
|
@@ -153,7 +198,13 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non | |
| return respcls(url=url, headers=headers, status=status, body=body) | ||
|
|
||
| def store_response(self, _: Spider, request: Request, response: Response) -> None: | ||
| """Store a response in the cache storage.""" | ||
| """Store a response in the cache storage. | ||
|
|
||
| Args: | ||
| _: The spider that produced the response. Part of Scrapy's storage interface, unused here. | ||
| request: The request the response belongs to. Its fingerprint is used as the cache key. | ||
| response: The response to store in the cache. | ||
| """ | ||
| if self._async_thread is None: | ||
| raise ValueError('Async thread not initialized') | ||
| if self._kvs is None: | ||
|
|
@@ -169,7 +220,42 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non | |
| 'body': response.body, | ||
| } | ||
| value = to_gzip(data) | ||
| self._async_thread.run_coro(self._kvs.set_value(key, value)) | ||
| # Log here before re-raising: this coroutine ran on a separate event-loop thread, and the failure is | ||
| # otherwise easy to lose as it crosses that thread boundary back into Scrapy's synchronous machinery. | ||
| try: | ||
| self._async_thread.run_coro(self._kvs.set_value(key, value)) | ||
| except Exception: | ||
| logger.exception('Failed to store a response in the cache.') | ||
| raise | ||
|
|
||
| async def _expire_kvs(self, current_time: int) -> None: | ||
| """Sweep the cache key-value store, deleting expired or unreadable entries. | ||
|
|
||
| Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, so stale | ||
| entries may linger. This only reclaims storage; `retrieve_response` already treats an expired entry as | ||
| a cache miss. | ||
| """ | ||
| if self._kvs is None: | ||
| raise ValueError('Key value store not initialized') | ||
|
|
||
| processed = 0 | ||
|
|
||
| async for item in self._kvs.iterate_keys(): | ||
| if processed >= self._expiration_max_items: | ||
| break | ||
|
|
||
| processed += 1 | ||
| value = await self._kvs.get_value(item.key) | ||
|
|
||
| try: | ||
| gzip_time = read_gzip_time(value) | ||
| except Exception as exc: | ||
| logger.warning(f'Malformed cache item {item.key}: {exc}') | ||
| await self._kvs.delete_value(item.key) | ||
| else: | ||
| if self._expiration_secs < current_time - gzip_time: | ||
| logger.debug(f'Expired cache item {item.key}') | ||
| await self._kvs.delete_value(item.key) | ||
|
|
||
|
|
||
| def to_gzip(data: dict, mtime: int | None = None) -> bytes: | ||
|
|
@@ -219,7 +305,8 @@ def get_kvs_name(spider_name: str, max_length: int = 60) -> str: | |
| spider_name: Value of the Spider instance's name attribute. | ||
| max_length: Maximum length of the key value store name. | ||
|
|
||
| Returns: Key value store name. | ||
| Returns: | ||
| Key value store name. | ||
|
|
||
| Raises: | ||
| ValueError: If the spider name contains only special characters. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.