Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add unload options overwrite, include_query_id and use_raw_path. #16614

Merged
merged 4 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/query/ast/src/ast/format/syntax/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ pub(crate) fn pretty_copy_into_location(copy_stmt: CopyIntoLocationStmt) -> RcDo
.append(
RcDoc::line()
.append(RcDoc::text("SINGLE = "))
.append(RcDoc::text(copy_stmt.single.to_string())),
.append(RcDoc::text(copy_stmt.options.single.to_string())),
)
}

Expand Down
48 changes: 39 additions & 9 deletions src/query/ast/src/ast/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,29 @@ impl Display for CopyIntoTableStmt {
}
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)]
pub struct CopyIntoLocationOptions {
pub single: bool,
pub max_file_size: usize,
pub detailed_output: bool,
pub use_raw_path: bool,
pub include_query_id: bool,
pub overwrite: bool,
}

impl Default for CopyIntoLocationOptions {
fn default() -> Self {
Self {
single: Default::default(),
max_file_size: Default::default(),
detailed_output: false,
use_raw_path: false,
include_query_id: true,
overwrite: false,
}
}
}

/// CopyIntoLocationStmt is the parsed statement of `COPY into <location> from <table> ...`
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct CopyIntoLocationStmt {
Expand All @@ -151,9 +174,7 @@ pub struct CopyIntoLocationStmt {
pub src: CopyIntoLocationSource,
pub dst: FileLocation,
pub file_format: FileFormatOptions,
pub single: bool,
pub max_file_size: usize,
pub detailed_output: bool,
pub options: CopyIntoLocationOptions,
}

impl Display for CopyIntoLocationStmt {
Expand All @@ -171,9 +192,12 @@ impl Display for CopyIntoLocationStmt {
if !self.file_format.is_empty() {
write!(f, " FILE_FORMAT = ({})", self.file_format)?;
}
write!(f, " SINGLE = {}", self.single)?;
write!(f, " MAX_FILE_SIZE = {}", self.max_file_size)?;
write!(f, " DETAILED_OUTPUT = {}", self.detailed_output)?;
write!(f, " SINGLE = {}", self.options.single)?;
write!(f, " MAX_FILE_SIZE = {}", self.options.max_file_size)?;
write!(f, " DETAILED_OUTPUT = {}", self.options.detailed_output)?;
write!(f, " INCLUDE_QUERY_ID = {}", self.options.include_query_id)?;
write!(f, " USE_RAW_PATH = {}", self.options.use_raw_path)?;
write!(f, " overwrite= {}", self.options.overwrite)?;
sundy-li marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
Expand All @@ -183,9 +207,12 @@ impl CopyIntoLocationStmt {
pub fn apply_option(&mut self, opt: CopyIntoLocationOption) {
match opt {
CopyIntoLocationOption::FileFormat(v) => self.file_format = v,
CopyIntoLocationOption::Single(v) => self.single = v,
CopyIntoLocationOption::MaxFileSize(v) => self.max_file_size = v,
CopyIntoLocationOption::DetailedOutput(v) => self.detailed_output = v,
CopyIntoLocationOption::Single(v) => self.options.single = v,
CopyIntoLocationOption::MaxFileSize(v) => self.options.max_file_size = v,
CopyIntoLocationOption::DetailedOutput(v) => self.options.detailed_output = v,
CopyIntoLocationOption::IncludeQueryID(v) => self.options.include_query_id = v,
CopyIntoLocationOption::UseRawPath(v) => self.options.use_raw_path = v,
CopyIntoLocationOption::OverWrite(v) => self.options.overwrite = v,
}
}
}
Expand Down Expand Up @@ -482,7 +509,10 @@ pub enum CopyIntoLocationOption {
FileFormat(FileFormatOptions),
MaxFileSize(usize),
Single(bool),
IncludeQueryID(bool),
UseRawPath(bool),
DetailedOutput(bool),
OverWrite(bool),
}

#[derive(Clone, Debug, PartialEq, Eq, Default, Drive, DriveMut)]
Expand Down
16 changes: 13 additions & 3 deletions src/query/ast/src/parser/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ fn copy_into_location(i: Input) -> IResult<Statement> {
src,
dst,
file_format: Default::default(),
single: Default::default(),
max_file_size: Default::default(),
detailed_output: false,
options: Default::default(),
};
for opt in opts {
copy_stmt.apply_option(opt);
Expand Down Expand Up @@ -210,6 +208,18 @@ fn copy_into_location_option(i: Input) -> IResult<CopyIntoLocationOption> {
rule! { DETAILED_OUTPUT ~ "=" ~ #literal_bool },
|(_, _, detailed_output)| CopyIntoLocationOption::DetailedOutput(detailed_output),
),
map(
rule! { USE_RAW_PATH ~ "=" ~ #literal_bool },
|(_, _, use_raw_path)| CopyIntoLocationOption::UseRawPath(use_raw_path),
),
map(
rule! { INCLUDE_QUERY_ID ~ "=" ~ #literal_bool },
|(_, _, include_query_id)| CopyIntoLocationOption::IncludeQueryID(include_query_id),
),
map(
rule! { OVERWRITE ~ "=" ~ #literal_bool },
|(_, _, include_query_id)| CopyIntoLocationOption::OverWrite(include_query_id),
),
map(rule! { #file_format_clause }, |options| {
CopyIntoLocationOption::FileFormat(options)
}),
Expand Down
4 changes: 4 additions & 0 deletions src/query/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,8 @@ pub enum TokenKind {
IF,
#[token("IN", ignore(ascii_case))]
IN,
#[token("INCLUDE_QUERY_ID", ignore(ascii_case))]
INCLUDE_QUERY_ID,
#[token("INCREMENTAL", ignore(ascii_case))]
INCREMENTAL,
#[token("INDEX", ignore(ascii_case))]
Expand Down Expand Up @@ -1064,6 +1066,8 @@ pub enum TokenKind {
SYNTAX,
#[token("USAGE", ignore(ascii_case))]
USAGE,
#[token("USE_RAW_PATH", ignore(ascii_case))]
USE_RAW_PATH,
#[token("UPDATE", ignore(ascii_case))]
UPDATE,
#[token("UPLOAD", ignore(ascii_case))]
Expand Down
43 changes: 29 additions & 14 deletions src/query/ast/tests/it/testdata/stmt.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14148,7 +14148,7 @@ COPY INTO 's3://mybucket/data.csv'
skip_header = 1
)
---------- Output ---------
COPY INTO 's3://mybucket/data.csv' FROM mytable FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false
COPY INTO 's3://mybucket/data.csv' FROM mytable FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false overwrite= false
---------- AST ------------
CopyIntoLocation(
CopyIntoLocationStmt {
Expand Down Expand Up @@ -14197,9 +14197,14 @@ CopyIntoLocation(
),
},
},
single: false,
max_file_size: 0,
detailed_output: false,
options: CopyIntoLocationOptions {
single: false,
max_file_size: 0,
detailed_output: false,
use_raw_path: false,
include_query_id: true,
overwrite: false,
},
},
)

Expand All @@ -14208,7 +14213,7 @@ CopyIntoLocation(
COPY INTO '@my_stage/my data'
FROM mytable;
---------- Output ---------
COPY INTO '@my_stage/my data' FROM mytable SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false
COPY INTO '@my_stage/my data' FROM mytable SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false overwrite= false
---------- AST ------------
CopyIntoLocation(
CopyIntoLocationStmt {
Expand All @@ -14235,9 +14240,14 @@ CopyIntoLocation(
file_format: FileFormatOptions {
options: {},
},
single: false,
max_file_size: 0,
detailed_output: false,
options: CopyIntoLocationOptions {
single: false,
max_file_size: 0,
detailed_output: false,
use_raw_path: false,
include_query_id: true,
overwrite: false,
},
},
)

Expand All @@ -14252,7 +14262,7 @@ COPY INTO @my_stage
skip_header = 1
);
---------- Output ---------
COPY INTO '@my_stage' FROM mytable FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false
COPY INTO '@my_stage' FROM mytable FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false overwrite= false
---------- AST ------------
CopyIntoLocation(
CopyIntoLocationStmt {
Expand Down Expand Up @@ -14292,9 +14302,14 @@ CopyIntoLocation(
),
},
},
single: false,
max_file_size: 0,
detailed_output: false,
options: CopyIntoLocationOptions {
single: false,
max_file_size: 0,
detailed_output: false,
use_raw_path: false,
include_query_id: true,
overwrite: false,
},
},
)

Expand Down Expand Up @@ -19699,7 +19714,7 @@ CreateTask(
---------- Input ----------
CREATE TASK IF NOT EXISTS MyTask1 SCHEDULE = USING CRON '0 13 * * *' AS COPY INTO @my_internal_stage FROM canadian_city_population FILE_FORMAT = (TYPE = PARQUET)
---------- Output ---------
CREATE TASK IF NOT EXISTS MyTask1 SCHEDULE = USING CRON '0 13 * * *' AS COPY INTO '@my_internal_stage' FROM canadian_city_population FILE_FORMAT = (type = PARQUET) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false
CREATE TASK IF NOT EXISTS MyTask1 SCHEDULE = USING CRON '0 13 * * *' AS COPY INTO '@my_internal_stage' FROM canadian_city_population FILE_FORMAT = (type = PARQUET) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false overwrite= false
---------- AST ------------
CreateTask(
CreateTaskStmt {
Expand All @@ -19721,7 +19736,7 @@ CreateTask(
after: [],
when_condition: None,
sql: SingleStatement(
"COPY INTO '@my_internal_stage' FROM canadian_city_population FILE_FORMAT = (type = PARQUET) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false",
"COPY INTO '@my_internal_stage' FROM canadian_city_population FILE_FORMAT = (type = PARQUET) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false overwrite= false",
),
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::Arc;

use databend_common_ast::ast::CopyIntoLocationOptions;
use databend_common_exception::Result;
use databend_common_expression::RemoteExpr;
use databend_common_expression::TableSchema;
Expand All @@ -40,6 +41,7 @@ pub struct StageTableInfo {
// - may need to be purged as well (depends on the copy options)
pub duplicated_files_detected: Vec<String>,
pub is_select: bool,
pub copy_into_location_options: CopyIntoLocationOptions,
}

impl StageTableInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use databend_common_ast::ast::CopyIntoLocationOptions;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_catalog::plan::StageTableInfo;
use databend_common_exception::Result;
Expand Down Expand Up @@ -86,6 +87,7 @@ impl CopyIntoLocationInterpreter {
stage: &StageInfo,
path: &str,
query: &Plan,
options: &CopyIntoLocationOptions,
) -> Result<(PipelineBuildResult, Vec<UpdateStreamMetaReq>)> {
let (query_interpreter, update_stream_meta_req) = self.build_query(query).await?;
let query_physical_plan = query_interpreter.build_physical_plan().await?;
Expand All @@ -109,6 +111,7 @@ impl CopyIntoLocationInterpreter {
duplicated_files_detected: vec![],
is_select: false,
default_values: None,
copy_into_location_options: options.clone(),
},
}));

Expand Down Expand Up @@ -145,6 +148,7 @@ impl Interpreter for CopyIntoLocationInterpreter {
&self.plan.stage,
&self.plan.path,
&self.plan.from,
&self.plan.options,
)
.await?;

Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,7 @@ impl TableContext for QueryContext {
duplicated_files_detected: vec![],
is_select: true,
default_values: None,
copy_into_location_options: Default::default(),
};
OrcTable::try_create(info).await
}
Expand All @@ -1325,6 +1326,7 @@ impl TableContext for QueryContext {
duplicated_files_detected: vec![],
is_select: true,
default_values: None,
copy_into_location_options: Default::default(),
};
StageTable::try_create(info)
}
Expand Down Expand Up @@ -1359,6 +1361,7 @@ impl TableContext for QueryContext {
duplicated_files_detected: vec![],
is_select: true,
default_values: None,
copy_into_location_options: Default::default(),
};
StageTable::try_create(info)
}
Expand Down
Loading
Loading