fix: fuse_snapshot only show the latest snapshot for external table #15721

Merged: 4 commits merged on Jun 4, 2024

Changes from all commits
6 changes: 6 additions & 0 deletions .github/actions/test_sqllogic_standalone_linux/action.yml
@@ -17,6 +17,10 @@ inputs:
description: "storage format for databend query to test"
required: true
default: all
enable_table_meta_cache:
description: "Enable table meta cache"
required: false
default: true

runs:
using: "composite"
@@ -34,11 +38,13 @@ runs:
shell: bash
env:
TEST_HANDLERS: ${{ inputs.handlers }}
CACHE_ENABLE_TABLE_META_CACHE: ${{ inputs.enable_table_meta_cache}}
run: bash ./scripts/ci/ci-run-sqllogic-tests.sh ${{ inputs.dirs }}

- name: Run native sqllogic Tests with Standalone mode
if: inputs.storage-format == 'all' || inputs.storage-format == 'native'
shell: bash
env:
TEST_HANDLERS: ${{ inputs.handlers }}
CACHE_ENABLE_TABLE_META_CACHE: ${{ inputs.enable_table_meta_cache}}
run: bash ./scripts/ci/ci-run-sqllogic-tests-native.sh ${{ inputs.dirs }}
38 changes: 31 additions & 7 deletions .github/workflows/reuse.sqllogic.yml
@@ -20,7 +20,7 @@ env:

jobs:
management_mode:
runs-on: [self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}"]
runs-on: [ self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}" ]
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/test_sqllogic_management_mode_linux
Expand All @@ -30,7 +30,7 @@ jobs:
handlers: mysql,http

standalone:
runs-on: [self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}"]
runs-on: [ self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}" ]
strategy:
fail-fast: false
matrix:
@@ -61,7 +61,7 @@ jobs:
name: test-sqllogic-standalone-${{ matrix.dirs }}-${{ matrix.handler }}

standalone_udf_server:
runs-on: [self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}"]
runs-on: [ self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}" ]
steps:
- uses: actions/checkout@v4
- name: Start UDF Server
@@ -82,7 +82,7 @@ jobs:
name: test-sqllogic-standalone-udf_server

standalone_cloud:
runs-on: [self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}"]
runs-on: [ self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}" ]
steps:
- uses: actions/checkout@v4
- name: Start Cloud Control Server
@@ -103,7 +103,7 @@ jobs:
name: test-sqllogic-standalone-cloud

standalone_minio:
runs-on: [self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}"]
runs-on: [ self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}" ]
strategy:
fail-fast: false
matrix:
@@ -127,7 +127,7 @@ jobs:
name: test-sqllogic-standalone-${{ matrix.dirs }}-${{ matrix.handler }}

cluster:
runs-on: [self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}"]
runs-on: [ self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}" ]
strategy:
fail-fast: false
matrix:
@@ -157,7 +157,7 @@ jobs:
name: test-sqllogic-cluster-${{ matrix.dirs }}-${{ matrix.handler }}

stage:
runs-on: [self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}"]
runs-on: [ self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}" ]
strategy:
fail-fast: false
matrix:
@@ -177,3 +177,27 @@ jobs:
uses: ./.github/actions/artifact_failure
with:
name: test-sqllogic-stage-${{ matrix.storage }}

standalone_no_table_meta_cache:
runs-on: [ self-hosted, X64, Linux, 4c8g, "${{ inputs.runner_provider }}" ]
strategy:
fail-fast: false
matrix:
dirs:
- "no_table_meta_cache"
handler:
- "http"
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/test_sqllogic_standalone_linux
timeout-minutes: 15
with:
dirs: ${{ matrix.dirs }}
handlers: ${{ matrix.handler }}
storage-format: all
enable_table_meta_cache: false
- name: Upload failure
if: failure()
uses: ./.github/actions/artifact_failure
with:
name: test-sqllogic-standalone-no-table-meta-cache-${{ matrix.dirs }}-${{ matrix.handler }}
6 changes: 4 additions & 2 deletions src/query/catalog/src/table_context.rs
@@ -236,8 +236,10 @@ pub trait TableContext: Send + Sync {
fn set_on_error_mode(&self, mode: OnErrorMode);
fn get_maximum_error_per_file(&self) -> Option<HashMap<String, ErrorCode>>;

// Get the storage data accessor operator from the session manager.
fn get_data_operator(&self) -> Result<DataOperator>;
/// Get the storage data accessor operator from the session manager.
/// Note that this is the application level data accessor, which may be different from
/// the table level data accessor (e.g., table with customized storage parameters).
fn get_application_level_data_operator(&self) -> Result<DataOperator>;

async fn get_file_format(&self, name: &str) -> Result<FileFormatParams>;

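To make the rename concrete, here is a minimal illustrative sketch, not part of the diff, contrasting the two accessors; it assumes `FuseTable::get_operator()` returns the table-level `opendal::Operator`, as the later changes in this PR use it. For an internal table both operators point at the same backend; for a table created with its own storage parameters they can differ, which appears to be the case this PR addresses for `fuse_snapshot`.

```rust
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_storages_fuse::FuseTable;
use opendal::Operator;

// Hypothetical helper: prefer the table's own operator so that a table created
// with customized storage parameters (an external table) reads from the right
// backend instead of the node's default storage.
fn storage_operator_for(ctx: &dyn TableContext, fuse_table: &FuseTable) -> Result<Operator> {
    // Application-level operator: the storage configured for the query node itself.
    let _app_level: Operator = ctx.get_application_level_data_operator()?.operator();
    // Table-level operator: honours the table's own storage parameters.
    Ok(fuse_table.get_operator())
}
```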
3 changes: 3 additions & 0 deletions src/query/service/src/interpreters/common/mod.rs
@@ -21,9 +21,12 @@ mod table;
mod task;
mod util;

mod shared_table;

pub use grant::validate_grant_object_exists;
pub use notification::get_notification_client_config;
pub use query_log::InterpreterQueryLog;
pub use shared_table::save_share_table_info;
pub use stream::dml_build_update_stream_req;
pub use stream::query_build_update_stream_req;
pub use stream::StreamTableUpdates;
34 changes: 34 additions & 0 deletions src/query/service/src/interpreters/common/shared_table.rs
@@ -0,0 +1,34 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_meta_app::share::ShareTableInfoMap;

use crate::sessions::QueryContext;

pub async fn save_share_table_info(
ctx: &QueryContext,
share_table_info: &Option<Vec<ShareTableInfoMap>>,
) -> Result<()> {
if let Some(share_table_info) = share_table_info {
databend_common_storages_share::save_share_table_info(
ctx.get_tenant().tenant_name(),
ctx.get_application_level_data_operator()?.operator(),
share_table_info,
)
.await?;
}
Ok(())
}
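For context, a sketch of the call shape this helper enables in the interpreters touched below; the wrapper name `persist_share_info_if_any` is hypothetical, and its body mirrors the `AddTableColumnInterpreter` change later in this PR.

```rust
use databend_common_exception::Result;
use databend_common_meta_app::share::ShareTableInfoMap;

use crate::interpreters::common::save_share_table_info;
use crate::sessions::QueryContext;

// Hypothetical wrapper: interpreters just forward the optional share table info
// from a meta response; tenant lookup and the application-level data operator
// are resolved inside `save_share_table_info`, and it is a no-op on `None`.
async fn persist_share_info_if_any(
    ctx: &QueryContext,
    share_table_info: Option<Vec<ShareTableInfoMap>>,
) -> Result<()> {
    save_share_table_info(ctx, &share_table_info).await
}
```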

@@ -159,7 +159,7 @@ impl Interpreter for CreateDatabaseInterpreter {

save_share_spec(
self.ctx.get_tenant().tenant_name(),
self.ctx.get_data_operator()?.operator(),
self.ctx.get_application_level_data_operator()?.operator(),
Some(spec_vec),
Some(share_table_into),
)

@@ -78,7 +78,7 @@ impl Interpreter for DropDatabaseInterpreter {

save_share_spec(
self.ctx.get_tenant().tenant_name(),
self.ctx.get_data_operator()?.operator(),
self.ctx.get_application_level_data_operator()?.operator(),
Some(spec_vec),
Some(share_table_into),
)
27 changes: 7 additions & 20 deletions src/query/service/src/interpreters/interpreter_index_refresh.rs
@@ -48,7 +48,6 @@ use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::SegmentLocation;
use databend_enterprise_aggregating_index::get_agg_index_handler;
use databend_storages_common_table_meta::meta::Location;
use opendal::Operator;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
@@ -70,7 +69,6 @@ impl RefreshIndexInterpreter {
&self,
plan: &DataSourcePlan,
fuse_table: Arc<FuseTable>,
dal: Operator,
) -> Result<Option<Partitions>> {
let snapshot_loc = plan.statistics.snapshot.clone();
let mut lazy_init_segments = Vec::with_capacity(plan.parts.len());
@@ -91,7 +89,7 @@
let ctx = self.ctx.clone();

let (_statistics, partitions) = fuse_table
.prune_snapshot_blocks(ctx, dal, push_downs, table_schema, lazy_init_segments, 0)
.prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0)
.await?;

return Ok(Some(partitions));
@@ -105,15 +103,14 @@
&self,
plan: &DataSourcePlan,
fuse_table: Arc<FuseTable>,
dal: Operator,
segments: Vec<SegmentLocation>,
) -> Result<Option<Partitions>> {
let table_schema = self.plan.table_info.schema();
let push_downs = plan.push_downs.clone();
let ctx = self.ctx.clone();

let (_statistics, partitions) = fuse_table
.prune_snapshot_blocks(ctx, dal, push_downs, table_schema, segments, 0)
.prune_snapshot_blocks(ctx, push_downs, table_schema, segments, 0)
.await?;

Ok(Some(partitions))
@@ -124,7 +121,6 @@ impl RefreshIndexInterpreter {
&self,
query_plan: &PhysicalPlan,
fuse_table: Arc<FuseTable>,
dal: Operator,
segments: Option<Vec<Location>>,
) -> Result<Option<DataSourcePlan>> {
let mut source = vec![];
@@ -152,15 +148,10 @@ impl RefreshIndexInterpreter {
let partitions = match segments {
Some(segment_locs) if !segment_locs.is_empty() => {
let segment_locations = create_segment_location_vector(segment_locs, None);
self.get_partitions_with_given_segments(
&source,
fuse_table,
dal,
segment_locations,
)
.await?
self.get_partitions_with_given_segments(&source, fuse_table, segment_locations)
.await?
}
Some(_) | None => self.get_partitions(&source, fuse_table, dal).await?,
Some(_) | None => self.get_partitions(&source, fuse_table).await?,
};
if let Some(parts) = partitions {
source.parts = parts;
@@ -266,7 +257,6 @@ impl Interpreter for RefreshIndexInterpreter {
}
};

let data_accessor = self.ctx.get_data_operator()?;
let fuse_table = FuseTable::do_create(self.plan.table_info.clone())?;
let fuse_table: Arc<FuseTable> = fuse_table.into();

@@ -275,7 +265,6 @@ impl Interpreter for RefreshIndexInterpreter {
.get_read_source(
&query_plan,
fuse_table.clone(),
data_accessor.operator(),
self.plan.segment_locs.clone(),
)
.await?;
@@ -351,15 +340,13 @@ impl Interpreter for RefreshIndexInterpreter {
}
let sink_schema = Arc::new(sink_schema);

let write_settings = fuse_table.get_write_settings();

build_res.main_pipeline.try_resize(1)?;
build_res.main_pipeline.add_sink(|input| {
AggIndexSink::try_create(
input,
data_accessor.operator(),
fuse_table.get_operator(),
self.plan.index_id,
write_settings.clone(),
fuse_table.get_write_settings(),
sink_schema.clone(),
block_name_offset,
self.plan.user_defined_block_name,
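A condensed sketch of the pattern this interpreter follows after the change; the helper name and import paths are assumptions, while the `prune_snapshot_blocks` call is taken verbatim from the diff above.

```rust
use std::sync::Arc;

use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::TableSchemaRef;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::SegmentLocation;

// Hypothetical helper condensing the two partition paths above: callers no longer
// thread an Operator through, because FuseTable prunes its snapshot and segments
// via its own (possibly table-specific) operator.
async fn prune_partitions(
    ctx: Arc<dyn TableContext>,
    fuse_table: Arc<FuseTable>,
    push_downs: Option<PushDownInfo>,
    table_schema: TableSchemaRef,
    segments: Vec<SegmentLocation>,
) -> Result<Partitions> {
    let (_statistics, partitions) = fuse_table
        .prune_snapshot_blocks(ctx, push_downs, table_schema, segments, 0)
        .await?;
    Ok(partitions)
}
```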

@@ -65,7 +65,7 @@ impl Interpreter for AlterShareTenantsInterpreter {

save_share_spec(
self.ctx.get_tenant().tenant_name(),
self.ctx.get_data_operator()?.operator(),
self.ctx.get_application_level_data_operator()?.operator(),
resp.spec_vec,
None,
)
@@ -80,7 +80,7 @@

save_share_spec(
self.ctx.get_tenant().tenant_name(),
self.ctx.get_data_operator()?.operator(),
self.ctx.get_application_level_data_operator()?.operator(),
resp.spec_vec,
None,
)

@@ -53,7 +53,7 @@ impl Interpreter for CreateShareInterpreter {

save_share_spec(
self.ctx.get_tenant().tenant_name(),
self.ctx.get_data_operator()?.operator(),
self.ctx.get_application_level_data_operator()?.operator(),
resp.spec_vec,
None,
)

@@ -53,7 +53,7 @@ impl Interpreter for DropShareInterpreter {

save_share_spec(
self.ctx.get_tenant().tenant_name(),
self.ctx.get_data_operator()?.operator(),
self.ctx.get_application_level_data_operator()?.operator(),
resp.spec_vec,
Some(vec![(self.plan.share.clone(), None)]),
)

@@ -63,7 +63,7 @@ impl Interpreter for GrantShareObjectInterpreter {

save_share_spec(
self.ctx.get_tenant().tenant_name(),
self.ctx.get_data_operator()?.operator(),
self.ctx.get_application_level_data_operator()?.operator(),
resp.spec_vec,
Some(vec![resp.share_table_info]),
)

@@ -63,7 +63,7 @@ impl Interpreter for RevokeShareObjectInterpreter {

save_share_spec(
self.ctx.get_tenant().tenant_name(),
self.ctx.get_data_operator()?.operator(),
self.ctx.get_application_level_data_operator()?.operator(),
resp.spec_vec,
Some(vec![resp.share_table_info]),
)

@@ -28,14 +28,14 @@ use databend_common_sql::field_default_value;
use databend_common_sql::plans::AddColumnOption;
use databend_common_sql::plans::AddTableColumnPlan;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_share::save_share_table_info;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_common_storages_view::view_table::VIEW_ENGINE;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::Versioned;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use log::info;

use crate::interpreters::common::save_share_table_info;
use crate::interpreters::interpreter_table_create::is_valid_column;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
@@ -133,14 +133,7 @@ impl Interpreter for AddTableColumnInterpreter {

let res = catalog.update_table_meta(table_info, req).await?;

if let Some(share_table_info) = res.share_table_info {
save_share_table_info(
self.ctx.get_tenant().tenant_name(),
self.ctx.get_data_operator()?.operator(),
share_table_info,
)
.await?;
}
save_share_table_info(&self.ctx, &res.share_table_info).await?;
};

Ok(PipelineBuildResult::create())