配置

GraphFrames 提供了多种配置选项,可用于调整算法和操作的行为。本文档记录了所有可用的配置项、其描述、默认值及使用示例。

配置表

下表列出了所有可用的 GraphFrames 配置:

配置 描述 默认值 自版本
spark.graphframes.useLocalCheckpoints 告知连通分量算法使用本地检查点。如果设置为"true",迭代算法将使用检查点机制将数据持久化到存储中。本地检查点速度更快,但可能使整个作业更不容易出错。 false 0.9.3
spark.graphframes.useLabelsAsComponents 告知连通分量算法在输出DataFrame中使用标签作为分量。如果设置为"false",将返回数据类型为LONG的随机生成标签。 可选(默认值:true 0.9.0
spark.graphframes.connectedComponents.algorithm 设置要使用的连通分量算法。支持的算法:
- "graphframes": 使用MapReduce及更高版本中的连通分量中提出的交替大星和小星迭代,并带有倾斜连接优化。
- "graphx": 将图转换为GraphX图,然后使用GraphX中的连通分量实现。
可选(默认:graphframes 0.9.0
spark.graphframes.connectedComponents.broadcastthreshold 设置传播组件分配时的广播阈值。如果在某次迭代中节点的度数大于此阈值,其组件分配将被收集并广播回其邻居以传播分配。否则,分配传播将通过常规Spark连接完成。此参数仅在算法设置为"graphframes"时使用。 可选(默认值:1000000 0.9.0
spark.graphframes.connectedComponents.checkpointinterval 设置以迭代次数为单位的检查点间隔。定期设置检查点有助于从故障中恢复、清理混洗文件、缩短计算图的谱系并降低计划优化的复杂度。自Spark 2.0起,若不设置检查点,计划优化的复杂度将呈指数级增长。因此不建议禁用检查点或设置比默认值更长的检查点间隔。检查点数据保存在org.apache.spark.SparkContext.getCheckpointDir下,前缀为"connected-components"。如果未设置检查点目录,将抛出java.io.IOException。设置非正值可禁用检查点功能。此参数仅在算法设置为"graphframes"时使用。 可选(默认值:2 0.9.0
spark.graphframes.connectedComponents.intermediatestoragelevel 设置需要多次传递的中间数据集的存储级别。 可选(默认值:MEMORY_AND_DISK 0.9.0

设置配置

GraphFrames 配置可以通过多种方式设置:

Spark 配置

您可以在创建 SparkSession 时设置配置:

Scala API

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("GraphFrames Example")
  .config("spark.graphframes.connectedComponents.algorithm", "graphframes")
  .config("spark.graphframes.connectedComponents.checkpointinterval", 3)
  .getOrCreate()

Python API

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName("GraphFrames Example") \
  .config("spark.graphframes.connectedComponents.algorithm", "graphframes") \
  .config("spark.graphframes.connectedComponents.checkpointinterval", 3) \
  .getOrCreate()

运行时配置

您也可以在运行时设置配置:

Scala API

spark.conf.set("spark.graphframes.connectedComponents.algorithm", "graphframes")
spark.conf.set("spark.graphframes.connectedComponents.checkpointinterval", 3)

Python API

spark.conf.set("spark.graphframes.connectedComponents.algorithm", "graphframes")
spark.conf.set("spark.graphframes.connectedComponents.checkpointinterval", 3)

示例:使用自定义配置的连通组件

此示例展示了如何使用自定义配置运行连通分量算法:

Scala API

import org.graphframes.GraphFrame
import org.graphframes.examples

// Get example graph
val g = examples.Graphs.friends

// Set configurations
spark.conf.set("spark.graphframes.connectedComponents.algorithm", "graphframes")
spark.conf.set("spark.graphframes.connectedComponents.checkpointinterval", 3)
spark.conf.set("spark.graphframes.useLocalCheckpoints", true)

// Run connected components with custom configurations
val result = g.connectedComponents.run()
result.show()

Python API

from graphframes.examples import Graphs

# Get example graph
g = Graphs(spark).friends()

# Set configurations
spark.conf.set("spark.graphframes.connectedComponents.algorithm", "graphframes")
spark.conf.set("spark.graphframes.connectedComponents.checkpointinterval", 3)
spark.conf.set("spark.graphframes.useLocalCheckpoints", "true")

# Run connected components with custom configurations
result = g.connectedComponents()
result.show()

配置使用说明