diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py index 286dfde3a4..eaa93785c9 100644 --- a/src/crawlee/storages/_request_queue.py +++ b/src/crawlee/storages/_request_queue.py @@ -11,6 +11,7 @@ from crawlee._utils.docs import docs_group from crawlee._utils.wait import wait_for_all_tasks_for_finish from crawlee.request_loaders import RequestManager +from crawlee.storage_clients.models import AddRequestsResponse from ._base import Storage from ._utils import validate_storage_name @@ -181,19 +182,18 @@ async def add_request( forefront: bool = False, ) -> ProcessedRequest | None: request = self._transform_request(request) - response = await self._client.add_batch_of_requests([request], forefront=forefront) + # Route through `_process_batch` so a single add retries unprocessed requests just like a batched one. + response = await self._process_batch([request], base_retry_wait=timedelta(seconds=1), forefront=forefront) if response.processed_requests: return response.processed_requests[0] - if response.unprocessed_requests: + # `_process_batch` already warns about requests left unprocessed after retries; only an empty response + # (neither processed nor unprocessed) is unexpected here. + if not response.unprocessed_requests: logger.warning( - f'Request {request.url} was not processed by storage client "{self._client.__class__.__name__}".' - ) - else: - logger.warning( - f'Request {request.url} was not processed by storage client "{self._client.__class__.__name__}" ' - 'received empty response.' + f'Request {request.url} was not processed by storage client ' + f'"{self._client.__class__.__name__}" (received an empty response).' ) return None @@ -352,33 +352,46 @@ async def _process_batch( base_retry_wait: timedelta, attempt: int = 1, forefront: bool = False, - ) -> None: - """Process a batch of requests with automatic retry mechanism.""" + ) -> AddRequestsResponse: + """Process a batch of requests with automatic retry mechanism. + + Returns: + A response aggregating all requests processed across attempts plus any still unprocessed once the + retries are exhausted. + """ max_attempts = 5 response = await self._client.add_batch_of_requests(batch, forefront=forefront) - if response.unprocessed_requests: - logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.') - if attempt > max_attempts: - logger.warning( - f'Following requests were not processed even after {max_attempts} attempts:\n' - f'{response.unprocessed_requests}' - ) - else: - logger.debug('Retry to add requests.') - unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests} - retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys] - await asyncio.sleep((base_retry_wait * attempt).total_seconds()) - await self._process_batch( - retry_batch, - base_retry_wait=base_retry_wait, - attempt=attempt + 1, - forefront=forefront, - ) - request_count = len(batch) - len(response.unprocessed_requests) - if request_count: logger.debug( f'Added {request_count} requests to the queue. Processed requests: {response.processed_requests}' ) + + if not response.unprocessed_requests: + return response + + logger.debug(f'Following requests were not processed: {response.unprocessed_requests}.') + if attempt > max_attempts: + logger.warning( + f'Following requests were not processed even after {max_attempts} attempts:\n' + f'{response.unprocessed_requests}' + ) + return response + + logger.debug('Retry to add requests.') + unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests} + retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys] + await asyncio.sleep((base_retry_wait * attempt).total_seconds()) + retry_response = await self._process_batch( + retry_batch, + base_retry_wait=base_retry_wait, + attempt=attempt + 1, + forefront=forefront, + ) + + # Merge the retry outcome: processed requests accumulate, unprocessed is whatever the last attempt left. + return AddRequestsResponse( + processed_requests=[*response.processed_requests, *retry_response.processed_requests], + unprocessed_requests=retry_response.unprocessed_requests, + ) diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 9df49ec79f..2f12137625 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -311,6 +311,79 @@ async def patched_add_batch( ) +async def _no_sleep(_seconds: float) -> None: + """Drop-in replacement for `asyncio.sleep` that returns immediately, to keep retry tests fast.""" + + +async def test_add_request_retries_unprocessed(monkeypatch: pytest.MonkeyPatch) -> None: + """`add_request` must retry an unprocessed request (like `add_requests`) instead of silently dropping it.""" + rq = await RequestQueue.open(storage_client=MemoryStorageClient()) + monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep) + calls = 0 + + async def patched_add_batch( + requests: Sequence[Request], + *, + forefront: bool = False, # noqa: ARG001 + ) -> AddRequestsResponse: + nonlocal calls + calls += 1 + # First attempt reports the request as unprocessed; the retry succeeds. + if calls == 1: + return AddRequestsResponse( + processed_requests=[], + unprocessed_requests=[UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests], + ) + return AddRequestsResponse( + processed_requests=[ + ProcessedRequest(unique_key=r.unique_key, was_already_present=False, was_already_handled=False) + for r in requests + ], + unprocessed_requests=[], + ) + + monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch) + + try: + result = await rq.add_request('https://example.com/retry') + finally: + await rq.drop() + + assert calls == 2, f'expected one retry after the unprocessed response, got {calls} calls' + assert result is not None + assert result.was_already_present is False + + +async def test_add_request_returns_none_after_exhausting_retries(monkeypatch: pytest.MonkeyPatch) -> None: + """When a request stays unprocessed across all retries, `add_request` returns `None` rather than raising.""" + rq = await RequestQueue.open(storage_client=MemoryStorageClient()) + monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep) + calls = 0 + + async def patched_add_batch( + requests: Sequence[Request], + *, + forefront: bool = False, # noqa: ARG001 + ) -> AddRequestsResponse: + nonlocal calls + calls += 1 + return AddRequestsResponse( + processed_requests=[], + unprocessed_requests=[UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests], + ) + + monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch) + + try: + result = await rq.add_request('https://example.com/doomed') + finally: + await rq.drop() + + assert result is None + # One initial attempt plus five retries; the mechanism stops once `attempt` exceeds `max_attempts`. + assert calls == 6, f'expected 6 attempts (1 initial + 5 retries), got {calls}' + + async def test_add_requests_mixed_forefront(rq: RequestQueue) -> None: """Test the ordering when adding requests with mixed forefront values.""" # Add normal requests