diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 046c9c95a7..0d1729780e 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -532,6 +532,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se if (num != 0) num else availableCores } + def rpcSlowThresholdNs(): Long = get(RPC_SLOW_THRESHOLD) + def rpcSlowIntervalMs(): Long = get(RPC_SLOW_INTERVAL).getOrElse(-1) + def rpcDumpIntervalMs(): Long = get(RPC_SUMMARY_DUMP_INTERVAL) + def networkIoMode(module: String): String = { getTransportConf(module, NETWORK_IO_MODE) } @@ -1865,6 +1869,30 @@ object CelebornConf extends Logging { .doc("Threads number of message dispatcher event loop for roles") .fallbackConf(RPC_DISPATCHER_THREADS) + val RPC_SLOW_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.rpc.slow.threshold") + .categories("network") + .doc("threshold for RPC framework to log slow RPC") + .version("0.6.0") + .timeConf(TimeUnit.NANOSECONDS) + .createWithDefaultString("1s") + + val RPC_SLOW_INTERVAL: OptionalConfigEntry[Long] = + buildConf("celeborn.rpc.slow.interval") + .categories("network") + .doc("min interval (ms) for RPC framework to log slow RPC") + .version("0.6.0") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + + val RPC_SUMMARY_DUMP_INTERVAL: ConfigEntry[Long] = + buildConf("celeborn.rpc.dump.interval") + .categories("network") + .doc("min interval (ms) for RPC framework to dump performance summary") + .version("0.6.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("60s") + val NETWORK_IO_MODE: ConfigEntry[String] = buildConf("celeborn..io.mode") .categories("network") diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 95562c9117..428af8a909 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -255,6 +255,16 @@ abstract class AbstractSource(conf: CelebornConf, role: String) } } + def updateTimer(name: String, value: Long): Unit = { + updateTimer(name, value, Map.empty[String, String]) + } + + def updateTimer(metricsName: String, value: Long, labels: Map[String, String]): Unit = { + val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels) + val (namedTimer, _) = namedTimers.get(metricNameWithLabel) + namedTimer.timer.update(value, TimeUnit.NANOSECONDS) + } + def incCounter(metricsName: String): Unit = { incCounter(metricsName, 1) } diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala index 4e2bc35c93..31f69a377b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala @@ -163,6 +163,8 @@ abstract class RpcEnv(config: RpcEnvConfig) { * that contains [[RpcEndpointRef]]s, the deserialization codes should be wrapped by this method. */ def deserialize[T](deserializationAction: () => T): T + + def rpcSource(): RpcSource } /** diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala new file mode 100644 index 0000000000..22a715c34f --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.rpc + +import java.util.concurrent.ConcurrentMap +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.JavaConverters._ + +import com.codahale.metrics.{Histogram, UniformReservoir} +import com.google.protobuf.GeneratedMessageV3 + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.protocol.RpcNameConstants +import org.apache.celeborn.common.protocol.message.Message +import org.apache.celeborn.common.rpc.netty.{InboxMessage, OneWayMessage, RpcEndpointVerifier, RpcMessage} +import org.apache.celeborn.common.util.JavaUtils + +private[celeborn] class RpcMetricsTracker( + val name: String, + rpcSource: RpcSource, + conf: CelebornConf) extends Logging { + + // Histogram is used for Client, eg LifecycleManager + val histogramMap: ConcurrentMap[String, Histogram] = + JavaUtils.newConcurrentHashMap[String, Histogram] + + private val maxQueueLength: AtomicLong = new AtomicLong(0) + private val slowRpcThreshold: Long = conf.rpcSlowThresholdNs() + private val slowRpcInterval: Long = conf.rpcSlowIntervalMs() + private val rpcDumpInterval: Long = conf.rpcDumpIntervalMs() + private val lastDumpTime: AtomicLong = new AtomicLong(0) + private val lastSlowLogTime: AtomicLong = new AtomicLong(0) + final private val useHistogram = + if (name == RpcNameConstants.LIFECYCLE_MANAGER_EP || name == RpcEndpointVerifier.NAME) { + true + } else { + false + } + final private val QUEUE_LENGTH_METRIC = s"${name}_${RpcSource.QUEUE_LENGTH}" + final private val QUEUE_TIME_METRIC = s"${name}_${RpcSource.QUEUE_TIME}" + final private val PROCESS_TIME_METRIC = s"${name}_${RpcSource.PROCESS_TIME}" + + private var queueLengthFunc: () => Long = _ + + def init(lengthFunc: () => Long): Unit = { + queueLengthFunc = lengthFunc + if (name != null) { + rpcSource.addGauge(QUEUE_LENGTH_METRIC)(queueLengthFunc) + + rpcSource.addTimer(QUEUE_TIME_METRIC) + rpcSource.addTimer(PROCESS_TIME_METRIC) + } + } + + def updateHistogram(name: String, value: Long): Unit = { + histogramMap.putIfAbsent(name, new Histogram(new UniformReservoir())) + val histogram = histogramMap.get(name) + histogram.update(value) + } + + def updateMaxLength(): Unit = { + val len = queueLengthFunc() + if (len > maxQueueLength.get()) { + maxQueueLength.set(len) + } + } + + private def logSlowRpc(message: InboxMessage, queueTime: Long, processTime: Long): Unit = { + if (queueTime + processTime > slowRpcThreshold) { + val lastLogTime = lastSlowLogTime.get() + if (slowRpcInterval < 0 || System.currentTimeMillis() - lastLogTime > slowRpcInterval && + lastSlowLogTime.compareAndSet(lastLogTime, System.currentTimeMillis())) { + logWarning( + s"slow rpc detected: currentQueueSize = ${queueLengthFunc()}, queueTime=$queueTime processTime=$processTime message=$message") + } + + val lastTime = lastDumpTime.get + if (useHistogram && System.currentTimeMillis() - lastTime > rpcDumpInterval && + lastDumpTime.compareAndSet(lastTime, System.currentTimeMillis())) { + dump() + } + } + } + + def record(message: Any, queueTime: Long, processTime: Long): Unit = { + def messageName(message: Any): String = { + message match { + case legacy: Message => + legacy.getClass.toString + case pb: GeneratedMessageV3 => + pb.getDescriptorForType.getFullName + case _: RpcEndpointVerifier.CheckExistence => + "CheckExistence" + case _ => + "unknown" + } + } + val msgName = messageName(message) + + if (useHistogram) { + updateHistogram(QUEUE_TIME_METRIC, queueTime) + updateHistogram(PROCESS_TIME_METRIC, processTime) + updateHistogram(msgName, processTime) + } else { + rpcSource.updateTimer(QUEUE_TIME_METRIC, queueTime) + rpcSource.updateTimer(PROCESS_TIME_METRIC, processTime) + rpcSource.updateTimer(msgName, processTime) + } + } + + def update(message: InboxMessage): Unit = { + message match { + case rpc @ RpcMessage(_, content, _) => + val queueTime = rpc.dequeueTime - rpc.enqueueTime + val processTime = rpc.endProcessTime - rpc.dequeueTime + record(content, queueTime, processTime) + logSlowRpc(message, queueTime, processTime) + case one @ OneWayMessage(_, content) => + val queueTime = one.dequeueTime - one.enqueueTime + val processTime = one.endProcessTime - one.dequeueTime + record(content, queueTime, processTime) + logSlowRpc(message, queueTime, processTime) + case _ => + } + } + + def dump(): Unit = { + if (!useHistogram) + return + + val builder = new StringBuilder(); + builder.append(s"RPC statistics for $name").append("\n") + builder.append(s"current queue size = ${queueLengthFunc()}").append("\n") + builder.append(s"max queue length = ${maxQueueLength.get()}").append("\n") + histogramMap.entrySet.asScala.foreach(entry => { + val histogram = entry.getValue + val snapshot = histogram.getSnapshot; + builder.append(s"histogram for $name RPC metrics: ").append(entry.getKey).append("\n") + builder.append("count: ").append(histogram.getCount).append("\n") + .append("min: ").append(snapshot.getMin).append("\n") + .append("mean: ").append(snapshot.getMean).append("\n") + .append("p50: ").append(snapshot.getMedian).append("\n") + .append("p75: ").append(snapshot.get75thPercentile).append("\n") + .append("p95: ").append(snapshot.get95thPercentile).append("\n") + .append("p99: ").append(snapshot.get99thPercentile()).append("\n") + .append("max: ").append(snapshot.getMax).append("\n") + }) + logInfo(builder.toString()) + } +} diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala new file mode 100644 index 0000000000..1acccf0621 --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.rpc + +import java.util.concurrent.ConcurrentHashMap + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.metrics.source.AbstractSource + +class RpcSource(conf: CelebornConf) extends AbstractSource(conf, RpcSource.ROLE_RPC) { + override def sourceName: String = RpcSource.ROLE_RPC + + private val msgNameSet = ConcurrentHashMap.newKeySet[String]() + + override def updateTimer(name: String, value: Long): Unit = { + if (!msgNameSet.contains(name)) { + super.addTimer(name) + msgNameSet.add(name) + } + super.updateTimer(name, value) + } + + override def addTimer(name: String): Unit = { + if (!msgNameSet.contains(name)) { + super.addTimer(name) + msgNameSet.add(name) + } + } + + startCleaner() +} + +object RpcSource { + val ROLE_RPC = "RPC" + + val QUEUE_LENGTH = "QueueLength" + val QUEUE_TIME = "QueueTime" + val PROCESS_TIME = "ProcessTime" +} diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala index b8a93bda0c..5b73207260 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala @@ -33,14 +33,15 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils} /** * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s). */ -private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { +private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: RpcSource) extends Logging { private class EndpointData( val name: String, val endpoint: RpcEndpoint, val ref: NettyRpcEndpointRef) { val celebornConf = nettyEnv.celebornConf - val inbox = new Inbox(ref, endpoint, celebornConf) + val inbox = + new Inbox(ref, endpoint, celebornConf, new RpcMetricsTracker(name, rpcSource, celebornConf)) } private val endpoints: ConcurrentMap[String, EndpointData] = diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala index 7502524565..b179128fb5 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala @@ -26,9 +26,15 @@ import scala.util.control.NonFatal import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.exception.CelebornException import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpoint, ThreadSafeRpcEndpoint} +import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpoint, RpcMetricsTracker, ThreadSafeRpcEndpoint} -sealed private[celeborn] trait InboxMessage +sealed private[celeborn] trait InboxMessage extends RpcTimeMetrics + +private[celeborn] trait RpcTimeMetrics { + var enqueueTime: Long = 0 + var dequeueTime: Long = 0 + var endProcessTime: Long = 0 +} private[celeborn] case class OneWayMessage( senderAddress: RpcAddress, @@ -68,7 +74,8 @@ private[celeborn] case class RemoteProcessConnectionError( private[celeborn] class Inbox( val endpointRef: NettyRpcEndpointRef, val endpoint: RpcEndpoint, - val conf: CelebornConf) extends Logging { + val conf: CelebornConf, + val metrics: RpcMetricsTracker) extends Logging { inbox => // Give this an alias so we can use it more clearly in closures. @@ -94,10 +101,14 @@ private[celeborn] class Inbox( @GuardedBy("this") private var numActiveThreads = 0 + metrics.init(() => messageCount.get()) + // OnStart should be the first message to process try { inboxLock.lockInterruptibly() messages.add(OnStart) + RpcTimeMetrics.updateTime(OnStart, RpcTimeMetrics.Enqueue) + metrics.updateMaxLength() messageCount.incrementAndGet() } finally { inboxLock.unlock() @@ -204,6 +215,7 @@ private[celeborn] class Inbox( message = messages.poll() if (message != null) { numActiveThreads += 1 + RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Dequeue) messageCount.decrementAndGet() signalNotFull() } else { @@ -214,7 +226,7 @@ private[celeborn] class Inbox( } while (true) { - safelyCall(endpoint, endpointRef.name) { + safelyCall(endpoint, endpointRef.name, message) { processInternal(dispatcher, message) } try { @@ -231,6 +243,7 @@ private[celeborn] class Inbox( numActiveThreads -= 1 return } else { + RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Dequeue) messageCount.decrementAndGet() signalNotFull() } @@ -248,6 +261,8 @@ private[celeborn] class Inbox( onDrop(message) } else { addMessage(message) + RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Enqueue) + metrics.updateMaxLength() } } finally { inboxLock.unlock() @@ -266,6 +281,7 @@ private[celeborn] class Inbox( enableConcurrent = false stopped = true addMessage(OnStop) + metrics.dump() // Note: The concurrent events in messages will be processed one by one. } } finally { @@ -295,7 +311,8 @@ private[celeborn] class Inbox( */ private def safelyCall( endpoint: RpcEndpoint, - endpointRefName: String)(action: => Unit): Unit = { + endpointRefName: String, + message: InboxMessage)(action: => Unit): Unit = { def dealWithFatalError(fatal: Throwable): Unit = { try { inboxLock.lockInterruptibly() @@ -327,6 +344,9 @@ private[celeborn] class Inbox( } case fatal: Throwable => dealWithFatalError(fatal) + } finally { + RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Process) + metrics.update(message) } } @@ -340,3 +360,18 @@ private[celeborn] class Inbox( } } } + +private[celeborn] object RpcTimeMetrics { + trait MetricsType + case object Enqueue extends MetricsType + case object Dequeue extends MetricsType + case object Process extends MetricsType + def updateTime(message: InboxMessage, op: MetricsType): Unit = { + (message, op) match { + case (msg: RpcTimeMetrics, Enqueue) => msg.enqueueTime = System.nanoTime + case (msg: RpcTimeMetrics, Dequeue) => msg.dequeueTime = System.nanoTime + case (msg: RpcTimeMetrics, Process) => msg.endProcessTime = System.nanoTime + case _ => + } + } +} diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala index 598a589fed..5d2d31f66d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala @@ -55,7 +55,9 @@ class NettyRpcEnv( config.transportModule, celebornConf.rpcIoThreads.getOrElse(config.numUsableCores)) - private val dispatcher: Dispatcher = new Dispatcher(this) + private val source: RpcSource = new RpcSource(celebornConf) + + private val dispatcher: Dispatcher = new Dispatcher(this, source) private var worker: RpcEndpoint = null @@ -361,6 +363,8 @@ class NettyRpcEnv( deserializationAction() } } + + override def rpcSource(): RpcSource = source } private[celeborn] object NettyRpcEnv extends Logging { diff --git a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala index 553b791d81..83b5b0a1be 100644 --- a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfter import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.rpc.{RpcAddress, TestRpcEndpoint} +import org.apache.celeborn.common.rpc.{RpcAddress, RpcMetricsTracker, RpcSource, TestRpcEndpoint} import org.apache.celeborn.common.util.ThreadUtils class InboxSuite extends CelebornFunSuite with BeforeAndAfter { @@ -37,10 +37,20 @@ class InboxSuite extends CelebornFunSuite with BeforeAndAfter { testRpcEndpoint: TestRpcEndpoint, onDropOverride: Option[InboxMessage => T]): Inbox = { val rpcEnvRef = mock(classOf[NettyRpcEndpointRef]) + val conf = new CelebornConf() + val source: RpcSource = new RpcSource(conf) if (onDropOverride.isEmpty) { - new Inbox(rpcEnvRef, testRpcEndpoint, new CelebornConf()) + new Inbox( + rpcEnvRef, + testRpcEndpoint, + new CelebornConf(), + new RpcMetricsTracker("testRpc", source, conf)) } else { - new Inbox(rpcEnvRef, testRpcEndpoint, new CelebornConf()) { + new Inbox( + rpcEnvRef, + testRpcEndpoint, + new CelebornConf(), + new RpcMetricsTracker("testRpc", source, conf)) { override protected def onDrop(message: InboxMessage): Unit = { onDropOverride.get(message) } diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 59057daa01..9c3f4ad82c 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -52,9 +52,12 @@ license: | | celeborn.rpc.askTimeout | 60s | false | Timeout for RPC ask operations. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes` | 0.2.0 | | | celeborn.rpc.connect.threads | 64 | false | | 0.2.0 | | | celeborn.rpc.dispatcher.threads | 0 | false | Threads number of message dispatcher event loop. Default to 0, which is availableCore. | 0.3.0 | celeborn.rpc.dispatcher.numThreads | +| celeborn.rpc.dump.interval | 60s | false | min interval (ms) for RPC framework to dump performance summary | 0.6.0 | | | celeborn.rpc.inbox.capacity | 0 | false | Specifies size of the in memory bounded capacity. | 0.5.0 | | | celeborn.rpc.io.threads | <undefined> | false | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | | | celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup operations. | 0.2.0 | | +| celeborn.rpc.slow.interval | <undefined> | false | min interval (ms) for RPC framework to log slow RPC | 0.6.0 | | +| celeborn.rpc.slow.threshold | 1s | false | threshold for RPC framework to log slow RPC | 0.6.0 | | | celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | false | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn..io.maxRetries` and `celeborn..io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 | | | celeborn.ssl.<module>.enabled | false | false | Enables SSL for securing wire traffic. | 0.5.0 | | | celeborn.ssl.<module>.enabledAlgorithms | <undefined> | false | A comma-separated list of ciphers. The specified ciphers must be supported by JVM.
The reference list of protocols can be found in the "JSSE Cipher Suite Names" section of the Java security guide. The list for Java 11, for example, can be found at [this page](https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#jsse-cipher-suite-names)
Note: If not set, the default cipher suite for the JRE will be used | 0.5.0 | | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 1f065b3d85..30789bb627 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -121,6 +121,7 @@ private[celeborn] class Master( Some(externalSecurityContext), None) } + metricsSystem.registerSource(rpcEnv.rpcSource()) // Visible for testing private[master] var internalRpcEnvInUse: RpcEnv = diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 62602cd06b..fca7064340 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -123,6 +123,7 @@ private[celeborn] class Worker( Some(externalSecurityContext), Some(workerSource)) } + metricsSystem.registerSource(rpcEnv.rpcSource()) private[worker] var internalRpcEnvInUse = if (!conf.internalPortEnabled) {