Spark Streaming Monitoring

Vinay Badhan
6 min read · Oct 8, 2020

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.

Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.

Discretized Streams (DStreams):

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, Spark’s abstraction of an immutable, distributed dataset (see the Spark Programming Guide for more details). Each RDD in a DStream contains the data from a certain interval.

Any operation applied on a DStream translates to operations on the underlying RDDs. For example, when converting a stream of lines into a stream of words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream, as sketched below.
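For illustration, here is a minimal PySpark sketch of that transformation. It assumes a StreamingContext named ssc already exists (a full, runnable version appears at the end of this article):

lines = ssc.socketTextStream("localhost", 9999)       # DStream of text lines
words = lines.flatMap(lambda line: line.split(" "))   # DStream of words, one RDD per batch interval
words.pprint()                                        # Print a sample of each micro-batch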

These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide the developer with a higher-level API for convenience. These operations are discussed in detail in later sections.

Monitoring Application using StreamingListener:

Beyond Spark’s monitoring capabilities, there are additional capabilities specific to Spark Streaming. When a StreamingContext is used, the Spark web UI shows an additional Streaming tab that displays statistics about running receivers (whether receivers are active, number of records received, receiver errors, etc.) and completed batches (batch processing times, queueing delays, etc.). This can be used to monitor the progress of the streaming application.

The following two metrics in the web UI are particularly important:

  • Processing Time — The time to process each batch of data.
  • Scheduling Delay — the time a batch waits in a queue for the processing of previous batches to finish.

If the batch processing time is consistently more than the batch interval and/or the queueing delay keeps increasing, it indicates that the system is not able to process the batches as fast as they are being generated and is falling behind. In that case, consider reducing the batch processing time.

The progress of a Spark Streaming program can also be monitored using the StreamingListener interface, which allows you to get receiver status and processing times. Note that this is a developer API and it is likely to be improved upon (i.e., more information reported) in the future.

Using the StreamingListener interface to log streaming metrics:

As mentioned above, the data is processed in micro-batches. We can capture the metrics for each of those micro-batches and log them into an existing ELK stack.

StreamingListener:

Following are the methods for the StreamingListener interface:

  • onStreamingStarted : Called when the streaming has been started
  • onReceiverStarted : Called when a receiver has been started
  • onReceiverError : Called when a receiver has reported an error
  • onReceiverStopped : Called when a receiver has been stopped
  • onBatchSubmitted : Called when a batch of jobs has been submitted for processing
  • onBatchStarted : Called when processing of a batch of jobs has started
  • onBatchCompleted : Called when processing of a batch of jobs has completed
  • onOutputOperationStarted : Called when processing of a job of a batch has started
  • onOutputOperationCompleted : Called when processing of a job of a batch has completed
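As a minimal sketch, a custom listener subclasses StreamingListener and overrides only the callbacks it needs; the remaining callbacks are inherited as no-ops. The event accessors below follow the same method-call style as the full example at the end of this article:

from pyspark.streaming import StreamingListener

class MyListener(StreamingListener):
    # Only the overridden callbacks do anything; the rest stay no-ops.
    def onStreamingStarted(self, streamingStarted):
        print("streaming has started")

    def onBatchCompleted(self, batchCompleted):
        print("batch completed with",
              batchCompleted.batchInfo().numRecords(), "records")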

On the Python side, StreamingListener is a thin wrapper over Spark’s Java listener API (JavaStreamingListener); the event objects passed to the callbacks are Java objects, which is why their fields are read via method calls (e.g., batchInfo().numRecords()) in the examples below.

Receiver methods:

  • onReceiverStarted
  • onReceiverError
  • onReceiverStopped

Each of these callbacks receives an event object that exposes receiverInfo.

Schema of receiverInfo:

  • streamId: integer
  • name: string
  • active: boolean
  • location: string
  • executorId: string
  • lastErrorMessage: string
  • lastError: string
  • lastErrorTime: long
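A hedged sketch of a listener that reacts to receiver errors; the field accessors (name(), streamId(), executorId(), lastErrorMessage()) follow the schema above and the same method-call style as the batch example at the end of the article, so treat them as assumptions:

from pyspark.streaming import StreamingListener

class ReceiverErrorListener(StreamingListener):
    def onReceiverError(self, receiverError):
        info = receiverError.receiverInfo()
        # Log which receiver failed, where it ran, and the last reported error
        print(f"receiver {info.name()} (stream {info.streamId()}) "
              f"on executor {info.executorId()} reported: "
              f"{info.lastErrorMessage()}")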

Batch methods:

  • onBatchSubmitted
  • onBatchStarted
  • onBatchCompleted

Each of these callbacks receives an event object that exposes batchInfo.

Schema of batchInfo:

  • batchTime : Time of the batch
  • streamIdToInputInfo : A map of input stream id to its input info
  • submissionTime : Clock time of when the jobs of this batch were submitted to the streaming scheduler queue
  • processingStartTime : Clock time of when the first job of this batch started processing. `-1` means the batch has not yet started.
  • processingEndTime : Clock time of when the last job of this batch finished processing. `-1` means the batch has not yet completed.
  • schedulingDelay : Time taken for the first job of this batch to start processing from the time this batch was submitted to the streaming scheduler. Essentially, it is `processingStartTime - submissionTime`. `-1` means the batch has not yet started.
  • processingDelay : Time taken for all jobs of this batch to finish processing from the time they started processing. Essentially, it is `processingEndTime - processingStartTime`. `-1` means the batch has not yet completed.
  • totalDelay : Time taken for all the jobs of this batch to finish processing from the time they were submitted. Essentially, it is `processingDelay + schedulingDelay`. `-1` means the batch has not yet completed.
  • numRecords : The number of records received by the receivers in this batch
  • outputOperationInfos : The output operations in this batch
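Building on this schema, a small sketch that surfaces the scheduling delay as soon as a batch starts (useful for spotting an application that is falling behind); the accessor methods mirror the style of the final example and should be treated as assumptions:

from pyspark.streaming import StreamingListener

class BatchDelayListener(StreamingListener):
    def onBatchStarted(self, batchStarted):
        info = batchStarted.batchInfo()
        # schedulingDelay is processingStartTime - submissionTime (milliseconds)
        print(f"batch {info.batchTime()}: scheduling delay "
              f"{info.schedulingDelay()} ms, {info.numRecords()} records")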

OutputOperation methods:

  • onOutputOperationStarted
  • onOutputOperationCompleted

Each of these callbacks receives an event object that exposes outputOperationInfo.

Schema of outputOperationInfo:

  • batchTime: time
  • id: int
  • name: string
  • description: string
  • startTime: long
  • endTime: long
  • failureReason: string
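Similarly, a sketch that reports how long each output operation took and whether it failed; the duration (endTime - startTime) follows directly from the schema above, and the accessors are again assumptions in the same method-call style:

from pyspark.streaming import StreamingListener

class OutputOpListener(StreamingListener):
    def onOutputOperationCompleted(self, outputOperationCompleted):
        info = outputOperationCompleted.outputOperationInfo()
        duration = info.endTime() - info.startTime()  # milliseconds
        print(f"output op {info.id()} ({info.name()}) of batch "
              f"{info.batchTime()} took {duration} ms; "
              f"failure reason: {info.failureReason()}")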

Implementation: The following example simply prints the batch metrics; the other methods can be overridden in the same way. You can also publish these data points to, say, a Kafka topic, or send them to a metrics sink such as Graphite or Datadog.

Start a netcat server listening on port 9999; lines you type into this terminal become the input stream:

nc -lk 9999

Start a pyspark shell and execute the following code:

from pyspark.streaming import StreamingContext, StreamingListener

class SparkMonitoringListener(StreamingListener):
    def onBatchCompleted(self, batchCompleted):
        info = batchCompleted.batchInfo()
        start = info.processingStartTime()
        end = info.processingEndTime()
        batchTime = info.batchTime()
        numRecords = info.numRecords()
        totalDelay = info.totalDelay()
        if numRecords > 0:
            print(f"start: {start}, end: {end}, batch time: {batchTime}, "
                  f"num records: {numRecords}, total delay: {totalDelay}")

# Create a streaming context with a 30-second batch interval
# (`sc` is the SparkContext provided by the pyspark shell)
ssc = StreamingContext(sc, 30)

# Create a socket text stream that connects to localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Word count over each micro-batch
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()

# Register the listener and start the computation
ssc.addStreamingListener(SparkMonitoringListener())
ssc.start()
ssc.awaitTermination()
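To feed these metrics into an ELK stack, as mentioned earlier, one option is to emit each batch’s metrics as a JSON log line from the driver and let a log shipper forward it to Elasticsearch. The sketch below assumes a standard Python logging setup (handler configuration omitted) and a shipper such as Filebeat or Logstash tailing the driver log:

import json
import logging

from pyspark.streaming import StreamingListener

logger = logging.getLogger("streaming-metrics")

class ElkFriendlyListener(StreamingListener):
    def onBatchCompleted(self, batchCompleted):
        info = batchCompleted.batchInfo()
        # One JSON document per completed batch, ready to be indexed
        logger.info(json.dumps({
            "batch_time": str(info.batchTime()),
            "num_records": info.numRecords(),
            "scheduling_delay_ms": info.schedulingDelay(),
            "processing_delay_ms": info.processingDelay(),
            "total_delay_ms": info.totalDelay(),
        }))

Register it the same way as above: ssc.addStreamingListener(ElkFriendlyListener()).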
