Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make infer role configurable and fix double parse bug #533

Merged
merged 7 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .circleci/pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
infer_role_from_query = true
Copy link
Contributor

@levkk levkk Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call it something better like query_parser_read_write_splitting or something less "pgcat"-internals specific and more end-user oriented? I'm open to ideas.


# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
Expand Down Expand Up @@ -134,6 +138,7 @@ database = "shard2"
pool_mode = "session"
default_role = "primary"
query_parser_enabled = true
infer_role_from_query = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"

Expand Down
4 changes: 4 additions & 0 deletions examples/docker/pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
infer_role_from_query = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
Expand Down
4 changes: 4 additions & 0 deletions pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
infer_role_from_query = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
Expand Down
42 changes: 27 additions & 15 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,21 @@ where
message_result = read_message(&mut self.read) => message_result?
};

// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
continue;
}

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;
query_router.update_pool_settings(pool.settings.clone());

let mut initial_parsed_ast = None;

match message[0] as char {
// Buffer extended protocol messages even if we do not have
// a server connection yet. Hopefully, when we get the S message
Expand Down Expand Up @@ -859,6 +874,8 @@ where
};

let _ = query_router.infer(&ast);

initial_parsed_ast = Some(ast);
}
}
}
Expand Down Expand Up @@ -922,13 +939,6 @@ where
_ => (),
}

// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
continue;
}

// Check on plugin results.
match plugin_output {
Some(PluginOutput::Deny(error)) => {
Expand All @@ -941,11 +951,6 @@ where
_ => (),
};

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;

// Check if the pool is paused and wait until it's resumed.
if pool.wait_paused().await {
// Refresh pool information, something might have changed.
Expand Down Expand Up @@ -1165,6 +1170,9 @@ where
None => {
trace!("Waiting for message inside transaction or in session mode");

// This is not an initial message so discard the initial_parsed_ast
initial_parsed_ast.take();

match tokio::time::timeout(
idle_client_timeout_duration,
read_message(&mut self.read),
Expand Down Expand Up @@ -1221,7 +1229,13 @@ where
// Query
'Q' => {
if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
// We don't want to parse again if we already parsed it as the initial message
let ast = match initial_parsed_ast {
Some(_) => Ok(initial_parsed_ast.take().unwrap()),
None => QueryRouter::parse(&message),
};

if let Ok(ast) = ast {
let plugin_result = query_router.execute_plugins(&ast).await;

match plugin_result {
Expand All @@ -1237,8 +1251,6 @@ where

_ => (),
};

let _ = query_router.infer(&ast);
}
}
debug!("Sending query to server");
Expand Down
22 changes: 22 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,9 @@ pub struct Pool {
#[serde(default)] // False
pub query_parser_enabled: bool,

#[serde(default)] // False
pub infer_role_from_query: bool,

#[serde(default)] // False
pub primary_reads_enabled: bool,

Expand Down Expand Up @@ -627,6 +630,16 @@ impl Pool {
}
}

if self.infer_role_from_query && !self.query_parser_enabled {
error!("infer_role_from_query is only valid when query_parser_enabled is true");
return Err(Error::BadConfig);
}

if self.plugins.is_some() && !self.query_parser_enabled {
error!("plugins are only valid when query_parser_enabled is true");
return Err(Error::BadConfig);
}

self.automatic_sharding_key = match &self.automatic_sharding_key {
Some(key) => {
// No quotes in the key so we don't have to compare quoted
Expand Down Expand Up @@ -663,6 +676,7 @@ impl Default for Pool {
users: BTreeMap::default(),
default_role: String::from("any"),
query_parser_enabled: false,
infer_role_from_query: false,
primary_reads_enabled: false,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
Expand Down Expand Up @@ -914,6 +928,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
format!("pools.{}.query_parser_enabled", pool_name),
pool.query_parser_enabled.to_string(),
),
(
format!("pools.{}.infer_role_from_query", pool_name),
pool.infer_role_from_query.to_string(),
),
(
format!("pools.{}.default_role", pool_name),
pool.default_role.clone(),
Expand Down Expand Up @@ -1096,6 +1114,10 @@ impl Config {
"[pool: {}] Query router: {}",
pool_name, pool_config.query_parser_enabled
);
info!(
"[pool: {}] Infer role from query: {}",
pool_name, pool_config.infer_role_from_query
);
info!(
"[pool: {}] Number of shards: {}",
pool_name,
Expand Down
5 changes: 5 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ pub struct PoolSettings {
// Enable/disable query parser.
pub query_parser_enabled: bool,

// Infer the role from the query itself (only valid when the query parser is enabled).
pub infer_role_from_query: bool,

// Read from the primary as well or not.
pub primary_reads_enabled: bool,

Expand Down Expand Up @@ -157,6 +160,7 @@ impl Default for PoolSettings {
db: String::default(),
default_role: None,
query_parser_enabled: false,
infer_role_from_query: false,
primary_reads_enabled: true,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
Expand Down Expand Up @@ -456,6 +460,7 @@ impl ConnectionPool {
_ => unreachable!(),
},
query_parser_enabled: pool_config.query_parser_enabled,
infer_role_from_query: pool_config.infer_role_from_query,
primary_reads_enabled: pool_config.primary_reads_enabled,
sharding_function: pool_config.sharding_function,
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
Expand Down
18 changes: 18 additions & 0 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ impl QueryRouter {

/// Try to infer which server to connect to based on the contents of the query.
pub fn infer(&mut self, ast: &Vec<sqlparser::ast::Statement>) -> Result<(), Error> {
if !self.pool_settings.infer_role_from_query {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to check for this in the client code than here? I think it's confusing to return nothing from a function based on a setting vs. just not calling the function in the first place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are arguments for both approaches. I kind of like abstracting away things the client has to do, so that's why I went this way. That said, if you feel we should put that control on the client, I'm happy to change it.

return Ok(()); // Nothing to do
}

debug!("Inferring role");

if ast.is_empty() {
Expand Down Expand Up @@ -433,6 +437,10 @@ impl QueryRouter {
/// N.B.: Only supports anonymous prepared statements since we don't
/// keep a cache of them in PgCat.
pub fn infer_shard_from_bind(&mut self, message: &BytesMut) -> bool {
if !self.pool_settings.infer_role_from_query {
return false; // Nothing to do
}

debug!("Parsing bind message");

let mut message_cursor = Cursor::new(message);
Expand Down Expand Up @@ -910,6 +918,7 @@ mod test {
fn test_infer_replica() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.infer_role_from_query = true;
assert!(qr.try_execute_command(&simple_query("SET SERVER ROLE TO 'auto'")) != None);
assert!(qr.query_parser_enabled());

Expand All @@ -934,6 +943,7 @@ mod test {
fn test_infer_primary() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.infer_role_from_query = true;

let queries = vec![
simple_query("UPDATE items SET name = 'pumpkin' WHERE id = 5"),
Expand Down Expand Up @@ -964,6 +974,8 @@ mod test {
fn test_infer_parse_prepared() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.infer_role_from_query = true;

qr.try_execute_command(&simple_query("SET SERVER ROLE TO 'auto'"));
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO off")) != None);

Expand Down Expand Up @@ -1132,6 +1144,8 @@ mod test {
fn test_enable_query_parser() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.infer_role_from_query = true;

let query = simple_query("SET SERVER ROLE TO 'auto'");
assert!(qr.try_execute_command(&simple_query("SET PRIMARY READS TO off")) != None);

Expand Down Expand Up @@ -1164,6 +1178,7 @@ mod test {
user: crate::config::User::default(),
default_role: Some(Role::Replica),
query_parser_enabled: true,
infer_role_from_query: true,
primary_reads_enabled: false,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: Some(String::from("test.id")),
Expand Down Expand Up @@ -1239,6 +1254,7 @@ mod test {
user: crate::config::User::default(),
default_role: Some(Role::Replica),
query_parser_enabled: true,
infer_role_from_query: true,
primary_reads_enabled: false,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
Expand Down Expand Up @@ -1284,6 +1300,7 @@ mod test {
let mut qr = QueryRouter::new();
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
qr.pool_settings.shards = 3;
qr.pool_settings.infer_role_from_query = true;

assert!(qr
.infer(&QueryRouter::parse(&simple_query("SELECT * FROM data WHERE id = 5")).unwrap())
Expand Down Expand Up @@ -1385,6 +1402,7 @@ mod test {
let mut qr = QueryRouter::new();
qr.pool_settings.automatic_sharding_key = Some("data.id".to_string());
qr.pool_settings.shards = 3;
qr.pool_settings.infer_role_from_query = true;

assert!(qr
.infer(&QueryRouter::parse(&simple_query(stmt)).unwrap())
Expand Down
1 change: 1 addition & 0 deletions tests/ruby/helpers/pgcat_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mod
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => true,
"query_parser_enabled" => true,
"infer_role_from_query" => true,
"automatic_sharding_key" => "data.id",
"sharding_function" => "pg_bigint_hash",
"shards" => {
Expand Down