云存储
Polars 可以读取和写入 AWS S3、Azure Blob Storage 和 Google Cloud Storage。对于这三种存储提供商,API 是相同的。
要从云存储中读取数据,可能需要额外的依赖项,具体取决于用例和云存储提供商:
$ pip install fsspec s3fs adlfs gcsfs
$ cargo add aws_sdk_s3 aws_config tokio --features tokio/full
从云存储读取
Polars 支持从云存储读取 Parquet、CSV、IPC 和 NDJSON 文件:
read_parquet · read_csv · read_ipc
import polars as pl
source = "s3://bucket/*.parquet"
df = pl.read_parquet(source)
ParquetReader · CsvReader · IpcReader · 在功能 csv 上可用 · 在功能 parquet 上可用 · 在功能 ipc 上可用
use aws_config::BehaviorVersion;
use polars::prelude::*;
#[tokio::main]
async fn main() {
let bucket = "" ;
let path = "" ;
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let client = aws_sdk_s3::Client::new(&config);
let object = client
.get_object()
.bucket(bucket)
.key(path)
.send()
.await
.unwrap();
let bytes = object.body.collect().await.unwrap().into_bytes();
let cursor = std::io::Cursor::new(bytes);
let df = CsvReader::new(cursor).finish().unwrap();
println!("{:?}", df);
}
从云存储扫描并进行查询优化
使用pl.scan_*函数从云存储读取数据可以受益于
谓词和投影下推,查询优化器会在文件下载之前应用这些优化。这可以显著减少需要下载的数据量。通过调用collect来触发查询评估。
import polars as pl
source = "s3://bucket/*.parquet"
df = pl.scan_parquet(source).filter(pl.col("id") < 100).select("id","value").collect()
云认证
Polars 能够自动加载一些云提供商的默认凭证配置。对于无法自动加载的情况,可以手动配置 Polars 用于身份验证的凭证。这可以通过以下几种方式实现:
使用 storage_options:
- 凭证可以作为配置键在字典中通过
storage_options参数传递:
import polars as pl
source = "s3://bucket/*.parquet"
storage_options = {
"aws_access_key_id": "" ,
"aws_secret_access_key": "" ,
"aws_region": "us-east-1",
}
df = pl.scan_parquet(source, storage_options=storage_options).collect()
使用可用的CredentialProvider*工具类之一
- 可能有一个实用类
pl.CredentialProvider*提供所需的认证功能。例如,pl.CredentialProviderAWS支持选择 AWS 配置文件,以及承担 IAM 角色:
lf = pl.scan_parquet(
"s3://.../...",
credential_provider=pl.CredentialProviderAWS(
profile_name="..."
assume_role={
"RoleArn": f"...",
"RoleSessionName": "...",
}
),
)
df = lf.collect()
使用自定义的credential_provider函数
- 某些环境可能需要自定义的身份验证逻辑(例如 AWS IAM 角色链)。对于这些情况,可以为 Polars 提供一个 Python 函数来获取凭证:
def get_credentials() -> pl.CredentialProviderFunctionReturn:
expiry = None
return {
"aws_access_key_id": "...",
"aws_secret_access_key": "...",
"aws_session_token": "...",
}, expiry
lf = pl.scan_parquet(
"s3://.../...",
credential_provider=get_credentials,
)
df = lf.collect()
使用PyArrow进行扫描
我们也可以使用PyArrow从云存储中扫描。这对于分区数据集(如Hive分区)特别有用。
我们首先创建一个PyArrow数据集,然后从该数据集创建一个LazyFrame。
import polars as pl
import pyarrow.dataset as ds
dset = ds.dataset("s3://my-partitioned-folder/", format="parquet")
(
pl.scan_pyarrow_dataset(dset)
.filter(pl.col("foo") == "a")
.select(["foo", "bar"])
.collect()
)
写入云存储
我们可以使用Python中的s3fs将DataFrame写入S3云存储,使用adlfs写入Azure Blob存储,使用gcsfs写入Google云存储。在这个例子中,我们将一个Parquet文件写入S3。
import polars as pl
import s3fs
df = pl.DataFrame({
"foo": ["a", "b", "c", "d", "d"],
"bar": [1, 2, 3, 4, 5],
})
fs = s3fs.S3FileSystem()
destination = "s3://bucket/my_file.parquet"
# 写入parquet文件
with fs.open(destination, mode='wb') as f:
df.write_parquet(f)