
Commit

Print to logs (#6301)
johnnyaug authored Aug 1, 2023
1 parent a86cc87 commit ea3f640
Showing 3 changed files with 23 additions and 15 deletions.
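The change is the same in each of the three files: declare an SLF4J logger and route the former println output through it. A minimal sketch of that pattern follows; the ExampleRemover name and the message text are illustrative only, not taken from the diff.

import org.slf4j.{Logger, LoggerFactory}

object ExampleRemover {
  // Logger named after the class, as in the diff below.
  private val logger: Logger = LoggerFactory.getLogger(getClass.toString)

  def removeKeys(keys: Seq[String]): Unit = {
    // Previously: println(...) — the message only reached stdout.
    // Now it goes through SLF4J and lands in the application logs.
    logger.info(s"Remove keys: ${keys.take(100).mkString(", ")}")
  }
}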
File 1 of 3
@@ -12,8 +12,11 @@ import java.net.URI
import java.nio.charset.Charset
import java.util.stream.Collectors
import scala.collection.JavaConverters._
import org.slf4j.LoggerFactory
import org.slf4j.Logger

trait BulkRemover {
private val logger: Logger = LoggerFactory.getLogger(getClass.toString)

/** Provides the max bulk size allowed by the underlying SDK client that does the actual deletion.
*
@@ -36,7 +39,7 @@ trait BulkRemover {
keepNsSchemeAndHost: Boolean,
applyUTF8Encoding: Boolean
): Seq[String] = {
println("storageNamespace: " + storageNamespace)
logger.info("storageNamespace: " + storageNamespace)
var removeKeyNames =
StorageUtils.concatKeysToStorageNamespace(keys, storageNamespace, keepNsSchemeAndHost)
if (applyUTF8Encoding) {
@@ -72,7 +75,7 @@ object BulkRemoverFactory {
keepNsSchemeAndHost = false,
applyUTF8Encoding = false
)
println(s"Remove keys from ${bucket}: ${removeKeyNames.take(100).mkString(", ")}")
logger.info(s"Remove keys from ${bucket}: ${removeKeyNames.take(100).mkString(", ")}")
val removeKeys = removeKeyNames.map(k => new model.DeleteObjectsRequest.KeyVersion(k)).asJava

val delObjReq = new model.DeleteObjectsRequest(bucket).withKeys(removeKeys)
@@ -86,14 +89,14 @@ object BulkRemoverFactory {

// TODO(ariels): Metric!
val errors = mde.getErrors();
println(s"deleteObjects: Partial failure: ${errors.size} errors: ${errors}")
logger.info(s"deleteObjects: Partial failure: ${errors.size} errors: ${errors}")
errors.asScala.foreach((de) =>
println(s"\t${de.getKey}: [${de.getCode}] ${de.getMessage}")
logger.info(s"\t${de.getKey}: [${de.getCode}] ${de.getMessage}")
)
mde.getDeletedObjects.asScala.map(_.getKey)
}
case e: Exception => {
println(s"deleteObjects failed: ${e}")
logger.info(s"deleteObjects failed: ${e}")
throw e
}
}
@@ -110,7 +113,7 @@ object BulkRemoverFactory {

override def deleteObjects(keys: Seq[String], storageNamespace: String): Seq[String] = {
val removeKeyNames = constructRemoveKeyNames(keys, storageNamespace, true, true)
println(s"Remove keys: ${removeKeyNames.take(100).mkString(", ")}")
logger.info(s"Remove keys: ${removeKeyNames.take(100).mkString(", ")}")
val removeKeys = removeKeyNames.asJava
val blobBatchClient = client.blobBatchClient

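For context, the hunks above sit in an S3 bulk-delete path that logs partial failures. Below is a rough, self-contained sketch of how the logged calls fit around the AWS SDK v1 delete request; the S3DeleteSketch class and its constructor are assumptions, and only the request construction and MultiObjectDeleteException handling mirror the diff.

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._

class S3DeleteSketch(s3: AmazonS3) {
  private val logger: Logger = LoggerFactory.getLogger(getClass.toString)

  def deleteObjects(bucket: String, removeKeyNames: Seq[String]): Seq[String] = {
    logger.info(s"Remove keys from ${bucket}: ${removeKeyNames.take(100).mkString(", ")}")
    val removeKeys = removeKeyNames.map(k => new model.DeleteObjectsRequest.KeyVersion(k)).asJava
    val delObjReq = new model.DeleteObjectsRequest(bucket).withKeys(removeKeys)
    try {
      s3.deleteObjects(delObjReq).getDeletedObjects.asScala.map(_.getKey)
    } catch {
      case mde: model.MultiObjectDeleteException =>
        // Partial failure: some keys were deleted, others were not.
        // Log each per-key error, then return the keys that did get deleted.
        val errors = mde.getErrors
        logger.info(s"deleteObjects: Partial failure: ${errors.size} errors: ${errors}")
        errors.asScala.foreach(de => logger.info(s"\t${de.getKey}: [${de.getCode}] ${de.getMessage}"))
        mde.getDeletedObjects.asScala.map(_.getKey)
      case e: Exception =>
        logger.info(s"deleteObjects failed: ${e}")
        throw e
    }
  }
}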
File 2 of 3
@@ -161,6 +161,7 @@ object StorageUtils {
}

class S3RetryDeleteObjectsCondition extends SDKDefaultRetryCondition {
private val logger: Logger = LoggerFactory.getLogger(getClass.toString)
private val XML_PARSE_BROKEN = "Failed to parse XML document"

private val clock = java.time.Clock.systemDefaultZone
@@ -174,15 +175,15 @@ class S3RetryDeleteObjectsCondition extends SDKDefaultRetryCondition {
exception match {
case ce: SdkClientException =>
if (ce.getMessage contains XML_PARSE_BROKEN) {
println(s"Retry $originalRequest @$now: Received non-XML: $ce")
logger.info(s"Retry $originalRequest @$now: Received non-XML: $ce")
} else if (RetryUtils.isThrottlingException(ce)) {
println(s"Retry $originalRequest @$now: Throttled: $ce")
logger.info(s"Retry $originalRequest @$now: Throttled: $ce")
} else {
println(s"Retry $originalRequest @$now: Other client exception: $ce")
logger.info(s"Retry $originalRequest @$now: Other client exception: $ce")
}
true
case e => {
println(s"Do not retry $originalRequest @$now: Non-AWS exception: $e")
logger.info(s"Do not retry $originalRequest @$now: Non-AWS exception: $e")
super.shouldRetry(originalRequest, exception, retriesAttempted)
}
}
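The retry condition above is only defined here; how it gets attached to an S3 client is not part of this diff. A hedged sketch of the usual AWS SDK v1 wiring is shown below, assuming S3RetryDeleteObjectsCondition is importable from the same package and using an arbitrary maximum of 5 retries.

import com.amazonaws.ClientConfiguration
import com.amazonaws.retry.{PredefinedRetryPolicies, RetryPolicy}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

object S3ClientSketch {
  def build(): AmazonS3 = {
    // Hypothetical wiring: the custom condition decides whether to retry,
    // the SDK's default backoff strategy decides how long to wait.
    val retryPolicy = new RetryPolicy(
      new S3RetryDeleteObjectsCondition(),
      PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY,
      5, // example retry count, not taken from the repository
      true
    )
    val config = new ClientConfiguration().withRetryPolicy(retryPolicy)
    AmazonS3ClientBuilder.standard().withClientConfiguration(config).build()
  }
}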
File 3 of 3
@@ -15,8 +15,11 @@ import java.net.URI
import java.time.format.DateTimeFormatter
import java.util.Date
import scala.collection.JavaConverters._
import org.slf4j.LoggerFactory
import org.slf4j.Logger

object GarbageCollection {
private final val logger: Logger = LoggerFactory.getLogger(getClass.toString)
final val UNIFIED_GC_SOURCE_NAME = "unified_gc"
private final val DATA_PREFIX = "data/"

@@ -199,10 +202,10 @@ object GarbageCollection {
// delete marked addresses
if (shouldSweep) {
val markedAddresses = if (shouldMark) {
println("deleting marked addresses from run ID: " + runID)
logger.info("deleting marked addresses from run ID: " + runID)
addressesToDelete
} else {
println("deleting marked addresses from mark ID: " + markID)
logger.info("deleting marked addresses from mark ID: " + markID)
readMarkedAddresses(storageNamespace, markID, outputPrefix)
}

@@ -212,6 +215,7 @@
)
val configMapper = new ConfigMapper(hcValues)
bulkRemove(configMapper, markedAddresses, storageNSForSdkClient, region, storageType)
logger.info("finished deleting")
}

// Flow completed successfully - set success to true
@@ -270,7 +274,7 @@ object GarbageCollection {
outputPrefix: String = "unified"
): Unit = {
val reportDst = formatRunPath(storageNamespace, runID, outputPrefix)
println(s"Report for mark_id=$runID path=$reportDst")
logger.info(s"Report for mark_id=$runID path=$reportDst")

expiredAddresses.write.parquet(s"$reportDst/deleted")
expiredAddresses.write.text(s"$reportDst/deleted.text")
@@ -284,7 +288,7 @@
success,
expiredAddresses.count()
)
println(s"Report summary=$summary")
logger.info(s"Report summary=$summary")
}

private def formatRunPath(
@@ -313,7 +317,7 @@ object GarbageCollection {
} else {
val deletedPath = new Path(formatRunPath(storageNamespace, markID, outputPrefix) + "/deleted")
if (!fs.exists(deletedPath)) {
println(s"Mark ID ($markID) does not contain deleted files")
logger.info(s"Mark ID ($markID) does not contain deleted files")
spark.emptyDataFrame.withColumn("address", lit(""))
} else {
spark.read.parquet(deletedPath.toString)
