Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --now option to cron.at #1963

Merged
merged 4 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -3486,6 +3486,7 @@ async def addCronJob(self, cdef):
Notes:
reqs must have fields present or incunit must not be None (or both)
The incunit if not None it must be larger in unit size than all the keys in all reqs elements.
Non-recurring jobs may also have a req of 'now' which will cause the job to also execute immediately.
'''
s_agenda.reqValidCdef(cdef)

Expand All @@ -3505,6 +3506,10 @@ async def addCronJob(self, cdef):
else:
reqs = [self._convert_reqdict(req) for req in reqs]

if incunit is not None and s_agenda.TimeUnit.NOW in reqs:
mesg = "Recurring jobs may not be scheduled to run 'now'"
raise s_exc.BadConfValu(mesg)

cdef['reqs'] = reqs
except KeyError:
raise s_exc.BadConfValu('Unrecognized time unit')
Expand Down
10 changes: 9 additions & 1 deletion synapse/lib/agenda.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class TimeUnit(enum.IntEnum):
DAY = enum.auto() # every day
HOUR = enum.auto()
MINUTE = enum.auto()
NOW = enum.auto()

@classmethod
def fromString(cls, s):
Expand Down Expand Up @@ -619,15 +620,22 @@ async def add(self, cdef):
reqs = [reqs]

# Find all combinations of values in reqdict values and incvals values
nexttime = None
recs = [] # type: ignore
for req in reqs:
if TimeUnit.NOW in req:
if incunit is not None:
mesg = "Recurring jobs may not be scheduled to run 'now'"
raise ValueError(mesg)
nexttime = time.time()
continue

reqdicts = self._dictproduct(req)
if not isinstance(incvals, Iterable):
incvals = (incvals, )
recs.extend(ApptRec(rd, incunit, v) for (rd, v) in itertools.product(reqdicts, incvals))

appt = _Appt(self, iden, recur, indx, query, creator, recs)
appt = _Appt(self, iden, recur, indx, query, creator, recs, nexttime)
self._addappt(iden, appt)

appt.doc = cdef.get('doc', '')
Expand Down
6 changes: 4 additions & 2 deletions synapse/lib/storm.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,13 +688,15 @@
('--hour', {'help': 'Hour(s) to execute at.'}),
('--day', {'help': 'Day(s) to execute at.'}),
('--dt', {'help': 'Datetime(s) to execute at.'}),
('--now', {'help': 'Execute immediately.', 'default': False, 'action': 'store_true'}),
),
'storm': '''
$cron = $lib.cron.at(query=$cmdopts.query,
minute=$cmdopts.minute,
hour=$cmdopts.hour,
day=$cmdopts.day,
dt=$cmdopts.dt)
dt=$cmdopts.dt,
now=$cmdopts.now)

$lib.print("Created cron job: {iden}", iden=$cron.iden)
''',
Expand Down Expand Up @@ -733,7 +735,7 @@
if $crons {
for $cron in $crons {
$job = $cron.pack()
if (not $job.recur and $job.lastfinishtime) {
if (not $job.recs) {
$lib.cron.del($job.iden)
$count = ($count + 1)
}
Expand Down
7 changes: 6 additions & 1 deletion synapse/lib/stormtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4231,12 +4231,17 @@ def _ts_to_reqdict(ts):
'year': dt.year
}

if not tslist:
atnow = kwargs.get('now')

if not tslist and not atnow:
mesg = 'At least one requirement must be provided'
raise s_exc.StormRuntimeError(mesg=mesg, kwargs=kwargs)

reqdicts = [_ts_to_reqdict(ts) for ts in tslist]

if atnow:
reqdicts.append({'now': True})

cdef = {'storm': query,
'reqs': reqdicts,
'incunit': None,
Expand Down
29 changes: 29 additions & 0 deletions synapse/tests/test_lib_agenda.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,30 @@ async def myeval(query, opts=None):
await self.asyncraises(ValueError, agenda.add(cdef))
await self.asyncraises(s_exc.NoSuchIden, agenda.get('newp'))

# Cannot schedule a recurring job with 'now'
cdef = {'useriden': rootiden, 'iden': 'DOIT', 'storm': '[test:str=doit]',
'reqs': {s_agenda.TimeUnit.NOW: True},
'incunit': s_agenda.TimeUnit.MONTH,
'incvals': 1}
await self.asyncraises(ValueError, agenda.add(cdef))
await self.asyncraises(s_exc.NoSuchIden, agenda.get('DOIT'))

# Schedule a one-shot to run immediately
cdef = {'useriden': rootiden, 'iden': 'DOIT', 'storm': '[test:str=doit]',
'reqs': {s_agenda.TimeUnit.NOW: True}}
await agenda.add(cdef)
await sync.wait() # wait for the query to run
sync.clear()
self.eq(lastquery, '[test:str=doit]')
core.reset_mock()
lastquery = None

appts = agenda.list()
self.len(1, appts)
self.eq(appts[0][1].startcount, 1)
self.eq(appts[0][1].nexttime, None)
await agenda.delete('DOIT')

# Schedule a one-shot 1 minute from now
cdef = {'useriden': rootiden, 'iden': 'IDEN1', 'storm': '[test:str=foo]',
'reqs': {s_agenda.TimeUnit.MINUTE: 1}}
Expand Down Expand Up @@ -494,6 +518,11 @@ async def test_cron_perms(self):

await proxy.delCronJob(cron0_iden)

cdef = {'storm': '[test:str=foo]', 'reqs': {'now': True},
'incunit': 'month',
'incvals': 1}
await self.asyncraises(s_exc.BadConfValu, proxy.addCronJob(cdef))

async with core.getLocalProxy(user='newb') as proxy:

with self.raises(s_exc.AuthDeny):
Expand Down
45 changes: 44 additions & 1 deletion synapse/tests/test_lib_stormtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2674,7 +2674,7 @@ async def getCronJob(text):
mesgs = await core.stormlist(q)
self.stormIsInErr('Query parameter is required', mesgs)

q = "cron.at --minute +5 {$lib.queue.get(foo).put(at1)}"
q = "cron.at --minute +5,+10 {$lib.queue.get(foo).put(at1)}"
msgs = await core.stormlist(q)
self.stormIsInPrint('Created cron job', msgs)

Expand All @@ -2687,6 +2687,16 @@ async def getCronJob(text):

self.eq('at1', await getNextFoo())

# Shouldn't delete yet, still one more run scheduled
q = "cron.cleanup"
msgs = await core.stormlist(q)
self.stormIsInPrint('0 cron/at jobs deleted.', msgs)

unixtime += 5 * MINSECS
core.agenda._wake_event.set()

self.eq('at1', await getNextFoo())

q = "cron.cleanup"
msgs = await core.stormlist(q)
self.stormIsInPrint('1 cron/at jobs deleted.', msgs)
Expand Down Expand Up @@ -2742,6 +2752,39 @@ async def getCronJob(text):

##################

# Test --now
q = "cron.at --now {$lib.queue.get(foo).put(atnow)}"
msgs = await core.stormlist(q)
self.stormIsInPrint('Created cron job', msgs)

self.eq('atnow', await getNextFoo())

q = "cron.cleanup"
msgs = await core.stormlist(q)
self.stormIsInPrint('1 cron/at jobs deleted.', msgs)

q = "cron.at --now --minute +5 {$lib.queue.get(foo).put(atnow)}"
msgs = await core.stormlist(q)
self.stormIsInPrint('Created cron job', msgs)

self.eq('atnow', await getNextFoo())

# Shouldn't delete yet, still one more run scheduled
q = "cron.cleanup"
msgs = await core.stormlist(q)
self.stormIsInPrint('0 cron/at jobs deleted.', msgs)

unixtime += 5 * MINSECS
core.agenda._wake_event.set()

self.eq('atnow', await getNextFoo())

q = "cron.cleanup"
msgs = await core.stormlist(q)
self.stormIsInPrint('1 cron/at jobs deleted.', msgs)

##################

# Test the aliases
async with getCronJob('cron.add --hourly 15 {#foo}') as guid:
mesgs = await core.stormlist(f'cron.stat {guid[:6]}')
Expand Down