Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/develop' into refactor/remove-…
Browse files Browse the repository at this point in the history
…catalog

# Conflicts:
#	src/frontend/src/catalog.rs
  • Loading branch information
v0y4g3r committed Apr 26, 2023
2 parents 2d0f6ba + 1a245f3 commit bbb15b0
Show file tree
Hide file tree
Showing 15 changed files with 80 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions src/common/grpc/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ impl Default for ChannelConfig {
rate_limit: None,
initial_stream_window_size: None,
initial_connection_window_size: None,
http2_keep_alive_interval: None,
http2_keep_alive_interval: Some(Duration::from_secs(30)),
http2_keep_alive_timeout: None,
http2_keep_alive_while_idle: None,
http2_keep_alive_while_idle: Some(true),
http2_adaptive_window: None,
tcp_keepalive: None,
tcp_nodelay: true,
Expand Down Expand Up @@ -497,9 +497,9 @@ mod tests {
rate_limit: None,
initial_stream_window_size: None,
initial_connection_window_size: None,
http2_keep_alive_interval: None,
http2_keep_alive_interval: Some(Duration::from_secs(30)),
http2_keep_alive_timeout: None,
http2_keep_alive_while_idle: None,
http2_keep_alive_while_idle: Some(true),
http2_adaptive_window: None,
tcp_keepalive: None,
tcp_nodelay: true,
Expand Down
2 changes: 2 additions & 0 deletions src/common/telemetry/src/panic_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::panic;
use std::time::Duration;

use backtrace::Backtrace;
use metrics::increment_counter;

pub fn set_panic_hook() {
// Set a panic hook that records the panic as a `tracing` event at the
Expand All @@ -40,6 +41,7 @@ pub fn set_panic_hook() {
} else {
tracing::error!(message = %panic, backtrace = %backtrace);
}
increment_counter!("panic_counter");
default_hook(panic);
}));

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use catalog::{
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_telemetry::error;
use common_telemetry::warn;
use futures::StreamExt;
use futures_util::TryStreamExt;
use meta_client::rpc::TableName;
Expand Down Expand Up @@ -235,7 +235,7 @@ impl CatalogManager for FrontendCatalogManager {
if let Ok(key) = CatalogKey::parse(catalog_key.as_ref()) {
res.insert(key.catalog_name);
} else {
error!("invalid catalog key: {:?}", catalog_key);
warn!("invalid catalog key: {:?}", catalog_key);
}
}
Ok(res.into_iter().collect())
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ futures.workspace = true
h2 = "0.3"
http-body = "0.4"
lazy_static = "1.4"
metrics.workspace = true
parking_lot = "0.12"
prost.workspace = true
rand.workspace = true
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod lease;
pub mod lock;
pub mod metadata_service;
pub mod metasrv;
mod metrics;
#[cfg(feature = "mock")]
pub mod mocks;
mod procedure;
Expand Down
5 changes: 4 additions & 1 deletion src/meta-srv/src/metadata_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::sync::Arc;
use api::v1::meta::CompareAndPutRequest;
use async_trait::async_trait;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use common_telemetry::info;
use common_telemetry::{info, timer};
use metrics::increment_counter;
use snafu::{ensure, ResultExt};

use crate::error;
Expand Down Expand Up @@ -59,6 +60,7 @@ impl MetadataService for DefaultMetadataService {
schema_name: &str,
if_not_exist: bool,
) -> Result<()> {
let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA);
let kv_store = self.kv_store.clone();

let catalog_key = CatalogKey {
Expand All @@ -84,6 +86,7 @@ impl MetadataService for DefaultMetadataService {
let resp = kv_store.compare_and_put(req).await?;

if resp.success {
increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG);
info!("Successfully created a catalog: {}", catalog_name);
}

Expand Down
16 changes: 16 additions & 0 deletions src/meta-srv/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog";
pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema";
4 changes: 2 additions & 2 deletions src/object-store/src/cache_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
let cache_path = self.cache_path(&path, &args);
let lru_cache = self.lru_cache.clone();

match self.cache.read(&cache_path, OpRead::default()).await {
match self.cache.read(&cache_path, args.clone()).await {
Ok((rp, r)) => {
increment_counter!(OBJECT_STORE_LRU_CACHE_HIT);

Expand All @@ -116,7 +116,7 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
writer.write(Bytes::from(buf)).await?;
writer.close().await?;

match self.cache.read(&cache_path, OpRead::default()).await {
match self.cache.read(&cache_path, args.clone()).await {
Ok((rp, reader)) => {
let r = {
// push new cache file name to lru
Expand Down
5 changes: 5 additions & 0 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use aide::transform::TransformOperation;
use axum::extract::{Json, Query, State};
use axum::{Extension, Form};
use common_error::status_code::StatusCode;
use common_telemetry::timer;
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand All @@ -42,6 +43,8 @@ pub async fn sql(
_user_info: Extension<UserInfo>,
Form(form_params): Form<SqlQuery>,
) -> Json<JsonResponse> {
let _timer = timer!(crate::metrics::METRIC_HTTP_SQL_ELAPSED);

let sql_handler = &state.sql_handler;

let start = Instant::now();
Expand Down Expand Up @@ -93,6 +96,8 @@ pub async fn promql(
// TODO(fys): pass _user_info into query context
_user_info: Extension<UserInfo>,
) -> Json<JsonResponse> {
let _timer = timer!(crate::metrics::METRIC_HTTP_PROMQL_ELAPSED);

let sql_handler = &state.sql_handler;
let exec_start = Instant::now();
let db = params.db.clone();
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod http;
pub mod influxdb;
pub mod interceptor;
pub mod line_writer;
mod metrics;
pub mod metrics_handler;
pub mod mysql;
pub mod opentsdb;
Expand Down
16 changes: 16 additions & 0 deletions src/servers/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) const METRIC_HTTP_SQL_ELAPSED: &str = "servers.http_sql_elapsed";
pub(crate) const METRIC_HTTP_PROMQL_ELAPSED: &str = "servers.http_promql_elapsed";
8 changes: 4 additions & 4 deletions src/servers/src/mysql/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::{error, info};
use common_telemetry::logging::{info, warn};
use futures::StreamExt;
use opensrv_mysql::{
plain_run_with_options, secure_run_with_options, AsyncMysqlIntermediary, IntermediaryOptions,
Expand Down Expand Up @@ -134,12 +134,12 @@ impl MysqlServer {

async move {
match tcp_stream {
Err(error) => error!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt.
Err(error) => warn!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt.
Ok(io_stream) => {
if let Err(error) =
Self::handle(io_stream, io_runtime, spawn_ref, spawn_config).await
{
error!(error; "Unexpected error when handling TcpStream");
warn!("Unexpected error when handling TcpStream {}", error);
};
}
};
Expand All @@ -159,7 +159,7 @@ impl MysqlServer {
if let Err(e) = Self::do_handle(stream, spawn_ref, spawn_config).await {
// TODO(LFC): Write this error to client as well, in MySQL text protocol.
// Looks like we have to expose opensrv-mysql's `PacketWriter`?
error!(e; "Internal error occurred during query exec, server actively close the channel to let client try next time.")
warn!("Internal error occurred during query exec, server actively close the channel to let client try next time: {}.", e)
}
});

Expand Down
28 changes: 19 additions & 9 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ fn get_port() -> usize {
.fetch_add(1, Ordering::Relaxed)
}

#[derive(Debug, Eq, PartialEq)]
pub enum StorageType {
S3,
S3WithCache,
File,
Oss,
}
Expand All @@ -72,7 +74,7 @@ impl StorageType {

match self {
StorageType::File => true, // always test file
StorageType::S3 => {
StorageType::S3 | StorageType::S3WithCache => {
if let Ok(b) = env::var("GT_S3_BUCKET") {
!b.is_empty()
} else {
Expand All @@ -90,6 +92,16 @@ impl StorageType {
}
}

fn s3_test_config() -> S3Config {
S3Config {
root: uuid::Uuid::new_v4().to_string(),
access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap(),
secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap(),
bucket: env::var("GT_S3_BUCKET").unwrap(),
..Default::default()
}
}

fn get_test_store_config(
store_type: &StorageType,
name: &str,
Expand Down Expand Up @@ -124,14 +136,12 @@ fn get_test_store_config(
Some(TempDirGuard::Oss(TempFolder::new(&store, "/"))),
)
}
StorageType::S3 => {
let s3_config = S3Config {
root: uuid::Uuid::new_v4().to_string(),
access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap(),
secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap(),
bucket: env::var("GT_S3_BUCKET").unwrap(),
..Default::default()
};
StorageType::S3 | StorageType::S3WithCache => {
let mut s3_config = s3_test_config();

if *store_type == StorageType::S3WithCache {
s3_config.cache_path = Some("/tmp/greptimedb_cache".to_string());
}

let mut builder = S3::default();
builder
Expand Down
4 changes: 2 additions & 2 deletions tests-integration/tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ mod grpc;
#[macro_use]
mod http;

grpc_tests!(File, S3, Oss);
http_tests!(File, S3, Oss);
grpc_tests!(File, S3, S3WithCache, Oss);
http_tests!(File, S3, S3WithCache, Oss);

0 comments on commit bbb15b0

Please sign in to comment.