
Database

DatabaseReader #

Bases: `BaseReader`

Simple Database reader.

Reads data from a database via a query and returns LlamaIndex Documents. Allows specifying columns to include as metadata (with optional renaming) and columns to exclude from the text content. Can also generate custom Document IDs from row data.

Note: The `schema` parameter is not supported when passed together with `sql_database`.

If the `sql_database` object was created with a schema, that schema will be used.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `sql_database` | `Optional[SQLDatabase]` | SQL database to use, including the table names to specify. See `Ref-Struct-Store` for more details. | `None` |
| `engine` | `Optional[Engine]` | SQLAlchemy Engine object of the database connection. | `None` |
| `uri` | `Optional[str]` | URI of the database connection. | `None` |
| `scheme` | `Optional[str]` | Scheme of the database connection. | `None` |
| `host` | `Optional[str]` | Host of the database connection. | `None` |
| `port` | `Optional[int]` | Port of the database connection. | `None` |
| `user` | `Optional[str]` | User of the database connection. | `None` |
| `password` | `Optional[str]` | Password of the database connection. | `None` |
| `dbname` | `Optional[str]` | Name of the database to connect to. | `None` |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `DatabaseReader` | | A DatabaseReader object. |

Note

schema (Optional[str]): The database schema is only honored when the connection object is created inside this class (i.e. when you pass `engine`, `uri`, or individual connection parameters). If you supply an already-built `SQLDatabase`, its internal schema (if any) is used and this argument is ignored.

Connection patterns #

| Pattern | Supported | Notes |
|---------|-----------|-------|
| `sql_database` | ✖ | Pass a pre-configured `SQLDatabase` if you need schema handling here. |
| `engine` + `schema` | ✔ | |
| `uri` + `schema` | ✔ | |
| `scheme/host/…` + `schema` | ✔ | |

(*schema* = database namespace; *scheme* = driver/dialect, e.g. `postgresql+psycopg`)
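A minimal construction sketch of the supported patterns, assuming the reader is installed as `llama-index-readers-database` and that a local SQLite file `example.db` exists; both are illustrative assumptions, not part of the reference above.

```python
from sqlalchemy import create_engine

from llama_index.readers.database import DatabaseReader

# Pattern: uri (+ optional schema) -- the connection is created inside the reader,
# so a `schema` argument would be honored here. `example.db` is an assumed file.
reader = DatabaseReader(uri="sqlite:///example.db")

# Pattern: engine (+ optional schema) -- pass an existing SQLAlchemy Engine instead.
engine = create_engine("sqlite:///example.db")
reader_from_engine = DatabaseReader(engine=engine)
```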

Source code in `llama_index/readers/database/base.py`
class DatabaseReader(BaseReader):
    """
    Simple Database reader.

    Reads data from a database via a query and returns LlamaIndex Documents.
    Allows specifying columns for metadata (with optional renaming) and
    excluding columns from text content. Can also generate custom Document IDs
    from row data.

    Note: The `schema` parameter is not supported when passed with `sql_database`.
        If the `sql_database` object was created with a schema, it will be used.

    Args:
        sql_database (Optional[SQLDatabase]): SQL database to use,
            including table names to specify.
            See :ref:`Ref-Struct-Store` for more details.

        OR

        engine (Optional[Engine]): SQLAlchemy Engine object of the database connection.

        OR

        uri (Optional[str]): uri of the database connection.

        OR

        scheme (Optional[str]): scheme of the database connection.
        host (Optional[str]): host of the database connection.
        port (Optional[int]): port of the database connection.
        user (Optional[str]): user of the database connection.
        password (Optional[str]): password of the database connection.
        dbname (Optional[str]): dbname of the database connection.

    Returns:
        DatabaseReader: A DatabaseReader object.

    Note:
        schema (Optional[str]):
            Database schema **only honored when a connection object is created
            inside this class** (i.e. when you pass `engine`, `uri`, or individual
            connection parameters).
            If you supply an already-built `SQLDatabase`, its internal schema (if
            any) is used and this argument is ignored.

    Connection patterns
    -------------------
    +----------------------------+-----------+---------------------------------------+
    | Pattern                    | Supports  | Notes                                 |
    +============================+===========+=======================================+
    | ``sql_database``           | ✖         | Pass a pre-configured ``SQLDatabase`` |
    |                            |           | if you need schema handling here.     |
    +----------------------------+-----------+---------------------------------------+
    | ``engine`` + ``schema``    | ✔         |                                       |
    +----------------------------+-----------+---------------------------------------+
    | ``uri`` + ``schema``       | ✔         |                                       |
    +----------------------------+-----------+---------------------------------------+
    | ``scheme/host/…`` +        | ✔         |                                       |
    | ``schema``                 |           |                                       |
    +----------------------------+-----------+---------------------------------------+

    (*schema* = database namespace; *scheme* = driver/dialect, e.g. ``postgresql+psycopg``)

    """

    def __init__(
        self,
        sql_database: Optional[SQLDatabase] = None,
        engine: Optional[Engine] = None,
        uri: Optional[str] = None,
        scheme: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        dbname: Optional[str] = None,
        schema: Optional[str] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize with parameters."""
        db_kwargs = kwargs.copy()
        if schema and not sql_database:
            db_kwargs["schema"] = schema
            self.schema = schema if schema else db_kwargs.get("schema", None)
        else:
            self.schema = None

        if sql_database:
            self.sql_database = sql_database
        elif engine:
            self.sql_database = SQLDatabase(engine, *args, **db_kwargs)
        elif uri:
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **db_kwargs)
        elif scheme and host and port and user and password and dbname:
            uri = f"{scheme}://{user}:{password}@{host}:{port}/{dbname}"
            self.uri = uri
            self.sql_database = SQLDatabase.from_uri(uri, *args, **db_kwargs)
        else:
            raise ValueError(
                "You must provide either a SQLDatabase, "
                "a SQL Alchemy Engine, a valid connection URI, or a valid "
                "set of credentials."
            )

    def lazy_load_data(
        self,
        query: str,
        metadata_cols: Optional[Iterable[Union[str, Tuple[str, str]]]] = None,
        excluded_text_cols: Optional[Iterable[str]] = None,
        document_id: Optional[Callable[[Dict[str, Any]], str]] = None,
        **load_kwargs: Any,
    ) -> Generator[Document, Any, None]:
        """
        Lazily query and load data from the Database.

        Args:
            query (str): SQL query to execute.
            metadata_cols (Optional[Iterable[Union[str, Tuple[str, str]]]]):
                Iterable of column names or (db_col, meta_key) tuples to include
                in Document metadata. If str, the column name is used as key.
                If tuple, uses first element as DB column and second as metadata key.
                If two entries map to the same metadata key, the latter will silently
                overwrite the former - **avoid duplicates**.
            excluded_text_cols (Optional[Iterable[str]]): Iterable of column names to be
                excluded from Document text. Useful for metadata-only columns.
            document_id (Optional[Callable[[Dict[str, Any]], str]]): A function
                that takes a row (as a dict) and returns a string to be used as the
                Document's `id_`, this replaces the deprecated `doc_id` field.
                **MUST** return a string, falling back to auto-generated UUID.
            **load_kwargs: Additional keyword arguments (ignored).

        Yields:
            Document: A Document object for each row fetched.

        Usage Pattern for Metadata-Only Columns:
            To include `my_col` ONLY in metadata (not text), specify it in
            `metadata_cols=['my_col']` and `excluded_text_cols=['my_col']`.

        Usage Pattern for Renaming Metadata Keys:
            To include DB column `db_col_name` in metadata with the key `meta_key_name`,
            use `metadata_cols=[('db_col_name', 'meta_key_name')]`.

        """
        exclude_set: Set[str] = set(excluded_text_cols or [])
        missing_columns: Set[str] = set()
        invalid_columns: Set[str] = set()

        with self.sql_database.engine.connect() as connection:
            if not query:
                raise ValueError("A query parameter is necessary.")

            result = connection.execute(text(query))
            column_names = list(result.keys())

            for row in result:
                row_values: Dict[str, Any] = dict(zip(column_names, row))
                doc_metadata: Dict[str, Any] = {}

                # Process metadata_cols based on Union type
                if metadata_cols:
                    for item in metadata_cols:
                        db_col: str
                        meta_key: str
                        if isinstance(item, str):
                            db_col = item
                            meta_key = item
                        elif (
                            isinstance(item, tuple)
                            and len(item) == 2
                            and all(isinstance(s, str) for s in item)
                        ):
                            db_col, meta_key = item
                        elif f"{item!r}" not in invalid_columns:
                            invalid_columns.add(f"{item!r}")
                            logger.warning(
                                f"Skipping invalid item in metadata_cols: {item!r}"
                            )
                            continue
                        else:
                            continue

                        if db_col in row_values:
                            doc_metadata[meta_key] = row_values[db_col]
                        elif db_col not in row_values and db_col not in missing_columns:
                            missing_columns.add(db_col)
                            logger.warning(
                                f"Column '{db_col}' specified in metadata_cols not found in query result."
                            )

                # Prepare text content
                text_parts: List[str] = [
                    f"{col}: {val}"
                    for col, val in row_values.items()
                    if col not in exclude_set
                ]
                text_resource = MediaResource(text=", ".join(text_parts))
                params = {
                    "text_resource": text_resource,
                    "metadata": doc_metadata,
                }

                if document_id:
                    try:
                        # Ensure function receives the row data
                        id_: Optional[str] = document_id(row_values)
                        if not isinstance(id_, str):
                            logger.warning(
                                f"document_id did not return a string for row {row_values}. Got: {type(id_)}"
                            )
                        if id_ is not None:
                            params["id_"] = id_
                    except Exception as e:
                        logger.warning(
                            f"document_id failed for row {row_values}: {e}",
                            exc_info=True,
                        )

                yield Document(**params)

lazy_load_data #

lazy_load_data(query: str, metadata_cols: Optional[Iterable[Union[str, Tuple[str, str]]]] = None, excluded_text_cols: Optional[Iterable[str]] = None, document_id: Optional[Callable[[Dict[str, Any]], str]] = None, **load_kwargs: Any) -> Generator[Document, Any, None]

Lazily query and load data from the database.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `str` | SQL query to execute. | *required* |
| `metadata_cols` | `Optional[Iterable[Union[str, Tuple[str, str]]]]` | Iterable of column names or `(db_col, meta_key)` tuples to include in Document metadata. If a str, the column name is used as the key. If a tuple, the first element is the DB column and the second the metadata key. If two entries map to the same metadata key, the latter silently overwrites the former, so avoid duplicates. | `None` |
| `excluded_text_cols` | `Optional[Iterable[str]]` | Iterable of column names to exclude from the Document text. Useful for metadata-only columns. | `None` |
| `document_id` | `Optional[Callable[[Dict[str, Any]], str]]` | A function that takes a row (as a dict) and returns a string to use as the Document's `id_`; this replaces the deprecated `doc_id` field. Must return a string; otherwise the ID falls back to an auto-generated UUID. | `None` |
| `**load_kwargs` | `Any` | Additional keyword arguments (ignored). | `{}` |

Yields:

| Name | Type | Description |
|------|------|-------------|
| `Document` | `Document` | A Document object for each row fetched. |
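A usage sketch tying the patterns above together; the SQLite URI, the `users` table, and its columns are illustrative assumptions rather than part of the API.

```python
from llama_index.readers.database import DatabaseReader

reader = DatabaseReader(uri="sqlite:///example.db")  # assumed local SQLite file

docs = list(
    reader.lazy_load_data(
        # hypothetical table: users(id, name, email, internal_notes)
        query="SELECT id, name, email, internal_notes FROM users",
        # keep `id` in metadata only; expose `email` under the key `contact_email`
        metadata_cols=["id", ("email", "contact_email")],
        excluded_text_cols=["id", "internal_notes"],
        # deterministic Document IDs derived from row data (must return a string)
        document_id=lambda row: f"user-{row['id']}",
    )
)
```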

Source code in `llama_index/readers/database/base.py`
def lazy_load_data(
    self,
    query: str,
    metadata_cols: Optional[Iterable[Union[str, Tuple[str, str]]]] = None,
    excluded_text_cols: Optional[Iterable[str]] = None,
    document_id: Optional[Callable[[Dict[str, Any]], str]] = None,
    **load_kwargs: Any,
) -> Generator[Document, Any, None]:
    """
    Lazily query and load data from the Database.

    Args:
        query (str): SQL query to execute.
        metadata_cols (Optional[Iterable[Union[str, Tuple[str, str]]]]):
            Iterable of column names or (db_col, meta_key) tuples to include
            in Document metadata. If str, the column name is used as key.
            If tuple, uses first element as DB column and second as metadata key.
            If two entries map to the same metadata key, the latter will silently
            overwrite the former - **avoid duplicates**.
        excluded_text_cols (Optional[Iterable[str]]): Iterable of column names to be
            excluded from Document text. Useful for metadata-only columns.
        document_id (Optional[Callable[[Dict[str, Any]], str]]): A function
            that takes a row (as a dict) and returns a string to be used as the
            Document's `id_`, this replaces the deprecated `doc_id` field.
            **MUST** return a string, falling back to auto-generated UUID.
        **load_kwargs: Additional keyword arguments (ignored).

    Yields:
        Document: A Document object for each row fetched.

    Usage Pattern for Metadata-Only Columns:
        To include `my_col` ONLY in metadata (not text), specify it in
        `metadata_cols=['my_col']` and `excluded_text_cols=['my_col']`.

    Usage Pattern for Renaming Metadata Keys:
        To include DB column `db_col_name` in metadata with the key `meta_key_name`,
        use `metadata_cols=[('db_col_name', 'meta_key_name')]`.

    """
    exclude_set: Set[str] = set(excluded_text_cols or [])
    missing_columns: Set[str] = set()
    invalid_columns: Set[str] = set()

    with self.sql_database.engine.connect() as connection:
        if not query:
            raise ValueError("A query parameter is necessary.")

        result = connection.execute(text(query))
        column_names = list(result.keys())

        for row in result:
            row_values: Dict[str, Any] = dict(zip(column_names, row))
            doc_metadata: Dict[str, Any] = {}

            # Process metadata_cols based on Union type
            if metadata_cols:
                for item in metadata_cols:
                    db_col: str
                    meta_key: str
                    if isinstance(item, str):
                        db_col = item
                        meta_key = item
                    elif (
                        isinstance(item, tuple)
                        and len(item) == 2
                        and all(isinstance(s, str) for s in item)
                    ):
                        db_col, meta_key = item
                    elif f"{item!r}" not in invalid_columns:
                        invalid_columns.add(f"{item!r}")
                        logger.warning(
                            f"Skipping invalid item in metadata_cols: {item!r}"
                        )
                        continue
                    else:
                        continue

                    if db_col in row_values:
                        doc_metadata[meta_key] = row_values[db_col]
                    elif db_col not in row_values and db_col not in missing_columns:
                        missing_columns.add(db_col)
                        logger.warning(
                            f"Column '{db_col}' specified in metadata_cols not found in query result."
                        )

            # Prepare text content
            text_parts: List[str] = [
                f"{col}: {val}"
                for col, val in row_values.items()
                if col not in exclude_set
            ]
            text_resource = MediaResource(text=", ".join(text_parts))
            params = {
                "text_resource": text_resource,
                "metadata": doc_metadata,
            }

            if document_id:
                try:
                    # Ensure function receives the row data
                    id_: Optional[str] = document_id(row_values)
                    if not isinstance(id_, str):
                        logger.warning(
                            f"document_id did not return a string for row {row_values}. Got: {type(id_)}"
                        )
                    if id_ is not None:
                        params["id_"] = id_
                except Exception as e:
                    logger.warning(
                        f"document_id failed for row {row_values}: {e}",
                        exc_info=True,
                    )

            yield Document(**params)
