Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 35 additions & 23 deletions src/apify/scrapy/_async_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,51 +52,63 @@ def run_coro(
The result returned by the coroutine.

Raises:
RuntimeError: If the event loop is not running.
RuntimeError: If the event loop has been closed.
TimeoutError: If the coroutine does not complete within the timeout.
Exception: Any exception raised during coroutine execution.
"""
if timeout == 'default':
timeout = self._default_timeout

if not self._eventloop.is_running():
raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is not running.')
if self._eventloop.is_closed():
raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is closed.')

# Submit the coroutine to the event loop running in the other thread.
future = asyncio.run_coroutine_threadsafe(coro, self._eventloop)
try:
# Wait for the coroutine's result until the specified timeout.
return future.result(timeout=timeout.total_seconds())
except futures.TimeoutError as exc:
logger.exception('Coroutine execution timed out.', exc_info=exc)
raise
except Exception as exc:
logger.exception('Coroutine execution raised an exception.', exc_info=exc)
except futures.TimeoutError:
# `future.result` gave up, but the coroutine keeps running on the loop; cancel it so it does
# not outlive the timeout. The propagated error is logged once by the caller (or Scrapy), so
# this method does not log it itself.
future.cancel()
raise

def close(self, timeout: timedelta = timedelta(seconds=60)) -> None:
def close(self, timeout: timedelta | None = None) -> None:
"""Close the event loop and its thread gracefully.

This method cancels all pending tasks, stops the event loop, and waits for the thread to exit.
If the thread does not exit within the given timeout, a forced shutdown is attempted.

Args:
timeout: The maximum number of seconds to wait for the event loop thread to exit.
timeout: The maximum time to wait for the event loop thread to exit. Pass `None` to use the
`default_timeout` passed to the constructor.
"""
if self._eventloop.is_running():
# Cancel all pending tasks in the event loop.
self.run_coro(self._shutdown_tasks())
if timeout is None:
timeout = self._default_timeout

# A repeated close (e.g. a retried shutdown) would call into the already-closed loop and raise
# `RuntimeError: Event loop is closed`. The loop closes itself once it stops, so a second close
# is a no-op.
if self._eventloop.is_closed():
return

# Schedule the event loop to stop.
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
try:
if self._eventloop.is_running():
# Cancel all pending tasks in the event loop, honouring the caller's timeout.
self.run_coro(self._shutdown_tasks(), timeout=timeout)
finally:
# Stop the loop and join its thread even if cancelling the pending tasks above raised or timed
# out. Skipping this would leave the loop running and leak its thread.
self._eventloop.call_soon_threadsafe(self._eventloop.stop)

# Wait for the event loop thread to finish execution.
self._thread.join(timeout=timeout.total_seconds())
# Wait for the event loop thread to finish execution.
self._thread.join(timeout=timeout.total_seconds())

# If the thread is still running after the timeout, force a shutdown.
if self._thread.is_alive():
logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...')
self._force_exit_event_loop()
# If the thread is still running after the timeout, force a shutdown.
if self._thread.is_alive():
logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...')
self._force_exit_event_loop()

def _start_event_loop(self) -> None:
"""Set up and run the asyncio event loop in the dedicated thread."""
Expand Down Expand Up @@ -125,5 +137,5 @@ def _force_exit_event_loop(self) -> None:
logger.info('Forced shutdown of the event loop and its thread...')
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
self._thread.join(timeout=5)
except Exception as exc:
logger.exception('Exception occurred during forced event loop shutdown.', exc_info=exc)
except Exception:
logger.exception('Exception occurred during forced event loop shutdown.')
179 changes: 133 additions & 46 deletions src/apify/scrapy/extensions/_httpcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io
import re
import struct
from datetime import timedelta
from logging import getLogger
from time import time
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -35,16 +36,36 @@ class ApifyCacheStorage:
"""

def __init__(self, settings: BaseSettings) -> None:
# Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`).
self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100)
"""Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`)."""

self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60))
"""Caps how long each coroutine run on the background event loop may take."""

self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')
"""Seconds a cached entry stays fresh; older entries are treated as expired, and `0` disables expiration."""

self._spider: Spider | None = None
"""The Scrapy `Spider` this cache storage is bound to; set in `open_spider`."""

self._kvs: KeyValueStore | None = None
"""The Apify `KeyValueStore` backing the cache; opened in `open_spider`."""

self._fingerprinter: RequestFingerprinterProtocol | None = None
"""Scrapy's request fingerprinter, used to derive the cache key for each request."""

self._async_thread: AsyncThread | None = None
"""Background event-loop thread that runs the storage coroutines from Scrapy's synchronous callbacks."""

def open_spider(self, spider: Spider) -> None:
"""Open the cache storage for a spider."""
"""Open the cache storage for a spider.

Starts the background event-loop thread and opens the spider's key-value store. If opening the store
fails, the freshly started thread is closed so it is not leaked.

Args:
spider: The spider the cache storage is being opened for.
"""
logger.debug('Using Apify key value cache storage', extra={'spider': spider})
self._spider = spider
self._fingerprinter = spider.crawler.request_fingerprinter
Expand All @@ -62,58 +83,75 @@ async def open_kvs() -> KeyValueStore:
return await KeyValueStore.open(name=kvs_name)

logger.debug("Starting background thread for cache storage's event loop")
self._async_thread = AsyncThread()
self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout)
logger.debug(f"Opening cache storage's {kvs_name!r} key value store")
self._kvs = self._async_thread.run_coro(open_kvs())

try:
self._kvs = self._async_thread.run_coro(open_kvs())
except Exception:
logger.exception('Failed to open the cache key-value store.')
# Opening the key-value store failed, so close the freshly started async thread instead of
# leaking its event-loop thread (`close_spider` may never run if `open_spider` fails). Guard
# the close so a secondary failure here cannot mask the original error.
try:
self._async_thread.close()
except Exception:
logger.exception('Failed to close the async thread after a failed cache storage open.')
raise

def close_spider(self, _: Spider, current_time: int | None = None) -> None:
"""Close the cache storage for a spider."""
"""Close the cache storage for a spider.

Runs a best-effort cleanup sweep that deletes expired entries when expiration is enabled, then shuts
down the background event-loop thread. The thread is always closed, even if the sweep fails.

Args:
_: The spider being closed. Part of Scrapy's storage interface, unused here.
current_time: Unix time in seconds used as the current time when deciding which entries have
expired. Defaults to the current time.
"""
if self._async_thread is None:
raise ValueError('Async thread not initialized')

if current_time is None:
current_time = int(time())

logger.info(f'Cleaning up cache items (max {self._expiration_max_items})')
if self._expiration_secs > 0:
if current_time is None:
current_time = int(time())

async def expire_kvs() -> None:
if self._kvs is None:
raise ValueError('Key value store not initialized')
# Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
# so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
# an expired entry as a cache miss.
processed = 0
async for item in self._kvs.iterate_keys():
if processed >= self._expiration_max_items:
break
processed += 1
value = await self._kvs.get_value(item.key)
try:
gzip_time = read_gzip_time(value)
except Exception as e:
logger.warning(f'Malformed cache item {item.key}: {e}')
await self._kvs.delete_value(item.key)
else:
if self._expiration_secs < current_time - gzip_time:
logger.debug(f'Expired cache item {item.key}')
await self._kvs.delete_value(item.key)
else:
logger.debug(f'Valid cache item {item.key}')

self._async_thread.run_coro(expire_kvs())

logger.debug('Closing cache storage')

# Best-effort: a cleanup failure is logged and swallowed (the sweep only reclaims storage, so failing it
# must not turn a normal spider close into an error), and `close` always runs in the `finally`, so
# neither the failure nor an early return can leak the event-loop thread.
try:
Comment thread
vdusek marked this conversation as resolved.
self._async_thread.close()
except KeyboardInterrupt:
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
if self._expiration_secs > 0:
self._async_thread.run_coro(self._expire_kvs(current_time))
except Exception:
logger.exception('Exception occurred while shutting down cache storage')
logger.exception('Failed to clean up expired cache items.')
finally:
logger.debug('Cache storage closed')
logger.debug('Closing cache storage')
try:
self._async_thread.close()
except KeyboardInterrupt:
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
except Exception:
logger.exception('Exception occurred while shutting down cache storage')
finally:
logger.debug('Cache storage closed')

def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None:
"""Retrieve a response from the cache storage."""
"""Retrieve a cached response for a request.

A malformed, legacy, or expired cache entry is treated as a miss, so Scrapy re-fetches the request and
re-stores it in the current format.

Args:
_: The spider making the request. Part of Scrapy's storage interface, unused here.
request: The request to look up in the cache.
current_time: Unix time in seconds used as the current time when checking whether the entry has
expired. Defaults to the current time.

Returns:
The cached response on a hit, or `None` on a miss, an expired entry, or an unreadable entry.
"""
if self._async_thread is None:
raise ValueError('Async thread not initialized')
if self._kvs is None:
Expand All @@ -122,7 +160,13 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
raise ValueError('Request fingerprinter not initialized')

key = self._fingerprinter.fingerprint(request).hex()
value = self._async_thread.run_coro(self._kvs.get_value(key))
# Log here before re-raising: this coroutine ran on a separate event-loop thread, and the failure is
# otherwise easy to lose as it crosses that thread boundary back into Scrapy's synchronous machinery.
try:
value = self._async_thread.run_coro(self._kvs.get_value(key))
except Exception:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is really hard to understand without more context. I think this deserves an inline comment, as the natural thing to do with this is to remove the try/except as it seems redundant.

@vdusek vdusek Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used everywhere. IIRC, there were some issues with the exception propagation from a separate thread and its event loop. This was extreme pain to make it work, so I definitely wouldn't change it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swallowing all exceptions to add additional log and re-raise the exception is an intentional non-standard action, which is here due to some external context circumstances. Those external circumstances are not obvious on this level, so that is why I am asking for some comment, so that the reader without context can understand the intention of the code.

I am not asking to change this, but to help protect it by adding a comment explaining it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, okay, I just thought you wanted an explanation, sorry, adding it.

logger.exception('Failed to retrieve a response from the cache.')
raise

if value is None:
logger.debug('Cache miss', extra={'request': request})
Expand All @@ -139,6 +183,7 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
if 0 < self._expiration_secs < current_time - read_gzip_time(value):
logger.debug('Cache expired', extra={'request': request})
return None

data = from_gzip(value)
url = data['url']
status = data['status']
Expand All @@ -153,7 +198,13 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
return respcls(url=url, headers=headers, status=status, body=body)

def store_response(self, _: Spider, request: Request, response: Response) -> None:
"""Store a response in the cache storage."""
"""Store a response in the cache storage.

Args:
_: The spider that produced the response. Part of Scrapy's storage interface, unused here.
request: The request the response belongs to. Its fingerprint is used as the cache key.
response: The response to store in the cache.
"""
if self._async_thread is None:
raise ValueError('Async thread not initialized')
if self._kvs is None:
Expand All @@ -169,7 +220,42 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non
'body': response.body,
}
value = to_gzip(data)
self._async_thread.run_coro(self._kvs.set_value(key, value))
# Log here before re-raising: this coroutine ran on a separate event-loop thread, and the failure is
# otherwise easy to lose as it crosses that thread boundary back into Scrapy's synchronous machinery.
try:
self._async_thread.run_coro(self._kvs.set_value(key, value))
except Exception:
logger.exception('Failed to store a response in the cache.')
raise

async def _expire_kvs(self, current_time: int) -> None:
"""Sweep the cache key-value store, deleting expired or unreadable entries.

Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order, so stale
entries may linger. This only reclaims storage; `retrieve_response` already treats an expired entry as
a cache miss.
"""
if self._kvs is None:
raise ValueError('Key value store not initialized')

processed = 0

async for item in self._kvs.iterate_keys():
if processed >= self._expiration_max_items:
break

processed += 1
value = await self._kvs.get_value(item.key)

try:
gzip_time = read_gzip_time(value)
except Exception as exc:
logger.warning(f'Malformed cache item {item.key}: {exc}')
await self._kvs.delete_value(item.key)
else:
if self._expiration_secs < current_time - gzip_time:
logger.debug(f'Expired cache item {item.key}')
await self._kvs.delete_value(item.key)


def to_gzip(data: dict, mtime: int | None = None) -> bytes:
Expand Down Expand Up @@ -219,7 +305,8 @@ def get_kvs_name(spider_name: str, max_length: int = 60) -> str:
spider_name: Value of the Spider instance's name attribute.
max_length: Maximum length of the key value store name.

Returns: Key value store name.
Returns:
Key value store name.

Raises:
ValueError: If the spider name contains only special characters.
Expand Down
4 changes: 2 additions & 2 deletions src/apify/scrapy/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ
apify_request = ApifyRequest.from_url(**request_kwargs)
scrapy_request_dict = scrapy_request.to_dict(spider=spider)

except Exception as exc:
logger.warning(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; {exc}')
except Exception:
logger.exception(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; skipping it.')
return None

# Serialize the Scrapy request as JSON under 'scrapy_request'. Kept outside the broad except above so
Expand Down
Loading