Skip to content

Commit

Permalink
Fixed bug
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Feb 21, 2018
1 parent 601d653 commit b4c3c55
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,15 @@ abstract class StreamExecution(
Option(name).map(_ + "<br/>").getOrElse("") +
s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
}

private[sql] def withProgressLocked(f: => Unit): Unit = {
awaitProgressLock.lock()
try {
f
} finally {
awaitProgressLock.unlock()
}
}
}

object StreamExecution {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,19 +282,18 @@ class ContinuousExecution(
epoch: Long, reader: ContinuousReader, partitionOffsets: Seq[PartitionOffset]): Unit = {
assert(continuousSources.length == 1, "only one continuous source supported currently")

val globalOffset = reader.mergeOffsets(partitionOffsets.toArray)
val oldOffset = synchronized {
offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
offsetLog.get(epoch - 1)
}

// If offset hasn't changed since last epoch, there's been no new data.
if (oldOffset.contains(OffsetSeq.fill(globalOffset))) {
noNewData = true
}

awaitProgressLock.lock()
try {
val globalOffset = reader.mergeOffsets(partitionOffsets.toArray)
val oldOffset = synchronized {
offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
offsetLog.get(epoch - 1)
}

// If offset hasn't changed since last epoch, there's been no new data.
if (oldOffset.contains(OffsetSeq.fill(globalOffset))) {
noNewData = true
}
awaitProgressLockCondition.signalAll()
} finally {
awaitProgressLock.unlock()
Expand All @@ -308,23 +307,22 @@ class ContinuousExecution(
def commit(epoch: Long): Unit = {
assert(continuousSources.length == 1, "only one continuous source supported currently")
assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
synchronized {

awaitProgressLock.lock()
try {
if (queryExecutionThread.isAlive) {
commitLog.add(epoch)
val offset = offsetLog.get(epoch).get.offsets(0).get
committedOffsets ++= Seq(continuousSources(0) -> offset)
} else {
return
}
}

if (minLogEntriesToMaintain < currentBatchId) {
offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
}
if (minLogEntriesToMaintain < currentBatchId) {
offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
}

awaitProgressLock.lock()
try {
awaitProgressLockCondition.signalAll()
} finally {
awaitProgressLock.unlock()
Expand Down
Loading

0 comments on commit b4c3c55

Please sign in to comment.