Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 43 additions & 30 deletions src/crawlee/storages/_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from crawlee._utils.docs import docs_group
from crawlee._utils.wait import wait_for_all_tasks_for_finish
from crawlee.request_loaders import RequestManager
from crawlee.storage_clients.models import AddRequestsResponse

from ._base import Storage
from ._utils import validate_storage_name
Expand Down Expand Up @@ -181,19 +182,18 @@ async def add_request(
forefront: bool = False,
) -> ProcessedRequest | None:
request = self._transform_request(request)
response = await self._client.add_batch_of_requests([request], forefront=forefront)
# Route through `_process_batch` so a single add retries unprocessed requests just like a batched one.
response = await self._process_batch([request], base_retry_wait=timedelta(seconds=1), forefront=forefront)

if response.processed_requests:
return response.processed_requests[0]

if response.unprocessed_requests:
# `_process_batch` already warns about requests left unprocessed after retries; only an empty response
# (neither processed nor unprocessed) is unexpected here.
if not response.unprocessed_requests:
logger.warning(
f'Request {request.url} was not processed by storage client "{self._client.__class__.__name__}".'
)
else:
logger.warning(
f'Request {request.url} was not processed by storage client "{self._client.__class__.__name__}" '
'received empty response.'
f'Request {request.url} was not processed by storage client '
f'"{self._client.__class__.__name__}" (received an empty response).'
)
return None

Expand Down Expand Up @@ -352,33 +352,46 @@ async def _process_batch(
base_retry_wait: timedelta,
attempt: int = 1,
forefront: bool = False,
) -> None:
"""Process a batch of requests with automatic retry mechanism."""
) -> AddRequestsResponse:
"""Process a batch of requests with automatic retry mechanism.

Returns:
A response aggregating all requests processed across attempts plus any still unprocessed once the
retries are exhausted.
"""
max_attempts = 5
response = await self._client.add_batch_of_requests(batch, forefront=forefront)

if response.unprocessed_requests:
logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
if attempt > max_attempts:
logger.warning(
f'Following requests were not processed even after {max_attempts} attempts:\n'
f'{response.unprocessed_requests}'
)
else:
logger.debug('Retry to add requests.')
unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
await asyncio.sleep((base_retry_wait * attempt).total_seconds())
await self._process_batch(
retry_batch,
base_retry_wait=base_retry_wait,
attempt=attempt + 1,
forefront=forefront,
)

request_count = len(batch) - len(response.unprocessed_requests)

if request_count:
logger.debug(
f'Added {request_count} requests to the queue. Processed requests: {response.processed_requests}'
)

if not response.unprocessed_requests:
return response

logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.')
if attempt > max_attempts:
logger.warning(
f'Following requests were not processed even after {max_attempts} attempts:\n'
f'{response.unprocessed_requests}'
)
return response

logger.debug('Retry to add requests.')
unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
await asyncio.sleep((base_retry_wait * attempt).total_seconds())
retry_response = await self._process_batch(
retry_batch,
base_retry_wait=base_retry_wait,
attempt=attempt + 1,
forefront=forefront,
)

# Merge the retry outcome: processed requests accumulate, unprocessed is whatever the last attempt left.
return AddRequestsResponse(
processed_requests=[*response.processed_requests, *retry_response.processed_requests],
unprocessed_requests=retry_response.unprocessed_requests,
)
73 changes: 73 additions & 0 deletions tests/unit/storages/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,79 @@ async def patched_add_batch(
)


async def _no_sleep(_seconds: float) -> None:
"""Drop-in replacement for `asyncio.sleep` that returns immediately, to keep retry tests fast."""


async def test_add_request_retries_unprocessed(monkeypatch: pytest.MonkeyPatch) -> None:
"""`add_request` must retry an unprocessed request (like `add_requests`) instead of silently dropping it."""
rq = await RequestQueue.open(storage_client=MemoryStorageClient())
monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep)
calls = 0

async def patched_add_batch(
requests: Sequence[Request],
*,
forefront: bool = False, # noqa: ARG001
) -> AddRequestsResponse:
nonlocal calls
calls += 1
# First attempt reports the request as unprocessed; the retry succeeds.
if calls == 1:
return AddRequestsResponse(
processed_requests=[],
unprocessed_requests=[UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests],
)
return AddRequestsResponse(
processed_requests=[
ProcessedRequest(unique_key=r.unique_key, was_already_present=False, was_already_handled=False)
for r in requests
],
unprocessed_requests=[],
)

monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)

try:
result = await rq.add_request('https://example.com/retry')
finally:
await rq.drop()

assert calls == 2, f'expected one retry after the unprocessed response, got {calls} calls'
assert result is not None
assert result.was_already_present is False


async def test_add_request_returns_none_after_exhausting_retries(monkeypatch: pytest.MonkeyPatch) -> None:
"""When a request stays unprocessed across all retries, `add_request` returns `None` rather than raising."""
rq = await RequestQueue.open(storage_client=MemoryStorageClient())
monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep)
calls = 0

async def patched_add_batch(
requests: Sequence[Request],
*,
forefront: bool = False, # noqa: ARG001
) -> AddRequestsResponse:
nonlocal calls
calls += 1
return AddRequestsResponse(
processed_requests=[],
unprocessed_requests=[UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests],
)

monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)

try:
result = await rq.add_request('https://example.com/doomed')
finally:
await rq.drop()

assert result is None
# One initial attempt plus five retries; the mechanism stops once `attempt` exceeds `max_attempts`.
assert calls == 6, f'expected 6 attempts (1 initial + 5 retries), got {calls}'


async def test_add_requests_mixed_forefront(rq: RequestQueue) -> None:
"""Test the ordering when adding requests with mixed forefront values."""
# Add normal requests
Expand Down
Loading