diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/BulkRemoverFactory.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/BulkRemoverFactory.scala
index 76a23d0906b..3badf47976a 100644
--- a/clients/spark/core/src/main/scala/io/treeverse/clients/BulkRemoverFactory.scala
+++ b/clients/spark/core/src/main/scala/io/treeverse/clients/BulkRemoverFactory.scala
@@ -12,8 +12,11 @@ import java.net.URI
 import java.nio.charset.Charset
 import java.util.stream.Collectors
 import scala.collection.JavaConverters._
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger
 
 trait BulkRemover {
+  protected val logger: Logger = LoggerFactory.getLogger(getClass.toString)
 
   /** Provides the max bulk size allowed by the underlying SDK client that does the actual deletion.
    *
@@ -36,7 +39,7 @@ trait BulkRemover {
       keepNsSchemeAndHost: Boolean,
       applyUTF8Encoding: Boolean
   ): Seq[String] = {
-    println("storageNamespace: " + storageNamespace)
+    logger.info("storageNamespace: " + storageNamespace)
     var removeKeyNames =
       StorageUtils.concatKeysToStorageNamespace(keys, storageNamespace, keepNsSchemeAndHost)
     if (applyUTF8Encoding) {
@@ -72,7 +75,7 @@ object BulkRemoverFactory {
                                 keepNsSchemeAndHost = false,
                                 applyUTF8Encoding = false
        )
-      println(s"Remove keys from ${bucket}: ${removeKeyNames.take(100).mkString(", ")}")
+      logger.info(s"Remove keys from ${bucket}: ${removeKeyNames.take(100).mkString(", ")}")
 
       val removeKeys = removeKeyNames.map(k => new model.DeleteObjectsRequest.KeyVersion(k)).asJava
       val delObjReq = new model.DeleteObjectsRequest(bucket).withKeys(removeKeys)
@@ -86,14 +89,14 @@ object BulkRemoverFactory {
           // TODO(ariels): Metric!
           val errors = mde.getErrors();
-          println(s"deleteObjects: Partial failure: ${errors.size} errors: ${errors}")
+          logger.info(s"deleteObjects: Partial failure: ${errors.size} errors: ${errors}")
           errors.asScala.foreach((de) =>
-            println(s"\t${de.getKey}: [${de.getCode}] ${de.getMessage}")
+            logger.info(s"\t${de.getKey}: [${de.getCode}] ${de.getMessage}")
           )
           mde.getDeletedObjects.asScala.map(_.getKey)
         }
         case e: Exception => {
-          println(s"deleteObjects failed: ${e}")
+          logger.info(s"deleteObjects failed: ${e}")
           throw e
         }
       }
@@ -110,7 +113,7 @@ object BulkRemoverFactory {
     override def deleteObjects(keys: Seq[String], storageNamespace: String): Seq[String] = {
       val removeKeyNames = constructRemoveKeyNames(keys, storageNamespace, true, true)
-      println(s"Remove keys: ${removeKeyNames.take(100).mkString(", ")}")
+      logger.info(s"Remove keys: ${removeKeyNames.take(100).mkString(", ")}")
       val removeKeys = removeKeyNames.asJava
 
       val blobBatchClient = client.blobBatchClient
diff --git a/clients/spark/core/src/main/scala/io/treeverse/clients/StorageUtils.scala b/clients/spark/core/src/main/scala/io/treeverse/clients/StorageUtils.scala
index 450d98ad9c3..1304756ab6d 100644
--- a/clients/spark/core/src/main/scala/io/treeverse/clients/StorageUtils.scala
+++ b/clients/spark/core/src/main/scala/io/treeverse/clients/StorageUtils.scala
@@ -161,6 +161,7 @@ object StorageUtils {
 }
 
 class S3RetryDeleteObjectsCondition extends SDKDefaultRetryCondition {
+  private val logger: Logger = LoggerFactory.getLogger(getClass.toString)
   private val XML_PARSE_BROKEN = "Failed to parse XML document"
 
   private val clock = java.time.Clock.systemDefaultZone
@@ -174,15 +175,15 @@ class S3RetryDeleteObjectsCondition extends SDKDefaultRetryCondition {
     exception match {
       case ce: SdkClientException =>
         if (ce.getMessage contains XML_PARSE_BROKEN) {
-          println(s"Retry $originalRequest @$now: Received non-XML: $ce")
+          logger.info(s"Retry $originalRequest @$now: Received non-XML: $ce")
         } else if (RetryUtils.isThrottlingException(ce)) {
-          println(s"Retry $originalRequest @$now: Throttled: $ce")
+          logger.info(s"Retry $originalRequest @$now: Throttled: $ce")
         } else {
-          println(s"Retry $originalRequest @$now: Other client exception: $ce")
+          logger.info(s"Retry $originalRequest @$now: Other client exception: $ce")
         }
         true
       case e => {
-        println(s"Do not retry $originalRequest @$now: Non-AWS exception: $e")
+        logger.info(s"Do not retry $originalRequest @$now: Non-AWS exception: $e")
         super.shouldRetry(originalRequest, exception, retriesAttempted)
       }
     }
diff --git a/clients/spark/core/src/main/scala/io/treeverse/gc/GarbageCollection.scala b/clients/spark/core/src/main/scala/io/treeverse/gc/GarbageCollection.scala
index 7d17b46de68..cfcd08f208e 100644
--- a/clients/spark/core/src/main/scala/io/treeverse/gc/GarbageCollection.scala
+++ b/clients/spark/core/src/main/scala/io/treeverse/gc/GarbageCollection.scala
@@ -15,8 +15,11 @@ import java.net.URI
 import java.time.format.DateTimeFormatter
 import java.util.Date
 import scala.collection.JavaConverters._
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger
 
 object GarbageCollection {
+  private final val logger: Logger = LoggerFactory.getLogger(getClass.toString)
   final val UNIFIED_GC_SOURCE_NAME = "unified_gc"
   private final val DATA_PREFIX = "data/"
 
@@ -199,10 +202,10 @@ object GarbageCollection {
     // delete marked addresses
     if (shouldSweep) {
       val markedAddresses = if (shouldMark) {
-        println("deleting marked addresses from run ID: " + runID)
+        logger.info("deleting marked addresses from run ID: " + runID)
         addressesToDelete
       } else {
-        println("deleting marked addresses from mark ID: " + markID)
+        logger.info("deleting marked addresses from mark ID: " + markID)
         readMarkedAddresses(storageNamespace, markID, outputPrefix)
       }
 
@@ -212,6 +215,7 @@ object GarbageCollection {
       )
       val configMapper = new ConfigMapper(hcValues)
       bulkRemove(configMapper, markedAddresses, storageNSForSdkClient, region, storageType)
+      logger.info("finished deleting")
     }
 
     // Flow completed successfully - set success to true
@@ -270,7 +274,7 @@ object GarbageCollection {
       outputPrefix: String = "unified"
   ): Unit = {
     val reportDst = formatRunPath(storageNamespace, runID, outputPrefix)
-    println(s"Report for mark_id=$runID path=$reportDst")
+    logger.info(s"Report for mark_id=$runID path=$reportDst")
 
     expiredAddresses.write.parquet(s"$reportDst/deleted")
     expiredAddresses.write.text(s"$reportDst/deleted.text")
@@ -284,7 +288,7 @@ object GarbageCollection {
       success,
       expiredAddresses.count()
     )
-    println(s"Report summary=$summary")
+    logger.info(s"Report summary=$summary")
   }
 
   private def formatRunPath(
@@ -313,7 +317,7 @@ object GarbageCollection {
     } else {
       val deletedPath = new Path(formatRunPath(storageNamespace, markID, outputPrefix) + "/deleted")
       if (!fs.exists(deletedPath)) {
-        println(s"Mark ID ($markID) does not contain deleted files")
+        logger.info(s"Mark ID ($markID) does not contain deleted files")
         spark.emptyDataFrame.withColumn("address", lit(""))
       } else {
         spark.read.parquet(deletedPath.toString)