Skip to content

Commit

Permalink
Migrate GroupStateSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
MaxGekk committed Feb 12, 2024
1 parent ac43d35 commit c60c8ad
Showing 1 changed file with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ class GroupStateSuite extends SparkFunSuite {
)
for (state <- states) {
// for streaming queries
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)

// for batch queries
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
}
}
}
Expand All @@ -135,7 +135,7 @@ class GroupStateSuite extends SparkFunSuite {
assert(state.getTimeoutTimestampMs.get() === 2000)
state.setTimeoutDuration(500)
assert(state.getTimeoutTimestampMs.get() === 1500) // can be set without initializing state
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)

state.update(5)
assert(state.getTimeoutTimestampMs.isPresent())
Expand All @@ -144,37 +144,37 @@ class GroupStateSuite extends SparkFunSuite {
assert(state.getTimeoutTimestampMs.get() === 2000)
state.setTimeoutDuration("2 second")
assert(state.getTimeoutTimestampMs.get() === 3000)
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)

state.remove()
assert(state.getTimeoutTimestampMs.isPresent())
assert(state.getTimeoutTimestampMs.get() === 3000) // does not change
state.setTimeoutDuration(500) // can still be set
assert(state.getTimeoutTimestampMs.get() === 1500)
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)

// for batch queries
state = GroupStateImpl.createForBatch(
ProcessingTimeTimeout, watermarkPresent = false).asInstanceOf[GroupStateImpl[Int]]
assert(!state.getTimeoutTimestampMs.isPresent())
state.setTimeoutDuration(500)
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)

state.update(5)
state.setTimeoutDuration(1000)
state.setTimeoutDuration("2 second")
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)

state.remove()
state.setTimeoutDuration(500)
testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
}

test("GroupState - setTimeout - with EventTimeTimeout") {
var state = TestGroupState.create[Int](
Optional.empty[Int], EventTimeTimeout, 1000, Optional.of(1000), hasTimedOut = false)
assert(!state.getTimeoutTimestampMs.isPresent())
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
state.setTimeoutTimestamp(5000)
assert(state.getTimeoutTimestampMs.get() === 5000) // can be set without initializing state

Expand All @@ -184,29 +184,29 @@ class GroupStateSuite extends SparkFunSuite {
assert(state.getTimeoutTimestampMs.get() === 10000)
state.setTimeoutTimestamp(new Date(20000))
assert(state.getTimeoutTimestampMs.get() === 20000)
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)

state.remove()
assert(state.getTimeoutTimestampMs.get() === 20000)
state.setTimeoutTimestamp(5000)
assert(state.getTimeoutTimestampMs.get() === 5000) // can be set after removing state
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)

// for batch queries
state = GroupStateImpl.createForBatch(
EventTimeTimeout, watermarkPresent = false).asInstanceOf[GroupStateImpl[Int]]
assert(!state.getTimeoutTimestampMs.isPresent())
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
state.setTimeoutTimestamp(5000)

state.update(5)
state.setTimeoutTimestamp(10000)
state.setTimeoutTimestamp(new Date(20000))
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)

state.remove()
state.setTimeoutTimestamp(5000)
testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
}

test("GroupState - illegal params to setTimeout") {
Expand Down

0 comments on commit c60c8ad

Please sign in to comment.