Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AtomicCreateTableAsSelectExec #895

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/resources/operatorsScore-databricks-aws-t4.csv
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ ObjectHashAggregateExec,2.45
SortAggregateExec,2.45
InMemoryTableScanExec,2.45
DataWritingCommandExec,2.45
AtomicReplaceTableAsSelectExec,2.45
AtomicCreateTableAsSelectExec,2.45
OverwriteByExpressionExecV1,2.45
AppendDataExecV1,2.45
ExecutedCommandExec,2.45
BatchScanExec,2.45
BroadcastExchangeExec,2.45
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ ObjectHashAggregateExec,2.73
SortAggregateExec,2.73
InMemoryTableScanExec,2.73
DataWritingCommandExec,2.73
AtomicReplaceTableAsSelectExec,2.73
AtomicCreateTableAsSelectExec,2.73
OverwriteByExpressionExecV1,2.73
AppendDataExecV1,2.73
ExecutedCommandExec,2.73
BatchScanExec,2.73
BroadcastExchangeExec,2.73
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/resources/operatorsScore-dataproc-gke-l4.csv
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ HashAggregateExec,4.29
ObjectHashAggregateExec,4.29
SortAggregateExec,4.29
DataWritingCommandExec,3.74
AtomicReplaceTableAsSelectExec,3.74
AtomicCreateTableAsSelectExec,3.74
OverwriteByExpressionExecV1,3.74
AppendDataExecV1,3.74
ExecutedCommandExec,3.74
BatchScanExec,2.65
ShuffleExchangeExec,3.2
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ HashAggregateExec,4.1
ObjectHashAggregateExec,4.1
SortAggregateExec,4.1
DataWritingCommandExec,3.65
AtomicReplaceTableAsSelectExec,3.65
AtomicCreateTableAsSelectExec,3.65
OverwriteByExpressionExecV1,3.65
AppendDataExecV1,3.65
ExecutedCommandExec,3.65
BatchScanExec,2.84
ShuffleExchangeExec,3.69
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/resources/operatorsScore-dataproc-l4.csv
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ ObjectHashAggregateExec,4.16
SortAggregateExec,4.16
InMemoryTableScanExec,4.16
DataWritingCommandExec,4.16
AtomicReplaceTableAsSelectExec,4.16
AtomicCreateTableAsSelectExec,4.16
OverwriteByExpressionExecV1,4.16
AppendDataExecV1,4.16
ExecutedCommandExec,4.16
BatchScanExec,4.16
BroadcastExchangeExec,4.16
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ HashAggregateExec,5.54
ObjectHashAggregateExec,5.54
SortAggregateExec,5.54
DataWritingCommandExec,4.25
AtomicReplaceTableAsSelectExec,4.25
AtomicCreateTableAsSelectExec,4.25
OverwriteByExpressionExecV1,4.25
AppendDataExecV1,4.25
ExecutedCommandExec,4.25
BatchScanExec,3.64
ShuffleExchangeExec,5.21
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/resources/operatorsScore-dataproc-t4.csv
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ ObjectHashAggregateExec,4.88
SortAggregateExec,4.88
InMemoryTableScanExec,4.88
DataWritingCommandExec,4.88
AtomicReplaceTableAsSelectExec,4.88
AtomicCreateTableAsSelectExec,4.88
OverwriteByExpressionExecV1,4.88
AppendDataExecV1,4.88
ExecutedCommandExec,4.88
BatchScanExec,4.88
BroadcastExchangeExec,4.88
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/resources/operatorsScore-emr-a10.csv
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ ObjectHashAggregateExec,2.59
SortAggregateExec,2.59
InMemoryTableScanExec,2.59
DataWritingCommandExec,2.59
AtomicReplaceTableAsSelectExec,2.59
AtomicCreateTableAsSelectExec,2.59
OverwriteByExpressionExecV1,2.59
AppendDataExecV1,2.59
ExecutedCommandExec,2.59
BatchScanExec,2.59
BroadcastExchangeExec,2.59
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/resources/operatorsScore-emr-t4.csv
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ ObjectHashAggregateExec,2.07
SortAggregateExec,2.07
InMemoryTableScanExec,2.07
DataWritingCommandExec,2.07
AtomicReplaceTableAsSelectExec,2.07
AtomicCreateTableAsSelectExec,2.07
OverwriteByExpressionExecV1,2.07
AppendDataExecV1,2.07
ExecutedCommandExec,2.07
BatchScanExec,2.07
BroadcastExchangeExec,2.07
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/resources/operatorsScore-onprem-a100.csv
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ ObjectHashAggregateExec,3.0
SortAggregateExec,3.0
InMemoryTableScanExec,3.0
DataWritingCommandExec,3.0
AtomicReplaceTableAsSelectExec,3.0
AtomicCreateTableAsSelectExec,3.0
OverwriteByExpressionExecV1,3.0
AppendDataExecV1,3.0
ExecutedCommandExec,3.0
BatchScanExec,3.0
BroadcastExchangeExec,3.0
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/resources/supportedExecs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ InMemoryTableScanExec,S,This is disabled by default because there could be compl
DataWritingCommandExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,PS,NS,S,NS,PS,PS,PS,NS,S,S
ExecutedCommandExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S,S,S
WriteFilesExec,TNEW,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S,S,S
AppendDataExecV1,TNEW,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,PS,PS,PS,NS,S,S
AtomicCreateTableAsSelectExec,TNEW,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,PS,PS,PS,NS,S,S
AtomicReplaceTableAsSelectExec,TNEW,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,PS,PS,PS,NS,S,S
AppendDataExecV1,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,PS,PS,PS,NS,S,S
AtomicCreateTableAsSelectExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,PS,PS,PS,NS,S,S
AtomicReplaceTableAsSelectExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,PS,PS,PS,NS,S,S
BatchScanExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,PS,PS,PS,NS,S,S
OverwriteByExpressionExecV1,TNEW,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,PS,PS,PS,NS,S,S
OverwriteByExpressionExecV1,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,PS,PS,PS,NS,S,S
BroadcastExchangeExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
ShuffleExchangeExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,S,S
BroadcastHashJoinExec,S,None,leftKeys,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NA,PS,NS,NS,NS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object DataWritingCommandExecParser {
val appendDataExecV1 = "AppendDataExecV1"
val overwriteByExprExecV1 = "OverwriteByExpressionExecV1"
val atomicReplaceTableExec = "AtomicReplaceTableAsSelect"
val atomicCreateTableExec = "AtomicCreateTableAsSelect"
// Note: List of writeExecs that represent a physical command.
// hardcode because InsertIntoHadoopFsRelationCommand uses this same exec
// and InsertIntoHadoopFsRelationCommand doesn't have an entry in the
Expand All @@ -74,13 +75,16 @@ object DataWritingCommandExecParser {
// - InsertIntoHiveTable is generated by Hive catalogue implementation
// - OverwriteByExprExecV1 is a spark write Op but it has special handling for DeltaLake
// - AppendDataExecV1 is a spark write Op but it has special handling for DeltaLake
// - AtomicReplaceTableAsSelectExec and AtomicCreateTableAsSelectExec are spark write Ops but
// they have special handling for DeltaLake
private val logicalWriteCommands = Set(
dataWriteCMD,
insertIntoHadoopCMD,
HiveParseHelper.INSERT_INTO_HIVE_LABEL,
appendDataExecV1,
overwriteByExprExecV1,
atomicReplaceTableExec
atomicReplaceTableExec,
atomicCreateTableExec
)

// Note: Defines a list of the execs that include formatted data.
Expand All @@ -92,9 +96,10 @@ object DataWritingCommandExecParser {
private val logicalToPhysicalCmdMap = Map(
insertIntoHadoopCMD -> defaultPhysicalCMD,
HiveParseHelper.INSERT_INTO_HIVE_LABEL-> defaultPhysicalCMD,
appendDataExecV1 -> defaultPhysicalCMD,
overwriteByExprExecV1 -> defaultPhysicalCMD,
atomicReplaceTableExec -> defaultPhysicalCMD
appendDataExecV1 -> appendDataExecV1,
overwriteByExprExecV1 -> overwriteByExprExecV1,
atomicReplaceTableExec -> "AtomicReplaceTableAsSelectExec",
atomicCreateTableExec -> "AtomicCreateTableAsSelectExec"
)

// Map to hold the relation between writeExecCmd and the format.
Expand All @@ -105,14 +110,20 @@ object DataWritingCommandExecParser {
// if overwriteByExprExecV1 is not deltaLakeProvider, then we want to mark it as unsupported
overwriteByExprExecV1 -> "unknown",
// if atomicReplaceTableExec is not deltaLakeProvider, then we want to mark it as unsupported
atomicReplaceTableExec -> "unknown"
atomicReplaceTableExec -> "unknown",
// if atomicCreateTableExec is not deltaLakeProvider, then we want to mark it as unsupported
atomicCreateTableExec -> "unknown"
)

// Checks whether a node is a write CMD Exec
def isWritingCmdExec(nodeName: String): Boolean = {
logicalWriteCommands.exists(nodeName.contains(_)) || DeltaLakeHelper.accepts(nodeName)
}

def getPhysicalExecName(opName: String): String = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this look risky? Earlier, if the logical operator was not present in logicalToPhysicalCmdMap, it would throw an error. Now we would return the defaultPhysicalCMD value instead.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is safer to have a default value. For example, this allows us to use the same method to get a physical representation of Delta operators that are missing from the CSV files (i.e., MergeIntoCommandEdge, WriteIntoDeltaCommand, and SaveIntoDataSrcCMD).
Otherwise, we would have to fill in the map with all the operators in Hive/Delta, which could cause a crash if someone forgets to update the map with a new entry.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. This is definitely reducing redundancy. We should be aware of this default behaviour.

logicalToPhysicalCmdMap.getOrElse(opName, defaultPhysicalCMD)
}

def getWriteCMDWrapper(node: SparkPlanGraphNode): Option[DataWritingCmdWrapper] = {
val processedNodeName = node.name.trim
logicalWriteCommands.find(processedNodeName.contains(_)) match {
Expand All @@ -132,7 +143,7 @@ object DataWritingCommandExecParser {
getWriteFormatString(node.desc)
}
}
val physicalCmd = logicalToPhysicalCmdMap(wCmd)
val physicalCmd = getPhysicalExecName(wCmd)
Some(DataWritingCmdWrapper(wCmd, physicalCmd, dataFormat))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,25 @@ class DLWriteWithFormatAndSchemaParser(node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long) extends ExecParser {
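// Resolve the physical exec name that is used to look up the speedupFactor.
// Node names without an entry in the logical-to-physical map fall back to the default physical command.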
val fullExecName: String = DataWritingCommandExecParser.dataWriteCMD
val fullExecName: String = DataWritingCommandExecParser.getPhysicalExecName(node.name)
override def parse: ExecInfo = {
// The node description has information about the table schema and its format.
// We do not want the op to be marked as RDD or UDF if the node description contains some
// expressions that match UDF/RDD.
val dataFormat = DeltaLakeHelper.getWriteFormat
val writeSupported = checker.isWriteFormatSupported(dataFormat)
val speedupFactor = if (writeSupported) {
checker.getSpeedupFactor(fullExecName)
val (speedupFactor, isExecSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
1.0
(1.0, false)
}
val dataFormat = DeltaLakeHelper.getWriteFormat
val writeSupported = checker.isWriteFormatSupported(dataFormat)
val finalSpeedupFactor = if (writeSupported) speedupFactor else 1.0

// Execs like SaveIntoDataSourceCommand have the prefix "Execute", so we need to get rid of it.
val nodeName = node.name.replace("Execute ", "")
ExecInfo.createExecNoNode(sqlID, nodeName,
s"Format: $dataFormat", speedupFactor, None, node.id, OpTypes.WriteExec,
isSupported = writeSupported, children = None)
s"Format: $dataFormat", finalSpeedupFactor, None, node.id, OpTypes.WriteExec,
isSupported = writeSupported && isExecSupported, children = None)
}
}

Expand All @@ -62,12 +64,13 @@ object DeltaLakeHelper {
// [fieldName, type]
private val schemaRegex =
"\\s+\\|--\\s+([a-zA-Z]+(?:_[a-zA-Z]+)*):\\s+([a-zA-Z]+(?:_[a-zA-Z]+)*)\\s+\\(".r
val saveIntoDataSrcCMD = "SaveIntoDataSourceCommand"
private val atomicReplaceTableExec = "AtomicReplaceTableAsSelect"
private val appendDataExecV1 = "AppendDataExecV1"
private val overwriteByExprExecV1 = "OverwriteByExpressionExecV1"
private val atomicCreateTableExec = DataWritingCommandExecParser.atomicCreateTableExec
private val atomicReplaceTableExec = DataWritingCommandExecParser.atomicReplaceTableExec
private val appendDataExecV1 = DataWritingCommandExecParser.appendDataExecV1
private val overwriteByExprExecV1 = DataWritingCommandExecParser.overwriteByExprExecV1
private val mergeIntoCommandEdgeExec = "MergeIntoCommandEdge"
private val writeIntoDeltaCommandExec = "WriteIntoDeltaCommand"
val saveIntoDataSrcCMD = "SaveIntoDataSourceCommand"
// Note that the SaveIntoDataSourceCommand node name appears as
// "Execute SaveIntoDataSourceCommand"
// Same for Execute MergeIntoCommandEdge
Expand All @@ -79,7 +82,8 @@ object DeltaLakeHelper {
private val deltaExecsFromSpark = Set(
appendDataExecV1,
overwriteByExprExecV1,
atomicReplaceTableExec)
atomicReplaceTableExec,
atomicCreateTableExec)

// keywords used to verify that the operator provider is DeltaLake
private val nodeDescKeywords = Set(
Expand All @@ -95,8 +99,16 @@ object DeltaLakeHelper {
true
} else if (deltaExecsFromSpark.contains(node.name)) {
if (node.name.contains(appendDataExecV1) || node.name.contains(overwriteByExprExecV1)) {
// those two execs require the existence of the keywords in the node description
nodeDescKeywords.forall(s => node.desc.contains(s))
} else if (node.name.contains(atomicReplaceTableExec)) {
} else if (node.name.contains(atomicReplaceTableExec)
|| node.name.contains(atomicCreateTableExec)) {
// AtomicReplace and AtomicCreate have a different format.
// To decide whether they are supported or not, we need to check whether the second
// argument of the TableSpec is the "delta" provider. The sample below shows a TableSpec with
// the Delta provider. If the argument is none, we assume the provider is not Delta.
// For simplicity, we will match regex on "*delta*".
//
// AtomicReplaceTableAsSelectExec has a different format
// AtomicReplaceTableAsSelect [num_affected_rows#ID_0L, num_inserted_rows#ID_1L],
// com.databricks.sql.managedcatalog.UnityCatalogV2Proxy@XXXXX,
Expand Down
40 changes: 0 additions & 40 deletions scripts/sync_plugin_files/override_supported_configs.json
Original file line number Diff line number Diff line change
Expand Up @@ -451,46 +451,6 @@
}
]
},
{
"Exec": "AppendDataExecV1",
"Params": "Input/Output",
"override": [
{
"key": "Supported",
"value": "TNEW"
}
]
},
{
"Exec": "AtomicCreateTableAsSelectExec",
"Params": "Input/Output",
"override": [
{
"key": "Supported",
"value": "TNEW"
}
]
},
{
"Exec": "AtomicReplaceTableAsSelectExec",
"Params": "Input/Output",
"override": [
{
"key": "Supported",
"value": "TNEW"
}
]
},
{
"Exec": "OverwriteByExpressionExecV1",
"Params": "Input/Output",
"override": [
{
"key": "Supported",
"value": "TNEW"
}
]
},
{
"Exec": "PythonMapInArrowExec",
"Params": "Input/Output",
Expand Down