Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix formatting #299

Merged
merged 1 commit into from
Jan 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 134 additions & 133 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,103 +123,105 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

runtime.block_on(async move {

if let Some(true) = config.general.enable_prometheus_exporter {
if let Some(true) = config.general.enable_prometheus_exporter {
let http_addr_str = format!(
"{}:{}",
config.general.host, config.general.prometheus_exporter_port
"{}:{}",
config.general.host, config.general.prometheus_exporter_port
);

let http_addr = match SocketAddr::from_str(&http_addr_str) {
Ok(addr) => addr,
Err(err) => {
Ok(addr) => addr,
Err(err) => {
error!("Invalid http address: {}", err);
std::process::exit(exitcode::CONFIG);
}
}
};

tokio::task::spawn(async move {
start_metric_server(http_addr).await;
start_metric_server(http_addr).await;
});
}
}

let addr = format!("{}:{}", config.general.host, config.general.port);
let addr = format!("{}:{}", config.general.host, config.general.port);

let listener = match TcpListener::bind(&addr).await {
let listener = match TcpListener::bind(&addr).await {
Ok(sock) => sock,
Err(err) => {
error!("Listener socket error: {:?}", err);
std::process::exit(exitcode::CONFIG);
error!("Listener socket error: {:?}", err);
std::process::exit(exitcode::CONFIG);
}
};
};

info!("Running on {}", addr);
info!("Running on {}", addr);

config.show();
config.show();

// Tracks which client is connected to which server for query cancellation.
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
// Tracks which client is connected to which server for query cancellation.
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));

// Statistics reporting.
let (stats_tx, stats_rx) = mpsc::channel(100_000);
REPORTER.store(Arc::new(Reporter::new(stats_tx.clone())));
// Statistics reporting.
let (stats_tx, stats_rx) = mpsc::channel(100_000);
REPORTER.store(Arc::new(Reporter::new(stats_tx.clone())));

// Connection pool that allows to query all shards and replicas.
match ConnectionPool::from_config(client_server_map.clone()).await {
// Connection pool that allows to query all shards and replicas.
match ConnectionPool::from_config(client_server_map.clone()).await {
Ok(_) => (),
Err(err) => {
error!("Pool error: {:?}", err);
std::process::exit(exitcode::CONFIG);
error!("Pool error: {:?}", err);
std::process::exit(exitcode::CONFIG);
}
};
};

tokio::task::spawn(async move {
tokio::task::spawn(async move {
let mut stats_collector = Collector::new(stats_rx, stats_tx.clone());
stats_collector.collect().await;
});
});

info!("Config autoreloader: {}", config.general.autoreload);

info!("Config autoreloader: {}", config.general.autoreload);
let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
let autoreload_client_server_map = client_server_map.clone();

let mut autoreload_interval = tokio::time::interval(tokio::time::Duration::from_millis(15_000));
let autoreload_client_server_map = client_server_map.clone();
tokio::task::spawn(async move {
tokio::task::spawn(async move {
loop {
autoreload_interval.tick().await;
if config.general.autoreload {
autoreload_interval.tick().await;
if config.general.autoreload {
info!("Automatically reloading config");

if let Ok(changed) = reload_config(autoreload_client_server_map.clone()).await {
if changed {
if changed {
get_config().show()
}
}
};
}
}
}
});

let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();
let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap();
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let (drain_tx, mut drain_rx) = mpsc::channel::<i32>(2048);
let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1);
});

info!("Waiting for clients");
let mut term_signal = unix_signal(SignalKind::terminate()).unwrap();
let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap();
let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap();
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let (drain_tx, mut drain_rx) = mpsc::channel::<i32>(2048);
let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1);
let mut admin_only = false;
let mut total_clients = 0;

let mut admin_only = false;
let mut total_clients = 0;
info!("Waiting for clients");

loop {
loop {
tokio::select! {
// Reload config:
// kill -SIGHUP $(pgrep pgcat)
_ = sighup_signal.recv() => {
// Reload config:
// kill -SIGHUP $(pgrep pgcat)
_ = sighup_signal.recv() => {
info!("Reloading config");

_ = reload_config(client_server_map.clone()).await;

get_config().show();
},
},

// Initiate graceful shutdown sequence on sig int
_ = interrupt_signal.recv() => {
// Initiate graceful shutdown sequence on sig int
_ = interrupt_signal.recv() => {
info!("Got SIGINT, waiting for client connection drain now");
admin_only = true;

Expand All @@ -229,101 +231,100 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = drain_tx.send(0).await;

tokio::task::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(config.general.shutdown_timeout));
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(config.general.shutdown_timeout));

// First tick fires immediately.
interval.tick().await;
// First tick fires immediately.
interval.tick().await;

// Second one in the interval time.
interval.tick().await;
// Second one in the interval time.
interval.tick().await;

// We're done waiting.
error!("Graceful shutdown timed out. {} active clients being closed", total_clients);
// We're done waiting.
error!("Graceful shutdown timed out. {} active clients being closed", total_clients);

let _ = exit_tx.send(()).await;
let _ = exit_tx.send(()).await;
});
},
},

_ = term_signal.recv() => {
_ = term_signal.recv() => {
info!("Got SIGTERM, closing with {} clients active", total_clients);
break;
},

new_client = listener.accept() => {
let (socket, addr) = match new_client {
Ok((socket, addr)) => (socket, addr),
Err(err) => {
error!("{:?}", err);
continue;
}
};

let shutdown_rx = shutdown_tx.subscribe();
let drain_tx = drain_tx.clone();
let client_server_map = client_server_map.clone();

let tls_certificate = config.general.tls_certificate.clone();

tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc();

match client::client_entrypoint(
socket,
client_server_map,
shutdown_rx,
drain_tx,
admin_only,
tls_certificate.clone(),
config.general.log_client_connections,
)
.await
{
Ok(()) => {

let duration = chrono::offset::Utc::now().naive_utc() - start;

if config.general.log_client_disconnections {
info!(
"Client {:?} disconnected, session duration: {}",
addr,
format_duration(&duration)
);
} else {
debug!(
"Client {:?} disconnected, session duration: {}",
addr,
format_duration(&duration)
);
}
},

new_client = listener.accept() => {
let (socket, addr) = match new_client {
Ok((socket, addr)) => (socket, addr),
Err(err) => {
error!("{:?}", err);
continue;
}
};

let shutdown_rx = shutdown_tx.subscribe();
let drain_tx = drain_tx.clone();
let client_server_map = client_server_map.clone();

let tls_certificate = config.general.tls_certificate.clone();

tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc();

match client::client_entrypoint(
socket,
client_server_map,
shutdown_rx,
drain_tx,
admin_only,
tls_certificate.clone(),
config.general.log_client_connections,
)
.await
{
Ok(()) => {
let duration = chrono::offset::Utc::now().naive_utc() - start;

if config.general.log_client_disconnections {
info!(
"Client {:?} disconnected, session duration: {}",
addr,
format_duration(&duration)
);
} else {
debug!(
"Client {:?} disconnected, session duration: {}",
addr,
format_duration(&duration)
);
}
}

Err(err) => {
match err {
errors::Error::ClientBadStartup => debug!("Client disconnected with error {:?}", err),
_ => warn!("Client disconnected with error {:?}", err),
}

Err(err) => {
match err {
errors::Error::ClientBadStartup => debug!("Client disconnected with error {:?}", err),
_ => warn!("Client disconnected with error {:?}", err),
}
};
});
}

_ = exit_rx.recv() => {
break;
}
}
};
});
}

client_ping = drain_rx.recv() => {
let client_ping = client_ping.unwrap();
total_clients += client_ping;
_ = exit_rx.recv() => {
break;
}

if total_clients == 0 && admin_only {
let _ = exit_tx.send(()).await;
}
}
client_ping = drain_rx.recv() => {
let client_ping = client_ping.unwrap();
total_clients += client_ping;

if total_clients == 0 && admin_only {
let _ = exit_tx.send(()).await;
}
}
}
}
}

info!("Shutting down...");
info!("Shutting down...");
});
Ok(())
}