This post is a cross-post, originally published on the Materialize blog.
All of us at Materialize are working from home, and while this is all a bit weird and different, it gives me some time to write a bit more and try to show off some of what we have been up to. I hope that this is equally interesting and helpfully distracting!
tl;dr: We'll set up materialized, fire up some queries against a few gigabytes of data, observe that it could be faster than it is, and then tweak a few settings to make it faster. It may or may not be representative of your experience, but the queries and data came from someone else and the result might be a bit surprising!
Over at the Ballista benchmarks page, author Andy Grove (who also started the SQL parser that Materialize got started with) has framed a relatively simple task to compare his work with Apache Spark: you take the 2018 NYC taxi data, and generally try and hammer it with the following SQL query:
SELECT
passenger_count,
MIN(fare_amount),
MAX(fare_amount)
FROM
tripdata
GROUP BY
passenger_count
In the context of batch computation, which is what Ballista and Spark are about, the goal is to see how fast you can repeat the query, over and over. The systems are allowed to pre-arrange the data into compact columnar representations in order to facilitate maximal throughput. It looks like they get up to repeating the query about 40 times a second, which .. I have no clue. Seems pretty solid to be honest. I can't even type 40 queries per second, so I may not be the target audience.
We are going to do something different.
Materialize is designed to absorb the change streams of your data, and maintain any queries you've expressed over the data. You can very cheaply investigate the results of any maintained query. This has the effect of pivoting the metric of interest from "queries per second", which for Materialize are effectively unbounded, to "updates per second" which we can also think about as "distinct query results per second".
We are going to grab the same data, prepare some views, and then feed the data in and see what sorts of latencies we get when querying it! Even as the data are continually changing!
The easiest way to get Materialized is at materialize.io/download.
By the end of the post, we are going to be using a flag (--logical-compaction-window) that hasn't made it to the release channel at the time of writing. If you'd like to follow along with that (strictly optional) part of the post, you can download OS X and Linux binaries that contain this flag. You may instead just want to clone the materialize repository; you'll need Rust for this case. If you head in to your cloned repository directory and type
cargo build --release --bin materialized
then that will start a release build. You'll now have a bunch of time to get some reading done.
Actually, we'll also want to grab the 2018 taxi data from the link up above. That takes a while to download. It's a race between the Rust compile and the several gigs of data to download! If you don't have a great connection you won't need all of the data; you can also just read along for now.
Welcome back!
We are about to start up materialized and frame some queries. Ideally you've compiled the code in release mode, and have a pile of taxi data in some directory.
You'll need two shells. In the first one, we'll type
shell1% materialized
This starts up materialized, which is the beating heart of incremental view maintenance. If you built from source, you'll find the materialized binary in target/release/materialized.
In a second shell, fire up psql, which you should go and get now if you don't have it (sorry!):
shell2% psql -h localhost -p 6875 -d materialize
This connects an interactive postgres session to materialized on its default port (and sets up the right initial database).
You should now see something that looks like this:
shell2% psql -h localhost -p 6875 -d materialize
psql (11.5, server 9.5.0)
Type "help" for help.
materialize=>
First thing, let's type \timing, which will enable client-side timing for our queries. That will make it impossible to ignore the latency of each of these operations.
materialize=> \timing
Timing is on.
materialize=>
We are now good to go! Let's see what it looks like to analyze some streaming data!
Optional: I'm going to create a temporary file into which I can fire change streams, using the mkfifo command:
shell3% mkfifo taxidata.csv
You can also just point Materialize at the raw source files, but this makes it a bit easier to manually stream changes around, without modifying the downloaded data or making substantial copies of it. When I type things into shell3, much later on, it will be about piping taxi data at this file handle, and you should do that too unless you pointed materialize at the source taxi data files.
Just to start things out, I'll pipe in the first month of data
shell3% cat data_2018_01.csv > taxidata.csv
which preps the fifo with a fair bit of data that we'll read in just a moment.
Materialize asks you to start by identifying sources of data: places it can go to find data, which it can then stream in at you and present for analysis. You do this with the CREATE SOURCE command. We are going to use a pretty simple one: the local file source:
CREATE SOURCE data_2018
FROM FILE '/Users/mcsherry/taxidata.csv' WITH (tail = true)
FORMAT CSV WITH 17 COLUMNS;
If we copy/paste that in to our shell, we should see something like:
materialize=> CREATE SOURCE data_2018
materialize-> FROM FILE '/Users/mcsherry/taxidata.csv' WITH (tail = true)
materialize-> FORMAT CSV WITH 17 COLUMNS;
CREATE SOURCE
Time: 7.993 ms
materialize=>
There is a bunch of text there, but the long and the short of it is that we are pointing Materialize at a local file and announcing it as CSV formatted. Because we need to produce a relation with a schema, we also need to announce an expected number of columns (we'll drop records without this many columns).
Creating a source doesn't actually pull in any data! We don't know that you actually need all of that data, and there is no point pre-emptively pulling in massive volumes of data you may not require.
With sources defined, the next thing Materialize asks you to do is define queries over those sources. In SQL, these are called "views", and you create them in Materialize with the CREATE VIEW command. If you'd like Materialize to evaluate the contents of the view and maintain it for you as the data change, you use the CREATE MATERIALIZED VIEW command.
In particular, the query we are looking at uses just two fields from the source data: passenger_count and fare_amount. Let's define a view that extracts those two (of seventeen) columns, and casts them to the right types.
materialize=> CREATE MATERIALIZED VIEW tripdata AS
materialize-> SELECT
materialize-> cast(column4 AS int) AS passenger_count,
materialize-> cast(column11 AS decimal) AS fare_amount
materialize-> FROM
materialize-> data_2018;
CREATE VIEW
Time: 7.931 ms
materialize=>
The names column4 and column11 are the automatically generated CSV column names; they all have type string because that's really all we know about your comma-separated text, and so the cast method converts these strings to their intended types. And we give them each a nice name.
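To make the shape of that view concrete, here is a rough Python sketch of the same projection outside of Materialize. It assumes that the generated names are 1-indexed (so column4 is the fourth field) and uses made-up sample rows rather than real taxi data; the function name project_tripdata is purely illustrative. A real file with a header row or empty fields would need extra handling that this sketch leaves out.

```python
import csv
import io
from decimal import Decimal

EXPECTED_COLUMNS = 17  # mirrors "FORMAT CSV WITH 17 COLUMNS"

def project_tripdata(lines):
    """Yield (passenger_count, fare_amount) pairs from raw CSV text lines."""
    for row in csv.reader(lines):
        if len(row) != EXPECTED_COLUMNS:
            continue  # rows without the expected number of columns are dropped
        # Assumption: column4 and column11 are 1-indexed, i.e. positions 3 and 10.
        yield int(row[3]), Decimal(row[10])

# Made-up sample input: two well-formed rows and one short row.
sample = io.StringIO(
    "1,a,b,2,1.5,1,N,10,20,1,7.5,0,0,0,0,0,8.3\n"
    "too,short\n"
    "2,a,b,1,0.9,1,N,11,21,2,12,0,0,0,0,0,12.8\n"
)
rows = list(project_tripdata(sample))
print(rows)
```

The short row is skipped, and each surviving row shrinks from seventeen strings to one integer and one decimal.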
This command has a really important keyword in it: MATERIALIZED. This is where the money is at.
Materialized views are the ones that materialized will maintain for you (really easy to remember, right?). You can also define views without materializing them, but they will just behave as shorthand for the longer query you've bound to them; nothing will be computed and maintained for you until you materialize something.
With some data materialized, we are now able to query it! You can do this with standard SQL SELECT statements, like so:
materialize=> SELECT COUNT(*) FROM tripdata;
count
---------
8759875
(1 row)
Time: 796.667 ms
materialize=>
The count you see will depend on which file you've pointed Materialize at, and how promptly it has read the data in. In the example above, I had only introduced January's data, and I've waited long enough that all of the data have loaded (I'm typing this at the same time!).
You might say "wow, what a fast time to count 8.7 million records", but I bet you didn't. It's not a great time, actually. Don't worry, we'll fix that by the end of the post; we're doing several things that we don't need to be doing.
For the moment, let's get back to that exciting query:
materialize=> SELECT passenger_count, MIN(fare_amount), MAX(fare_amount)
materialize-> FROM tripdata
materialize-> GROUP BY passenger_count;
passenger_count | MIN | MAX
-----------------+------+------
| |
0 | -16 | 557
1 | -450 | 8016
2 | -198 | 700
3 | -100 | 499
4 | -75 | 888
5 | -57 | 237
6 | -52 | 266
7 | 0 | 78
8 | 0 | 88
9 | 5 | 98
(11 rows)
Time: 1741.500 ms (00:01.742)
materialize=>
This is also not an exceptionally fast time. Not that a second or two is terrible, but I happen to know it should be much better. We will get to that!
You could also totally take this query and make it a materialized view. This is one of the neat features of Materialize, that everything just composes.
materialize=> CREATE MATERIALIZED VIEW aggregates AS
materialize-> SELECT passenger_count, MIN(fare_amount), MAX(fare_amount)
materialize-> FROM tripdata
materialize-> GROUP BY passenger_count;
CREATE VIEW
Time: 7.550 ms
materialize=>
And now we can just select out the data:
materialize=> SELECT * FROM aggregates;
passenger_count | MIN | MAX
-----------------+------+------
| |
0 | -16 | 557
1 | -450 | 8016
2 | -198 | 700
3 | -100 | 499
4 | -75 | 888
5 | -57 | 237
6 | -52 | 266
7 | 0 | 78
8 | 0 | 88
9 | 5 | 98
(11 rows)
Time: 0.669 ms
materialize=>
Well, that was much faster! Apparently things don't have to be slow. Reading that over and over should turn out over one thousand queries per second. That was just to make a point; you don't need to do that.
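If it helps to picture why reading a maintained view is so cheap, here is a toy Python sketch of the idea: the per-group answer is updated as each change arrives, so a read just returns what is already there. This is only an illustration under my own assumptions (the MinMaxView class and its methods are made up for this post), not a description of how differential dataflow implements aggregation.

```python
from collections import Counter, defaultdict

class MinMaxView:
    """Keeps per-group MIN and MAX up to date as rows are added or retracted."""

    def __init__(self):
        # group key -> Counter mapping each value to how many times it occurs
        self.values = defaultdict(Counter)
        # group key -> (min, max), the answer that reads are served from
        self.result = {}

    def update(self, key, value, diff):
        """Apply `diff` copies of (key, value); a negative diff is a retraction."""
        counts = self.values[key]
        counts[value] += diff
        if counts[value] == 0:
            del counts[value]
        if counts:
            self.result[key] = (min(counts), max(counts))
        else:
            del self.values[key]
            self.result.pop(key, None)

    def read(self):
        """Reading does no aggregation; it returns the maintained answer."""
        return dict(self.result)

view = MinMaxView()
view.update(1, 9, +1)
view.update(1, -450, +1)
view.update(2, 700, +1)
view.update(1, -450, -1)  # retract the odd negative fare again
print(view.read())
```

All of the work happens in update; read never looks at the input rows at all, which is the property that makes the sub-millisecond timing above plausible.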
So we can materialize all of our views and then query them (secret: it took about 1741ms to populate the materialization). But we'd really rather have interactive access to tripdata! What is the problem here that makes using it slow? Is it that it has 8.7 million rows? (no, it is not).
It turns out there is a great reason that our queries take a while. Or at least there is a fairly specific reason that we'll ask materialized to work around. To see what is going on, we'll need to do a bit of introspection into materialized's state, which I find super interesting!
First up, why would we expect this to be fast? Should we be able to keep up with optimized high-throughput analytic processors? Not generally, right?
Not generally, but in this case, yes. It turns out there just isn't that much data there, once we define the tripdata view. Although tripdata reflects 8.7 million rows, it just doesn't have that many distinct records in it:
materialize=> SELECT COUNT(*) FROM (SELECT DISTINCT * FROM tripdata);
count
-------
1585
(1 row)
Time: 608.168 ms
materialize=>
There are only 1,585 distinct rows in there. Even though they reflect 8.7 million source rows, differential dataflow is smart enough to only track distinct records (attaching a count to each). Something else must be up, because grinding through the 1,585 distinct records just doesn't take that long.
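As a small illustration of what "tracking distinct records with a count" buys you, here is a Python sketch using made-up rows in the shape of tripdata. The consolidate helper is a name I'm introducing here, and a plain Counter is only a stand-in for the idea, not for differential dataflow's actual data structures.

```python
from collections import Counter

def consolidate(rows):
    """Collapse repeated rows into {row: count}, one entry per distinct row."""
    return Counter(rows)

# Made-up rows in the shape of tripdata: (passenger_count, fare_amount).
rows = [(1, 10), (1, 10), (2, 7), (1, 10), (2, 7), (3, 52)]
state = consolidate(rows)

print(len(rows))            # rows fed in
print(len(state))           # distinct rows actually stored
print(sum(state.values()))  # COUNT(*) recovered from the stored counts
```

The stored state grows with the number of distinct rows rather than the number of input rows, and COUNT(*) can still be answered by summing the counts.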
Let's dive in to materialized's logging data. It all lives in the mz_catalog namespace:
materialize=> SHOW SOURCES FROM mz_catalog;
SOURCES
---------------------------------
mz_arrangement_sharing
mz_arrangement_sizes
mz_catalog_names
mz_dataflow_channels
mz_dataflow_operator_addresses
mz_dataflow_operators
mz_materialization_dependencies
mz_materialization_frontiers
mz_materializations
mz_peek_active
mz_peek_durations
mz_scheduling_elapsed
mz_scheduling_histogram
mz_scheduling_parks
mz_view_foreign_keys
mz_view_keys
(16 rows)
Time: 0.447 ms
materialize=>
These are all collections that are automatically maintained by the system, which talk about what is going on in the system. For example, we could look at mz_dataflow_operators to see which dataflow operators are currently installed, and mz_scheduling_histogram to get a histogram of the execution latencies for each operator. It's actually really cool, and I spend a bunch of time in here trying to track down what is going on in weirdly performing views.
What we actually want to do is dive in to mz_arrangement_sizes, which is where we maintain information about the volume of state that materialized is currently sitting on, in the form of differential dataflow arrangements.
materialize=> SELECT * FROM mz_arrangement_sizes ORDER BY records DESC LIMIT 10;
operator | worker | records | batches
----------+--------+---------+---------
789 | 0 | 1227055 | 13
515 | 0 | 1823 | 6
311 | 0 | 1823 | 6
350 | 0 | 882 | 6
370 | 0 | 882 | 6
258 | 0 | 602 | 3
271 | 0 | 602 | 3
180 | 0 | 602 | 3
182 | 0 | 602 | 3
216 | 0 | 602 | 3
(10 rows)
Time: 0.818 ms
materialize=>
Hey first, that was really fast to execute too, right? Things are generally fast in materialized. And it turns out the reason not everything is fast is staring us in the face.
In the first row up there, some operator 789 is sitting on more than one million records. It turns out that is our tripdata materialization. For some reason, despite only representing 1,585 distinct records, we are sitting on a million plus differential updates. Are these the 8.7 million source records? No, not exactly. They are something else that Materialize keeps around to try and help you out: the last 60 seconds of history of the tripdata collection.
Optional read: Materialize keeps around some historical data because you may want to join multiple collections together, and rather than ask you to perfectly align the two in time, we can maintain a bit of buffer around them and be certain to give you the correct answer. How large that buffer should be depends a bit on your environment; 60 seconds was a choice we made a while back, and it will probably change as we learn more about the ergonomics and needs of temporal data analysis.
We can totally go back and improve these numbers, with just a bit of tweaking of materialized's configuration parameters.
There are two things we are going to do now. I want you to go back to the first shell and control-C materialized, then type in:
shell1% DIFFERENTIAL_EAGER_MERGE=1000 materialized --logical-compaction-window 1ms
This spins up materialized with one new environment variable and one new argument. The environment variable instructs differential dataflow, on which all of this is built, to more assertively merge its underlying data structures, even when there appears to be no justification to do so; this has the effect of filling idle CPU time with maintenance work, which we'll want. The command line option instructs materialized to maintain less history about each collection; by default we are maintaining the past 60 seconds of history, which for a collection that can change once a millisecond can be quite a lot of changes; in this case, a single millisecond is enough, because we just want to be able to check out the current state.
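To give a feel for what a shorter compaction window does, here is a simplified Python sketch. It assumes updates are (data, time, diff) triples with times in milliseconds, and that compacting to a point in time means treating every earlier update as if it happened at that point and then adding up the diffs. The compact function, the sample history, and the timestamps are all made up for illustration; the real machinery is more general than this.

```python
from collections import defaultdict

def compact(updates, since):
    """Advance every update time earlier than `since` up to `since`, then merge."""
    merged = defaultdict(int)
    for data, time, diff in updates:
        merged[(data, max(time, since))] += diff
    # Updates whose diffs cancel out (net zero) disappear entirely.
    return sorted((d, t, n) for (d, t), n in merged.items() if n != 0)

# Made-up history of (passenger_count, fare_amount) changes.
history = [
    ((1, 10), 1000, +1),
    ((1, 10), 2000, +1),
    ((2, 7), 3000, +1),
    ((2, 7), 4000, -1),
    ((1, 10), 61000, +1),
]
now = 61000

wide = compact(history, since=now - 60000)  # 60 second window
narrow = compact(history, since=now - 1)    # 1 millisecond window

print(len(wide))
print(narrow)
```

With the wide window nothing can be merged, so all five updates stay around. With the narrow window the older updates collapse into a single entry per distinct record, and the insert-then-delete pair vanishes altogether.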
If you head back to your other shell, you'll be able to type, but you've probably been disconnected. You should reconnect, though you may need to type your query a second time. Because of the magic of materialized, your sources and views will have their definitions persisted across shutdowns and restarts. We'll still need to re-read the data for you, though.
Give it a moment, and then type something like that COUNT(*) query again:
materialize=> SELECT COUNT(*) FROM tripdata;
count
---------
8759875
(1 row)
Time: 11.524 ms
materialize=>
Oooo, that is a lot faster. It was 800 milliseconds before. How about we run our query again:
materialize=> SELECT passenger_count, MIN(fare_amount), MAX(fare_amount)
materialize-> FROM tripdata
materialize-> GROUP BY passenger_count
materialize-> ;
passenger_count | MIN | MAX
-----------------+------+------
| |
0 | -16 | 557
1 | -450 | 8016
2 | -198 | 700
3 | -100 | 499
4 | -75 | 888
5 | -57 | 237
6 | -52 | 266
7 | 0 | 78
8 | 0 | 88
9 | 5 | 98
(11 rows)
Time: 58.558 ms
materialize=>
That seems pretty decent now. At least, a fair bit faster than the second before.
It seems like our diagnosis was correct. We can double check by issuing the diagnostic query from before, checking out the number of records maintained for each operator:
materialize=> SELECT * FROM mz_arrangement_sizes ORDER BY records DESC LIMIT 10;
operator | worker | records | batches
----------+--------+---------+---------
789 | 0 | 1585 | 2
258 | 0 | 602 | 2
271 | 0 | 602 | 2
180 | 0 | 602 | 2
182 | 0 | 602 | 2
216 | 0 | 602 | 2
282 | 0 | 303 | 2
300 | 0 | 303 | 2
223 | 0 | 303 | 2
504 | 0 | 303 | 2
(10 rows)
Time: 0.863 ms
materialize=>
That naughty operator 789 is still here, but its record count has gone down by three orders of magnitude! Actually, it seems to have landed exactly at the number of distinct pairs of values from our 8.7 million records.
We haven't really exercised the streaming aspects of the system yet, but let's do that now that we have things up and running better. There are lots of options here, but I'm just going to use the FIFO file handle I've set up already.
shell3% cat data_2018_02.csv > taxidata.csv
shell3% cat data_2018_03.csv > taxidata.csv
shell3% cat data_2018_04.csv > taxidata.csv
shell3% cat data_2018_05.csv > taxidata.csv
shell3% cat data_2018_06.csv > taxidata.csv
shell3% cat data_2018_07.csv > taxidata.csv
shell3% cat data_2018_08.csv > taxidata.csv
shell3% cat data_2018_09.csv > taxidata.csv
shell3% cat data_2018_10.csv > taxidata.csv
shell3% cat data_2018_11.csv > taxidata.csv
shell3% cat data_2018_12.csv > taxidata.csv
Back in our psql shell, we can peek at the data as it evolves!
materialize=> select count(*) from tripdata;
count
----------
11973903
(1 row)
Time: 23.611 ms
materialize=>
A little while later
materialize=> select count(*) from tripdata;
count
----------
36617309
(1 row)
Time: 25.061 ms
materialize=>
The data don't load instantaneously (there are some 102 million rows in total), but this is mostly because we are only using a single worker thread. We do care a lot about the single-threaded performance, but you can use as many workers as you like with the --workers <number> argument to materialized.
We'll eventually make our way up to 102 million rows, but along the way we can issue our neat query
materialize=> SELECT passenger_count, MIN(fare_amount), MAX(fare_amount)
materialize-> FROM tripdata
materialize-> GROUP BY passenger_count
materialize-> ;
passenger_count | MIN | MAX
-----------------+------+--------
| |
0 | -90 | 5000
1 | -485 | 234631
2 | -498 | 214748
3 | -498 | 349026
4 | -415 | 888
5 | -75 | 593
6 | -66 | 394
7 | -70 | 140
8 | -89 | 90
9 | 0 | 100
192 | 6 | 6
(12 rows)
Time: 169.186 ms
materialize=>
This takes a bit more time, because the system is actually doing some work, and the data aren't always in their most compact form. Also, who carted 192 passengers around? If we want results even faster, we can probe our materialized view aggregates:
materialize=> select * from aggregates;
passenger_count | MIN | MAX
-----------------+------+--------
| |
0 | -90 | 5000
1 | -800 | 907070
2 | -498 | 214748
3 | -498 | 349026
4 | -415 | 974
5 | -300 | 1271
6 | -66 | 433
7 | -70 | 140
8 | -89 | 129
9 | 0 | 100
192 | 6 | 6
(12 rows)
Time: 4.563 ms
materialize=>
We eventually reach the full data set, and get what appear to be the correct results, modulo some different opinions on decimal precision (the SQL standard apparently calls for integer precision by default).
materialize=> SELECT COUNT(*) FROM tripdata;
count
-----------
102804262
(1 row)
Time: 12.504 ms
materialize=> SELECT * FROM aggregates;
passenger_count | MIN | MAX
-----------------+------+--------
| |
0 | -90 | 40502
1 | -800 | 907070
2 | -498 | 214748
3 | -498 | 349026
4 | -415 | 974
5 | -300 | 1271
6 | -100 | 433
7 | -70 | 140
8 | -89 | 129
9 | 0 | 110
96 | 6 | 6
192 | 6 | 6
(13 rows)
Time: 0.935 ms
materialize=>
Before departing, let's check out how much state we are tracking for these 102 million rows of taxi data:
materialize=> SELECT * FROM mz_arrangement_sizes ORDER BY records DESC LIMIT 10;
operator | worker | records | batches
----------+--------+---------+---------
789 | 0 | 3105 | 2
258 | 0 | 894 | 2
271 | 0 | 894 | 2
180 | 0 | 894 | 2
182 | 0 | 894 | 2
216 | 0 | 894 | 2
282 | 0 | 448 | 2
300 | 0 | 448 | 2
223 | 0 | 448 | 2
504 | 0 | 448 | 2
(10 rows)
Time: 1.067 ms
materialize=>
We only maintain 3,105 records in differential dataflow's internal state, despite having ground through 102 million input records. This is because once we've projected down to the two relevant columns for the view, there are exactly that many distinct settings of the two values:
materialize=> select count(*) from (select distinct * from tripdata);
count
-------
3105
(1 row)
Time: 14.764 ms
materialize=>
By declaring ahead of time the restricted view we want to track, we can do substantially better than stashing the whole relation. In this case, better by about 30,000x, which means that even when we want to do from-scratch computation on these two columns, it takes just a moment.
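For the curious, the "about 30,000x" comes straight from the two counts reported above; a couple of lines of Python reproduce the arithmetic (the variable names are mine):

```python
input_rows = 102_804_262  # SELECT COUNT(*) FROM tripdata
stored_records = 3_105    # distinct (passenger_count, fare_amount) pairs

reduction = input_rows / stored_records
print(f"{reduction:,.0f}x")  # prints "33,109x"
```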
Maybe you have a few other views you want to track at the same time? They could certainly take more or less space, but by being careful about expressing what you need, you open up the doors to a lot more interactive computation than you might previously have thought available!
I hope this has been a bit of an interesting tour through Materialize!
We looked at a very simple use case, drawn from some other folks' benchmarking interests. Specifically, shuffling data around in a local FIFO, we analyzed some 102 million rows, representing about 9GB of data. Despite that, we managed to stash all of the information in a few thousand records in differential dataflow, all just using vanilla SQL!
There is a lot more to learn about Materialize, especially if you want to use it for something more enterprise-y than moving files around on your laptop. Materialize supports a few other ways to get data in, for example Kafka topics, and we are busily programming away to get other input and output options for you. The plan is that, with enough of these, there will be basically no reason for you not to use Materialize for all of your view maintenance needs!
Check out Materialize now, sign up below for stimulating recurring email content, check out the documentation, or just rush over and grab a copy of the code itself from our repository and start trying it out!