Skip to content

Commit 9fc4648

Browse files
authored
feat: use SessionPool in BasicCrawler (#128)
Closes #110. `BasicCrawler` now uses `SessionPool` to fill in a session into the crawling context; there is a separate retry mechanism for session errors (when we get blocked), and cookies from HTTP responses are persisted in the respective sessions.
1 parent 93001a8 commit 9fc4648

16 files changed

Lines changed: 470 additions & 87 deletions

File tree

src/crawlee/_utils/blocked.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
# Inspiration: https://github.com/apify/crawlee/blob/v3.9.2/packages/utils/src/internals/blocked.ts

# Selectors matching the Cloudflare Turnstile challenge widget. Kept separate so
# Cloudflare-specific detection can be referenced on its own if needed.
CLOUDFLARE_RETRY_CSS_SELECTORS = [
    '#turnstile-wrapper iframe[src^="https://challenges.cloudflare.com"]',
]

# Cloudflare selectors plus selectors for other common bot-protection pages
# (Google sorry-page terms link, Incapsula resource iframe).
RETRY_CSS_SELECTORS = [
    *CLOUDFLARE_RETRY_CSS_SELECTORS,
    'div#infoDiv0 a[href*="//www.google.com/policies/terms/"]',
    'iframe[src*="_Incapsula_Resource"]',
]
"""
CSS selectors for elements that should trigger a retry, as the crawler is likely getting blocked.
"""

# Substrings of error messages indicating the proxy itself failed or was blocked;
# matching errors should cause a proxy/session rotation rather than a plain retry.
ROTATE_PROXY_ERRORS = [
    'ECONNRESET',
    'ECONNREFUSED',
    'ERR_PROXY_CONNECTION_FAILED',
    'ERR_TUNNEL_CONNECTION_FAILED',
    'Proxy responded with',
]
"""
Content of proxy errors that should trigger a retry, as the proxy is likely getting blocked / is malfunctioning.
"""

src/crawlee/basic_crawler/basic_crawler.py

Lines changed: 103 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from __future__ import annotations
33

44
import tempfile
5+
from contextlib import AsyncExitStack
56
from datetime import timedelta
67
from functools import partial
78
from logging import getLogger
@@ -18,9 +19,13 @@
1819
from crawlee.autoscaling.system_status import SystemStatus
1920
from crawlee.basic_crawler.context_pipeline import (
2021
ContextPipeline,
22+
)
23+
from crawlee.basic_crawler.errors import (
2124
ContextPipelineInitializationError,
2225
ContextPipelineInterruptedError,
2326
RequestHandlerError,
27+
SessionError,
28+
UserDefinedErrorHandlerError,
2429
)
2530
from crawlee.basic_crawler.router import Router
2631
from crawlee.basic_crawler.types import (
@@ -34,12 +39,14 @@
3439
from crawlee.events.local_event_manager import LocalEventManager
3540
from crawlee.http_clients.httpx_client import HttpxClient
3641
from crawlee.request import BaseRequestData, Request, RequestState
42+
from crawlee.sessions import SessionPool
3743
from crawlee.storages.request_queue import RequestQueue
3844

3945
if TYPE_CHECKING:
4046
import re
4147

4248
from crawlee.http_clients.base_http_client import BaseHttpClient, HttpResponse
49+
from crawlee.sessions.session import Session
4350
from crawlee.storages.request_provider import RequestProvider
4451

4552
TCrawlingContext = TypeVar('TCrawlingContext', bound=BasicCrawlingContext, default=BasicCrawlingContext)
@@ -49,10 +56,6 @@
4956
logger = getLogger(__name__)
5057

5158

52-
class UserDefinedErrorHandlerError(Exception):
53-
"""Wraps an exception thrown from an user-defined error handler."""
54-
55-
5659
class BasicCrawler(Generic[TCrawlingContext]):
5760
"""Provides a simple framework for parallel crawling of web pages.
5861
@@ -72,8 +75,12 @@ def __init__(
7275
http_client: BaseHttpClient | None = None,
7376
concurrency_settings: ConcurrencySettings | None = None,
7477
max_request_retries: int = 3,
78+
max_session_rotations: int = 10,
7579
configuration: Configuration | None = None,
7680
request_handler_timeout: timedelta = timedelta(minutes=1),
81+
session_pool: SessionPool | None = None,
82+
use_session_pool: bool = True,
83+
retry_on_blocked: bool = True,
7784
_context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
7885
) -> None:
7986
"""Initialize the BasicCrawler.
@@ -84,8 +91,14 @@ def __init__(
8491
http_client: HTTP client to be used for `BasicCrawlingContext.send_request` and HTTP-only crawling.
8592
concurrency_settings: Allows fine-tuning concurrency levels
8693
max_request_retries: Maximum amount of attempts at processing a request
94+
max_session_rotations: Maximum number of session rotations per request.
95+
The crawler will automatically rotate the session in case of a proxy error or if it gets blocked by
96+
the website.
8797
configuration: Crawler configuration
8898
request_handler_timeout: How long is a single request handler allowed to run
99+
use_session_pool: Enables using the session pool for crawling
100+
session_pool: A preconfigured SessionPool instance if you wish to use non-default configuration
101+
retry_on_blocked: If set to True, the crawler will try to automatically bypass any detected bot protection
89102
_context_pipeline: Allows extending the request lifecycle and modifying the crawling context.
90103
This parameter is meant to be used by child classes, not when BasicCrawler is instantiated directly.
91104
"""
@@ -105,6 +118,7 @@ def __init__(
105118
self._failed_request_handler: FailedRequestHandler[TCrawlingContext] | None = None
106119

107120
self._max_request_retries = max_request_retries
121+
self._max_session_rotations = max_session_rotations
108122

109123
self._request_provider = request_provider
110124
self._configuration = configuration or Configuration()
@@ -129,6 +143,11 @@ def __init__(
129143
concurrency_settings=concurrency_settings,
130144
)
131145

146+
self._use_session_pool = use_session_pool
147+
self._session_pool: SessionPool = session_pool or SessionPool()
148+
149+
self._retry_on_blocked = retry_on_blocked
150+
132151
@property
133152
def router(self) -> Router[TCrawlingContext]:
134153
"""The router used to handle each individual crawling request."""
@@ -144,6 +163,20 @@ def router(self, router: Router[TCrawlingContext]) -> None:
144163

145164
self._router = router
146165

166+
async def _get_session(self) -> Session | None:
167+
"""If session pool is being used, try to take a session from it."""
168+
if not self._use_session_pool:
169+
return None
170+
171+
return await wait_for(
172+
self._session_pool.get_session,
173+
timeout=self._internal_timeout,
174+
timeout_message='Fetching a session from the pool timed out after '
175+
f'{self._internal_timeout.total_seconds()} seconds',
176+
max_retries=3,
177+
logger=logger,
178+
)
179+
147180
async def get_request_provider(self) -> RequestProvider:
148181
"""Return the configured request provider. If none is configured, open and return the default request queue."""
149182
if not self._request_provider:
@@ -188,19 +221,29 @@ async def run(self, requests: list[str | BaseRequestData] | None = None) -> Fina
188221
if requests is not None:
189222
await self.add_requests(requests)
190223

191-
async with self._event_manager, self._snapshotter:
224+
async with AsyncExitStack() as exit_stack:
225+
await exit_stack.enter_async_context(self._event_manager)
226+
await exit_stack.enter_async_context(self._snapshotter)
227+
228+
if self._use_session_pool:
229+
await exit_stack.enter_async_context(self._session_pool)
230+
192231
await self._pool.run()
193232

194233
return FinalStatistics()
195234

196-
def _should_retry_request(self, crawling_context: BasicCrawlingContext) -> bool:
235+
def _should_retry_request(self, crawling_context: BasicCrawlingContext, error: Exception) -> bool:
236+
if crawling_context.request.no_retry:
237+
return False
238+
239+
if isinstance(error, SessionError):
240+
return ((crawling_context.request.session_rotation_count or 0) + 1) < self._max_session_rotations
241+
197242
max_request_retries = crawling_context.request.max_retries
198243
if max_request_retries is None:
199244
max_request_retries = self._max_request_retries
200245

201-
return (
202-
not crawling_context.request.no_retry and (crawling_context.request.retry_count + 1) < max_request_retries
203-
)
246+
return (crawling_context.request.retry_count + 1) < max_request_retries
204247

205248
async def _check_url_after_redirects(
206249
self, crawling_context: TCrawlingContext
@@ -273,7 +316,7 @@ def _check_url_patterns(
273316
async def _handle_request_error(self, crawling_context: TCrawlingContext, error: Exception) -> None:
274317
request_provider = await self.get_request_provider()
275318

276-
if self._should_retry_request(crawling_context):
319+
if self._should_retry_request(crawling_context, error):
277320
request = crawling_context.request
278321
request.retry_count += 1
279322

@@ -307,9 +350,16 @@ async def _handle_failed_request(self, crawling_context: TCrawlingContext, error
307350
except Exception as e:
308351
raise UserDefinedErrorHandlerError('Exception thrown in user-defined failed request handler') from e
309352

310-
def _prepare_send_request_function(self) -> SendRequestFunction:
311-
async def send_request(url: str, *, method: str = 'get', headers: dict[str, str] | None = None) -> HttpResponse:
312-
return await self._http_client.send_request(url, method=method, headers=httpx.Headers(headers))
353+
def _prepare_send_request_function(self, session: Session | None) -> SendRequestFunction:
354+
async def send_request(
355+
url: str,
356+
*,
357+
method: str = 'get',
358+
headers: dict[str, str] | None = None,
359+
) -> HttpResponse:
360+
return await self._http_client.send_request(
361+
url, method=method, headers=httpx.Headers(headers), session=session
362+
)
313363

314364
return send_request
315365

@@ -350,7 +400,7 @@ async def __is_task_ready_function(self) -> bool:
350400
request_provider = await self.get_request_provider()
351401
return not await request_provider.is_empty()
352402

353-
async def __run_task_function(self) -> None:
403+
async def __run_task_function(self) -> None: # noqa: PLR0912
354404
request_provider = await self.get_request_provider()
355405

356406
request = await wait_for(
@@ -364,14 +414,13 @@ async def __run_task_function(self) -> None:
364414
if request is None:
365415
return
366416

367-
# TODO: fetch session from the session pool
368-
# https://github.com/apify/crawlee-py/issues/110
369-
417+
session = await self._get_session()
370418
result = RequestHandlerRunResult()
371419

372420
crawling_context = BasicCrawlingContext(
373421
request=request,
374-
send_request=self._prepare_send_request_function(),
422+
session=session,
423+
send_request=self._prepare_send_request_function(session),
375424
add_requests=result.add_requests,
376425
)
377426

@@ -398,6 +447,9 @@ async def __run_task_function(self) -> None:
398447
)
399448

400449
request.state = RequestState.DONE
450+
451+
if crawling_context.session:
452+
crawling_context.session.mark_good()
401453
except RequestHandlerError as primary_error:
402454
primary_error = cast(
403455
RequestHandlerError[TCrawlingContext], primary_error
@@ -428,6 +480,34 @@ async def __run_task_function(self) -> None:
428480
)
429481
request.state = RequestState.ERROR
430482
raise
483+
484+
if crawling_context.session:
485+
crawling_context.session.mark_bad()
486+
except SessionError as session_error:
487+
if not crawling_context.session:
488+
raise RuntimeError('SessionError raised in a crawling context without a session') from session_error
489+
490+
if self._should_retry_request(crawling_context, session_error):
491+
logger.warning('Encountered a session error, rotating session and retrying')
492+
493+
crawling_context.session.retire()
494+
495+
if crawling_context.request.session_rotation_count is None:
496+
crawling_context.request.session_rotation_count = 0
497+
crawling_context.request.session_rotation_count += 1
498+
499+
await request_provider.reclaim_request(request)
500+
else:
501+
logger.exception('Request failed and reached maximum retries', exc_info=session_error)
502+
503+
await wait_for(
504+
lambda: request_provider.mark_request_as_handled(crawling_context.request),
505+
timeout=self._internal_timeout,
506+
timeout_message='Marking request as handled timed out after '
507+
f'{self._internal_timeout.total_seconds()} seconds',
508+
logger=logger,
509+
max_retries=3,
510+
)
431511
except ContextPipelineInterruptedError as interruped_error:
432512
logger.debug('The context pipeline was interrupted', exc_info=interruped_error)
433513

@@ -440,9 +520,9 @@ async def __run_task_function(self) -> None:
440520
max_retries=3,
441521
)
442522
except ContextPipelineInitializationError as initialization_error:
443-
if self._should_retry_request(crawling_context):
523+
if self._should_retry_request(crawling_context, initialization_error):
444524
logger.debug(
445-
'An exception occured during the initialization of crawling context, a retry is in order',
525+
'An exception occurred during the initialization of crawling context, a retry is in order',
446526
exc_info=initialization_error,
447527
)
448528

@@ -461,6 +541,9 @@ async def __run_task_function(self) -> None:
461541
logger=logger,
462542
max_retries=3,
463543
)
544+
545+
if crawling_context.session:
546+
crawling_context.session.mark_bad()
464547
except Exception as internal_error:
465548
logger.exception(
466549
'An exception occurred during handling of a request. This places the crawler '

src/crawlee/basic_crawler/context_pipeline.py

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,46 +4,19 @@
44

55
from typing_extensions import TypeVar
66

7+
from crawlee.basic_crawler.errors import (
8+
ContextPipelineFinalizationError,
9+
ContextPipelineInitializationError,
10+
ContextPipelineInterruptedError,
11+
RequestHandlerError,
12+
SessionError,
13+
)
714
from crawlee.basic_crawler.types import BasicCrawlingContext
815

916
TCrawlingContext = TypeVar('TCrawlingContext', bound=BasicCrawlingContext, default=BasicCrawlingContext)
1017
TMiddlewareCrawlingContext = TypeVar('TMiddlewareCrawlingContext', bound=BasicCrawlingContext)
1118

1219

13-
class RequestHandlerError(Exception, Generic[TCrawlingContext]):
14-
"""Wraps an exception thrown from a request handler (router) and extends it with crawling context."""
15-
16-
def __init__(self, wrapped_exception: Exception, crawling_context: TCrawlingContext) -> None:
17-
self.wrapped_exception = wrapped_exception
18-
self.crawling_context = crawling_context
19-
20-
21-
class ContextPipelineInitializationError(Exception):
22-
"""Wraps an exception thrown in the initialization step of a context pipeline middleware.
23-
24-
We may not have the complete context at this point, so only `BasicCrawlingContext` is provided.
25-
"""
26-
27-
def __init__(self, wrapped_exception: Exception, crawling_context: BasicCrawlingContext) -> None:
28-
self.wrapped_exception = wrapped_exception
29-
self.crawling_context = crawling_context
30-
31-
32-
class ContextPipelineInterruptedError(Exception):
33-
"""May be thrown in the initialization phase of a middleware to signal that the request should not be processed."""
34-
35-
36-
class ContextPipelineFinalizationError(Exception):
37-
"""Wraps an exception thrown in the finalization step of a context pipeline middleware.
38-
39-
We may not have the complete context at this point, so only `BasicCrawlingContext` is provided.
40-
"""
41-
42-
def __init__(self, wrapped_exception: Exception, crawling_context: BasicCrawlingContext) -> None:
43-
self.wrapped_exception = wrapped_exception
44-
self.crawling_context = crawling_context
45-
46-
4720
class ContextPipeline(Generic[TCrawlingContext]):
4821
"""Encapsulates the logic of gradually enhancing the crawling context with additional information and utilities.
4922
@@ -87,6 +60,8 @@ async def __call__(
8760
middleware_instance = member._middleware(crawling_context) # noqa: SLF001
8861
try:
8962
result = await middleware_instance.__anext__()
63+
except SessionError: # Session errors get special treatment
64+
raise
9065
except StopAsyncIteration as e:
9166
raise RuntimeError('The middleware did not yield') from e
9267
except ContextPipelineInterruptedError:
@@ -99,6 +74,8 @@ async def __call__(
9974

10075
try:
10176
await final_context_consumer(cast(TCrawlingContext, crawling_context))
77+
except SessionError: # Session errors get special treatment
78+
raise
10279
except Exception as e:
10380
raise RequestHandlerError(e, crawling_context) from e
10481
finally:

0 commit comments

Comments
 (0)