Investigate differences between Parquet files generated by PySpark & … #6

Open
wants to merge 1 commit into main
1 change: 1 addition & 0 deletions .gitignore
@@ -135,3 +135,4 @@ dmypy.json
*.parquet

data/
tmp/
246 changes: 246 additions & 0 deletions notebooks/pyarrow-vs-pyspark-parquet-metadata.ipynb
@@ -0,0 +1,246 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "8dcbcfc2-79be-4e36-9633-f16b384bbb84",
"metadata": {},
"outputs": [],
"source": [
"import pyarrow.parquet as pq"
]
},
{
"cell_type": "markdown",
"id": "678fec87-1c2c-4ee9-8862-3148adff1a1c",
"metadata": {},
"source": [
"## Dask Parquet file"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "429c1c5e-15fd-4eb7-b4a1-5b3199dc580b",
"metadata": {},
"outputs": [],
"source": [
"parquet_file = pq.ParquetFile('../tmp/mrpowers-h2o/groupby-1e7/parquet-dask/part.0.parquet')"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "257f078c-4fec-4b70-807a-59e3a4dfa1ac",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<pyarrow._parquet.FileMetaData object at 0x16b19b270>\n",
" created_by: parquet-cpp-arrow version 6.0.1\n",
" num_columns: 10\n",
" num_rows: 20000000\n",
" num_row_groups: 1\n",
" format_version: 1.0\n",
" serialized_size: 5299"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parquet_file.metadata"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "680cd102-991b-4ae9-bc5d-d44d876e3db7",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<pyarrow._parquet.RowGroupMetaData object at 0x16b1ed450>\n",
" num_columns: 10\n",
" num_rows: 20000000\n",
" total_byte_size: 728526800"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parquet_file.metadata.row_group(0)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "c8b636f9-9818-4f09-8bda-de8ad8f1b22e",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<pyarrow._parquet.ParquetSchema object at 0x16b1fa900>\n",
"required group field_id=-1 schema {\n",
" optional binary field_id=-1 id1 (String);\n",
" optional binary field_id=-1 id2 (String);\n",
" optional binary field_id=-1 id3 (String);\n",
" optional int64 field_id=-1 id4;\n",
" optional int64 field_id=-1 id5;\n",
" optional int64 field_id=-1 id6;\n",
" optional int64 field_id=-1 v1;\n",
" optional int64 field_id=-1 v2;\n",
" optional double field_id=-1 v3;\n",
" optional int64 field_id=-1 __null_dask_index__;\n",
"}"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parquet_file.schema"
]
},
{
"cell_type": "markdown",
"id": "06f7cb7d-678f-499f-aa2e-0771dd8780d7",
"metadata": {},
"source": [
"## PySpark Parquet file"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "cdda39b1-5b55-44fc-9ea1-58b32b9294a7",
"metadata": {},
"outputs": [],
"source": [
"parquet_file = pq.ParquetFile('../tmp/mrpowers-h2o/groupby-1e7/parquet-pyspark/part-00000-c0fc14dc-8165-4db1-941e-9df62834f14c-c000.snappy.parquet')"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "e0b9146d-76be-4f53-b47e-9c326aaa9635",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<pyarrow._parquet.FileMetaData object at 0x10ca928b0>\n",
" created_by: parquet-mr version 1.12.1 (build 2a5c06c58fa987f85aa22170be14d927d5ff6e7d)\n",
" num_columns: 9\n",
" num_rows: 20000000\n",
" num_row_groups: 4\n",
" format_version: 1.0\n",
" serialized_size: 4926"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parquet_file.metadata"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "b5ad7225-cd00-4562-84ea-699b134eebb9",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<pyarrow._parquet.RowGroupMetaData object at 0x10c9dd220>\n",
" num_columns: 9\n",
" num_rows: 6610100\n",
" total_byte_size: 196985924"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parquet_file.metadata.row_group(0)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "7058fc96-e78b-4149-919b-c8043f72ea71",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<pyarrow._parquet.ParquetSchema object at 0x16b1bd200>\n",
"required group field_id=-1 spark_schema {\n",
" optional binary field_id=-1 id1 (String);\n",
" optional binary field_id=-1 id2 (String);\n",
" optional binary field_id=-1 id3 (String);\n",
" optional int32 field_id=-1 id4;\n",
" optional int32 field_id=-1 id5;\n",
" optional int32 field_id=-1 id6;\n",
" optional int32 field_id=-1 v1;\n",
" optional int32 field_id=-1 v2;\n",
" optional double field_id=-1 v3;\n",
"}"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parquet_file.schema"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "48c14e79-f6fe-418a-a0e6-347b25711ac2",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:mr-pyspark] *",
"language": "python",
"name": "conda-env-mr-pyspark-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
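For reference, the comparison the notebook walks through cell by cell can be collapsed into one short script. This is only a sketch using the same pyarrow metadata API and the same file paths as the notebook; it summarizes what the outputs above already show (creator, row-group count, the Dask-only __null_dask_index__ column, and the int64 vs. int32 physical types).

import pyarrow.parquet as pq

# Same paths as in the notebook above.
dask_file = pq.ParquetFile("../tmp/mrpowers-h2o/groupby-1e7/parquet-dask/part.0.parquet")
spark_file = pq.ParquetFile("../tmp/mrpowers-h2o/groupby-1e7/parquet-pyspark/part-00000-c0fc14dc-8165-4db1-941e-9df62834f14c-c000.snappy.parquet")

for name, f in [("dask", dask_file), ("pyspark", spark_file)]:
    md = f.metadata
    print(name, md.created_by, md.num_columns, md.num_rows, md.num_row_groups)

def physical_types(f):
    # Map each column name to its Parquet physical type.
    s = f.schema
    return {s.column(i).name: s.column(i).physical_type
            for i in range(f.metadata.num_columns)}

dask_cols, spark_cols = physical_types(dask_file), physical_types(spark_file)
print(dask_cols.keys() - spark_cols.keys())  # the Dask-only __null_dask_index__
print({k: (dask_cols[k], spark_cols[k])
       for k in spark_cols if dask_cols[k] != spark_cols[k]})  # INT64 vs INT32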
10 changes: 9 additions & 1 deletion scripts/create_groupby_data_parquet.py
@@ -1,7 +1,15 @@
import dask.dataframe as dd
import sys

dataset = sys.argv[1]
# dataset = sys.argv[1]

ddf = dd.read_csv("./data/mrpowers-h2o/groupby-1e8/csv/*.csv")
ddf.repartition(5).to_parquet(
    "./tmp/mrpowers-h2o/groupby-1e7/parquet-dask",
    engine="pyarrow",
    compression="snappy",
)


if dataset == "1e7":
    # 1e7
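Side note on the schema gap seen in the notebook: Dask writes its index as the trailing __null_dask_index__ column by default. If the index is not needed for the comparison, to_parquet accepts write_index=False. A sketch, not part of this diff:

import dask.dataframe as dd

ddf = dd.read_csv("./data/mrpowers-h2o/groupby-1e8/csv/*.csv")
ddf.repartition(5).to_parquet(
    "./tmp/mrpowers-h2o/groupby-1e7/parquet-dask",
    engine="pyarrow",
    compression="snappy",
    write_index=False,  # omit the __null_dask_index__ column
)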
7 changes: 6 additions & 1 deletion scripts/create_groupby_data_parquet_pyspark.py
@@ -1,11 +1,16 @@
import sys

dataset = sys.argv[1]
# dataset = sys.argv[1]

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("mrpowers").getOrCreate()

df = spark.read.csv("./data/mrpowers-h2o/groupby-1e8/csv", header='true', inferSchema=True)
df.repartition(5).write.parquet(
    "./tmp/mrpowers-h2o/groupby-1e7/parquet-pyspark"
)

if dataset == "1e7":
    df = spark.read.csv("./data/mrpowers-h2o/groupby-1e7/csv", header='true', inferSchema=True)
    df.repartition(26).write.parquet(
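The other difference the notebook surfaces is integer width: inferSchema maps the integer CSV columns to IntegerType (int32), while Dask/pandas default to int64. If matching physical types matters for the comparison, the columns could be cast to long before writing. A sketch, not part of this diff:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local").appName("mrpowers").getOrCreate()

df = spark.read.csv("./data/mrpowers-h2o/groupby-1e8/csv", header='true', inferSchema=True)
# Cast the columns that showed up as int32 in the notebook's PySpark schema
# to long (int64) so the physical types line up with the Dask file.
for c in ["id4", "id5", "id6", "v1", "v2"]:
    df = df.withColumn(c, col(c).cast("long"))
df.repartition(5).write.parquet("./tmp/mrpowers-h2o/groupby-1e7/parquet-pyspark")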