Skip to content

Storage Filter Pushdown

Paul Rogers edited this page Dec 6, 2019 · 11 revisions

Create a Storage Plugin Filter Push-Down

Typically you use "filter push-down" to figure out these partitions:

SELECT * FROM myPlugin.timeSeries
WHERE eventTime BETWEEN '2019-11-15 13:00:00' AND '2019-11-15 14:00:00'

You use the WHERE clause filter to identify the time range, then use plugin-specific logic to understand that, say, eventTime is a partition column, and to work out the partitions. For file-based storage, the directory structure drives partitions. For external systems, the partitioning is likely to be very specific to that system.

Filter push-down is a complex topic because it requires you to work within the Calcite query planner. Query planning itself is complex, and it helps to have at least some familiarity with the topic.

As you read this section, you may want to look at (and play with) the DummyStoragePlugin which is the "test mule" for this functionality.

Filter Push-Down Use Cases

A "filter" is a condition in our query, typically in the WHERE clause. Filter "push-down" is where we move the filter out of the query and into our scan implementation. There are two use cases:

  • Optional: the filter can be executed in Drill, but we can improve performance by pushing the filter.
  • Mandatory: the filter cannot be executed in Drill and must be pushed into the scan.

Optional Filter Push-Down

Suppose you are given a query such as the following:

SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo'

Here, myudf is a user-defined function (UDF) known only to Drill.

Let's also assume that our data source can handle simple equality clauses. Perhaps we have a partitioned directory structure:

myTable
|- a=1
|- a=2
...
|- a=10

If so, then we can restrict our scan to just the a=10 subdirectory. This is called a "filter push-down" we push the a = 10 filter out of Drill into our scan. In this case, we don't have to do the push down: we could scan all directories and let Drill throw away the unwanted data. Thus filter push-down increases performance, but does not change query results.

Note that the myudf(b, 3) = 'foo' clause cannot be pushed down: it refers to a function known only to Drill. (Though if myudf were, in fact, a function known to your data source, you could push that function as well. That, however, would be a fairly obscure use case.)

Mandatory Filter Push-Down

Or, perhaps we are using a data source such as REST where we must pass certain parameters to the target service:

SELECT * FROM example.timeseries
WHERE startTime = '2018-11-30T10:00:00'
  AND endTime = '2018-11-30T11:00:00'

In this case, we require the filter, and we want to remove the filter from the query because the startTime and endTime fields will not appear in each record. (Instead, if we leave the filters in the query, Drill will create a null value for those fields, which will never evaluate as equal to the given dates, and so the query will return no results.)

Or, perhaps we are using a data source such as JDBC where we can "push" part of the WHERE clause to the data source.

Concepts

In either case, we want to do three things:

  • Capture the simple equality clause, a = 10 so that our storage plugin can handle it, while leaving the Drill-specific predicate, myudf(b, 3) = 'foo', for Drill to execute.
  • Rewrite the query to remove the pushed-down predicates:
SELECT a, b, c FROM example.myTable WHERE myudf(b, 3) = 'foo'
  • Execute logic to implement the predicate so that the query returns the same result as if Drill were to execute the same query (but, presumably, do so faster because we do not ship unneeded data to Drill.)

Basics: CNF

Database theory speaks of "conjunctive normal form" (CNF) which is just a fancy way of describing a set of conditions joined by AND:

a = 10 AND myudf(b, 3) = 'foo'

CNF is important because conditions can be applied independently. The following are equivalent:

SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo'
SELECT a, b, c FROM (
  SELECT a, b, c FROM example.myTable WHERE a = 10)
  WHERE myudf(b, 3) = 'foo'

Basics: DNF

Note that the above is not true if our WHERE clause contains top-level OR statements:

SELECT a, b, c FROM example.myTable WHERE a = 10 OR myudf(b, 3) = 'foo'

The fancy term for a series of OR statements is "disjunctive normal form" (DNF).

The conclusion is that we can push filter predicates into our storage plugin if they appear anywhere in a top-level AND clause, but not if they appear in an OR clause. (Except for a special case we discuss later.)

Calcite

Calcite presents the WHERE clause to us as a parse tree. Given the following:

SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo' AND c = 30

Calcite produces a binary tree of expressions:

expression(AND)
  - a = 10
  - expression(AND)
    - myudf(b, 3) = 'foo'
    - c = 30

So, our goal is to walk the tree, find all the conjunctive terms, decide which we can rewrite, and create a new tree that includes all the terms we cannot rewrite (which could be the entire tree, part of the tree, or nothing.)

Logical vs. Physical Planning

Drill's Calcite-based planner converts the above query to an executable plan in multiple steps. It is important that we choose the correct step in which to perform push down. The major forms of the plan are:

  • SQL parse tree
  • Calcite logical plan
  • Drill logical plan
  • Drill "physical" (un-parallelized logical) plan
  • Drill parallelized operator descriptions
  • Drill executable operators

The first five steps occur in the planner, the last step occurs at run time.

Most existing plugins that implement filter push-down do so during physical planning. However, this creates a race condition: we don't know how to parallelize until we see the filters, but filter push-down happens after parallelization decisions are made.

To resolve this, the Base framework provides a set of helper classes that perform filter push-down during logical planning so that we can be sure filter push-down is done before physical planning starts.

We do this because, as explained here, we want to use filters to drive query parallelization.

Planner Rules

Calcite is a rule-driven, cost-based planner. Each planner phase gathers Calcite rules to apply to the plan tree. Our storage plugin can contribute rules unique to our plugin:

  @Override
  public Set<? extends StoragePluginOptimizerRule> getOptimizerRules(
                        OptimizerRulesContext optimizerContext, PlannerPhase phase) {
    if (phase.isFilterPushDownPhase()) {
      return ExampleFilterPushDownListener.rulesFor(optimizerContext, config);
    }
    return ImmutableSet.of();
  }

Advanced Debugging Hints

If you use the Base filter push-down framework, things should "just work." However, if you need to create your own implementation, you will need to know quite a bit about Calcite planning internals. Here are some of the basics.

The phases are driven by 'DefaultSqlHandler.convertToRawDrel(). We can insert print statements after each phase like this:

          System.out.println(RelOptUtil.toString(pruned, SqlExplainLevel.ALL_ATTRIBUTES));
          final RelNode intermediateNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, pruned, logicalTraits);
          ...
          System.out.println(RelOptUtil.toString(transitiveClosureNode, SqlExplainLevel.ALL_ATTRIBUTES));
          intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);

And in convertToPrel():

      System.out.println(RelOptUtil.toString(drel, SqlExplainLevel.ALL_ATTRIBUTES));
      final RelNode relNode = transform(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, drel, traits, false);

If we run our plugin test in the debugger, we see the following output:

DIRECTORY_PRUNING

LogicalProject(a=[$1], b=[$2]): rowcount = 25.0, cumulative cost = {150.0 rows, 251.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27
  LogicalFilter(condition=[OR(=($1, 'bar'), =($1, 'foo'))]): rowcount = 25.0, cumulative cost = {125.0 rows, 201.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25
    EnumerableTableScan(table=[[dummy, myTable]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3

LOGICAL

DrillFilterRel(condition=[OR(=($0, 'bar'), =($0, 'foo'))]): rowcount = 2500.0, cumulative cost = {20000.0 rows, 70000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 55
  DrillScanRel(table=[[dummy, myTable]], groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`], scanSpec=DummyScanSpec [table="myTable"]]]): rowcount = 10000.0, cumulative cost = {10000.0 rows, 20000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 47

PARTITION_PRUNING
JOIN_PLANNING
ROWKEYJOIN_CONVERSION
SUM_CONVERSION

DrillScreenRel: rowcount = 2500.0, cumulative cost = {20250.0 rows, 70250.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 69
  DrillFilterRel(condition=[OR(=($0, 'bar'), =($0, 'foo'))]): rowcount = 2500.0, cumulative cost = {20000.0 rows, 70000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 67
    DrillScanRel(table=[[dummy, myTable]], groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`], scanSpec=DummyScanSpec [table="myTable"]]]): rowcount = 10000.0, cumulative cost = {10000.0 rows, 20000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 47

PHYSICAL

Looking at this, we see that the plan at the directory DIRECTORY_PRUNING is in pure Calcite form: our GroupScan has not yet been created, we only see our "scan spec." However, at PARTITION_PRUNING, our GroupScan is available to help with the filter push-down work. By the time few get to the PHYSICAL stage, all the plan nodes have been converted to the Drill logical form.

Base Filter Push-Down Framework

To make filter push-down easier, the Base framework provides the Calcite rules common to most use cases. Your code provides a "listener" called with a candidate "predicate" (Boolean expression) is found in the query. Your listener can accept or reject the predicate. If accepted (is eligible for filter push-down), your listener creates the group scan state needed for the filter, and can optimally remove the filter from the query.

Start by creating your implementation:

public class DummyFilterPushDownListener implements FilterPushDownListener {

  private final ExampleStoragePluginConfig config;

  public DummyFilterPushDownListener(ExampleStoragePluginConfig config) {
    this.config = config;
  }

  public static Set<StoragePluginOptimizerRule> rulesFor(
      OptimizerRulesContext optimizerRulesContext,
      ExampleStoragePluginConfig config) {
    return FilterPushDownStrategy.rulesFor(optimizerRulesContext,
        new DummyFilterPushDownListener(config));
  }

  @Override
  public String prefix() { return "Example"; }

  @Override
  public boolean isTargetScan(GroupScan groupScan) {
    return groupScan instanceof ExampleGroupScan;
  }
}

Here we are holding onto our plugin config. You might choose to store other state, or none at all. The prefix is a name that shows up in rules. We tell the framework that we only care about our scan, ExampleGroupScan.

Prevent Multiple Applications

For unclear reasons, Calcite will rerun the same set of filter push-down rules on the same scan multiple times. To avoid problems we will reject all but the first attempt:

  @Override
  public boolean needsApplication(GroupScan groupScan) {
    ExampleGroupScan exampleScan Scan = (ExampleGroupScan) groupScan;
    return ! exampleScan Scan.hasFilters();
  }

Here, hasFilters() uses some technique to determine if we have already done filter push-down. A simple flag is often adequate.

Identifying Candidate Filters

SQL is a powerful language and provides many different kinds of binary expressions. The framework picks out those of the form:

<col name> <relop> <const value>

(The filter also handles the reverse, with the column name on the right. Such expressions are rewritten to the above form.) The framework will ignore things like functions, math expressions and so on. That is, in the example at the top, a = 10 is a candidate, but myudf(b, 3) = 'foo' is not.

The <relop> is any of the binary or unary binary operators: =, !=, <, ..., is null and so on. However, not all storage plugins can handle all operators. Also, it may be that some values or columns cannot be pushed down.

To handle this, the framework applies a second tier of filtering, allowing our plugin to accept or reject predicates:

  @Override
  public RelOp accept(GroupScan groupScan, RelOp relOp) {
    ExampleGroupScan exampleScan = (ExampleGroupScan) groupScan;
    return exampleScan.acceptFilter(relOp);
  }

Suppose our scan can handle only equality operations, if so, then, in the scan, we might implement the acceptFilter() method as:

  public acceptFilter(RelOp relOp) {
    return relOp.op == RelOp.Op.EQ ? relOp : null;
  }

We return a RelOp instance if we accept the predicate, null if we do not.

We can extend the above by, say, accepting only columns foo and bar for pushdown, rejecting others.

Value Verification and Conversion

The RelOp also includes a constant value, along with the type which Drill inferred for that constant. For a = 10, the type is MinorType.INT and the value is an Integer of value 10.

Suppose that our filter only knows how to deal with strings (VARCHAR). We can reject other types:

  public acceptFilter(RelOp relOp) {
    if (relOp.op != RelOp.Op.EQ) { return null; }
    if (relOp.value.type != MinorType.VARCHAR) { return null; }
    return relOp;
  }

Now we see why the function returns a RelOp. Rather than rejecting the predicate if there is a type mismatch, we could convert the value to the type we want:

  public acceptFilter(RelOp relOp) {
    if (relOp.op != RelOp.Op.EQ) { return null; }
    if (relOp.value.type != MinorType.VARCHAR) {
      return relOp.normalize(
        new ConstantHolder(MinorType.VARCHAR, relOp.value.value.toString());
    }
    return relOp;
  }

We can also rewrite the column name (convert to our preferred case, say.)

The rewritten relop will be used only by our code; it will not (normally) be changed in the actual query.

Stub Push-Down Implementation

For now, let's provide just a stub of the actual filter push-down:

  @Override
  public Pair<GroupScan, List<RexNode>> transform(GroupScan groupScan,
      List<Pair<RexNode, RelOp>> andTerms, Pair<RexNode, DisjunctionFilterSpec> orTerm) {
    return null;
  }

Pass the Rule to Calcite

Now that our class works, we can tell Calcite to use our new rule by adding a method to our storage plugin:

  @Override
  public Set<? extends StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
    if (phase.isFilterPushDownPhase()) {
      return EampleFilterPushDownListener.rulesFor(optimizerContext, config);
    }
    return ImmutableSet.of();
  }

This says that we'll use our filter push-down rules during logical planning, but we have no rules to offer for any other planning phase.

At this point, Calcite will call our rule (you can set a breakpoint in the methods and run your test), but the rule itself does nothing yet because of the stub transform() method.

Perform the Actual Filter Push-Down

The boilerplate is done, it is time to actually implement the filter push-down. The logic here is unique to each plugin. But, we can discuss the general rules.

Handle CNF Predicates

The first condition to consider is a series of AND statements (CNF). Maybe:

SELECT ... WHERE foo=10 AND bar='fred'

These are conditions we want to apply on each scan: maybe we pass these as parameters to an external system, maybe they tell us which parts of the external system to call, etc. In either case, we want to translate the RelOp form to the form we need. (In simple cases, you can even just use the RelOp form: it is designed for JSON serialization in the SubScan.)

Then, we have to decide if we want to keep the filters in the query, or remove them. Let's suppose we want to remove them. For now, let's assume we cannot handle OR (DNF) conditions.

The transform() method takes the set of CNF and DNF terms, and returns a new (rewritten) group scan, and a set of predicates to be retained in the query. The predicates are in the original Calcite form. As a result, the incoming arguments are correlated lists of Calcite nodes and our converted RelOp nodes.

Our implementation would be:

  @Override
  public Pair<GroupScan, List<RexNode>> transform(GroupScan groupScan,
      List<Pair<RexNode, RelOp>> andTerms, Pair<RexNode, DisjunctionFilterSpec> orTerm) {

    // Pick out the and predicates
    List<RelOp> andExprs;
    if (andTerms == null || andTerms.isEmpty()) {
      andExprs = null;
    } else {
      andExprs = andTerms.stream().map(t -> t.right).collect(Collectors.toList());
    }

    // Create a filter spec to hold these terms
    FilterSpec filters = FilterSpec.build(andExprs, null);

    // Build a new scan with the filters
    ExampleGroupScan exampleScan = (ExampleGroupScan) groupScan;
    GroupScan newScan = new ExampleGroupScan(exampleScan, filters);

    // Retain the OR term expressions (we don't use them)
    List<RexNode> exprs;
    if (orTerm != null) {
      exprs.add(orTerm.left);
    }
    return Pair.of(newScan, exprs);
  }

Here, we use the FilterSpec class. The group scan constructor "does the right thing" for our plugin with the filters. (In the simplest case, we just hold onto them.)

We would then pass these filters to each of the SubScan operators in whatever form our plugin needs.

Handle DNF Predicates

While AND terms are applied to every sub scan, OR terms can be used to create multiple scan segments. Suppose we have:

SELECT ... FROM example.myTable WHERE a = 10 OR a = 20

Or, equivalently:

SELECT ... FROM example.myTable WHERE a IN (10, 20)

We can interpret the above as two scans: one where we scan just data where a = 10 and another where we scan where a = 20. If a is, say, a REST parameter, we can handle this via two different scan "segments" that represent two different REST calls.

If this makes sense for our plugin, we can retain the OR conditions by modifying the above method:

  @Override
  public Pair<GroupScan, List<RexNode>> transform(GroupScan groupScan,
      List<Pair<RexNode, RelOp>> andTerms, Pair<RexNode, DisjunctionFilterSpec> orTerm) {

    // Gather AND terms
    List<RelOp> andExprs;
    if (andTerms == null || andTerms.isEmpty()) {
      andExprs = null;
    } else {
      andExprs = andTerms.stream().map(t -> t.right).collect(Collectors.toList());
    }

    // Gather OR terms
    DisjunctionFilterSpec orExprs;
    if (orTerm == null) {
      orExprs = null;
    } else {
      orExprs = orTerm.right;
    }
    FilterSpec filters = FilterSpec.build(andExprs, orExprs);

    // Build a new scan with the filters
    ExampleGroupScan exampleScan = (ExampleGroupScan) groupScan;
    GroupScan newScan = new ExampleGroupScan(exampleScan, filters);

    return Pair.of(newScan, null);
  }

The above says we can handle both the AND and OR terms. (We could, if needed, pick and choose which terms to remove from the query.)

Note that, if we have both AND and OR terms, we must apply all the AND terms to each scan segment, but apply one OR term to any one scan segment.

Test

The filter push-down is now complete. (You must still implement your plugin-specific logic.) Even without that, you can test the above and see the filters pushed into your group scan. (Remember that we've removed the filters from the query, so if your plugin does not yet handle them, they will just be dropped on the floor, which is probably not what we want.)

Interlude, Visualizing a Query Rewrite

Earlier we explained how to insert print statements to visualize a query during planning. If we've left those debugging print statements in place, we can see the effect of a filter push-down. Before filter push-down:

DrillFilterRel(condition=[OR(=($0, 'bar'), =($0, 'foo'))]): ...
  DrillScanRel(table=[[dummy2, myTable]],
      groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`],
      scanSpec=DummyScanSpec [table="myTable"]]]): ...

After filter push-down:

DrillScanRel(table=[[dummy2, myTable]],
    groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`],
    scanSpec=DummyScanSpec [table="myTable"],
    orFilters=DisjunctionFilterSpec [column="a", type=VARCHAR, values=[bar, foo]]]]): ...

Notice that, for this query, the filter node was removed as all predicates where pushed into the scan.

References

Clone this wiki locally