Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SqliteObserver and use it to collect stats related to queries and db size #2707

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ wd_cc_library(
":trace",
":worker-interface",
"//src/workerd/jsg:observer",
"//src/workerd/util:sqlite",
],
)

Expand Down
3 changes: 2 additions & 1 deletion src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <workerd/io/trace.h>
#include <workerd/io/features.capnp.h>
#include <workerd/jsg/observer.h>
#include <workerd/util/sqlite.h>

namespace workerd {

Expand Down Expand Up @@ -237,7 +238,7 @@ class WorkerObserver: public kj::AtomicRefcounted {
virtual void teardownFinished() {}
};

class ActorObserver: public kj::Refcounted {
class ActorObserver: public kj::Refcounted, public SqliteObserver {
public:
// Allows the observer to run in the background, periodically making observations. Owner must
// call this and store the promise. `limitEnforcer` is used to collect CPU usage metrics, it
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3224,7 +3224,8 @@ struct Worker::Actor::Impl {
transient.emplace(js, js.obj());
}

actorCache = makeActorCache(self.worker->getIsolate().impl->actorCacheLru, outputGate, hooks);
actorCache = makeActorCache(
self.worker->getIsolate().impl->actorCacheLru, outputGate, hooks, *metrics);
});
}
};
Expand Down
7 changes: 5 additions & 2 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,11 @@ class Worker::Actor final: public kj::Refcounted {
// Callback which constructs the `ActorCacheInterface` instance (if any) for the Actor. This
// can be used to customize the storage implementation. This will be called synchronously in
// the constructor.
using MakeActorCacheFunc = kj::Function<kj::Maybe<kj::Own<ActorCacheInterface>>(
const ActorCache::SharedLru& sharedLru, OutputGate& outputGate, ActorCache::Hooks& hooks)>;
using MakeActorCacheFunc =
kj::Function<kj::Maybe<kj::Own<ActorCacheInterface>>(const ActorCache::SharedLru& sharedLru,
OutputGate& outputGate,
ActorCache::Hooks& hooks,
SqliteObserver& sqliteObserver)>;

// Callback which constructs the `DurableObjectStorage` instance for an actor. This can be used
// to customize the JavaScript API.
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1924,7 +1924,7 @@ public:
auto& channels = KJ_ASSERT_NONNULL(service.ioChannels.tryGet<LinkedIoChannels>());

auto makeActorCache = [&](const ActorCache::SharedLru& sharedLru, OutputGate& outputGate,
ActorCache::Hooks& hooks) {
ActorCache::Hooks& hooks, SqliteObserver& sqliteObserver) {
return config.tryGet<Durable>().map(
[&](const Durable& d) -> kj::Own<ActorCacheInterface> {
KJ_IF_SOME(as, channels.actorStorage) {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ TestFixture::TestFixture(SetupParams&& params)
KJ_IF_SOME(id, params.actorId) {
worker->runInLockScope(Worker::Lock::TakeSynchronously(kj::none), [&](Worker::Lock& lock) {
auto makeActorCache = [](const ActorCache::SharedLru& sharedLru, OutputGate& outputGate,
ActorCache::Hooks& hooks) {
ActorCache::Hooks& hooks, SqliteObserver& sqliteObserver) {
return kj::heap<ActorCache>(
kj::heap<server::EmptyReadOnlyActorStorageImpl>(), sharedLru, outputGate, hooks);
};
Expand Down
70 changes: 70 additions & 0 deletions src/workerd/util/sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -782,5 +782,75 @@ KJ_TEST("reset database") {
}
}

KJ_TEST("SQLite observer addQueryStats") {
class TestSqliteObserver: public SqliteObserver {
public:
void addQueryStats(uint64_t read, uint64_t written) override {
rowsRead += read;
rowsWritten += written;
}

uint64_t rowsRead = 0;
uint64_t rowsWritten = 0;
};

TempDirOnDisk dir;
SqliteDatabase::Vfs vfs(*dir);
TestSqliteObserver sqliteObserver = TestSqliteObserver();
SqliteDatabase db(
vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY, sqliteObserver);

db.run(R"(
CREATE TABLE things (
id INTEGER PRIMARY KEY
);
)");

// There are some rows read and written when we create the db, we offset this in the test
int rowsReadBefore = sqliteObserver.rowsRead;
int rowsWrittenBefore = sqliteObserver.rowsWritten;
constexpr int dbRowCount = 3;
{
db.run("INSERT INTO things (id) VALUES (10)");
db.run("INSERT INTO things (id) VALUES (11)");
db.run("INSERT INTO things (id) VALUES (12)");
}
KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == dbRowCount);
justin-mp marked this conversation as resolved.
Show resolved Hide resolved
KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == dbRowCount);

rowsReadBefore = sqliteObserver.rowsRead;
rowsWrittenBefore = sqliteObserver.rowsWritten;
{
auto getCount = db.prepare("SELECT COUNT(*) FROM things");
KJ_EXPECT(getCount.run().getInt(0) == dbRowCount);
}
KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == dbRowCount);
KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == 0);

// Verify if addQueryStats works correctly when we call query.nextRow()
rowsReadBefore = sqliteObserver.rowsRead;
rowsWrittenBefore = sqliteObserver.rowsWritten;
{
auto stmt = db.prepare("SELECT * FROM things");
auto query = stmt.run();
KJ_ASSERT(!query.isDone());
while (!query.isDone()) {
query.nextRow();
}
}
KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == dbRowCount);
KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == 0);

// Verify addQueryStats works correctly when db is reset
rowsReadBefore = sqliteObserver.rowsRead;
rowsWrittenBefore = sqliteObserver.rowsWritten;
{
auto query = db.run("INSERT INTO things (id) VALUES (100)");
db.reset();
}
KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == 1);
KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == 1);
}

} // namespace
} // namespace workerd
18 changes: 16 additions & 2 deletions src/workerd/util/sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,18 @@ static constexpr PragmaInfo ALLOWED_PRAGMAS[] = {{"data_version"_kj, PragmaSigna

// =======================================================================================

SqliteObserver SqliteObserver::DEFAULT = SqliteObserver{};

constexpr SqliteDatabase::Regulator SqliteDatabase::TRUSTED;

SqliteDatabase::SqliteDatabase(const Vfs& vfs, kj::Path path, kj::Maybe<kj::WriteMode> maybeMode)
SqliteDatabase::SqliteDatabase(const Vfs& vfs,
kj::Path path,
kj::Maybe<kj::WriteMode> maybeMode,
SqliteObserver& sqliteObserver)
: vfs(vfs),
path(kj::mv(path)),
readOnly(maybeMode == kj::none) {
readOnly(maybeMode == kj::none),
sqliteObserver(sqliteObserver) {
init(maybeMode);
}

Expand Down Expand Up @@ -979,6 +985,9 @@ SqliteDatabase::Query::Query(SqliteDatabase& db,
}

SqliteDatabase::Query::~Query() noexcept(false) {
//Update the db stats that we have collected for the query
db.sqliteObserver.addQueryStats(rowsRead, rowsWritten);

// We only need to reset the statement if we don't own it. If we own it, it's about to be
// destroyed anyway.
if (ownStatement.get() == nullptr) {
Expand Down Expand Up @@ -1096,6 +1105,11 @@ void SqliteDatabase::Query::nextRow() {
db.currentRegulator = regulator;

int err = sqlite3_step(statement);
// TODO(perf): This is slightly inefficient to call for every row read, but not bad enough to
// fix it immediately. The alternate way would be to getRowsRead/Written once when we emit it
// in the Dtor, and handle the case where the statement could be null when the Query gets destructed
rowsRead = getRowsRead();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems inefficient as we're querying SQLite for rows read/written counts on every single row just in case the database is later resent before the query is destroyed -- which is very rare.

Instead, how about putting code into Query::beforeSqliteReset() that posts the metrics early (calls addQueryStats directly), and then have the destructor only post metrics if it hasn't been reset (i.e. maybeStatement is non-null).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did try that approach first. The problem with that comes in the way we do getRowsRead() and the way the Statement and Query are setup. There are a couple of corner cases that have to be handled - when the Query has its own statement vs passed in a statement, handling the beforeSqliteReset in both Query and Statement.
In the end, I decided to go this route as it is easy to reason about.

As far as inefficiency in querying Sqlite for rows read/written, looking at the implementation of getRowsRead/Written, it does a sqlite3_stmt_status(...) which does a pointer read, so its not as inefficient as we think it is

Copy link
Contributor Author

@shrima-cf shrima-cf Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a todo with more details

rowsWritten = getRowsWritten();
if (err == SQLITE_DONE) {
done = true;
} else if (err != SQLITE_ROW) {
Expand Down
22 changes: 21 additions & 1 deletion src/workerd/util/sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ namespace workerd {
using kj::byte;
using kj::uint;

// Used to collect periodic metrics about queries and size of sqlite db
class SqliteObserver {
public:
virtual void addQueryStats(uint64_t rowsRead, uint64_t rowsWritten) {}
// The method is not used by the SqliteDatabase, it is added here for convenience
virtual void setSqliteStoredBytes(uint64_t sqliteStoredBytes) {}
shrima-cf marked this conversation as resolved.
Show resolved Hide resolved

static SqliteObserver DEFAULT;
shrima-cf marked this conversation as resolved.
Show resolved Hide resolved
};

// C++/KJ API for SQLite.
//
// In addition to providing a more modern C++ interface vs. the classic C API, this API layers
Expand All @@ -43,7 +53,10 @@ class SqliteDatabase {
uint64_t statementCount;
};

SqliteDatabase(const Vfs& vfs, kj::Path path, kj::Maybe<kj::WriteMode> maybeMode = kj::none);
SqliteDatabase(const Vfs& vfs,
kj::Path path,
kj::Maybe<kj::WriteMode> maybeMode = kj::none,
SqliteObserver& sqliteObserver = SqliteObserver::DEFAULT);
~SqliteDatabase() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(SqliteDatabase);

Expand Down Expand Up @@ -209,6 +222,7 @@ class SqliteDatabase {
const Vfs& vfs;
kj::Path path;
bool readOnly;
SqliteObserver& sqliteObserver;
shrima-cf marked this conversation as resolved.
Show resolved Hide resolved

// This pointer can be left null if a call to reset() failed to re-open the database.
kj::Maybe<sqlite3&> maybeDb;
Expand Down Expand Up @@ -390,6 +404,12 @@ class SqliteDatabase::Query final: private ResetListener {
kj::Maybe<sqlite3_stmt&> maybeStatement; // null if database was reset
bool done = false;

// Storing the rowsRead and rowsWritten here to use in cases where a DB is reset.
// When the DB is reset, getRowdRead and getRowsWritten will fail as the statement they
// refer to gets destroyed as part of the reset process.
uint64_t rowsRead = 0;
uint64_t rowsWritten = 0;

friend class SqliteDatabase;

Query(SqliteDatabase& db,
Expand Down
Loading