Skip to content

Commit

Permalink
Optimize lift and filter queries (#1966)
Browse files Browse the repository at this point in the history
add additional optimization to lift+filter queries
  • Loading branch information
Cisphyx authored Nov 25, 2020
1 parent 8240661 commit 534838a
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 11 deletions.
98 changes: 87 additions & 11 deletions synapse/lib/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -1177,19 +1177,50 @@ async def lift(self, runt, path):

assert len(self.kids) == 1

# check if we can optimize a form lift with a tag filter...
# check if we can optimize a form lift
if prop.isform:
async for hint in self.getRightHints():
async for hint in self.getRightHints(runt, path):
if hint[0] == 'tag':
tagname = hint[1].get('name')
async for node in runt.snap.nodesByTag(tagname, form=name):
yield node
return

if hint[0] == 'relprop':
relpropname = hint[1].get('name')
isuniv = hint[1].get('univ')

if isuniv:
fullname = ''.join([name, relpropname])
else:
fullname = ':'.join([name, relpropname])

prop = runt.model.prop(fullname)
if prop is None:
continue

cmpr = hint[1].get('cmpr')
valu = hint[1].get('valu')

if cmpr is not None and valu is not None:
try:
# try lifting by valu but no guarantee a cmpr is available
async for node in runt.snap.nodesByPropValu(fullname, cmpr, valu):
yield node
return
except asyncio.CancelledError: # pragma: no cover
raise
except:
pass

async for node in runt.snap.nodesByProp(fullname):
yield node
return

async for node in runt.snap.nodesByProp(name):
yield node

async def getRightHints(self):
async def getRightHints(self, runt, path):

for oper in self.iterright():

Expand All @@ -1198,8 +1229,11 @@ async def getRightHints(self):
continue

if isinstance(oper, FiltOper):
for hint in await oper.getLiftHints():
for hint in await oper.getLiftHints(runt, path):
yield hint
continue

return

class LiftPropBy(LiftOper):

Expand Down Expand Up @@ -1785,7 +1819,7 @@ async def run(self, runt, genr):

class Cond(AstNode):

async def getLiftHints(self):
async def getLiftHints(self, runt, path):
return []

async def getCondEval(self, runt): # pragma: no cover
Expand Down Expand Up @@ -1935,9 +1969,9 @@ class AndCond(Cond):
'''
<cond> and <cond>
'''
async def getLiftHints(self):
h0 = await self.kids[0].getLiftHints()
h1 = await self.kids[0].getLiftHints()
async def getLiftHints(self, runt, path):
h0 = await self.kids[0].getLiftHints(runt, path)
h1 = await self.kids[0].getLiftHints(runt, path)
return h0 + h1

async def getCondEval(self, runt):
Expand Down Expand Up @@ -1972,7 +2006,7 @@ class TagCond(Cond):
'''
#foo.bar
'''
async def getLiftHints(self):
async def getLiftHints(self, runt, path):

kid = self.kids[0]

Expand Down Expand Up @@ -2028,6 +2062,26 @@ async def cond(node, path):

return cond

async def getLiftHints(self, runt, path):

relprop = self.kids[0]

name = await relprop.compute(runt, path)
ispiv = name.find('::') != -1
if ispiv:
return (
('relprop', {'name': name.split('::')[0]}),
)

hint = {
'name': name,
'univ': isinstance(relprop, UnivProp),
}

return (
('relprop', hint),
)

class HasTagPropCond(Cond):

async def getCondEval(self, runt):
Expand Down Expand Up @@ -2201,6 +2255,28 @@ async def cond(node, path):

return cond

async def getLiftHints(self, runt, path):

relprop = self.kids[0].kids[0]

name = await relprop.compute(runt, path)
ispiv = name.find('::') != -1
if ispiv:
return (
('relprop', {'name': name.split('::')[0]}),
)

hint = {
'name': name,
'univ': isinstance(relprop, UnivProp),
'cmpr': await self.kids[1].compute(runt, path),
'valu': await self.kids[2].compute(runt, path),
}

return (
('relprop', hint),
)

class TagPropCond(Cond):

async def getCondEval(self, runt):
Expand Down Expand Up @@ -2232,12 +2308,12 @@ async def cond(node, path):

class FiltOper(Oper):

async def getLiftHints(self):
async def getLiftHints(self, runt, path):

if await self.kids[0].compute(None, None) != '+':
return []

return await self.kids[1].getLiftHints()
return await self.kids[1].getLiftHints(runt, path)

async def run(self, runt, genr):

Expand Down
121 changes: 121 additions & 0 deletions synapse/tests/test_lib_ast.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import json

from unittest import mock

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.ast as s_ast
import synapse.lib.snap as s_snap

import synapse.tests.utils as s_test

Expand Down Expand Up @@ -1609,3 +1612,121 @@ async def test_ast_exprs(self):
'''
msgs = await core.stormlist(q)
self.stormIsInPrint('no', msgs)

async def test_ast_optimization(self):

calls = []

origprop = s_snap.Snap.nodesByProp
origvalu = s_snap.Snap.nodesByPropValu

async def checkProp(self, name):
calls.append(('prop', name))
async for node in origprop(self, name):
yield node

async def checkValu(self, name, cmpr, valu):
calls.append(('valu', name, cmpr, valu))
async for node in origvalu(self, name, cmpr, valu):
yield node

with mock.patch('synapse.lib.snap.Snap.nodesByProp', checkProp):
with mock.patch('synapse.lib.snap.Snap.nodesByPropValu', checkValu):
async with self.getTestCore() as core:

self.len(1, await core.nodes('[inet:asn=200 :name=visi]'))
self.len(1, await core.nodes('[inet:ipv4=1.2.3.4 :asn=200]'))
self.len(1, await core.nodes('[inet:ipv4=5.6.7.8]'))
self.len(1, await core.nodes('[inet:ipv4=5.6.7.9 :loc=us]'))
self.len(1, await core.nodes('[inet:ipv4=5.6.7.10 :loc=uk]'))
self.len(1, await core.nodes('[test:str=a :bar=(test:str, a) :tick=19990101]'))
self.len(1, await core.nodes('[test:str=m :bar=(test:str, m) :tick=20200101]'))

await core.nodes('.created [.seen=20200101]')
calls = []

nodes = await core.nodes('inet:ipv4 +:loc=us')
self.len(1, nodes)
self.eq(calls, [('valu', 'inet:ipv4:loc', '=', 'us')])
calls = []

nodes = await core.nodes('inet:ipv4 +:loc')
self.len(2, nodes)
self.eq(calls, [('prop', 'inet:ipv4:loc')])
calls = []

nodes = await core.nodes('$loc=us inet:ipv4 +:loc=$loc')
self.len(1, nodes)
self.eq(calls, [('valu', 'inet:ipv4:loc', '=', 'us')])
calls = []

nodes = await core.nodes('$prop=loc inet:ipv4 +:$prop=us')
self.len(1, nodes)
self.eq(calls, [('valu', 'inet:ipv4:loc', '=', 'us')])
calls = []

# Don't optimize if a non-lift happens before the filter
nodes = await core.nodes('$loc=us inet:ipv4 $loc=uk +:loc=$loc')
self.len(1, nodes)
self.eq(calls, [('prop', 'inet:ipv4')])
calls = []

nodes = await core.nodes('inet:ipv4:loc {$loc=:loc inet:ipv4 +:loc=$loc}')
self.len(2, nodes)
exp = [
('prop', 'inet:ipv4:loc'),
('valu', 'inet:ipv4:loc', '=', 'uk'),
('valu', 'inet:ipv4:loc', '=', 'us'),
]
self.eq(calls, exp)
calls = []

nodes = await core.nodes('inet:ipv4 +.seen')
self.len(4, nodes)
self.eq(calls, [('prop', 'inet:ipv4.seen')])
calls = []

# Should optimize both lifts
nodes = await core.nodes('inet:ipv4 test:str +.seen@=2020')
self.len(6, nodes)
exp = [
('valu', 'inet:ipv4.seen', '@=', '2020'),
('valu', 'test:str.seen', '@=', '2020'),
]
self.eq(calls, exp)
calls = []

# Optimize pivprop filter a bit
nodes = await core.nodes('inet:ipv4 +:asn::name=visi')
self.len(1, nodes)
self.eq(calls, [('prop', 'inet:ipv4:asn')])
calls = []

nodes = await core.nodes('inet:ipv4 +:asn::name')
self.len(0, nodes)
self.eq(calls, [('prop', 'inet:ipv4:asn')])
calls = []

nodes = await core.nodes('test:str +:tick*range=(19701125, 20151212)')
self.len(1, nodes)
self.eq(calls, [('valu', 'test:str:tick', 'range=', ['19701125', '20151212'])])
calls = []

# Lift by value will fail since stortype is MSGP
# can still optimize a bit though
nodes = await core.nodes('test:str +:bar*range=((test:str, c), (test:str, q))')
self.len(1, nodes)

exp = [
('valu', 'test:str:bar', 'range=', [['test:str', 'c'], ['test:str', 'q']]),
('prop', 'test:str:bar'),
]

self.eq(calls, exp)
calls = []

# Shouldn't optimize this, make sure the edit happens
msgs = await core.stormlist('inet:ipv4 | limit 1 | [.seen=now] +#notag')
self.len(1, [m for m in msgs if m[0] == 'node:edits'])
self.len(0, [m for m in msgs if m[0] == 'node'])
self.eq(calls, [('prop', 'inet:ipv4')])

0 comments on commit 534838a

Please sign in to comment.