Skip to content

Commit

Permalink
attempt at schedulers on WorkManager
Browse files Browse the repository at this point in the history
  • Loading branch information
cjbrooks12 committed Oct 23, 2023
1 parent 8dba004 commit c41bfc9
Show file tree
Hide file tree
Showing 26 changed files with 878 additions and 127 deletions.
4 changes: 3 additions & 1 deletion ballast-scheduler/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ kotlin {
dependencies { }
}
val androidMain by getting {
dependencies { }
dependencies {
implementation("androidx.work:work-runtime-ktx:2.8.1")
}
}
val jsMain by getting {
dependencies { }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.copperleaf.ballast.scheduler.workmanager

import android.content.Context
import android.util.Log
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.ExistingWorkPolicy
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.WorkManager
import androidx.work.WorkerParameters
import androidx.work.await
import com.copperleaf.ballast.scheduler.SchedulerAdapter
import com.copperleaf.ballast.scheduler.workmanager.BallastWorkManagerScheduleWorker.Companion.DATA_ADAPTER_CLASS
import com.copperleaf.ballast.scheduler.workmanager.BallastWorkManagerScheduleWorker.Companion.DATA_INITIAL_INSTANT
import com.copperleaf.ballast.scheduler.workmanager.BallastWorkManagerScheduleWorker.Companion.DATA_LATEST_INSTANT
import com.copperleaf.ballast.scheduler.workmanager.BallastWorkManagerScheduleWorker.Companion.SCHEDULE_NAME_PREFIX
import kotlinx.coroutines.coroutineScope
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant

/**
* This is a WorkManager job which takes a [SchedulerAdapter] and creates new WorkManager tasks for each registered
* schedule. Those are represented by OneTimeJobs which schedule their next execution moment after each subsequent
* execution.
*
* The newly scheduled jobs are instances of [BallastWorkManagerScheduleWorker] which run the current execution, and
* then schedule the next execution.
*/
@Suppress("UNCHECKED_CAST")
public open class BallastWorkManagerScheduleDispatcher(
context: Context,
workerParams: WorkerParameters
) : CoroutineWorker(context, workerParams) {

protected open fun getAdapter(adapterClassName: String): SchedulerAdapter<*, *, *> {
return (Class.forName(adapterClassName) as Class<SchedulerAdapter<*, *, *>>)
.getConstructor()
.newInstance()
}

final override suspend fun doWork(): Result = coroutineScope {
Log.d("BallastWorkManager", "Handling work dispatch")
val workManager = WorkManager.getInstance(applicationContext)

val adapterClassName = inputData.getString(DATA_ADAPTER_CLASS)!!
val initialInstant = Instant.fromEpochMilliseconds(inputData.getLong(DATA_INITIAL_INSTANT, 0))
val latestInstant = Instant.fromEpochMilliseconds(inputData.getLong(DATA_LATEST_INSTANT, 0))

// run the adapter to get the schedules which should run
val adapter: SchedulerAdapter<*, *, *> = getAdapter(adapterClassName)
val schedules = adapter.getRegisteredSchedules()

// Make sure each registered schedule is set up
schedules.forEach { schedule ->
BallastWorkManagerScheduleWorker.setupSchedule(
workManager = workManager,
adapter = adapter,
registeredSchedule = schedule,
initialInstant = initialInstant,
latestInstant = latestInstant,
)
}

// remove schedules which are not part of the current adapter's schedule
try {
coroutineScope {
val scheduledJobTags = listOf("ballast", "schedule", adapterClassName)
val orphanedJobsForThisAdapter = workManager
.getWorkInfosByTag("ballast")
.await()
.filter {
// get the WorkManager jobs which were created from this adapter
it.tags.containsAll(scheduledJobTags)
}
.map {
// the remaining tag should be the schedule's key, which
tags.single { it.startsWith(SCHEDULE_NAME_PREFIX) }.removePrefix(SCHEDULE_NAME_PREFIX)
}
.filter { scheduleName ->
// get the WorkManager jobs which were created from this adapter, but are not part of the
// currently-scheduled jobs
schedules.none { it.key == scheduleName }
}

// cancel those jobs
orphanedJobsForThisAdapter.forEach { workName ->
Log.d("BallastWorkManager", "Cancelling orphaned work schedule at '$workName'")
workManager.cancelUniqueWork(workName)
}
}
} catch (e: Exception) {
// ignore
}

Result.success()
}

public companion object {

/**
* Schedule BallastWorkManagerScheduleDispatcher to run immediately as a unique job, replacing any existing
* jobs. The job is keyed off the [adapter]'s filly-qualified class name.
*
* BallastWorkManagerScheduleDispatcher will make sure schedules are configured for all the scheduled registered
* in the adapter. If a schedule does not exist, it will create it. If a registered schedule is already part of
* WorkManager, it will leave it alone, since the schedule job will do the work to enqueue each successive
* invocation. If there are enqueued jobs at keys which are not in the Adapter's registered schedules, those
* will be cancelled, as it will be assumed they were configured in a previous release, but removed in the
* current version.
*/
public fun <T, I : Any, E : Any, S : Any> scheduleWork(
workManager: WorkManager,
adapter: T,
) where T : SchedulerAdapter<I, E, S>, T : Function1<I, Unit> {
Log.d("BallastWorkManager", "Scheduling work dispatch")
val instant = Clock.System.now()
val adapterClassName = adapter.javaClassName

workManager
.beginUniqueWork(
/* uniqueWorkName = */ adapterClassName,
/* existingWorkPolicy = */ ExistingWorkPolicy.REPLACE,
/* work = */ OneTimeWorkRequestBuilder<BallastWorkManagerScheduleDispatcher>()
.setInputData(
Data.Builder()
.putString(DATA_ADAPTER_CLASS, adapterClassName)
.putLong(DATA_INITIAL_INSTANT, instant.toEpochMilliseconds())
.putLong(DATA_LATEST_INSTANT, instant.toEpochMilliseconds())
.build()
)
.addTag("ballast")
.addTag("dispatcher")
.addTag(adapterClassName)
.build()
)
.enqueue()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package com.copperleaf.ballast.scheduler.workmanager

import android.content.Context
import android.util.Log
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.ExistingWorkPolicy
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.WorkManager
import androidx.work.WorkQuery
import androidx.work.WorkerParameters
import androidx.work.await
import com.copperleaf.ballast.scheduler.SchedulerAdapter
import com.copperleaf.ballast.scheduler.executor.ScheduleExecutor
import com.copperleaf.ballast.scheduler.internal.RegisteredSchedule
import com.copperleaf.ballast.scheduler.schedule.dropHistory
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.withContext
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlin.time.toJavaDuration

/**
* This is a WorkManager job which executes on each tick of the registered schedule, then enqueues the next Instant
* that the job should rerun. It also is responsible for accessing the target VM that the Input should be sent to on
* each task tick.
*/
public class BallastWorkManagerScheduleWorker(
context: Context,
workerParams: WorkerParameters
) : CoroutineWorker(context, workerParams) {

final override suspend fun doWork(): Result = coroutineScope {
val workManager = WorkManager.getInstance(applicationContext)
val adapterClassQualifiedName: String = inputData.getString(DATA_ADAPTER_CLASS)!!
val scheduleKey: String = inputData.getString(DATA_KEY)!!
val adapterClass: Class<SchedulerAdapter<*, *, *>> =
Class.forName(adapterClassQualifiedName) as Class<SchedulerAdapter<*, *, *>>
val adapter: SchedulerAdapter<*, *, *> = adapterClass.getConstructor().newInstance()
val registeredSchedule: RegisteredSchedule<*, *, *> =
adapter.getRegisteredSchedules().single { it.key == scheduleKey }

Log.d("BallastWorkManager", "running periodic job at '${registeredSchedule.key}'")
val initialInstant = Instant.fromEpochMilliseconds(inputData.getLong(DATA_INITIAL_INSTANT, 0))
val latestInstant = Instant.fromEpochMilliseconds(inputData.getLong(DATA_LATEST_INSTANT, 0))

when (registeredSchedule.delayMode) {
ScheduleExecutor.DelayMode.FireAndForget -> {
BallastWorkManagerScheduleWorker.scheduleNextInvocation(
workManager = workManager,
adapter = adapter,
registeredSchedule = registeredSchedule,
initialInstant = initialInstant,
latestInstant = latestInstant,
)
dispatchWork(
adapter = adapter,
registeredSchedule = registeredSchedule,
deferred = null,
)
}

ScheduleExecutor.DelayMode.Suspend -> {
val deferred = CompletableDeferred<Unit>()
dispatchWork(
adapter = adapter,
registeredSchedule = registeredSchedule,
deferred = deferred,
)
deferred.await()
BallastWorkManagerScheduleWorker.scheduleNextInvocation(
workManager = workManager,
adapter = adapter,
registeredSchedule = registeredSchedule,
initialInstant = initialInstant,
latestInstant = latestInstant,
)
}
}

Result.success()
}

private suspend fun dispatchWork(
adapter: SchedulerAdapter<*, *, *>,
registeredSchedule: RegisteredSchedule<*, *, *>,
deferred: CompletableDeferred<Unit>?
) {
check(adapter is Function1<*, *>) {
"adapter must be Function1<I, Unit>"
}

invokeWith(
adapter,
registeredSchedule.scheduledInput()
)

deferred?.complete(Unit)
}

private suspend fun <P1, R> invokeWith(
fn: Function1<P1, R>,
input: Any
) {
withContext(Dispatchers.Main) {
fn.invoke(input as P1)
}
}

public companion object {
public const val DATA_ADAPTER_CLASS: String = "DATA_ADAPTER_CLASS"
public const val DATA_KEY: String = "DATA_KEY"
public const val DATA_INITIAL_INSTANT: String = "DATA_INITIAL_INSTANT"
public const val DATA_LATEST_INSTANT: String = "DATA_LATEST_INSTANT"
public const val SCHEDULE_NAME_PREFIX: String = "ballast_schedule_key_"

internal suspend fun setupSchedule(
workManager: WorkManager,
adapter: SchedulerAdapter<*, *, *>,
registeredSchedule: RegisteredSchedule<*, *, *>,
initialInstant: Instant,
latestInstant: Instant,
) {
try {
val workInfo = workManager
.getWorkInfos(WorkQuery.fromUniqueWorkNames(listOf(registeredSchedule.key)))
.await()
if (workInfo.isNotEmpty()) {
// if there is already a scheduled task at this key, just return
return
}
} catch (e: Exception) {
// ignore
}

Log.d("BallastWorkManager", "Creating periodic work schedule at '${registeredSchedule.key}'")
val nextInstant: Instant? = registeredSchedule.schedule
.dropHistory(initialInstant, latestInstant)
.firstOrNull()
if (nextInstant == null) {
// schedule has completed, don't schedule another task
Log.d("BallastWorkManager", "periodic work at '${registeredSchedule.key}' completed")
return
}

val delayAmount = nextInstant - Clock.System.now()
val adapterClassName = adapter.javaClassName

Log.d(
"BallastWorkManager",
"Scheduling next periodic work at '${registeredSchedule.key}' (to trigger at in $delayAmount at $nextInstant)"
)
workManager
.beginUniqueWork(
/* uniqueWorkName = */ registeredSchedule.key,
/* existingWorkPolicy = */ ExistingWorkPolicy.REPLACE,
/* work = */ OneTimeWorkRequestBuilder<BallastWorkManagerScheduleWorker>()
.setInputData(
Data.Builder()
.putString(DATA_ADAPTER_CLASS, adapterClassName)
.putString(DATA_KEY, registeredSchedule.key)
.putLong(DATA_INITIAL_INSTANT, initialInstant.toEpochMilliseconds())
.putLong(DATA_LATEST_INSTANT, nextInstant.toEpochMilliseconds())
.build()
)
.addTag("ballast")
.addTag("schedule")
.addTag(adapterClassName)
.addTag("$SCHEDULE_NAME_PREFIX${registeredSchedule.key}")
.setInitialDelay(delayAmount.toJavaDuration())
.build()
)
.enqueue()
}

internal fun scheduleNextInvocation(
workManager: WorkManager,
adapter: SchedulerAdapter<*, *, *>,
registeredSchedule: RegisteredSchedule<*, *, *>,
initialInstant: Instant,
latestInstant: Instant,
) {
Log.d("BallastWorkManager", "Scheduling periodic work at '${registeredSchedule.key}'")
val nextInstant: Instant? = registeredSchedule.schedule
.dropHistory(initialInstant, latestInstant)
.firstOrNull()
if (nextInstant == null) {
// schedule has completed, don't schedule another task
Log.d("BallastWorkManager", "periodic work at '${registeredSchedule.key}' completed")
return
}

val delayAmount = nextInstant - Clock.System.now()
val adapterClassName = adapter.javaClassName

Log.d(
"BallastWorkManager",
"Scheduling next periodic work at '${registeredSchedule.key}' (to trigger at in $delayAmount at $nextInstant)"
)
workManager
.beginUniqueWork(
/* uniqueWorkName = */ registeredSchedule.key,
/* existingWorkPolicy = */ ExistingWorkPolicy.REPLACE,
/* work = */ OneTimeWorkRequestBuilder<BallastWorkManagerScheduleWorker>()
.setInputData(
Data.Builder()
.putString(DATA_ADAPTER_CLASS, adapterClassName)
.putString(DATA_KEY, registeredSchedule.key)
.putLong(DATA_INITIAL_INSTANT, initialInstant.toEpochMilliseconds())
.putLong(DATA_LATEST_INSTANT, nextInstant.toEpochMilliseconds())
.build()
)
.addTag("ballast")
.addTag("schedule")
.addTag(adapterClassName)
.addTag("$SCHEDULE_NAME_PREFIX${registeredSchedule.key}")
.setInitialDelay(delayAmount.toJavaDuration())
.build()
)
.enqueue()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.copperleaf.ballast.scheduler.workmanager

import com.copperleaf.ballast.scheduler.SchedulerAdapter
import com.copperleaf.ballast.scheduler.internal.RegisteredSchedule
import com.copperleaf.ballast.scheduler.internal.SchedulerAdapterScopeImpl


internal fun <I : Any, E : Any, S : Any> SchedulerAdapter<I, E, S>.getRegisteredSchedules()
: List<RegisteredSchedule<*, *, *>> {
val adapter = this
val adapterScope = SchedulerAdapterScopeImpl<I, E, S>()

with(adapter) {
adapterScope.configureSchedules()
}

// cancel any running schedules which have the same keys as the newly requested schedules
return adapterScope.schedules
}

internal val <T: Any> T.javaClassName: String get() = this::class.java.name
Loading

0 comments on commit c41bfc9

Please sign in to comment.