Skip to content

Commit

Permalink
[SPARK-19613][SS][TEST] Random.nextString is not safe for directory n…
Browse files Browse the repository at this point in the history
…amePrefix

## What changes were proposed in this pull request?

`Random.nextString` is good for generating random string data, but it's not proper for directory name prefix in `Utils.createDirectory(tempDir, Random.nextString(10))`. This PR uses more safe directory namePrefix.

```scala
scala> scala.util.Random.nextString(10)
res0: String = 馨쭔ᎰႻ穚䃈兩㻞藑並
```

```scala
StateStoreRDDSuite:
- versioning and immutability
- recovering from files
- usage with iterators - only gets and only puts
- preferred locations using StateStoreCoordinator *** FAILED ***
  java.io.IOException: Failed to create a temp directory (under /.../spark/sql/core/target/tmp/StateStoreRDDSuite8712796397908632676) after 10 attempts!
  at org.apache.spark.util.Utils$.createDirectory(Utils.scala:295)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13$$anonfun$apply$6.apply(StateStoreRDDSuite.scala:152)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13$$anonfun$apply$6.apply(StateStoreRDDSuite.scala:149)
  at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13.apply(StateStoreRDDSuite.scala:149)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDDSuite$$anonfun$13.apply(StateStoreRDDSuite.scala:149)
...
- distributed test *** FAILED ***
  java.io.IOException: Failed to create a temp directory (under /.../spark/sql/core/target/tmp/StateStoreRDDSuite8712796397908632676) after 10 attempts!
  at org.apache.spark.util.Utils$.createDirectory(Utils.scala:295)
```

## How was this patch tested?

Pass the existing tests.StateStoreRDDSuite:

Author: Dongjoon Hyun <[email protected]>

Closes apache#21446 from dongjoon-hyun/SPARK-19613.
  • Loading branch information
dongjoon-hyun authored and HyukjinKwon committed May 29, 2018
1 parent fa2ae9d commit b31b587
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn

test("versioning and immutability") {
withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
val path = Utils.createDirectory(tempDir, Random.nextString(10)).toString
val path = Utils.createDirectory(tempDir, Random.nextFloat.toString).toString
val rdd1 = makeRDD(spark.sparkContext, Seq("a", "b", "a")).mapPartitionsWithStateStore(
spark.sqlContext, operatorStateInfo(path, version = 0), keySchema, valueSchema, None)(
increment)
Expand All @@ -73,7 +73,7 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
}

test("recovering from files") {
val path = Utils.createDirectory(tempDir, Random.nextString(10)).toString
val path = Utils.createDirectory(tempDir, Random.nextFloat.toString).toString

def makeStoreRDD(
spark: SparkSession,
Expand Down Expand Up @@ -101,7 +101,7 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
test("usage with iterators - only gets and only puts") {
withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
implicit val sqlContext = spark.sqlContext
val path = Utils.createDirectory(tempDir, Random.nextString(10)).toString
val path = Utils.createDirectory(tempDir, Random.nextFloat.toString).toString
val opId = 0

// Returns an iterator of the incremented value made into the store
Expand Down Expand Up @@ -149,7 +149,7 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
quietly {
val queryRunId = UUID.randomUUID
val opId = 0
val path = Utils.createDirectory(tempDir, Random.nextString(10)).toString
val path = Utils.createDirectory(tempDir, Random.nextFloat.toString).toString

withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
implicit val sqlContext = spark.sqlContext
Expand Down Expand Up @@ -189,7 +189,7 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
.config(sparkConf.setMaster("local-cluster[2, 1, 1024]"))
.getOrCreate()) { spark =>
implicit val sqlContext = spark.sqlContext
val path = Utils.createDirectory(tempDir, Random.nextString(10)).toString
val path = Utils.createDirectory(tempDir, Random.nextFloat.toString).toString
val opId = 0
val rdd1 = makeRDD(spark.sparkContext, Seq("a", "b", "a")).mapPartitionsWithStateStore(
sqlContext, operatorStateInfo(path, version = 0), keySchema, valueSchema, None)(increment)
Expand Down

0 comments on commit b31b587

Please sign in to comment.