Interacts with the ScraperAPI service and converts the retrieved documents to Markdown.

ScraperAPIStep

Bases: TypedStep[ScraperAPISettings, list[UrlItem], list[MarkdownDataContract]]

ScraperAPIStep uses the ScraperAPI service to scrape the HTML behind each URL in the given list[UrlItem]. The HTML is then filtered and transformed into MarkdownDataContract objects.
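
A minimal usage sketch, assuming UrlItem is importable as shown (its import path is an assumption, not confirmed by this page) and that the step's settings, notably TOKEN, are supplied through the environment:

from wurzel.steps.scraperapi.step import ScraperAPIStep
from wurzel.datacontract import UrlItem  # import path assumed

step = ScraperAPIStep()
results = step.run([UrlItem(url="https://example.com", title="Example")])
for contract in results:
    print(contract.url, contract.md[:80])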

Source code in wurzel/steps/scraperapi/step.py
class ScraperAPIStep(TypedStep[ScraperAPISettings, list[UrlItem], list[MarkdownDataContract]]):
    """ScraperAPIStep uses the ScraperAPI service to srape the html by the given url through list[UrlItem].
    this html gets filtered and transformed to MarkdownDataContract.
    """

    def run(self, inpt: list[UrlItem]) -> list[MarkdownDataContract]:
        def fetch_and_process(url_item: UrlItem, recursion_depth=0):
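            # recursion_depth counts retries for the XPath-miss case handled below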
            session = requests.Session()
            retries = Retry(
                total=self.settings.RETRY, backoff_factor=0.1, raise_on_status=False, status_forcelist=[403, 500, 502, 503, 504]
            )
            session.mount("https://", HTTPAdapter(max_retries=retries))
            payload = {
                "api_key": self.settings.TOKEN.get_secret_value(),
                "url": url_item.url,
                "device_type": self.settings.DEVICE_TYPE,
                "follow_redirect": str(self.settings.FOLLOW_REDIRECT).lower(),
                "wait_for_selector": self.settings.WAIT_FOR_SELECTOR,
                "country_code": self.settings.COUNTRY_CODE,
                "render": str(self.settings.RENDER).lower(),
                "premium": str(self.settings.PREMIUM).lower(),
                "ultra_premium": str(self.settings.ULTRA_PREMIUM).lower(),
                "screenshot": str(self.settings.SCREENSHOT).lower(),
                "max_cost": str(self.settings.MAX_COST),
            }
            try:
                r = None  # defined up front so the except handler can reference it
                r = session.get(self.settings.API, params=payload, timeout=self.settings.TIMEOUT)
                r.raise_for_status()
            except requests.exceptions.ReadTimeout:
                log.warning(
                    "Crawling failed due to timeout",
                    extra={"url": url_item.url},
                )
                return None
            except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError):
                log.warning(
                    "Crawling failed",
                    extra={"url": url_item.url, "status": r.status_code if r else None, "retries": self.settings.RETRY},
                )
                return None

            try:
                md = to_markdown(self._filter_body(r.text), self.settings.HTML2MD_SETTINGS)
            except (KeyError, IndexError):
                if recursion_depth > self.settings.RETRY:
                    log.warning("xpath retry failed", extra={"filter": self.settings.XPATH, "url": url_item.url})
                    return None
                log.warning(
                    "website does not have the searched xpath, retrying", extra={"filter": self.settings.XPATH, "url": url_item.url}
                )
                return fetch_and_process(url_item, recursion_depth=recursion_depth + 1)

            progress_bar.update(1)
            return MarkdownDataContract(md=md, url=url_item.url, keywords=url_item.title)

        with tqdm(total=len(inpt), desc="Processing URLs") as progress_bar:
            results = Parallel(n_jobs=self.settings.CONCURRENCY_NUM, backend="threading")(delayed(fetch_and_process)(item) for item in inpt)

        filtered_results = [res for res in results if res]
        if not filtered_results:
            raise StepFailed("no results from scraperAPI")

        return filtered_results

    def __init__(self) -> None:
        logging.getLogger("urllib3").setLevel("ERROR")
        super().__init__()

    def finalize(self) -> None:
        logging.getLogger("urllib3").setLevel("WARNING")

        return super().finalize()

    def _filter_body(self, html: str) -> str:
        tree: lxml.html.HtmlElement = lxml.html.fromstring(html)
        tree = tree.xpath(self.settings.XPATH)[0]
        return html2str(tree)
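
The retry branch in run() fires when the configured XPath matches nothing: tree.xpath(...)[0] raises IndexError on an empty result. A standalone sketch of that filtering, using lxml.html.tostring in place of the project's html2str helper:

import lxml.html

html = "<html><body><main><h1>Title</h1><p>Body</p></main></body></html>"
tree = lxml.html.fromstring(html)
node = tree.xpath("//main")[0]  # an empty match list raises IndexError, triggering the retry
print(lxml.html.tostring(node, encoding="unicode"))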

Interacts with the ScraperAPI service and converts the retrieved documents to Markdown.

ScraperAPISettings

Bases: Settings

Settings of ScraperAPIStep. Mainly the request parameters listed at https://docs.scraperapi.com/python/credits-and-requests.

Source code in wurzel/steps/scraperapi/settings.py
class ScraperAPISettings(Settings):
    """Settings of ScraperAPIStep. Mainly the list of https://docs.scraperapi.com/python/credits-and-requests."""

    API: str = "https://api.scraperapi.com/"
    RETRY: int = Field(ge=0, default=5)
    TOKEN: SecretStr
    TIMEOUT: float = 61.0
    XPATH: str = "//main"
    CONCURRENCY_NUM: int = Field(gt=0, default=1)
    DEVICE_TYPE: str = "desktop"
    FOLLOW_REDIRECT: bool = True
    WAIT_FOR_SELECTOR: str = "#cookies-notification-accept-cookie"
    COUNTRY_CODE: str = "en"
    RENDER: bool = True
    PREMIUM: bool = False
    ULTRA_PREMIUM: bool = False
    SCREENSHOT: bool = False
    MAX_COST: int = Field(gt=0, default=30)
    HTML2MD_SETTINGS: MarkdownConverterSettings = Field(
        default_factory=MarkdownConverterSettings, description="Settings for the Markdown converter."
    )
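
A hedged construction sketch; the field names match the class above, but loading values from environment variables (typical for pydantic-style Settings) is an assumption:

from pydantic import SecretStr
from wurzel.steps.scraperapi.settings import ScraperAPISettings

settings = ScraperAPISettings(
    TOKEN=SecretStr("<your-scraperapi-key>"),
    XPATH="//article",    # override the //main default
    CONCURRENCY_NUM=4,
)
print(settings.API, settings.TIMEOUT, settings.MAX_COST)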