Web

Init file.

AgentQLWebReader #

Bases: BasePydanticReader

Scrape a URL, with or without an AgentQL query, and return the document in JSON format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| api_key | str | The AgentQL API key; get one at https://dev.agentql.com | required |
| params | dict | Additional parameters to pass to the AgentQL API. Visit https://docs.agentql.com/rest-api/api-reference for details. | None |

Source code in llama_index/readers/web/agentql_web/base.py
class AgentQLWebReader(BasePydanticReader):
    """
    Scrape a URL with or without a agentql query and returns document in json format.

    Args:
        api_key (str): The AgentQL API key, get one at https://dev.agentql.com
        params (dict): Additional parameters to pass to the AgentQL API. Visit https://docs.agentql.com/rest-api/api-reference for details.

    """

    api_key: str
    params: Optional[dict]

    def __init__(
        self,
        api_key: str,
        params: Optional[dict] = None,
    ) -> None:
        super().__init__(api_key=api_key, params=params)

    def load_data(
        self, url: str, query: Optional[str] = None, prompt: Optional[str] = None
    ) -> List[Document]:
        """
        Load data from the input directory.

        Args:
            url (str): URL to scrape or crawl.
            query (Optional[str]): AgentQL query used to specify the scraped data.
            prompt (Optional[str]): Natural language description of the data you want to scrape.
            Either query or prompt must be provided.
            params (Optional[dict]): Additional parameters to pass to the AgentQL API. Visit https://docs.agentql.com/rest-api/api-reference for details.

        Returns:
            List[Document]: List of documents.

        """
        payload = {"url": url, "query": query, "prompt": prompt, "params": self.params}

        headers = {
            "X-API-Key": f"{self.api_key}",
            "Content-Type": "application/json",
            "X-TF-Request-Origin": REQUEST_ORIGIN,
        }

        try:
            response = httpx.post(
                QUERY_DATA_ENDPOINT,
                headers=headers,
                json=payload,
                timeout=API_TIMEOUT_SECONDS,
            )
            response.raise_for_status()

        except httpx.HTTPStatusError as e:
            response = e.response
            if response.status_code in [401, 403]:
                raise ValueError(
                    "Please, provide a valid API Key. You can create one at https://dev.agentql.com."
                ) from e
            else:
                try:
                    error_json = response.json()
                    msg = (
                        error_json["error_info"]
                        if "error_info" in error_json
                        else error_json["detail"]
                    )
                except (ValueError, TypeError):
                    msg = f"HTTP {e}."
                raise ValueError(msg) from e
        else:
            json = response.json()

            return [Document(text=str(json["data"]), metadata=json["metadata"])]

load_data #

load_data(url: str, query: Optional[str] = None, prompt: Optional[str] = None) -> List[Document]

Load data from the given URL.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| url | str | URL to scrape or crawl. | required |
| query | Optional[str] | AgentQL query used to specify the scraped data. | None |
| prompt | Optional[str] | Natural language description of the data you want to scrape. Either query or prompt must be provided. | None |
| params | Optional[dict] | Additional parameters to pass to the AgentQL API. Visit https://docs.agentql.com/rest-api/api-reference for details. | required |

Returns:

| Type | Description |
| --- | --- |
| List[Document] | List of documents. |

Source code in llama_index/readers/web/agentql_web/base.py
def load_data(
    self, url: str, query: Optional[str] = None, prompt: Optional[str] = None
) -> List[Document]:
    """
    Load data from the input directory.

    Args:
        url (str): URL to scrape or crawl.
        query (Optional[str]): AgentQL query used to specify the scraped data.
        prompt (Optional[str]): Natural language description of the data you want to scrape.
        Either query or prompt must be provided.
        params (Optional[dict]): Additional parameters to pass to the AgentQL API. Visit https://docs.agentql.com/rest-api/api-reference for details.

    Returns:
        List[Document]: List of documents.

    """
    payload = {"url": url, "query": query, "prompt": prompt, "params": self.params}

    headers = {
        "X-API-Key": f"{self.api_key}",
        "Content-Type": "application/json",
        "X-TF-Request-Origin": REQUEST_ORIGIN,
    }

    try:
        response = httpx.post(
            QUERY_DATA_ENDPOINT,
            headers=headers,
            json=payload,
            timeout=API_TIMEOUT_SECONDS,
        )
        response.raise_for_status()

    except httpx.HTTPStatusError as e:
        response = e.response
        if response.status_code in [401, 403]:
            raise ValueError(
                "Please, provide a valid API Key. You can create one at https://dev.agentql.com."
            ) from e
        else:
            try:
                error_json = response.json()
                msg = (
                    error_json["error_info"]
                    if "error_info" in error_json
                    else error_json["detail"]
                )
            except (ValueError, TypeError):
                msg = f"HTTP {e}."
            raise ValueError(msg) from e
    else:
        json = response.json()

        return [Document(text=str(json["data"]), metadata=json["metadata"])]
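
A minimal usage sketch, assuming the reader is installed via the llama-index-readers-web package and a valid AgentQL API key is on hand (the URL and query below are illustrative):

from llama_index.readers.web import AgentQLWebReader

# Placeholder key; create a real one at https://dev.agentql.com.
reader = AgentQLWebReader(api_key="YOUR_AGENTQL_API_KEY")

# Either `query` or `prompt` must be provided; this AgentQL query is illustrative.
documents = reader.load_data(
    url="https://example.com/products",
    query="{ products[] { name price } }",
)
print(documents[0].text)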

AsyncWebPageReader #

Bases: BaseReader

Asynchronous web page reader.

Reads pages from the web asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| html_to_text | bool | Whether to convert HTML to text. Requires the html2text package. | False |
| limit | int | Maximum number of concurrent requests. | 10 |
| dedupe | bool | Deduplicate URLs if there are exact matches within the given list. | True |
| fail_on_error | bool | If a requested URL does not return status code 200, raise a ValueError. | False |

Source code in llama_index/readers/web/async_web/base.py
class AsyncWebPageReader(BaseReader):
    """
    Asynchronous web page reader.

    Reads pages from the web asynchronously.

    Args:
        html_to_text (bool): Whether to convert HTML to text.
            Requires `html2text` package.
        limit (int): Maximum number of concurrent requests.
        dedupe (bool): to deduplicate urls if there is exact-match within given list
        fail_on_error (bool): if requested url does not return status code 200 the routine will raise an ValueError

    """

    def __init__(
        self,
        html_to_text: bool = False,
        limit: int = 10,
        dedupe: bool = True,
        fail_on_error: bool = False,
        timeout: Optional[int] = 60,
    ) -> None:
        """Initialize with parameters."""
        try:
            import html2text  # noqa: F401
        except ImportError:
            raise ImportError(
                "`html2text` package not found, please run `pip install html2text`"
            )
        try:
            import aiohttp  # noqa: F401
        except ImportError:
            raise ImportError(
                "`aiohttp` package not found, please run `pip install aiohttp`"
            )
        self._limit = limit
        self._html_to_text = html_to_text
        self._dedupe = dedupe
        self._fail_on_error = fail_on_error
        self._timeout = timeout

    async def aload_data(self, urls: List[str]) -> List[Document]:
        """
        Load data from the input urls.

        Args:
            urls (List[str]): List of URLs to scrape.

        Returns:
            List[Document]: List of documents.

        """
        if self._dedupe:
            urls = list(dict.fromkeys(urls))

        import aiohttp

        def chunked_http_client(limit: int):
            semaphore = asyncio.Semaphore(limit)

            async def http_get(url: str, session: aiohttp.ClientSession):
                async with semaphore:
                    async with session.get(url) as response:
                        return response, await response.text()

            return http_get

        async def fetch_urls(urls: List[str]):
            http_client = chunked_http_client(self._limit)

            timeout = (
                aiohttp.ClientTimeout(total=self._timeout) if self._timeout else None
            )

            async with aiohttp.ClientSession(timeout=timeout) as session:
                tasks = [http_client(url, session) for url in urls]
                return await asyncio.gather(*tasks, return_exceptions=True)

        if not isinstance(urls, list):
            raise ValueError("urls must be a list of strings.")

        documents = []
        responses = await fetch_urls(urls)

        for i, response_tuple in enumerate(responses):
            if not isinstance(response_tuple, tuple):
                if self._fail_on_error:
                    raise ValueError(f"Error fetching {urls[i]}")
                continue

            response, raw_page = response_tuple

            if response.status != 200:
                logger.warning(f"Error fetching page from {urls[i]}")
                logger.info(response)

                if self._fail_on_error:
                    raise ValueError(
                        f"Error fetching page from {urls[i]}. server returned status:"
                        f" {response.status} and response {raw_page}"
                    )

                continue

            if self._html_to_text:
                import html2text

                response_text = html2text.html2text(raw_page)
            else:
                response_text = raw_page

            documents.append(
                Document(text=response_text, extra_info={"Source": str(response.url)})
            )

        return documents

    def load_data(self, urls: List[str]) -> List[Document]:
        """
        Load data from the input urls.

        Args:
            urls (List[str]): List of URLs to scrape.

        Returns:
            List[Document]: List of documents.

        """
        return asyncio.run(self.aload_data(urls))

aload_data async #

aload_data(urls: List[str]) -> List[Document]

Load data from the input URLs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| urls | List[str] | List of URLs to scrape. | required |

Returns:

| Type | Description |
| --- | --- |
| List[Document] | List of documents. |

Source code in llama_index/readers/web/async_web/base.py
async def aload_data(self, urls: List[str]) -> List[Document]:
    """
    Load data from the input urls.

    Args:
        urls (List[str]): List of URLs to scrape.

    Returns:
        List[Document]: List of documents.

    """
    if self._dedupe:
        urls = list(dict.fromkeys(urls))

    import aiohttp

    def chunked_http_client(limit: int):
        semaphore = asyncio.Semaphore(limit)

        async def http_get(url: str, session: aiohttp.ClientSession):
            async with semaphore:
                async with session.get(url) as response:
                    return response, await response.text()

        return http_get

    async def fetch_urls(urls: List[str]):
        http_client = chunked_http_client(self._limit)

        timeout = (
            aiohttp.ClientTimeout(total=self._timeout) if self._timeout else None
        )

        async with aiohttp.ClientSession(timeout=timeout) as session:
            tasks = [http_client(url, session) for url in urls]
            return await asyncio.gather(*tasks, return_exceptions=True)

    if not isinstance(urls, list):
        raise ValueError("urls must be a list of strings.")

    documents = []
    responses = await fetch_urls(urls)

    for i, response_tuple in enumerate(responses):
        if not isinstance(response_tuple, tuple):
            if self._fail_on_error:
                raise ValueError(f"Error fetching {urls[i]}")
            continue

        response, raw_page = response_tuple

        if response.status != 200:
            logger.warning(f"Error fetching page from {urls[i]}")
            logger.info(response)

            if self._fail_on_error:
                raise ValueError(
                    f"Error fetching page from {urls[i]}. server returned status:"
                    f" {response.status} and response {raw_page}"
                )

            continue

        if self._html_to_text:
            import html2text

            response_text = html2text.html2text(raw_page)
        else:
            response_text = raw_page

        documents.append(
            Document(text=response_text, extra_info={"Source": str(response.url)})
        )

    return documents

load_data #

load_data(urls: List[str]) -> List[Document]

Load data from the input URLs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| urls | List[str] | List of URLs to scrape. | required |

Returns:

| Type | Description |
| --- | --- |
| List[Document] | List of documents. |

Source code in llama_index/readers/web/async_web/base.py
def load_data(self, urls: List[str]) -> List[Document]:
    """
    Load data from the input urls.

    Args:
        urls (List[str]): List of URLs to scrape.

    Returns:
        List[Document]: List of documents.

    """
    return asyncio.run(self.aload_data(urls))
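
A minimal usage sketch, assuming the html2text and aiohttp packages are installed (the URLs are placeholders):

import asyncio

from llama_index.readers.web import AsyncWebPageReader

reader = AsyncWebPageReader(html_to_text=True, limit=5, fail_on_error=False)

# Synchronous entry point; wraps aload_data in asyncio.run.
docs = reader.load_data(["https://example.com", "https://example.org"])

# Inside an already-running event loop, await the coroutine directly instead,
# since load_data calls asyncio.run and cannot be nested in a running loop.
async def main() -> None:
    async_docs = await reader.aload_data(["https://example.com"])
    for doc in async_docs:
        print(doc.metadata["Source"])

asyncio.run(main())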

BeautifulSoupWebReader #

Bases: BasePydanticReader

BeautifulSoup web page reader.

Reads pages from the web. Requires the bs4 and urllib packages.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| website_extractor | Optional[Dict[str, Callable]] | A mapping of website hostname (e.g. google.com) to a function that specifies how to extract text from the BeautifulSoup object. See DEFAULT_WEBSITE_EXTRACTOR. | None |

Source code in llama_index/readers/web/beautiful_soup_web/base.py
class BeautifulSoupWebReader(BasePydanticReader):
    """
    BeautifulSoup web page reader.

    Reads pages from the web.
    Requires the `bs4` and `urllib` packages.

    Args:
        website_extractor (Optional[Dict[str, Callable]]): A mapping of website
            hostname (e.g. google.com) to a function that specifies how to
            extract text from the BeautifulSoup obj. See DEFAULT_WEBSITE_EXTRACTOR.

    """

    is_remote: bool = True
    _website_extractor: Dict[str, Callable] = PrivateAttr()

    def __init__(self, website_extractor: Optional[Dict[str, Callable]] = None) -> None:
        super().__init__()
        self._website_extractor = website_extractor or DEFAULT_WEBSITE_EXTRACTOR

    @classmethod
    def class_name(cls) -> str:
        """Get the name identifier of the class."""
        return "BeautifulSoupWebReader"

    def load_data(
        self,
        urls: List[str],
        custom_hostname: Optional[str] = None,
        include_url_in_text: Optional[bool] = True,
    ) -> List[Document]:
        """
        Load data from the urls.

        Args:
            urls (List[str]): List of URLs to scrape.
            custom_hostname (Optional[str]): Force a certain hostname in the case
                a website is displayed under custom URLs (e.g. Substack blogs)
            include_url_in_text (Optional[bool]): Include the reference url in the text of the document

        Returns:
            List[Document]: List of documents.

        """
        from urllib.parse import urlparse

        import requests
        from bs4 import BeautifulSoup

        documents = []
        for url in urls:
            try:
                page = requests.get(url)
            except Exception:
                raise ValueError(f"One of the inputs is not a valid url: {url}")

            hostname = custom_hostname or urlparse(url).hostname or ""

            soup = BeautifulSoup(page.content, "html.parser")

            data = ""
            extra_info = {"URL": url}
            if hostname in self._website_extractor:
                data, metadata = self._website_extractor[hostname](
                    soup=soup, url=url, include_url_in_text=include_url_in_text
                )
                extra_info.update(metadata)

            else:
                data = soup.getText()

            documents.append(
                Document(text=data, id_=str(uuid.uuid4()), extra_info=extra_info)
            )

        return documents

class_name classmethod #

class_name() -> str

Get the name identifier of the class.

Source code in llama_index/readers/web/beautiful_soup_web/base.py
@classmethod
def class_name(cls) -> str:
    """Get the name identifier of the class."""
    return "BeautifulSoupWebReader"

load_data #

load_data(urls: List[str], custom_hostname: Optional[str] = None, include_url_in_text: Optional[bool] = True) -> List[Document]

Load data from the URLs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| urls | List[str] | List of URLs to scrape. | required |
| custom_hostname | Optional[str] | Force a certain hostname in the case a website is displayed under custom URLs (e.g. Substack blogs). | None |
| include_url_in_text | Optional[bool] | Include the reference URL in the text of the document. | True |

Returns:

| Type | Description |
| --- | --- |
| List[Document] | List of documents. |

Source code in llama_index/readers/web/beautiful_soup_web/base.py
def load_data(
    self,
    urls: List[str],
    custom_hostname: Optional[str] = None,
    include_url_in_text: Optional[bool] = True,
) -> List[Document]:
    """
    Load data from the urls.

    Args:
        urls (List[str]): List of URLs to scrape.
        custom_hostname (Optional[str]): Force a certain hostname in the case
            a website is displayed under custom URLs (e.g. Substack blogs)
        include_url_in_text (Optional[bool]): Include the reference url in the text of the document

    Returns:
        List[Document]: List of documents.

    """
    from urllib.parse import urlparse

    import requests
    from bs4 import BeautifulSoup

    documents = []
    for url in urls:
        try:
            page = requests.get(url)
        except Exception:
            raise ValueError(f"One of the inputs is not a valid url: {url}")

        hostname = custom_hostname or urlparse(url).hostname or ""

        soup = BeautifulSoup(page.content, "html.parser")

        data = ""
        extra_info = {"URL": url}
        if hostname in self._website_extractor:
            data, metadata = self._website_extractor[hostname](
                soup=soup, url=url, include_url_in_text=include_url_in_text
            )
            extra_info.update(metadata)

        else:
            data = soup.getText()

        documents.append(
            Document(text=data, id_=str(uuid.uuid4()), extra_info=extra_info)
        )

    return documents
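
A minimal sketch of a custom website_extractor, assuming the target site keeps its main content inside an <article> tag (the hostname, URL, and selector are illustrative):

from typing import Any, Dict, Tuple

from bs4 import BeautifulSoup
from llama_index.readers.web import BeautifulSoupWebReader


def extract_article(
    soup: BeautifulSoup, url: str, include_url_in_text: bool = True, **kwargs: Any
) -> Tuple[str, Dict[str, Any]]:
    # Keep only the <article> body, falling back to the full page text.
    node = soup.find("article")
    text = node.get_text() if node else soup.get_text()
    if include_url_in_text:
        text = f"{text}\n\nSource: {url}"
    return text, {"hostname": "blog.example.com"}


reader = BeautifulSoupWebReader(website_extractor={"blog.example.com": extract_article})
documents = reader.load_data(urls=["https://blog.example.com/post/1"])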

BrowserbaseWebReader #

Bases: BaseReader

BrowserbaseWebReader.

Load pre-rendered web pages using a headless browser hosted on Browserbase. Depends on the browserbase package. Get your API key from https://browserbase.com

Source code in llama_index/readers/web/browserbase_web/base.py
class BrowserbaseWebReader(BaseReader):
    """
    BrowserbaseWebReader.

    Load pre-rendered web pages using a headless browser hosted on Browserbase.
    Depends on `browserbase` package.
    Get your API key from https://browserbase.com
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        project_id: Optional[str] = None,
    ) -> None:
        try:
            from browserbase import Browserbase
        except ImportError:
            raise ImportError(
                "`browserbase` package not found, please run `pip install browserbase`"
            )

        self.browserbase = Browserbase(api_key, project_id)

    def lazy_load_data(
        self,
        urls: Sequence[str],
        text_content: bool = False,
        session_id: Optional[str] = None,
        proxy: Optional[bool] = None,
    ) -> Iterator[Document]:
        """Load pages from URLs."""
        pages = self.browserbase.load_urls(urls, text_content, session_id, proxy)

        for i, page in enumerate(pages):
            yield Document(
                text=page,
                metadata={
                    "url": urls[i],
                },
            )

lazy_load_data #

lazy_load_data(urls: Sequence[str], text_content: bool = False, session_id: Optional[str] = None, proxy: Optional[bool] = None) -> Iterator[Document]

Load pages from URLs.

Source code in llama_index/readers/web/browserbase_web/base.py
def lazy_load_data(
    self,
    urls: Sequence[str],
    text_content: bool = False,
    session_id: Optional[str] = None,
    proxy: Optional[bool] = None,
) -> Iterator[Document]:
    """Load pages from URLs."""
    pages = self.browserbase.load_urls(urls, text_content, session_id, proxy)

    for i, page in enumerate(pages):
        yield Document(
            text=page,
            metadata={
                "url": urls[i],
            },
        )
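
A minimal usage sketch, assuming the browserbase package is installed and BROWSERBASE_API_KEY / BROWSERBASE_PROJECT_ID are set in the environment (the URL is a placeholder):

import os

from llama_index.readers.web import BrowserbaseWebReader

reader = BrowserbaseWebReader(
    api_key=os.environ.get("BROWSERBASE_API_KEY"),
    project_id=os.environ.get("BROWSERBASE_PROJECT_ID"),
)

# lazy_load_data yields one Document per URL.
for doc in reader.lazy_load_data(urls=["https://example.com"], text_content=True):
    print(doc.metadata["url"], len(doc.text))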

FireCrawlWebReader #

Bases: BasePydanticReader

Turn a URL into LLM-accessible markdown with Firecrawl.dev.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| api_key | str | The Firecrawl API key. | required |
| api_url | Optional[str] | Optional base URL for a Firecrawl deployment. | None |
| mode | Optional[str] | The mode to run the loader in. Default is "crawl". Options include "scrape" (single URL), "crawl" (all accessible sub-pages), "map" (map all accessible sub-pages), "search" (search for content), and "extract" (extract structured data from URLs using a prompt). | 'crawl' |
| params | Optional[dict] | The parameters to pass to the Firecrawl API. | None |

Examples include crawlerOptions. For more details, visit: https://docs.firecrawl.dev/sdks/python
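
A minimal usage sketch across three of the supported modes, assuming firecrawl-py is installed and the API key is valid (all URLs, queries, and prompts are illustrative):

from llama_index.readers.web import FireCrawlWebReader

# Scrape a single URL to LLM-ready markdown.
scraper = FireCrawlWebReader(api_key="YOUR_FIRECRAWL_API_KEY", mode="scrape")
docs = scraper.load_data(url="https://example.com")

# Search for content; load_data accepts exactly one of url, query, or urls.
searcher = FireCrawlWebReader(api_key="YOUR_FIRECRAWL_API_KEY", mode="search")
docs = searcher.load_data(query="open source web crawlers")

# Extract structured data from a list of URLs; a 'prompt' entry in params is required.
extractor = FireCrawlWebReader(
    api_key="YOUR_FIRECRAWL_API_KEY",
    mode="extract",
    params={"prompt": "Extract the product name and price."},
)
docs = extractor.load_data(urls=["https://example.com/product"])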

Source code in llama_index/readers/web/firecrawl_web/base.py
class FireCrawlWebReader(BasePydanticReader):
    """
    turn a url to llm accessible markdown with `Firecrawl.dev`.

    Args:
        api_key (str): The Firecrawl API key.
        api_url (Optional[str]): Optional base URL for Firecrawl deployment
        mode (Optional[str]):
            The mode to run the loader in. Default is "crawl".
            Options include "scrape" (single url),
            "crawl" (all accessible sub pages),
            "map" (map all accessible sub pages),
            "search" (search for content), and
            "extract" (extract structured data from URLs using a prompt).
        params (Optional[dict]): The parameters to pass to the Firecrawl API.

    Examples include crawlerOptions.
    For more details, visit: https://docs.firecrawl.dev/sdks/python

    """

    firecrawl: Any
    api_key: str
    api_url: Optional[str]
    mode: Optional[str]
    params: Optional[dict]

    _metadata_fn: Optional[Callable[[str], Dict]] = PrivateAttr()

    # --------------------
    # Aux methods (init)
    # --------------------
    def _import_firecrawl(self) -> Any:
        try:
            from firecrawl import Firecrawl  # type: ignore
        except Exception as exc:
            raise ImportError(
                "firecrawl not found, please run `pip install 'firecrawl-py>=4.3.3'`"
            ) from exc
        return Firecrawl

    def _init_client(self, api_key: str, api_url: Optional[str]) -> Any:
        Firecrawl = self._import_firecrawl()
        client_kwargs: Dict[str, Any] = {"api_key": api_key}
        if api_url is not None:
            client_kwargs["api_url"] = api_url
        return Firecrawl(**client_kwargs)

    def _params_copy(self) -> Dict[str, Any]:
        params: Dict[str, Any] = self.params.copy() if self.params else {}
        return params

    # --------------------
    # Aux helpers (common)
    # --------------------
    def _safe_get_attr(self, obj: Any, *names: str) -> Optional[Any]:
        for name in names:
            try:
                val = getattr(obj, name, None)
            except Exception:
                val = None
            if val:
                return val
        return None

    def _to_dict_best_effort(self, obj: Any) -> Dict[str, Any]:
        # pydantic v2
        if hasattr(obj, "model_dump") and callable(obj.model_dump):
            try:
                return obj.model_dump()  # type: ignore[attr-defined]
            except Exception:
                pass
        # pydantic v1
        if hasattr(obj, "dict") and callable(obj.dict):
            try:
                return obj.dict()  # type: ignore[attr-defined]
            except Exception:
                pass
        # dataclass or simple object
        if hasattr(obj, "__dict__"):
            try:
                return {k: v for k, v in vars(obj).items() if not k.startswith("_")}
            except Exception:
                pass
        # reflect over attributes
        result: Dict[str, Any] = {}
        try:
            for attr in dir(obj):
                if attr.startswith("_"):
                    continue
                try:
                    val = getattr(obj, attr)
                except Exception:
                    continue
                if callable(val):
                    continue
                result[attr] = val
        except Exception:
            pass
        return result

    # --------------------
    # Aux handlers (SCRAPE)
    # --------------------
    def _scrape_get_first(self, data_obj: Dict[str, Any], *keys: str) -> Optional[Any]:
        for k in keys:
            if isinstance(data_obj, dict) and k in data_obj and data_obj.get(k):
                return data_obj.get(k)
        return None

    def _scrape_from_dict(
        self, firecrawl_docs: Dict[str, Any]
    ) -> (str, Dict[str, Any]):
        data_obj = firecrawl_docs.get("data", firecrawl_docs)
        text_value = (
            self._scrape_get_first(
                data_obj,
                "markdown",
                "content",
                "html",
                "raw_html",
                "rawHtml",
                "summary",
            )
            or ""
        )

        meta_obj = data_obj.get("metadata", {}) if isinstance(data_obj, dict) else {}
        metadata_value: Dict[str, Any] = {}

        if isinstance(meta_obj, dict):
            metadata_value = meta_obj
        else:
            try:
                metadata_value = self._to_dict_best_effort(meta_obj)
            except Exception:
                metadata_value = {"metadata": str(meta_obj)}

        if isinstance(data_obj, dict):
            for extra_key in (
                "links",
                "actions",
                "screenshot",
                "warning",
                "changeTracking",
            ):
                if extra_key in data_obj and data_obj.get(extra_key) is not None:
                    metadata_value[extra_key] = data_obj.get(extra_key)

        if "success" in firecrawl_docs:
            metadata_value["success"] = firecrawl_docs.get("success")
        if "warning" in firecrawl_docs and firecrawl_docs.get("warning") is not None:
            metadata_value["warning_top"] = firecrawl_docs.get("warning")

        return text_value, metadata_value

    def _scrape_from_obj(self, firecrawl_docs: Any) -> (str, Dict[str, Any]):
        text_value = (
            self._safe_get_attr(
                firecrawl_docs,
                "markdown",
                "content",
                "html",
                "raw_html",
                "summary",
            )
            or ""
        )

        meta_obj = getattr(firecrawl_docs, "metadata", None)
        metadata_value: Dict[str, Any] = {}
        if meta_obj is not None:
            try:
                metadata_value = self._to_dict_best_effort(meta_obj)
            except Exception:
                metadata_value = {"metadata": str(meta_obj)}

        for extra_attr in (
            "links",
            "actions",
            "screenshot",
            "warning",
            "change_tracking",
        ):
            try:
                extra_val = getattr(firecrawl_docs, extra_attr, None)
            except Exception:
                extra_val = None
            if extra_val is not None:
                metadata_value[extra_attr] = extra_val

        return text_value, metadata_value

    def _handle_scrape_response(self, firecrawl_docs: Any) -> (str, Dict[str, Any]):
        if isinstance(firecrawl_docs, dict):
            return self._scrape_from_dict(firecrawl_docs)
        else:
            return self._scrape_from_obj(firecrawl_docs)

    # --------------------
    # Aux handlers (CRAWL)
    # --------------------
    def _normalize_crawl_response(self, firecrawl_docs: Any) -> List[Dict[str, Any]]:
        return firecrawl_docs.get("data", firecrawl_docs)

    # --------------------
    # Aux handlers (MAP)
    # --------------------
    def _handle_map_error_or_links(self, response: Any, url: str) -> List[Document]:
        docs: List[Document] = []
        if (
            isinstance(response, dict)
            and "error" in response
            and not response.get("success", False)
        ):
            error_message = response.get("error", "Unknown error")
            docs.append(
                Document(
                    text=f"Map request failed: {error_message}",
                    metadata={"source": "map", "url": url, "error": error_message},
                )
            )
            return docs

        links = response.links or []
        for link in links:
            link_url = link.url
            title = link.title
            description = link.description
            text_content = title or description or link_url
            docs.append(
                Document(
                    text=text_content,
                    metadata={
                        "source": "map",
                        "url": link_url,
                        "title": title,
                        "description": description,
                    },
                )
            )
        return docs

    # --------------------
    # Aux handlers (SEARCH)
    # --------------------
    def _process_search_dict(
        self, search_response: Dict[str, Any], query: str
    ) -> List[Document]:
        documents: List[Document] = []
        if search_response.get("success", False):
            search_results = search_response.get("data", [])
            for result in search_results:
                text = result.get("markdown", "")
                if not text:
                    text = result.get("description", "")
                metadata = {
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "description": result.get("description", ""),
                    "source": "search",
                    "query": query,
                }
                if "metadata" in result and isinstance(result["metadata"], dict):
                    metadata.update(result["metadata"])
                documents.append(Document(text=text, metadata=metadata))
        else:
            warning = search_response.get("warning", "Unknown error")
            print(f"Search was unsuccessful: {warning}")
            documents.append(
                Document(
                    text=f"Search for '{query}' was unsuccessful: {warning}",
                    metadata={"source": "search", "query": query, "error": warning},
                )
            )
        return documents

    def _process_search_items(
        self, result_list: Any, result_type: str, query: str
    ) -> List[Document]:
        docs: List[Document] = []
        if not result_list:
            return docs
        for item in result_list:
            item_url = getattr(item, "url", "")
            item_title = getattr(item, "title", "")
            item_description = getattr(item, "description", "")
            text_content = item_title or item_description or item_url

            metadata = {
                "title": item_title,
                "url": item_url,
                "description": item_description,
                "source": "search",
                "search_type": result_type,
                "query": query,
            }
            base_keys = set(metadata.keys())
            extra_attrs = self._to_dict_best_effort(item)
            for k, v in extra_attrs.items():
                if k not in base_keys:
                    metadata[k] = v
            docs.append(Document(text=text_content, metadata=metadata))
        return docs

    def _process_search_sdk(self, search_response: Any, query: str) -> List[Document]:
        documents: List[Document] = []
        documents += self._process_search_items(
            getattr(search_response, "web", None), "web", query
        )  # type: ignore[attr-defined]
        documents += self._process_search_items(
            getattr(search_response, "news", None), "news", query
        )  # type: ignore[attr-defined]
        documents += self._process_search_items(
            getattr(search_response, "images", None), "images", query
        )  # type: ignore[attr-defined]
        return documents

    # --------------------
    # Aux handlers (EXTRACT)
    # --------------------
    def _format_extract_text(self, extract_data: Dict[str, Any]) -> str:
        text_parts = []
        for key, value in extract_data.items():
            text_parts.append(f"{key}: {value}")
        return "\n".join(text_parts)

    # --------------------
    # __init__ (unchanged behavior)
    # --------------------
    def __init__(
        self,
        api_key: str,
        api_url: Optional[str] = None,
        mode: Optional[str] = "crawl",
        params: Optional[dict] = None,
    ) -> None:
        """Initialize with parameters."""
        # Ensure firecrawl client is installed and instantiate
        try:
            from firecrawl import Firecrawl  # type: ignore
        except Exception as exc:
            raise ImportError(
                "firecrawl not found, please run `pip install 'firecrawl-py>=4.3.3'`"
            ) from exc

        # Instantiate the new Firecrawl client
        client_kwargs: Dict[str, Any] = {"api_key": api_key}
        if api_url is not None:
            client_kwargs["api_url"] = api_url

        firecrawl = Firecrawl(**client_kwargs)

        params = params or {}
        params["integration"] = "llamaindex"

        super().__init__(
            firecrawl=firecrawl,
            api_key=api_key,
            api_url=api_url,
            mode=mode,
            params=params,
        )

    @classmethod
    def class_name(cls) -> str:
        return "Firecrawl_reader"

    def load_data(
        self,
        url: Optional[str] = None,
        query: Optional[str] = None,
        urls: Optional[List[str]] = None,
    ) -> List[Document]:
        """
        Load data from the input directory.

        Args:
            url (Optional[str]): URL to scrape or crawl.
            query (Optional[str]): Query to search for.
            urls (Optional[List[str]]): List of URLs for extract mode.

        Returns:
            List[Document]: List of documents.

        Raises:
            ValueError: If invalid combination of parameters is provided.

        """
        if sum(x is not None for x in [url, query, urls]) != 1:
            raise ValueError("Exactly one of url, query, or urls must be provided.")

        documents = []

        if self.mode == "scrape":
            # [SCRAPE] params: https://docs.firecrawl.dev/api-reference/endpoint/scrape
            if url is None:
                raise ValueError("URL must be provided for scrape mode.")
            # Map params to new client call signature
            scrape_params = self._params_copy()
            firecrawl_docs = self.firecrawl.scrape(url, **scrape_params)
            # Support both dict and SDK object responses
            text_value = ""
            metadata_value: Dict[str, Any] = {}

            if isinstance(firecrawl_docs, dict):
                # Newer API may return { success, data: {...} }
                data_obj = firecrawl_docs.get("data", firecrawl_docs)

                def _get_first(*keys: str) -> Optional[Any]:
                    for k in keys:
                        if (
                            isinstance(data_obj, dict)
                            and k in data_obj
                            and data_obj.get(k)
                        ):
                            return data_obj.get(k)
                    return None

                text_value = (
                    _get_first(
                        "markdown", "content", "html", "raw_html", "rawHtml", "summary"
                    )
                    or ""
                )

                meta_obj = (
                    data_obj.get("metadata", {}) if isinstance(data_obj, dict) else {}
                )
                if isinstance(meta_obj, dict):
                    metadata_value = meta_obj
                else:
                    # Convert metadata object to dict if needed
                    try:
                        if hasattr(meta_obj, "model_dump") and callable(
                            meta_obj.model_dump
                        ):
                            metadata_value = meta_obj.model_dump()  # type: ignore[attr-defined]
                        elif hasattr(meta_obj, "dict") and callable(meta_obj.dict):
                            metadata_value = meta_obj.dict()  # type: ignore[attr-defined]
                        elif hasattr(meta_obj, "__dict__"):
                            metadata_value = {
                                k: v
                                for k, v in vars(meta_obj).items()
                                if not k.startswith("_")
                            }
                    except Exception:
                        metadata_value = {"metadata": str(meta_obj)}

                # Capture other helpful fields into metadata
                if isinstance(data_obj, dict):
                    for extra_key in (
                        "links",
                        "actions",
                        "screenshot",
                        "warning",
                        "changeTracking",
                    ):
                        if (
                            extra_key in data_obj
                            and data_obj.get(extra_key) is not None
                        ):
                            metadata_value[extra_key] = data_obj.get(extra_key)
                # Bubble up success/warning if at top-level
                if "success" in firecrawl_docs:
                    metadata_value["success"] = firecrawl_docs.get("success")
                if (
                    "warning" in firecrawl_docs
                    and firecrawl_docs.get("warning") is not None
                ):
                    metadata_value["warning_top"] = firecrawl_docs.get("warning")
            else:
                # SDK object with attributes
                def _safe_get(obj: Any, *names: str) -> Optional[Any]:
                    for name in names:
                        try:
                            val = getattr(obj, name, None)
                        except Exception:
                            val = None
                        if val:
                            return val
                    return None

                text_value = (
                    _safe_get(
                        firecrawl_docs,
                        "markdown",
                        "content",
                        "html",
                        "raw_html",
                        "summary",
                    )
                    or ""
                )

                meta_obj = getattr(firecrawl_docs, "metadata", None)
                if meta_obj is not None:
                    try:
                        if hasattr(meta_obj, "model_dump") and callable(
                            meta_obj.model_dump
                        ):
                            metadata_value = meta_obj.model_dump()  # type: ignore[attr-defined]
                        elif hasattr(meta_obj, "dict") and callable(meta_obj.dict):
                            metadata_value = meta_obj.dict()  # type: ignore[attr-defined]
                        elif hasattr(meta_obj, "__dict__"):
                            metadata_value = {
                                k: v
                                for k, v in vars(meta_obj).items()
                                if not k.startswith("_")
                            }
                        else:
                            metadata_value = {"metadata": str(meta_obj)}
                    except Exception:
                        metadata_value = {"metadata": str(meta_obj)}

                # Attach extra top-level attributes if present on SDK object
                for extra_attr in (
                    "links",
                    "actions",
                    "screenshot",
                    "warning",
                    "change_tracking",
                ):
                    try:
                        extra_val = getattr(firecrawl_docs, extra_attr, None)
                    except Exception:
                        extra_val = None
                    if extra_val is not None:
                        metadata_value[extra_attr] = extra_val

            documents.append(Document(text=text_value or "", metadata=metadata_value))
        elif self.mode == "crawl":
            # [CRAWL] params: https://docs.firecrawl.dev/api-reference/endpoint/crawl-post
            if url is None:
                raise ValueError("URL must be provided for crawl mode.")
            crawl_params = self._params_copy()
            # Remove deprecated/unsupported parameters
            if "maxDepth" in crawl_params:
                crawl_params.pop("maxDepth", None)
            firecrawl_docs = self.firecrawl.crawl(url, **crawl_params)
            # Normalize Crawl response across SDK versions
            items: List[Any] = []
            if isinstance(firecrawl_docs, dict):
                data = firecrawl_docs.get("data", firecrawl_docs)
                if isinstance(data, list):
                    items = data
            else:
                # Try common list-bearing attributes first
                for attr_name in ("data", "results", "documents", "items", "pages"):
                    try:
                        candidate = getattr(firecrawl_docs, attr_name, None)
                    except Exception:
                        candidate = None
                    if isinstance(candidate, list) and candidate:
                        items = candidate
                        break
                # Fallback to model dump reflection
                if not items:
                    try:
                        if hasattr(firecrawl_docs, "model_dump") and callable(
                            firecrawl_docs.model_dump
                        ):
                            dump_obj = firecrawl_docs.model_dump()  # type: ignore[attr-defined]
                        elif hasattr(firecrawl_docs, "dict") and callable(
                            firecrawl_docs.dict
                        ):
                            dump_obj = firecrawl_docs.dict()  # type: ignore[attr-defined]
                        else:
                            dump_obj = {}
                    except Exception:
                        dump_obj = {}
                    if isinstance(dump_obj, dict):
                        data = (
                            dump_obj.get("data")
                            or dump_obj.get("results")
                            or dump_obj.get("documents")
                        )
                        if isinstance(data, list):
                            items = data

            for doc in items:
                if isinstance(doc, dict):
                    text_val = (
                        doc.get("markdown")
                        or doc.get("content")
                        or doc.get("text")
                        or ""
                    )
                    metadata_val = doc.get("metadata", {})
                else:
                    text_val = (
                        self._safe_get_attr(
                            doc,
                            "markdown",
                            "content",
                            "text",
                            "html",
                            "raw_html",
                            "rawHtml",
                            "summary",
                        )
                        or ""
                    )
                    meta_obj = getattr(doc, "metadata", None)
                    if isinstance(meta_obj, dict):
                        metadata_val = meta_obj
                    elif meta_obj is not None:
                        try:
                            metadata_val = self._to_dict_best_effort(meta_obj)
                        except Exception:
                            metadata_val = {"metadata": str(meta_obj)}
                    else:
                        metadata_val = {}
                documents.append(Document(text=text_val, metadata=metadata_val))
        elif self.mode == "map":
            # [MAP] params: https://docs.firecrawl.dev/api-reference/endpoint/map
            # Expected response: { "success": true, "links": [{"url":..., "title":..., "description":...}, ...] }
            if url is None:
                raise ValueError("URL must be provided for map mode.")

            map_params = self._params_copy()
            # Pass through optional parameters like sitemap, includeSubdomains, ignoreQueryParameters, limit, timeout, search
            response = self.firecrawl.map(url, **map_params)  # type: ignore[attr-defined]

            # Handle error response format: { "error": "..." }
            if (
                isinstance(response, dict)
                and "error" in response
                and not response.get("success", False)
            ):
                error_message = response.get("error", "Unknown error")
                documents.append(
                    Document(
                        text=f"Map request failed: {error_message}",
                        metadata={"source": "map", "url": url, "error": error_message},
                    )
                )
                return documents

            # Extract links from success response
            links = response.links or []

            for link in links:
                link_url = link.url
                title = link.title
                description = link.description
                text_content = title or description or link_url
                documents.append(
                    Document(
                        text=text_content,
                        metadata={
                            "source": "map",
                            "url": link_url,
                            "title": title,
                            "description": description,
                        },
                    )
                )
        elif self.mode == "search":
            # [SEARCH] params: https://docs.firecrawl.dev/api-reference/endpoint/search
            if query is None:
                raise ValueError("Query must be provided for search mode.")

            # Remove query from params if it exists to avoid duplicate
            search_params = self._params_copy()
            if "query" in search_params:
                del search_params["query"]

            # Get search results
            search_response = self.firecrawl.search(query, **search_params)

            # Handle the search response format
            if isinstance(search_response, dict):
                # Check for success
                if search_response.get("success", False):
                    # Get the data array
                    search_results = search_response.get("data", [])

                    # Process each search result
                    for result in search_results:
                        # Extract text content (prefer markdown if available)
                        text = result.get("markdown", "")
                        if not text:
                            # Fall back to description if markdown is not available
                            text = result.get("description", "")

                        # Extract metadata
                        metadata = {
                            "title": result.get("title", ""),
                            "url": result.get("url", ""),
                            "description": result.get("description", ""),
                            "source": "search",
                            "query": query,
                        }

                        # Add additional metadata if available
                        if "metadata" in result and isinstance(
                            result["metadata"], dict
                        ):
                            metadata.update(result["metadata"])

                        # Create document
                        documents.append(
                            Document(
                                text=text,
                                metadata=metadata,
                            )
                        )
                else:
                    # Handle unsuccessful response
                    warning = search_response.get("warning", "Unknown error")
                    print(f"Search was unsuccessful: {warning}")
                    documents.append(
                        Document(
                            text=f"Search for '{query}' was unsuccessful: {warning}",
                            metadata={
                                "source": "search",
                                "query": query,
                                "error": warning,
                            },
                        )
                    )
            elif (
                hasattr(search_response, "web")
                or hasattr(search_response, "news")
                or hasattr(search_response, "images")
            ):
                # New SDK object response like: web=[SearchResultWeb(...)] news=None images=None
                def _process_results(result_list, result_type: str) -> None:
                    if not result_list:
                        return
                    for item in result_list:
                        # Try to access attributes with safe fallbacks
                        item_url = getattr(item, "url", "")
                        item_title = getattr(item, "title", "")
                        item_description = getattr(item, "description", "")
                        text_content = item_title or item_description or item_url

                        metadata = {
                            "title": item_title,
                            "url": item_url,
                            "description": item_description,
                            "source": "search",
                            "search_type": result_type,
                            "query": query,
                        }

                        # Collect all other attributes dynamically without whitelisting
                        base_keys = set(metadata.keys())

                        def _item_to_dict(obj: Any) -> Dict[str, Any]:
                            # pydantic v2
                            if hasattr(obj, "model_dump") and callable(obj.model_dump):
                                try:
                                    return obj.model_dump()  # type: ignore[attr-defined]
                                except Exception:
                                    pass
                            # pydantic v1
                            if hasattr(obj, "dict") and callable(obj.dict):
                                try:
                                    return obj.dict()  # type: ignore[attr-defined]
                                except Exception:
                                    pass
                            # dataclass or simple object
                            if hasattr(obj, "__dict__"):
                                try:
                                    return {
                                        k: v
                                        for k, v in vars(obj).items()
                                        if not k.startswith("_")
                                    }
                                except Exception:
                                    pass
                            # Fallback: reflect over attributes
                            result: Dict[str, Any] = {}
                            try:
                                for attr in dir(obj):
                                    if attr.startswith("_"):
                                        continue
                                    try:
                                        val = getattr(obj, attr)
                                    except Exception:
                                        continue
                                    if callable(val):
                                        continue
                                    result[attr] = val
                            except Exception:
                                pass
                            return result

                        extra_attrs = _item_to_dict(item)
                        for k, v in extra_attrs.items():
                            if k not in base_keys:
                                metadata[k] = v

                        documents.append(
                            Document(
                                text=text_content,
                                metadata=metadata,
                            )
                        )

                _process_results(getattr(search_response, "web", None), "web")  # type: ignore[attr-defined]
                _process_results(getattr(search_response, "news", None), "news")  # type: ignore[attr-defined]
                _process_results(getattr(search_response, "images", None), "images")  # type: ignore[attr-defined]
            else:
                # Handle unexpected response format
                print(f"Unexpected search response format: {type(search_response)}")
                documents.append(
                    Document(
                        text=str(search_response),
                        metadata={"source": "search", "query": query},
                    )
                )
        elif self.mode == "extract":
            # [EXTRACT] params: https://docs.firecrawl.dev/api-reference/endpoint/extract
            if urls is None:
                # For backward compatibility, convert single URL to list if provided
                if url is not None:
                    urls = [url]
                else:
                    raise ValueError("URLs must be provided for extract mode.")

            # Ensure we have a prompt in params
            extract_params = self._params_copy()
            if "prompt" not in extract_params:
                raise ValueError("A 'prompt' parameter is required for extract mode.")

            # Prepare the payload according to the new API structure
            payload = {"prompt": extract_params.pop("prompt")}
            payload["integration"] = "llamaindex"

            # Call the extract method with the urls and params
            extract_response = self.firecrawl.extract(urls=urls, **payload)

            # Handle the extract response format
            if isinstance(extract_response, dict):
                # Check for success
                if extract_response.get("success", False):
                    # Get the data from the response
                    extract_data = extract_response.get("data", {})

                    # Get the sources if available
                    sources = extract_response.get("sources", {})

                    # Convert the extracted data to text
                    if extract_data:
                        # Convert the data to a formatted string
                        text_parts = []
                        for key, value in extract_data.items():
                            text_parts.append(f"{key}: {value}")

                        text = "\n".join(text_parts)

                        # Create metadata
                        metadata = {
                            "urls": urls,
                            "source": "extract",
                            "status": extract_response.get("status"),
                            "expires_at": extract_response.get("expiresAt"),
                        }

                        # Add sources to metadata if available
                        if sources:
                            metadata["sources"] = sources

                        # Create document
                        documents.append(
                            Document(
                                text=text,
                                metadata=metadata,
                            )
                        )
                    else:
                        # Handle empty data in successful response
                        print("Extract response successful but no data returned")
                        documents.append(
                            Document(
                                text="Extraction was successful but no data was returned",
                                metadata={"urls": urls, "source": "extract"},
                            )
                        )
                else:
                    # Handle unsuccessful response
                    warning = extract_response.get("warning", "Unknown error")
                    print(f"Extraction was unsuccessful: {warning}")
                    documents.append(
                        Document(
                            text=f"Extraction was unsuccessful: {warning}",
                            metadata={
                                "urls": urls,
                                "source": "extract",
                                "error": warning,
                            },
                        )
                    )
            else:
                # Handle unexpected response format
                print(f"Unexpected extract response format: {type(extract_response)}")
                documents.append(
                    Document(
                        text=str(extract_response),
                        metadata={"urls": urls, "source": "extract"},
                    )
                )
        else:
            raise ValueError(
                "Invalid mode. Please choose 'scrape', 'crawl', 'search', or 'extract'."
            )

        return documents
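The branches above normalize search and extract responses into Document objects regardless of whether the SDK returns plain dicts or typed objects. A minimal usage sketch for search mode follows; the class name FireCrawlWebReader, its import path, and the api_key/mode constructor arguments are assumptions here and should be checked against the reader's constructor documentation.

# Hedged usage sketch for search mode; names and arguments are assumptions.
from llama_index.readers.web import FireCrawlWebReader

reader = FireCrawlWebReader(
    api_key="fc-...",  # your Firecrawl API key
    mode="search",     # one of: scrape, crawl, map, search, extract
)

# In search mode, pass a query (exactly one of url/query/urls may be set).
documents = reader.load_data(query="llamaindex web readers")

for doc in documents:
    # metadata carries title, url, description, source, and query fields
    print(doc.metadata.get("url"), "-", doc.metadata.get("title"))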

load_data #

load_data(url: Optional[str] = None, query: Optional[str] = None, urls: Optional[List[str]] = None) -> List[Document]

Load data according to the reader's mode (scrape, crawl, map, search, or extract).

Parameters:

Name Type Description Default
url Optional[str]

URL to scrape or crawl.

None
query Optional[str]

Query to search for.

None
urls Optional[List[str]]

List of URLs for extract mode.

None

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Raises:

Type Description
ValueError

If an invalid combination of parameters is provided.

Source code in llama_index/readers/web/firecrawl_web/base.py
def load_data(
    self,
    url: Optional[str] = None,
    query: Optional[str] = None,
    urls: Optional[List[str]] = None,
) -> List[Document]:
    """
    Load data according to the reader's mode (scrape, crawl, map, search, or extract).

    Args:
        url (Optional[str]): URL to scrape or crawl.
        query (Optional[str]): Query to search for.
        urls (Optional[List[str]]): List of URLs for extract mode.

    Returns:
        List[Document]: List of documents.

    Raises:
        ValueError: If invalid combination of parameters is provided.

    """
    if sum(x is not None for x in [url, query, urls]) != 1:
        raise ValueError("Exactly one of url, query, or urls must be provided.")

    documents = []

    if self.mode == "scrape":
        # [SCRAPE] params: https://docs.firecrawl.dev/api-reference/endpoint/scrape
        if url is None:
            raise ValueError("URL must be provided for scrape mode.")
        # Map params to new client call signature
        scrape_params = self._params_copy()
        firecrawl_docs = self.firecrawl.scrape(url, **scrape_params)
        # Support both dict and SDK object responses
        text_value = ""
        metadata_value: Dict[str, Any] = {}

        if isinstance(firecrawl_docs, dict):
            # Newer API may return { success, data: {...} }
            data_obj = firecrawl_docs.get("data", firecrawl_docs)

            def _get_first(*keys: str) -> Optional[Any]:
                for k in keys:
                    if (
                        isinstance(data_obj, dict)
                        and k in data_obj
                        and data_obj.get(k)
                    ):
                        return data_obj.get(k)
                return None

            text_value = (
                _get_first(
                    "markdown", "content", "html", "raw_html", "rawHtml", "summary"
                )
                or ""
            )

            meta_obj = (
                data_obj.get("metadata", {}) if isinstance(data_obj, dict) else {}
            )
            if isinstance(meta_obj, dict):
                metadata_value = meta_obj
            else:
                # Convert metadata object to dict if needed
                try:
                    if hasattr(meta_obj, "model_dump") and callable(
                        meta_obj.model_dump
                    ):
                        metadata_value = meta_obj.model_dump()  # type: ignore[attr-defined]
                    elif hasattr(meta_obj, "dict") and callable(meta_obj.dict):
                        metadata_value = meta_obj.dict()  # type: ignore[attr-defined]
                    elif hasattr(meta_obj, "__dict__"):
                        metadata_value = {
                            k: v
                            for k, v in vars(meta_obj).items()
                            if not k.startswith("_")
                        }
                except Exception:
                    metadata_value = {"metadata": str(meta_obj)}

            # Capture other helpful fields into metadata
            if isinstance(data_obj, dict):
                for extra_key in (
                    "links",
                    "actions",
                    "screenshot",
                    "warning",
                    "changeTracking",
                ):
                    if (
                        extra_key in data_obj
                        and data_obj.get(extra_key) is not None
                    ):
                        metadata_value[extra_key] = data_obj.get(extra_key)
            # Bubble up success/warning if at top-level
            if "success" in firecrawl_docs:
                metadata_value["success"] = firecrawl_docs.get("success")
            if (
                "warning" in firecrawl_docs
                and firecrawl_docs.get("warning") is not None
            ):
                metadata_value["warning_top"] = firecrawl_docs.get("warning")
        else:
            # SDK object with attributes
            def _safe_get(obj: Any, *names: str) -> Optional[Any]:
                for name in names:
                    try:
                        val = getattr(obj, name, None)
                    except Exception:
                        val = None
                    if val:
                        return val
                return None

            text_value = (
                _safe_get(
                    firecrawl_docs,
                    "markdown",
                    "content",
                    "html",
                    "raw_html",
                    "summary",
                )
                or ""
            )

            meta_obj = getattr(firecrawl_docs, "metadata", None)
            if meta_obj is not None:
                try:
                    if hasattr(meta_obj, "model_dump") and callable(
                        meta_obj.model_dump
                    ):
                        metadata_value = meta_obj.model_dump()  # type: ignore[attr-defined]
                    elif hasattr(meta_obj, "dict") and callable(meta_obj.dict):
                        metadata_value = meta_obj.dict()  # type: ignore[attr-defined]
                    elif hasattr(meta_obj, "__dict__"):
                        metadata_value = {
                            k: v
                            for k, v in vars(meta_obj).items()
                            if not k.startswith("_")
                        }
                    else:
                        metadata_value = {"metadata": str(meta_obj)}
                except Exception:
                    metadata_value = {"metadata": str(meta_obj)}

            # Attach extra top-level attributes if present on SDK object
            for extra_attr in (
                "links",
                "actions",
                "screenshot",
                "warning",
                "change_tracking",
            ):
                try:
                    extra_val = getattr(firecrawl_docs, extra_attr, None)
                except Exception:
                    extra_val = None
                if extra_val is not None:
                    metadata_value[extra_attr] = extra_val

        documents.append(Document(text=text_value or "", metadata=metadata_value))
    elif self.mode == "crawl":
        # [CRAWL] params: https://docs.firecrawl.dev/api-reference/endpoint/crawl-post
        if url is None:
            raise ValueError("URL must be provided for crawl mode.")
        crawl_params = self._params_copy()
        # Remove deprecated/unsupported parameters
        if "maxDepth" in crawl_params:
            crawl_params.pop("maxDepth", None)
        firecrawl_docs = self.firecrawl.crawl(url, **crawl_params)
        # Normalize Crawl response across SDK versions
        items: List[Any] = []
        if isinstance(firecrawl_docs, dict):
            data = firecrawl_docs.get("data", firecrawl_docs)
            if isinstance(data, list):
                items = data
        else:
            # Try common list-bearing attributes first
            for attr_name in ("data", "results", "documents", "items", "pages"):
                try:
                    candidate = getattr(firecrawl_docs, attr_name, None)
                except Exception:
                    candidate = None
                if isinstance(candidate, list) and candidate:
                    items = candidate
                    break
            # Fallback to model dump reflection
            if not items:
                try:
                    if hasattr(firecrawl_docs, "model_dump") and callable(
                        firecrawl_docs.model_dump
                    ):
                        dump_obj = firecrawl_docs.model_dump()  # type: ignore[attr-defined]
                    elif hasattr(firecrawl_docs, "dict") and callable(
                        firecrawl_docs.dict
                    ):
                        dump_obj = firecrawl_docs.dict()  # type: ignore[attr-defined]
                    else:
                        dump_obj = {}
                except Exception:
                    dump_obj = {}
                if isinstance(dump_obj, dict):
                    data = (
                        dump_obj.get("data")
                        or dump_obj.get("results")
                        or dump_obj.get("documents")
                    )
                    if isinstance(data, list):
                        items = data

        for doc in items:
            if isinstance(doc, dict):
                text_val = (
                    doc.get("markdown")
                    or doc.get("content")
                    or doc.get("text")
                    or ""
                )
                metadata_val = doc.get("metadata", {})
            else:
                text_val = (
                    self._safe_get_attr(
                        doc,
                        "markdown",
                        "content",
                        "text",
                        "html",
                        "raw_html",
                        "rawHtml",
                        "summary",
                    )
                    or ""
                )
                meta_obj = getattr(doc, "metadata", None)
                if isinstance(meta_obj, dict):
                    metadata_val = meta_obj
                elif meta_obj is not None:
                    try:
                        metadata_val = self._to_dict_best_effort(meta_obj)
                    except Exception:
                        metadata_val = {"metadata": str(meta_obj)}
                else:
                    metadata_val = {}
            documents.append(Document(text=text_val, metadata=metadata_val))
    elif self.mode == "map":
        # [MAP] params: https://docs.firecrawl.dev/api-reference/endpoint/map
        # Expected response: { "success": true, "links": [{"url":..., "title":..., "description":...}, ...] }
        if url is None:
            raise ValueError("URL must be provided for map mode.")

        map_params = self._params_copy()
        # Pass through optional parameters like sitemap, includeSubdomains, ignoreQueryParameters, limit, timeout, search
        response = self.firecrawl.map(url, **map_params)  # type: ignore[attr-defined]

        # Handle error response format: { "error": "..." }
        if (
            isinstance(response, dict)
            and "error" in response
            and not response.get("success", False)
        ):
            error_message = response.get("error", "Unknown error")
            documents.append(
                Document(
                    text=f"Map request failed: {error_message}",
                    metadata={"source": "map", "url": url, "error": error_message},
                )
            )
            return documents

        # Extract links from success response
        links = response.links or []

        for link in links:
            link_url = link.url
            title = link.title
            description = link.description
            text_content = title or description or link_url
            documents.append(
                Document(
                    text=text_content,
                    metadata={
                        "source": "map",
                        "url": link_url,
                        "title": title,
                        "description": description,
                    },
                )
            )
    elif self.mode == "search":
        # [SEARCH] params: https://docs.firecrawl.dev/api-reference/endpoint/search
        if query is None:
            raise ValueError("Query must be provided for search mode.")

        # Remove query from params if it exists to avoid duplicate
        search_params = self._params_copy()
        if "query" in search_params:
            del search_params["query"]

        # Get search results
        search_response = self.firecrawl.search(query, **search_params)

        # Handle the search response format
        if isinstance(search_response, dict):
            # Check for success
            if search_response.get("success", False):
                # Get the data array
                search_results = search_response.get("data", [])

                # Process each search result
                for result in search_results:
                    # Extract text content (prefer markdown if available)
                    text = result.get("markdown", "")
                    if not text:
                        # Fall back to description if markdown is not available
                        text = result.get("description", "")

                    # Extract metadata
                    metadata = {
                        "title": result.get("title", ""),
                        "url": result.get("url", ""),
                        "description": result.get("description", ""),
                        "source": "search",
                        "query": query,
                    }

                    # Add additional metadata if available
                    if "metadata" in result and isinstance(
                        result["metadata"], dict
                    ):
                        metadata.update(result["metadata"])

                    # Create document
                    documents.append(
                        Document(
                            text=text,
                            metadata=metadata,
                        )
                    )
            else:
                # Handle unsuccessful response
                warning = search_response.get("warning", "Unknown error")
                print(f"Search was unsuccessful: {warning}")
                documents.append(
                    Document(
                        text=f"Search for '{query}' was unsuccessful: {warning}",
                        metadata={
                            "source": "search",
                            "query": query,
                            "error": warning,
                        },
                    )
                )
        elif (
            hasattr(search_response, "web")
            or hasattr(search_response, "news")
            or hasattr(search_response, "images")
        ):
            # New SDK object response like: web=[SearchResultWeb(...)] news=None images=None
            def _process_results(result_list, result_type: str) -> None:
                if not result_list:
                    return
                for item in result_list:
                    # Try to access attributes with safe fallbacks
                    item_url = getattr(item, "url", "")
                    item_title = getattr(item, "title", "")
                    item_description = getattr(item, "description", "")
                    text_content = item_title or item_description or item_url

                    metadata = {
                        "title": item_title,
                        "url": item_url,
                        "description": item_description,
                        "source": "search",
                        "search_type": result_type,
                        "query": query,
                    }

                    # Collect all other attributes dynamically without whitelisting
                    base_keys = set(metadata.keys())

                    def _item_to_dict(obj: Any) -> Dict[str, Any]:
                        # pydantic v2
                        if hasattr(obj, "model_dump") and callable(obj.model_dump):
                            try:
                                return obj.model_dump()  # type: ignore[attr-defined]
                            except Exception:
                                pass
                        # pydantic v1
                        if hasattr(obj, "dict") and callable(obj.dict):
                            try:
                                return obj.dict()  # type: ignore[attr-defined]
                            except Exception:
                                pass
                        # dataclass or simple object
                        if hasattr(obj, "__dict__"):
                            try:
                                return {
                                    k: v
                                    for k, v in vars(obj).items()
                                    if not k.startswith("_")
                                }
                            except Exception:
                                pass
                        # Fallback: reflect over attributes
                        result: Dict[str, Any] = {}
                        try:
                            for attr in dir(obj):
                                if attr.startswith("_"):
                                    continue
                                try:
                                    val = getattr(obj, attr)
                                except Exception:
                                    continue
                                if callable(val):
                                    continue
                                result[attr] = val
                        except Exception:
                            pass
                        return result

                    extra_attrs = _item_to_dict(item)
                    for k, v in extra_attrs.items():
                        if k not in base_keys:
                            metadata[k] = v

                    documents.append(
                        Document(
                            text=text_content,
                            metadata=metadata,
                        )
                    )

            _process_results(getattr(search_response, "web", None), "web")  # type: ignore[attr-defined]
            _process_results(getattr(search_response, "news", None), "news")  # type: ignore[attr-defined]
            _process_results(getattr(search_response, "images", None), "images")  # type: ignore[attr-defined]
        else:
            # Handle unexpected response format
            print(f"Unexpected search response format: {type(search_response)}")
            documents.append(
                Document(
                    text=str(search_response),
                    metadata={"source": "search", "query": query},
                )
            )
    elif self.mode == "extract":
        # [EXTRACT] params: https://docs.firecrawl.dev/api-reference/endpoint/extract
        if urls is None:
            # For backward compatibility, convert single URL to list if provided
            if url is not None:
                urls = [url]
            else:
                raise ValueError("URLs must be provided for extract mode.")

        # Ensure we have a prompt in params
        extract_params = self._params_copy()
        if "prompt" not in extract_params:
            raise ValueError("A 'prompt' parameter is required for extract mode.")

        # Prepare the payload according to the new API structure
        payload = {"prompt": extract_params.pop("prompt")}
        payload["integration"] = "llamaindex"

        # Call the extract method with the urls and params
        extract_response = self.firecrawl.extract(urls=urls, **payload)

        # Handle the extract response format
        if isinstance(extract_response, dict):
            # Check for success
            if extract_response.get("success", False):
                # Get the data from the response
                extract_data = extract_response.get("data", {})

                # Get the sources if available
                sources = extract_response.get("sources", {})

                # Convert the extracted data to text
                if extract_data:
                    # Convert the data to a formatted string
                    text_parts = []
                    for key, value in extract_data.items():
                        text_parts.append(f"{key}: {value}")

                    text = "\n".join(text_parts)

                    # Create metadata
                    metadata = {
                        "urls": urls,
                        "source": "extract",
                        "status": extract_response.get("status"),
                        "expires_at": extract_response.get("expiresAt"),
                    }

                    # Add sources to metadata if available
                    if sources:
                        metadata["sources"] = sources

                    # Create document
                    documents.append(
                        Document(
                            text=text,
                            metadata=metadata,
                        )
                    )
                else:
                    # Handle empty data in successful response
                    print("Extract response successful but no data returned")
                    documents.append(
                        Document(
                            text="Extraction was successful but no data was returned",
                            metadata={"urls": urls, "source": "extract"},
                        )
                    )
            else:
                # Handle unsuccessful response
                warning = extract_response.get("warning", "Unknown error")
                print(f"Extraction was unsuccessful: {warning}")
                documents.append(
                    Document(
                        text=f"Extraction was unsuccessful: {warning}",
                        metadata={
                            "urls": urls,
                            "source": "extract",
                            "error": warning,
                        },
                    )
                )
        else:
            # Handle unexpected response format
            print(f"Unexpected extract response format: {type(extract_response)}")
            documents.append(
                Document(
                    text=str(extract_response),
                    metadata={"urls": urls, "source": "extract"},
                )
            )
    else:
        raise ValueError(
            "Invalid mode. Please choose 'scrape', 'crawl', 'search', or 'extract'."
        )

    return documents
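For extract mode, the code above requires a list of URLs plus a "prompt" entry in params. A hedged sketch follows, with the same assumptions about the class name and constructor as before.

# Hedged extract-mode sketch; constructor arguments are assumptions.
from llama_index.readers.web import FireCrawlWebReader

reader = FireCrawlWebReader(
    api_key="fc-...",
    mode="extract",
    # extract mode raises ValueError unless params contains a "prompt"
    params={"prompt": "Extract the product name and price from each page."},
)

documents = reader.load_data(urls=["https://example.com/pricing"])
for doc in documents:
    print(doc.text)                 # "key: value" lines built from the extracted data
    print(doc.metadata["source"])   # "extract"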

HyperbrowserWebReader #

Bases: BaseReader

Hyperbrowser Web Reader.

Scrape or crawl web pages, with optional parameters for configuring content extraction. Requires the hyperbrowser package. Get your API key from https://app.hyperbrowser.ai/.

Parameters:

Name Type Description Default
api_key Optional[str]

The Hyperbrowser API key; can be set as the environment variable HYPERBROWSER_API_KEY or passed directly.

None
Source code in llama_index/readers/web/hyperbrowser_web/base.py
class HyperbrowserWebReader(BaseReader):
    """
    Hyperbrowser Web Reader.

    Scrape or crawl web pages with optional parameters for configuring content extraction.
    Requires the `hyperbrowser` package.
    Get your API Key from https://app.hyperbrowser.ai/

    Args:
        api_key: The Hyperbrowser API key, can be set as an environment variable `HYPERBROWSER_API_KEY` or passed directly

    """

    def __init__(self, api_key: Optional[str] = None):
        api_key = api_key or os.getenv("HYPERBROWSER_API_KEY")
        if not api_key:
            raise ValueError(
                "`api_key` is required, please set the `HYPERBROWSER_API_KEY` environment variable or pass it directly"
            )

        try:
            from hyperbrowser import Hyperbrowser, AsyncHyperbrowser
        except ImportError:
            raise ImportError(
                "`hyperbrowser` package not found, please run `pip install hyperbrowser`"
            )

        self.hyperbrowser = Hyperbrowser(api_key=api_key)
        self.async_hyperbrowser = AsyncHyperbrowser(api_key=api_key)

    def _prepare_params(self, params: Dict) -> Dict:
        """Prepare session and scrape options parameters."""
        try:
            from hyperbrowser.models.session import CreateSessionParams
            from hyperbrowser.models.scrape import ScrapeOptions
        except ImportError:
            raise ImportError(
                "`hyperbrowser` package not found, please run `pip install hyperbrowser`"
            )

        if "scrape_options" in params:
            if "formats" in params["scrape_options"]:
                formats = params["scrape_options"]["formats"]
                if not all(fmt in ["markdown", "html"] for fmt in formats):
                    raise ValueError("formats can only contain 'markdown' or 'html'")

        if "session_options" in params:
            params["session_options"] = CreateSessionParams(**params["session_options"])
        if "scrape_options" in params:
            params["scrape_options"] = ScrapeOptions(**params["scrape_options"])
        return params

    def _create_document(self, content: str, metadata: dict) -> Document:
        """Create a Document with text and metadata."""
        return Document(text=content, metadata=metadata)

    def _extract_content_metadata(self, data: Union[Any, None]):
        """Extract content and metadata from response data."""
        content = ""
        metadata = {}
        if data:
            content = data.markdown or data.html or ""
            if data.metadata:
                metadata = data.metadata
        return content, metadata

    def lazy_load_data(
        self,
        urls: List[str],
        operation: Literal["scrape", "crawl"] = "scrape",
        params: Optional[Dict] = {},
    ) -> Iterable[Document]:
        """
        Lazy load documents.

        Args:
            urls: List of URLs to scrape or crawl
            operation: Operation to perform. Can be "scrape" or "crawl"
            params: Optional params for scrape or crawl. For more information on the supported params, visit https://docs.hyperbrowser.ai/reference/sdks/python/scrape#start-scrape-job-and-wait or https://docs.hyperbrowser.ai/reference/sdks/python/crawl#start-crawl-job-and-wait

        """
        try:
            from hyperbrowser.models.scrape import StartScrapeJobParams
            from hyperbrowser.models.crawl import StartCrawlJobParams
        except ImportError:
            raise ImportError(
                "`hyperbrowser` package not found, please run `pip install hyperbrowser`"
            )

        if operation == "crawl" and len(urls) > 1:
            raise ValueError("`crawl` operation can only accept a single URL")
        params = self._prepare_params(params)

        if operation == "scrape":
            for url in urls:
                scrape_params = StartScrapeJobParams(url=url, **params)
                try:
                    scrape_resp = self.hyperbrowser.scrape.start_and_wait(scrape_params)
                    content, metadata = self._extract_content_metadata(scrape_resp.data)
                    yield self._create_document(content, metadata)
                except Exception as e:
                    logger.error(f"Error scraping {url}: {e}")
                    yield self._create_document("", {})
        else:
            crawl_params = StartCrawlJobParams(url=urls[0], **params)
            try:
                crawl_resp = self.hyperbrowser.crawl.start_and_wait(crawl_params)
                for page in crawl_resp.data:
                    content = page.markdown or page.html or ""
                    yield self._create_document(content, page.metadata or {})
            except Exception as e:
                logger.error(f"Error crawling {urls[0]}: {e}")
                yield self._create_document("", {})

    async def alazy_load_data(
        self,
        urls: Sequence[str],
        operation: Literal["scrape", "crawl"] = "scrape",
        params: Optional[Dict] = {},
    ) -> AsyncIterable[Document]:
        """
        Async lazy load documents.

        Args:
            urls: List of URLs to scrape or crawl
            operation: Operation to perform. Can be "scrape" or "crawl"
            params: Optional params for scrape or crawl. For more information on the supported params, visit https://docs.hyperbrowser.ai/reference/sdks/python/scrape#start-scrape-job-and-wait or https://docs.hyperbrowser.ai/reference/sdks/python/crawl#start-crawl-job-and-wait

        """
        try:
            from hyperbrowser.models.scrape import StartScrapeJobParams
            from hyperbrowser.models.crawl import StartCrawlJobParams
        except ImportError:
            raise ImportError(
                "`hyperbrowser` package not found, please run `pip install hyperbrowser`"
            )

        if operation == "crawl" and len(urls) > 1:
            raise ValueError("`crawl` operation can only accept a single URL")
        params = self._prepare_params(params)

        if operation == "scrape":
            for url in urls:
                scrape_params = StartScrapeJobParams(url=url, **params)
                try:
                    scrape_resp = await self.async_hyperbrowser.scrape.start_and_wait(
                        scrape_params
                    )
                    content, metadata = self._extract_content_metadata(scrape_resp.data)
                    yield self._create_document(content, metadata)
                except Exception as e:
                    logger.error(f"Error scraping {url}: {e}")
                    yield self._create_document("", {})
        else:
            crawl_params = StartCrawlJobParams(url=urls[0], **params)
            try:
                crawl_resp = await self.async_hyperbrowser.crawl.start_and_wait(
                    crawl_params
                )
                for page in crawl_resp.data:
                    content = page.markdown or page.html or ""
                    yield self._create_document(content, page.metadata or {})
            except Exception as e:
                logger.error(f"Error crawling {urls[0]}: {e}")
                yield self._create_document("", {})

lazy_load_data #

lazy_load_data(urls: List[str], operation: Literal['scrape', 'crawl'] = 'scrape', params: Optional[Dict] = {}) -> Iterable[Document]

Lazily load documents.

Parameters:

Name Type Description Default
urls List[str]

List of URLs to scrape or crawl.

required
operation Literal['scrape', 'crawl']

Operation to perform; either "scrape" or "crawl".

'scrape'
params Optional[Dict]

Optional params for scrape or crawl. For more information on the supported params, visit https://docs.hyperbrowser.ai/reference/sdks/python/scrape#start-scrape-job-and-wait or https://docs.hyperbrowser.ai/reference/sdks/python/crawl#start-crawl-job-and-wait

{}
Source code in llama_index/readers/web/hyperbrowser_web/base.py
def lazy_load_data(
    self,
    urls: List[str],
    operation: Literal["scrape", "crawl"] = "scrape",
    params: Optional[Dict] = {},
) -> Iterable[Document]:
    """
    Lazy load documents.

    Args:
        urls: List of URLs to scrape or crawl
        operation: Operation to perform. Can be "scrape" or "crawl"
        params: Optional params for scrape or crawl. For more information on the supported params, visit https://docs.hyperbrowser.ai/reference/sdks/python/scrape#start-scrape-job-and-wait or https://docs.hyperbrowser.ai/reference/sdks/python/crawl#start-crawl-job-and-wait

    """
    try:
        from hyperbrowser.models.scrape import StartScrapeJobParams
        from hyperbrowser.models.crawl import StartCrawlJobParams
    except ImportError:
        raise ImportError(
            "`hyperbrowser` package not found, please run `pip install hyperbrowser`"
        )

    if operation == "crawl" and len(urls) > 1:
        raise ValueError("`crawl` operation can only accept a single URL")
    params = self._prepare_params(params)

    if operation == "scrape":
        for url in urls:
            scrape_params = StartScrapeJobParams(url=url, **params)
            try:
                scrape_resp = self.hyperbrowser.scrape.start_and_wait(scrape_params)
                content, metadata = self._extract_content_metadata(scrape_resp.data)
                yield self._create_document(content, metadata)
            except Exception as e:
                logger.error(f"Error scraping {url}: {e}")
                yield self._create_document("", {})
    else:
        crawl_params = StartCrawlJobParams(url=urls[0], **params)
        try:
            crawl_resp = self.hyperbrowser.crawl.start_and_wait(crawl_params)
            for page in crawl_resp.data:
                content = page.markdown or page.html or ""
                yield self._create_document(content, page.metadata or {})
        except Exception as e:
            logger.error(f"Error crawling {urls[0]}: {e}")
            yield self._create_document("", {})

alazy_load_data async #

alazy_load_data(urls: Sequence[str], operation: Literal['scrape', 'crawl'] = 'scrape', params: Optional[Dict] = {}) -> AsyncIterable[Document]

Asynchronously lazy load documents.

Parameters:

Name Type Description Default
urls Sequence[str]

List of URLs to scrape or crawl.

required
operation Literal['scrape', 'crawl']

Operation to perform; either "scrape" or "crawl".

'scrape'
params Optional[Dict]

Optional params for scrape or crawl. For more information on the supported params, visit https://docs.hyperbrowser.ai/reference/sdks/python/scrape#start-scrape-job-and-wait or https://docs.hyperbrowser.ai/reference/sdks/python/crawl#start-crawl-job-and-wait

{}
Source code in llama_index/readers/web/hyperbrowser_web/base.py
async def alazy_load_data(
    self,
    urls: Sequence[str],
    operation: Literal["scrape", "crawl"] = "scrape",
    params: Optional[Dict] = {},
) -> AsyncIterable[Document]:
    """
    Async lazy load documents.

    Args:
        urls: List of URLs to scrape or crawl
        operation: Operation to perform. Can be "scrape" or "crawl"
        params: Optional params for scrape or crawl. For more information on the supported params, visit https://docs.hyperbrowser.ai/reference/sdks/python/scrape#start-scrape-job-and-wait or https://docs.hyperbrowser.ai/reference/sdks/python/crawl#start-crawl-job-and-wait

    """
    try:
        from hyperbrowser.models.scrape import StartScrapeJobParams
        from hyperbrowser.models.crawl import StartCrawlJobParams
    except ImportError:
        raise ImportError(
            "`hyperbrowser` package not found, please run `pip install hyperbrowser`"
        )

    if operation == "crawl" and len(urls) > 1:
        raise ValueError("`crawl` operation can only accept a single URL")
    params = self._prepare_params(params)

    if operation == "scrape":
        for url in urls:
            scrape_params = StartScrapeJobParams(url=url, **params)
            try:
                scrape_resp = await self.async_hyperbrowser.scrape.start_and_wait(
                    scrape_params
                )
                content, metadata = self._extract_content_metadata(scrape_resp.data)
                yield self._create_document(content, metadata)
            except Exception as e:
                logger.error(f"Error scraping {url}: {e}")
                yield self._create_document("", {})
    else:
        crawl_params = StartCrawlJobParams(url=urls[0], **params)
        try:
            crawl_resp = await self.async_hyperbrowser.crawl.start_and_wait(
                crawl_params
            )
            for page in crawl_resp.data:
                content = page.markdown or page.html or ""
                yield self._create_document(content, page.metadata or {})
        except Exception as e:
            logger.error(f"Error crawling {urls[0]}: {e}")
            yield self._create_document("", {})

KnowledgeBaseWebReader #

Bases: BaseReader

Knowledge base reader.

Crawls and reads articles from a knowledge base/help center with Playwright. Tested on Zendesk and Intercom CMS; may work on others. Can be run in headless mode, but it may then be blocked by Cloudflare, so run it headed to be safe. It occasionally times out; if that happens, simply increase the default timeout. Requires the playwright package.

Parameters:

Name Type Description Default
root_url str

The base URL of the knowledge base, with no trailing slash, e.g. 'https://support.intercom.com'

required
link_selectors List[str]

List of CSS selectors used to find links to articles while crawling, e.g. ['.article-list a', '.article-list a']

required
article_path str

The URL path of articles on this domain, so the crawler knows when to stop, e.g. '/articles'

required
title_selector Optional[str]

CSS selector used to find the title of the article, e.g. '.article-title'

None
subtitle_selector Optional[str]

CSS selector used to find the subtitle/description of the article, e.g. '.article-subtitle'

None
body_selector Optional[str]

CSS selector used to find the body of the article, e.g. '.article-body'

None
Source code in llama_index/readers/web/knowledge_base/base.py
class KnowledgeBaseWebReader(BaseReader):
    """
    Knowledge base reader.

    Crawls and reads articles from a knowledge base/help center with Playwright.
    Tested on Zendesk and Intercom CMS, may work on others.
    Can be run in headless mode but it may be blocked by Cloudflare. Run it headed to be safe.
    Times out occasionally, just increase the default time out if it does.
    Requires the `playwright` package.

    Args:
        root_url (str): the base url of the knowledge base, with no trailing slash
            e.g. 'https://support.intercom.com'
        link_selectors (List[str]): list of css selectors to find links to articles while crawling
            e.g. ['.article-list a', '.article-list a']
        article_path (str): the url path of articles on this domain so the crawler knows when to stop
            e.g. '/articles'
        title_selector (Optional[str]): css selector to find the title of the article
            e.g. '.article-title'
        subtitle_selector (Optional[str]): css selector to find the subtitle/description of the article
            e.g. '.article-subtitle'
        body_selector (Optional[str]): css selector to find the body of the article
            e.g. '.article-body'

    """

    def __init__(
        self,
        root_url: str,
        link_selectors: List[str],
        article_path: str,
        title_selector: Optional[str] = None,
        subtitle_selector: Optional[str] = None,
        body_selector: Optional[str] = None,
        max_depth: int = 100,
    ) -> None:
        """Initialize with parameters."""
        self.root_url = root_url
        self.link_selectors = link_selectors
        self.article_path = article_path
        self.title_selector = title_selector
        self.subtitle_selector = subtitle_selector
        self.body_selector = body_selector
        self.max_depth = max_depth

    def load_data(self) -> List[Document]:
        """Load data from the knowledge base."""
        from playwright.sync_api import sync_playwright

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=False)

            # Crawl
            article_urls = self.get_article_urls(
                browser, self.root_url, self.root_url, self.max_depth
            )

            # Scrape
            documents = []
            for url in article_urls:
                article = self.scrape_article(
                    browser,
                    url,
                )
                extra_info = {
                    "title": article["title"],
                    "subtitle": article["subtitle"],
                    "url": article["url"],
                }
                documents.append(Document(text=article["body"], extra_info=extra_info))

            browser.close()

            return documents

    def scrape_article(
        self,
        browser: Any,
        url: str,
    ) -> Dict[str, str]:
        """
        Scrape a single article url.

        Args:
            browser (Any): a Playwright Chromium browser.
            url (str): URL of the article to scrape.

        Returns:
            Dict[str, str]: a mapping of article attributes to their values.

        """
        page = browser.new_page(ignore_https_errors=True)
        page.set_default_timeout(60000)
        page.goto(url, wait_until="domcontentloaded")

        title = (
            (
                page.query_selector(self.title_selector).evaluate(
                    "node => node.innerText"
                )
            )
            if self.title_selector
            else ""
        )
        subtitle = (
            (
                page.query_selector(self.subtitle_selector).evaluate(
                    "node => node.innerText"
                )
            )
            if self.subtitle_selector
            else ""
        )
        body = (
            (page.query_selector(self.body_selector).evaluate("node => node.innerText"))
            if self.body_selector
            else ""
        )

        page.close()
        print("scraped:", url)
        return {"title": title, "subtitle": subtitle, "body": body, "url": url}

    def get_article_urls(
        self,
        browser: Any,
        root_url: str,
        current_url: str,
        max_depth: int = 100,
        depth: int = 0,
    ) -> List[str]:
        """
        Recursively crawl through the knowledge base to find a list of articles.

        Args:
            browser (Any): a Playwright Chromium browser.
            root_url (str): root URL of the knowledge base.
            current_url (str): current URL that is being crawled.
            max_depth (int): maximum recursion level for the crawler
            depth (int): current depth level

        Returns:
            List[str]: a list of URLs of found articles.

        """
        if depth >= max_depth:
            print(f"Reached max depth ({max_depth}): {current_url}")
            return []

        page = browser.new_page(ignore_https_errors=True)
        page.set_default_timeout(60000)
        page.goto(current_url, wait_until="domcontentloaded")

        # If this is a leaf node aka article page, return itself
        if self.article_path in current_url:
            print("Found an article: ", current_url)
            page.close()
            return [current_url]

        # Otherwise crawl this page and find all the articles linked from it
        article_urls = []
        links = []

        for link_selector in self.link_selectors:
            ahrefs = page.query_selector_all(link_selector)
            links.extend(ahrefs)

        for link in links:
            url = root_url + page.evaluate("(node) => node.getAttribute('href')", link)
            article_urls.extend(
                self.get_article_urls(browser, root_url, url, max_depth, depth + 1)
            )

        page.close()

        return article_urls
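A usage sketch for crawling an Intercom-style help center; the CSS selectors below are placeholders to adapt to the target site, and the import path is an assumption.

# Sketch with placeholder selectors; adapt them to the knowledge base you crawl.
from llama_index.readers.web import KnowledgeBaseWebReader

reader = KnowledgeBaseWebReader(
    root_url="https://support.intercom.com",   # no trailing slash
    link_selectors=[".article-list a", ".collection-card a"],
    article_path="/articles",                  # crawl stops at article pages
    title_selector=".article-title",
    body_selector=".article-body",
    max_depth=5,                               # keep the recursive crawl bounded
)

documents = reader.load_data()  # launches a headed Chromium via Playwright
for doc in documents:
    print(doc.metadata["url"], "-", doc.metadata["title"])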

load_data #

load_data() -> List[Document]

Load data from the knowledge base.

Source code in llama_index/readers/web/knowledge_base/base.py
def load_data(self) -> List[Document]:
    """Load data from the knowledge base."""
    from playwright.sync_api import sync_playwright

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)

        # Crawl
        article_urls = self.get_article_urls(
            browser, self.root_url, self.root_url, self.max_depth
        )

        # Scrape
        documents = []
        for url in article_urls:
            article = self.scrape_article(
                browser,
                url,
            )
            extra_info = {
                "title": article["title"],
                "subtitle": article["subtitle"],
                "url": article["url"],
            }
            documents.append(Document(text=article["body"], extra_info=extra_info))

        browser.close()

        return documents

scrape_article #

scrape_article(browser: Any, url: str) -> Dict[str, str]

Scrape a single article URL.

Parameters:

Name Type Description Default
browser Any

A Playwright Chromium browser.

required
url str

URL of the article to scrape.

required

Returns:

Type Description
Dict[str, str]

Dict[str, str]: A mapping of article attributes to their values.

Source code in llama_index/readers/web/knowledge_base/base.py
def scrape_article(
    self,
    browser: Any,
    url: str,
) -> Dict[str, str]:
    """
    Scrape a single article url.

    Args:
        browser (Any): a Playwright Chromium browser.
        url (str): URL of the article to scrape.

    Returns:
        Dict[str, str]: a mapping of article attributes to their values.

    """
    page = browser.new_page(ignore_https_errors=True)
    page.set_default_timeout(60000)
    page.goto(url, wait_until="domcontentloaded")

    title = (
        (
            page.query_selector(self.title_selector).evaluate(
                "node => node.innerText"
            )
        )
        if self.title_selector
        else ""
    )
    subtitle = (
        (
            page.query_selector(self.subtitle_selector).evaluate(
                "node => node.innerText"
            )
        )
        if self.subtitle_selector
        else ""
    )
    body = (
        (page.query_selector(self.body_selector).evaluate("node => node.innerText"))
        if self.body_selector
        else ""
    )

    page.close()
    print("scraped:", url)
    return {"title": title, "subtitle": subtitle, "body": body, "url": url}

get_article_urls #

get_article_urls(browser: Any, root_url: str, current_url: str, max_depth: int = 100, depth: int = 0) -> List[str]

Recursively crawl through the knowledge base to find a list of articles.

Parameters:

Name Type Description Default
browser Any

A Playwright Chromium browser.

required
root_url str

Root URL of the knowledge base.

required
current_url str

Current URL that is being crawled.

required
max_depth int

Maximum recursion level for the crawler.

100
depth int

Current depth level.

0

Returns:

Type Description
List[str]

List[str]: A list of URLs of found articles.

Source code in llama_index/readers/web/knowledge_base/base.py
def get_article_urls(
    self,
    browser: Any,
    root_url: str,
    current_url: str,
    max_depth: int = 100,
    depth: int = 0,
) -> List[str]:
    """
    Recursively crawl through the knowledge base to find a list of articles.

    Args:
        browser (Any): a Playwright Chromium browser.
        root_url (str): root URL of the knowledge base.
        current_url (str): current URL that is being crawled.
        max_depth (int): maximum recursion level for the crawler
        depth (int): current depth level

    Returns:
        List[str]: a list of URLs of found articles.

    """
    if depth >= max_depth:
        print(f"Reached max depth ({max_depth}): {current_url}")
        return []

    page = browser.new_page(ignore_https_errors=True)
    page.set_default_timeout(60000)
    page.goto(current_url, wait_until="domcontentloaded")

    # If this is a leaf node aka article page, return itself
    if self.article_path in current_url:
        print("Found an article: ", current_url)
        page.close()
        return [current_url]

    # Otherwise crawl this page and find all the articles linked from it
    article_urls = []
    links = []

    for link_selector in self.link_selectors:
        ahrefs = page.query_selector_all(link_selector)
        links.extend(ahrefs)

    for link in links:
        url = root_url + page.evaluate("(node) => node.getAttribute('href')", link)
        article_urls.extend(
            self.get_article_urls(browser, root_url, url, max_depth, depth + 1)
        )

    page.close()

    return article_urls

MainContentExtractorReader #

Bases: BaseReader

MainContentExtractor web page reader.

Reads pages from the web.

Parameters:

Name Type Description Default
text_format str

The format of the text. Defaults to "markdown". Requires the MainContentExtractor package.

'markdown'
Source code in llama_index/readers/web/main_content_extractor/base.py
class MainContentExtractorReader(BaseReader):
    """
    MainContentExtractor web page reader.

    Reads pages from the web.

    Args:
        text_format (str, optional): The format of the text. Defaults to "markdown".
            Requires `MainContentExtractor` package.

    """

    def __init__(self, text_format: str = "markdown") -> None:
        """Initialize with parameters."""
        self.text_format = text_format

    def load_data(self, urls: List[str]) -> List[Document]:
        """
        Load data from the input directory.

        Args:
            urls (List[str]): List of URLs to scrape.

        Returns:
            List[Document]: List of documents.

        """
        if not isinstance(urls, list):
            raise ValueError("urls must be a list of strings.")

        from main_content_extractor import MainContentExtractor

        documents = []
        for url in urls:
            response = requests.get(url).text
            response = MainContentExtractor.extract(
                response, output_format=self.text_format, include_links=False
            )

            documents.append(Document(text=response))

        return documents
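
Example (a minimal usage sketch, not from the upstream docs): the URL is illustrative, the import path follows the source location shown above, and the main_content_extractor package must be installed.

from llama_index.readers.web.main_content_extractor.base import MainContentExtractorReader

# Extract the main content of each page; text_format defaults to "markdown".
reader = MainContentExtractorReader(text_format="markdown")
documents = reader.load_data(urls=["https://example.com/blog/post"])
print(documents[0].text)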

load_data #

load_data(urls: List[str]) -> List[Document]

Load data from the input directory.

Parameters:

Name Type Description Default
urls List[str]

List of URLs to scrape.

required

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Source code in llama_index/readers/web/main_content_extractor/base.py
def load_data(self, urls: List[str]) -> List[Document]:
    """
    Load data from the input directory.

    Args:
        urls (List[str]): List of URLs to scrape.

    Returns:
        List[Document]: List of documents.

    """
    if not isinstance(urls, list):
        raise ValueError("urls must be a list of strings.")

    from main_content_extractor import MainContentExtractor

    documents = []
    for url in urls:
        response = requests.get(url).text
        response = MainContentExtractor.extract(
            response, output_format=self.text_format, include_links=False
        )

        documents.append(Document(text=response))

    return documents

NewsArticleReader #

Bases: BaseReader

Simple news article reader.

Reads news articles from the web and parses them using the newspaper library.

Parameters:

Name Type Description Default
text_mode bool

Whether to load a text version or HTML version of the content (default=True).

True
use_nlp bool

Whether to use NLP to extract additional summary and keywords (default=True).

True
newspaper_kwargs Any

Additional keyword arguments to pass to newspaper.Article. See https://newspaper.readthedocs.io/en/latest/user_guide/quickstart.html#article

{}
Source code in llama_index/readers/web/news/base.py
class NewsArticleReader(BaseReader):
    """
    Simple news article reader.

    Reads news articles from the web and parses them using the `newspaper` library.

    Args:
        text_mode (bool): Whether to load a text version or HTML version of the content (default=True).
        use_nlp (bool): Whether to use NLP to extract additional summary and keywords (default=True).
        newspaper_kwargs: Additional keyword arguments to pass to newspaper.Article. See
            https://newspaper.readthedocs.io/en/latest/user_guide/quickstart.html#article

    """

    def __init__(
        self, text_mode: bool = True, use_nlp: bool = True, **newspaper_kwargs: Any
    ) -> None:
        """Initialize with parameters."""
        if find_spec("newspaper") is None:
            raise ImportError(
                "`newspaper` package not found, please run `pip install newspaper3k`"
            )
        self.load_text = text_mode
        self.use_nlp = use_nlp
        self.newspaper_kwargs = newspaper_kwargs

    def load_data(self, urls: List[str]) -> List[Document]:
        """
        Load data from the list of news article urls.

        Args:
            urls (List[str]): List of URLs to load news articles.

        Returns:
            List[Document]: List of documents.

        """
        if not isinstance(urls, list) and not isinstance(urls, Generator):
            raise ValueError("urls must be a list or generator.")
        documents = []
        for url in urls:
            from newspaper import Article

            try:
                article = Article(url, **self.newspaper_kwargs)
                article.download()
                article.parse()

                if self.use_nlp:
                    article.nlp()

            except Exception as e:
                logger.error(f"Error fetching or processing {url}, exception: {e}")
                continue

            metadata = {
                "title": getattr(article, "title", ""),
                "link": getattr(article, "url", getattr(article, "canonical_link", "")),
                "authors": getattr(article, "authors", []),
                "language": getattr(article, "meta_lang", ""),
                "description": getattr(article, "meta_description", ""),
                "publish_date": getattr(article, "publish_date", ""),
            }

            if self.load_text:
                content = article.text
            else:
                content = article.html

            if self.use_nlp:
                metadata["keywords"] = getattr(article, "keywords", [])
                metadata["summary"] = getattr(article, "summary", "")

            documents.append(Document(text=content, metadata=metadata))

        return documents
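
Example (a minimal usage sketch, not from the upstream docs): the URL is illustrative and the newspaper3k package must be installed; use_nlp=False skips the extra summary/keyword extraction, which would otherwise need NLTK data.

from llama_index.readers.web.news.base import NewsArticleReader

reader = NewsArticleReader(text_mode=True, use_nlp=False)
documents = reader.load_data(["https://example.com/news/some-article"])
for doc in documents:
    # Metadata keys follow the dictionary built in load_data above.
    print(doc.metadata["title"], doc.metadata["publish_date"])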

load_data #

load_data(urls: List[str]) -> List[Document]

Load data from the list of news article URLs.

Parameters:

Name Type Description Default
urls List[str]

List of URLs to load news articles from.

required

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Source code in llama_index/readers/web/news/base.py
def load_data(self, urls: List[str]) -> List[Document]:
    """
    Load data from the list of news article urls.

    Args:
        urls (List[str]): List of URLs to load news articles.

    Returns:
        List[Document]: List of documents.

    """
    if not isinstance(urls, list) and not isinstance(urls, Generator):
        raise ValueError("urls must be a list or generator.")
    documents = []
    for url in urls:
        from newspaper import Article

        try:
            article = Article(url, **self.newspaper_kwargs)
            article.download()
            article.parse()

            if self.use_nlp:
                article.nlp()

        except Exception as e:
            logger.error(f"Error fetching or processing {url}, exception: {e}")
            continue

        metadata = {
            "title": getattr(article, "title", ""),
            "link": getattr(article, "url", getattr(article, "canonical_link", "")),
            "authors": getattr(article, "authors", []),
            "language": getattr(article, "meta_lang", ""),
            "description": getattr(article, "meta_description", ""),
            "publish_date": getattr(article, "publish_date", ""),
        }

        if self.load_text:
            content = article.text
        else:
            content = article.html

        if self.use_nlp:
            metadata["keywords"] = getattr(article, "keywords", [])
            metadata["summary"] = getattr(article, "summary", "")

        documents.append(Document(text=content, metadata=metadata))

    return documents

OxylabsWebReader #

Bases: BasePydanticReader

Scrape any website with the Oxylabs Web Scraper API and get results in Markdown format.

See the API documentation: https://developers.oxylabs.io/scraper-apis/web-scraper-api/other-websites

Parameters:

Name Type Description Default
username str

Oxylabs API username.

required
password str

Oxylabs API password.

required
Example

from llama_index.readers.web.oxylabs_web.base import OxylabsWebReader

reader = OxylabsWebReader(
    username=os.environ["OXYLABS_USERNAME"], password=os.environ["OXYLABS_PASSWORD"]
)

docs = reader.load_data(
    [
        "https://sandbox.oxylabs.io/products/1",
        "https://sandbox.oxylabs.io/products/2"
    ],
    {
        "parse": True,
    }
)

print(docs[0].text)
Source code in llama_index/readers/web/oxylabs_web/base.py
class OxylabsWebReader(BasePydanticReader):
    """
    Scrape any website with Oxylabs Web Scraper API and get results in Markdown format.

    [See the API documentation](https://developers.oxylabs.io/scraper-apis/web-scraper-api/other-websites)

    Args:
        username: Oxylabs API username.
        password: Oxylabs API password.

    Example:
        .. code-block:: python
            from llama_index.readers.web.oxylabs_web.base import OxylabsWebReader

            reader = OxylabsWebReader(
                username=os.environ["OXYLABS_USERNAME"], password=os.environ["OXYLABS_PASSWORD"]
            )

            docs = reader.load_data(
                [
                    "https://sandbox.oxylabs.io/products/1",
                    "https://sandbox.oxylabs.io/products/2"
                ],
                {
                    "parse": True,
                }
            )

            print(docs[0].text)

    """

    timeout_s: int = 100
    oxylabs_scraper_url: str = "https://realtime.oxylabs.io/v1/queries"
    api: "RealtimeAPI"
    async_api: "AsyncAPI"
    default_config: dict[str, Any] = Field(default_factory=get_default_config)

    def __init__(self, username: str, password: str, **kwargs) -> None:
        from oxylabs.internal.api import AsyncAPI, APICredentials, RealtimeAPI

        credentials = APICredentials(username=username, password=password)

        bits, _ = architecture()
        sdk_type = (
            f"oxylabs-llama-index-web-sdk-python/"
            f"{version('llama-index-readers-web')} "
            f"({python_version()}; {bits})"
        )

        api = RealtimeAPI(credentials, sdk_type=sdk_type)
        async_api = AsyncAPI(credentials, sdk_type=sdk_type)

        super().__init__(api=api, async_api=async_api, **kwargs)

    @classmethod
    def class_name(cls) -> str:
        return "OxylabsWebReader"

    def _get_document_from_response(self, response: dict[str, Any]) -> Document:
        content = response["results"][0]["content"]

        if isinstance(content, (dict, list)):
            text = json_to_markdown(content)
        else:
            striped_html = strip_html(str(content))
            text = markdownify(striped_html)

        return Document(
            metadata={"oxylabs_job": response["job"]},
            text=text,
        )

    async def aload_data(
        self,
        urls: list[str],
        additional_params: Optional[Dict[str, Any]] = None,
    ) -> List[Document]:
        """
        Asynchronously load data from urls.

        Args:
            urls: List of URLs to load.
            additional_params: Dictionary of scraper parameters as described
                [here](https://developers.oxylabs.io/scraper-apis/web-scraper-api/targets/generic-target#additional)

        """
        if additional_params is None:
            additional_params = {}

        responses = await asyncio.gather(
            *[
                self.async_api.get_response(
                    {**additional_params, "url": url},
                    self.default_config,
                )
                for url in urls
            ]
        )

        return [
            self._get_document_from_response(response)
            for response in responses
            if response
        ]

    def load_data(
        self,
        urls: list[str],
        additional_params: Optional[Dict[str, Any]] = None,
    ) -> List[Document]:
        """
        Load data from urls.

        Args:
            urls: List of URLs to load.
            additional_params: Dictionary of scraper parameters as described
                [here](https://developers.oxylabs.io/scraper-apis/web-scraper-api/targets/generic-target#additional)

        """
        if additional_params is None:
            additional_params = {}

        responses = [
            self.api.get_response(
                {**additional_params, "url": url},
                self.default_config,
            )
            for url in urls
        ]

        return [
            self._get_document_from_response(response)
            for response in responses
            if response
        ]

aload_data async #

aload_data(urls: list[str], additional_params: Optional[Dict[str, Any]] = None) -> List[Document]

Asynchronously load data from URLs.

Parameters:

Name Type Description Default
urls list[str]

List of URLs to load.

required
additional_params Optional[Dict[str, Any]]

Dictionary of scraper parameters as described at https://developers.oxylabs.io/scraper-apis/web-scraper-api/targets/generic-target#additional

None
Source code in llama_index/readers/web/oxylabs_web/base.py
async def aload_data(
    self,
    urls: list[str],
    additional_params: Optional[Dict[str, Any]] = None,
) -> List[Document]:
    """
    Asynchronously load data from urls.

    Args:
        urls: List of URLs to load.
        additional_params: Dictionary of scraper parameters as described
            [here](https://developers.oxylabs.io/scraper-apis/web-scraper-api/targets/generic-target#additional)

    """
    if additional_params is None:
        additional_params = {}

    responses = await asyncio.gather(
        *[
            self.async_api.get_response(
                {**additional_params, "url": url},
                self.default_config,
            )
            for url in urls
        ]
    )

    return [
        self._get_document_from_response(response)
        for response in responses
        if response
    ]
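
Example (an async sketch mirroring the class-level example above; credentials come from environment variables and the sandbox URLs are the same ones used there):

import asyncio
import os

from llama_index.readers.web.oxylabs_web.base import OxylabsWebReader

reader = OxylabsWebReader(
    username=os.environ["OXYLABS_USERNAME"],
    password=os.environ["OXYLABS_PASSWORD"],
)

# aload_data fetches all URLs concurrently via asyncio.gather.
docs = asyncio.run(
    reader.aload_data(
        [
            "https://sandbox.oxylabs.io/products/1",
            "https://sandbox.oxylabs.io/products/2",
        ]
    )
)
print(docs[0].text)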

load_data #

load_data(urls: list[str], additional_params: Optional[Dict[str, Any]] = None) -> List[Document]

Load data from URLs.

Parameters:

Name Type Description Default
urls list[str]

List of URLs to load.

required
additional_params Optional[Dict[str, Any]]

Dictionary of scraper parameters as described at https://developers.oxylabs.io/scraper-apis/web-scraper-api/targets/generic-target#additional

None
Source code in llama_index/readers/web/oxylabs_web/base.py
def load_data(
    self,
    urls: list[str],
    additional_params: Optional[Dict[str, Any]] = None,
) -> List[Document]:
    """
    Load data from urls.

    Args:
        urls: List of URLs to load.
        additional_params: Dictionary of scraper parameters as described
            [here](https://developers.oxylabs.io/scraper-apis/web-scraper-api/targets/generic-target#additional)

    """
    if additional_params is None:
        additional_params = {}

    responses = [
        self.api.get_response(
            {**additional_params, "url": url},
            self.default_config,
        )
        for url in urls
    ]

    return [
        self._get_document_from_response(response)
        for response in responses
        if response
    ]

ReadabilityWebPageReader #

Bases: BaseReader

Readability Webpage Loader.

Extracts relevant information from a fully rendered web page. During processing, it is always assumed that web pages used as data sources contain textual content.

  1. Load the page and wait for it to render. (playwright)
  2. Inject Readability.js to extract the main content.

Parameters:

Name Type Description Default
proxy Optional[str]

Proxy server. Defaults to None.

None
wait_until Optional[Literal['commit', 'domcontentloaded', 'load', 'networkidle']]

Wait until the page is loaded. Defaults to "domcontentloaded".

'domcontentloaded'
text_splitter TextSplitter

Text splitter. Defaults to None.

None
normalizer Optional[Callable[[str], str]]

Text normalizer. Defaults to nfkc_normalize.

required
Source code in llama_index/readers/web/readability_web/base.py
class ReadabilityWebPageReader(BaseReader):
    """
    Readability Webpage Loader.

    Extracting relevant information from a fully rendered web page.
    During the processing, it is always assumed that web pages used as data sources contain textual content.

    1. Load the page and wait for it rendered. (playwright)
    2. Inject Readability.js to extract the main content.

    Args:
        proxy (Optional[str], optional): Proxy server. Defaults to None.
        wait_until (Optional[Literal["commit", "domcontentloaded", "load", "networkidle"]], optional): Wait until the page is loaded. Defaults to "domcontentloaded".
        text_splitter (TextSplitter, optional): Text splitter. Defaults to None.
        normalizer (Optional[Callable[[str], str]], optional): Text normalizer. Defaults to nfkc_normalize.

    """

    def __init__(
        self,
        proxy: Optional[str] = None,
        wait_until: Optional[
            Literal["commit", "domcontentloaded", "load", "networkidle"]
        ] = "domcontentloaded",
        text_splitter: Optional[TextSplitter] = None,
        normalize: Optional[Callable[[str], str]] = nfkc_normalize,
    ) -> None:
        self._launch_options = {
            "headless": True,
        }
        self._wait_until = wait_until
        if proxy:
            self._launch_options["proxy"] = {
                "server": proxy,
            }
        self._text_splitter = text_splitter
        self._normalize = normalize
        self._readability_js = None

    async def async_load_data(self, url: str) -> List[Document]:
        """
        Render and load data content from url.

        Args:
            url (str): URL to scrape.

        Returns:
            List[Document]: List of documents.

        """
        from playwright.async_api import async_playwright

        async with async_playwright() as async_playwright:
            browser = await async_playwright.chromium.launch(**self._launch_options)

            article = await self.scrape_page(
                browser,
                url,
            )
            extra_info = {
                key: article[key]
                for key in [
                    "title",
                    "length",
                    "excerpt",
                    "byline",
                    "dir",
                    "lang",
                    "siteName",
                ]
            }

            if self._normalize is not None:
                article["textContent"] = self._normalize(article["textContent"])
            texts = []
            if self._text_splitter is not None:
                texts = self._text_splitter.split_text(article["textContent"])
            else:
                texts = [article["textContent"]]

            await browser.close()

            return [Document(text=x, extra_info=extra_info) for x in texts]

    def load_data(self, url: str) -> List[Document]:
        return async_to_sync(self.async_load_data(url))

    async def scrape_page(
        self,
        browser: Browser,
        url: str,
    ) -> Dict[str, str]:
        """
        Scrape a single article url.

        Args:
            browser (Any): a Playwright Chromium browser.
            url (str): URL of the article to scrape.

        Returns:
            Ref: https://github.com/mozilla/readability
            title: article title;
            content: HTML string of processed article content;
            textContent: text content of the article, with all the HTML tags removed;
            length: length of an article, in characters;
            excerpt: article description, or short excerpt from the content;
            byline: author metadata;
            dir: content direction;
            siteName: name of the site.
            lang: content language

        """
        if self._readability_js is None:
            with open(path) as f:
                self._readability_js = f.read()

        inject_readability = f"""
            (function(){{
            {self._readability_js}
            function executor() {{
                return new Readability({{}}, document).parse();
            }}
            return executor();
            }}())
        """

        # browser = cast(Browser, browser)
        page = await browser.new_page(ignore_https_errors=True)
        page.set_default_timeout(60000)
        await page.goto(url, wait_until=self._wait_until)

        r = await page.evaluate(inject_readability)

        await page.close()
        print("scraped:", url)

        return r
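
Example (a minimal usage sketch, not from the upstream docs): it assumes playwright and its Chromium browser are installed (pip install playwright, then playwright install chromium), and the URL is illustrative.

from llama_index.readers.web.readability_web.base import ReadabilityWebPageReader

reader = ReadabilityWebPageReader(wait_until="domcontentloaded")
# load_data takes a single URL and returns one or more Documents,
# depending on whether a text_splitter was configured.
documents = reader.load_data("https://example.com/article")
print(documents[0].text[:200])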

async_load_data async #

async_load_data(url: str) -> List[Document]

Render and load data content from the URL.

Parameters:

Name Type Description Default
url str

URL to scrape.

required

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Source code in llama_index/readers/web/readability_web/base.py
async def async_load_data(self, url: str) -> List[Document]:
    """
    Render and load data content from url.

    Args:
        url (str): URL to scrape.

    Returns:
        List[Document]: List of documents.

    """
    from playwright.async_api import async_playwright

    async with async_playwright() as async_playwright:
        browser = await async_playwright.chromium.launch(**self._launch_options)

        article = await self.scrape_page(
            browser,
            url,
        )
        extra_info = {
            key: article[key]
            for key in [
                "title",
                "length",
                "excerpt",
                "byline",
                "dir",
                "lang",
                "siteName",
            ]
        }

        if self._normalize is not None:
            article["textContent"] = self._normalize(article["textContent"])
        texts = []
        if self._text_splitter is not None:
            texts = self._text_splitter.split_text(article["textContent"])
        else:
            texts = [article["textContent"]]

        await browser.close()

        return [Document(text=x, extra_info=extra_info) for x in texts]

scrape_page async #

scrape_page(browser: Browser, url: str) -> Dict[str, str]

Scrape a single article URL.

Parameters:

Name Type Description Default
browser Any

a Playwright Chromium browser.

required
url str

URL of the article to scrape.

required

Returns:

Name Type Description
Ref Dict[str, str]

https://github.com/mozilla/readability

title Dict[str, str]

article title;

content Dict[str, str]

HTML string of the processed article content;

textContent Dict[str, str]

text content of the article, with all HTML tags removed;

length Dict[str, str]

length of the article, in characters;

excerpt Dict[str, str]

article description, or short excerpt from the content;

byline Dict[str, str]

author metadata;

dir Dict[str, str]

content direction;

siteName Dict[str, str]

name of the site.

lang Dict[str, str]

content language

Source code in llama_index/readers/web/readability_web/base.py
async def scrape_page(
    self,
    browser: Browser,
    url: str,
) -> Dict[str, str]:
    """
    Scrape a single article url.

    Args:
        browser (Any): a Playwright Chromium browser.
        url (str): URL of the article to scrape.

    Returns:
        Ref: https://github.com/mozilla/readability
        title: article title;
        content: HTML string of processed article content;
        textContent: text content of the article, with all the HTML tags removed;
        length: length of an article, in characters;
        excerpt: article description, or short excerpt from the content;
        byline: author metadata;
        dir: content direction;
        siteName: name of the site.
        lang: content language

    """
    if self._readability_js is None:
        with open(path) as f:
            self._readability_js = f.read()

    inject_readability = f"""
        (function(){{
        {self._readability_js}
        function executor() {{
            return new Readability({{}}, document).parse();
        }}
        return executor();
        }}())
    """

    # browser = cast(Browser, browser)
    page = await browser.new_page(ignore_https_errors=True)
    page.set_default_timeout(60000)
    await page.goto(url, wait_until=self._wait_until)

    r = await page.evaluate(inject_readability)

    await page.close()
    print("scraped:", url)

    return r

RssReader #

Bases: BasePydanticReader

RSS reader.

Reads content from an RSS feed.

Source code in llama_index/readers/web/rss/base.py
class RssReader(BasePydanticReader):
    """
    RSS reader.

    Reads content from an RSS feed.

    """

    is_remote: bool = True
    html_to_text: bool = False
    user_agent: Union[str, None] = None

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        # https://pythonhosted.org/feedparser/http-useragent.html
        self.user_agent = kwargs.get("user_agent")

    @classmethod
    def class_name(cls) -> str:
        return "RssReader"

    def load_data(self, urls: List[str]) -> List[Document]:
        """
        Load data from RSS feeds.

        Args:
            urls (List[str]): List of RSS URLs to load.

        Returns:
            List[Document]: List of documents.

        """
        import feedparser

        if self.user_agent:
            feedparser.USER_AGENT = self.user_agent

        if not isinstance(urls, list):
            raise ValueError("urls must be a list of strings.")

        documents = []

        for url in urls:
            parsed = feedparser.parse(url)
            for entry in parsed.entries:
                doc_id = getattr(entry, "id", None) or getattr(entry, "link", None)
                data = entry.get("content", [{}])[0].get(
                    "value", entry.get("description", entry.get("summary", ""))
                )

                if self.html_to_text:
                    import html2text

                    data = html2text.html2text(data)

                extra_info = {
                    "title": getattr(entry, "title", None),
                    "link": getattr(entry, "link", None),
                    "date": getattr(entry, "published", None),
                }

                if doc_id:
                    documents.append(
                        Document(text=data, id_=doc_id, extra_info=extra_info)
                    )
                else:
                    documents.append(Document(text=data, extra_info=extra_info))

        return documents
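
Example (a minimal usage sketch, not from the upstream docs): the feed URL is illustrative, and html_to_text=True requires the html2text package.

from llama_index.readers.web.rss.base import RssReader

reader = RssReader(html_to_text=True)
documents = reader.load_data(["https://example.com/feed.xml"])
for doc in documents:
    # Each entry carries title/link/date metadata, as set in load_data above.
    print(doc.metadata["title"], doc.metadata["link"])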

load_data #

load_data(urls: List[str]) -> List[Document]

Load data from RSS feeds.

Parameters:

Name Type Description Default
urls List[str]

List of RSS URLs to load.

required

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Source code in llama_index/readers/web/rss/base.py
def load_data(self, urls: List[str]) -> List[Document]:
    """
    Load data from RSS feeds.

    Args:
        urls (List[str]): List of RSS URLs to load.

    Returns:
        List[Document]: List of documents.

    """
    import feedparser

    if self.user_agent:
        feedparser.USER_AGENT = self.user_agent

    if not isinstance(urls, list):
        raise ValueError("urls must be a list of strings.")

    documents = []

    for url in urls:
        parsed = feedparser.parse(url)
        for entry in parsed.entries:
            doc_id = getattr(entry, "id", None) or getattr(entry, "link", None)
            data = entry.get("content", [{}])[0].get(
                "value", entry.get("description", entry.get("summary", ""))
            )

            if self.html_to_text:
                import html2text

                data = html2text.html2text(data)

            extra_info = {
                "title": getattr(entry, "title", None),
                "link": getattr(entry, "link", None),
                "date": getattr(entry, "published", None),
            }

            if doc_id:
                documents.append(
                    Document(text=data, id_=doc_id, extra_info=extra_info)
                )
            else:
                documents.append(Document(text=data, extra_info=extra_info))

    return documents

RssNewsReader #

Bases: BaseReader

RSS news reader.

Reads news content from RSS feeds and parses it with NewsArticleReader.

Source code in llama_index/readers/web/rss_news/base.py
class RssNewsReader(BaseReader):
    """
    RSS news reader.

    Reads news content from RSS feeds and parses with NewsArticleReader.

    """

    def __init__(self, **reader_kwargs: Any) -> None:
        """
        Initialize with parameters.

        Args:
            html_to_text (bool): Whether to convert HTML to text.
                Requires `html2text` package.

        """
        try:
            import feedparser  # noqa: F401
        except ImportError:
            raise ImportError(
                "`feedparser` package not found, please run `pip install feedparser`"
            )

        try:
            import listparser  # noqa: F401
        except ImportError:
            raise ImportError(
                "`listparser` package not found, please run `pip install listparser`"
            )

        self.reader_kwargs = reader_kwargs

    def load_data(self, urls: List[str] = None, opml: str = None) -> List[Document]:
        """
        Load data from either RSS feeds or OPML.

        Args:
            urls (List[str]): List of RSS URLs to load.
            opml (str): URL to OPML file or string or byte OPML content.

        Returns:
            List[Document]: List of documents.

        """
        if (urls is None) == (
            opml is None
        ):  # This is True if both are None or neither is None
            raise ValueError(
                "Provide either the urls or the opml argument, but not both."
            )

        import feedparser

        if urls and not isinstance(urls, list):
            raise ValueError("urls must be a list of strings.")

        documents = []

        if not urls and opml:
            try:
                import listparser
            except ImportError as e:
                raise ImportError(
                    "Package listparser must be installed if the opml arg is used. "
                    "Please install with 'pip install listparser' or use the "
                    "urls arg instead."
                ) from e
            rss = listparser.parse(opml)
            urls = [feed.url for feed in rss.feeds]

        for url in urls:
            try:
                feed = feedparser.parse(url)
                for i, entry in enumerate(feed.entries):
                    article = NewsArticleReader(**self.reader_kwargs).load_data(
                        urls=[entry.link],
                    )[0]
                    article.metadata["feed"] = url

                    documents.append(
                        Document(text=article.text, metadata=article.metadata)
                    )

            except Exception as e:
                logger.error(f"Error fetching or processing {url}, exception: {e}")
                continue

        return documents
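
Example (a minimal usage sketch, not from the upstream docs): the feed and OPML URLs are illustrative; keyword arguments are forwarded to NewsArticleReader.

from llama_index.readers.web.rss_news.base import RssNewsReader

reader = RssNewsReader(use_nlp=False)

# Either pass RSS feed URLs ...
documents = reader.load_data(urls=["https://example.com/feed.xml"])

# ... or an OPML file/string instead (but not both):
# documents = reader.load_data(opml="https://example.com/feeds.opml")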

load_data #

load_data(urls: List[str] = None, opml: str = None) -> List[Document]

Load data from either RSS feeds or OPML.

Parameters:

Name Type Description Default
urls List[str]

List of RSS URLs to load.

None
opml str

URL to an OPML file, or string or byte OPML content.

None

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Source code in llama_index/readers/web/rss_news/base.py
def load_data(self, urls: List[str] = None, opml: str = None) -> List[Document]:
    """
    Load data from either RSS feeds or OPML.

    Args:
        urls (List[str]): List of RSS URLs to load.
        opml (str): URL to OPML file or string or byte OPML content.

    Returns:
        List[Document]: List of documents.

    """
    if (urls is None) == (
        opml is None
    ):  # This is True if both are None or neither is None
        raise ValueError(
            "Provide either the urls or the opml argument, but not both."
        )

    import feedparser

    if urls and not isinstance(urls, list):
        raise ValueError("urls must be a list of strings.")

    documents = []

    if not urls and opml:
        try:
            import listparser
        except ImportError as e:
            raise ImportError(
                "Package listparser must be installed if the opml arg is used. "
                "Please install with 'pip install listparser' or use the "
                "urls arg instead."
            ) from e
        rss = listparser.parse(opml)
        urls = [feed.url for feed in rss.feeds]

    for url in urls:
        try:
            feed = feedparser.parse(url)
            for i, entry in enumerate(feed.entries):
                article = NewsArticleReader(**self.reader_kwargs).load_data(
                    urls=[entry.link],
                )[0]
                article.metadata["feed"] = url

                documents.append(
                    Document(text=article.text, metadata=article.metadata)
                )

        except Exception as e:
            logger.error(f"Error fetching or processing {url}, exception: {e}")
            continue

    return documents

ScrapflyReader #

Bases: BasePydanticReader

Turn a URL into LLM-accessible markdown with Scrapfly.io.

Parameters:
api_key: The Scrapfly API key.
scrape_config: The Scrapfly ScrapeConfig object.
ignore_scrape_failures: Whether to continue on failures.
urls: List of URLs to scrape.
scrape_format: Scrape result format (markdown or text).
For further details, visit: https://scrapfly.io/docs/sdk/python

Source code in llama_index/readers/web/scrapfly_web/base.py
class ScrapflyReader(BasePydanticReader):
    """
    Turn a url to llm accessible markdown with `Scrapfly.io`.

    Args:
    api_key: The Scrapfly API key.
    scrape_config: The Scrapfly ScrapeConfig object.
    ignore_scrape_failures: Whether to continue on failures.
    urls: List of urls to scrape.
    scrape_format: Scrape result format (markdown or text)
    For further details, visit: https://scrapfly.io/docs/sdk/python

    """

    api_key: str
    ignore_scrape_failures: bool = True
    scrapfly: "ScrapflyClient"

    def __init__(self, api_key: str, ignore_scrape_failures: bool = True) -> None:
        """Initialize client."""
        try:
            from scrapfly import ScrapflyClient
        except ImportError:
            raise ImportError(
                "`scrapfly` package not found, please run `pip install scrapfly-sdk`"
            )
        scrapfly = ScrapflyClient(key=api_key)
        super().__init__(
            api_key=api_key,
            ignore_scrape_failures=ignore_scrape_failures,
            scrapfly=scrapfly,
        )

    @classmethod
    def class_name(cls) -> str:
        return "Scrapfly_reader"

    def load_data(
        self,
        urls: List[str],
        scrape_format: Literal["markdown", "text"] = "markdown",
        scrape_config: Optional[dict] = None,
    ) -> List[Document]:
        """
        Load data from the urls.

        Args:
            urls: List[str]): List of URLs to scrape.
            scrape_config: Optional[dict]: Dictionary of ScrapFly scrape config object.

        Returns:
            List[Document]: List of documents.

        Raises:
            ValueError: If URLs aren't provided.

        """
        from scrapfly import ScrapeApiResponse, ScrapeConfig

        if urls is None:
            raise ValueError("URLs must be provided.")
        scrape_config = scrape_config if scrape_config is not None else {}

        documents = []
        for url in urls:
            try:
                response: ScrapeApiResponse = self.scrapfly.scrape(
                    ScrapeConfig(url, format=scrape_format, **scrape_config)
                )
                documents.append(
                    Document(
                        text=response.scrape_result["content"], extra_info={"url": url}
                    )
                )
            except Exception as e:
                if self.ignore_scrape_failures:
                    logger.error(f"Error fetching data from {url}, exception: {e}")
                else:
                    raise e  # noqa: TRY201

        return documents
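
Example (a minimal usage sketch, not from the upstream docs): the environment variable name and URL are illustrative, and the scrape_config keys shown (asp, render_js) are assumed Scrapfly ScrapeConfig options; check the Scrapfly SDK docs before relying on them.

import os

from llama_index.readers.web.scrapfly_web.base import ScrapflyReader

reader = ScrapflyReader(
    api_key=os.environ["SCRAPFLY_API_KEY"],
    ignore_scrape_failures=True,
)
documents = reader.load_data(
    urls=["https://example.com"],
    scrape_format="markdown",
    scrape_config={"asp": True, "render_js": True},  # forwarded to ScrapeConfig
)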

load_data #

load_data(urls: List[str], scrape_format: Literal['markdown', 'text'] = 'markdown', scrape_config: Optional[dict] = None) -> List[Document]

Load data from the URLs.

Parameters:

Name Type Description Default
urls List[str]

List of URLs to scrape.

required
scrape_config Optional[dict]

Dictionary of ScrapFly scrape config options.

None

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Raises:

Type Description
ValueError

If URLs aren't provided.

Source code in llama_index/readers/web/scrapfly_web/base.py
def load_data(
    self,
    urls: List[str],
    scrape_format: Literal["markdown", "text"] = "markdown",
    scrape_config: Optional[dict] = None,
) -> List[Document]:
    """
    Load data from the urls.

    Args:
        urls: List[str]): List of URLs to scrape.
        scrape_config: Optional[dict]: Dictionary of ScrapFly scrape config object.

    Returns:
        List[Document]: List of documents.

    Raises:
        ValueError: If URLs aren't provided.

    """
    from scrapfly import ScrapeApiResponse, ScrapeConfig

    if urls is None:
        raise ValueError("URLs must be provided.")
    scrape_config = scrape_config if scrape_config is not None else {}

    documents = []
    for url in urls:
        try:
            response: ScrapeApiResponse = self.scrapfly.scrape(
                ScrapeConfig(url, format=scrape_format, **scrape_config)
            )
            documents.append(
                Document(
                    text=response.scrape_result["content"], extra_info={"url": url}
                )
            )
        except Exception as e:
            if self.ignore_scrape_failures:
                logger.error(f"Error fetching data from {url}, exception: {e}")
            else:
                raise e  # noqa: TRY201

    return documents

ScrapyWebReader #

Bases: BasePydanticReader

Scrapy web page reader.

Reads pages from the web.

Parameters:

Name Type Description Default
project_path Optional[str]

The path to the Scrapy project for loading the project settings (with middlewares and pipelines). The project path should contain the scrapy.cfg file. Settings will be empty if the path is not specified or not found. Defaults to "".

''
metadata_keys Optional[List[str]]

List of keys to use as document metadata from the scraped item. Defaults to [].

[]
keep_keys bool

Whether to keep metadata keys in items. Defaults to False.

False
Source code in llama_index/readers/web/scrapy_web/base.py
class ScrapyWebReader(BasePydanticReader):
    """
    Scrapy web page reader.

    Reads pages from the web.

    Args:
        project_path (Optional[str]): The path to the Scrapy project for
            loading the project settings (with middlewares and pipelines).
            The project path should contain the `scrapy.cfg` file.
            Settings will be set to empty if path not specified or not found.
            Defaults to "".

        metadata_keys (Optional[List[str]]): List of keys to use
            as document metadata from the scraped item. Defaults to [].

        keep_keys (bool): Whether to keep metadata keys in items.
            Defaults to False.

    """

    project_path: Optional[str] = ""
    metadata_keys: Optional[List[str]] = []
    keep_keys: bool = False

    def __init__(
        self,
        project_path: Optional[str] = "",
        metadata_keys: Optional[List[str]] = [],
        keep_keys: bool = False,
    ):
        super().__init__(
            project_path=project_path,
            metadata_keys=metadata_keys,
            keep_keys=keep_keys,
        )

    @classmethod
    def class_name(cls) -> str:
        return "ScrapyWebReader"

    def load_data(self, spider: Union[Spider, str]) -> List[Document]:
        """
        Load data from the input spider.

        Args:
            spider (Union[Spider, str]): The Scrapy spider class or
                the spider name from the project to use for scraping.

        Returns:
            List[Document]: List of documents extracted from the web pages.

        """
        if not self._is_spider_correct_type(spider):
            raise ValueError(
                "Invalid spider type. Provide a Spider class or spider name with project path."
            )

        documents_queue = Queue()

        config = {
            "keep_keys": self.keep_keys,
            "metadata_keys": self.metadata_keys,
            "settings": load_scrapy_settings(self.project_path),
        }

        # Running each spider in a separate process as Scrapy uses
        # twisted reactor which can only be run once in a process
        process = Process(
            target=run_spider_process, args=(spider, documents_queue, config)
        )

        process.start()
        process.join()

        if documents_queue.empty():
            return []

        return documents_queue.get()

    def _is_spider_correct_type(self, spider: Union[Spider, str]) -> bool:
        return not (isinstance(spider, str) and not self.project_path)
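
Example (a minimal usage sketch, not from the upstream docs): "path/to/my_project" and "my_spider" are placeholders for your own Scrapy project directory (containing scrapy.cfg) and spider name.

from llama_index.readers.web.scrapy_web.base import ScrapyWebReader

reader = ScrapyWebReader(
    project_path="path/to/my_project",  # needed when passing a spider name
    metadata_keys=["url"],              # item keys copied into document metadata
)
documents = reader.load_data(spider="my_spider")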

load_data #

load_data(spider: Union[Spider, str]) -> List[Document]

Load data from the input spider.

Parameters:

Name Type Description Default
spider Union[Spider, str]

The Scrapy spider class or the spider name from the project to use for scraping.

required

Returns:

Type Description
List[Document]

List[Document]: List of documents extracted from the web pages.

Source code in llama_index/readers/web/scrapy_web/base.py
def load_data(self, spider: Union[Spider, str]) -> List[Document]:
    """
    Load data from the input spider.

    Args:
        spider (Union[Spider, str]): The Scrapy spider class or
            the spider name from the project to use for scraping.

    Returns:
        List[Document]: List of documents extracted from the web pages.

    """
    if not self._is_spider_correct_type(spider):
        raise ValueError(
            "Invalid spider type. Provide a Spider class or spider name with project path."
        )

    documents_queue = Queue()

    config = {
        "keep_keys": self.keep_keys,
        "metadata_keys": self.metadata_keys,
        "settings": load_scrapy_settings(self.project_path),
    }

    # Running each spider in a separate process as Scrapy uses
    # twisted reactor which can only be run once in a process
    process = Process(
        target=run_spider_process, args=(spider, documents_queue, config)
    )

    process.start()
    process.join()

    if documents_queue.empty():
        return []

    return documents_queue.get()

SimpleWebPageReader #

Bases: BasePydanticReader

Simple web page reader.

Reads pages from the web.

Parameters:

Name Type Description Default
html_to_text bool

Whether to convert HTML to text. Requires the html2text package.

False
metadata_fn Optional[Callable[[str], Dict]]

A function that takes in a URL and returns a dictionary of metadata. Defaults to None.

None
Source code in llama_index/readers/web/simple_web/base.py
class SimpleWebPageReader(BasePydanticReader):
    """
    Simple web page reader.

    Reads pages from the web.

    Args:
        html_to_text (bool): Whether to convert HTML to text.
            Requires `html2text` package.
        metadata_fn (Optional[Callable[[str], Dict]]): A function that takes in
            a URL and returns a dictionary of metadata.
            Default is None.

    """

    is_remote: bool = True
    html_to_text: bool

    _metadata_fn: Optional[Callable[[str], Dict]] = PrivateAttr()
    _timeout: Optional[int] = PrivateAttr()
    _fail_on_error: bool = PrivateAttr()

    def __init__(
        self,
        html_to_text: bool = False,
        metadata_fn: Optional[Callable[[str], Dict]] = None,
        timeout: Optional[int] = 60,
        fail_on_error: bool = False,
    ) -> None:
        """Initialize with parameters."""
        try:
            import html2text  # noqa
        except ImportError:
            raise ImportError(
                "`html2text` package not found, please run `pip install html2text`"
            )
        super().__init__(html_to_text=html_to_text)
        self._metadata_fn = metadata_fn
        self._timeout = timeout
        self._fail_on_error = fail_on_error

    @classmethod
    def class_name(cls) -> str:
        return "SimpleWebPageReader"

    def load_data(self, urls: List[str]) -> List[Document]:
        """
        Load data from the input directory.

        Args:
            urls (List[str]): List of URLs to scrape.

        Returns:
            List[Document]: List of documents.

        """
        if not isinstance(urls, list):
            raise ValueError("urls must be a list of strings.")
        documents = []
        for url in urls:
            try:
                response = requests.get(url, headers=None, timeout=self._timeout)
            except Exception:
                if self._fail_on_error:
                    raise
                continue

            response_text = response.text

            if response.status_code != 200 and self._fail_on_error:
                raise ValueError(
                    f"Error fetching page from {url}. server returned status:"
                    f" {response.status_code} and response {response_text}"
                )

            if self.html_to_text:
                import html2text

                response_text = html2text.html2text(response_text)

            metadata: Dict = {"url": url}
            if self._metadata_fn is not None:
                metadata = self._metadata_fn(url)
                if "url" not in metadata:
                    metadata["url"] = url

            documents.append(
                Document(text=response_text, id_=str(uuid.uuid4()), metadata=metadata)
            )

        return documents
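
Example (a minimal usage sketch, not from the upstream docs): the URL is illustrative, and html_to_text=True requires the html2text package.

from llama_index.readers.web.simple_web.base import SimpleWebPageReader

reader = SimpleWebPageReader(
    html_to_text=True,
    metadata_fn=lambda url: {"source": url},  # optional per-URL metadata
)
documents = reader.load_data(["https://example.com"])
print(documents[0].metadata)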

load_data #

load_data(urls: List[str]) -> List[Document]

Load data from the input directory.

Parameters:

Name Type Description Default
urls List[str]

List of URLs to scrape.

required

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Source code in llama_index/readers/web/simple_web/base.py
def load_data(self, urls: List[str]) -> List[Document]:
    """
    Load data from the input directory.

    Args:
        urls (List[str]): List of URLs to scrape.

    Returns:
        List[Document]: List of documents.

    """
    if not isinstance(urls, list):
        raise ValueError("urls must be a list of strings.")
    documents = []
    for url in urls:
        try:
            response = requests.get(url, headers=None, timeout=self._timeout)
        except Exception:
            if self._fail_on_error:
                raise
            continue

        response_text = response.text

        if response.status_code != 200 and self._fail_on_error:
            raise ValueError(
                f"Error fetching page from {url}. server returned status:"
                f" {response.status_code} and response {response_text}"
            )

        if self.html_to_text:
            import html2text

            response_text = html2text.html2text(response_text)

        metadata: Dict = {"url": url}
        if self._metadata_fn is not None:
            metadata = self._metadata_fn(url)
            if "url" not in metadata:
                metadata["url"] = url

        documents.append(
            Document(text=response_text, id_=str(uuid.uuid4()), metadata=metadata)
        )

    return documents

SitemapReader #

Bases: BaseReader

Asynchronous sitemap reader for the web.

Reads pages from the web based on their sitemap.xml.

Parameters:

Name Type Description Default
sitemap_url string

Path to the sitemap.xml. e.g. https://gpt-index.readthedocs.io/sitemap.xml

required
html_to_text bool

Whether to convert HTML to text. Requires the html2text package.

False
limit int

Maximum number of concurrent requests.

10
Source code in llama_index/readers/web/sitemap/base.py
class SitemapReader(BaseReader):
    """
    Asynchronous sitemap reader for web.

    Reads pages from the web based on their sitemap.xml.

    Args:
        sitemap_url (string): Path to the sitemap.xml. e.g. https://gpt-index.readthedocs.io/sitemap.xml
        html_to_text (bool): Whether to convert HTML to text.
            Requires `html2text` package.
        limit (int): Maximum number of concurrent requests.

    """

    xml_schema_sitemap = "http://www.sitemaps.org/schemas/sitemap/0.9"

    def __init__(self, html_to_text: bool = False, limit: int = 10) -> None:
        """Initialize with parameters."""
        self._async_loader = AsyncWebPageReader(html_to_text=html_to_text, limit=limit)
        self._html_to_text = html_to_text
        self._limit = limit

    def _load_sitemap(self, sitemap_url: str) -> str:
        sitemap_url_request = httpx.get(sitemap_url)

        return sitemap_url_request.content

    def _parse_sitemap(self, raw_sitemap: str, filter_locs: str = None) -> list:
        sitemap = fromstring(raw_sitemap)
        sitemap_urls = []

        for url in sitemap.findall(f"{{{self.xml_schema_sitemap}}}url"):
            location = url.find(f"{{{self.xml_schema_sitemap}}}loc").text

            if filter_locs is None or filter_locs in location:
                sitemap_urls.append(location)

        return sitemap_urls

    def load_data(self, sitemap_url: str, filter: str = None) -> List[Document]:
        sitemap = self._load_sitemap(sitemap_url=sitemap_url)
        sitemap_urls = self._parse_sitemap(sitemap, filter)

        return self._async_loader.load_data(urls=sitemap_urls)
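
Example (a minimal usage sketch, not from the upstream docs): the sitemap URL is the one from the docstring above, and the filter substring is illustrative; only locations containing it are loaded.

from llama_index.readers.web.sitemap.base import SitemapReader

reader = SitemapReader(html_to_text=True, limit=10)
documents = reader.load_data(
    sitemap_url="https://gpt-index.readthedocs.io/sitemap.xml",
    filter="/en/latest/",
)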

SpiderWebReader #

Bases: BasePydanticReader

Scrapes a URL for data and returns LLM-ready data with Spider.cloud.

Must have the Python package spider-client installed and a Spider API key. See https://spider.cloud for more.

Parameters:

Name Type Description Default
api_key str

The Spider API key, get one at https://spider.cloud

None
mode Mode

"Scrape" the URL (default) or "crawl" the URL, following all subpages.

'scrape'
params dict

Additional parameters to pass to the Spider API.

None
Source code in llama_index/readers/web/spider_web/base.py
class SpiderWebReader(BasePydanticReader):
    """
    Scrapes a URL for data and returns llm-ready data with `Spider.cloud`.

    Must have the Python package `spider-client` installed and a Spider API key.
    See https://spider.cloud for more.

    Args:
        api_key (str): The Spider API key, get one at https://spider.cloud
        mode (Mode): "Scrape" the url (default) or "crawl" the url following all subpages.
        params (dict): Additional parameters to pass to the Spider API.

    """

    class Config:
        use_enum_values = True
        extra = "allow"

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        mode: Literal["scrape", "crawl"] = "scrape",
        params: Optional[dict] = None,
    ) -> None:
        super().__init__(api_key=api_key, mode=mode, params=params)

        if params is None:
            params = {"return_format": "markdown", "metadata": True}
        try:
            from spider import Spider
        except ImportError:
            raise ImportError(
                "`spider-client` package not found, please run `pip install spider-client`"
            )
        self.spider = Spider(api_key=api_key)
        self.mode = mode
        self.params = params

    def load_data(self, url: str) -> List[Document]:
        if self.mode != "scrape" and self.mode != "crawl":
            raise ValueError(
                "Unknown mode in `mode` parameter, `scrape` or `crawl` is the allowed modes"
            )
        action = (
            self.spider.scrape_url if self.mode == "scrape" else self.spider.crawl_url
        )
        spider_docs = action(url=url, params=self.params)

        if not spider_docs:
            return [Document(page_content="", metadata={})]

        documents = []
        if isinstance(spider_docs, list):
            for doc in spider_docs:
                text = doc.get("content", "")
                if text is not None:
                    documents.append(
                        Document(
                            text=text,
                            metadata=doc.get("metadata", {}),
                        )
                    )

        return documents
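
Example (a minimal usage sketch, not from the upstream docs): the environment variable name is illustrative; params mirrors the defaults set in __init__ above.

import os

from llama_index.readers.web.spider_web.base import SpiderWebReader

reader = SpiderWebReader(
    api_key=os.environ["SPIDER_API_KEY"],
    mode="scrape",  # or "crawl" to follow subpages
    params={"return_format": "markdown", "metadata": True},
)
documents = reader.load_data(url="https://spider.cloud")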

TrafilaturaWebReader #

Bases: BasePydanticReader

Trafilatura web page reader.

Reads pages from the web. Requires the trafilatura package.

Source code in llama_index/readers/web/trafilatura_web/base.py
class TrafilaturaWebReader(BasePydanticReader):
    """
    Trafilatura web page reader.

    Reads pages from the web.
    Requires the `trafilatura` package.

    """

    is_remote: bool = True

    @classmethod
    def class_name(cls) -> str:
        """Get the name identifier of the class."""
        return "TrafilaturaWebReader"

    def load_data(
        self,
        urls: List[str],
        include_comments=True,
        output_format="txt",
        include_tables=True,
        include_images=False,
        include_formatting=False,
        include_links=False,
        show_progress=False,
        no_ssl=False,
        **kwargs,
    ) -> List[Document]:
        """
        Load data from the urls.

        Args:
            urls (List[str]): List of URLs to scrape.
            include_comments (bool, optional): Include comments in the output. Defaults to True.
            output_format (str, optional): Output format. Defaults to 'txt'.
            include_tables (bool, optional): Include tables in the output. Defaults to True.
            include_images (bool, optional): Include images in the output. Defaults to False.
            include_formatting (bool, optional): Include formatting in the output. Defaults to False.
            include_links (bool, optional): Include links in the output. Defaults to False.
            show_progress (bool, optional): Show progress bar. Defaults to False
            no_ssl (bool, optional): Bypass SSL verification. Defaults to False.
            kwargs: Additional keyword arguments for the `trafilatura.extract` function.

        Returns:
            List[Document]: List of documents.

        """
        import trafilatura

        if not isinstance(urls, list):
            raise ValueError("urls must be a list of strings.")
        documents = []

        if show_progress:
            from tqdm import tqdm

            iterator = tqdm(urls, desc="Downloading pages")
        else:
            iterator = urls
        for url in iterator:
            downloaded = trafilatura.fetch_url(url, no_ssl=no_ssl)
            response = trafilatura.extract(
                downloaded,
                include_comments=include_comments,
                output_format=output_format,
                include_tables=include_tables,
                include_images=include_images,
                include_formatting=include_formatting,
                include_links=include_links,
                **kwargs,
            )
            documents.append(
                Document(text=response, id_=str(uuid.uuid4()), metadata={"url": url})
            )

        return documents

class_name classmethod #

class_name() -> str

Get the name identifier of the class.

Source code in llama_index/readers/web/trafilatura_web/base.py
@classmethod
def class_name(cls) -> str:
    """Get the name identifier of the class."""
    return "TrafilaturaWebReader"

load_data #

load_data(urls: List[str], include_comments=True, output_format='txt', include_tables=True, include_images=False, include_formatting=False, include_links=False, show_progress=False, no_ssl=False, **kwargs) -> List[Document]

Load data from the URLs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| urls | List[str] | List of URLs to scrape. | required |
| include_comments | bool | Include comments in the output. | True |
| output_format | str | Output format. | 'txt' |
| include_tables | bool | Include tables in the output. | True |
| include_images | bool | Include images in the output. | False |
| include_formatting | bool | Include formatting in the output. | False |
| include_links | bool | Include links in the output. | False |
| show_progress | bool | Show a progress bar. | False |
| no_ssl | bool | Bypass SSL verification. | False |
| kwargs |  | Additional keyword arguments for the `trafilatura.extract` function. | {} |

Returns:

| Type | Description |
| --- | --- |
| List[Document] | List of documents. |

Source code in llama_index/readers/web/trafilatura_web/base.py
def load_data(
    self,
    urls: List[str],
    include_comments=True,
    output_format="txt",
    include_tables=True,
    include_images=False,
    include_formatting=False,
    include_links=False,
    show_progress=False,
    no_ssl=False,
    **kwargs,
) -> List[Document]:
    """
    Load data from the urls.

    Args:
        urls (List[str]): List of URLs to scrape.
        include_comments (bool, optional): Include comments in the output. Defaults to True.
        output_format (str, optional): Output format. Defaults to 'txt'.
        include_tables (bool, optional): Include tables in the output. Defaults to True.
        include_images (bool, optional): Include images in the output. Defaults to False.
        include_formatting (bool, optional): Include formatting in the output. Defaults to False.
        include_links (bool, optional): Include links in the output. Defaults to False.
        show_progress (bool, optional): Show progress bar. Defaults to False
        no_ssl (bool, optional): Bypass SSL verification. Defaults to False.
        kwargs: Additional keyword arguments for the `trafilatura.extract` function.

    Returns:
        List[Document]: List of documents.

    """
    import trafilatura

    if not isinstance(urls, list):
        raise ValueError("urls must be a list of strings.")
    documents = []

    if show_progress:
        from tqdm import tqdm

        iterator = tqdm(urls, desc="Downloading pages")
    else:
        iterator = urls
    for url in iterator:
        downloaded = trafilatura.fetch_url(url, no_ssl=no_ssl)
        response = trafilatura.extract(
            downloaded,
            include_comments=include_comments,
            output_format=output_format,
            include_tables=include_tables,
            include_images=include_images,
            include_formatting=include_formatting,
            include_links=include_links,
            **kwargs,
        )
        documents.append(
            Document(text=response, id_=str(uuid.uuid4()), metadata={"url": url})
        )

    return documents
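
For reference, a minimal usage sketch of the reader above; `trafilatura` must be installed, `tqdm` is only needed when `show_progress=True`, and the URL is a placeholder.

```python
# Minimal usage sketch for TrafilaturaWebReader (placeholder URL).
from llama_index.readers.web import TrafilaturaWebReader

reader = TrafilaturaWebReader()
documents = reader.load_data(
    urls=["https://example.com"],
    include_comments=False,  # skip comment sections
    include_tables=True,     # keep tables in the extracted text
    no_ssl=False,            # set True only to bypass SSL verification
)
for doc in documents:
    print(doc.metadata["url"])
```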

UnstructuredURLLoader #

Bases: BaseReader

Loader that uses unstructured to load HTML files.

Source code in llama_index/readers/web/unstructured_web/base.py
class UnstructuredURLLoader(BaseReader):
    """Loader that uses unstructured to load HTML files."""

    def __init__(
        self, urls: List[str], continue_on_failure: bool = True, headers: dict = {}
    ):
        """Initialize with file path."""
        try:
            import unstructured  # noqa:F401
            from unstructured.__version__ import __version__ as __unstructured_version__

            self.__version = __unstructured_version__
        except ImportError:
            raise ValueError(
                "unstructured package not found, please install it with "
                "`pip install unstructured`"
            )

        if not self.__is_headers_available() and len(headers.keys()) != 0:
            logger.warning(
                "You are using old version of unstructured. "
                "The headers parameter is ignored"
            )

        self.urls = urls
        self.continue_on_failure = continue_on_failure
        self.headers = headers

    def __is_headers_available(self) -> bool:
        _unstructured_version = self.__version.split("-")[0]
        unstructured_version = tuple([int(x) for x in _unstructured_version.split(".")])

        return unstructured_version >= (0, 5, 7)

    def load_data(self) -> List[Document]:
        """Load file."""
        from unstructured.partition.html import partition_html

        docs: List[Document] = []
        for url in self.urls:
            try:
                if self.__is_headers_available():
                    elements = partition_html(url=url, headers=self.headers)
                else:
                    elements = partition_html(url=url)
                text = "\n\n".join([str(el) for el in elements])
                metadata = {"source": url}
                docs.append(Document(text=text, extra_info=metadata))
            except Exception as e:
                if self.continue_on_failure:
                    logger.error(f"Error fetching or processing {url}, exception: {e}")
                else:
                    raise e  # noqa: TRY201
        return docs

load_data #

load_data() -> List[Document]

Load file.

Source code in llama_index/readers/web/unstructured_web/base.py
def load_data(self) -> List[Document]:
    """Load file."""
    from unstructured.partition.html import partition_html

    docs: List[Document] = []
    for url in self.urls:
        try:
            if self.__is_headers_available():
                elements = partition_html(url=url, headers=self.headers)
            else:
                elements = partition_html(url=url)
            text = "\n\n".join([str(el) for el in elements])
            metadata = {"source": url}
            docs.append(Document(text=text, extra_info=metadata))
        except Exception as e:
            if self.continue_on_failure:
                logger.error(f"Error fetching or processing {url}, exception: {e}")
            else:
                raise e  # noqa: TRY201
    return docs
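
A minimal usage sketch of the loader above; the `unstructured` package must be installed, the custom header is only honored on `unstructured` >= 0.5.7, and the URL and header value are placeholders.

```python
# Minimal usage sketch for UnstructuredURLLoader (placeholder URL and header).
from llama_index.readers.web import UnstructuredURLLoader

loader = UnstructuredURLLoader(
    urls=["https://example.com"],
    continue_on_failure=True,               # log and skip URLs that fail
    headers={"User-Agent": "Mozilla/5.0"},  # ignored on unstructured < 0.5.7
)
documents = loader.load_data()
print(f"Loaded {len(documents)} documents")
```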

WholeSiteReader #

Bases: BaseReader

BFS web scraper for websites.

This class provides functionality to scrape entire websites using a breadth-first search algorithm. It navigates web pages starting from a given base URL, following links that match a specified prefix.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| prefix | str | URL prefix to focus the scraping. |
| max_depth | int | Maximum depth for the BFS algorithm. |

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| prefix | str | URL prefix for scraping. | required |
| max_depth | int | Maximum depth for the BFS. | 10 |
| uri_as_id | bool | Whether to use the URI as the document ID. | False |

Source code in llama_index/readers/web/whole_site/base.py
class WholeSiteReader(BaseReader):
    """
    BFS Web Scraper for websites.

    This class provides functionality to scrape entire websites using a breadth-first search algorithm.
    It navigates web pages from a given base URL, following links that match a specified prefix.

    Attributes:
        prefix (str): URL prefix to focus the scraping.
        max_depth (int): Maximum depth for BFS algorithm.

    Args:
        prefix (str): URL prefix for scraping.
        max_depth (int, optional): Maximum depth for BFS. Defaults to 10.
        uri_as_id (bool, optional): Whether to use the URI as the document ID. Defaults to False.

    """

    def __init__(
        self,
        prefix: str,
        max_depth: int = 10,
        uri_as_id: bool = False,
        driver: Optional[webdriver.Chrome] = None,
    ) -> None:
        """
        Initialize the WholeSiteReader with the provided prefix and maximum depth.
        """
        self.prefix = prefix
        self.max_depth = max_depth
        self.uri_as_id = uri_as_id
        self.driver = driver if driver else self.setup_driver()

    def setup_driver(self):
        """
        Sets up the Selenium WebDriver for Chrome.

        Returns:
            WebDriver: An instance of Chrome WebDriver.

        """
        try:
            import chromedriver_autoinstaller
        except ImportError:
            raise ImportError("Please install chromedriver_autoinstaller")

        opt = webdriver.ChromeOptions()
        opt.add_argument("--start-maximized")
        chromedriver_autoinstaller.install()
        return webdriver.Chrome(options=opt)

    def clean_url(self, url):
        return url.split("#")[0]

    def restart_driver(self):
        self.driver.quit()
        self.driver = self.setup_driver()

    def extract_content(self):
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        body_element = self.driver.find_element(By.TAG_NAME, "body")
        return body_element.text.strip()

    def extract_links(self):
        js_script = """
            var links = [];
            var elements = document.getElementsByTagName('a');
            for (var i = 0; i < elements.length; i++) {
                var href = elements[i].href;
                if (href) {
                    links.push(href);
                }
            }
            return links;
            """
        return self.driver.execute_script(js_script)

    def load_data(self, base_url: str) -> List[Document]:
        """
        Load data from the base URL using BFS algorithm.

        Args:
            base_url (str): Base URL to start scraping.


        Returns:
            List[Document]: List of scraped documents.

        """
        added_urls = set()
        urls_to_visit = [(base_url, 0)]
        documents = []

        while urls_to_visit:
            current_url, depth = urls_to_visit.pop(0)
            print(f"Visiting: {current_url}, {len(urls_to_visit)} left")

            try:
                self.driver.get(current_url)
                page_content = self.extract_content()
                added_urls.add(current_url)

                next_depth = depth + 1
                if next_depth <= self.max_depth:
                    # links = self.driver.find_elements(By.TAG_NAME, 'a')
                    links = self.extract_links()
                    # clean all urls
                    links = [self.clean_url(link) for link in links]
                    # extract new links
                    links = [link for link in links if link not in added_urls]
                    print(f"Found {len(links)} new potential links")

                    for href in links:
                        try:
                            if href.startswith(self.prefix) and href not in added_urls:
                                urls_to_visit.append((href, next_depth))
                                added_urls.add(href)
                        except Exception:
                            continue

                doc = Document(text=page_content, extra_info={"URL": current_url})
                if self.uri_as_id:
                    warnings.warn(
                        "Setting the URI as the id of the document might break the code execution downstream and should be avoided."
                    )
                    doc.id_ = current_url
                documents.append(doc)
                time.sleep(1)

            except WebDriverException:
                print("WebDriverException encountered, restarting driver...")
                self.restart_driver()
            except Exception as e:
                print(f"An unexpected exception occurred: {e}, skipping URL...")
                continue

        self.driver.quit()
        return documents

setup_driver #

setup_driver()

Sets up the Selenium WebDriver for Chrome.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| WebDriver |  | An instance of Chrome WebDriver. |

Source code in llama_index/readers/web/whole_site/base.py
def setup_driver(self):
    """
    Sets up the Selenium WebDriver for Chrome.

    Returns:
        WebDriver: An instance of Chrome WebDriver.

    """
    try:
        import chromedriver_autoinstaller
    except ImportError:
        raise ImportError("Please install chromedriver_autoinstaller")

    opt = webdriver.ChromeOptions()
    opt.add_argument("--start-maximized")
    chromedriver_autoinstaller.install()
    return webdriver.Chrome(options=opt)

load_data #

load_data(base_url: str) -> List[Document]

Load data from the base URL using the BFS algorithm.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_url | str | Base URL to start scraping. | required |

Returns:

| Type | Description |
| --- | --- |
| List[Document] | List of scraped documents. |

Source code in llama_index/readers/web/whole_site/base.py
def load_data(self, base_url: str) -> List[Document]:
    """
    Load data from the base URL using BFS algorithm.

    Args:
        base_url (str): Base URL to start scraping.


    Returns:
        List[Document]: List of scraped documents.

    """
    added_urls = set()
    urls_to_visit = [(base_url, 0)]
    documents = []

    while urls_to_visit:
        current_url, depth = urls_to_visit.pop(0)
        print(f"Visiting: {current_url}, {len(urls_to_visit)} left")

        try:
            self.driver.get(current_url)
            page_content = self.extract_content()
            added_urls.add(current_url)

            next_depth = depth + 1
            if next_depth <= self.max_depth:
                # links = self.driver.find_elements(By.TAG_NAME, 'a')
                links = self.extract_links()
                # clean all urls
                links = [self.clean_url(link) for link in links]
                # extract new links
                links = [link for link in links if link not in added_urls]
                print(f"Found {len(links)} new potential links")

                for href in links:
                    try:
                        if href.startswith(self.prefix) and href not in added_urls:
                            urls_to_visit.append((href, next_depth))
                            added_urls.add(href)
                    except Exception:
                        continue

            doc = Document(text=page_content, extra_info={"URL": current_url})
            if self.uri_as_id:
                warnings.warn(
                    "Setting the URI as the id of the document might break the code execution downstream and should be avoided."
                )
                doc.id_ = current_url
            documents.append(doc)
            time.sleep(1)

        except WebDriverException:
            print("WebDriverException encountered, restarting driver...")
            self.restart_driver()
        except Exception as e:
            print(f"An unexpected exception occurred: {e}, skipping URL...")
            continue

    self.driver.quit()
    return documents
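
A minimal usage sketch of the BFS scraper above; it assumes Chrome and `chromedriver_autoinstaller` are available, and the documentation-site URLs are placeholders.

```python
# Minimal usage sketch for WholeSiteReader (placeholder prefix and base URL).
from llama_index.readers.web import WholeSiteReader

scraper = WholeSiteReader(
    prefix="https://docs.example.com/",  # only enqueue links under this prefix
    max_depth=3,                         # cap the BFS depth
)
documents = scraper.load_data(base_url="https://docs.example.com/")
print(f"Scraped {len(documents)} pages")
```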

ZyteWebReader #

Bases: BasePydanticReader

Load text from URLs using the Zyte API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| api_key | str | Zyte API key. | required |
| mode | Literal['article', 'html', 'html-text'] | Determines how the text is extracted for the page content. It can take one of the following values: 'html', 'html-text', 'article'. | 'article' |
| n_conn | int | Maximum number of concurrent requests to use. | 15 |
| **download_kwargs | Optional[Dict[str, Any]] | Any additional download arguments to pass. See: https://docs.zyte.com/zyte-api/usage/reference.html | None |

Example:

    from llama_index.readers.web import ZyteWebReader

    reader = ZyteWebReader(
        api_key="ZYTE_API_KEY",
    )
    docs = reader.load_data(
        urls=["<url-1>", "<url-2>"],
    )

Zyte-API reference:

https://www.zyte.com/zyte-api/

Source code in llama_index/readers/web/zyte_web/base.py
class ZyteWebReader(BasePydanticReader):
    """
    Load text from URLs using `Zyte api`.

    Args:
        api_key: Zyte API key.
        mode: Determines how the text is extracted for the page content.
            It can take one of the following values: 'html', 'html-text', 'article'
        n_conn: It is the maximum number of concurrent requests to use.
        **download_kwargs: Any additional download arguments to pass for download.
            See: https://docs.zyte.com/zyte-api/usage/reference.html

    Example:
        .. code-block:: python

            from llama_index.readers.web import ZyteWebReader

            reader = ZyteWebReader(
               api_key="ZYTE_API_KEY",
            )
            docs = reader.load_data(
                urls=["<url-1>", "<url-2>"],
            )

    Zyte-API reference:
        https://www.zyte.com/zyte-api/

    """

    client_async: Optional[object] = Field(None)
    api_key: str
    mode: str
    n_conn: int
    download_kwargs: Optional[dict]
    continue_on_failure: bool

    def __init__(
        self,
        api_key: str,
        mode: Literal["article", "html", "html-text"] = "article",
        n_conn: int = 15,
        download_kwargs: Optional[Dict[str, Any]] = None,
        continue_on_failure: bool = True,
    ) -> None:
        """Initialize with file path."""
        super().__init__(
            api_key=api_key,
            mode=mode,
            n_conn=n_conn,
            download_kwargs=download_kwargs,
            continue_on_failure=continue_on_failure,
        )
        try:
            from zyte_api import AsyncZyteAPI
            from zyte_api.utils import USER_AGENT as PYTHON_ZYTE_API_USER_AGENT

        except ImportError:
            raise ImportError(
                "zyte-api package not found, please install it with "
                "`pip install zyte-api`"
            )
        if mode not in ("article", "html", "html-text"):
            raise ValueError(
                f"Unrecognized mode '{mode}'. Expected one of "
                f"'article', 'html', 'html-text'."
            )

        user_agent = f"llama-index-zyte-api/{PYTHON_ZYTE_API_USER_AGENT}"
        self.client_async = AsyncZyteAPI(
            api_key=api_key, user_agent=user_agent, n_conn=n_conn
        )

    @classmethod
    def class_name(cls) -> str:
        return "ZyteWebReader"

    def _zyte_html_option(self) -> str:
        if self.download_kwargs and "browserHtml" in self.download_kwargs:
            return "browserHtml"
        return "httpResponseBody"

    def _get_article(self, page: Dict) -> str:
        headline = page["article"].get("headline", "")
        article_body = page["article"].get("articleBody", "")
        return headline + "\n\n" + article_body

    def _zyte_request_params(self, url: str) -> dict:
        request_params: Dict[str, Any] = {"url": url}
        if self.mode == "article":
            request_params.update({"article": True})

        if self.mode in ("html", "html-text"):
            request_params.update({self._zyte_html_option(): True})

        if self.download_kwargs:
            request_params.update(self.download_kwargs)
        return request_params

    async def fetch_items(self, urls) -> List:
        results = []
        queries = [self._zyte_request_params(url) for url in urls]
        async with self.client_async.session() as session:
            for i, future in enumerate(session.iter(queries)):
                try:
                    result = await future
                    results.append(result)
                except Exception as e:
                    url = queries[i]["url"]
                    if self.continue_on_failure:
                        logger.warning(
                            f"Error {e} while fetching url {url}, "
                            f"skipping because continue_on_failure is True"
                        )
                        continue
                    else:
                        logger.exception(
                            f"Error fetching {url} and aborting, use "
                            f"continue_on_failure=True to continue loading "
                            f"urls after encountering an error."
                        )
                        raise
        return results

    def _get_content(self, response: Dict) -> str:
        if self.mode == "html-text":
            try:
                from html2text import html2text

            except ImportError:
                raise ImportError(
                    "html2text package not found, please install it with "
                    "`pip install html2text`"
                )
        if self.mode in ("html", "html-text"):
            content = response[self._zyte_html_option()]

            if self._zyte_html_option() == "httpResponseBody":
                content = b64decode(content).decode()

            if self.mode == "html-text":
                content = html2text(content)
        elif self.mode == "article":
            content = self._get_article(response)
        return content

    def load_data(self, urls) -> List[Document]:
        docs = []
        responses = asyncio.run(self.fetch_items(urls))
        for response in responses:
            content = self._get_content(response)
            doc = Document(text=content, metadata={"url": response["url"]})
            docs.append(doc)
        return docs
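
Beyond the default `article` mode shown in the docstring example, the `html-text` mode pairs the fetched HTML with `html2text`. A minimal sketch, assuming `zyte-api` and `html2text` are installed and that the API key and URL are placeholders:

```python
# Minimal sketch of the "html-text" mode (placeholder API key and URL).
from llama_index.readers.web import ZyteWebReader

reader = ZyteWebReader(
    api_key="ZYTE_API_KEY",
    mode="html-text",                       # fetch HTML and convert it to text
    download_kwargs={"browserHtml": True},  # request browser-rendered HTML
)
docs = reader.load_data(urls=["https://example.com"])
print(docs[0].metadata["url"])
```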

ZenRowsWebReader #

Bases: BasePydanticReader

ZenRows Web Reader.

Read web pages using the ZenRows Universal Scraper API, with advanced features such as:

- JavaScript rendering for dynamic content
- Anti-bot bypass
- Premium residential proxies with geo-location
- Custom headers and session management
- Advanced data extraction with CSS selectors
- Multiple output formats (HTML, Markdown, text, PDF)
- Screenshot capabilities

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| api_key | str | ZenRows API key. Get one at https://app.zenrows.com/register | required |
| js_render | Optional[bool] | Enable JavaScript rendering with a headless browser. Default False. | required |
| js_instructions | Optional[str] | Execute custom JavaScript on the page to interact with elements. | required |
| premium_proxy | Optional[bool] | Use residential IPs to bypass anti-bot protection. Default False. | required |
| proxy_country | Optional[str] | Set the country of the IP used for the request (requires Premium Proxies). | required |
| session_id | Optional[int] | Maintain the same IP for multiple requests for up to 10 minutes. | required |
| custom_headers | Optional[Dict[str, str]] | Include custom headers in your request to mimic browser behavior. | required |
| wait_for | Optional[str] | Wait for a specific CSS selector to appear in the DOM before returning content. | required |
| wait | Optional[int] | Wait a fixed number of milliseconds after page load. | required |
| block_resources | Optional[str] | Block specific resources (images, fonts, etc.) from loading. | required |
| response_type | Optional[Literal['markdown', 'plaintext', 'pdf']] | Convert HTML to other formats. | required |
| css_extractor | Optional[str] | Extract specific elements using CSS selectors (JSON format). | required |
| autoparse | Optional[bool] | Automatically extract structured data from HTML. Default False. | required |
| screenshot | Optional[str] | Capture an above-the-fold screenshot of the page. | required |
| screenshot_fullpage | Optional[str] | Capture a full-page screenshot. | required |
| screenshot_selector | Optional[str] | Capture a screenshot of a specific element using a CSS selector. | required |
| original_status | Optional[bool] | Return the original HTTP status code from the target page. Default False. | required |
| allowed_status_codes | Optional[str] | Return content even if the target page fails with the specified status codes. | required |
| json_response | Optional[bool] | Capture network requests in JSON format. Default False. | required |
| screenshot_format | Optional[Literal['png', 'jpeg']] | Choose between png and jpeg formats for screenshots. | required |
| screenshot_quality | Optional[int] | For JPEG format, set quality from 1 to 100. | required |
| outputs | Optional[str] | Specify which data types to extract from the scraped HTML. | required |

Source code in llama_index/readers/web/zenrows_web/base.py
class ZenRowsWebReader(BasePydanticReader):
    """
    ZenRows Web Reader.

    Read web pages using ZenRows Universal Scraper API with advanced features like:
    - JavaScript rendering for dynamic content
    - Anti-bot bypass
    - Premium residential proxies with geo-location
    - Custom headers and session management
    - Advanced data extraction with CSS selectors
    - Multiple output formats (HTML, Markdown, Text, PDF)
    - Screenshot capabilities

    Args:
        api_key (str): ZenRows API key. Get one at https://app.zenrows.com/register
        js_render (Optional[bool]): Enable JavaScript rendering with a headless browser. Default False.
        js_instructions (Optional[str]): Execute custom JavaScript on the page to interact with elements.
        premium_proxy (Optional[bool]): Use residential IPs to bypass anti-bot protection. Default False.
        proxy_country (Optional[str]): Set the country of the IP used for the request (requires Premium Proxies).
        session_id (Optional[int]): Maintain the same IP for multiple requests for up to 10 minutes.
        custom_headers (Optional[Dict[str, str]]): Include custom headers in your request to mimic browser behavior.
        wait_for (Optional[str]): Wait for a specific CSS Selector to appear in the DOM before returning content.
        wait (Optional[int]): Wait a fixed amount of milliseconds after page load.
        block_resources (Optional[str]): Block specific resources (images, fonts, etc.) from loading.
        response_type (Optional[Literal["markdown", "plaintext", "pdf"]]): Convert HTML to other formats.
        css_extractor (Optional[str]): Extract specific elements using CSS selectors (JSON format).
        autoparse (Optional[bool]): Automatically extract structured data from HTML. Default False.
        screenshot (Optional[str]): Capture an above-the-fold screenshot of the page.
        screenshot_fullpage (Optional[str]): Capture a full-page screenshot.
        screenshot_selector (Optional[str]): Capture a screenshot of a specific element using CSS Selector.
        original_status (Optional[bool]): Return the original HTTP status code from the target page. Default False.
        allowed_status_codes (Optional[str]): Returns content even if target page fails with specified status codes.
        json_response (Optional[bool]): Capture network requests in JSON format. Default False.
        screenshot_format (Optional[Literal["png", "jpeg"]]): Choose between png and jpeg formats for screenshots.
        screenshot_quality (Optional[int]): For JPEG format, set quality from 1 to 100.
        outputs (Optional[str]): Specify which data types to extract from the scraped HTML.

    """

    is_remote: bool = True
    api_key: str = Field(description="ZenRows API key")
    js_render: Optional[bool] = Field(
        default=False,
        description="Enable JavaScript rendering with a headless browser. Essential for modern web apps, SPAs, and sites with dynamic content.",
    )
    js_instructions: Optional[str] = Field(
        default=None,
        description="Execute custom JavaScript on the page to interact with elements, scroll, click buttons, or manipulate content.",
    )
    premium_proxy: Optional[bool] = Field(
        default=False,
        description="Use residential IPs to bypass anti-bot protection. Essential for accessing protected sites.",
    )
    proxy_country: Optional[str] = Field(
        default=None,
        description="Set the country of the IP used for the request (requires Premium Proxies). Use for accessing geo-restricted content.",
    )
    session_id: Optional[int] = Field(
        default=None,
        description="Maintain the same IP for multiple requests for up to 10 minutes. Essential for multi-step processes.",
    )
    custom_headers: Optional[Dict[str, str]] = Field(
        default=None,
        description="Include custom headers in your request to mimic browser behavior.",
    )
    wait_for: Optional[str] = Field(
        default=None,
        description="Wait for a specific CSS Selector to appear in the DOM before returning content.",
    )
    wait: Optional[int] = Field(
        default=None, description="Wait a fixed amount of milliseconds after page load."
    )
    block_resources: Optional[str] = Field(
        default=None,
        description="Block specific resources (images, fonts, etc.) from loading to speed up scraping.",
    )
    response_type: Optional[Literal["markdown", "plaintext", "pdf"]] = Field(
        default=None,
        description="Convert HTML to other formats. Options: markdown, plaintext, pdf.",
    )
    css_extractor: Optional[str] = Field(
        default=None,
        description="Extract specific elements using CSS selectors (JSON format).",
    )
    autoparse: Optional[bool] = Field(
        default=False, description="Automatically extract structured data from HTML."
    )
    screenshot: Optional[str] = Field(
        default=None, description="Capture an above-the-fold screenshot of the page."
    )
    screenshot_fullpage: Optional[str] = Field(
        default=None, description="Capture a full-page screenshot."
    )
    screenshot_selector: Optional[str] = Field(
        default=None,
        description="Capture a screenshot of a specific element using CSS Selector.",
    )
    original_status: Optional[bool] = Field(
        default=False,
        description="Return the original HTTP status code from the target page.",
    )
    allowed_status_codes: Optional[str] = Field(
        default=None,
        description="Returns the content even if the target page fails with specified status codes.",
    )
    json_response: Optional[bool] = Field(
        default=False,
        description="Capture network requests in JSON format, including XHR or Fetch data.",
    )
    screenshot_format: Optional[Literal["png", "jpeg"]] = Field(
        default=None,
        description="Choose between png (default) and jpeg formats for screenshots.",
    )
    screenshot_quality: Optional[int] = Field(
        default=None,
        description="For JPEG format, set quality from 1 to 100.",
    )
    outputs: Optional[str] = Field(
        default=None,
        description="Specify which data types to extract from the scraped HTML.",
    )

    _base_url: str = PrivateAttr(default="https://api.zenrows.com/v1/")

    @field_validator("css_extractor")
    @classmethod
    def validate_css_extractor(cls, v):
        """Validate that css_extractor is valid JSON if provided."""
        if v is not None:
            try:
                json.loads(v)
            except json.JSONDecodeError:
                raise ValueError("css_extractor must be valid JSON")
        return v

    @field_validator("proxy_country")
    @classmethod
    def validate_proxy_country(cls, v):
        """Validate that proxy_country is a two-letter country code."""
        if v is not None and len(v) != 2:
            raise ValueError("proxy_country must be a two-letter country code")
        return v

    def __init__(self, **kwargs):
        """Initialize ZenRows Web Reader."""
        super().__init__(**kwargs)
        if not self.api_key:
            raise ValueError(
                "ZenRows API key is required. Get one at https://app.zenrows.com/register"
            )

    @classmethod
    def class_name(cls) -> str:
        """Get the name identifier of the class."""
        return "ZenRowsWebReader"

    def _prepare_request_params(
        self, url: str, extra_params: Optional[Dict] = None
    ) -> tuple[Dict[str, Any], Optional[Dict[str, str]]]:
        """Prepare request parameters for ZenRows API."""
        params = {"url": url, "apikey": self.api_key}

        # Add all configured parameters
        if self.js_render:
            params["js_render"] = self.js_render
        if self.js_instructions:
            params["js_instructions"] = self.js_instructions
        if self.premium_proxy:
            params["premium_proxy"] = self.premium_proxy
        if self.proxy_country:
            params["proxy_country"] = self.proxy_country
        if self.session_id:
            params["session_id"] = self.session_id
        if self.wait_for:
            params["wait_for"] = self.wait_for
        if self.wait:
            params["wait"] = self.wait
        if self.block_resources:
            params["block_resources"] = self.block_resources
        if self.response_type:
            params["response_type"] = self.response_type
        if self.css_extractor:
            params["css_extractor"] = self.css_extractor
        if self.autoparse:
            params["autoparse"] = self.autoparse
        if self.screenshot:
            params["screenshot"] = self.screenshot
        if self.screenshot_fullpage:
            params["screenshot_fullpage"] = self.screenshot_fullpage
        if self.screenshot_selector:
            params["screenshot_selector"] = self.screenshot_selector
        if self.original_status:
            params["original_status"] = self.original_status
        if self.allowed_status_codes:
            params["allowed_status_codes"] = self.allowed_status_codes
        if self.json_response:
            params["json_response"] = self.json_response
        if self.screenshot_format:
            params["screenshot_format"] = self.screenshot_format
        if self.screenshot_quality:
            params["screenshot_quality"] = self.screenshot_quality
        if self.outputs:
            params["outputs"] = self.outputs

        # Add any extra parameters for this specific request
        if extra_params:
            params.update(extra_params)

        # Auto-enable js_render for parameters that require JavaScript rendering
        js_required_params = [
            "screenshot",
            "screenshot_fullpage",
            "screenshot_selector",
            "js_instructions",
            "json_response",
            "wait",
            "wait_for",
        ]
        js_required = any(params.get(param) for param in js_required_params)

        if js_required:
            params["js_render"] = True

        # Special handling for screenshot variants
        screenshot_variants = ["screenshot_fullpage", "screenshot_selector"]
        if any(params.get(param) for param in screenshot_variants):
            params["screenshot"] = "true"

        # Auto-enable premium_proxy when proxy_country is specified
        if params.get("proxy_country"):
            params["premium_proxy"] = True

        # Handle custom headers
        request_headers = None
        if "custom_headers" in params and params["custom_headers"]:
            # Store the headers dictionary for the request
            request_headers = params["custom_headers"]
            # Set custom_headers to "true" to enable custom header support in the API
            params["custom_headers"] = "true"
        elif self.custom_headers:
            request_headers = self.custom_headers
            params["custom_headers"] = "true"
        else:
            # Remove custom_headers if not provided or empty
            params.pop("custom_headers", None)

        # Remove None values to avoid sending unnecessary parameters
        params = {k: v for k, v in params.items() if v is not None}

        return params, request_headers

    def _make_request(
        self, url: str, extra_params: Optional[Dict] = None
    ) -> requests.Response:
        """Make request to ZenRows API."""
        params, request_headers = self._prepare_request_params(url, extra_params)

        response = requests.get(
            self._base_url,
            params=params,
            headers=request_headers,
        )
        response.raise_for_status()
        return response

    def _extract_metadata(
        self, response: requests.Response, url: str
    ) -> Dict[str, Any]:
        """Extract metadata from ZenRows response."""
        metadata = {
            "source_url": url,
            "scraped_at": time.time(),
        }

        # Extract ZenRows specific headers
        if "X-Request-Cost" in response.headers:
            metadata["request_cost"] = float(response.headers["X-Request-Cost"])
        if "X-Request-Id" in response.headers:
            metadata["request_id"] = response.headers["X-Request-Id"]
        if "Zr-Final-Url" in response.headers:
            metadata["final_url"] = response.headers["Zr-Final-Url"]
        if "Concurrency-Remaining" in response.headers:
            metadata["concurrency_remaining"] = int(
                response.headers["Concurrency-Remaining"]
            )
        if "Concurrency-Limit" in response.headers:
            metadata["concurrency_limit"] = int(response.headers["Concurrency-Limit"])

        # Add response info
        metadata["status_code"] = response.status_code
        metadata["content_type"] = response.headers.get("Content-Type", "")
        metadata["content_length"] = len(response.content)

        # Add scraping configuration used
        metadata["zenrows_config"] = {
            "js_render": self.js_render,
            "premium_proxy": self.premium_proxy,
            "proxy_country": self.proxy_country,
            "session_id": self.session_id,
            "response_type": self.response_type,
        }

        return metadata

    def _process_response_content(self, response: requests.Response) -> str:
        """Process response content based on whether it's a screenshot or not."""
        # Handle screenshot responses
        screenshot_params = ["screenshot", "screenshot_fullpage", "screenshot_selector"]
        if any(getattr(self, param, None) for param in screenshot_params):
            return response.content

        # For all other responses, return text
        return response.text

    def load_data(
        self, urls: Union[str, List[str]], extra_params: Optional[Dict] = None, **kwargs
    ) -> List[Document]:
        """
        Load data from URLs using ZenRows API.

        Args:
            urls: Single URL string or list of URLs to scrape
            extra_params: Additional parameters for this specific request
            **kwargs: Additional keyword arguments (for compatibility)

        Returns:
            List of Document objects containing scraped content and metadata

        """
        if isinstance(urls, str):
            urls = [urls]

        documents = []

        for url in urls:
            try:
                response = self._make_request(url, extra_params)
                content = self._process_response_content(response)
                metadata = self._extract_metadata(response, url)

                # Create document
                document = Document(
                    text=content,
                    metadata=metadata,
                )
                documents.append(document)

            except Exception as e:
                # Create error document for failed URLs
                error_metadata = {
                    "source_url": url,
                    "error": str(e),
                    "scraped_at": time.time(),
                    "status": "failed",
                }
                error_document = Document(
                    text=f"Error scraping {url}: {e!s}",
                    metadata=error_metadata,
                )
                documents.append(error_document)

        return documents

validate_css_extractor classmethod #

validate_css_extractor(v)

Validate that css_extractor is valid JSON if provided.

Source code in llama_index/readers/web/zenrows_web/base.py
@field_validator("css_extractor")
@classmethod
def validate_css_extractor(cls, v):
    """Validate that css_extractor is valid JSON if provided."""
    if v is not None:
        try:
            json.loads(v)
        except json.JSONDecodeError:
            raise ValueError("css_extractor must be valid JSON")
    return v

validate_proxy_country classmethod #

validate_proxy_country(v)

Validate that proxy_country is a two-letter country code.

Source code in llama_index/readers/web/zenrows_web/base.py
@field_validator("proxy_country")
@classmethod
def validate_proxy_country(cls, v):
    """Validate that proxy_country is a two-letter country code."""
    if v is not None and len(v) != 2:
        raise ValueError("proxy_country must be a two-letter country code")
    return v

class_name classmethod #

class_name() -> str

Get the name identifier of the class.

Source code in llama_index/readers/web/zenrows_web/base.py
@classmethod
def class_name(cls) -> str:
    """Get the name identifier of the class."""
    return "ZenRowsWebReader"

load_data #

load_data(urls: Union[str, List[str]], extra_params: Optional[Dict] = None, **kwargs) -> List[Document]

Load data from URLs using the ZenRows API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| urls | Union[str, List[str]] | A single URL string or a list of URLs to scrape. | required |
| extra_params | Optional[Dict] | Additional parameters for this specific request. | None |
| **kwargs |  | Additional keyword arguments (for compatibility). | {} |

Returns:

| Type | Description |
| --- | --- |
| List[Document] | List of Document objects containing the scraped content and metadata. |

Source code in llama_index/readers/web/zenrows_web/base.py
def load_data(
    self, urls: Union[str, List[str]], extra_params: Optional[Dict] = None, **kwargs
) -> List[Document]:
    """
    Load data from URLs using ZenRows API.

    Args:
        urls: Single URL string or list of URLs to scrape
        extra_params: Additional parameters for this specific request
        **kwargs: Additional keyword arguments (for compatibility)

    Returns:
        List of Document objects containing scraped content and metadata

    """
    if isinstance(urls, str):
        urls = [urls]

    documents = []

    for url in urls:
        try:
            response = self._make_request(url, extra_params)
            content = self._process_response_content(response)
            metadata = self._extract_metadata(response, url)

            # Create document
            document = Document(
                text=content,
                metadata=metadata,
            )
            documents.append(document)

        except Exception as e:
            # Create error document for failed URLs
            error_metadata = {
                "source_url": url,
                "error": str(e),
                "scraped_at": time.time(),
                "status": "failed",
            }
            error_document = Document(
                text=f"Error scraping {url}: {e!s}",
                metadata=error_metadata,
            )
            documents.append(error_document)

    return documents
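
A minimal usage sketch of the reader above, combining JavaScript rendering with a CSS extractor; the API key, URL, and selector JSON are placeholders.

```python
# Minimal usage sketch for ZenRowsWebReader (placeholder key, URL, and selectors).
import json

from llama_index.readers.web import ZenRowsWebReader

reader = ZenRowsWebReader(
    api_key="ZENROWS_API_KEY",
    js_render=True,                             # render JavaScript-heavy pages
    css_extractor=json.dumps({"title": "h1"}),  # must be valid JSON (validated above)
)
documents = reader.load_data(urls=["https://example.com"])
print(documents[0].metadata.get("request_cost"))
```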

Members: AgentQLWebReader, AsyncWebPageReader, BeautifulSoupWebReader, BrowserbaseWebReader, FireCrawlWebReader, HyperbrowserWebReader, KnowledgeBaseWebReader, MainContentExtractorReader, NewsArticleReader, OlostepWebReader, OxylabsWebReader, ReadabilityWebPageReader, RssNewsReader, RssReader, ScrapflyReader, ScrapyWebReader, SimpleWebPageReader, SitemapReader, SpiderReader, TrafilaturaWebReader, UnstructuredURLLoader, WholeSiteReader, ZenRowsWebReader