diff --git a/nbs/docs/how-to-guides/0_computing_at_scale.ipynb b/nbs/docs/how-to-guides/0_computing_at_scale.ipynb new file mode 100644 index 00000000..14fd9f39 --- /dev/null +++ b/nbs/docs/how-to-guides/0_computing_at_scale.ipynb @@ -0,0 +1,105 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Computing at Scale with TimeGPT" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Handling large datasets is a common challenge in time series forecasting. For example, when working with retail data, you may have to forecast sales for thousands of products across hundreds of stores. Similarly, when dealing with electricity consumption data, you may need to predict consumption for thousands of households across various regions.\n", + "\n", + "Nixtla's `TimeGPT` enables you to use several distributed computing frameworks to manage large datasets efficiently. `TimeGPT` currently supports `Spark`, `Dask`, and `Ray` through `Fugue`.\n", + "\n", + "In this notebook, we will explain how to leverage these frameworks using `TimeGPT`. \n", + "\n", + "**Outline:**\n", + "1. [Getting Started](#1-getting-started)\n", + "2. [Forecasting at Scale](#2-forecasting-at-scale) \n", + "3. [Important Considerations](#3-important-considerations) " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Getting Started \n", + "\n", + "To use `TimeGPT` with any of the supported distributed computing frameworks, you first need an API Key, just as you would when not using any distributed computing.\n", + "\n", + "Upon [registration](https://dashboard.nixtla.io/), you will receive an email asking you to confirm your signup. After confirming, you will receive access to your dashboard. There, under `API Keys`, you will find your API Key. Next, you need to integrate your API Key into your development workflow with the Nixtla SDK. 
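As a minimal sketch of that integration, the snippet below exports the key through the `NIXTLA_API_KEY` environment variable, which is the fallback the client reads when no explicit `api_key` argument is passed (as the client-instantiation cells later in this PR note). The key value here is a placeholder, not a real credential.

```python
import os

# Placeholder key for illustration -- substitute the real key from your dashboard.
# NixtlaClient falls back to the NIXTLA_API_KEY environment variable when it is
# instantiated without an explicit api_key argument.
os.environ["NIXTLA_API_KEY"] = "my_api_key_provided_by_nixtla"

api_key = os.environ.get("NIXTLA_API_KEY")
print("API key configured:", api_key is not None)
```

In practice you would export the variable in your shell or a `.env` file rather than hard-coding it in a notebook.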
For guidance on how to do this, please refer to the [Setting Up Your Authentication Key tutorial](https://docs.nixtla.io/docs/setting_up_your_authentication_api_key)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Forecasting at Scale " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Using `TimeGPT` with any of the supported distributed computing frameworks is straightforward, as `TimeGPT` will read the input DataFrame and use the corresponding framework. Thus, the usage is almost identical to the non-distributed case. \n", + "\n", + "1. Instantiate a `NixtlaClient` class.\n", + "2. Load your data as a `pandas` DataFrame.\n", + "3. Initialize the distributed computing framework. \n", + " - [Spark](https://docs.nixtla.io/docs/1_computing_at_scale_spark)\n", + " - [Dask](https://docs.nixtla.io/docs/2_computing_at_scale_dask)\n", + " - [Ray](https://docs.nixtla.io/docs/3_computing_at_scale_ray)\n", + "4. Use any of the `NixtlaClient` class methods.\n", + "5. Stop the distributed computing framework, if necessary. \n", + "\n", + "These are the general steps that you will need to follow to use `TimeGPT` with any of the supported distributed computing frameworks. For a detailed explanation and a complete example, please refer to the guide for the specific framework linked above." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "::: {.callout-important}\n", + "Parallelization in these frameworks is done across the time series in your dataset. Therefore, it is essential that your dataset includes multiple time series, each with a unique id. 
\n", + ":::" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Important Considerations " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### When to Use a Distributed Computing Framework\n", + "\n", + "Consider using a distributed computing framework if your dataset:\n", + "\n", + "- Consists of millions of observations over multiple time series.\n", + "- Is too large to fit into the memory of a single machine.\n", + "- Would be too slow to process on a single machine.\n", + "\n", + "### Choosing the Right Framework\n", + "\n", + "When selecting a distributed computing framework, take into account your existing infrastructure and the skill set of your team. Although `TimeGPT` can be used with any of the supported frameworks with minimal code changes, choosing the right one should align with your specific needs and resources. This will ensure that you leverage the full potential of `TimeGPT` while handling large datasets efficiently." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "python3", + "language": "python", + "name": "python3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/nbs/docs/how-to-guides/0_distributed_fcst_spark.ipynb b/nbs/docs/how-to-guides/0_distributed_fcst_spark.ipynb deleted file mode 100644 index 054d0335..00000000 --- a/nbs/docs/how-to-guides/0_distributed_fcst_spark.ipynb +++ /dev/null @@ -1,519 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "5ff81b5a-514d-4d8b-953e-c8f7cb4ba215", - "metadata": {}, - "source": [ - "# How to on Spark: Forecasting\n", - "> Run TimeGPT distributedly on top of Spark.\n", - "\n", - "`TimeGPT` works on top of Spark, Dask, and Ray through Fugue. `TimeGPT` will read the input DataFrame and use the corresponding engine. 
For example, if the input is a Spark DataFrame, StatsForecast will use the existing Spark session to run the forecast.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a3119cd0-9b9d-4df9-9779-005847c46048", - "metadata": {}, - "outputs": [], - "source": [ - "#| hide\n", - "from nixtla.utils import colab_badge" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "dbd11fae-3219-4ffc-b2de-a96542362d58", - "metadata": {}, - "outputs": [ - { - "data": { - "text/markdown": [ - "[![](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Nixtla/nixtla/blob/main/nbs/docs/how-to-guides/0_distributed_fcst_spark.ipynb)" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "#| echo: false\n", - "colab_badge('docs/how-to-guides/0_distributed_fcst_spark')" - ] - }, - { - "cell_type": "markdown", - "id": "361d702c-361f-4321-85d3-2b76fb7b4937", - "metadata": {}, - "source": [ - "# Installation " - ] - }, - { - "cell_type": "markdown", - "id": "f2854f3c-7dc4-4615-9a85-7d7762fea647", - "metadata": {}, - "source": [ - "As long as Spark is installed and configured, `TimeGPT` will be able to use it. If executing on a distributed Spark cluster, make use the `nixtla` library is installed across all the workers." - ] - }, - { - "cell_type": "markdown", - "id": "743b89bd-6406-4f90-b545-2bd84a8ae62a", - "metadata": {}, - "source": [ - "## Executing on Spark" - ] - }, - { - "cell_type": "markdown", - "id": "b18574a5-76f8-4156-8264-9adae43e715d", - "metadata": {}, - "source": [ - "To run the forecasts distributed on Spark, just pass in a Spark DataFrame instead. 
" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "434c950c-6252-4696-8ea8-2e1bb865847d", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "True" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "#| hide\n", - "import os\n", - "\n", - "import pandas as pd\n", - "from dotenv import load_dotenv\n", - "\n", - "load_dotenv()" - ] - }, - { - "cell_type": "markdown", - "id": "c5b9207c-29d1-4034-8d2e-223abc831cf1", - "metadata": {}, - "source": [ - "Instantiate `NixtlaClient` class." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "fcf6004b-ebd0-4a3c-8c02-d5463c62f79e", - "metadata": {}, - "outputs": [], - "source": [ - "from nixtla import NixtlaClient" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bec2b1fb-74fb-4464-b57b-84c676cb997c", - "metadata": {}, - "outputs": [], - "source": [ - "nixtla_client = NixtlaClient(\n", - " # defaults to os.environ.get(\"NIXTLA_API_KEY\")\n", - " api_key = 'my_api_key_provided_by_nixtla'\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "57091ace-6068-410d-82fc-2b0bd20d63c1", - "metadata": {}, - "outputs": [], - "source": [ - "#| hide\n", - "nixtla_client = NixtlaClient()" - ] - }, - { - "cell_type": "markdown", - "id": "357aade9-ffaa-44c6-b9cb-48be7bda71f4", - "metadata": {}, - "source": [ - "Use Spark as an engine." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a7644af0-f628-46ea-8fb7-474ee2fca39e", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "Setting default log level to \"WARN\".\n", - "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", - "24/04/01 03:34:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\n" - ] - } - ], - "source": [ - "from pyspark.sql import SparkSession\n", - "\n", - "spark = SparkSession.builder.getOrCreate()" - ] - }, - { - "cell_type": "markdown", - "id": "395152be-c5c7-46bb-85d8-da739d470834", - "metadata": {}, - "source": [ - "### Forecast" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "21ac9c73-6644-47be-884c-23a682844e32", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+---------+-------------------+-----+\n", - "|unique_id| ds| y|\n", - "+---------+-------------------+-----+\n", - "| BE|2016-12-01 00:00:00| 72.0|\n", - "| BE|2016-12-01 01:00:00| 65.8|\n", - "| BE|2016-12-01 02:00:00|59.99|\n", - "| BE|2016-12-01 03:00:00|50.69|\n", - "| BE|2016-12-01 04:00:00|52.58|\n", - "+---------+-------------------+-----+\n", - "only showing top 5 rows\n", - "\n" - ] - } - ], - "source": [ - "url_df = 'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv'\n", - "spark_df = spark.createDataFrame(pd.read_csv(url_df))\n", - "spark_df.show(5)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "305167a0-1984-4004-aea3-b97402832491", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+---------+-------------------+------------------+\n", - "|unique_id| ds| TimeGPT|\n", - "+---------+-------------------+------------------+\n", - "| FR|2016-12-31 00:00:00|62.130218505859375|\n", - "| FR|2016-12-31 01:00:00|56.890830993652344|\n", - "| FR|2016-12-31 02:00:00| 
52.23155212402344|\n", - "| FR|2016-12-31 03:00:00| 48.88866424560547|\n", - "| FR|2016-12-31 04:00:00| 46.49836730957031|\n", - "+---------+-------------------+------------------+\n", - "only showing top 5 rows\n", - "\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - } - ], - "source": [ - "fcst_df = nixtla_client.forecast(spark_df, h=12)\n", - "fcst_df.show(5)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "cfdb15c0-1e13-43fc-a3ea-8f75b87cd71b", - "metadata": {}, - "outputs": [], - "source": [ - "#| hide\n", - "from fastcore.test import test_fail" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ce55c5fa-ddd7-454a-aa43-697aa8c805d8", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - 
"INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - " \r" - ] - } - ], - "source": [ - "#| hide\n", - "# test different results for different models\n", - "fcst_df_1 = fcst_df.toPandas()\n", - "fcst_df_2 = nixtla_client.forecast(spark_df, h=12, model='timegpt-1-long-horizon')\n", - "fcst_df_2 = fcst_df_2.toPandas()\n", - "test_fail(\n", - " lambda: pd.testing.assert_frame_equal(fcst_df_1[['TimeGPT']], fcst_df_2[['TimeGPT']]),\n", - " contains='(column name=\"TimeGPT\") are different'\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": "2008fbf0-9bd2-4974-904b-bb8dc90876e6", - "metadata": {}, - "source": [ - "### Forecast with exogenous variables" - ] - }, - { - "cell_type": "markdown", - "id": "1d281c8d-3a5c-4b3e-8468-7699ef44933b", - "metadata": {}, - "source": [ - "Exogenous variables or external factors are crucial in time series forecasting as they provide additional information that might influence the prediction. 
These variables could include holiday markers, marketing spending, weather data, or any other external data that correlate with the time series data you are forecasting.\n", - "\n", - "For example, if you're forecasting ice cream sales, temperature data could serve as a useful exogenous variable. On hotter days, ice cream sales may increase.\n", - "\n", - "To incorporate exogenous variables in TimeGPT, you'll need to pair each point in your time series data with the corresponding external data.\n", - "\n", - "Let's see an example." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "8b0d7fd4-5d69-4b6e-b065-efeba63f5911", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+\n", - "|unique_id| ds| y|Exogenous1|Exogenous2|day_0|day_1|day_2|day_3|day_4|day_5|day_6|\n", - "+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+\n", - "| BE|2016-12-01 00:00:00| 72.0| 61507.0| 71066.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "| BE|2016-12-01 01:00:00| 65.8| 59528.0| 67311.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "| BE|2016-12-01 02:00:00|59.99| 58812.0| 67470.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "| BE|2016-12-01 03:00:00|50.69| 57676.0| 64529.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "| BE|2016-12-01 04:00:00|52.58| 56804.0| 62773.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+\n", - "only showing top 5 rows\n", - "\n" - ] - } - ], - "source": [ - "df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short-with-ex-vars.csv')\n", - "spark_df = spark.createDataFrame(df)\n", - "spark_df.show(5)" - ] - }, - { - "cell_type": "markdown", - "id": "5172dc4a-66dd-47dd-a30d-228bc2f14317", - 
"metadata": {}, - "source": [ - "To produce forecasts we have to add the future values of the exogenous variables. Let's read this dataset. In this case we want to predict 24 steps ahead, therefore each unique id will have 24 observations." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a8697301-e53b-446b-a965-6f57383d1d2c", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+---------+-------------------+----------+----------+-----+-----+-----+-----+-----+-----+-----+\n", - "|unique_id| ds|Exogenous1|Exogenous2|day_0|day_1|day_2|day_3|day_4|day_5|day_6|\n", - "+---------+-------------------+----------+----------+-----+-----+-----+-----+-----+-----+-----+\n", - "| BE|2016-12-31 00:00:00| 64108.0| 70318.0| 0.0| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0|\n", - "| BE|2016-12-31 01:00:00| 62492.0| 67898.0| 0.0| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0|\n", - "| BE|2016-12-31 02:00:00| 61571.0| 68379.0| 0.0| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0|\n", - "| BE|2016-12-31 03:00:00| 60381.0| 64972.0| 0.0| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0|\n", - "| BE|2016-12-31 04:00:00| 60298.0| 62900.0| 0.0| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0|\n", - "+---------+-------------------+----------+----------+-----+-----+-----+-----+-----+-----+-----+\n", - "only showing top 5 rows\n", - "\n" - ] - } - ], - "source": [ - "future_ex_vars_df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short-future-ex-vars.csv')\n", - "spark_future_ex_vars_df = spark.createDataFrame(future_ex_vars_df)\n", - "spark_future_ex_vars_df.show(5)" - ] - }, - { - "cell_type": "markdown", - "id": "66ec94e4-98c5-48ee-ad2f-d6996e82b758", - "metadata": {}, - "source": [ - "Let's call the `forecast` method, adding this information:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b3c51169-3561-4d00-adba-fd6e49ab6c24", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - 
"text": [ - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Using the following exogenous variables: Exogenous1, Exogenous2, day_0, day_1, day_2, day_3, day_4, day_5, day_6\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Using the following exogenous variables: Exogenous1, Exogenous2, day_0, day_1, day_2, day_3, day_4, day_5, day_6\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "[Stage 33:=====================================================> (19 + 1) / 20]\r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+---------+-------------------+------------------+------------------+------------------+-----------------+-----------------+\n", - "|unique_id| ds| TimeGPT| TimeGPT-lo-90| TimeGPT-lo-80| TimeGPT-hi-80| TimeGPT-hi-90|\n", - "+---------+-------------------+------------------+------------------+------------------+-----------------+-----------------+\n", - "| FR|2016-12-31 00:00:00| 59.39155162090687| 54.47111514324573| 56.13039408916859|62.65270915264515| 64.311988098568|\n", - "| FR|2016-12-31 01:00:00| 60.1843929541434|56.167005220683926|56.778585672649264|63.59020023563754|64.20178068760288|\n", - "| FR|2016-12-31 02:00:00| 58.12912691907976| 53.55469655256365| 55.23512607984636|61.02312775831316|62.70355728559587|\n", - "| FR|2016-12-31 03:00:00|53.825965179940155| 46.31002742817014| 50.66449432422726|56.98743603565305|61.34190293171017|\n", - "| FR|2016-12-31 04:00:00| 47.6941769331486| 38.21902702317546| 42.94538668046305|52.44296718583414|57.16932684312174|\n", - "+---------+-------------------+------------------+------------------+------------------+-----------------+-----------------+\n", - "only 
showing top 5 rows\n", - "\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - } - ], - "source": [ - "timegpt_fcst_ex_vars_df = nixtla_client.forecast(df=spark_df, X_df=spark_future_ex_vars_df, h=24, level=[80, 90])\n", - "timegpt_fcst_ex_vars_df.show(5)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "620ef1e3-da4f-4949-bf12-6fd3727dfec6", - "metadata": {}, - "outputs": [], - "source": [ - "spark.stop()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "python3", - "language": "python", - "name": "python3" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/nbs/docs/how-to-guides/1_computing_at_scale_spark_distributed.ipynb b/nbs/docs/how-to-guides/1_computing_at_scale_spark_distributed.ipynb new file mode 100644 index 00000000..4ecece4b --- /dev/null +++ b/nbs/docs/how-to-guides/1_computing_at_scale_spark_distributed.ipynb @@ -0,0 +1,473 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Computing at Scale with Spark \n", + "\n", + "> Run TimeGPT distributedly on top of Spark" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Spark is an open-source distributed computing framework designed for large-scale data processing. In this guide, we will explain how to use `TimeGPT` on top of Spark. \n", + "\n", + "**Outline:** \n", + "1. [Installation](#installation)\n", + "2. [Load Your Data](#load-your-data)\n", + "3. [Initialize Spark](#initialize-spark) \n", + "4. [Use TimeGPT on Spark](#use-timegpt-on-spark)\n", + "5. 
[Stop Spark](#stop-spark)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| hide\n", + "from nixtla.utils import colab_badge" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "[![](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Nixtla/nixtla/blob/main/nbs/docs/how-to-guides/1_computing_at_scale_spark_distributed.ipynb)" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "#| echo: false\n", + "colab_badge('docs/how-to-guides/1_computing_at_scale_spark_distributed')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Installation " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Install Spark through [Fugue](https://fugue-tutorials.readthedocs.io/). Fugue provides an easy-to-use interface for distributed computing that lets users execute Python code on top of several distributed computing frameworks, including Spark. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%capture \n", + "pip install \"fugue[spark]\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If executing on a distributed `Spark` cluster, ensure that the `nixtla` library is installed across all the workers." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load Your Data " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can load your data as a `pandas` DataFrame. In this tutorial, we will use a dataset that contains hourly electricity prices from different markets. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
unique_iddsy
0BE2016-12-01 00:00:0072.00
1BE2016-12-01 01:00:0065.80
2BE2016-12-01 02:00:0059.99
3BE2016-12-01 03:00:0050.69
4BE2016-12-01 04:00:0052.58
\n", + "
" + ], + "text/plain": [ + " unique_id ds y\n", + "0 BE 2016-12-01 00:00:00 72.00\n", + "1 BE 2016-12-01 01:00:00 65.80\n", + "2 BE 2016-12-01 02:00:00 59.99\n", + "3 BE 2016-12-01 03:00:00 50.69\n", + "4 BE 2016-12-01 04:00:00 52.58" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd \n", + "\n", + "df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv') \n", + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize Spark " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Initialize `Spark` and convert the pandas DataFrame to a `Spark` DataFrame. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "24/04/24 23:55:22 WARN Utils: Your hostname, Marianas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.68.101 instead (on interface en0)\n", + "24/04/24 23:55:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n", + "Setting default log level to \"WARN\".\n", + "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", + "24/04/24 23:55:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\n" + ] + } + ], + "source": [ + "from pyspark.sql import SparkSession\n", + "\n", + "spark = SparkSession.builder.getOrCreate()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------+-------------------+-----+\n", + "|unique_id| ds| y|\n", + "+---------+-------------------+-----+\n", + "| BE|2016-12-01 00:00:00| 72.0|\n", + "| BE|2016-12-01 01:00:00| 65.8|\n", + "| BE|2016-12-01 02:00:00|59.99|\n", + "| BE|2016-12-01 03:00:00|50.69|\n", + "| BE|2016-12-01 04:00:00|52.58|\n", + "+---------+-------------------+-----+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "spark_df = spark.createDataFrame(df)\n", + "spark_df.show(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Use TimeGPT on Spark " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Using `TimeGPT` on top of `Spark` is almost identical to the non-distributed case. The only difference is that you need to use a `Spark` DataFrame. \n", + "\n", + "First, instantiate the `NixtlaClient` class. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from nixtla import NixtlaClient" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from nixtla import NixtlaClient\n", + "\n", + "nixtla_client = NixtlaClient(\n", + " # defaults to os.environ.get(\"NIXTLA_API_KEY\")\n", + " api_key = 'my_api_key_provided_by_nixtla'\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| hide \n", + "nixtla_client = NixtlaClient()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then use any method from the `NixtlaClient` class such as [`forecast`](https://nixtlaverse.nixtla.io/nixtla/nixtla_client.html#nixtlaclient-forecast) or [`cross_validation`](https://nixtlaverse.nixtla.io/nixtla/nixtla_client.html#nixtlaclient-cross-validation)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:nixtla.nixtla_client:Validating inputs... 
(0 + 1) / 1]\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------+-------------------+------------------+\n", + "|unique_id| ds| TimeGPT|\n", + "+---------+-------------------+------------------+\n", + "| FR|2016-12-31 00:00:00|62.130218505859375|\n", + "| FR|2016-12-31 01:00:00|56.890830993652344|\n", + "| FR|2016-12-31 02:00:00| 52.23155212402344|\n", + "| FR|2016-12-31 03:00:00| 48.88866424560547|\n", + "| FR|2016-12-31 04:00:00| 46.49836730957031|\n", + "+---------+-------------------+------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "fcst_df = nixtla_client.forecast(spark_df, h=12)\n", + "fcst_df.show(5)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:nixtla.nixtla_client:Validating inputs... 
(0 + 1) / 1]\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n", + "24/04/24 23:55:35 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------+-------------------+-------------------+------------------+\n", + "|unique_id| ds| cutoff| TimeGPT|\n", + "+---------+-------------------+-------------------+------------------+\n", + "| FR|2016-12-30 04:00:00|2016-12-30 
03:00:00| 44.89373779296875|\n", + "| FR|2016-12-30 05:00:00|2016-12-30 03:00:00| 46.05792999267578|\n", + "| FR|2016-12-30 06:00:00|2016-12-30 03:00:00|48.790077209472656|\n", + "| FR|2016-12-30 07:00:00|2016-12-30 03:00:00| 54.39702606201172|\n", + "| FR|2016-12-30 08:00:00|2016-12-30 03:00:00|57.592994689941406|\n", + "+---------+-------------------+-------------------+------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:nixtla.nixtla_client:Validating inputs...\n", + " \r" + ] + } + ], + "source": [ + "cv_df = nixtla_client.cross_validation(spark_df, h=12, n_windows=5, step_size=2)\n", + "cv_df.show(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can also use exogenous variables with `TimeGPT` on top of `Spark`. To do this, please refer to the [Exogenous Variables](https://nixtlaverse.nixtla.io/nixtla/docs/tutorials/exogenous_variables.html) tutorial. Just keep in mind that you will need to use a `Spark` DataFrame instead of a pandas DataFrame." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Stop Spark " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "When you are done, stop the `Spark` session. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "spark.stop()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "python3", + "language": "python", + "name": "python3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/nbs/docs/how-to-guides/1_distributed_cv_spark.ipynb b/nbs/docs/how-to-guides/1_distributed_cv_spark.ipynb deleted file mode 100644 index dc64e4af..00000000 --- a/nbs/docs/how-to-guides/1_distributed_cv_spark.ipynb +++ /dev/null @@ -1,458 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "5ff81b5a-514d-4d8b-953e-c8f7cb4ba215", - "metadata": {}, - "source": [ - "# How to on Spark: Cross Validation\n", - "> Run TimeGPT distributedly on top of Spark.\n", - "\n", - "`TimeGPT` works on top of Spark, Dask, and Ray through Fugue. `TimeGPT` will read the input DataFrame and use the corresponding engine. For example, if the input is a Spark DataFrame, StatsForecast will use the existing Spark session to run the forecast.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "5051a20b-716a-4e83-ab9a-6472c7e4a4fa", - "metadata": {}, - "outputs": [], - "source": [ - "#| hide\n", - "from nixtla.utils import colab_badge" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9ec6d4ad-7514-4ee9-8ca5-2ef027c45e6a", - "metadata": {}, - "outputs": [ - { - "data": { - "text/markdown": [ - "[![](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Nixtla/nixtla/blob/main/nbs/docs/how-to-guides/1_distributed_cv_spark.ipynb)" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "#| echo: false\n", - "colab_badge('docs/how-to-guides/1_distributed_cv_spark')" - ] - }, - { - "cell_type": "markdown", - "id": "361d702c-361f-4321-85d3-2b76fb7b4937", - "metadata": {}, - "source": [ - "# Installation " - ] - }, - { - "cell_type": 
"markdown", - "id": "f2854f3c-7dc4-4615-9a85-7d7762fea647", - "metadata": {}, - "source": [ - "As long as Spark is installed and configured, `TimeGPT` will be able to use it. If executing on a distributed Spark cluster, make use the `nixtla` library is installed across all the workers." - ] - }, - { - "cell_type": "markdown", - "id": "743b89bd-6406-4f90-b545-2bd84a8ae62a", - "metadata": {}, - "source": [ - "## Executing on Spark" - ] - }, - { - "cell_type": "markdown", - "id": "b18574a5-76f8-4156-8264-9adae43e715d", - "metadata": {}, - "source": [ - "To run the forecasts distributed on Spark, just pass in a Spark DataFrame instead. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "434c950c-6252-4696-8ea8-2e1bb865847d", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "True" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "#| hide\n", - "import os\n", - "\n", - "import pandas as pd\n", - "from dotenv import load_dotenv\n", - "\n", - "load_dotenv()" - ] - }, - { - "cell_type": "markdown", - "id": "c5b9207c-29d1-4034-8d2e-223abc831cf1", - "metadata": {}, - "source": [ - "Instantiate `NixtlaClient` class." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "21bbe459-ed98-4ac1-8da7-2287305b3680", - "metadata": {}, - "outputs": [], - "source": [ - "from nixtla import NixtlaClient" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "97681b52-4e0e-420d-bcb9-e616dbd3b1b3", - "metadata": {}, - "outputs": [], - "source": [ - "nixtla_client = NixtlaClient(\n", - " # defaults to os.environ.get(\"NIXTLA_API_KEY\")\n", - " api_key = 'my_api_key_provided_by_nixtla'\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0d1f3709-0f62-424c-a066-41efb1bfa2fe", - "metadata": {}, - "outputs": [], - "source": [ - "#| hide\n", - "nixtla_client = NixtlaClient()" - ] - }, - { - "cell_type": "markdown", - "id": "357aade9-ffaa-44c6-b9cb-48be7bda71f4", - "metadata": {}, - "source": [ - "Use Spark as an engine." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a7644af0-f628-46ea-8fb7-474ee2fca39e", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "Setting default log level to \"WARN\".\n", - "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", - "24/04/01 03:35:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\n" - ] - } - ], - "source": [ - "from pyspark.sql import SparkSession\n", - "\n", - "spark = SparkSession.builder.getOrCreate()" - ] - }, - { - "cell_type": "markdown", - "id": "395152be-c5c7-46bb-85d8-da739d470834", - "metadata": {}, - "source": [ - "### Cross validation" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "21ac9c73-6644-47be-884c-23a682844e32", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+---------+-------------------+-----+\n", - "|unique_id| ds| y|\n", - "+---------+-------------------+-----+\n", - "| BE|2016-12-01 00:00:00| 72.0|\n", - "| BE|2016-12-01 01:00:00| 65.8|\n", - "| BE|2016-12-01 02:00:00|59.99|\n", - "| BE|2016-12-01 03:00:00|50.69|\n", - "| BE|2016-12-01 04:00:00|52.58|\n", - "+---------+-------------------+-----+\n", - "only showing top 5 rows\n", - "\n" - ] - } - ], - "source": [ - "url_df = 'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv'\n", - "spark_df = spark.createDataFrame(pd.read_csv(url_df))\n", - "spark_df.show(5)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "305167a0-1984-4004-aea3-b97402832491", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Calling Forecast 
Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+---------+-------------------+-------------------+------------------+\n", - "|unique_id| ds| cutoff| TimeGPT|\n", - "+---------+-------------------+-------------------+------------------+\n", - "| FR|2016-12-30 04:00:00|2016-12-30 03:00:00| 44.89374542236328|\n", - "| FR|2016-12-30 05:00:00|2016-12-30 03:00:00| 46.05792999267578|\n", - "| FR|2016-12-30 06:00:00|2016-12-30 03:00:00|48.790077209472656|\n", - "| FR|2016-12-30 07:00:00|2016-12-30 03:00:00| 54.39702606201172|\n", - "| FR|2016-12-30 08:00:00|2016-12-30 03:00:00| 57.59300231933594|\n", - "+---------+-------------------+-------------------+------------------+\n", - "only showing top 5 rows\n", - "\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:nixtlats.timegpt:Validating inputs...\n", - " \r" - ] - } - ], - "source": [ - "fcst_df = nixtla_client.cross_validation(spark_df, h=12, n_windows=5, step_size=2)\n", - "fcst_df.show(5)" - ] - }, - { - "cell_type": "markdown", - "id": "2008fbf0-9bd2-4974-904b-bb8dc90876e6", - "metadata": {}, - "source": [ - "### Cross validation with exogenous variables" - ] - }, - { - 
"cell_type": "markdown", - "id": "1d281c8d-3a5c-4b3e-8468-7699ef44933b", - "metadata": {}, - "source": [ - "Exogenous variables or external factors are crucial in time series forecasting as they provide additional information that might influence the prediction. These variables could include holiday markers, marketing spending, weather data, or any other external data that correlate with the time series data you are forecasting.\n", - "\n", - "For example, if you're forecasting ice cream sales, temperature data could serve as a useful exogenous variable. On hotter days, ice cream sales may increase.\n", - "\n", - "To incorporate exogenous variables in TimeGPT, you'll need to pair each point in your time series data with the corresponding external data.\n", - "\n", - "Let's see an example." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "8b0d7fd4-5d69-4b6e-b065-efeba63f5911", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+\n", - "|unique_id| ds| y|Exogenous1|Exogenous2|day_0|day_1|day_2|day_3|day_4|day_5|day_6|\n", - "+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+\n", - "| BE|2016-12-01 00:00:00| 72.0| 61507.0| 71066.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "| BE|2016-12-01 01:00:00| 65.8| 59528.0| 67311.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "| BE|2016-12-01 02:00:00|59.99| 58812.0| 67470.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "| BE|2016-12-01 03:00:00|50.69| 57676.0| 64529.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "| BE|2016-12-01 04:00:00|52.58| 56804.0| 62773.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|\n", - "+---------+-------------------+-----+----------+----------+-----+-----+-----+-----+-----+-----+-----+\n", - "only showing top 5 rows\n", - "\n" - ] - } - ], - "source": [ - "df = 
pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short-with-ex-vars.csv')\n", - "spark_df = spark.createDataFrame(df)\n", - "spark_df.show(5)" - ] - }, - { - "cell_type": "markdown", - "id": "66ec94e4-98c5-48ee-ad2f-d6996e82b758", - "metadata": {}, - "source": [ - "Let's call the `cross_validation` method, adding this information:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b3c51169-3561-4d00-adba-fd6e49ab6c24", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "WARNING:nixtlats.timegpt:The specified horizon \"h\" exceeds the model horizon. This may lead to less accurate forecasts. Please consider using a smaller horizon.\n", - "INFO:nixtlats.timegpt:Using the following exogenous variables: Exogenous1, Exogenous2, day_0, day_1, day_2, day_3, day_4, day_5, day_6\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "WARNING:nixtlats.timegpt:The specified horizon \"h\" exceeds the model horizon. This may lead to less accurate forecasts. 
Please consider using a smaller horizon.\n", - "INFO:nixtlats.timegpt:Using the following exogenous variables: Exogenous1, Exogenous2, day_0, day_1, day_2, day_3, day_4, day_5, day_6\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "WARNING:nixtlats.timegpt:The specified horizon \"h\" exceeds the model horizon. This may lead to less accurate forecasts. Please consider using a smaller horizon.\n", - "INFO:nixtlats.timegpt:Using the following exogenous variables: Exogenous1, Exogenous2, day_0, day_1, day_2, day_3, day_4, day_5, day_6\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "WARNING:nixtlats.timegpt:The specified horizon \"h\" exceeds the model horizon. This may lead to less accurate forecasts. Please consider using a smaller horizon.\n", - "INFO:nixtlats.timegpt:Using the following exogenous variables: Exogenous1, Exogenous2, day_0, day_1, day_2, day_3, day_4, day_5, day_6\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Validating inputs...\n", - "INFO:nixtlats.timegpt:Preprocessing dataframes...\n", - "INFO:nixtlats.timegpt:Inferred freq: H\n", - "WARNING:nixtlats.timegpt:The specified horizon \"h\" exceeds the model horizon. This may lead to less accurate forecasts. 
Please consider using a smaller horizon.\n", - "INFO:nixtlats.timegpt:Using the following exogenous variables: Exogenous1, Exogenous2, day_0, day_1, day_2, day_3, day_4, day_5, day_6\n", - "INFO:nixtlats.timegpt:Calling Forecast Endpoint...\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+---------+-------------------+-------------------+------------------+------------------+-----------------+------------------+------------------+\n", - "|unique_id| ds| cutoff| TimeGPT| TimeGPT-lo-90| TimeGPT-lo-80| TimeGPT-hi-80| TimeGPT-hi-90|\n", - "+---------+-------------------+-------------------+------------------+------------------+-----------------+------------------+------------------+\n", - "| FR|2016-12-21 00:00:00|2016-12-20 23:00:00| 66.39748296460945| 62.03776876172859|63.28946471509773| 69.50550121412117| 70.7571971674903|\n", - "| FR|2016-12-21 01:00:00|2016-12-20 23:00:00| 63.71841894125738|59.770956050632385|61.16832944845953| 66.26850843405524| 67.66588183188237|\n", - "| FR|2016-12-21 02:00:00|2016-12-20 23:00:00| 61.13784444132001| 58.88184931650312| 59.5156742600456|62.760014622594426|63.393839566136904|\n", - "| FR|2016-12-21 03:00:00|2016-12-20 23:00:00| 55.77490648975175|53.047358607671676|53.22071413745683|58.329098842046676|58.502454371831824|\n", - "| FR|2016-12-21 04:00:00|2016-12-20 23:00:00|48.803786601770284| 44.10176355336941|44.58027990316188| 53.02729330037869| 53.50580965017116|\n", - "+---------+-------------------+-------------------+------------------+------------------+-----------------+------------------+------------------+\n", - "only showing top 5 rows\n", - "\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:nixtlats.timegpt:Validating inputs...\n", - " \r" - ] - } - ], - "source": [ - "timegpt_cv_ex_vars_df = nixtla_client.cross_validation(\n", - " df=spark_df,\n", - " h=48, \n", - " level=[80, 90],\n", - " n_windows=5,\n", - ")\n", - "timegpt_cv_ex_vars_df.show(5)" - ] - }, 
- { - "cell_type": "code", - "execution_count": null, - "id": "6223e936-426a-4e64-9f35-7fcfce3eca08", - "metadata": {}, - "outputs": [], - "source": [ - "spark.stop()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "python3", - "language": "python", - "name": "python3" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/nbs/docs/how-to-guides/2_computing_at_scale_dask_distributed.ipynb b/nbs/docs/how-to-guides/2_computing_at_scale_dask_distributed.ipynb new file mode 100644 index 00000000..b9e22a21 --- /dev/null +++ b/nbs/docs/how-to-guides/2_computing_at_scale_dask_distributed.ipynb @@ -0,0 +1,564 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Computing at Scale with Dask\n", + "\n", + "> Run TimeGPT distributedly on top of Dask" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[Dask](https://www.dask.org/get-started) is an open source parallel computing library for Python. In this guide, we will explain how to use `TimeGPT` on top of Dask. \n", + "\n", + "**Outline:** \n", + "1. [Installation](#installation)\n", + "2. [Load Your Data](#load-your-data)\n", + "3. [Import Dask](#import-dask) \n", + "4. 
[Use TimeGPT on Dask](#use-timegpt-on-dask)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| hide\n", + "from nixtla.utils import colab_badge" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "[![](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Nixtla/nixtla/blob/main/nbs/docs/how-to-guides/2_computing_at_scale_dask_distributed.ipynb)" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "#| echo: false\n", + "colab_badge('docs/how-to-guides/2_computing_at_scale_dask_distributed')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Installation " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Install Dask through [Fugue](https://fugue-tutorials.readthedocs.io/). Fugue provides an easy-to-use interface for distributed computing that lets users execute Python code on top of several distributed computing frameworks, including Dask. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%capture \n", + "pip install \"fugue[dask]\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If executing on a distributed `Dask` cluster, ensure that the `nixtla` library is installed across all the workers." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load Your Data " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can load your data as a `pandas` DataFrame. In this tutorial, we will use a dataset that contains hourly electricity prices from different markets. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
unique_iddsy
0BE2016-12-01 00:00:0072.00
1BE2016-12-01 01:00:0065.80
2BE2016-12-01 02:00:0059.99
3BE2016-12-01 03:00:0050.69
4BE2016-12-01 04:00:0052.58
\n", + "
" + ], + "text/plain": [ + " unique_id ds y\n", + "0 BE 2016-12-01 00:00:00 72.00\n", + "1 BE 2016-12-01 01:00:00 65.80\n", + "2 BE 2016-12-01 02:00:00 59.99\n", + "3 BE 2016-12-01 03:00:00 50.69\n", + "4 BE 2016-12-01 04:00:00 52.58" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd \n", + "\n", + "df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv') \n", + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Import Dask" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Import Dask and convert the pandas DataFrame to a Dask DataFrame. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
Dask DataFrame Structure:
\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
unique_iddsy
npartitions=2
0stringstringfloat64
1800.........
3599.........
\n", + "
Dask Name: frompandas, 1 expression
" + ], + "text/plain": [ + "Dask DataFrame Structure:\n", + " unique_id ds y\n", + "npartitions=2 \n", + "0 string string float64\n", + "1800 ... ... ...\n", + "3599 ... ... ...\n", + "Dask Name: frompandas, 1 expression\n", + "Expr=df" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import dask.dataframe as dd\n", + "\n", + "dask_df = dd.from_pandas(df, npartitions=2)\n", + "dask_df " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Use TimeGPT on Dask " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Using `TimeGPT` on top of `Dask` is almost identical to the non-distributed case. The only difference is that you need to use a `Dask` DataFrame, which we already defined in the previous step. \n", + "\n", + "First, instantiate the `NixtlaClient` class. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from nixtla import NixtlaClient" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "nixtla_client = NixtlaClient(\n", + " # defaults to os.environ.get(\"NIXTLA_API_KEY\")\n", + " api_key = 'my_api_key_provided_by_nixtla'\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| hide \n", + "nixtla_client = NixtlaClient()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then use any method from the `NixtlaClient` class such as [`forecast`](https://nixtlaverse.nixtla.io/nixtla/nixtla_client.html#nixtlaclient-forecast) or [`cross_validation`](https://nixtlaverse.nixtla.io/nixtla/nixtla_client.html#nixtlaclient-cross-validation)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
unique_iddsTimeGPT
0FR2016-12-31 00:00:0062.130219
1FR2016-12-31 01:00:0056.890831
2FR2016-12-31 02:00:0052.231552
3FR2016-12-31 03:00:0048.888664
4FR2016-12-31 04:00:0046.498367
\n", + "
" + ], + "text/plain": [ + " unique_id ds TimeGPT\n", + "0 FR 2016-12-31 00:00:00 62.130219\n", + "1 FR 2016-12-31 01:00:00 56.890831\n", + "2 FR 2016-12-31 02:00:00 52.231552\n", + "3 FR 2016-12-31 03:00:00 48.888664\n", + "4 FR 2016-12-31 04:00:00 46.498367" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "fcst_df = nixtla_client.forecast(dask_df, h=12)\n", + "fcst_df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + "INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n", + "INFO:nixtla.nixtla_client:Preprocessing dataframes...\n", + "INFO:nixtla.nixtla_client:Inferred freq: H\n", + 
"INFO:nixtla.nixtla_client:Calling Forecast Endpoint...\n", + "INFO:nixtla.nixtla_client:Validating inputs...\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
unique_iddscutoffTimeGPT
0FR2016-12-30 04:00:002016-12-30 03:00:0044.893738
1FR2016-12-30 05:00:002016-12-30 03:00:0046.05793
2FR2016-12-30 06:00:002016-12-30 03:00:0048.790077
3FR2016-12-30 07:00:002016-12-30 03:00:0054.397026
4FR2016-12-30 08:00:002016-12-30 03:00:0057.592995
\n", + "
" + ], + "text/plain": [ + " unique_id ds cutoff TimeGPT\n", + "0 FR 2016-12-30 04:00:00 2016-12-30 03:00:00 44.893738\n", + "1 FR 2016-12-30 05:00:00 2016-12-30 03:00:00 46.05793\n", + "2 FR 2016-12-30 06:00:00 2016-12-30 03:00:00 48.790077\n", + "3 FR 2016-12-30 07:00:00 2016-12-30 03:00:00 54.397026\n", + "4 FR 2016-12-30 08:00:00 2016-12-30 03:00:00 57.592995" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "cv_df = nixtla_client.cross_validation(dask_df, h=12, n_windows=5, step_size=2)\n", + "cv_df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can also use exogenous variables with `TimeGPT` on top of `Dask`. To do this, please refer to the [Exogenous Variables](https://docs.nixtla.io/docs/exogenous_variables) tutorial. Just keep in mind that you need to use a `Dask` DataFrame instead of a pandas DataFrame." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "python3", + "language": "python", + "name": "python3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/nbs/docs/how-to-guides/3_computing_at_scale_ray_distributed.ipynb b/nbs/docs/how-to-guides/3_computing_at_scale_ray_distributed.ipynb new file mode 100644 index 00000000..f74e972a --- /dev/null +++ b/nbs/docs/how-to-guides/3_computing_at_scale_ray_distributed.ipynb @@ -0,0 +1,643 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Computing at Scale with Ray \n", + "\n", + "> Run TimeGPT distributedly on top of Ray" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[Ray](https://www.ray.io/) is an open source unified compute framework to scale Python workloads. In this guide, we will explain how to use `TimeGPT` on top of Ray. \n", + "\n", + "**Outline:** \n", + "1. [Installation](#installation)\n", + "2. [Load Your Data](#load-your-data)\n", + "3. [Initialize Ray](#initialize-ray) \n", + "4. 
[Use TimeGPT on Ray](#use-timegpt-on-ray)\n", + "5. [Shutdown Ray](#shutdown-ray)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| hide\n", + "from nixtla.utils import colab_badge" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "[![](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Nixtla/nixtla/blob/main/nbs/docs/how-to-guides/3_computing_at_scale_ray_distributed.ipynb)" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "#| echo: false\n", + "colab_badge('docs/how-to-guides/3_computing_at_scale_ray_distributed')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Installation " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Install Ray through [Fugue](https://fugue-tutorials.readthedocs.io/). Fugue provides an easy-to-use interface for distributed computing that lets users execute Python code on top of several distributed computing frameworks, including Ray. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%capture \n", + "pip install \"fugue[ray]\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If executing on a distributed `Ray` cluster, ensure that the `nixtla` library is installed across all the workers." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load Your Data " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can load your data as a `pandas` DataFrame. In this tutorial, we will use a dataset that contains hourly electricity prices from different markets. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
unique_iddsy
0BE2016-10-22 00:00:0070.00
1BE2016-10-22 01:00:0037.10
2BE2016-10-22 02:00:0037.10
3BE2016-10-22 03:00:0044.75
4BE2016-10-22 04:00:0037.10
\n", + "
" + ], + "text/plain": [ + " unique_id ds y\n", + "0 BE 2016-10-22 00:00:00 70.00\n", + "1 BE 2016-10-22 01:00:00 37.10\n", + "2 BE 2016-10-22 02:00:00 37.10\n", + "3 BE 2016-10-22 03:00:00 44.75\n", + "4 BE 2016-10-22 04:00:00 37.10" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd \n", + "\n", + "df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/electricity-short.csv') \n", + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize Ray" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Initialize `Ray` and convert the pandas DataFrame to a `Ray` DataFrame. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-04-26 14:47:26,225\tWARNING cluster_utils.py:121 -- Ray cluster mode is currently experimental and untested on Windows. If you are using it and running into issues please file a report at https://github.com/ray-project/ray/issues.\n", + "2024-04-26 14:47:28,636\tINFO utils.py:108 -- Overwriting previous Ray address (127.0.0.1:64941). Running ray.init() on this node will now connect to the new instance at 127.0.0.1:60031. To override this behavior, pass address=127.0.0.1:64941 to ray.init().\n", + "2024-04-26 14:47:28,636\tINFO worker.py:1431 -- Connecting to existing Ray cluster at address: 127.0.0.1:60031...\n", + "2024-04-26 14:47:28,647\tINFO worker.py:1621 -- Connected to Ray cluster.\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "60998a9efc7143448f3005608e2eb06b", + "version_major": 2, + "version_minor": 0 + }, + "text/html": [ + "
\n", + "
\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Python version:3.10.14
Ray version:2.6.2
\n", + "\n", + "
\n", + "
\n" + ], + "text/plain": [ + "RayContext(dashboard_url='', python_version='3.10.14', ray_version='2.6.2', ray_commit='92ad4bab9e93c7a207a44c65ab51295f92566cb4', protocol_version=None)" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import ray\n", + "from ray.cluster_utils import Cluster\n", + "\n", + "ray_cluster = Cluster(\n", + " initialize_head=True,\n", + " head_node_args={\"num_cpus\": 2}\n", + ")\n", + "ray.init(address=ray_cluster.address, ignore_reinit_error=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "cef0e8a3c3f54c8788cb264d1a6c5f22", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "MaterializedDataset(\n", + " num_blocks=1,\n", + " num_rows=8400,\n", + " schema={unique_id: object, ds: object, y: float64}\n", + ")" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray_df = ray.data.from_pandas(df)\n", + "ray_df " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Use TimeGPT on Ray" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Using `TimeGPT` on top of `Ray` is almost identical to the non-distributed case. The only difference is that you need to use a `Ray` DataFrame. \n", + "\n", + "First, instantiate the `NixtlaClient` class. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from nixtla import NixtlaClient" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "nixtla_client = NixtlaClient(\n", + " # defaults to os.environ.get(\"NIXTLA_API_KEY\")\n", + " api_key = 'my_api_key_provided_by_nixtla'\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#| hide \n", + "nixtla_client = NixtlaClient()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then use any method from the `NixtlaClient` class such as [`forecast`](https://nixtlaverse.nixtla.io/nixtla/nixtla_client.html#nixtlaclient-forecast) or [`cross_validation`](https://nixtlaverse.nixtla.io/nixtla/nixtla_client.html#nixtlaclient-cross-validation)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-04-26 14:47:31,617\tINFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]\n", + "2024-04-26 14:47:31,618\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", + "2024-04-26 14:47:31,619\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", + "2024-04-26 14:47:32,114\tINFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(add_coarse_key)] -> LimitOperator[limit=1]\n", + "2024-04-26 14:47:32,114\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, 
gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", + "2024-04-26 14:47:32,115\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", + "2024-04-26 14:47:32,166\tINFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(add_coarse_key)] -> AllToAllOperator[Sort] -> TaskPoolMapOperator[MapBatches(group_fn)]\n", + "2024-04-26 14:47:32,167\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)\n", + "2024-04-26 14:47:32,167\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n" + ] + } + ], + "source": [ + "%%capture\n", + "fcst_df = nixtla_client.forecast(ray_df, h=12)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To visualize the result, use the `to_pandas` method to convert the output of `Ray` to a `pandas` DataFrame." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
unique_iddsTimeGPT
55NP2018-12-24 07:00:0053.784847
56NP2018-12-24 08:00:0054.437321
57NP2018-12-24 09:00:0054.66077
58NP2018-12-24 10:00:0054.744473
59NP2018-12-24 11:00:0054.737762
\n", + "
" + ], + "text/plain": [ + " unique_id ds TimeGPT\n", + "55 NP 2018-12-24 07:00:00 53.784847\n", + "56 NP 2018-12-24 08:00:00 54.437321\n", + "57 NP 2018-12-24 09:00:00 54.66077\n", + "58 NP 2018-12-24 10:00:00 54.744473\n", + "59 NP 2018-12-24 11:00:00 54.737762" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "fcst_df.to_pandas().tail()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-04-26 14:47:40,202\tINFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]\n", + "2024-04-26 14:47:40,202\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", + "2024-04-26 14:47:40,202\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", + "2024-04-26 14:47:40,261\tINFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(add_coarse_key)] -> LimitOperator[limit=1]\n", + "2024-04-26 14:47:40,261\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", + "2024-04-26 14:47:40,262\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", + "2024-04-26 14:47:40,305\tINFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(add_coarse_key)] -> AllToAllOperator[Sort] -> 
TaskPoolMapOperator[MapBatches(group_fn)]\n", + "2024-04-26 14:47:40,306\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)\n", + "2024-04-26 14:47:40,306\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n" + ] + } + ], + "source": [ + "%%capture\n", + "cv_df = nixtla_client.cross_validation(ray_df, h=12, freq='H', n_windows=5, step_size=2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "\u001b[2m\u001b[36m(MapBatches(group_fn) pid=12096)\u001b[0m INFO:nixtla.nixtla_client:Validating inputs...\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
unique_iddscutoffTimeGPT
295NP2018-12-23 19:00:002018-12-23 11:00:0053.441555
296NP2018-12-23 20:00:002018-12-23 11:00:0052.649628
297NP2018-12-23 21:00:002018-12-23 11:00:0051.753975
298NP2018-12-23 22:00:002018-12-23 11:00:0050.681946
299NP2018-12-23 23:00:002018-12-23 11:00:0049.716431
\n", + "
" + ], + "text/plain": [ + " unique_id ds cutoff TimeGPT\n", + "295 NP 2018-12-23 19:00:00 2018-12-23 11:00:00 53.441555\n", + "296 NP 2018-12-23 20:00:00 2018-12-23 11:00:00 52.649628\n", + "297 NP 2018-12-23 21:00:00 2018-12-23 11:00:00 51.753975\n", + "298 NP 2018-12-23 22:00:00 2018-12-23 11:00:00 50.681946\n", + "299 NP 2018-12-23 23:00:00 2018-12-23 11:00:00 49.716431" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "cv_df.to_pandas().tail()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can also use exogenous variables with `TimeGPT` on top of `Ray`. To do this, please refer to the [Exogenous Variables](https://nixtlaverse.nixtla.io/nixtla/docs/tutorials/exogenous_variables.html) tutorial. Just keep in mind that instead of using a pandas DataFrame, you need to use a `Ray` DataFrame instead." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Shutdown Ray" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "When you are done, shutdown the `Ray` session. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ray.shutdown()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "python3", + "language": "python", + "name": "python3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}