Skip to content
Paul Rogers edited this page Dec 5, 2019 · 4 revisions

Create the Scan Operator for a Storage Plugin

We are now ready to actually read data for our storage plugin. Drill will pass our sub scan from the planner to the execution Drillbit(s) in the form of serialized JSON. Our job is to:

  • Tell Drill how to associate the sub scan with a factory that will create our scan operator.
  • Create the scan operator (which Drill calls a "scan batch").
  • Implement the reader for our data source.

This example uses the Base framework to perform the first two tasks and the enhanced vector framework (EVF) to implement the scan operator. Most existing plugins use the older ScanBatch implementation.

Operator Factory (AKA "Batch Creator")

Drill uses an operator factory (which drill calls a "batch creator") to create the actual execution operator. With the Base plugin framework, all this is done for you. But, it helps to understand how this works in case you want to understand an existing plugin.

(One of the quirks of Drill naming is that Drill uses the term "batch" to refer both to a collection of records, and to the operator which created those records. Thus "a (record) batch creates a series of (vector) batches" is something that makes sense using Drill's terminology, though it is confusing to most of us. Here we use the term "operator" for the thing which creates batches, and "batch" for the collection of records.)

Drill looks for all classes that extend BatchCreator. Then, of all such classes, Drill looks for the one where the first argument to the getBatch() method matches the class of our sub scan. If we were to write this ourselves:

public class ExampleScanBatchCreator implements BatchCreator<ExampleSubScan> {

  @Override
  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
      ExampleSubScan subScan, List<RecordBatch> children)
      throws ExecutionSetupException {
    Preconditions.checkArgument(children == null || children.isEmpty());
    ExampleStoragePlugin plugin = (ExampleStoragePlugin) context.getStorageRegistry()
        .getPlugin(subScan.getConfig());
    return plugin.createScan(context, subScan);
  }
}

Since we are using the Base framework, we get the following for free:

  • The Base batch creator matches on the BaseSubScan class.
  • Obtains the storage plugin config from the sub scan.
  • Uses the type of the config to find our storage plugin (as explained in an earlier section.)
  • Calls a method on our plugin to create the scan.

In short, if you use the Base framework you need not create a batch creator. However, if you have special needs that go beyond the Base capabilities, you can create your own version.

The Scan Operator

Our goal is to create an operator which will read records. Scan operators have two parts:

  • The Drill Volcano-like operator (called a RecordBatch)
  • One or more readers to implement the actual scan (called a "record reader" in "classic" Drill, and a "batch reader" in the EVF.)

Existing plugins tend to create both. We'll use the Base framework which will create the scan operator for us; we just need to provide the plugin-specific batch reader, and configure the EVF for our needs.

Scan Builder

When using the EVF, we create a "scan framework builder" that sets up various options needed to drive the scan. The Base framework uses those options to create the scan itself. We create a method in our scan factory to build the framework:

public class ExampleStoragePlugin
  extends BaseStoragePlugin<ExampleStoragePluginConfig> {

  private static class ExampleScanFactory extends
        BaseScanFactory<ExampleStoragePlugin, ExampleScanSpec, ExampleGroupScan, ExampleSubScan> {

    ...
    @Override
    public ScanFrameworkBuilder scanBuilder(ExampleStoragePlugin storagePlugin,
        OptionManager options, ExampleSubScan subScan) {
      // Create a generic scan framework builder
      ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
      // Ask the plugin to do some generic setup
      storagePlugin.initFramework(builder, subScan);
      // Build the reader factory
      ReaderFactory readerFactory = new ExampleReaderFactory(storagePlugin.config(), subScan);
      builder.setReaderFactory(readerFactory);
      // Set a custom NULL type
      builder.setNullType(Types.optional(MinorType.VARCHAR));
      // Add error context unique to our plugin
      builder.setContext(
        new ChildErrorContext(builder.errorContext()) {
          @Override
          public void addContext(UserException.Builder builder) {
            builder.addContext("Table:", subScan.scanSpec().tableName());
          }
        });
      return builder;
    }

There is a bit going on here.

  • We create a scan framework builder. Since we are not using DFS, we create a generic scan factory. (If we were using DFS, which has additional features, we'd be creating a format plugin instead.)
  • We let the (base) plugin do some generic setup common to all plugins.
  • The scan must handle non-projected columns. (The query asks for column foo, but the data source does not actually contain that column.) Drill will normally create a nullable INT for this case. Here we decide it makes more sense to create a nullable VARCHAR for such columns.
  • Drill errors are, shall we say, less than easy to understand. We can help the user by providing context for any error messages. In this example we include the table name. This context is automatically applied to any UserError exceptions thrown while running our reader.
  • Our scan operator runs one or more readers. Rather than creating the readers up-front (as is done in "classic" plugins), the EVF asks us to provide a "reader factory" which will create readers on the fly. (This is done to help minimize scan operator resource usage.)

We passed the storage plugin config as part of the scan definition. Drill has given us a context that is a gateway into the Drill engine. We use that to get the storage plugin registry, then use the plugin config to locate our plugin. From there, we'll ask the plugin to create the scan operator.

Test the Operator Factory

We can now verify that our sub scan is properly serialized, sent over the wire, deserialized, and used to match our operator factory. For now, let's comment out the following lines:

      //ReaderFactory readerFactory = new ExampleReaderFactory(storagePlugin.config(), subScan);
      //builder.setReaderFactory(readerFactory);

Set a breakpoint on the following line:

      return builder;

Run the test (which should now include anything needed to get through planning.) If we get to the breakpoint we know we've got all the serialization/deserialization, batch creator and plugin lookup logic working. (For now, executing any further will cause an error.)

Reader Factory

The reader factory class creates on or more readers for each scan. A file scan, for example, normally reads all the files in a directory, or all blocks in a file. Drill distributes the file chunks (called "file work" in Drill) across multiple nodes, with each node running multiple chunks, each resulting in a batch reader. In our case, we will do only one scan:

  private static class ExampleReaderFactory implements ReaderFactory {

    private final ExampleSubScan subScan;
    private int readerCount;

    public SumoReaderFactory(ExampleSubScan subScan) {
      subScan = subScan;
    }

    @Override
    public void bind(ManagedScanFramework framework) { }

    @Override
    public ManagedReader<? extends SchemaNegotiator> next() {
      if (readerCount++ == 0) {
        return new ExampleBatchReader(subScan);
      } else {
        return null;
      }
    }
  }

In a real implementation, you would encode "shards" of work (whatever that means to your plugin) in the sub scan, then iterate over them in the reader factory. If you are creating a REST API against a time-series database, for example, you might break a scan over 15 minutes into five scans over three-minute intervals. Each would be represented as one record reader created above.

Batch Reader

Construction of an EVF-based batch reader was already described elsewhere, the work is the same whether the reader is for a format plugin or a storage plugin. The key difference is that here we use the generic ManagedScanFramework, not the FileScanFramework used for reading files.

We have designed the above steps so that the information we need is available in the storage plugin config, the storage plugin, or in the sub scan. For example, if we are calling a REST service, then the endpoint and any credentials should appear in the plugin config, while the table name and columns should tell us what to query.

Test

You can now run a full test (perhaps still using hard-coded settings for some details). Ensure your test represents a valid query and that the proper plugin config properties are set in your test. Remember to uncomment the lines we commented out above. Then, run the test and step through your reader. Verify the results by printing them:

  @Test
  public void testExampleQuery() throws Exception {
    String sql = "SELECT a, b, c\n" +
                  "FROM example.mTable\n" +
                  "LIMIT 10";
    System.out.println(sql);
    RowSet result = queryBuilder().sql(sql).rowSet();
    result.print();

    result.clear();
  }

The above limits results to 10 lines, gets the first (non-empty) batch of records, prints them, then releases the memory for the underlying value vectors.

If you get the data you expect then congratulations: you now have a complete working (if limited) storage plugin.

If not, we know the problem must be in the reader since we did tests earlier to verify the steps up to that point.

By now you should be somewhat familiar with the basic concepts of a storage plugin. You are ready to dive into some advanced topics we glossed over until now.

Clone this wiki locally