Skip to content

Commit ce99f51

Browse files
saturday_s authored and Marcelo Vanzin committed
[SPARK-18537][WEB UI] Add a REST api to serve spark streaming information
## What changes were proposed in this pull request? This PR is an inheritance from apache#16000, and is a completion of apache#15904. **Description** - Augment the `org.apache.spark.status.api.v1` package for serving streaming information. - Retrieve the streaming information through StreamingJobProgressListener. > this api should cover exactly the same amount of information as you can get from the web interface > the implementation is based on the current REST implementation of spark-core > and will be available for running applications only > > https://issues.apache.org/jira/browse/SPARK-18537 ## How was this patch tested? Local test. Author: saturday_s <[email protected]> Author: Chan Chor Pang <[email protected]> Author: peterCPChan <[email protected]> Closes apache#16253 from saturday-shi/SPARK-18537.
1 parent 31da755 commit ce99f51

24 files changed

Lines changed: 689 additions & 5 deletions

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ private[spark] class SparkUI private (
6060

6161
var appId: String = _
6262

63+
private var streamingJobProgressListener: Option[SparkListener] = None
64+
6365
/** Initialize all components of the server. */
6466
def initialize() {
6567
val jobsTab = new JobsTab(this)
@@ -124,6 +126,12 @@ private[spark] class SparkUI private (
124126
/** Looks up the [[ApplicationInfo]] whose id equals `appId`, if one exists. */
def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
  val apps = getApplicationInfoList
  apps.find(app => app.id == appId)
}
129+
130+
/** Returns the streaming progress listener, or None when none has been registered. */
def getStreamingJobProgressListener: Option[SparkListener] = {
  streamingJobProgressListener
}
131+
132+
/**
 * Registers `sparkListener` as the streaming progress listener.
 * A null argument clears the registration (Option(null) is None).
 */
def setStreamingJobProgressListener(sparkListener: SparkListener): Unit = {
  streamingJobProgressListener = Option(sparkListener)
}
127135
}
128136

129137
private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)

project/MimaExcludes.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,12 @@ object MimaExcludes {
3838
// Binary-compatibility exclusions for the 2.2.x line; extends the 2.1.x rules.
lazy val v22excludes = v21excludes ++ Seq(
  // [SPARK-18663][SQL] Simplify CountMinSketch aggregate implementation
  ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray"),

  // [SPARK-18949] [SQL] Add repairTable API to Catalog
  ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"),

  // [SPARK-18537] Add a REST api to spark streaming
  ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted")
)
4448

4549
// Exclude rules for 2.1.x
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.status.api.v1.streaming;
19+
20+
import org.apache.spark.util.EnumUtil;
21+
22+
public enum BatchStatus {
23+
COMPLETED,
24+
QUEUED,
25+
PROCESSING;
26+
27+
public static BatchStatus fromString(String str) {
28+
return EnumUtil.parseIgnoreCase(BatchStatus.class, str);
29+
}
30+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.status.api.v1.streaming
19+
20+
import java.util.{ArrayList => JArrayList, Arrays => JArrays, Date, List => JList}
21+
import javax.ws.rs.{GET, Produces, QueryParam}
22+
import javax.ws.rs.core.MediaType
23+
24+
import org.apache.spark.status.api.v1.streaming.AllBatchesResource._
25+
import org.apache.spark.streaming.ui.StreamingJobProgressListener
26+
27+
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllBatchesResource(listener: StreamingJobProgressListener) {

  /**
   * GET handler: batch info for the requested statuses (all statuses when the
   * query parameter is absent), ordered newest-first by batch id.
   */
  @GET
  def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = {
    val batches = batchInfoList(listener, statusParams)
    batches.sortBy(info => -info.batchId)
  }
}
35+
36+
private[v1] object AllBatchesResource {

  /**
   * Builds one [[BatchInfo]] per retained batch whose status is in `statusParams`;
   * an empty `statusParams` selects every status. Reads the listener under its
   * own lock so the three batch collections are seen consistently.
   */
  def batchInfoList(
      listener: StreamingJobProgressListener,
      statusParams: JList[BatchStatus] = new JArrayList[BatchStatus]()): Seq[BatchInfo] = {

    listener.synchronized {
      val requested =
        if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams

      val batchesByStatus = Seq(
        BatchStatus.COMPLETED -> listener.retainedCompletedBatches,
        BatchStatus.QUEUED -> listener.waitingBatches,
        BatchStatus.PROCESSING -> listener.runningBatches
      )

      batchesByStatus.flatMap { case (status, batches) =>
        if (!requested.contains(status)) {
          Nil
        } else {
          batches.map { batch =>
            // The batch id is the batch time in milliseconds.
            val batchId = batch.batchTime.milliseconds
            // First failure reason across this batch's output operations, if any failed.
            val firstFailureReason =
              batch.outputOperations.flatMap(_._2.failureReason).headOption

            new BatchInfo(
              batchId = batchId,
              batchTime = new Date(batchId),
              status = status.toString,
              batchDuration = listener.batchDuration,
              inputSize = batch.numRecords,
              schedulingDelay = batch.schedulingDelay,
              processingTime = batch.processingDelay,
              totalDelay = batch.totalDelay,
              numActiveOutputOps = batch.numActiveOutputOp,
              numCompletedOutputOps = batch.numCompletedOutputOp,
              numFailedOutputOps = batch.numFailedOutputOp,
              numTotalOutputOps = batch.outputOperations.size,
              firstFailureReason = firstFailureReason
            )
          }
        }
      }
    }
  }
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.status.api.v1.streaming
19+
20+
import java.util.Date
21+
import javax.ws.rs.{GET, PathParam, Produces}
22+
import javax.ws.rs.core.MediaType
23+
24+
import org.apache.spark.status.api.v1.NotFoundException
25+
import org.apache.spark.status.api.v1.streaming.AllOutputOperationsResource._
26+
import org.apache.spark.streaming.Time
27+
import org.apache.spark.streaming.ui.StreamingJobProgressListener
28+
29+
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllOutputOperationsResource(listener: StreamingJobProgressListener) {

  /** GET handler: output operations of the given batch, ordered by output op id. */
  @GET
  def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = {
    val ops = outputOperationInfoList(listener, batchId)
    ops.sortBy(_.outputOpId)
  }
}
37+
38+
private[v1] object AllOutputOperationsResource {

  /**
   * Builds one [[OutputOperationInfo]] per output operation of batch `batchId`.
   * Throws [[NotFoundException]] when the listener has no data for that batch.
   */
  def outputOperationInfoList(
      listener: StreamingJobProgressListener,
      batchId: Long): Seq[OutputOperationInfo] = {

    listener.synchronized {
      val batch = listener.getBatchUIData(Time(batchId))
        .getOrElse(throw new NotFoundException("unknown batch: " + batchId))

      batch.outputOperations.map { case (opId, op) =>
        // Spark job ids that ran this output operation, in ascending order.
        val jobIds = batch.outputOpIdSparkJobIdPairs
          .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted

        new OutputOperationInfo(
          outputOpId = opId,
          name = op.name,
          description = op.description,
          startTime = op.startTime.map(new Date(_)),
          endTime = op.endTime.map(new Date(_)),
          duration = op.duration,
          failureReason = op.failureReason,
          jobIds = jobIds
        )
      }
    }.toSeq
  }
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.status.api.v1.streaming
19+
20+
import java.util.Date
21+
import javax.ws.rs.{GET, Produces}
22+
import javax.ws.rs.core.MediaType
23+
24+
import org.apache.spark.status.api.v1.streaming.AllReceiversResource._
25+
import org.apache.spark.streaming.ui.StreamingJobProgressListener
26+
27+
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllReceiversResource(listener: StreamingJobProgressListener) {

  /** GET handler: per-receiver info for every input stream, ordered by stream id. */
  @GET
  def receiversList(): Seq[ReceiverInfo] = {
    val receivers = receiverInfoList(listener)
    receivers.sortBy(_.streamId)
  }
}
35+
36+
private[v1] object AllReceiversResource {

  /**
   * Builds one [[ReceiverInfo]] per input stream tracked by the listener,
   * reading the listener state under its lock for a consistent snapshot.
   */
  def receiverInfoList(listener: StreamingJobProgressListener): Seq[ReceiverInfo] = {
    listener.synchronized {
      listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) =>
        val info = listener.receiverInfo(streamId)

        // Prefer the receiver's own name, then the stream's name, then a fallback label.
        val streamName = info.map(_.name)
          .orElse(listener.streamName(streamId))
          .getOrElse(s"Stream-$streamId")

        // Mean of the per-batch event rates, when any were recorded.
        val avgEventRate =
          if (eventRates.nonEmpty) Some(eventRates.map(_._2).sum / eventRates.size) else None

        // Last-error details are only populated when the receiver reported an error.
        val (errorTime, errorMessage, error) = info match {
          case Some(i) =>
            val time = if (i.lastErrorTime >= 0) Some(new Date(i.lastErrorTime)) else None
            val message = if (i.lastErrorMessage.nonEmpty) Some(i.lastErrorMessage) else None
            val err = if (i.lastError.nonEmpty) Some(i.lastError) else None
            (time, message, err)
          case None =>
            (None, None, None)
        }

        new ReceiverInfo(
          streamId = streamId,
          streamName = streamName,
          isActive = info.map(_.active),
          executorId = info.map(_.executorId),
          executorHost = info.map(_.location),
          lastErrorTime = errorTime,
          lastErrorMessage = errorMessage,
          lastError = error,
          avgEventRate = avgEventRate,
          eventRates = eventRates
        )
      }.toSeq
    }
  }
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.status.api.v1.streaming
19+
20+
import javax.ws.rs.{Path, PathParam}
21+
22+
import org.apache.spark.status.api.v1.UIRootFromServletContext
23+
24+
@Path("/v1")
private[v1] class ApiStreamingApp extends UIRootFromServletContext {

  /** Routes /applications/{appId}/streaming to the streaming API sub-resource. */
  @Path("applications/{appId}/streaming")
  def getStreamingRoot(@PathParam("appId") appId: String): ApiStreamingRootResource = {
    uiRoot.withSparkUI(appId, None) { ui =>
      new ApiStreamingRootResource(ui)
    }
  }

  /** Same as above, for a specific application attempt. */
  @Path("applications/{appId}/{attemptId}/streaming")
  def getStreamingRoot(
      @PathParam("appId") appId: String,
      @PathParam("attemptId") attemptId: String): ApiStreamingRootResource = {
    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
      new ApiStreamingRootResource(ui)
    }
  }
}

0 commit comments

Comments
 (0)