Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1634] implement queue time/processing time metrics for rpc framework #2784

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
if (num != 0) num else availableCores
}

/** Value of `RPC_SLOW_THRESHOLD` (`celeborn.rpc.slow.threshold`), a time conf declared in nanoseconds. */
def rpcSlowThresholdNs(): Long = get(RPC_SLOW_THRESHOLD)

/**
 * Value of the optional `RPC_SLOW_INTERVAL` (`celeborn.rpc.slow.interval`), a time conf declared
 * in milliseconds; `-1` is returned when the entry is not set.
 */
def rpcSlowIntervalMs(): Long = get(RPC_SLOW_INTERVAL).getOrElse(-1L)

/** Value of `RPC_SUMMARY_DUMP_INTERVAL` (`celeborn.rpc.dump.interval`), a time conf declared in milliseconds. */
def rpcDumpIntervalMs(): Long = get(RPC_SUMMARY_DUMP_INTERVAL)

/**
 * Looks up the `NETWORK_IO_MODE` entry for the given transport module.
 *
 * @param module transport module name passed through to `getTransportConf`
 * @return the configured IO mode string for that module
 */
def networkIoMode(module: String): String =
  getTransportConf(module, NETWORK_IO_MODE)
Expand Down Expand Up @@ -1865,6 +1869,30 @@ object CelebornConf extends Logging {
.doc("Threads number of message dispatcher event loop for roles")
.fallbackConf(RPC_DISPATCHER_THREADS)

// Config entry `celeborn.rpc.slow.threshold`: a time conf declared with TimeUnit.NANOSECONDS
// whose default is the string "1s". It is read through CelebornConf.rpcSlowThresholdNs().
val RPC_SLOW_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.rpc.slow.threshold")
.categories("network")
.doc("threshold for RPC framework to log slow RPC")
.version("0.6.0")
.timeConf(TimeUnit.NANOSECONDS)
.createWithDefaultString("1s")

// Config entry `celeborn.rpc.slow.interval`: an optional time conf declared with
// TimeUnit.MILLISECONDS and no default. CelebornConf.rpcSlowIntervalMs() substitutes -1
// when it is unset.
val RPC_SLOW_INTERVAL: OptionalConfigEntry[Long] =
buildConf("celeborn.rpc.slow.interval")
.categories("network")
.doc("min interval (ms) for RPC framework to log slow RPC")
.version("0.6.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

// Config entry `celeborn.rpc.dump.interval`: a time conf declared with TimeUnit.MILLISECONDS
// whose default is the string "60s". It is read through CelebornConf.rpcDumpIntervalMs().
val RPC_SUMMARY_DUMP_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.rpc.dump.interval")
.categories("network")
.doc("min interval (ms) for RPC framework to dump performance summary")
.version("0.6.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")

val NETWORK_IO_MODE: ConfigEntry[String] =
buildConf("celeborn.<module>.io.mode")
.categories("network")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
}
}

/**
 * Records `value` on the timer called `name`, using no custom labels.
 *
 * @param name metric name of the timer
 * @param value duration to record; forwarded unchanged to the labelled overload
 */
def updateTimer(name: String, value: Long): Unit =
  updateTimer(name, value, Map.empty[String, String])

/**
 * Records `value`, interpreted as nanoseconds, on the timer identified by `metricsName` and
 * `labels`.
 *
 * @param metricsName metric name of the timer
 * @param value duration in nanoseconds
 * @param labels custom labels that, together with the name, form the lookup key
 * @throws IllegalArgumentException if no timer is registered under the resulting key
 */
def updateTimer(metricsName: String, value: Long, labels: Map[String, String]): Unit = {
  val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
  namedTimers.get(metricNameWithLabel) match {
    // A missing key yields null; destructuring it directly would surface as an opaque
    // `scala.MatchError: null`, so fail with a message that names the missing timer instead.
    case null =>
      throw new IllegalArgumentException(
        s"Timer $metricNameWithLabel is not registered; add the timer before updating it")
    case (namedTimer, _) =>
      namedTimer.timer.update(value, TimeUnit.NANOSECONDS)
  }
}

/**
 * Increments the counter called `metricsName` by one, via the two-argument overload.
 *
 * @param metricsName metric name of the counter
 */
def incCounter(metricsName: String): Unit =
  incCounter(metricsName, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ abstract class RpcEnv(config: RpcEnvConfig) {
* that contains [[RpcEndpointRef]]s, the deserialization codes should be wrapped by this method.
*/
def deserialize[T](deserializationAction: () => T): T

/**
 * Returns the [[RpcSource]] associated with this environment.
 * NOTE(review): presumably the metrics source that RPC queue/process timers are reported to;
 * confirm against the concrete implementations.
 */
def rpcSource(): RpcSource
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.common.rpc

import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

import com.codahale.metrics.{Histogram, UniformReservoir}
import com.google.protobuf.GeneratedMessageV3

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.RpcNameConstants
import org.apache.celeborn.common.protocol.message.Message
import org.apache.celeborn.common.rpc.netty.{InboxMessage, OneWayMessage, RpcEndpointVerifier, RpcMessage}
import org.apache.celeborn.common.util.JavaUtils

/**
 * Collects queue-time and process-time statistics for the messages of one RPC endpoint.
 *
 * For the endpoints named `RpcNameConstants.LIFECYCLE_MANAGER_EP` and `RpcEndpointVerifier.NAME`
 * samples are kept in local histograms and written to the log by [[dump]]; for every other
 * endpoint they are forwarded to timers on `rpcSource`.
 *
 * [[init]] must be called before [[updateMaxLength]], [[update]] or [[dump]], because those
 * methods invoke the queue-length function that `init` installs.
 *
 * @param name endpoint name, used as the prefix of the per-endpoint metric names
 * @param rpcSource source on which the gauge and timers are registered
 * @param conf supplies the slow-RPC threshold, slow-log interval and dump interval
 */
private[celeborn] class RpcMetricsTracker(
    val name: String,
    rpcSource: RpcSource,
    conf: CelebornConf) extends Logging {

  // Histogram is used for Client, eg LifecycleManager
  val histogramMap: ConcurrentMap[String, Histogram] =
    JavaUtils.newConcurrentHashMap[String, Histogram]

  private val maxQueueLength: AtomicLong = new AtomicLong(0)
  // Compared against queueTime + processTime, which are recorded as nanoseconds.
  private val slowRpcThreshold: Long = conf.rpcSlowThresholdNs()
  // Milliseconds; a negative value means slow RPCs are logged without rate limiting.
  private val slowRpcInterval: Long = conf.rpcSlowIntervalMs()
  // Milliseconds between two histogram dumps.
  private val rpcDumpInterval: Long = conf.rpcDumpIntervalMs()
  private val lastDumpTime: AtomicLong = new AtomicLong(0)
  private val lastSlowLogTime: AtomicLong = new AtomicLong(0)
  final private val useHistogram =
    name == RpcNameConstants.LIFECYCLE_MANAGER_EP || name == RpcEndpointVerifier.NAME
  final private val QUEUE_LENGTH_METRIC = s"${name}_${RpcSource.QUEUE_LENGTH}"
  final private val QUEUE_TIME_METRIC = s"${name}_${RpcSource.QUEUE_TIME}"
  final private val PROCESS_TIME_METRIC = s"${name}_${RpcSource.PROCESS_TIME}"

  private var queueLengthFunc: () => Long = _

  /**
   * Installs the function that reports the current queue length and, when the endpoint has a
   * name, registers the queue-length gauge and the queue/process timers on `rpcSource`.
   *
   * @param lengthFunc returns the current number of queued messages
   */
  def init(lengthFunc: () => Long): Unit = {
    queueLengthFunc = lengthFunc
    if (name != null) {
      rpcSource.addGauge(QUEUE_LENGTH_METRIC)(queueLengthFunc)

      rpcSource.addTimer(QUEUE_TIME_METRIC)
      rpcSource.addTimer(PROCESS_TIME_METRIC)
    }
  }

  /**
   * Adds `value` to the histogram stored under `name`, creating the histogram on first use.
   *
   * @param name histogram key
   * @param value sample to record
   */
  def updateHistogram(name: String, value: Long): Unit = {
    val existing = histogramMap.get(name)
    val histogram =
      if (existing != null) {
        existing
      } else {
        // Allocate only on a miss; if another thread registered one first, use the winner.
        val created = new Histogram(new UniformReservoir())
        val raced = histogramMap.putIfAbsent(name, created)
        if (raced != null) raced else created
      }
    histogram.update(value)
  }

  /** Raises the recorded maximum queue length to the current queue length if that is larger. */
  def updateMaxLength(): Unit = {
    val len = queueLengthFunc()
    // CAS loop so that a concurrent, smaller observation can never overwrite a larger maximum.
    var observed = maxQueueLength.get()
    while (len > observed && !maxQueueLength.compareAndSet(observed, len)) {
      observed = maxQueueLength.get()
    }
  }

  /**
   * Logs a warning when `queueTime + processTime` exceeds the slow threshold, limited to one
   * warning per `slowRpcInterval` unless that interval is negative. On the same condition,
   * histogram-backed endpoints also dump their statistics at most once per `rpcDumpInterval`.
   */
  private def logSlowRpc(message: InboxMessage, queueTime: Long, processTime: Long): Unit = {
    if (queueTime + processTime > slowRpcThreshold) {
      val lastLogTime = lastSlowLogTime.get()
      // The compareAndSet lets only one of several racing threads claim the current interval.
      val shouldLog = slowRpcInterval < 0 ||
        (System.currentTimeMillis() - lastLogTime > slowRpcInterval &&
          lastSlowLogTime.compareAndSet(lastLogTime, System.currentTimeMillis()))
      if (shouldLog) {
        logWarning(
          s"slow rpc detected: currentQueueSize = ${queueLengthFunc()}, queueTime=$queueTime processTime=$processTime message=$message")
      }

      val lastTime = lastDumpTime.get
      val shouldDump = useHistogram &&
        System.currentTimeMillis() - lastTime > rpcDumpInterval &&
        lastDumpTime.compareAndSet(lastTime, System.currentTimeMillis())
      if (shouldDump) {
        dump()
      }
    }
  }

  /**
   * Records one processed message: its queue time and process time under the per-endpoint
   * metrics, and its process time again under a metric named after the message type.
   *
   * @param message payload of the processed message, used only to derive the per-type name
   * @param queueTime time the message spent queued
   * @param processTime time spent processing the message
   */
  def record(message: Any, queueTime: Long, processTime: Long): Unit = {
    def messageName(message: Any): String = {
      message match {
        case legacy: Message =>
          legacy.getClass.toString
        case pb: GeneratedMessageV3 =>
          pb.getDescriptorForType.getFullName
        case _: RpcEndpointVerifier.CheckExistence =>
          "CheckExistence"
        case _ =>
          "unknown"
      }
    }
    val msgName = messageName(message)

    if (useHistogram) {
      updateHistogram(QUEUE_TIME_METRIC, queueTime)
      updateHistogram(PROCESS_TIME_METRIC, processTime)
      updateHistogram(msgName, processTime)
    } else {
      rpcSource.updateTimer(QUEUE_TIME_METRIC, queueTime)
      rpcSource.updateTimer(PROCESS_TIME_METRIC, processTime)
      rpcSource.updateTimer(msgName, processTime)
    }
  }

  /**
   * Derives queue and process time from the timestamps carried by an `RpcMessage` or
   * `OneWayMessage`, records them and checks for a slow RPC. Other message kinds are ignored.
   *
   * @param message the inbox message that has finished processing
   */
  def update(message: InboxMessage): Unit = {
    message match {
      case rpc @ RpcMessage(_, content, _) =>
        val queueTime = rpc.dequeueTime - rpc.enqueueTime
        val processTime = rpc.endProcessTime - rpc.dequeueTime
        record(content, queueTime, processTime)
        logSlowRpc(message, queueTime, processTime)
      case one @ OneWayMessage(_, content) =>
        val queueTime = one.dequeueTime - one.enqueueTime
        val processTime = one.endProcessTime - one.dequeueTime
        record(content, queueTime, processTime)
        logSlowRpc(message, queueTime, processTime)
      case _ =>
    }
  }

  /**
   * Writes the current and maximum queue length plus a summary of every histogram to the info
   * log. Does nothing for endpoints that report through `rpcSource` timers.
   */
  def dump(): Unit = {
    if (useHistogram) {
      val builder = new StringBuilder()
      builder.append(s"RPC statistics for $name").append("\n")
      builder.append(s"current queue size = ${queueLengthFunc()}").append("\n")
      builder.append(s"max queue length = ${maxQueueLength.get()}").append("\n")
      histogramMap.entrySet.asScala.foreach { entry =>
        val histogram = entry.getValue
        val snapshot = histogram.getSnapshot
        builder.append(s"histogram for $name RPC metrics: ").append(entry.getKey).append("\n")
        builder.append("count: ").append(histogram.getCount).append("\n")
          .append("min: ").append(snapshot.getMin).append("\n")
          .append("mean: ").append(snapshot.getMean).append("\n")
          .append("p50: ").append(snapshot.getMedian).append("\n")
          .append("p75: ").append(snapshot.get75thPercentile).append("\n")
          .append("p95: ").append(snapshot.get95thPercentile).append("\n")
          .append("p99: ").append(snapshot.get99thPercentile()).append("\n")
          .append("max: ").append(snapshot.getMax).append("\n")
      }
      logInfo(builder.toString())
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.common.rpc

import java.util.concurrent.ConcurrentHashMap

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.source.AbstractSource

/**
 * Metrics source for the RPC framework. Timers are registered lazily: the first time a name is
 * seen by [[addTimer]] or [[updateTimer]] the timer is created, so callers may record a value
 * for a message type without registering it beforehand.
 *
 * @param conf configuration handed to [[AbstractSource]]
 */
class RpcSource(conf: CelebornConf) extends AbstractSource(conf, RpcSource.ROLE_RPC) {
  override def sourceName: String = RpcSource.ROLE_RPC

  // Names whose timer has already been added through this class.
  private val msgNameSet = ConcurrentHashMap.newKeySet[String]()

  /** Adds the timer for `name` unless it is already known, then records `value` on it. */
  override def updateTimer(name: String, value: Long): Unit = {
    registerIfAbsent(name)
    super.updateTimer(name, value)
  }

  /** Adds the timer for `name` unless it is already known. */
  override def addTimer(name: String): Unit = registerIfAbsent(name)

  // The timer is added before the name is remembered, so a name found in the set always
  // has had its timer added.
  private def registerIfAbsent(name: String): Unit = {
    val alreadyKnown = msgNameSet.contains(name)
    if (!alreadyKnown) {
      super.addTimer(name)
      msgNameSet.add(name)
    }
  }

  startCleaner()
}

/** Constant names shared by the RPC metrics source and its trackers. */
object RpcSource {

  /** Role and source name under which the RPC metrics are registered. */
  val ROLE_RPC: String = "RPC"

  /** Metric-name suffix for the queue-length gauge. */
  val QUEUE_LENGTH: String = "QueueLength"

  /** Metric-name suffix for the queue-time metric. */
  val QUEUE_TIME: String = "QueueTime"

  /** Metric-name suffix for the process-time metric. */
  val PROCESS_TIME: String = "ProcessTime"
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils}
/**
* A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
*/
private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: RpcSource) extends Logging {

// Bundles one registered endpoint: its name, the endpoint itself, its reference and its inbox.
// NOTE(review): this excerpt comes from a diff view, so two `val inbox` definitions appear
// below; presumably the first is the removed line and the second, which passes an
// RpcMetricsTracker built from the endpoint name and `rpcSource`, is the added line. Confirm
// against the real file.
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
val celebornConf = nettyEnv.celebornConf
val inbox = new Inbox(ref, endpoint, celebornConf)
val inbox =
new Inbox(ref, endpoint, celebornConf, new RpcMetricsTracker(name, rpcSource, celebornConf))
}

private val endpoints: ConcurrentMap[String, EndpointData] =
Expand Down
Loading
Loading