Skip to content

云存储

Polars 可以读取和写入 AWS S3、Azure Blob Storage 和 Google Cloud Storage。对于这三种存储提供商,API 是相同的。

要从云存储中读取数据,可能需要额外的依赖项,具体取决于用例和云存储提供商:

$ pip install fsspec s3fs adlfs gcsfs
$ cargo add aws_sdk_s3 aws_config tokio --features tokio/full

从云存储读取

Polars 支持从云存储读取 Parquet、CSV、IPC 和 NDJSON 文件:

read_parquet · read_csv · read_ipc

import polars as pl

source = "s3://bucket/*.parquet"

df = pl.read_parquet(source)

ParquetReader · CsvReader · IpcReader · 在功能 csv 上可用 · 在功能 parquet 上可用 · 在功能 ipc 上可用

use aws_config::BehaviorVersion;
use polars::prelude::*;

#[tokio::main]
async fn main() {
    let bucket = "";
    let path = "";

    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let client = aws_sdk_s3::Client::new(&config);

    let object = client
        .get_object()
        .bucket(bucket)
        .key(path)
        .send()
        .await
        .unwrap();

    let bytes = object.body.collect().await.unwrap().into_bytes();

    let cursor = std::io::Cursor::new(bytes);
    let df = CsvReader::new(cursor).finish().unwrap();

    println!("{:?}", df);
}

从云存储扫描并进行查询优化

使用pl.scan_*函数从云存储读取数据可以受益于 谓词和投影下推,查询优化器会在文件下载之前应用这些优化。这可以显著减少需要下载的数据量。通过调用collect来触发查询评估。

import polars as pl

source = "s3://bucket/*.parquet"

df = pl.scan_parquet(source).filter(pl.col("id") < 100).select("id","value").collect()

云认证

Polars 能够自动加载一些云提供商的默认凭证配置。对于无法自动加载的情况,可以手动配置 Polars 用于身份验证的凭证。这可以通过以下几种方式实现:

使用 storage_options:

  • 凭证可以作为配置键在字典中通过storage_options参数传递:

scan_parquet

import polars as pl

source = "s3://bucket/*.parquet"

storage_options = {
    "aws_access_key_id": "",
    "aws_secret_access_key": "",
    "aws_region": "us-east-1",
}
df = pl.scan_parquet(source, storage_options=storage_options).collect()

使用可用的CredentialProvider*工具类之一

  • 可能有一个实用类 pl.CredentialProvider* 提供所需的认证功能。例如,pl.CredentialProviderAWS 支持选择 AWS 配置文件,以及承担 IAM 角色:

scan_parquet

lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=pl.CredentialProviderAWS(
        profile_name="..."
        assume_role={
            "RoleArn": f"...",
            "RoleSessionName": "...",
        }
    ),
)

df = lf.collect()

使用自定义的credential_provider函数

  • 某些环境可能需要自定义的身份验证逻辑(例如 AWS IAM 角色链)。对于这些情况,可以为 Polars 提供一个 Python 函数来获取凭证:

scan_parquet

def get_credentials() -> pl.CredentialProviderFunctionReturn:
    expiry = None

    return {
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
        "aws_session_token": "...",
    }, expiry


lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=get_credentials,
)

df = lf.collect()

使用PyArrow进行扫描

我们也可以使用PyArrow从云存储中扫描。这对于分区数据集(如Hive分区)特别有用。

我们首先创建一个PyArrow数据集,然后从该数据集创建一个LazyFrame

scan_pyarrow_dataset

import polars as pl
import pyarrow.dataset as ds

dset = ds.dataset("s3://my-partitioned-folder/", format="parquet")
(
    pl.scan_pyarrow_dataset(dset)
    .filter(pl.col("foo") == "a")
    .select(["foo", "bar"])
    .collect()
)

写入云存储

我们可以使用Python中的s3fs将DataFrame写入S3云存储,使用adlfs写入Azure Blob存储,使用gcsfs写入Google云存储。在这个例子中,我们将一个Parquet文件写入S3。

write_parquet

import polars as pl
import s3fs

df = pl.DataFrame({
    "foo": ["a", "b", "c", "d", "d"],
    "bar": [1, 2, 3, 4, 5],
})

fs = s3fs.S3FileSystem()
destination = "s3://bucket/my_file.parquet"

# 写入parquet文件
with fs.open(destination, mode='wb') as f:
    df.write_parquet(f)