配置
GraphFrames 提供了多种配置选项,可用于调整算法和操作的行为。本文档记录了所有可用的配置项、其描述、默认值及使用示例。
配置表
下表列出了所有可用的 GraphFrames 配置:
| 配置 | 描述 | 默认值 | 自版本 |
|---|---|---|---|
spark.graphframes.useLocalCheckpoints |
告知连通分量算法使用本地检查点。如果设置为"true",迭代算法将使用检查点机制将数据持久化到存储中。本地检查点速度更快,但可能使整个作业更不容易出错。 | false |
0.9.3 |
spark.graphframes.useLabelsAsComponents |
告知连通分量算法在输出DataFrame中使用标签作为分量。如果设置为"false",将返回数据类型为LONG的随机生成标签。 | 可选(默认值:true) |
0.9.0 |
spark.graphframes.connectedComponents.algorithm |
设置要使用的连通分量算法。支持的算法: - "graphframes": 使用MapReduce及更高版本中的连通分量中提出的交替大星和小星迭代,并带有倾斜连接优化。 - "graphx": 将图转换为GraphX图,然后使用GraphX中的连通分量实现。 |
可选(默认:graphframes) |
0.9.0 |
spark.graphframes.connectedComponents.broadcastthreshold |
设置传播组件分配时的广播阈值。如果在某次迭代中节点的度数大于此阈值,其组件分配将被收集并广播回其邻居以传播分配。否则,分配传播将通过常规Spark连接完成。此参数仅在算法设置为"graphframes"时使用。 | 可选(默认值:1000000) |
0.9.0 |
spark.graphframes.connectedComponents.checkpointinterval |
设置以迭代次数为单位的检查点间隔。定期设置检查点有助于从故障中恢复、清理混洗文件、缩短计算图的谱系并降低计划优化的复杂度。自Spark 2.0起,若不设置检查点,计划优化的复杂度将呈指数级增长。因此不建议禁用检查点或设置比默认值更长的检查点间隔。检查点数据保存在org.apache.spark.SparkContext.getCheckpointDir下,前缀为"connected-components"。如果未设置检查点目录,将抛出java.io.IOException。设置非正值可禁用检查点功能。此参数仅在算法设置为"graphframes"时使用。 |
可选(默认值:2) |
0.9.0 |
spark.graphframes.connectedComponents.intermediatestoragelevel |
设置需要多次传递的中间数据集的存储级别。 | 可选(默认值:MEMORY_AND_DISK) |
0.9.0 |
设置配置
GraphFrames 配置可以通过多种方式设置:
Spark 配置
您可以在创建 SparkSession 时设置配置:
Scala API
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("GraphFrames Example")
.config("spark.graphframes.connectedComponents.algorithm", "graphframes")
.config("spark.graphframes.connectedComponents.checkpointinterval", 3)
.getOrCreate()
Python API
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("GraphFrames Example") \
.config("spark.graphframes.connectedComponents.algorithm", "graphframes") \
.config("spark.graphframes.connectedComponents.checkpointinterval", 3) \
.getOrCreate()
运行时配置
您也可以在运行时设置配置:
Scala API
spark.conf.set("spark.graphframes.connectedComponents.algorithm", "graphframes")
spark.conf.set("spark.graphframes.connectedComponents.checkpointinterval", 3)
Python API
spark.conf.set("spark.graphframes.connectedComponents.algorithm", "graphframes")
spark.conf.set("spark.graphframes.connectedComponents.checkpointinterval", 3)
示例:使用自定义配置的连通组件
此示例展示了如何使用自定义配置运行连通分量算法:
Scala API
import org.graphframes.GraphFrame
import org.graphframes.examples
// Get example graph
val g = examples.Graphs.friends
// Set configurations
spark.conf.set("spark.graphframes.connectedComponents.algorithm", "graphframes")
spark.conf.set("spark.graphframes.connectedComponents.checkpointinterval", 3)
spark.conf.set("spark.graphframes.useLocalCheckpoints", true)
// Run connected components with custom configurations
val result = g.connectedComponents.run()
result.show()
Python API
from graphframes.examples import Graphs
# Get example graph
g = Graphs(spark).friends()
# Set configurations
spark.conf.set("spark.graphframes.connectedComponents.algorithm", "graphframes")
spark.conf.set("spark.graphframes.connectedComponents.checkpointinterval", 3)
spark.conf.set("spark.graphframes.useLocalCheckpoints", "true")
# Run connected components with custom configurations
result = g.connectedComponents()
result.show()
配置使用说明
- 检查点目录:对于与检查点相关的配置,请确保在运行使用检查点的算法之前,使用
spark.sparkContext.setCheckpointDir("path/to/checkpoint/dir")设置检查点目录。 - 存储级别: 当设置
spark.graphframes.connectedComponents.intermediatestoragelevel配置时,请使用以下值之一:MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER,DISK_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK_2, 等。 - 算法选择: 连通分量算法的选择会显著影响性能。graphframes算法通常对大型图更具可扩展性,而graphx算法对小型图可能更快。
- 本地检查点:本地检查点速度更快且更不易出错,但如果本地磁盘空间不足可能会造成压力。由于本地检查点不需要设置
checkpointDir,因此是推荐选项。