From b7cad5412ce8bd88e48b9a9d5f11592710e14d42 Mon Sep 17 00:00:00 2001 From: "Igoshev, Yaroslav" Date: Thu, 11 Feb 2021 11:26:48 +0300 Subject: [PATCH 1/7] DOCS-#2720: Add documentation for partition API Signed-off-by: Igoshev, Yaroslav --- docs/developer/pandas/partition_api.rst | 75 +++++++++++++++++++++++++ docs/index.rst | 1 + 2 files changed, 76 insertions(+) create mode 100644 docs/developer/pandas/partition_api.rst diff --git a/docs/developer/pandas/partition_api.rst b/docs/developer/pandas/partition_api.rst new file mode 100644 index 00000000000..a8cd4a5e418 --- /dev/null +++ b/docs/developer/pandas/partition_api.rst @@ -0,0 +1,75 @@ +Pandas Partition API in Modin +============================= + +If you are working with a Modin Dataframe, you can unwrap its remote partitions +to get the raw future objects compatible with the execution engine (e.g. ``ray.ObjectRef`` for Ray). +You can use this API to get the IPs of the nodes that hold these objects as well. In that case you can pass the partitions +having needed IPs to your function. It can help with minimazing of data movement between nodes. In addition to +unwrapping of the remote partitions we also provide API to construct Modin DataFrame from them. + +Ray engine +---------- +However, it is worth noting that for Modin on ``Ray`` engine with ``pandas`` backend IPs of the remote partitions may not match +actual locations if the partitions are lower than 100 kB. Ray saves such objects (<= 100 kB, by default) in in-process store +of the calling process (please, refer to `Ray documentation`_ for more information). We can't get IPs for such objects while maintaining good performance. +So, you should keep in mind this for unwrapping of the remote partitions with their IPs. Several options are provided to handle the case in +``How to handle Ray objects that are lower 100 kB`` section. + +Dask engine +----------- +There is no mentioned above issue for Modin on ``Dask`` engine with ``pandas`` backend because ``Dask`` saves any objects +in the worker process that processes a function (please, refer to `Dask documentation`_ for more information). + +Install Modin Pandas Partition API +---------------------------------- + +Modin now comes with all the dependencies for pandas partition API functionality by default! See +the :doc:`installation page ` for more information on installing Modin. + +How to handle Ray objects that are lower than 100 kB +---------------------------------------------------- + +* If you are sure that each of the remote partitions being unwrapped is higher than 100 kB, you can just import Modin or perform ``ray.init()`` manually. + +* If you don't know partition sizes you can pass the option ``_system_config={"max_direct_call_object_size": ,}``, where ``nbytes`` is threshold for objects that will be stored in in-process store, to ``ray.init()`` or export the following environment variable: + +.. code-block:: bash + + export MODIN_ON_RAY_PARTITION_THRESHOLD= + +When specifying ``nbytes`` equal to 0, all the objects will be saved to shared-memory object store (plasma). + +* You can also start Ray as follows: ``ray start --head --system-config='{"max_direct_call_object_size":}'``. + +Note that when specifying the threshold the performance of some Modin operations may change. + +API +--- + +It is currently supported the following API: + +.. automodule:: modin.distributed.dataframe.pandas + :members: unwrap_partitions + +.. automodule:: modin.distributed.dataframe.pandas + :members: from_partitions + +Running an example with pandas partition API +-------------------------------------------- + +Before you run this, please make sure you follow the instructions listed above. + +.. code-block:: python + + import modin.pandas as pd + from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions + df = pd.read_csv("/path/to/your/file") + partitions = unwrap_partitions(df, axis=0, bind_ip=True) + print(partitions) + # Also, you can create Modin DataFrame from remote partitions including their IPs + new_df = from_partitions(partitions, 0) + print(new_df) + + +.. _`Ray documentation`: https://docs.ray.io/en/master/index.html# +.. _`Dask documentation`: https://distributed.dask.org/en/latest/index.html diff --git a/docs/index.rst b/docs/index.rst index 7afe3b1c78f..77980c42769 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -156,6 +156,7 @@ nature, you get a fast DataFrame at 1MB and 1TB+. contributing developer/architecture + developer/pandas/partition_api .. toctree:: :caption: Engines, Backends, and APIs From b8603761aa43d4a55e082bbb393e17e532099ec7 Mon Sep 17 00:00:00 2001 From: YarShev Date: Fri, 12 Feb 2021 13:53:10 +0300 Subject: [PATCH 2/7] DOCS-#2720: Apply suggestions from code review Co-authored-by: Alexey Prutskov Signed-off-by: Igoshev, Yaroslav --- docs/developer/pandas/partition_api.rst | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/docs/developer/pandas/partition_api.rst b/docs/developer/pandas/partition_api.rst index a8cd4a5e418..114d491a6e6 100644 --- a/docs/developer/pandas/partition_api.rst +++ b/docs/developer/pandas/partition_api.rst @@ -49,9 +49,11 @@ API It is currently supported the following API: .. automodule:: modin.distributed.dataframe.pandas + :noindex: :members: unwrap_partitions .. automodule:: modin.distributed.dataframe.pandas + :noindex: :members: from_partitions Running an example with pandas partition API @@ -63,11 +65,13 @@ Before you run this, please make sure you follow the instructions listed above. import modin.pandas as pd from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions - df = pd.read_csv("/path/to/your/file") - partitions = unwrap_partitions(df, axis=0, bind_ip=True) + import numpy as np + data = np.random.randint(0, 100, size=(2 ** 10, 2 ** 8)) + df = pd.DataFrame(data) + partitions = unwrap_partitions(df, axis=0, get_ip=True) print(partitions) # Also, you can create Modin DataFrame from remote partitions including their IPs - new_df = from_partitions(partitions, 0) + new_df = from_partitions(partitions, axis=0) print(new_df) From 8d2dd2c0484c1c11dbe194b7af690451d1a201fc Mon Sep 17 00:00:00 2001 From: "Igoshev, Yaroslav" Date: Mon, 15 Feb 2021 21:19:19 +0300 Subject: [PATCH 3/7] DOCS-#2720: Apply comments Signed-off-by: Igoshev, Yaroslav --- docs/developer/pandas/partition_api.rst | 82 ++++++++++++------------- 1 file changed, 39 insertions(+), 43 deletions(-) diff --git a/docs/developer/pandas/partition_api.rst b/docs/developer/pandas/partition_api.rst index 114d491a6e6..2c49a0e9b0d 100644 --- a/docs/developer/pandas/partition_api.rst +++ b/docs/developer/pandas/partition_api.rst @@ -1,11 +1,44 @@ -Pandas Partition API in Modin -============================= +Partition API in Modin +====================== -If you are working with a Modin Dataframe, you can unwrap its remote partitions +When you are working with a Modin Dataframe, you can unwrap its remote partitions to get the raw future objects compatible with the execution engine (e.g. ``ray.ObjectRef`` for Ray). -You can use this API to get the IPs of the nodes that hold these objects as well. In that case you can pass the partitions -having needed IPs to your function. It can help with minimazing of data movement between nodes. In addition to -unwrapping of the remote partitions we also provide API to construct Modin DataFrame from them. +In addition to unwrapping of the remote partitions we also provide API to construct Modin DataFrame from them. + +API +--- + +It is currently supported the following API: + +.. automodule:: modin.distributed.dataframe.pandas + :noindex: + :members: unwrap_partitions + +.. automodule:: modin.distributed.dataframe.pandas + :noindex: + :members: from_partitions + +Running an example with Partition API +------------------------------------- + +.. code-block:: python + + import modin.pandas as pd + from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions + import numpy as np + data = np.random.randint(0, 100, size=(2 ** 10, 2 ** 8)) + df = pd.DataFrame(data) + partitions = unwrap_partitions(df, axis=0) + print(partitions) + # Also, you can create Modin DataFrame from remote partitions + new_df = from_partitions(partitions, axis=0) + print(new_df) + +Partition IPs Usage +------------------- + +Also, you can use the mentioned above API to get the IPs of the nodes that hold the partitions (``get_ip=True``). +In that case you can pass the partitions having needed IPs to your function. It can help with minimazing of data movement between nodes. Ray engine ---------- @@ -20,12 +53,6 @@ Dask engine There is no mentioned above issue for Modin on ``Dask`` engine with ``pandas`` backend because ``Dask`` saves any objects in the worker process that processes a function (please, refer to `Dask documentation`_ for more information). -Install Modin Pandas Partition API ----------------------------------- - -Modin now comes with all the dependencies for pandas partition API functionality by default! See -the :doc:`installation page ` for more information on installing Modin. - How to handle Ray objects that are lower than 100 kB ---------------------------------------------------- @@ -43,37 +70,6 @@ When specifying ``nbytes`` equal to 0, all the objects will be saved to shared-m Note that when specifying the threshold the performance of some Modin operations may change. -API ---- - -It is currently supported the following API: - -.. automodule:: modin.distributed.dataframe.pandas - :noindex: - :members: unwrap_partitions - -.. automodule:: modin.distributed.dataframe.pandas - :noindex: - :members: from_partitions - -Running an example with pandas partition API --------------------------------------------- - -Before you run this, please make sure you follow the instructions listed above. - -.. code-block:: python - - import modin.pandas as pd - from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions - import numpy as np - data = np.random.randint(0, 100, size=(2 ** 10, 2 ** 8)) - df = pd.DataFrame(data) - partitions = unwrap_partitions(df, axis=0, get_ip=True) - print(partitions) - # Also, you can create Modin DataFrame from remote partitions including their IPs - new_df = from_partitions(partitions, axis=0) - print(new_df) - .. _`Ray documentation`: https://docs.ray.io/en/master/index.html# .. _`Dask documentation`: https://distributed.dask.org/en/latest/index.html From 5223535fa78e7b82983a539fa5868fba87d8dd55 Mon Sep 17 00:00:00 2001 From: YarShev Date: Wed, 17 Feb 2021 09:39:46 +0300 Subject: [PATCH 4/7] DOCS-#2720: Apply suggestions from code review Co-authored-by: Devin Petersohn Signed-off-by: Igoshev, Yaroslav --- docs/developer/pandas/partition_api.rst | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/docs/developer/pandas/partition_api.rst b/docs/developer/pandas/partition_api.rst index 2c49a0e9b0d..a8629a34abe 100644 --- a/docs/developer/pandas/partition_api.rst +++ b/docs/developer/pandas/partition_api.rst @@ -2,24 +2,26 @@ Partition API in Modin ====================== When you are working with a Modin Dataframe, you can unwrap its remote partitions -to get the raw future objects compatible with the execution engine (e.g. ``ray.ObjectRef`` for Ray). -In addition to unwrapping of the remote partitions we also provide API to construct Modin DataFrame from them. +to get the raw futures objects compatible with the execution engine (e.g. ``ray.ObjectRef`` for Ray). +In addition to unwrapping of the remote partitions we also provide an API to construct +a ``modin.pandas.DataFrame`` from raw futures objects. -API ---- - -It is currently supported the following API: +unwrap_partitions +----------------- .. automodule:: modin.distributed.dataframe.pandas :noindex: :members: unwrap_partitions +from_partitions +--------------- + .. automodule:: modin.distributed.dataframe.pandas :noindex: :members: from_partitions -Running an example with Partition API -------------------------------------- +Example +------- .. code-block:: python @@ -37,7 +39,7 @@ Running an example with Partition API Partition IPs Usage ------------------- -Also, you can use the mentioned above API to get the IPs of the nodes that hold the partitions (``get_ip=True``). +You can use the mentioned above APIs to get the IPs of the nodes that hold the partitions (``get_ip=True``). In that case you can pass the partitions having needed IPs to your function. It can help with minimazing of data movement between nodes. Ray engine From 82e019abf39f3cf6baf859f98f103877b27952ec Mon Sep 17 00:00:00 2001 From: "Igoshev, Yaroslav" Date: Thu, 18 Feb 2021 11:31:26 +0300 Subject: [PATCH 5/7] DOCS-#2720: Update doc Signed-off-by: Igoshev, Yaroslav --- docs/developer/pandas/partition_api.rst | 67 ++++++++----------------- 1 file changed, 22 insertions(+), 45 deletions(-) diff --git a/docs/developer/pandas/partition_api.rst b/docs/developer/pandas/partition_api.rst index a8629a34abe..cc7a4e2dd12 100644 --- a/docs/developer/pandas/partition_api.rst +++ b/docs/developer/pandas/partition_api.rst @@ -3,8 +3,13 @@ Partition API in Modin When you are working with a Modin Dataframe, you can unwrap its remote partitions to get the raw futures objects compatible with the execution engine (e.g. ``ray.ObjectRef`` for Ray). -In addition to unwrapping of the remote partitions we also provide an API to construct -a ``modin.pandas.DataFrame`` from raw futures objects. +In addition to unwrapping of the remote partitions we also provide an API to construct a ``modin.pandas.DataFrame`` +from raw futures objects. + +Partition IPs +------------- +Also, you can get the IPs of the nodes that hold the partitions. In that case you can pass +the partitions having needed IPs to your function. It can help with minimazing of data movement between nodes. unwrap_partitions ----------------- @@ -13,6 +18,13 @@ unwrap_partitions :noindex: :members: unwrap_partitions +map_partitions_to_ips +--------------------- + +.. automodule:: modin.distributed.dataframe.pandas + :noindex: + :members: map_partitions_to_ips + from_partitions --------------- @@ -26,52 +38,17 @@ Example .. code-block:: python import modin.pandas as pd - from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions + from modin.distributed.dataframe.pandas import ( + unwrap_partitions, + map_partitions_to_ips, + from_partitions, + ) import numpy as np data = np.random.randint(0, 100, size=(2 ** 10, 2 ** 8)) df = pd.DataFrame(data) - partitions = unwrap_partitions(df, axis=0) + partitions = unwrap_partitions(df, axis=0, get_ip=True) print(partitions) - # Also, you can create Modin DataFrame from remote partitions + mapped_partitions = map_partitions_to_ips(partitions, axis=0) + print(mapped_partitions) new_df = from_partitions(partitions, axis=0) print(new_df) - -Partition IPs Usage -------------------- - -You can use the mentioned above APIs to get the IPs of the nodes that hold the partitions (``get_ip=True``). -In that case you can pass the partitions having needed IPs to your function. It can help with minimazing of data movement between nodes. - -Ray engine ----------- -However, it is worth noting that for Modin on ``Ray`` engine with ``pandas`` backend IPs of the remote partitions may not match -actual locations if the partitions are lower than 100 kB. Ray saves such objects (<= 100 kB, by default) in in-process store -of the calling process (please, refer to `Ray documentation`_ for more information). We can't get IPs for such objects while maintaining good performance. -So, you should keep in mind this for unwrapping of the remote partitions with their IPs. Several options are provided to handle the case in -``How to handle Ray objects that are lower 100 kB`` section. - -Dask engine ------------ -There is no mentioned above issue for Modin on ``Dask`` engine with ``pandas`` backend because ``Dask`` saves any objects -in the worker process that processes a function (please, refer to `Dask documentation`_ for more information). - -How to handle Ray objects that are lower than 100 kB ----------------------------------------------------- - -* If you are sure that each of the remote partitions being unwrapped is higher than 100 kB, you can just import Modin or perform ``ray.init()`` manually. - -* If you don't know partition sizes you can pass the option ``_system_config={"max_direct_call_object_size": ,}``, where ``nbytes`` is threshold for objects that will be stored in in-process store, to ``ray.init()`` or export the following environment variable: - -.. code-block:: bash - - export MODIN_ON_RAY_PARTITION_THRESHOLD= - -When specifying ``nbytes`` equal to 0, all the objects will be saved to shared-memory object store (plasma). - -* You can also start Ray as follows: ``ray start --head --system-config='{"max_direct_call_object_size":}'``. - -Note that when specifying the threshold the performance of some Modin operations may change. - - -.. _`Ray documentation`: https://docs.ray.io/en/master/index.html# -.. _`Dask documentation`: https://distributed.dask.org/en/latest/index.html From cfabb0d45449cb8f116485f29cf646a69077e031 Mon Sep 17 00:00:00 2001 From: YarShev Date: Fri, 19 Feb 2021 10:15:50 +0300 Subject: [PATCH 6/7] DOCS-#2720: Update docs/developer/pandas/partition_api.rst Co-authored-by: Devin Petersohn Signed-off-by: Igoshev, Yaroslav --- docs/developer/pandas/partition_api.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/developer/pandas/partition_api.rst b/docs/developer/pandas/partition_api.rst index cc7a4e2dd12..7da7693be7b 100644 --- a/docs/developer/pandas/partition_api.rst +++ b/docs/developer/pandas/partition_api.rst @@ -8,8 +8,8 @@ from raw futures objects. Partition IPs ------------- -Also, you can get the IPs of the nodes that hold the partitions. In that case you can pass -the partitions having needed IPs to your function. It can help with minimazing of data movement between nodes. +For finer grained placement control, Modin also provides an API to get the IP addresses of the nodes that hold each partition. +You can pass the partitions having needed IPs to your function. It can help with minimazing of data movement between nodes. unwrap_partitions ----------------- From b346f505511138f84213cabee268ba43baeacba6 Mon Sep 17 00:00:00 2001 From: "Igoshev, Yaroslav" Date: Thu, 25 Feb 2021 10:09:52 +0300 Subject: [PATCH 7/7] DOCS-#2720: Update doc page Signed-off-by: Igoshev, Yaroslav --- docs/developer/pandas/partition_api.rst | 27 +++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/docs/developer/pandas/partition_api.rst b/docs/developer/pandas/partition_api.rst index 7da7693be7b..25fae335d12 100644 --- a/docs/developer/pandas/partition_api.rst +++ b/docs/developer/pandas/partition_api.rst @@ -52,3 +52,30 @@ Example print(mapped_partitions) new_df = from_partitions(partitions, axis=0) print(new_df) + +Ray engine +---------- +However, it is worth noting that for Modin on ``Ray`` engine with ``pandas`` backend IPs of the remote partitions may not match +actual locations if the partitions are lower than 100 kB. Ray saves such objects (<= 100 kB, by default) in in-process store +of the calling process (please, refer to `Ray documentation`_ for more information). We can't get IPs for such objects while maintaining good performance. +So, you should keep in mind this for unwrapping of the remote partitions with their IPs. Several options are provided to handle the case in +``How to handle Ray objects that are lower 100 kB`` section. + +Dask engine +----------- +There is no mentioned above issue for Modin on ``Dask`` engine with ``pandas`` backend because ``Dask`` saves any objects +in the worker process that processes a function (please, refer to `Dask documentation`_ for more information). + +How to handle Ray objects that are lower than 100 kB +---------------------------------------------------- + +* If you are sure that each of the remote partitions being unwrapped is higher than 100 kB, you can just import Modin or perform ``ray.init()`` manually. + +* If you don't know partition sizes you can pass the option ``_system_config={"max_direct_call_object_size": ,}``, where ``nbytes`` is threshold for objects that will be stored in in-process store, to ``ray.init()``. + +* You can also start Ray as follows: ``ray start --head --system-config='{"max_direct_call_object_size":}'``. + +Note that when specifying the threshold the performance of some Modin operations may change. + +.. _`Ray documentation`: https://docs.ray.io/en/master/index.html# +.. _`Dask documentation`: https://distributed.dask.org/en/latest/index.html