From 438a52e9932caa343637378a7ad054bbed48a4d0 Mon Sep 17 00:00:00 2001 From: cisphyx Date: Fri, 20 Nov 2020 11:04:03 -0500 Subject: [PATCH 1/3] add --now to cron.at --- synapse/lib/agenda.py | 7 ++++- synapse/lib/storm.py | 6 ++-- synapse/lib/stormtypes.py | 7 ++++- synapse/tests/test_lib_agenda.py | 16 ++++++++++ synapse/tests/test_lib_stormtypes.py | 45 +++++++++++++++++++++++++++- 5 files changed, 76 insertions(+), 5 deletions(-) diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index 7defed6227..ac1807ab05 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -95,6 +95,7 @@ class TimeUnit(enum.IntEnum): DAY = enum.auto() # every day HOUR = enum.auto() MINUTE = enum.auto() + NOW = enum.auto() @classmethod def fromString(cls, s): @@ -619,15 +620,19 @@ async def add(self, cdef): reqs = [reqs] # Find all combinations of values in reqdict values and incvals values + nexttime = None recs = [] # type: ignore for req in reqs: + if TimeUnit.NOW in req: + nexttime = time.time() + continue reqdicts = self._dictproduct(req) if not isinstance(incvals, Iterable): incvals = (incvals, ) recs.extend(ApptRec(rd, incunit, v) for (rd, v) in itertools.product(reqdicts, incvals)) - appt = _Appt(self, iden, recur, indx, query, creator, recs) + appt = _Appt(self, iden, recur, indx, query, creator, recs, nexttime) self._addappt(iden, appt) appt.doc = cdef.get('doc', '') diff --git a/synapse/lib/storm.py b/synapse/lib/storm.py index eb755f5327..c400c7a5ab 100644 --- a/synapse/lib/storm.py +++ b/synapse/lib/storm.py @@ -688,13 +688,15 @@ ('--hour', {'help': 'Hour(s) to execute at.'}), ('--day', {'help': 'Day(s) to execute at.'}), ('--dt', {'help': 'Datetime(s) to execute at.'}), + ('--now', {'help': 'Execute immediately.', 'default': False, 'action': 'store_true'}), ), 'storm': ''' $cron = $lib.cron.at(query=$cmdopts.query, minute=$cmdopts.minute, hour=$cmdopts.hour, day=$cmdopts.day, - dt=$cmdopts.dt) + dt=$cmdopts.dt, + now=$cmdopts.now) $lib.print("Created cron job: {iden}", iden=$cron.iden) ''', @@ -733,7 +735,7 @@ if $crons { for $cron in $crons { $job = $cron.pack() - if (not $job.recur and $job.lastfinishtime) { + if (not $job.recs and $job.lastfinishtime) { $lib.cron.del($job.iden) $count = ($count + 1) } diff --git a/synapse/lib/stormtypes.py b/synapse/lib/stormtypes.py index 8c0d7536f7..4ece145a9d 100644 --- a/synapse/lib/stormtypes.py +++ b/synapse/lib/stormtypes.py @@ -4231,12 +4231,17 @@ def _ts_to_reqdict(ts): 'year': dt.year } - if not tslist: + atnow = kwargs.get('now') + + if not tslist and not atnow: mesg = 'At least one requirement must be provided' raise s_exc.StormRuntimeError(mesg=mesg, kwargs=kwargs) reqdicts = [_ts_to_reqdict(ts) for ts in tslist] + if atnow: + reqdicts.append({'now': True}) + cdef = {'storm': query, 'reqs': reqdicts, 'incunit': None, diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index 12d1eea309..830f445c3a 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -201,6 +201,22 @@ async def myeval(query, opts=None): await self.asyncraises(ValueError, agenda.add(cdef)) await self.asyncraises(s_exc.NoSuchIden, agenda.get('newp')) + # Schedule a one-shot to run immediately + cdef = {'useriden': rootiden, 'iden': 'DOIT', 'storm': '[test:str=doit]', + 'reqs': {s_agenda.TimeUnit.NOW: True}} + await agenda.add(cdef) + await sync.wait() # wait for the query to run + sync.clear() + self.eq(lastquery, '[test:str=doit]') + core.reset_mock() + lastquery = None + + appts = agenda.list() + self.len(1, appts) + self.eq(appts[0][1].startcount, 1) + self.eq(appts[0][1].nexttime, None) + await agenda.delete('DOIT') + # Schedule a one-shot 1 minute from now cdef = {'useriden': rootiden, 'iden': 'IDEN1', 'storm': '[test:str=foo]', 'reqs': {s_agenda.TimeUnit.MINUTE: 1}} diff --git a/synapse/tests/test_lib_stormtypes.py b/synapse/tests/test_lib_stormtypes.py index 189b61df7e..f05aee273c 100644 --- a/synapse/tests/test_lib_stormtypes.py +++ b/synapse/tests/test_lib_stormtypes.py @@ -2674,7 +2674,7 @@ async def getCronJob(text): mesgs = await core.stormlist(q) self.stormIsInErr('Query parameter is required', mesgs) - q = "cron.at --minute +5 {$lib.queue.get(foo).put(at1)}" + q = "cron.at --minute +5,+10 {$lib.queue.get(foo).put(at1)}" msgs = await core.stormlist(q) self.stormIsInPrint('Created cron job', msgs) @@ -2687,6 +2687,16 @@ async def getCronJob(text): self.eq('at1', await getNextFoo()) + # Shouldn't delete yet, still one more run scheduled + q = "cron.cleanup" + msgs = await core.stormlist(q) + self.stormIsInPrint('0 cron/at jobs deleted.', msgs) + + unixtime += 5 * MINSECS + core.agenda._wake_event.set() + + self.eq('at1', await getNextFoo()) + q = "cron.cleanup" msgs = await core.stormlist(q) self.stormIsInPrint('1 cron/at jobs deleted.', msgs) @@ -2742,6 +2752,39 @@ async def getCronJob(text): ################## + # Test --now + q = "cron.at --now {$lib.queue.get(foo).put(atnow)}" + msgs = await core.stormlist(q) + self.stormIsInPrint('Created cron job', msgs) + + self.eq('atnow', await getNextFoo()) + + q = "cron.cleanup" + msgs = await core.stormlist(q) + self.stormIsInPrint('1 cron/at jobs deleted.', msgs) + + q = "cron.at --now --minute +5 {$lib.queue.get(foo).put(atnow)}" + msgs = await core.stormlist(q) + self.stormIsInPrint('Created cron job', msgs) + + self.eq('atnow', await getNextFoo()) + + # Shouldn't delete yet, still one more run scheduled + q = "cron.cleanup" + msgs = await core.stormlist(q) + self.stormIsInPrint('0 cron/at jobs deleted.', msgs) + + unixtime += 5 * MINSECS + core.agenda._wake_event.set() + + self.eq('atnow', await getNextFoo()) + + q = "cron.cleanup" + msgs = await core.stormlist(q) + self.stormIsInPrint('1 cron/at jobs deleted.', msgs) + + ################## + # Test the aliases async with getCronJob('cron.add --hourly 15 {#foo}') as guid: mesgs = await core.stormlist(f'cron.stat {guid[:6]}') From 68e17f387b0f7fc424d7ec1613f8b41ee042953e Mon Sep 17 00:00:00 2001 From: cisphyx Date: Fri, 20 Nov 2020 13:15:12 -0500 Subject: [PATCH 2/3] prevent using "now" on a recurring job --- synapse/cortex.py | 5 +++++ synapse/lib/agenda.py | 3 +++ synapse/tests/test_lib_agenda.py | 13 +++++++++++++ 3 files changed, 21 insertions(+) diff --git a/synapse/cortex.py b/synapse/cortex.py index 50bc3670e2..27f74331ca 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -3486,6 +3486,7 @@ async def addCronJob(self, cdef): Notes: reqs must have fields present or incunit must not be None (or both) The incunit if not None it must be larger in unit size than all the keys in all reqs elements. + Non-recurring jobs may also have a req of 'now' which will cause the job to also execute immediately. ''' s_agenda.reqValidCdef(cdef) @@ -3505,6 +3506,10 @@ async def addCronJob(self, cdef): else: reqs = [self._convert_reqdict(req) for req in reqs] + if incunit is not None and s_agenda.TimeUnit.NOW in reqs: + mesg = "Recurring jobs may not be scheduled to run 'now'" + raise s_exc.BadConfValu(mesg) + cdef['reqs'] = reqs except KeyError: raise s_exc.BadConfValu('Unrecognized time unit') diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index ac1807ab05..9ffd2a46e2 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -624,6 +624,9 @@ async def add(self, cdef): recs = [] # type: ignore for req in reqs: if TimeUnit.NOW in req: + if incunit is not None: + mesg = "Recurring jobs may not be scheduled to run 'now'" + raise ValueError(mesg) nexttime = time.time() continue diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index 830f445c3a..2c82b3288f 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -201,6 +201,14 @@ async def myeval(query, opts=None): await self.asyncraises(ValueError, agenda.add(cdef)) await self.asyncraises(s_exc.NoSuchIden, agenda.get('newp')) + # Cannot schedule a recurring job with 'now' + cdef = {'useriden': rootiden, 'iden': 'DOIT', 'storm': '[test:str=doit]', + 'reqs': {s_agenda.TimeUnit.NOW: True}, + 'incunit': s_agenda.TimeUnit.MONTH, + 'incvals': 1} + await self.asyncraises(ValueError, agenda.add(cdef)) + await self.asyncraises(s_exc.NoSuchIden, agenda.get('DOIT')) + # Schedule a one-shot to run immediately cdef = {'useriden': rootiden, 'iden': 'DOIT', 'storm': '[test:str=doit]', 'reqs': {s_agenda.TimeUnit.NOW: True}} @@ -510,6 +518,11 @@ async def test_cron_perms(self): await proxy.delCronJob(cron0_iden) + cdef = {'storm': '[test:str=foo]', 'reqs': {'now': True}, + 'incunit': 'month', + 'incvals': 1} + await self.asyncraises(s_exc.BadConfValu, proxy.addCronJob(cdef)) + async with core.getLocalProxy(user='newb') as proxy: with self.raises(s_exc.AuthDeny): From 87545dfc31838b0277f5538f3df9cf2e9dc9525b Mon Sep 17 00:00:00 2001 From: cisphyx Date: Fri, 20 Nov 2020 13:24:38 -0500 Subject: [PATCH 3/3] adjust cron.cleanup --- synapse/lib/storm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/lib/storm.py b/synapse/lib/storm.py index c400c7a5ab..d3c69c95c6 100644 --- a/synapse/lib/storm.py +++ b/synapse/lib/storm.py @@ -735,7 +735,7 @@ if $crons { for $cron in $crons { $job = $cron.pack() - if (not $job.recs and $job.lastfinishtime) { + if (not $job.recs) { $lib.cron.del($job.iden) $count = ($count + 1) }