diff --git a/README.md b/README.md
index d7b0015..fb49bba 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,11 @@ Apache Spark + Tableau Hyper API:
 or even incorporate it as a final step in an ETL pipeline, e.g. refresh data extract with latest CDC.
 
 ## Getting Started
-A list of usage examples is available in the `demo` folder of this repo as a [Databricks Notebook Archive (DBC)](demo/Hyperleaup-Demo.dbc).
+Install the latest release from PyPI:
+`pip install hyperleaup`
+
+A list of usage examples is available in the `demo` folder of this repo as a [Databricks Notebook Archive (DBC)](demo/Hyperleaup-Demo.dbc) or [IPython Notebook](demo/Hyperleaup-Demo.ipynb).
+
 
 ## Example usage
 The following code snippet creates a Tableau Hyper file from a Spark SQL statement and publishes it as a datasource to a Tableau Server.
@@ -55,6 +59,20 @@ select *
 hf.append(sql=new_data)
 ```
 
+## Creation Mode
+There are several options for how the Hyper file is created. They can be set with the `creation_mode` argument when initializing a HyperFile instance. The default is PARQUET.
+
+| Mode | Description | Data Size |
+| --- | --- | --- |
+| PARQUET | Saves data to a single Parquet file then copies to Hyper file. | MEDIUM |
+| COPY | Saves data to CSV format then copies to Hyper file. | MEDIUM |
+| INSERT | Reads data into memory; more forgiving for null values. | SMALL |
+| LARGEFILE | Saves data to multiple Parquet files then copies to Hyper file. | LARGE |
+
+
+Example of setting the creation mode:
+`hf = HyperFile(name="transaction_history", sql=query, is_dbfs_enabled=True, creation_mode="PARQUET")`
+
 ## Hyper File Options
 There is an optional `HyperFileConfig` that can be used to change default behaviors.
 - timestamp_with_timezone:
diff --git a/demo/Hyperleaup-Demo.dbc b/demo/Hyperleaup-Demo.dbc
index fe02869..7a4bd01 100644
Binary files a/demo/Hyperleaup-Demo.dbc and b/demo/Hyperleaup-Demo.dbc differ
diff --git a/demo/Hyperleaup-Demo.ipynb b/demo/Hyperleaup-Demo.ipynb
new file mode 100644
index 0000000..1b31b24
--- /dev/null
+++ b/demo/Hyperleaup-Demo.ipynb
@@ -0,0 +1,528 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "cae832a6-468a-4011-b0e3-1db8f0344769",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "%sh\n",
+    "/databricks/python3/bin/pip install --upgrade pip;\n",
+    "/databricks/python3/bin/pip install tableauhyperapi;\n",
+    "/databricks/python3/bin/pip install tableauserverclient;"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "d87c5fae-576e-447d-99d9-1cba4c9664de",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "from hyperleaup import HyperFile"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "744b10b5-24ce-46c6-9cbf-9c64ff9b56ad",
+     "showTitle": true,
+     "title": "Create Sample Data"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "eye_colors = [\n",
+    "  (1, 'BLK', 'Black'),\n",
+    "  (2, 'BLU', 'Blue'),\n",
+    "  (3, 'BRO', 'Brown'),\n",
+    "  (4, 'GRN', 'Green'),\n",
+    "  (5, 'GRY', 'Gray'),\n",
+    "  (6, 'HAZ', 'Hazel'),\n",
+    "  (7, 'BLK', 'Black'),\n",
+    "  (8, 'XXX', 'Unknown')\n",
+    "]\n",
+    "spark.createDataFrame(eye_colors, ['id', 'eye_color_code', 'eye_color_desc']).createOrReplaceTempView('eye_colors')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "bf9b4188-b1ad-4605-98c8-7bd8dbb4001b",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "query = \"SELECT * FROM eye_colors\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "24079bf4-5df3-4c39-be98-e39fad9c4db3",
+     "showTitle": true,
+     "title": "Create a Hyper File from Spark SQL"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "hf = HyperFile(name=\"eye_colors\", sql=query, is_dbfs_enabled=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "3e6dce4d-c8d8-441f-8908-e89fdbb277b7",
+     "showTitle": true,
+     "title": "Create a Hyper File from a Spark DataFrame"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "df = spark.createDataFrame(eye_colors, ['id', 'eye_color_code', 'eye_color_desc'])\n",
+    "hf_2 = HyperFile(name=\"more_eye_colors\", df=df, is_dbfs_enabled=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "55ec646f-5857-4137-81af-44cab0bcabac",
+     "showTitle": true,
+     "title": "Available Creation Modes"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Hyperleaup supports 4 creation modes:\n",
+    "# 'parquet' - (default) save to single parquet file then copy\n",
+    "# 'copy' - use CSV format then copy\n",
+    "# 'insert' - more forgiving for null values\n",
+    "# 'largefile' - save to multiple Parquet files then copy\n",
+    "hf_3 = HyperFile(name=\"even_more_eye_colors\",\n",
+    "                 df=df,\n",
+    "                 is_dbfs_enabled=True,\n",
+    "                 creation_mode='insert')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "6121b723-faac-419d-bb44-cf30e66284cb",
+     "showTitle": true,
+     "title": "Setting options with HyperFileConfig"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Hyperleaup supports options for how to handle data types and null values. They are set by passing in a HyperFileConfig object.\n",
+    "\n",
+    "# timestamp_with_timezone: bool, True to use timestamptz datatype with HyperFile, \n",
+    "#   enable if using timestamp values with Parquet create mode (default=False)\n",
+    "\n",
+    "# allow_nulls: bool, True to skip default behavior of replacing null numeric and\n",
+    "#   strings with non-null values (default=False).\n",
+    "\n",
+    "# convert_decimal_precision: bool, True to automatically convert decimals with \n",
+    "#   precision over 18 down to 18. This has risk of data truncation so manual \n",
+    "#   testing of your decimals is suggested before use. 
(default=False)\n", + "\n", + "from hyperleaup import HyperFileConfig\n", + "\n", + "hf_config = HyperFileConfig(timestamp_with_timezone=True, allow_nulls=True,\n", + " convert_decimal_precision=True)\n", + "\n", + "hf_4 = HyperFile(name=\"plus_even_more_eye_colors\",\n", + " df=df,\n", + " is_dbfs_enabled=True,\n", + " config=hf_config)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "c7e3d3b5-2c19-4932-a459-11d00baa33b2", + "showTitle": true, + "title": "Viewing HyperFile Attributes" + } + }, + "outputs": [], + "source": [ + "hf.print_table_def()" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "24e122ac-20fd-4735-9ea1-f03d7865343a", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "print(hf.name)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "a1b884d9-739a-4dc6-8d29-e54d27b394b1", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "print(hf.sql)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ee90fdd9-9971-417f-a35b-e6892098fa7e", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "hf.print_rows()" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "50c4a83a-ae00-47ec-b74f-11473dee13ed", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "display(hf.df)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "fa94e217-db2e-43b8-b0ef-a747866ee25a", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "print(hf.path)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "e9740227-43ef-403e-8c16-ba78bb2b10cd", + "showTitle": true, + "title": "Publish Hyper File to Tableau" + } + }, + "outputs": [], + "source": [ + "# Add your Tableau Server details here\n", + "# Note: you must have a site and project created before publishing\n", + "username = ''\n", + "password = ''\n", + "tableau_server = ''\n", + "site_id = ''\n", + "project_name = ''\n", + "datasource_name = ''\n", + "\n", + "# Publish the Hyper File!\n", + "luid = hf.publish(tableau_server_url=tableau_server,\n", + " username=username,\n", + " password=password,\n", + " site_id=site_id,\n", + " project_name=project_name,\n", + " datasource_name=datasource_name)\n", + "print(f'Published Hyper File as new datasource luid: {luid}')" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + 
"application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "49365eb8-fe65-43a2-8d5c-5161bb3b49a0", + "showTitle": true, + "title": "Save Hyper File to DBFS" + } + }, + "outputs": [], + "source": [ + "hf.save('/tmp/demo/')" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "8c8c19ac-e491-4827-91af-9313cb1e0f72", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "%fs ls /tmp/demo/" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "40c60cc7-2a4c-4a54-ae3a-32a3c4c83ae6", + "showTitle": true, + "title": "Load an Existing Hyper File" + } + }, + "outputs": [], + "source": [ + "hf = HyperFile.load(path='/tmp/demo/eye_colors.hyper', is_dbfs_enabled=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ba1b3dea-b7a3-4ac9-a37d-91bfea5a432f", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "print(hf.path)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "7af5f9e1-682d-41a0-b11b-ccde6d78cb6b", + "showTitle": true, + "title": "Append new Data to Existing Hyper File" + } + }, + "outputs": [], + "source": [ + "# Create new data\n", + "eye_colors = [\n", + " (9, 'PNK', 'Pink'),\n", + " (10, 'PUR', 'Purple'),\n", + " (11, 'YEL', 'Yellow')\n", + "]\n", + "df = spark.createDataFrame(eye_colors, ['id', 'eye_color_code', 'eye_color_desc'])" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "a34bbf02-bd86-40ae-82e9-8ee3b0220deb", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# append to an existing Hyper File\n", + "hf.append(df=df)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1da5c6be-b0d1-49e0-8b90-4ce1241c28b3", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "hf.print_rows()" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "dashboards": [], + "language": "python", + "notebookMetadata": { + "mostRecentlyExecutedCommandWithImplicitDF": { + "commandId": 2488707797413206, + "dataframes": [ + "_sqldf" + ] + }, + "pythonIndentUnit": 2 + }, + "notebookName": "Hyperleaup-Demo", + "widgets": {} + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/dist/hyperleaup-0.1.1.tar.gz b/dist/hyperleaup-0.1.1.tar.gz deleted file mode 100644 index 0b3dbdc..0000000 Binary files a/dist/hyperleaup-0.1.1.tar.gz and /dev/null differ diff --git a/dist/hyperleaup-0.1.1-py3-none-any.whl b/dist/hyperleaup-0.1.2-py3-none-any.whl similarity index 53% rename from 
dist/hyperleaup-0.1.1-py3-none-any.whl
rename to dist/hyperleaup-0.1.2-py3-none-any.whl
index e3bf93b..3a60a7f 100644
Binary files a/dist/hyperleaup-0.1.1-py3-none-any.whl and b/dist/hyperleaup-0.1.2-py3-none-any.whl differ
diff --git a/dist/hyperleaup-0.1.2.tar.gz b/dist/hyperleaup-0.1.2.tar.gz
new file mode 100644
index 0000000..55865f0
Binary files /dev/null and b/dist/hyperleaup-0.1.2.tar.gz differ
diff --git a/pyproject.toml b/pyproject.toml
index 7522dc6..22f7a26 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "hyperleaup"
-version = "0.1.1"
+version = "0.1.2"
 authors = [
   { name="Will Girten", email="will.girten@databricks.com" },
 ]
diff --git a/src/hyperleaup/creation_mode.py b/src/hyperleaup/creation_mode.py
index 10b1f4e..4837665 100644
--- a/src/hyperleaup/creation_mode.py
+++ b/src/hyperleaup/creation_mode.py
@@ -5,3 +5,4 @@ class CreationMode(Enum):
     INSERT = "INSERT"
     COPY = "COPY"
     PARQUET = "PARQUET"
+    LARGEFILE = "LARGEFILE"
diff --git a/src/hyperleaup/creator.py b/src/hyperleaup/creator.py
index 378f566..c4b57ae 100644
--- a/src/hyperleaup/creator.py
+++ b/src/hyperleaup/creator.py
@@ -10,7 +10,7 @@ from tableauhyperapi import SqlType, TableDefinition, NULLABLE, NOT_NULLABLE, TableName, HyperProcess, Telemetry, \
     Inserter, Connection, CreateMode
 from pathlib import Path
-
+from databricks.sdk.runtime import *
 
 def clean_dataframe(df: DataFrame, allow_nulls=False, convert_decimal_precision=False) -> DataFrame:
     """Replaces null or NaN values with '' and 0s"""
 
@@ -242,15 +242,13 @@ def write_parquet_to_local_file_system(df: DataFrame, name: str, allow_nulls: bo
 
 
 def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert_decimal_precision = False) -> str:
-    """Moves a Parquet file written to a Databricks Filesystem to a temp directory on the driver node."""
+    """Writes a single Parquet file to the Databricks Filesystem and moves it to a temp directory on the driver node."""
     tmp_dir = f"/tmp/hyperleaup/{name}/"
 
     cleaned_df = clean_dataframe(df, allow_nulls, convert_decimal_precision)
 
     # write the DataFrame to DBFS as a single Parquet file
     cleaned_df.coalesce(1).write \
-        .option("delimiter", ",") \
-        .option("header", "true") \
         .mode("overwrite").parquet(tmp_dir)
 
     dbfs_tmp_dir = "/dbfs" + tmp_dir
@@ -274,6 +272,57 @@ def write_parquet_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert
 
     return dest_path
 
+
+def write_parquet_multifile_to_dbfs(df: DataFrame, name: str, allow_nulls = False, convert_decimal_precision = False) -> list[str]:
+    """For improved performance on large datasets, writes multiple Parquet files to the Databricks Filesystem
+    and moves them to a temp directory on the driver node."""
+    tmp_dir = f"/tmp/hyperleaup/{name}/"
+
+    cleaned_df = clean_dataframe(df, allow_nulls, convert_decimal_precision)
+
+    # write the DataFrame to DBFS as multiple Parquet files
+    cleaned_df.write \
+        .mode("overwrite").parquet(tmp_dir)
+
+    # Prepare location for hyper file
+    if not os.path.exists(tmp_dir):
+        os.makedirs(tmp_dir)
+
+    dbfs_tmp_dir = "/dbfs" + tmp_dir
+    files = dbutils.fs.ls(tmp_dir)
+    if files is None:
+        raise FileNotFoundError(f"Parquet path '{tmp_dir}' not found on DBFS.")
+
+    parquet_files = []
+    for item in files:
+        if item.name.endswith(".parquet"):
+            parquet_files.append(item.name)
+            src_path = dbfs_tmp_dir + item.name
+            dest_path = tmp_dir + item.name
+            copyfile(src_path, dest_path)
+
+    return [f"'{tmp_dir}/{parquet_file}'" for parquet_file in parquet_files]
+
+
+def copy_parquet_array_to_hyper_file(parquet_paths: list[str], name: str, table_def: TableDefinition) -> str:
+    """Helper function that copies data from multiple Parquet files to a .hyper file."""
+    hyper_database_path = f"/tmp/hyperleaup/{name}/{name}.hyper"
+
+    with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp:
+        with Connection(endpoint=hp.endpoint,
+                        database=Path(hyper_database_path),
+                        create_mode=CreateMode.CREATE_AND_REPLACE) as connection:
+
+            connection.catalog.create_schema(schema=table_def.table_name.schema_name)
+            connection.catalog.create_table(table_definition=table_def)
+
+            array_parquet_path = f"ARRAY[{','.join(parquet_paths)}]"
+
+            copy_command = f"COPY \"Extract\".\"Extract\" from {array_parquet_path} with (format parquet)"
+            count = connection.execute_command(copy_command)
+            logging.info(f"Copied {count} rows.")
+
+    return hyper_database_path
+
 class Creator:
     def __init__(self, df: DataFrame,
                  name: str,
@@ -346,6 +395,20 @@ def create(self) -> str:
             logging.info("Copying data into Hyper File...")
             database_path = copy_parquet_to_hyper_file(parquet_path, self.name, table_def)
 
+        elif self.creation_mode.upper() == CreationMode.LARGEFILE.value:
+
+            # Write Spark DataFrame to Parquet so that a file COPY can be done
+            logging.info("Writing Spark DataFrame to multiple Parquet files...")
+            parquet_paths = write_parquet_multifile_to_dbfs(self.df, self.name, self.config.allow_nulls, self.config.convert_decimal_precision)
+
+            # Convert the Spark DataFrame schema to a Tableau `TableDefinition`
+            logging.info("Generating Tableau Table Definition...")
+            table_def = get_table_def(self.df, "Extract", "Extract", self.config.timestamp_with_timezone)
+
+            # COPY data into a Tableau .hyper file
+            logging.info("Copying data into Hyper File...")
+            database_path = copy_parquet_array_to_hyper_file(parquet_paths, self.name, table_def)
+
         else:
             raise ValueError(f'Invalid "creation_mode" specified: {self.creation_mode}')
 
diff --git a/tests/test_creator.py b/tests/test_creator.py
index 4941a13..35ff0cc 100644
--- a/tests/test_creator.py
+++ b/tests/test_creator.py
@@ -206,3 +206,27 @@ def test_create_with_options(self, is_dbfs_enabled=False):
         assert(len(tables) == 1)
         num_rows = TestUtils.get_row_count("Extract", "Extract", "/tmp/hyperleaup/employees2/employees2.hyper")
         assert(num_rows == 5)
+
+    def test_create_largefile(self, is_dbfs_enabled=False):
+
+        data = [
+            (1001, "Jane", "Doe", "2000-05-01", 29.0, False),
+            (1002, "John", "Doe", "1988-05-03", 33.0, False),
+            (2201, "Elonzo", "Smith", "1990-05-03", 21.0, True),
+            (2202, None, None, "1980-05-03", 45.0, False),  # Add a few nulls
+            (2235, "", "", "1980-05-03", 43.0, True)
+
+        ]
+        df = get_spark_session().createDataFrame(data, ["id", "first_name", "last_name", "dob", "age", "is_temp"])
+
+        hf_config = HyperFileConfig(timestamp_with_timezone=True, allow_nulls=True, convert_decimal_precision=True)
+
+        creator = Creator(df=df, name='employees3', is_dbfs_enabled=is_dbfs_enabled,
+                          creation_mode="LARGEFILE", config=hf_config)
+        hyper_file_path = creator.create()
+
+        assert(hyper_file_path == "/tmp/hyperleaup/employees3/employees3.hyper")
+        tables = TestUtils.get_tables("Extract", "/tmp/hyperleaup/employees3/employees3.hyper")
+        assert(len(tables) == 1)
+        num_rows = TestUtils.get_row_count("Extract", "Extract", "/tmp/hyperleaup/employees3/employees3.hyper")
+        assert(num_rows == 5)
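For reference, a minimal usage sketch of the new LARGEFILE creation mode introduced by this change. It assumes a Databricks notebook where `spark` is defined and hyperleaup 0.1.2 is installed; the `transaction_history` table name is hypothetical.

```python
# Hypothetical example: build a large extract with the new LARGEFILE mode.
from hyperleaup import HyperFile, HyperFileConfig

# Optional config; per the README notes, timestamp_with_timezone should be
# enabled when timestamp columns are used with a Parquet-based creation mode.
config = HyperFileConfig(timestamp_with_timezone=True, allow_nulls=True)

df = spark.table("transaction_history")  # hypothetical source table

# LARGEFILE writes the DataFrame out as multiple Parquet files and COPYs them
# into the .hyper file in a single statement.
hf = HyperFile(name="transaction_history",
               df=df,
               is_dbfs_enabled=True,
               creation_mode="LARGEFILE",
               config=config)

print(hf.path)  # e.g. /tmp/hyperleaup/transaction_history/transaction_history.hyper
```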