Common: fix ambiguous empty string (close #171)
chuwy committed Oct 30, 2020
1 parent b8d2597 · commit 439b737
Showing 6 changed files with 31 additions and 20 deletions.
@@ -163,21 +163,23 @@ object RedshiftLoadStatements {
     if (transitCopy) {
       TransitCopy(SqlString.unsafeCoerce(
         s"""COPY ${Common.TransitEventsTable} FROM '$s3path'
-           | CREDENTIALS 'aws_iam_role=${target.roleArn}' REGION AS '${config.aws.s3.region}'
-           | DELIMITER '$EventFieldSeparator'
+           | CREDENTIALS 'aws_iam_role=${target.roleArn}'
+           | REGION AS '${config.aws.s3.region}'
            | MAXERROR ${target.maxError}
            | TIMEFORMAT 'auto'
+           | DELIMITER '$EventFieldSeparator'
            | EMPTYASNULL
            | FILLRECORD
            | TRUNCATECOLUMNS
            | ACCEPTINVCHARS $compressionFormat""".stripMargin))
     } else {
       StraightCopy(SqlString.unsafeCoerce(
         s"""COPY $eventsTable FROM '$s3path'
-           | CREDENTIALS 'aws_iam_role=${target.roleArn}' REGION AS '${config.aws.s3.region}'
-           | DELIMITER '$EventFieldSeparator'
+           | CREDENTIALS 'aws_iam_role=${target.roleArn}'
+           | REGION AS '${config.aws.s3.region}'
            | MAXERROR ${target.maxError}
            | TIMEFORMAT 'auto'
+           | DELIMITER '$EventFieldSeparator'
            | EMPTYASNULL
            | FILLRECORD
            | TRUNCATECOLUMNS
@@ -234,8 +236,9 @@ object RedshiftLoadStatements {
            | TIMEFORMAT 'auto'
            | DELIMITER '$EventFieldSeparator'
            | TRUNCATECOLUMNS
+           | ACCEPTINVCHARS $compressionFormat
            | EMPTYASNULL
-           | ACCEPTINVCHARS $compressionFormat""".stripMargin)
+           | REMOVEQUOTES""".stripMargin)
     }
   }

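Note on the second hunk: the COPY for shredded tables gains REMOVEQUOTES because shredded TSV values are now emitted wrapped in single quotes (see the EventUtils changes below). The idea behind the fix is that EMPTYASNULL then only nulls genuinely empty (zero-length) fields, while an empty string arrives as '' and survives quote stripping, so the two are no longer ambiguous. A minimal sketch of the writer side under that convention (the render helper is illustrative, not part of the commit):

// Illustrative sketch (not part of the commit): present values are quoted,
// absent values stay zero-length, mirroring the convention described above.
object EmptyStringDisambiguation {
  def render(field: Option[String]): String =
    field.fold("")(s => s"'$s'") // None -> empty field; Some("") -> ''

  def main(args: Array[String]): Unit = {
    val row = List(Some("web"), Some(""), None).map(render).mkString("\t")
    println(row) // 'web'<TAB>''<TAB>  -- COPY loads: web, empty string, NULL
  }
}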
@@ -90,10 +90,11 @@ class RedshiftLoaderSpec extends Specification { def is = s2"""
 
     val atomic =
       s"""COPY atomic.events FROM 's3://snowplow-acme-storage/shredded/good/run=2017-05-22-12-20-57/atomic-events/'
-        | CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1'
-        | DELIMITER '$separator'
+        | CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole'
+        | REGION AS 'us-east-1'
         | MAXERROR 1
         | TIMEFORMAT 'auto'
+        | DELIMITER '$separator'
         | EMPTYASNULL
         | FILLRECORD
         | TRUNCATECOLUMNS
@@ -24,6 +24,7 @@ import cats.Monad
 import cats.data.EitherT
 import cats.syntax.either._
 import cats.syntax.show._
+
 import cats.effect.Clock
 
 import com.snowplowanalytics.iglu.core._
@@ -37,7 +38,6 @@ import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
 
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.badrows.FailureDetails
-
 import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.getOrdered
 
 object EventUtils {
@@ -50,8 +50,6 @@ object EventUtils {
   def alterEnrichedEvent(originalLine: Event, lengths: Map[String, Int]): String = {
     def tranformDate(s: String): String =
       Either.catchOnly[DateTimeParseException](Instant.parse(s)).map(_.formatted).getOrElse(s)
-    def transformBool(b: Boolean): String =
-      if (b) "1" else "0"
     def truncate(key: String, value: String): String =
       lengths.get(key) match {
         case Some(len) => value.take(len)
@@ -70,6 +68,9 @@
     tabular.mkString("\t")
   }
 
+  def transformBool(b: Boolean): String =
+    if (b) "1" else "0"
+
   /** Build a map of columnName -> maxLength, according to `schema`. Non-string values are not present in the map */
   def getAtomicLengths(schema: Json): Either[String, Map[String, Int]] =
     for {
@@ -82,6 +83,7 @@
   def buildMetadata(rootId: UUID, rootTstamp: Instant, schema: SchemaKey): List[String] =
     List(schema.vendor, schema.name, schema.format, schema.version.asString,
       rootId.toString, rootTstamp.formatted, "events", s"""["events","${schema.name}"]""", "events")
+      .map { c => s"'$c'"}
 
   /**
    * Transform a self-desribing entity into tabular format, using its known schemas to get a correct order of columns
@@ -90,11 +92,19 @@
    * @return list of columns or flattening error
    */
   def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], instance: SelfDescribingData[Json]): EitherT[F, FailureDetails.LoaderIgluError, List[String]] =
-    getOrdered(resolver, instance.schema).map { ordered => FlatData.flatten(instance.data, ordered, FlatData.getString(Some(escape)), "") }
+    getOrdered(resolver, instance.schema).map { ordered => FlatData.flatten(instance.data, ordered, getString, "") }
+
+  def getString(json: Json): String =
+    json.fold("",
+      transformBool,
+      _ => json.show,
+      escape,
+      _ => escape(json.noSpaces),
+      _ => escape(json.noSpaces))
 
   /** Prevents data with newlines and tabs from breaking the loading process */
   private def escape(s: String): String =
-    s.replace('\n', ' ').replace('\t', ' ')
+    s"""'$s'"""
 
   /** Get maximum length for a string value */
   private def getLength(schema: Schema): Option[Int] =
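To make the new rendering concrete: getString quotes strings (via escape), serializes arrays and objects to quoted JSON, renders booleans as 1/0 and numbers bare, and turns JSON null into a zero-length field; buildMetadata now quotes every metadata column the same way. A self-contained sketch of that logic, for illustration only (numbers rendered with toString rather than cats' show; the real code lives in EventUtils above):

import io.circe.Json

object GetStringSketch {
  def transformBool(b: Boolean): String =
    if (b) "1" else "0"

  // Mirrors the new escape: wrap the rendered value in single quotes
  def escape(s: String): String =
    s"""'$s'"""

  def getString(json: Json): String =
    json.fold("",                                       // null -> empty field
      transformBool,                                    // booleans -> 1 / 0
      n => n.toString,                                  // numbers stay bare
      escape,                                           // strings are quoted
      arr => escape(Json.fromValues(arr).noSpaces),     // arrays -> quoted JSON
      obj => escape(Json.fromJsonObject(obj).noSpaces)) // objects -> quoted JSON

  def main(args: Array[String]): Unit = {
    println(getString(Json.Null))            // ""   -> NULL via EMPTYASNULL
    println(getString(Json.fromString("")))  // ''   -> empty string after REMOVEQUOTES
    println(getString(Json.fromString("a"))) // 'a'
    println(getString(Json.True))            // 1
  }
}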
@@ -152,7 +152,7 @@ object ShredJob extends SparkJob {
   // Job configuration
   val shredConfig = ShredJobConfig
     .loadConfigFrom(args)
-    .valueOr(e => throw FatalEtlError(e.toString))
+    .valueOr(e => throw FatalEtlError(e))
 
   val job = new ShredJob(spark, shredConfig)
 
@@ -70,16 +70,13 @@ object EmptySchemaSpec {
 
   object expected {
     val contexAContents =
-      "com.snowplowanalytics.iglu\tanything-a\tjsonschema\t1-0-0\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"anything-a\"]\tevents"
+      "'com.snowplowanalytics.iglu'\t'anything-a'\t'jsonschema'\t'1-0-0'\t'2b1b25a4-c0df-4859-8201-cf21492ad61b'\t'2014-05-29 18:16:35.000'\t'events'\t'[\"events\",\"anything-a\"]'\t'events'"
     val contexBContents =
-      "com.snowplowanalytics.iglu\tanything-b\tjsonschema\t1-0-0\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"anything-b\"]\tevents"
+      "'com.snowplowanalytics.iglu'\t'anything-b'\t'jsonschema'\t'1-0-0'\t'2b1b25a4-c0df-4859-8201-cf21492ad61b'\t'2014-05-29 18:16:35.000'\t'events'\t'[\"events\",\"anything-b\"]'\t'events'"
 
     val contextAPath = s"shredded-tsv/vendor=com.snowplowanalytics.iglu/name=anything-a/format=jsonschema/version=1"
     val contextBPath = s"shredded-tsv/vendor=com.snowplowanalytics.iglu/name=anything-b/format=jsonschema/version=1"
 
-    val eventContents =
-      "com.snowplowanalytics.snowplow\tapplication_error\tjsonschema\t1-0-2\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"application_error\"]\tevents\tundefined is not a function\tJAVASCRIPT\tAbstractSingletonFactoryBean\t\t1\t\t\t14\t\t\t\tthis column should be last"
-
     // Removed three JSON columns and added 7 columns at the end
     val event = """snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:16:35.000 2014-05-29 18:04:11.639 page_view 2b1b25a4-c0df-4859-8201-cf21492ad61b 836413 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 216.207.42.134 3499345421 3b1d1a375044eede 3 2bad2a4e-aae4-4bea-8acd-399e7fe0366a US CA South San Francisco 37.654694 -122.4077 http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/ Writing Hive UDFs - a tutorial http snowplowanalytics.com 80 /blog/2013/02/08/writing-hive-udfs-and-serdes/ Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 Safari Safari Browser WEBKIT en-us 0 0 0 0 0 0 0 0 0 1 24 1440 1845 Mac OS Mac OS Apple Inc. America/Los_Angeles Computer 0 1440 900 UTF-8 1440 6015 """
@@ -81,12 +81,12 @@ object TabularOutputSpec {
   val contextPath = s"shredded-tsv/vendor=org.schema/name=WebPage/format=jsonschema/version=1"
 
   val contextContents =
-    "org.schema\tWebPage\tjsonschema\t1-0-0\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"WebPage\"]\tevents\tJonathan Almeida\t[\"blog\",\"releases\"]\t\t\t2014-07-23T00:00:00Z\tblog\ten-US\t[\"snowplow\",\"analytics\",\"java\",\"jvm\",\"tracker\"]"
+    "'org.schema'\t'WebPage'\t'jsonschema'\t'1-0-0'\t'2b1b25a4-c0df-4859-8201-cf21492ad61b'\t'2014-05-29 18:16:35.000'\t'events'\t'[\"events\",\"WebPage\"]'\t'events'\t'Jonathan Almeida'\t'[\"blog\",\"releases\"]'\t\t\t'2014-07-23T00:00:00Z'\t'blog'\t'en-US'\t'[\"snowplow\",\"analytics\",\"java\",\"jvm\",\"tracker\"]'"
 
   val eventPath = s"shredded-tsv/vendor=com.snowplowanalytics.snowplow/name=application_error/format=jsonschema/version=1"
 
   val eventContents =
-    "com.snowplowanalytics.snowplow\tapplication_error\tjsonschema\t1-0-2\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"application_error\"]\tevents\tundefined is not a function\tJAVASCRIPT\tAbstractSingletonFactoryBean\t\t1\t\t\t14\t\t\t\tthis column should be last"
+    "'com.snowplowanalytics.snowplow'\t'application_error'\t'jsonschema'\t'1-0-2'\t'2b1b25a4-c0df-4859-8201-cf21492ad61b'\t'2014-05-29 18:16:35.000'\t'events'\t'[\"events\",\"application_error\"]'\t'events'\t'undefined is not a function'\t'JAVASCRIPT'\t'AbstractSingletonFactoryBean'\t\t1\t\t\t14\t\t\t\t'this column should be last'"
 
   // Removed three JSON columns and added 7 columns at the end
   val event = """snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:16:35.000 2014-05-29 18:04:11.639 unstruct 2b1b25a4-c0df-4859-8201-cf21492ad61b 836413 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 216.207.42.134 3499345421 3b1d1a375044eede 3 2bad2a4e-aae4-4bea-8acd-399e7fe0366a US CA South San Francisco 37.654694 -122.4077 http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/ Writing Hive UDFs - a tutorial http snowplowanalytics.com 80 /blog/2013/02/08/writing-hive-udfs-and-serdes/ Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 Safari Safari Browser WEBKIT en-us 0 0 0 0 0 0 0 0 0 1 24 1440 1845 Mac OS Mac OS Apple Inc. America/Los_Angeles Computer 0 1440 900 UTF-8 1440 6015 """
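The updated fixtures show the scheme end to end: string columns are quoted, numeric columns (1, 14) stay bare, and absent values remain zero-length fields — exactly the distinction REMOVEQUOTES and EMPTYASNULL act on at load time. A throwaway classifier over a fixture-shaped fragment (illustrative only, not part of the commit):

object FieldKinds {
  def kind(field: String): String =
    if (field.isEmpty) "absent -> NULL (EMPTYASNULL)"
    else if (field.startsWith("'") && field.endsWith("'")) "string (quotes stripped by REMOVEQUOTES)"
    else "number or boolean, loaded as-is"

  def main(args: Array[String]): Unit = {
    val fragment = "'JAVASCRIPT'\t\t1\t\t14" // shaped like eventContents above
    fragment.split("\t", -1).foreach(f => println(s"[$f] ${kind(f)}"))
  }
}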
