Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport start/end options in schedule #1750

Merged
merged 2 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class ScheduleExecutor
implements BackgroundExecutor
{
private static final Logger logger = LoggerFactory.getLogger(ScheduleExecutor.class);
public static final List<String> BUILT_IN_SCHEDULE_PARAMS = Arrays.asList("skip_on_overtime", "skip_delayed_by");
public static final List<String> BUILT_IN_SCHEDULE_PARAMS = Arrays.asList("skip_on_overtime", "skip_delayed_by", "start", "end");

private final ProjectStoreManager rm;
private final ScheduleStoreManager sm;
Expand Down
21 changes: 18 additions & 3 deletions digdag-docs/src/scheduling_workflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Scheduling workflow
.. contents::
:local:

Setting up a schedule:
Setting up a schedule
----------------------------------

To run a workflow periodically, add a ``schedule:`` option to top-level workflow definitions.
Expand Down Expand Up @@ -135,7 +135,7 @@ It’s this case it’s best to skip the next hour’s workflow session, and ins
* Added a ``skip_on_overtime: true | false`` schedule option that can be used to control whether scheduled session execution should be skipped if another session is already running.
* Scheduled workflow sessions now have a ``last_executed_session_time`` variable which contains the previously executed session time. It is usually same with ``last_session_time`` but has different value when ``skip_on_overtime: true`` is set or the session is the first execution.

Skipping backfill.
Skipping backfill
------------------

The ``skip_delayed_by`` option enables `backfill <command_reference.html#backfill>`_ command to skip creating sessions delayed by the specified time. When Digdag restarts, sessions of a schedule are automatically created until the next of ``last_session_time``.
Expand All @@ -162,4 +162,19 @@ If the workflow is executed at 16:02 due to some reason, the session will be ski

schedule:
cron>: '0 16 * * *'
skip_delayed_by: 1s
skip_delayed_by: 1s

Set start/end
-------------
The ``start`` and ``end`` options set period of schedule.
The option accepts date format `YYYY-MM-DD`.
When ``start`` is set, the schedule will start on and after ``start``.
When ``end`` is set, the schedule will run until the day (include the day).
When next run time will be after the ``end``, next schedule will be set to `9999-01-01 00:00:00+0000` and never kicked.

.. note::
After the schedule ends, if you change ``end`` date to extends the period, the schedule will be resumed from the last session.
It causes multiple session running unexpectedly.
For example, you set `end: 2022-03-31` and current date is `2022-04-15`.
Then you update `end: 2022-04-31`, the sessions between `2022-04-01` and `2022-04-15` will be kicked.
To avoid the case, recommend to set ``skip_delayed_by``.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.digdag.standards.scheduler;

import io.digdag.spi.Scheduler;

import java.time.Instant;

public abstract class BaseScheduler implements Scheduler {
Instant SCHEDULE_END = Instant.ofEpochSecond(253370764800L); // 9999-01-01 00:00:00 +0000

boolean isScheduleFinished(Instant time)
{
if (time.equals(SCHEDULE_END) || time.isAfter(SCHEDULE_END)) {
return true;
}
else {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,26 @@
import java.util.TimeZone;
import java.time.Instant;
import java.time.ZoneId;

import com.google.common.base.Optional;
import io.digdag.spi.ScheduleTime;
import io.digdag.spi.Scheduler;
import it.sauronsoftware.cron4j.SchedulingPattern;
import it.sauronsoftware.cron4j.Predictor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CronScheduler
implements Scheduler
extends BaseScheduler
{
private static final Logger logger = LoggerFactory.getLogger(CronScheduler.class);

private final SchedulingPattern pattern;
private final ZoneId timeZone;
private final long delaySeconds;
protected final Optional<Instant> start;
protected final Optional<Instant> end;

CronScheduler(String cronPattern, ZoneId timeZone, long delaySeconds)
CronScheduler(String cronPattern, ZoneId timeZone, long delaySeconds, Optional<Instant> start, Optional<Instant> end)
{
this.pattern = new SchedulingPattern(cronPattern) {
// workaround for a bug of cron4j:
Expand All @@ -29,6 +36,8 @@ public boolean match(long millis)
};
this.timeZone = timeZone;
this.delaySeconds = delaySeconds;
this.start = start;
this.end = end;
}

@Override
Expand All @@ -40,13 +49,11 @@ public ZoneId getTimeZone()
@Override
public ScheduleTime getFirstScheduleTime(Instant currentTime)
{
Instant startTime = currentTime; // TODO make this from config

// truncate to seconds
Instant truncated = Instant.ofEpochSecond(currentTime.getEpochSecond());
if (truncated.equals(currentTime)) {
// in this particular case, minus 1 second to include this currentTime
// because Predictor doesn't include this time at "next"MatchingTime() method
// because Predictor doesn't include this time at nextMatchingTime() method
truncated = truncated.minusSeconds(1);
}
Instant lastTime = truncated.minusSeconds(delaySeconds);
Expand All @@ -57,8 +64,14 @@ public ScheduleTime getFirstScheduleTime(Instant currentTime)
@Override
public ScheduleTime nextScheduleTime(Instant lastScheduleTime)
{
Instant next = next(lastScheduleTime);
return ScheduleTime.of(next, next.plusSeconds(delaySeconds));
Instant next = nextWithStartEnd(lastScheduleTime);

if (isScheduleFinished(next)) {
return ScheduleTime.of(next, next);
}
else {
return ScheduleTime.of(next, next.plusSeconds(delaySeconds));
}
}

@Override
Expand Down Expand Up @@ -92,4 +105,20 @@ private Instant next(Instant time)
predictor.setTimeZone(TimeZone.getTimeZone(timeZone));
return Instant.ofEpochMilli(predictor.nextMatchingTime());
}

private Instant nextWithStartEnd(Instant lastScheduleTime)
{
Instant next = next(lastScheduleTime);
Instant nextRun = next.plusSeconds(delaySeconds);
if (end.isPresent() && (end.get().equals(nextRun) || end.get().isBefore(nextRun))) {
logger.debug("next run time is after to end. next_run:{}, end:{}", nextRun, end.get());
next = SCHEDULE_END;
}
else if (start.isPresent() && start.get().isAfter(nextRun)) {
logger.debug("next run time is before the start. next_run:{}, end:{}", nextRun, start.get());
// next run is earlier than start. recalculate from start
next = next(start.get().minusSeconds(1)); // -1s is required because predictor doesn't include this time at nextMatchingTime() method
}
return next;
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
package io.digdag.standards.scheduler;

import java.time.Instant;
import java.time.ZoneId;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.spi.Scheduler;
import io.digdag.spi.SchedulerFactory;

public class CronSchedulerFactory
implements SchedulerFactory
{
private final ScheduleConfigHelper configHelper;

@Inject
public CronSchedulerFactory(ScheduleConfigHelper configHelper)
{
this.configHelper = configHelper;
}

@Override
public String getType()
{
Expand All @@ -17,9 +29,16 @@ public String getType()
@Override
public Scheduler newScheduler(Config config, ZoneId timeZone)
{
Optional<Instant> start = configHelper.getDateTimeStart(config, "start", timeZone);
Optional<Instant> end = configHelper.getDateTimeEnd(config, "end", timeZone);
configHelper.validateStartEnd(start, end);

return new CronScheduler(
config.get("_command", String.class),
timeZone,
config.get("delay", long.class, 0L));
config.get("delay", long.class, 0L),
start,
end
);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.digdag.standards.scheduler;

import java.time.Instant;
import java.time.ZoneId;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
import io.digdag.spi.Scheduler;
Expand All @@ -9,6 +13,14 @@
public class DailySchedulerFactory
implements SchedulerFactory
{
private final ScheduleConfigHelper configHelper;

@Inject
public DailySchedulerFactory(ScheduleConfigHelper configHelper)
{
this.configHelper = configHelper;
}

@Override
public String getType()
{
Expand All @@ -19,7 +31,17 @@ public String getType()
public Scheduler newScheduler(Config config, ZoneId timeZone)
{
String at = config.getOptional("_command", String.class).or(() -> config.get("at", String.class));
return new CronScheduler("0 0 * * *", timeZone, parseAt("daily>", at));
Optional<Instant> start = configHelper.getDateTimeStart(config, "start", timeZone);
Optional<Instant> end = configHelper.getDateTimeEnd(config, "end", timeZone);
configHelper.validateStartEnd(start, end);

return new CronScheduler(
"0 0 * * *",
timeZone,
parseAt("daily>", at),
start,
end
);
}

static long parseAt(String kind, String at)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.digdag.standards.scheduler;

import java.time.Instant;
import java.time.ZoneId;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
import io.digdag.spi.Scheduler;
Expand All @@ -9,6 +13,14 @@
public class HourlySchedulerFactory
implements SchedulerFactory
{
private final ScheduleConfigHelper configHelper;

@Inject
public HourlySchedulerFactory(ScheduleConfigHelper configHelper)
{
this.configHelper = configHelper;
}

@Override
public String getType()
{
Expand All @@ -19,7 +31,17 @@ public String getType()
public Scheduler newScheduler(Config config, ZoneId timeZone)
{
String at = config.getOptional("_command", String.class).or(() -> config.get("at", String.class));
return new CronScheduler("0 * * * *", timeZone, parseAt(at));
Optional<Instant> start = configHelper.getDateTimeStart(config, "start", timeZone);
Optional<Instant> end = configHelper.getDateTimeEnd(config, "end", timeZone);
configHelper.validateStartEnd(start, end);

return new CronScheduler(
"0 * * * *",
timeZone,
parseAt(at),
start,
end
);
}

private long parseAt(String at)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.digdag.standards.scheduler;

import java.time.Instant;
import java.time.ZoneId;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
import io.digdag.spi.Scheduler;
Expand All @@ -9,6 +13,14 @@
public class MinutesIntervalSchedulerFactory
implements SchedulerFactory
{
private final ScheduleConfigHelper configHelper;

@Inject
public MinutesIntervalSchedulerFactory(ScheduleConfigHelper configHelper)
{
this.configHelper = configHelper;
}

@Override
public String getType()
{
Expand All @@ -20,6 +32,16 @@ public Scheduler newScheduler(Config config, ZoneId timeZone)
{
int interval = config.get("_command", int.class);
long delay = config.get("delay", long.class, 0L);
return new CronScheduler("*/" + interval + " * * * *", timeZone, delay);
Optional<Instant> start = configHelper.getDateTimeStart(config, "start", timeZone);
Optional<Instant> end = configHelper.getDateTimeEnd(config, "end", timeZone);
configHelper.validateStartEnd(start, end);

return new CronScheduler(
"*/" + interval + " * * * *",
timeZone,
delay,
start,
end
);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.digdag.standards.scheduler;

import java.time.Instant;
import java.time.ZoneId;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
import io.digdag.spi.Scheduler;
Expand All @@ -11,6 +15,14 @@
public class MonthlySchedulerFactory
implements SchedulerFactory
{
private final ScheduleConfigHelper configHelper;

@Inject
public MonthlySchedulerFactory(ScheduleConfigHelper configHelper)
{
this.configHelper = configHelper;
}

@Override
public String getType()
{
Expand All @@ -37,6 +49,16 @@ public Scheduler newScheduler(Config config, ZoneId timeZone)

long dailyDelay = parseAt("monthly>", fragments[1]);

return new CronScheduler("0 0 " + day + " * *", timeZone, dailyDelay);
Optional<Instant> start = configHelper.getDateTimeStart(config, "start", timeZone);
Optional<Instant> end = configHelper.getDateTimeEnd(config, "end", timeZone);
configHelper.validateStartEnd(start, end);

return new CronScheduler(
"0 0 " + day + " * *",
timeZone,
dailyDelay,
start,
end
);
}
}
Loading