From 98919ff2f368e1c1115e5b15583258367876ac6b Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Date: Wed, 24 May 2023 23:03:02 +0100
Subject: [PATCH] Add parquet-concat (#4274)

---
 parquet/Cargo.toml                |   4 +
 parquet/src/bin/parquet-concat.rs | 118 ++++++++++++++++++++++++++++++
 2 files changed, 122 insertions(+)
 create mode 100644 parquet/src/bin/parquet-concat.rs

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 29f7cda1360f..cc48424a6b05 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -131,6 +131,10 @@ required-features = ["cli"]
 name = "parquet-rowcount"
 required-features = ["cli"]
 
+[[bin]]
+name = "parquet-concat"
+required-features = ["cli"]
+
 [[bin]]
 name = "parquet-fromcsv"
 required-features = ["arrow", "cli", "snap", "brotli", "flate2", "lz4", "zstd"]
diff --git a/parquet/src/bin/parquet-concat.rs b/parquet/src/bin/parquet-concat.rs
new file mode 100644
index 000000000000..9cbdf8e7b399
--- /dev/null
+++ b/parquet/src/bin/parquet-concat.rs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary that concatenates the column data of one or more parquet files
+//!
+//! # Install
+//!
+//! `parquet-concat` can be installed using `cargo`:
+//! ```
+//! cargo install parquet --features=cli
+//! ```
+//! After this `parquet-concat` should be available:
+//! ```
+//! parquet-concat out.parquet a.parquet b.parquet
+//! ```
+//!
+//! The binary can also be built from the source code and run as follows:
+//! ```
+//! cargo run --features=cli --bin parquet-concat out.parquet a.parquet b.parquet
+//! ```
+//!
+//! Note: this does not currently support preserving the page index or bloom filters
+//!
+
+use clap::Parser;
+use parquet::column::writer::ColumnCloseResult;
+use parquet::errors::{ParquetError, Result};
+use parquet::file::properties::WriterProperties;
+use parquet::file::writer::SerializedFileWriter;
+use std::fs::File;
+use std::sync::Arc;
+
+#[derive(Debug, Parser)]
+#[clap(author, version)]
+/// Concatenates one or more parquet files
+struct Args {
+    /// Path to output
+    output: String,
+
+    /// Path to input files
+    input: Vec<String>,
+}
+
+impl Args {
+    fn run(&self) -> Result<()> {
+        if self.input.is_empty() {
+            return Err(ParquetError::General(
+                "Must provide at least one input file".into(),
+            ));
+        }
+
+        let output = File::create(&self.output)?;
+
+        let inputs = self
+            .input
+            .iter()
+            .map(|x| {
+                let reader = File::open(x)?;
+                let metadata = parquet::file::footer::parse_metadata(&reader)?;
+                Ok((reader, metadata))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let expected = inputs[0].1.file_metadata().schema();
+        for (_, metadata) in inputs.iter().skip(1) {
+            let actual = metadata.file_metadata().schema();
+            if expected != actual {
+                return Err(ParquetError::General(format!(
+                    "inputs must have the same schema, {expected:#?} vs {actual:#?}"
+                )));
+            }
+        }
+
+        let props = Arc::new(WriterProperties::builder().build());
+        let schema = inputs[0].1.file_metadata().schema_descr().root_schema_ptr();
+        let mut writer = SerializedFileWriter::new(output, schema, props)?;
+
+        for (input, metadata) in inputs {
+            for rg in metadata.row_groups() {
+                let mut rg_out = writer.next_row_group()?;
+                for column in rg.columns() {
+                    let result = ColumnCloseResult {
+                        bytes_written: column.compressed_size() as _,
+                        rows_written: rg.num_rows() as _,
+                        metadata: column.clone(),
+                        bloom_filter: None,
+                        column_index: None,
+                        offset_index: None,
+                    };
+                    rg_out.append_column(&input, result)?;
+                }
+                rg_out.close()?;
+            }
+        }
+
+        writer.close()?;
+
+        Ok(())
+    }
+}
+
+fn main() -> Result<()> {
+    Args::parse().run()
+}