User-Defined Aggregate Functions (UDAFs)

Description

User-Defined Aggregate Functions (UDAFs) are user-programmable routines that act on multiple rows at once and return a single aggregated value as a result. This documentation lists the classes that are needed for creating and registering UDAFs. It also contains examples that demonstrate how to define and register UDAFs in Scala and invoke them in Spark SQL.

Aggregator[-IN, BUF, OUT]

A base class for user-defined aggregations that can be used in Dataset operations to take all of the elements of a group and reduce them to a single value.

IN - The input type for the aggregation.

BUF - The type of the intermediate value of the reduction.

OUT - The type of the final output result.
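To make the mapping concrete, here is a minimal sketch (the CountNonEmpty aggregator is hypothetical, not part of the official examples) showing where each type parameter appears among the methods an Aggregator must implement:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical aggregator that counts non-empty strings: IN = String, BUF = Long, OUT = Long
object CountNonEmpty extends Aggregator[String, Long, Long] {
  def zero: Long = 0L                            // the initial BUF value
  def reduce(buf: Long, in: String): Long =      // folds one IN into the BUF
    if (in != null && in.nonEmpty) buf + 1 else buf
  def merge(b1: Long, b2: Long): Long = b1 + b2  // combines two partial BUFs
  def finish(buf: Long): Long = buf              // converts the final BUF to OUT
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}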

Examples

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class.
For example, a type-safe user-defined average can look like:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // The Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // The Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.
import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...
}

public static class Average implements Serializable {
  private long sum;
  private long count;

  // Constructors, getters, setters...
}

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  @Override
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  @Override
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  @Override
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  @Override
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // The Encoder for the intermediate value type
  @Override
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // The Encoder for the final output value type
  @Override
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+
MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java" in the Spark repo.

Untyped User-Defined Aggregate Functions

Typed aggregations, as described above, may also be registered as untyped aggregating UDFs for use with DataFrames. For example, a user-defined average for untyped DataFrames can look like:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Long, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, data: Long): Average = {
    buffer.sum += data
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // The Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // The Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Register the function to access it
spark.udf.register("myAverage", functions.udaf(MyAverage))
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.
import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions;

public static class Average implements Serializable {
  private long sum;
  private long count;

  // Constructors, getters, setters...
  public Average() {
  }

  public Average(long sum, long count) {
    this.sum = sum;
    this.count = count;
  }

  public long getSum() {
    return sum;
  }

  public void setSum(long sum) {
    this.sum = sum;
  }

  public long getCount() {
    return count;
  }

  public void setCount(long count) {
    this.count = count;
  }
}

public static class MyAverage extends Aggregator<Long, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  @Override
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  @Override
  public Average reduce(Average buffer, Long data) {
    long newSum = buffer.getSum() + data;
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  @Override
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  @Override
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // The Encoder for the intermediate value type
  @Override
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // The Encoder for the final output value type
  @Override
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

// Register the function to access it
spark.udf().register("myAverage", functions.udaf(new MyAverage(), Encoders.LONG()));
Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+
Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java" in the Spark repo.
-- Compile and place UDAF MyAverage in a JAR file called `MyAverage.jar` in /tmp.
CREATE FUNCTION myAverage AS 'MyAverage' USING JAR '/tmp/MyAverage.jar';
SHOW USER FUNCTIONS;
+------------------+
|          function|
+------------------+
| default.myAverage|
+------------------+
CREATE TEMPORARY VIEW employees
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/employees.json"
);
SELECT * FROM employees;
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+
SELECT myAverage(salary) as average_salary FROM employees;
+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------+
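Once registered this way, the UDAF composes with the rest of SQL like any built-in aggregate. For example, a hypothetical follow-up query (not part of the official example) computing one average per name:

SELECT name, myAverage(salary) AS average_salary FROM employees GROUP BY name;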