feat(safe-web-driver): enhanced the original AsyncChromiumLoader web driver with proxy protection and flexible kwargs and backend

the original class prevents passing kwargs down to the playwright backend, making some configurations infeasible, including passing a proxy server to the web driver.

the new class is backward compatible with the original, but 1) allows any kwarg to be passed down to the web driver, 2) allows specifying the web driver backend (only playwright is supported for now), in case more (e.g., selenium) are supported in the future, and 3) automatically fetches a suitable proxy if one is not passed already
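
A minimal usage sketch of the new loader follows (illustrative only, not part of the commit). The proxy shape and the slow_mo kwarg are assumptions for the example; what is grounded in the diff is that extra keyword arguments are forwarded to Playwright's chromium.launch and that a proxy value is normalized via parse_or_search_proxy.

# illustrative sketch, not part of the diff: exercising the new pass-through surface
from scrapegraphai.docloaders import ChromiumLoader

loader = ChromiumLoader(
    ["https://example.com"],
    backend="playwright",  # only playwright is supported for now
    headless=True,
    proxy={"server": "http://localhost:8080"},  # assumed proxy shape, normalized by parse_or_search_proxy
    slow_mo=50,  # assumed extra kwarg, forwarded verbatim to chromium.launch
)

# lazy_load scrapes each URL in turn and yields langchain Documents
for doc in loader.lazy_load():
    print(doc.metadata["source"], len(doc.page_content))
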
DiTo97 committed May 10, 2024
1 parent 2170131 commit 768719c
Showing 3 changed files with 180 additions and 40 deletions.
3 changes: 3 additions & 0 deletions scrapegraphai/docloaders/__init__.py
@@ -0,0 +1,3 @@
"""__init__.py file for docloaders folder"""

from .chromium import ChromiumLoader
125 changes: 125 additions & 0 deletions scrapegraphai/docloaders/chromium.py
@@ -0,0 +1,125 @@
import asyncio
import logging
from typing import Any, AsyncIterator, Iterator, List, Optional

from langchain_core.documents import Document

from ..utils import Proxy, dynamic_import, parse_or_search_proxy


logger = logging.getLogger(__name__)


class ChromiumLoader:
    """Scrapes HTML pages from URLs using a (headless) instance of the
    Chromium web driver with proxy protection.

    Attributes:
        backend: The web driver backend library; defaults to 'playwright'.
        browser_config: A dictionary containing additional browser kwargs.
        headless: Whether to run the browser in headless mode.
        proxy: A dictionary containing proxy settings; None disables protection.
        urls: A list of URLs to scrape content from.
    """

    def __init__(
        self,
        urls: List[str],
        *,
        backend: str = "playwright",
        headless: bool = True,
        proxy: Optional[Proxy] = None,
        **kwargs: Any,
    ):
        """Initialize the loader with a list of URL paths.

        Args:
            backend: The web driver backend library; defaults to 'playwright'.
            headless: Whether to run the browser in headless mode.
            proxy: A dictionary containing proxy information; None disables protection.
            urls: A list of URLs to scrape content from.
            kwargs: A dictionary containing additional browser kwargs.

        Raises:
            ImportError: If the required backend package is not installed.
        """
        message = (
            f"{backend} is required for ChromiumLoader. "
            f"Please install it with `pip install {backend}`."
        )

        dynamic_import(backend, message)

        self.backend = backend
        self.browser_config = kwargs
        self.headless = headless
        self.proxy = parse_or_search_proxy(proxy) if proxy else None
        self.urls = urls

    async def ascrape_playwright(self, url: str) -> str:
        """
        Asynchronously scrape the content of a given URL using Playwright's async API.

        Args:
            url (str): The URL to scrape.

        Returns:
            str: The scraped HTML content or an error message if an exception occurs.
        """
        from playwright.async_api import async_playwright

        logger.info("Starting scraping...")
        results = ""
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=self.headless, proxy=self.proxy, **self.browser_config
            )
            try:
                page = await browser.new_page()
                await page.goto(url)
                results = await page.content()  # Simply get the HTML content
                logger.info("Content scraped")
            except Exception as e:
                results = f"Error: {e}"
            await browser.close()
        return results

    def lazy_load(self) -> Iterator[Document]:
        """
        Lazily load text content from the provided URLs.

        This method yields Documents one at a time as they're scraped,
        instead of waiting to scrape all URLs before returning.

        Yields:
            Document: The scraped content encapsulated within a Document object.
        """
        scraping_fn = getattr(self, f"ascrape_{self.backend}")

        for url in self.urls:
            html_content = asyncio.run(scraping_fn(url))
            metadata = {"source": url}
            yield Document(page_content=html_content, metadata=metadata)

    async def alazy_load(self) -> AsyncIterator[Document]:
        """
        Asynchronously load text content from the provided URLs.

        This method leverages asyncio to scrape all provided URLs concurrently,
        which improves performance; because the results are gathered together,
        each Document is yielded once every scraping task has completed.

        Yields:
            Document: A Document object containing the scraped content, along with its
            source URL as metadata.
        """
        scraping_fn = getattr(self, f"ascrape_{self.backend}")

        tasks = [scraping_fn(url) for url in self.urls]
        results = await asyncio.gather(*tasks)
        for url, content in zip(self.urls, results):
            metadata = {"source": url}
            yield Document(page_content=content, metadata=metadata)
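
To complement the file above, here is a small async sketch (again illustrative, not part of the diff) showing how alazy_load scrapes all URLs concurrently and then yields one Document per URL, in input order.

# illustrative sketch: concurrent scraping via the async iterator
import asyncio

from scrapegraphai.docloaders import ChromiumLoader


async def main():
    loader = ChromiumLoader(
        ["https://example.com", "https://example.org"],
        backend="playwright",
        headless=True,
    )
    # all URLs are scraped concurrently (asyncio.gather), then yielded in input order
    async for doc in loader.alazy_load():
        print(doc.metadata["source"], len(doc.page_content))


asyncio.run(main())
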
92 changes: 52 additions & 40 deletions scrapegraphai/nodes/fetch_node.py
@@ -1,21 +1,24 @@
"""
FetchNode Module
"""
import pandas as pd

import json
from typing import List, Optional
from langchain_community.document_loaders import AsyncChromiumLoader
from langchain_core.documents import Document

import pandas as pd
from langchain_community.document_loaders import PyPDFLoader
from .base_node import BaseNode
from langchain_core.documents import Document

from ..docloaders import ChromiumLoader
from ..utils.remover import remover
from .base_node import BaseNode


class FetchNode(BaseNode):
    """
    A node responsible for fetching the HTML content of a specified URL and updating
    the graph's state with this content. It uses the AsyncChromiumLoader to fetch the
    content asynchronously.
    the graph's state with this content. It uses ChromiumLoader to fetch
    the content from a web page asynchronously (with proxy protection).
    This node acts as a starting point in many scraping workflows, preparing the state
    with the necessary HTML content for further processing by subsequent nodes in the graph.
@@ -31,13 +34,21 @@ class FetchNode(BaseNode):
        node_name (str): The unique identifier name for the node, defaulting to "Fetch".
    """

    def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, node_name: str = "Fetch"):
    def __init__(
        self,
        input: str,
        output: List[str],
        node_config: Optional[dict] = None,
        node_name: str = "Fetch",
    ):
        super().__init__(node_name, "node", input, output, 1)

        self.headless = True if node_config is None else node_config.get(
            "headless", True)
        self.verbose = False if node_config is None else node_config.get(
            "verbose", False)
        self.headless = (
            True if node_config is None else node_config.get("headless", True)
        )
        self.verbose = (
            False if node_config is None else node_config.get("verbose", False)
        )

    def execute(self, state):
        """
@@ -64,10 +75,14 @@ def execute(self, state):
        input_data = [state[key] for key in input_keys]

        source = input_data[0]
        if self.input == "json_dir" or self.input == "xml_dir" or self.input == "csv_dir":
            compressed_document = [Document(page_content=source, metadata={
                "source": "local_dir"
            })]
        if (
            self.input == "json_dir"
            or self.input == "xml_dir"
            or self.input == "csv_dir"
        ):
            compressed_document = [
                Document(page_content=source, metadata={"source": "local_dir"})
            ]
            # if it is a local directory

        # handling for pdf
@@ -76,45 +91,42 @@ def execute(self, state):
            compressed_document = loader.load()

        elif self.input == "csv":
            compressed_document = [Document(page_content=str(pd.read_csv(source)), metadata={
                "source": "csv"
            })]
            compressed_document = [
                Document(
                    page_content=str(pd.read_csv(source)), metadata={"source": "csv"}
                )
            ]
        elif self.input == "json":
            f = open(source)
            compressed_document = [Document(page_content=str(json.load(f)), metadata={
                "source": "json"
            })]
            compressed_document = [
                Document(page_content=str(json.load(f)), metadata={"source": "json"})
            ]
        elif self.input == "xml":
            with open(source, 'r', encoding='utf-8') as f:
            with open(source, "r", encoding="utf-8") as f:
                data = f.read()
            compressed_document = [Document(page_content=data, metadata={
                "source": "xml"
            })]
            compressed_document = [
                Document(page_content=data, metadata={"source": "xml"})
            ]
        elif self.input == "pdf_dir":
            pass

        elif not source.startswith("http"):
            compressed_document = [Document(page_content=remover(source), metadata={
                "source": "local_dir"
            })]
            compressed_document = [
                Document(page_content=remover(source), metadata={"source": "local_dir"})
            ]

        else:
            if self.node_config is not None and self.node_config.get("endpoint") is not None:
            loader_kwargs = {}

                loader = AsyncChromiumLoader(
                    [source],
                    proxies={"http": self.node_config["endpoint"]},
                    headless=self.headless,
                )
            else:
                loader = AsyncChromiumLoader(
                    [source],
                    headless=self.headless,
                )
            if self.node_config is not None:
                loader_kwargs = self.node_config.get("loader_kwargs", {})

            loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs)

            document = loader.load()
            compressed_document = [
                Document(page_content=remover(str(document[0].page_content)))]
                Document(page_content=remover(str(document[0].page_content)))
            ]

        state.update({self.output[0]: compressed_document})
        return state
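
A hedged configuration sketch for the updated FetchNode follows. The import path, the input/output keys, the state shape, and the proxy endpoint below are placeholders; the part grounded in the diff is that whatever dictionary sits under node_config["loader_kwargs"] is forwarded to ChromiumLoader.

# illustrative sketch: forwarding browser kwargs to ChromiumLoader via node_config
from scrapegraphai.nodes import FetchNode  # assumed import path

fetch_node = FetchNode(
    input="url",     # placeholder input key
    output=["doc"],  # placeholder output key
    node_config={
        "headless": True,
        "verbose": False,
        # everything under "loader_kwargs" is unpacked into ChromiumLoader(...)
        "loader_kwargs": {"proxy": {"server": "http://localhost:8080"}},
    },
)

state = fetch_node.execute({"url": "https://example.com"})  # assumed state shape
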
