diff --git a/.gitignore b/.gitignore
index 5995335..bfa7f7e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -135,3 +135,4 @@ dmypy.json
 
 *.parquet
 data/
+tmp/
diff --git a/notebooks/pyarrow-vs-pyspark-parquet-metadata.ipynb b/notebooks/pyarrow-vs-pyspark-parquet-metadata.ipynb
new file mode 100644
index 0000000..14a902a
--- /dev/null
+++ b/notebooks/pyarrow-vs-pyspark-parquet-metadata.ipynb
@@ -0,0 +1,246 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "8dcbcfc2-79be-4e36-9633-f16b384bbb84",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import pyarrow.parquet as pq"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "678fec87-1c2c-4ee9-8862-3148adff1a1c",
+   "metadata": {},
+   "source": [
+    "## Dask Parquet file"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "id": "429c1c5e-15fd-4eb7-b4a1-5b3199dc580b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "parquet_file = pq.ParquetFile('../tmp/mrpowers-h2o/groupby-1e7/parquet-dask/part.0.parquet')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "id": "257f078c-4fec-4b70-807a-59e3a4dfa1ac",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "\n",
+       " created_by: parquet-cpp-arrow version 6.0.1\n",
+       " num_columns: 10\n",
+       " num_rows: 20000000\n",
+       " num_row_groups: 1\n",
+       " format_version: 1.0\n",
+       " serialized_size: 5299"
+      ]
+     },
+     "execution_count": 11,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "parquet_file.metadata"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "id": "680cd102-991b-4ae9-bc5d-d44d876e3db7",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "\n",
+       " num_columns: 10\n",
+       " num_rows: 20000000\n",
+       " total_byte_size: 728526800"
+      ]
+     },
+     "execution_count": 12,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "parquet_file.metadata.row_group(0)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "id": "c8b636f9-9818-4f09-8bda-de8ad8f1b22e",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "\n",
+       "required group field_id=-1 schema {\n",
+       " optional binary field_id=-1 id1 (String);\n",
+       " optional binary field_id=-1 id2 (String);\n",
+       " optional binary field_id=-1 id3 (String);\n",
+       " optional int64 field_id=-1 id4;\n",
+       " optional int64 field_id=-1 id5;\n",
+       " optional int64 field_id=-1 id6;\n",
+       " optional int64 field_id=-1 v1;\n",
+       " optional int64 field_id=-1 v2;\n",
+       " optional double field_id=-1 v3;\n",
+       " optional int64 field_id=-1 __null_dask_index__;\n",
+       "}"
+      ]
+     },
+     "execution_count": 13,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "parquet_file.schema"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "06f7cb7d-678f-499f-aa2e-0771dd8780d7",
+   "metadata": {},
+   "source": [
+    "## PySpark Parquet file"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "id": "cdda39b1-5b55-44fc-9ea1-58b32b9294a7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "parquet_file = pq.ParquetFile('../tmp/mrpowers-h2o/groupby-1e7/parquet-pyspark/part-00000-c0fc14dc-8165-4db1-941e-9df62834f14c-c000.snappy.parquet')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "id": "e0b9146d-76be-4f53-b47e-9c326aaa9635",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "\n",
+       " created_by: parquet-mr version 1.12.1 (build 2a5c06c58fa987f85aa22170be14d927d5ff6e7d)\n",
+       " num_columns: 9\n",
+       " num_rows: 20000000\n",
+       " num_row_groups: 4\n",
+       " format_version: 1.0\n",
+       " serialized_size: 4926"
+      ]
+     },
+     "execution_count": 15,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "parquet_file.metadata"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 16,
+   "id": "b5ad7225-cd00-4562-84ea-699b134eebb9",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "\n",
+       " num_columns: 9\n",
+       " num_rows: 6610100\n",
+       " total_byte_size: 196985924"
+      ]
+     },
+     "execution_count": 16,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "parquet_file.metadata.row_group(0)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 17,
+   "id": "7058fc96-e78b-4149-919b-c8043f72ea71",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "\n",
+       "required group field_id=-1 spark_schema {\n",
+       " optional binary field_id=-1 id1 (String);\n",
+       " optional binary field_id=-1 id2 (String);\n",
+       " optional binary field_id=-1 id3 (String);\n",
+       " optional int32 field_id=-1 id4;\n",
+       " optional int32 field_id=-1 id5;\n",
+       " optional int32 field_id=-1 id6;\n",
+       " optional int32 field_id=-1 v1;\n",
+       " optional int32 field_id=-1 v2;\n",
+       " optional double field_id=-1 v3;\n",
+       "}"
+      ]
+     },
+     "execution_count": 17,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "parquet_file.schema"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "48c14e79-f6fe-418a-a0e6-347b25711ac2",
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python [conda env:mr-pyspark] *",
+   "language": "python",
+   "name": "conda-env-mr-pyspark-py"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.9.12"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/scripts/create_groupby_data_parquet.py b/scripts/create_groupby_data_parquet.py
index c4ac331..bcf8bbf 100644
--- a/scripts/create_groupby_data_parquet.py
+++ b/scripts/create_groupby_data_parquet.py
@@ -1,7 +1,15 @@
 import dask.dataframe as dd
 import sys
 
-dataset = sys.argv[1]
+dataset = sys.argv[1] if len(sys.argv) > 1 else None
+
+ddf = dd.read_csv("./data/mrpowers-h2o/groupby-1e8/csv/*.csv")
+ddf.repartition(5).to_parquet(
+    "./tmp/mrpowers-h2o/groupby-1e7/parquet-dask",
+    engine="pyarrow",
+    compression="snappy",
+)
+
 
 if dataset == "1e7":
     # 1e7
diff --git a/scripts/create_groupby_data_parquet_pyspark.py b/scripts/create_groupby_data_parquet_pyspark.py
index 422ec03..dbeb53e 100644
--- a/scripts/create_groupby_data_parquet_pyspark.py
+++ b/scripts/create_groupby_data_parquet_pyspark.py
@@ -1,11 +1,16 @@
 import sys
 
-dataset = sys.argv[1]
+dataset = sys.argv[1] if len(sys.argv) > 1 else None
 
 from pyspark.sql import SparkSession
 
 spark = SparkSession.builder.master("local").appName("mrpowers").getOrCreate()
 
+df = spark.read.csv("./data/mrpowers-h2o/groupby-1e8/csv", header='true', inferSchema=True)
+df.repartition(5).write.parquet(
+    "./tmp/mrpowers-h2o/groupby-1e7/parquet-pyspark"
+)
+
 if dataset == "1e7":
     df = spark.read.csv("./data/mrpowers-h2o/groupby-1e7/csv", header='true', inferSchema=True)
     df.repartition(26).write.parquet(