From 58344be309292a4ae4d68debfdf391068ede957b Mon Sep 17 00:00:00 2001
From: andiehuang
Date: Tue, 23 Aug 2022 10:31:02 +0800
Subject: [PATCH] optimize DataSummaryET

---
 byzer-extension-tests/pom.xml                 |   5 -
 .../mlsql/plugins/mllib/app/MLSQLMllib.scala  |   3 +-
 .../mllib/ets/fe/SQLDataSummaryV2.scala       | 434 ++++++++++++++++++
 3 files changed, 436 insertions(+), 6 deletions(-)
 create mode 100644 mlsql-mllib/src/main/java/tech/mlsql/plugins/mllib/ets/fe/SQLDataSummaryV2.scala

diff --git a/byzer-extension-tests/pom.xml b/byzer-extension-tests/pom.xml
index e54eb0fe..209b9012 100644
--- a/byzer-extension-tests/pom.xml
+++ b/byzer-extension-tests/pom.xml
@@ -29,11 +29,6 @@
             <artifactId>openmldb-native</artifactId>
             <version>0.5.2</version>
         </dependency>
-        <dependency>
-            <groupId>org.pegdown</groupId>
-            <artifactId>pegdown</artifactId>
-            <version>1.6.0</version>
-        </dependency>
     </dependencies>
diff --git a/mlsql-mllib/src/main/java/tech/mlsql/plugins/mllib/app/MLSQLMllib.scala b/mlsql-mllib/src/main/java/tech/mlsql/plugins/mllib/app/MLSQLMllib.scala
index 313b6a92..14f7efe9 100644
--- a/mlsql-mllib/src/main/java/tech/mlsql/plugins/mllib/app/MLSQLMllib.scala
+++ b/mlsql-mllib/src/main/java/tech/mlsql/plugins/mllib/app/MLSQLMllib.scala
@@ -5,7 +5,7 @@
 import tech.mlsql.common.utils.log.Logging
 import tech.mlsql.dsl.CommandCollection
 import tech.mlsql.ets.register.ETRegister
 import tech.mlsql.plugins.mllib.ets._
-import tech.mlsql.plugins.mllib.ets.fe.{DataTranspose, OnehotExt, PSIExt, SQLDataSummary, SQLDescriptiveMetrics, SQLMissingValueProcess, SQLPatternDistribution, SQLUniqueIdentifier}
+import tech.mlsql.plugins.mllib.ets.fe.{DataTranspose, OnehotExt, PSIExt, SQLDataSummary, SQLDataSummaryT, SQLDescriptiveMetrics, SQLMissingValueProcess, SQLPatternDistribution, SQLUniqueIdentifier}
 import tech.mlsql.plugins.mllib.ets.fintech.scorecard.{SQLBinning, SQLScoreCard}
 import tech.mlsql.version.VersionCompatibility
@@ -21,6 +21,7 @@ class MLSQLMllib extends tech.mlsql.app.App with VersionCompatibility with Loggi
     ETRegister.register("TakeRandomSampleExt", classOf[TakeRandomSampleExt].getName)
     ETRegister.register("ColumnsExt", classOf[ColumnsExt].getName)
     ETRegister.register("DataSummary", classOf[SQLDataSummary].getName)
+    ETRegister.register("DataSummaryT", classOf[SQLDataSummaryT].getName)
     ETRegister.register("DataMissingValueProcess", classOf[SQLMissingValueProcess].getName)
     ETRegister.register("Binning", classOf[SQLBinning].getName)
     ETRegister.register("ScoreCard", classOf[SQLScoreCard].getName)
diff --git a/mlsql-mllib/src/main/java/tech/mlsql/plugins/mllib/ets/fe/SQLDataSummaryV2.scala b/mlsql-mllib/src/main/java/tech/mlsql/plugins/mllib/ets/fe/SQLDataSummaryV2.scala
new file mode 100644
index 00000000..8a9fd34a
--- /dev/null
+++ b/mlsql-mllib/src/main/java/tech/mlsql/plugins/mllib/ets/fe/SQLDataSummaryV2.scala
@@ -0,0 +1,434 @@
+package tech.mlsql.plugins.mllib.ets.fe
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession, functions => F}
+import org.apache.spark.sql.{functions => f}
+import org.apache.spark.sql.expressions.{UserDefinedFunction, Window}
+import org.apache.spark.sql.functions.{avg, coalesce, col, count, countDistinct, expr, first, last, length, lit, map_zip_with, max, min, monotonically_increasing_id, round, row_number, spark_partition_id, sqrt, stddev, sum, udf, when, window}
+import org.apache.spark.sql.types.{BooleanType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField, StructType, TimestampType, VarcharType}
+import streaming.dsl.ScriptSQLExec
+import streaming.dsl.auth.{DB_DEFAULT, MLSQLTable, OperateType, TableAuthResult, TableType}
+import streaming.dsl.mmlib.{Code, SQLAlg, SQLCode}
+import streaming.dsl.mmlib.algs.{CodeExampleText, Functions, MllibFunctions}
+import streaming.dsl.mmlib.algs.param.BaseParams
+import tech.mlsql.dsl.auth.ETAuth
+import tech.mlsql.dsl.auth.dsl.mmlib.ETMethod.{ETMethod, PREDICT}
+
+import scala.util.Try
+
+class SQLDataSummaryV2(override val uid: String) extends SQLAlg with MllibFunctions with Functions with BaseParams with ETAuth {
+
+  def this() = this(BaseParams.randomUID())
+
+  var round_at = 2
+
+  var numericCols: Array[String] = null
+
+  def colWithFilterBlank(sc: StructField): Column = {
+    val col_name = sc.name
+    sc.dataType match {
+      case DoubleType => col(col_name).isNotNull && !col(col_name).isNaN
+      case FloatType => col(col_name).isNotNull && !col(col_name).isNaN
+      case StringType => col(col_name).isNotNull && col(col_name) =!= ""
+      case _ => col(col_name).isNotNull
+    }
+  }
+
+  def countColsStdDevNumber(schema: StructType, numeric_columns: Array[String]): Array[Column] = {
+    schema.map(sc => {
+      val c = sc.name
+      if (numeric_columns.contains(c)) {
+        // round(stddev(col(c)), round_at).alias(c)
+        val expr = round(stddev(when(colWithFilterBlank(sc), col(c))), round_at)
+        coalesce(expr, lit("")).alias(c)
+      } else {
+        max(lit("")).alias(c)
+      }
+    }).toArray
+  }
+
+  def countColsStdErrNumber(schema: StructType, numeric_columns: Array[String]): Array[Column] = {
+    schema.map(sc => {
+      val c = sc.name
+      if (numeric_columns.contains(c)) {
+        val expr = round(stddev(when(colWithFilterBlank(sc), col(c))) /
+          sqrt(sum(when(colWithFilterBlank(sc), 1).otherwise(0))), round_at)
+        coalesce(expr, lit("")).alias(c)
+        // round(stddev(col(c)) / sqrt(total_count), round_at).alias(c)
+      } else {
+        max(lit("")).alias(c)
+      }
+    }).toArray
+  }
+
+  def isPrimaryKey(schema: StructType, numeric_columns: Array[String], total_count: Long): Array[Column] = {
+    schema.map(sc => {
+      val c = sc.name
+      val exp1 = countDistinct(when(colWithFilterBlank(sc), col(sc.name))) / sum(when(colWithFilterBlank(sc), 1).otherwise(0))
+      when(exp1 === 1, 1).otherwise(0)
+    }).toArray
+  }
+
+  def countUniqueValueRatio(schema: StructType): Array[Column] = {
+    schema.map(sc => {
+      val sum_expr = sum(when(colWithFilterBlank(sc), 1).otherwise(0))
+      val divide_expr = countDistinct(when(colWithFilterBlank(sc), col(sc.name))) / sum(when(colWithFilterBlank(sc), 1).otherwise(0))
+      val ratio_expr = when(sum_expr === 0, 0.0).otherwise(divide_expr)
+      round(ratio_expr, round_at + 2).alias(sc.name)
+    }).toArray
+  }
+
+  def getMaxNum(schema: StructType, numeric_columns: Array[String]): Array[Column] = {
+    schema.map(sc => {
+      val c = sc.name
+      if (numeric_columns.contains(c)) {
+        // ignore blank/NaN values when computing the max
+        val max_expr = round(max(when(colWithFilterBlank(sc), col(c))), round_at).alias(c)
+        coalesce(max_expr.cast(StringType), lit("")).alias(c)
+        // max(col(c)).cast(StringType).alias(c)
+      } else {
+        val filter_expr = when(colWithFilterBlank(sc), col(sc.name))
+        val max_expr = max(filter_expr)
+        coalesce(max_expr.cast(StringType), lit("")).alias(c)
+      }
+    }).toArray
+  }
+
+  def getMinNum(schema: StructType, numeric_columns: Array[String]): Array[Column] = {
+    schema.map(sc => {
+      val c = sc.name
+      if (numeric_columns.contains(c)) {
+        coalesce(round(min(when(colWithFilterBlank(sc), col(c))), round_at).cast(StringType), lit("")).alias(c)
+        // min(col(c)).cast(StringType).alias(c)
+      } else {
+        // min(col(c)).cast(StringType).alias(c)
+        val filter_expr = when(colWithFilterBlank(sc), col(sc.name))
+        val min_expr = min(filter_expr)
+        coalesce(min_expr.cast(StringType), lit("")).alias(c)
+      }
+    }).toArray
+  }
+
+  def roundAtSingleCol(sc: StructField, column: Column): Column = {
+    if (numericCols.contains(sc.name)) {
+      return round(column, round_at).cast(StringType)
+    }
+    column.cast(StringType)
+  }
+
+  def maxUdf = udf((arr: Seq[String]) => {
+    val filtered = arr.filterNot(_ == "")
+    if (filtered.isEmpty) 0
+    else filtered.map(_.toInt).max
+  })
+
+  def processModeValue(modeCandidates: Array[Row], modeFormat: String): Any = {
+    // val mode = modeCandidates.length match {
+    //   case p: Int if p >= 2 => {
+    //     modeFormat match {
+    //       case ModeValueFormat.empty => ""
+    //       case ModeValueFormat.all => "[" + modeCandidates.map(_.get(0).toString).mkString(",") + "]"
+    //       case ModeValueFormat.auto => modeCandidates(0).get(0)
+    //     }
+    //   }
+    //   case _ => modeCandidates(0).get(0)
+    // }
+    val mode = if (modeCandidates.lengthCompare(2) >= 0) {
+      modeFormat match {
+        case ModeValueFormat.empty => ""
+        case ModeValueFormat.all => "[" + modeCandidates.map(_.get(0).toString).mkString(",") + "]"
+        case ModeValueFormat.auto => modeCandidates.head.get(0)
+      }
+    } else {
+      modeCandidates.head.get(0)
+    }
+    mode
+  }
+
+  def isArrayString(mode: Any): Boolean = {
+    mode.toString.startsWith("[") && mode.toString.endsWith("]")
+  }
+
+  def getModeNum(schema: StructType, numeric_columns: Array[String], df: DataFrame, modeFormat: String): Array[Column] = {
+    schema.map(sc => {
+      val dfWithoutNa = df.repartition(col(sc.name)).select(col(sc.name)).na.drop()
+      if (dfWithoutNa.isEmpty) {
+        /* If no alias is set, multiple columns with null indicators will result in the following error:
+           org.apache.spark.sql.AnalysisException: Reference 'first()' is ambiguous, could be: first(), first(). */
+        first(lit("")).alias(sc.name)
+      } else {
+        val newDF = dfWithoutNa.groupBy(col(sc.name)).count().orderBy(F.desc("count"))
+        val largestCount = newDF.collect().head.get(1)
+        val modeCandidates = newDF.select(col(sc.name), col("count")).where(col("count") === largestCount).collect()
+        val mode = processModeValue(modeCandidates, modeFormat)
+        var modeCol = first(lit(mode)).cast(StringType).alias(sc.name)
+        // Round the mode value for numeric columns
+        if (numeric_columns.contains(sc.name) && mode != "" && !isArrayString(mode)) {
+          modeCol = roundAtSingleCol(sc, max(lit(mode))).alias(sc.name)
+        }
+        modeCol
+      }
+    }).toArray
+  }
+
+  def countNonNullValue(schema: StructType): Array[Column] = {
+    schema.map(sc => {
+      count(sc.name).cast(StringType)
+    }).toArray
+  }
+
+  def countColsNullNumber(schema: StructType, total_count: Long): Array[Column] = {
+    // schema.map(sc => {
+    //   // For proportion calculation we keep two extra digits, hence round_at + 2
+    //   sc.dataType match {
+    //     case DoubleType => round(count(when(col(sc.name).isNull || col(sc.name).isNaN, sc.name)) / total_count, round_at + 2).alias(sc.name)
+    //     case FloatType => round(count(when(col(sc.name).isNull || col(sc.name).isNaN, sc.name)) / total_count, round_at + 2).alias(sc.name)
+    //     case _ => round(count(when(col(sc.name).isNull, sc.name)) / total_count, round_at + 2).alias(sc.name)
+    //   }
+    // }).toArray
+    schema.map(sc => {
+      // For proportion calculation we keep two extra digits, hence round_at + 2
+      // sc.dataType match {
+      //   case DoubleType => round(col(sc.name) / total_count, round_at + 2).alias(sc.name)
+      //   case FloatType => round(count(when(col(sc.name).isNull || col(sc.name).isNaN, sc.name)) / total_count, round_at + 2).alias(sc.name)
+      //   case _ => round(count(when(col(sc.name).isNull, sc.name)) / total_count, round_at + 2).alias(sc.name)
+      // }
+      round(col(sc.name) / total_count, round_at + 2).alias(sc.name)
+    }).toArray
+  }
+
+  def countColsEmptyNumber(columns: Array[String], total_count: Long): Array[Column] = {
+    // columns.map(c => {
+    //   count(when(col(c) === "", c)).alias(c)
+    // })
+    columns.map(c => {
+      round(col(c) / total_count, round_at + 2).alias(c)
+    }).toArray
+  }
+
+  def nullValueCount(schema: StructType): Array[Column] = {
+    schema.map(sc => {
+      sc.dataType match {
+        case DoubleType => count(when(col(sc.name).isNull || col(sc.name).isNaN, sc.name)).alias("left_" + sc.name)
+        case FloatType => count(when(col(sc.name).isNull || col(sc.name).isNaN, sc.name)).alias("left_" + sc.name)
+        case _ => count(when(col(sc.name).isNull, sc.name)).alias("left_" + sc.name)
+      }
+    }).toArray
+  }
+
+  def emptyCount(schema: StructType): Array[Column] = {
+    schema.map(sc => {
+      count(when(col(sc.name) === "", sc.name)).alias("right_" + sc.name)
+    }).toArray
+  }
+
+  def getMaxLength(schema: StructType): Array[Column] = {
+    schema.map(sc => {
+      sc.dataType match {
+        case StringType => max(length(col(sc.name)))
+        case _ => max(lit("")).alias(sc.name)
+      }
+    }).toArray
+  }
+
+  def getMinLength(schema: StructType): Array[Column] = {
+    schema.map(sc => {
+      sc.dataType match {
+        case StringType => min(length(col(sc.name)))
+        case _ => min(lit("")).alias(sc.name)
+      }
+    }).toArray
+  }
+
+  def getMeanValue(schema: StructType): Array[Column] = {
+    schema.map(sc => {
+      val new_col = if (numericCols.contains(sc.name)) {
+        val avgExp = avg(when(colWithFilterBlank(sc), col(sc.name)))
+        val roundExp = round(avgExp, round_at)
+        when(roundExp.isNull, lit("")).otherwise(roundExp).alias(sc.name)
+      } else {
+        last(lit("")).alias(sc.name)
+      }
+      new_col
+    }).toArray
+  }
+
+  /**
+   * Compute a percentile from an unsorted Spark RDD.
+   *
+   * @param data input data set of Double numbers
+   * @param tile percentile to compute (e.g. the 85th percentile)
+   * @return value of the input data at the specified percentile
+   */
+  def computePercentile(data: RDD[Double], tile: Double): Double = {
+    // NIST method; data to be sorted in ascending order
+    val r = data.sortBy(x => x)
+    val c = r.count()
+    val res = if (c == 1) r.first()
+    else {
+      val n = (tile / 100d) * (c + 1d)
+      val k = math.floor(n).toLong
+      val d = n - k
+      if (k <= 0) r.first()
+      else {
+        val index = r.zipWithIndex().map(_.swap)
+        val last = c
+        if (k >= c) {
+          index.lookup(last - 1).head
+        } else {
+          val topRow = index.lookup(k - 1)
+          topRow.head + d * (index.lookup(k).head - topRow.head)
+        }
+      }
+    }
+    r.unpersist()
+    res
+  }
+
+  def getQuantileNum(schema: StructType, df: DataFrame, numeric_columns: Array[String]): Array[Array[Double]] = {
+    schema.map(sc => {
+      if (numeric_columns.contains(sc.name)) {
+        val new_df = df.select(col(sc.name))
+        var res = Array(Double.NaN, Double.NaN, Double.NaN)
+        val data = new_df.rdd.map(x => {
+          val v = String.valueOf(x(0))
+          v match {
+            case "" => Double.NaN
+            case "null" => Double.NaN
+            case _ => v.toDouble
+          }
+        }).filter(!_.isNaN)
+        if (!data.isEmpty()) {
+          val q1 = computePercentile(data, 25)
+          val q2 = computePercentile(data, 50)
+          val q3 = computePercentile(data, 75)
+          res = Array(q1, q2, q3)
+        }
+        res
+      } else {
+        Array(Double.NaN, Double.NaN, Double.NaN)
+      }
+    }).toArray
+  }
+
+  def getTypeLength(schema: StructType): Array[Column] = {
+    schema.map(sc => {
+      sc.dataType.typeName match {
+        case "byte" => first(lit(1L)).alias(sc.name)
+        case "short" => first(lit(2L)).alias(sc.name)
+        case "integer" => first(lit(4L)).alias(sc.name)
+        case "long" => first(lit(8L)).alias(sc.name)
+        case "float" => first(lit(4L)).alias(sc.name)
+        case "double" => first(lit(8L)).alias(sc.name)
+        case "string" => max(length(col(sc.name))).alias(sc.name)
+        case "date" => first(lit(8L)).alias(sc.name)
+        case "timestamp" => first(lit(8L)).alias(sc.name)
+        case "boolean" => first(lit(1L)).alias(sc.name)
+        case name: String if name.contains("decimal") => first(lit(16L)).alias(sc.name)
+        case _ => first(lit("")).alias(sc.name)
+      }
+    }).toArray
+  }
+
+  def roundNumericCols(df: DataFrame, round_at: Integer): DataFrame = {
+    df.select(df.schema.map(sc => {
+      sc.dataType match {
+        case DoubleType => expr(s"cast (${sc.name} as decimal(38,2)) as ${sc.name}")
+        case FloatType => expr(s"cast (${sc.name} as decimal(38,2)) as ${sc.name}")
+        case _ => col(sc.name)
+      }
+    }): _*)
+  }
") + } + }) + + numericCols = repartitionDF.schema.filter(sc => { + sc.dataType.typeName match { + case datatype: String => Array("integer", "short", "double", "float", "long").contains(datatype) || datatype.contains("decimal") + case _ => false + } + }).map(sc => { + sc.name + }).toArray + val schema = repartitionDF.schema + val newCols = getMaxLength(schema) ++ getMinLength(schema) ++ getMeanValue(schema) ++ countColsStdDevNumber(schema, numericCols) ++ getMaxNum(schema, numericCols) ++ getMinNum(schema, numericCols) ++ getTypeLength(schema) + var res = repartitionDF.select(newCols: _*) + repartitionDF.head(1) + res + } + + catch + { + case e: Exception => throw e + } + finally + { + repartitionDF.unpersist() + } +} + +override def load (sparkSession: SparkSession, path: String, params: Map[String, String] ): Any = { + } + + override def predict (sparkSession: SparkSession, _model: Any, name: String, params: Map[String, String] ): UserDefinedFunction = ??? + + override def batchPredict (df: DataFrame, path: String, params: Map[String, String] ): DataFrame = + train (df, path, params) + + override def codeExample: Code = Code (SQLCode, CodeExampleText.jsonStr + + """ + | + |set abc=''' + |{"name": "elena", "age": 57, "phone": 15552231521, "income": 433000, "label": 0} + |{"name": "candy", "age": 67, "phone": 15552231521, "income": 1200, "label": 0} + |{"name": "bob", "age": 57, "phone": 15252211521, "income": 89000, "label": 0} + |{"name": "candy", "age": 25, "phone": 15552211522, "income": 36000, "label": 1} + |{"name": "candy", "age": 31, "phone": 15552211521, "income": 300000, "label": 1} + |{"name": "finn", "age": 23, "phone": 15552211521, "income": 238000, "label": 1} + |'''; + | + |load jsonStr.`abc` as table1; + |select age, income from table1 as table2; + |run table2 as DataSummary.`` as summaryTable; + |; + """.stripMargin) + + + override def auth (etMethod: ETMethod, path: String, params: Map[String, String] ): List[TableAuthResult] = { + val vtable = MLSQLTable ( + Option (DB_DEFAULT.MLSQL_SYSTEM.toString), + Option ("__fe_data_summary_operator__"), + OperateType.SELECT, + Option ("select"), + TableType.SYSTEM) + + val context = ScriptSQLExec.contextGetOrForTest () + context.execListener.getTableAuth match { + case Some (tableAuth) => + tableAuth.auth (List (vtable) ) + case None => + List (TableAuthResult (granted = true, "") ) + } + } + } \ No newline at end of file