polars.read_database#

polars.read_database(
query: str | TextClause | Selectable,
connection: ConnectionOrCursor | str,
*,
iter_batches: bool = False,
batch_size: int | None = None,
schema_overrides: SchemaDict | None = None,
infer_schema_length: int | None = 100,
execute_options: dict[str, Any] | None = None,
) DataFrame | Iterator[DataFrame][source]#

将SQL查询的结果读取到DataFrame中,给定一个连接对象。

Parameters:
query

要执行的SQL查询(如果使用SQLAlchemy连接对象,这可以是合适的“Selectable”,否则预计为字符串)。

connection

一个已实例化的连接(或游标/客户端对象),可以针对其执行查询。也可以传递一个有效的ODBC连接字符串,如果它包含字符串“Driver={…}”,则会被识别为ODBC连接字符串,在这种情况下,将使用arrow-odbc包来建立连接并将Arrow原生数据返回给Polars。异步驱动程序连接也受支持,尽管目前认为这是不稳定的。

警告

目前认为异步连接的使用是不稳定的,可能会出现意外问题;如果发生这种情况,请报告它们。

iter_batches

返回一个DataFrames的迭代器,其中每个DataFrame代表查询返回的一批数据;这对于以内存高效的方式处理大型结果集非常有用。如果后端支持,此值将传递给底层的查询执行方法(请注意,非常低的值通常会导致性能不佳,因为它会导致在数据返回时多次往返数据库)。如果后端不支持更改批处理大小,则从迭代器中生成单个DataFrame。

batch_size

iter_batches为True时,指示每个批次的大小(请注意,即使iter_batches为False,您仍然可以设置此值,在这种情况下,生成的DataFrame在返回给您之前会在内部使用批处理返回进行构建。请注意,某些后端(如Snowflake)可能支持批处理操作但不允许设置显式大小;在这种情况下,您仍然会接收到批次,但它们的大小由后端决定(在这种情况下,此处设置的任何值都将被忽略)。

schema_overrides

一个将列名映射到数据类型的字典,用于覆盖从查询游标推断出的模式或由传入的Arrow数据提供的模式(取决于驱动程序/后端)。如果给定的类型可以更精确地定义(例如,如果您知道某个列可以声明为u32而不是i64),这将非常有用。

infer_schema_length

用于模式推断的最大扫描行数。如果设置为None,则可能会扫描完整数据(这可能会很慢)。此参数仅在数据作为行序列读取且未为给定列设置schema_overrides参数时适用;支持Arrow的驱动程序也会忽略此值。

execute_options

这些选项将作为kwargs传递给底层的查询执行方法。在使用ODBC字符串(使用arrow-odbc)建立的连接的情况下,这些选项将传递给read_arrow_batches_from_odbc方法。

另请参阅

read_database_uri

使用URI字符串从SQL查询创建DataFrame。

注释

  • 此函数支持广泛的本地数据库驱动程序(从本地数据库如SQLite到大型云数据库如Snowflake),以及通用库如ADBC、SQLAlchemy和各种版本的ODBC。如果后端支持直接返回Arrow数据,则将使用此功能高效地实例化DataFrame;否则,DataFrame将从行数据初始化。

  • 支持通过adbc-driver-flightsql包使用Arrow Flight SQL数据;有关使用此驱动程序的更多详细信息(实现Flight SQL的著名数据库包括Dremio和InfluxDB),请参见https://arrow.apache.org/adbc/current/driver/flight_sql.html

  • read_database_uri 函数在使用 SQLAlchemy 或 DBAPI2 连接时,可能比 read_database 明显更快,因为 connectorxadbc 优化了结果集到 Arrow 格式的转换。请注意,您可以通过调用 conn.engine.url.render_as_string(hide_password=False) 从 SQLAlchemy 引擎对象中确定连接的 URI。

  • 如果Polars必须从您的连接创建游标以执行查询,那么当查询完成时,该游标将自动关闭;然而,Polars永远不会关闭任何其他打开的连接或游标。

  • Polars 不仅能够支持关系型数据库和 SQL 查询,通过此函数还可以实现更多功能。例如,你可以从 KùzuDB 连接中加载本地图数据库结果,并结合 Cypher 查询使用,或者使用 SurrealQL 与 SurrealDB 进行交互。

示例

从针对用户提供的连接的SQL查询实例化一个DataFrame:

>>> df = pl.read_database(
...     query="SELECT * FROM test_data",
...     connection=user_conn,
...     schema_overrides={"normalised_score": pl.UInt8},
... )  

使用参数化的SQLAlchemy查询,通过execute_options传递命名值:

>>> df = pl.read_database(
...     query="SELECT * FROM test_data WHERE metric > :value",
...     connection=alchemy_conn,
...     execute_options={"parameters": {"value": 0}},
... )  

使用‘qmark’风格的参数化;值仍然通过execute_options传递, 但在这种情况下,“parameters”值是一个字面量序列,而不是一个字典:

>>> df = pl.read_database(
...     query="SELECT * FROM test_data WHERE metric > ?",
...     connection=alchemy_conn,
...     execute_options={"parameters": [0]},
... )  

使用ODBC连接字符串实例化一个DataFrame(需要arrow-odbc包),设置可变文本/二进制列的缓冲区大小的上限,将结果作为DataFrames的迭代器返回,每个DataFrame包含1000行:

>>> for df in pl.read_database(
...     query="SELECT * FROM test_data",
...     connection="Driver={PostgreSQL};Server=localhost;Port=5432;Database=test;Uid=usr;Pwd=",
...     execute_options={"max_text_size": 512, "max_binary_size": 1024},
...     iter_batches=True,
...     batch_size=1000,
... ):
...     do_something(df)  

KùzuDB连接和Cypher查询加载图数据库结果:

>>> df = pl.read_database(
...     query="MATCH (a:User)-[f:Follows]->(b:User) RETURN a.name, f.since, b.name",
...     connection=kuzu_db_conn,
... )  

从异步SQLAlchemy驱动/引擎加载数据;请注意,这里也支持异步连接和会话:

>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> async_engine = create_async_engine("sqlite+aiosqlite:///test.db")
>>> df = pl.read_database(
...     query="SELECT * FROM test_data",
...     connection=async_engine,
... )  

从异步SurrealDB客户端连接对象加载数据;注意,支持WS(Surreal)和HTTP(SurrealHTTP)客户端:

>>> import asyncio
>>> async def surreal_query_to_frame(query: str, url: str):
...     async with Surreal(url) as client:
...         await client.use(namespace="test", database="test")
...         return pl.read_database(query=query, connection=client)
>>> df = asyncio.run(
...     surreal_query_to_frame(
...         query="SELECT * FROM test_data",
...         url="ws://localhost:8000/rpc",
...     )
... )