From 9aad7fa91dfccde07f33048eeb6da5afcae2c32d Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Mon, 2 Sep 2024 17:50:29 +0800 Subject: [PATCH] gc --- .../pingcap/tispark/safepoint/ServiceSafePoint.scala | 12 +++++++++++- .../main/scala/org/apache/spark/sql/TiContext.scala | 3 ++- .../resources/tidb_config.properties.TLS.template | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/pingcap/tispark/safepoint/ServiceSafePoint.scala b/core/src/main/scala/com/pingcap/tispark/safepoint/ServiceSafePoint.scala index 1b6a072962..cc0983b1fe 100644 --- a/core/src/main/scala/com/pingcap/tispark/safepoint/ServiceSafePoint.scala +++ b/core/src/main/scala/com/pingcap/tispark/safepoint/ServiceSafePoint.scala @@ -18,6 +18,7 @@ package com.pingcap.tispark.safepoint import com.google.common.util.concurrent.ThreadFactoryBuilder import com.pingcap.tikv.ClientSession +import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory import org.tikv.common.exception.TiInternalException import org.tikv.common.meta.TiTimestamp @@ -29,7 +30,8 @@ case class ServiceSafePoint( serviceId: String, ttl: Long, GCMaxWaitTime: Long, - clientSession: ClientSession) { + clientSession: ClientSession, + sparkSession: SparkSession) { private val PD_UPDATE_SAFE_POINT_BACKOFF: Int = 20 * 1000 private final val logger = LoggerFactory.getLogger(getClass.getName) @@ -38,6 +40,14 @@ case class ServiceSafePoint( new ThreadFactoryBuilder().setNameFormat("serviceSafePoint-thread-%d").setDaemon(true).build) service.scheduleAtFixedRate( () => { + val now = clientSession.getTiKVSession.getTimestamp + if (now.getPhysical - TiTimestamp.extractPhysical(minStartTs) >= GCMaxWaitTime * 1000) { + val msg = + s"Can not pause GC more than spark.tispark.gc_max_wait_time=$GCMaxWaitTime s. start_ts: ${minStartTs}, now: ${now.getVersion}. You can adjust spark.tispark.gc_max_wait_time to increase the gc max wait time." + logger.error(msg) + sparkSession.stop() + throw new TiInternalException(msg) + } if (minStartTs != Long.MaxValue) { val safePoint = clientSession.getTiKVSession.getPDClient.updateServiceGCSafePoint( serviceId, diff --git a/core/src/main/scala/org/apache/spark/sql/TiContext.scala b/core/src/main/scala/org/apache/spark/sql/TiContext.scala index ad2ce788ab..90334cc59b 100644 --- a/core/src/main/scala/org/apache/spark/sql/TiContext.scala +++ b/core/src/main/scala/org/apache/spark/sql/TiContext.scala @@ -65,7 +65,8 @@ class TiContext(val sparkSession: SparkSession) extends Serializable with Loggin "tispark_" + UUID.randomUUID, TiConfigConst.DEFAULT_GC_SAFE_POINT_TTL, GCMaxWaitTime, - clientSession) + clientSession, + sparkSession) sparkSession.sparkContext.addSparkListener(new SparkListener() { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { diff --git a/core/src/test/resources/tidb_config.properties.TLS.template b/core/src/test/resources/tidb_config.properties.TLS.template index 004b576353..0f2612564a 100644 --- a/core/src/test/resources/tidb_config.properties.TLS.template +++ b/core/src/test/resources/tidb_config.properties.TLS.template @@ -17,4 +17,4 @@ spark.tispark.jdbc.tls_enable=true spark.tispark.jdbc.server_cert_store = /config/cert/jks/server-cert-store spark.tispark.jdbc.server_cert_password = 12345678 spark.tispark.jdbc.client_cert_store = /config/cert/jks/client-keystore -spark.tispark.jdbc.client_cert_password = 123456 \ No newline at end of file +spark.tispark.jdbc.client_cert_password = 123456