pinax-network/substreams-sink-sql

Substreams Sink SQL

The Substreams:SQL sink helps you quickly and easily sync Substreams modules to a PostgreSQL or Clickhouse database.

Quickstart

  1. Install substreams-sink-sql by using the pre-built binary release available on the releases page. Extract the substreams-sink-sql binary into a folder and ensure this folder is referenced globally via your PATH environment variable.

    Note Alternatively, install from source directly with go install github.com/streamingfast/substreams-sink-sql/cmd/substreams-sink-sql@latest.

  2. Compile the Substreams tutorial project:

    cd docs/tutorial
    cargo build --target wasm32-unknown-unknown --release
    cd ../..

    This creates the following WASM file: target/wasm32-unknown-unknown/release/substreams_postgresql_sink_tutorial.wasm

  3. Observe the "Sink Config" section of the substreams manifest in the tutorial, changing the DSN if needed.

    sink:
      module: blockmeta:db_out
      type: sf.substreams.sink.sql.v1.Service
      config:
        schema: "./schema.sql"

  4. Start Docker Compose in the background:

    Note Feel free to skip this step if you already have a running Postgres instance accessible

    # from the root of this repository
    rm -rf ./devel/data/postgres # clean up previous data
    docker-compose up -d

    Note You now have a postgres instance accessible at postgres://dev-node:insecure-change-me-in-prod@postgres:5432/dev-node?sslmode=disable

    Note You also have a clickhouse instance accessible at clickhouse://default:default@localhost:9000/default

  5. Run the setup command:

    # the passwords come from the default config in `docker-compose.yml`
    export DSN="postgres://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable"
    #export DSN="clickhouse://default:default@localhost:9000/default"
    substreams-sink-sql setup $DSN docs/tutorial/substreams.yaml

    This will connect to the database and create the schema, using the values from sink.config.schema

    Note For the sake of idempotency, we recommend that the schema file only contain create (...) if not exists statements.

  6. Run the sink

    Now that the code is compiled and the database is set up, let's launch the sink process.

    Note To connect to Substreams you will need an authentication token, follow this guide to obtain one.

    Note This will connect to the mainnet.eth.streamingfast.io:443 endpoint, because it is the default endpoint for the mainnet network, defined in docs/tutorial/substreams.yaml. You can change this either by using the endpoint flag -e another.endpoint:443 or by setting the environment variable SUBSTREAMS_ENDPOINTS_CONFIG_MAINNET to that endpoint. The last part of the environment variable is the name of the network in the manifest, in uppercase.

    substreams-sink-sql run \
        $DSN \
        docs/tutorial/substreams.yaml

    Note You can use environment variables for all commands using the syntax SINK_SQL_{COMMAND}_{FLAG}={value} (e.g. SINK_SQL_RUN_UNDO_BUFFER_SIZE=100 is equivalent to substreams-sink-sql run [...] --undo-buffer-size 100)

  7. Tear down your Docker Compose cluster

    # from the root of this repository
    docker-compose down
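
The two environment-variable conventions mentioned in step 6 can be sketched as small shell helpers. These functions are hypothetical and not part of substreams-sink-sql; they only build variable names from the patterns described above, assuming flags are upper-cased and dashes become underscores (as in the SINK_SQL_RUN_UNDO_BUFFER_SIZE example).

```shell
# Hypothetical helpers (not shipped with substreams-sink-sql): they only
# build variable names following the conventions described in step 6.

sink_sql_env_name() {
  # $1 = command (e.g. run), $2 = flag (e.g. --undo-buffer-size).
  # Strip the leading "--", upper-case, and turn dashes into underscores.
  printf 'SINK_SQL_%s_%s\n' \
    "$(printf '%s' "$1" | tr 'a-z-' 'A-Z_')" \
    "$(printf '%s' "${2#--}" | tr 'a-z-' 'A-Z_')"
}

endpoint_env_name() {
  # $1 = network name as written in the manifest (e.g. mainnet).
  printf 'SUBSTREAMS_ENDPOINTS_CONFIG_%s\n' "$(printf '%s' "$1" | tr 'a-z' 'A-Z')"
}

sink_sql_env_name run --undo-buffer-size   # prints SINK_SQL_RUN_UNDO_BUFFER_SIZE
endpoint_env_name mainnet                  # prints SUBSTREAMS_ENDPOINTS_CONFIG_MAINNET
```

For instance, export "$(sink_sql_env_name run --undo-buffer-size)=100" would set the same value as passing --undo-buffer-size 100 to the run command.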

DSN

DSN stands for Data Source Name (or Database Source Name) and substreams-sink-sql expects a URL input that defines how to connect to the right driver. An example input for Postgres is psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable which lists hostname, user, password, port and database (with some options) in a single string input.

The URL's scheme (psql, clickhouse, etc.) determines which driver is used. In the example above, the picked driver will be Postgres. The generic format of a DSN is:

<scheme>://<username>:<password>@<hostname>:<port>/<database_name>?<options>
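
As a minimal sketch, the generic format can be assembled from its parts in a shell script. The values below are the tutorial defaults used elsewhere in this README; the variable names are only illustrative. Since the DSN is a URL, special characters in the password would presumably need to be percent-encoded.

```shell
# Tutorial defaults from this README; adjust every value to your own setup.
scheme="psql"
user="dev-node"
password="insecure-change-me-in-prod"
host="localhost"
port="5432"
database="dev-node"
options="sslmode=disable"

# <scheme>://<username>:<password>@<hostname>:<port>/<database_name>?<options>
DSN="${scheme}://${user}:${password}@${host}:${port}/${database}?${options}"
echo "$DSN"
```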

You will find below connection details for each currently supported driver.

Clickhouse

The DSN format for Clickhouse is:

clickhouse://<user>:<password>@<host>:<port>/<dbname>[?<options>]

PostgreSQL

The DSN format for Postgres is:

psql://<user>:<password>@<host>:<port>/<dbname>[?<options>]

Where <options> are URL query parameters in <key>=<value> format; multiple options are separated by & signs. Supported options are listed in the official libpq documentation. The options <user>, <password>, <host> and <dbname> should not be passed in <options> as they are automatically extracted from the DSN URL.

Moreover, the schema option key can be used to select a particular schema within the <dbname> database.
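
For illustration, a Postgres DSN that combines a libpq option with the schema option might look like the sketch below. The schema name analytics is a hypothetical placeholder; the rest reuses the tutorial defaults.

```shell
base_dsn="psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node"

# "analytics" is a hypothetical schema name; options are joined with "&".
DSN="${base_dsn}?sslmode=disable&schema=analytics"
echo "$DSN"
```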

Others

Only psql and clickhouse are supported today, but adding support for a new dialect is quite easy:

  • Copy db/dialect_clickhouse.go to a new file db/dialect_<name>.go implementing the right functionality.
  • Update db.driverDialect map to add your dialect (key is the Golang type of your dialect implementation).
  • Update dsn.driverMap map to add DSN -> dialect name mapping, edit the file to accommodate your specific driver (might not be required)
  • Update Docker Compose to have this dependency auto-started for development purposes
  • Update README and CHANGELOG to add information about the new dialect
  • Open a PR

Output Module

To be accepted by substreams-sink-sql, your module output's type must be a sf.substreams.sink.database.v1.DatabaseChanges message. The Rust crate substreams-database-change contains bindings and helpers to implement it easily. Some projects implementing a db_out module for reference:

By convention, we name the map module that emits sf.substreams.sink.database.v1.DatabaseChanges output db_out.

Note that when using prior versions (0.2.0, 0.1.*) of substreams-database-change, you have to use substreams.database.v1.DatabaseChanges in your substreams.yaml and reference the respective version of the spkg in your substreams.yaml.

Protobuf models

Advanced Topics

High Throughput Injection

Important

This method is useful if you insert a lot of data into the database. If the standard ingestion speed satisfies your needs, continue to use it; the steps below are an advanced use case.

substreams-sink-sql contains a fast injection mechanism for cases where a large amount of data needs to be dumped into the database. In those cases, it may be preferable to dump all the data to CSV files and then use COPY FROM to transfer it to Postgres very quickly.

The idea is to first dump the Substreams data to CSV files using the substreams-sink-sql generate-csv command:

substreams-sink-sql generate-csv "psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" --output-dir ./data/tables :14490000

Note

We are using 14490000 as our stop block. Pick your stop block close to the chain's HEAD, or a smaller one like ours to perform an experiment; adjust to your needs.

This will generate block-segmented CSV files for each table in your schema inside the folder ./data/tables. The next step is to actually inject those CSV files into your database. You can use psql and inject directly with it.

We offer the substreams-sink-sql inject-csv command as a convenience. It is a per-table invocation, but feel free to run the tables concurrently. You are bound by your database at this point, so it is up to you to decide how much concurrency you want to use. Here is a small Bash command that loops through all tables and injects them one by one:

# Inject each table found in ./data/tables, skipping the state.yaml file
for i in $(ls ./data/tables | grep -v state.yaml); do
  # Stop at the first table that fails to inject
  substreams-sink-sql inject-csv "psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" ./data/tables "$i" :14490000 || break
done

Those files are then inserted in the database efficiently by doing a COPY FROM and reading the data from a network pipe directly.
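
Since the tables can be injected concurrently, one possible sketch uses xargs -P to run several inject-csv invocations at once. The function name, the SINK_BIN override and the default of 4 parallel jobs are assumptions made for illustration; only the inject-csv arguments come from the loop above. Pick a job count your database can sustain.

```shell
# Hypothetical wrapper: runs one inject-csv per table, several at a time.
# SINK_BIN lets you point at a different binary; it defaults to the one on PATH.
inject_tables_concurrently() {
  local dsn="$1" tables_dir="$2" block_range="$3" jobs="${4:-4}"

  # List table names (everything except state.yaml) and hand each one to
  # inject-csv, keeping at most $jobs invocations running in parallel.
  # xargs exits non-zero if any invocation fails.
  ls "$tables_dir" | grep -v state.yaml \
    | xargs -P "$jobs" -I{} "${SINK_BIN:-substreams-sink-sql}" \
        inject-csv "$dsn" "$tables_dir" {} "$block_range"
}

# Example call (not executed here):
#   inject_tables_concurrently "$DSN" ./data/tables :14490000 4
```

The sequential loop stops at the first failure, whereas this variant lets the other tables finish and reports the failure through its exit status.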

The command above also injects the cursors table, since it is written like any other table. This table is a bit special: it contains a single file, which holds the cursor used to hand off between CSV injection and going back to "live" blocks. It is extremely important that you validate that this table has been properly populated. You can do this with:

substreams-sink-sql tools --dsn="psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" cursor read
Module eaf2fc2ea827d6aca3d5fee4ec9af202f3d1b725: Block #14490000 (61bd396f3776f26efc3f73c44e2b8be3b90cc5171facb1f9bdeef9cb5c4fd42a) [cqR8Jx...hxNg==]

This should emit a single line. The Module <hash> should match the hash of your db_out module (check substreams info <spkg> to see your module's hashes), and the block number should match the last block you have written.
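
The block-number check can be scripted. The sketch below assumes the cursor read output keeps the "Block #<number>" shape of the sample line above; the function name is hypothetical and the output format is not a documented contract.

```shell
# Hypothetical check: succeeds when the cursor line reports the expected block.
# Assumes the "Block #<number>" shape seen in the sample output above.
check_cursor_block() {
  local line="$1" expected="$2" actual
  # Extract the digits that follow "Block #".
  actual="$(printf '%s\n' "$line" | sed -n 's/.*Block #\([0-9][0-9]*\).*/\1/p')"
  [ "$actual" = "$expected" ]
}

# Sample line copied from the output above.
sample='Module eaf2fc2ea827d6aca3d5fee4ec9af202f3d1b725: Block #14490000 (61bd396f3776f26efc3f73c44e2b8be3b90cc5171facb1f9bdeef9cb5c4fd42a) [cqR8Jx...hxNg==]'

if check_cursor_block "$sample" 14490000; then
  echo "cursor OK"
else
  echo "cursor mismatch" >&2
fi
```

In practice you would pass the real output of the cursor read command instead of the sample string.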

Warning

Failure to properly populate the 'cursors' table will make the injection start from scratch when you run substreams-sink-sql run to bridge with "live" blocks, because no cursor will exist.

Once data has been injected and you validated the cursors table, you can then simply start streaming normally using:

substreams-sink-sql run "psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?sslmode=disable" <spkg>

This will resume from the latest block written and then hand off to streaming "live" blocks.

Performance Knobs

When generating the CSV files, choosing the --buffer-max-size value well can drastically increase your write throughput locally, and even more so if your target store is an Amazon S3, Google Cloud Storage or Azure bucket. The flag controls how many bytes of each file are held in memory. With a larger buffer, data is transferred to the storage layer in big chunks, which improves performance. In many cases, the full file can be held in memory, so a single "upload" call is performed, which performs even better.

When choosing this value you should consider 2 things:

  • One buffer exists per table in your schema, so if there are 12 tables and you have a 128 MiB buffer, you could have up to 1.5 GiB (128 MiB * 12 = 1536 MiB) of RAM allocated to those buffers.
  • The amount of RAM you want to allocate to buffering.

Let's take a container that is going to have 8 GiB of RAM. We suggest leaving 512 MiB for the other parts of the generate-csv task, which means we could dedicate 7.5 GiB to buffering. If your schema has 10 tables, you should use --buffer-max-size=805306368 (7.5 GiB / 10 = 768 MiB = 805306368 bytes).
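
The same sizing can be done with shell arithmetic. This is a minimal sketch using strictly binary units (1 GiB = 1024 MiB, 1 MiB = 1048576 bytes); the variable names are illustrative and the inputs are the 8 GiB, 512 MiB and 10 tables from the example above.

```shell
total_ram_mib=8192   # 8 GiB container
reserved_mib=512     # left for the rest of the generate-csv task
tables=10            # one buffer per table in the schema

# Bytes available for buffering, split evenly across the tables.
buffer_bytes=$(( (total_ram_mib - reserved_mib) * 1024 * 1024 / tables ))

echo "--buffer-max-size=${buffer_bytes}"   # prints --buffer-max-size=805306368
```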