diff --git a/synapse/cortex.py b/synapse/cortex.py index 50bc3670e2..27f74331ca 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -3486,6 +3486,7 @@ async def addCronJob(self, cdef): Notes: reqs must have fields present or incunit must not be None (or both) The incunit if not None it must be larger in unit size than all the keys in all reqs elements. + Non-recurring jobs may also have a req of 'now' which will cause the job to also execute immediately. ''' s_agenda.reqValidCdef(cdef) @@ -3505,6 +3506,10 @@ async def addCronJob(self, cdef): else: reqs = [self._convert_reqdict(req) for req in reqs] + if incunit is not None and s_agenda.TimeUnit.NOW in reqs: + mesg = "Recurring jobs may not be scheduled to run 'now'" + raise s_exc.BadConfValu(mesg) + cdef['reqs'] = reqs except KeyError: raise s_exc.BadConfValu('Unrecognized time unit') diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index 7defed6227..9ffd2a46e2 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -95,6 +95,7 @@ class TimeUnit(enum.IntEnum): DAY = enum.auto() # every day HOUR = enum.auto() MINUTE = enum.auto() + NOW = enum.auto() @classmethod def fromString(cls, s): @@ -619,15 +620,22 @@ async def add(self, cdef): reqs = [reqs] # Find all combinations of values in reqdict values and incvals values + nexttime = None recs = [] # type: ignore for req in reqs: + if TimeUnit.NOW in req: + if incunit is not None: + mesg = "Recurring jobs may not be scheduled to run 'now'" + raise ValueError(mesg) + nexttime = time.time() + continue reqdicts = self._dictproduct(req) if not isinstance(incvals, Iterable): incvals = (incvals, ) recs.extend(ApptRec(rd, incunit, v) for (rd, v) in itertools.product(reqdicts, incvals)) - appt = _Appt(self, iden, recur, indx, query, creator, recs) + appt = _Appt(self, iden, recur, indx, query, creator, recs, nexttime) self._addappt(iden, appt) appt.doc = cdef.get('doc', '') diff --git a/synapse/lib/storm.py b/synapse/lib/storm.py index eb755f5327..d3c69c95c6 100644 --- a/synapse/lib/storm.py +++ b/synapse/lib/storm.py @@ -688,13 +688,15 @@ ('--hour', {'help': 'Hour(s) to execute at.'}), ('--day', {'help': 'Day(s) to execute at.'}), ('--dt', {'help': 'Datetime(s) to execute at.'}), + ('--now', {'help': 'Execute immediately.', 'default': False, 'action': 'store_true'}), ), 'storm': ''' $cron = $lib.cron.at(query=$cmdopts.query, minute=$cmdopts.minute, hour=$cmdopts.hour, day=$cmdopts.day, - dt=$cmdopts.dt) + dt=$cmdopts.dt, + now=$cmdopts.now) $lib.print("Created cron job: {iden}", iden=$cron.iden) ''', @@ -733,7 +735,7 @@ if $crons { for $cron in $crons { $job = $cron.pack() - if (not $job.recur and $job.lastfinishtime) { + if (not $job.recs) { $lib.cron.del($job.iden) $count = ($count + 1) } diff --git a/synapse/lib/stormtypes.py b/synapse/lib/stormtypes.py index 8c0d7536f7..4ece145a9d 100644 --- a/synapse/lib/stormtypes.py +++ b/synapse/lib/stormtypes.py @@ -4231,12 +4231,17 @@ def _ts_to_reqdict(ts): 'year': dt.year } - if not tslist: + atnow = kwargs.get('now') + + if not tslist and not atnow: mesg = 'At least one requirement must be provided' raise s_exc.StormRuntimeError(mesg=mesg, kwargs=kwargs) reqdicts = [_ts_to_reqdict(ts) for ts in tslist] + if atnow: + reqdicts.append({'now': True}) + cdef = {'storm': query, 'reqs': reqdicts, 'incunit': None, diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index 12d1eea309..2c82b3288f 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -201,6 +201,30 @@ async def myeval(query, opts=None): await self.asyncraises(ValueError, agenda.add(cdef)) await self.asyncraises(s_exc.NoSuchIden, agenda.get('newp')) + # Cannot schedule a recurring job with 'now' + cdef = {'useriden': rootiden, 'iden': 'DOIT', 'storm': '[test:str=doit]', + 'reqs': {s_agenda.TimeUnit.NOW: True}, + 'incunit': s_agenda.TimeUnit.MONTH, + 'incvals': 1} + await self.asyncraises(ValueError, agenda.add(cdef)) + await self.asyncraises(s_exc.NoSuchIden, agenda.get('DOIT')) + + # Schedule a one-shot to run immediately + cdef = {'useriden': rootiden, 'iden': 'DOIT', 'storm': '[test:str=doit]', + 'reqs': {s_agenda.TimeUnit.NOW: True}} + await agenda.add(cdef) + await sync.wait() # wait for the query to run + sync.clear() + self.eq(lastquery, '[test:str=doit]') + core.reset_mock() + lastquery = None + + appts = agenda.list() + self.len(1, appts) + self.eq(appts[0][1].startcount, 1) + self.eq(appts[0][1].nexttime, None) + await agenda.delete('DOIT') + # Schedule a one-shot 1 minute from now cdef = {'useriden': rootiden, 'iden': 'IDEN1', 'storm': '[test:str=foo]', 'reqs': {s_agenda.TimeUnit.MINUTE: 1}} @@ -494,6 +518,11 @@ async def test_cron_perms(self): await proxy.delCronJob(cron0_iden) + cdef = {'storm': '[test:str=foo]', 'reqs': {'now': True}, + 'incunit': 'month', + 'incvals': 1} + await self.asyncraises(s_exc.BadConfValu, proxy.addCronJob(cdef)) + async with core.getLocalProxy(user='newb') as proxy: with self.raises(s_exc.AuthDeny): diff --git a/synapse/tests/test_lib_stormtypes.py b/synapse/tests/test_lib_stormtypes.py index 189b61df7e..f05aee273c 100644 --- a/synapse/tests/test_lib_stormtypes.py +++ b/synapse/tests/test_lib_stormtypes.py @@ -2674,7 +2674,7 @@ async def getCronJob(text): mesgs = await core.stormlist(q) self.stormIsInErr('Query parameter is required', mesgs) - q = "cron.at --minute +5 {$lib.queue.get(foo).put(at1)}" + q = "cron.at --minute +5,+10 {$lib.queue.get(foo).put(at1)}" msgs = await core.stormlist(q) self.stormIsInPrint('Created cron job', msgs) @@ -2687,6 +2687,16 @@ async def getCronJob(text): self.eq('at1', await getNextFoo()) + # Shouldn't delete yet, still one more run scheduled + q = "cron.cleanup" + msgs = await core.stormlist(q) + self.stormIsInPrint('0 cron/at jobs deleted.', msgs) + + unixtime += 5 * MINSECS + core.agenda._wake_event.set() + + self.eq('at1', await getNextFoo()) + q = "cron.cleanup" msgs = await core.stormlist(q) self.stormIsInPrint('1 cron/at jobs deleted.', msgs) @@ -2742,6 +2752,39 @@ async def getCronJob(text): ################## + # Test --now + q = "cron.at --now {$lib.queue.get(foo).put(atnow)}" + msgs = await core.stormlist(q) + self.stormIsInPrint('Created cron job', msgs) + + self.eq('atnow', await getNextFoo()) + + q = "cron.cleanup" + msgs = await core.stormlist(q) + self.stormIsInPrint('1 cron/at jobs deleted.', msgs) + + q = "cron.at --now --minute +5 {$lib.queue.get(foo).put(atnow)}" + msgs = await core.stormlist(q) + self.stormIsInPrint('Created cron job', msgs) + + self.eq('atnow', await getNextFoo()) + + # Shouldn't delete yet, still one more run scheduled + q = "cron.cleanup" + msgs = await core.stormlist(q) + self.stormIsInPrint('0 cron/at jobs deleted.', msgs) + + unixtime += 5 * MINSECS + core.agenda._wake_event.set() + + self.eq('atnow', await getNextFoo()) + + q = "cron.cleanup" + msgs = await core.stormlist(q) + self.stormIsInPrint('1 cron/at jobs deleted.', msgs) + + ################## + # Test the aliases async with getCronJob('cron.add --hourly 15 {#foo}') as guid: mesgs = await core.stormlist(f'cron.stat {guid[:6]}')