Spark Streaming 自定义接收器

Spark Streaming 可以从任何任意数据源接收流式数据,而不仅限于它内置支持的数据源(即,不仅限于 Kafka、Kinesis、文件、套接字等)。这需要开发者实现一个针对所需数据源定制的 receiver 。本指南将详细介绍实现自定义接收器的过程以及在 Spark Streaming 应用程序中使用它。请注意,自定义接收器可以用 Scala 或 Java 实现。

实现自定义接收器

这首先是通过实现一个 Receiver ( Scala 文档 , Java 文档 )。 自定义接收器必须通过实现两个方法来扩展这个抽象类

Both onStart() onStop() 不能无限期阻塞。通常情况下, onStart() 会启动负责接收数据的线程,而 onStop() 会确保这些接收数据的线程被停止。接收线程还可以使用 isStopped() ,这是一个 Receiver 方法,用于检查它们是否应该停止接收数据。

一旦接收到数据,这些数据可以通过调用 store(data) 存储到 Spark 中,这是一种由 Receiver 类提供的方法。有多种形式的 store() 可以让你逐条记录或作为整个对象集合/序列化字节来存储接收到的数据。请注意,实现接收器所使用的 store() 形式会影响其可靠性和容错语义。这个内容在 后面 进行了更详细的讨论。

接收线程中的任何异常都应该被捕获并正确处理,以避免接收器的静默失败。 restart( ) 将通过异步调用 onStop() 来重启接收器,然后在延迟后调用 onStart() stop( ) 将调用 onStop() 并终止接收器。此外, reportError( ) 会向驱动程序报告错误消息(在日志和用户界面中可见),而不会停止/重启接收器。

以下是一个自定义接收器,它通过套接字接收文本流。它将文本流中的以‘\n’分隔的行视为记录,并使用Spark存储它们。如果接收线程在连接或接收时遇到任何错误,将重新启动接收器以尝试再次连接。

class CustomReceiver(host: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
// 启动接收数据的线程
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {
// 因为调用receive()的线程如果isStopped()返回false会自动停止,实际上没有太多要做的
}
/** 创建一个socket连接并接收数据直到接收器停止 */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// 连接到主机:端口
socket = new Socket(host, port)
// 直到停止或连接中断,继续读取
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()
}
reader.close()
socket.close()
// 在服务器再次活动时,尝试重新连接
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// 如果无法连接到服务器,则重启
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// 如果有其他错误,则重启
restart("Error receiving data", t)
}
}
}
public class JavaCustomReceiver extends Receiver<String> {
String host = null;
int port = -1;
public JavaCustomReceiver(String host_ , int port_) {
super(StorageLevel.MEMORY_AND_DISK_2());
host = host_;
port = port_;
}
@Override
public void onStart() {
// 启动一个线程以通过连接接收数据
new Thread(this::receive).start();
}
@Override
public void onStop() {
// 没有什么需要做的,因为调用 receive() 的线程
// 设计为在 isStopped() 返回 false 时自动停止
}
/** 创建一个套接字连接并接收数据直到接收器被停止 */
private void receive() {
Socket socket = null;
String userInput = null;
try {
// 连接到服务器
socket = new Socket(host, port);
BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// 继续读取直到停止或连接中断
while (!isStopped() && (userInput = reader.readLine()) != null) {
System.out.println("Received data '" + userInput + "'");
store(userInput);
}
reader.close();
socket.close();
// 在服务器再次活跃时尝试重新连接
restart("Trying to connect again");
} catch(ConnectException ce) {
// 如果无法连接到服务器则重启
restart("Could not connect", ce);
} catch(Throwable t) {
// 如果有任何其他错误则重启
restart("Error receiving data", t);
}
}
}

在Spark Streaming应用中使用自定义接收器

自定义接收器可以通过使用 streamingContext.receiverStream( ) 在Spark Streaming应用程序中使用。这将创建一个输入DStream,使用自定义接收器实例接收到的数据,如下所示:

// 假设 ssc 是 StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...

完整的源代码在示例 CustomReceiver.scala 中。

// 假设 ssc 是 JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
...

完整的源代码在示例 JavaCustomReceiver.java 中。

接收器可靠性

Spark Streaming 编程指南 中简要讨论了基于其可靠性和容错语义有两种类型的接收器。

  1. 可靠接收器 - 对于允许已发送数据被确认的 可靠源 可靠接收器 会正确地向源确认数据已被接收并且在Spark中可靠地存储(即成功复制)。通常,实现这个接收器需要仔细考虑源确认的语义。
  2. 不可靠接收器 - 不可靠接收器 不会向源发送确认。这可以用于不支持确认的源,甚至可以用于可靠源,当不想或不需要进入确认的复杂性时。

要实现一个 可靠接收器 ,你必须使用 store(multiple-records) 来存储数据。 这种 store 形式是一个阻塞调用,只有在所有给定记录都被存储到Spark中后才会返回。如果接收器配置的存储级别使用了复制(默认启用),那么此调用将在复制完成后返回。 因此,它确保数据被可靠地存储,而接收器现在可以适当地确认来源。这确保了在接收器在复制数据中途失败时不会丢失任何数据——缓冲的数据将不会被确认,因此将由源重新发送。

一个 不可靠的接收器 并不需要实现任何这些逻辑。它可以简单地从源接收记录,并使用 store(single-record) 一次插入一条记录。虽然它没有 store(multiple-records) 的可靠性保证,但它具有以下优点:

下表总结了两种接收器的特点

接收者类型 特性
不可靠接收者 实现简单。
系统负责区块生成和速率控制。 没有容错保证,接收者出现故障可能会丢失数据。
可靠接收者 强大的容错保证,可以确保零数据丢失。
区块生成和速率控制由接收者的实现来处理。
实现复杂性取决于源的确认机制。