Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
AngersZhuuuu committed Jul 13, 2020
1 parent dfcec3c commit e53744b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,23 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
}
}

abstract class BaseScriptTransformationWriterThread(
iter: Iterator[InternalRow],
inputSchema: Seq[DataType],
ioSchema: BaseScriptTransformIOSchema,
outputStream: OutputStream,
proc: Process,
stderrBuffer: CircularBuffer,
taskContext: TaskContext,
conf: Configuration) extends Thread with Logging {
abstract class BaseScriptTransformationWriterThread extends Thread with Logging {

def iter: Iterator[InternalRow]

def inputSchema: Seq[DataType]

def ioSchema: BaseScriptTransformIOSchema

def outputStream: OutputStream

def proc: Process

def stderrBuffer: CircularBuffer

def taskContext: TaskContext

def conf: Configuration

setDaemon(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ case class SparkScriptTransformationExec(

// This new thread will consume the ScriptTransformation's input rows and write them to the
// external process. That process's output will be read by this current thread.
val writerThread = new ScriptTransformationWriterThread(
val writerThread = ScriptTransformationWriterThread(
inputIterator.map(outputProjection),
input.map(_.dataType),
ioschema,
Expand Down Expand Up @@ -131,7 +131,7 @@ case class SparkScriptTransformationExec(
}
}

private class ScriptTransformationWriterThread(
case class ScriptTransformationWriterThread(
iter: Iterator[InternalRow],
inputSchema: Seq[DataType],
ioSchema: SparkScriptIOSchema,
Expand All @@ -140,15 +140,7 @@ private class ScriptTransformationWriterThread(
stderrBuffer: CircularBuffer,
taskContext: TaskContext,
conf: Configuration)
extends BaseScriptTransformationWriterThread(
iter,
inputSchema,
ioSchema,
outputStream,
proc,
stderrBuffer,
taskContext,
conf) {
extends BaseScriptTransformationWriterThread {

setDaemon(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ case class HiveScriptTransformationExec(

// This new thread will consume the ScriptTransformation's input rows and write them to the
// external process. That process's output will be read by this current thread.
val writerThread = new HiveScriptTransformationWriterThread(
val writerThread = HiveScriptTransformationWriterThread(
inputIterator.map(outputProjection),
input.map(_.dataType),
inputSerde,
Expand Down Expand Up @@ -208,7 +208,7 @@ case class HiveScriptTransformationExec(
}
}

private class HiveScriptTransformationWriterThread(
case class HiveScriptTransformationWriterThread(
iter: Iterator[InternalRow],
inputSchema: Seq[DataType],
@Nullable inputSerde: AbstractSerDe,
Expand All @@ -219,15 +219,7 @@ private class HiveScriptTransformationWriterThread(
stderrBuffer: CircularBuffer,
taskContext: TaskContext,
conf: Configuration)
extends BaseScriptTransformationWriterThread(
iter,
inputSchema,
ioSchema,
outputStream,
proc,
stderrBuffer,
taskContext,
conf) with HiveInspectors {
extends BaseScriptTransformationWriterThread with HiveInspectors {

override def processRows(): Unit = {
val dataOutputStream = new DataOutputStream(outputStream)
Expand Down

0 comments on commit e53744b

Please sign in to comment.