Distributed pipelines #428

Merged · 5 commits · merged Oct 3, 2022

Conversation

@aristotelis96 (Collaborator) commented Sep 13, 2022

Refactored distributed runtime
These changes make it possible to fuse multiple operations and also enable vectorized execution at the distributed workers.

  • The distributed runtime now works similarly to the vectorized engine. The compiler fuses multiple operations into one to create a "distributed pipeline" and provides the runtime with the inputs and an MLIR code fragment of the pipeline. The runtime is responsible for passing all necessary pipeline inputs to the workers along with the MLIR code fragment, and finally collects the results from the workers and returns the result of the pipeline.
  • Implemented DistributedWrapper.h. This kernel is responsible for executing a distributed pipeline (similar to MTWrapper.h for the vectorized engine).
  • Compiler support for creating distributed pipelines.
  • Distributed kernels (e.g. Distribute.h), similar to those in runtime/local/kernels, are partially specialized; each partial specialization exists for a different distributed backend (see the sketch below).
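To make the last point more concrete, here is a minimal sketch of how such a partially specialized distributed kernel might look. The enum, context type, and signatures are illustrative assumptions, not the actual code in this PR.

```cpp
// Illustrative sketch only: ALLOCATION_TYPE, DaphneContext, and the apply()
// signature are assumptions modeled on the description above.
enum class ALLOCATION_TYPE { DIST_GRPC, DIST_MPI };

struct DaphneContext; // runtime information, e.g. worker addresses

// Generic template: no default implementation, only backend specializations.
template<ALLOCATION_TYPE AT, class DT>
struct Distribute {
    static void apply(DT *mat, DaphneContext *ctx) = delete;
};

// Partial specialization for the gRPC backend.
template<class DT>
struct Distribute<ALLOCATION_TYPE::DIST_GRPC, DT> {
    static void apply(DT *mat, DaphneContext *ctx) {
        // Split `mat` into partitions, send each partition to a worker via
        // gRPC, and record the placement in the object's metadata.
    }
};
```

A call site would then select the backend at compile time, e.g. Distribute<ALLOCATION_TYPE::DIST_GRPC, DenseMatrix<double>>::apply(mat, ctx).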

Support for various data types on the distributed runtime

  • Added support for DenseMatrix<int64_t> and CSRMatrix<double/int64_t>. Previously, the distributed runtime only supported operations on DenseMatrix<double>; it now supports other data types as well.
  • For now, only DenseMatrix<double> is supported as the result type of a distributed pipeline; however, the operands can be of other types.

Other small changes:

  • Added an accessor for the MetaDataObject of a Structure object. This was needed because distributed kernels need to modify the metadata of an object (see the sketch after this list).
  • Separated gRPC-related code from the generic distributed worker.
  • Updated folder structure. Moved all distributed-related code under src/runtime/distributed/.
  • Updated tests.
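Regarding the new accessor mentioned in the first item above, a minimal sketch of what it could look like (the member name and accessor signatures are assumptions for illustration):

```cpp
// Illustrative sketch only; the actual class layout in the repository may differ.
class MetaDataObject { /* placement information per distributed worker */ };

class Structure {
    MetaDataObject mdo; // kept private
public:
    // Read-only access for consumers of the metadata.
    const MetaDataObject &getMetaDataObject() const { return mdo; }
    // Mutable access, so distributed kernels can record new placements.
    MetaDataObject &getMetaDataObject() { return mdo; }
};
```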

Closes #96, #194, and #367.

@corepointer corepointer self-requested a review September 21, 2022 10:18
@corepointer corepointer added the "Distributed" label (Issues and PRs related to distributed computation) on Sep 21, 2022

@corepointer corepointer left a comment

Thank you for your pull request and all the effort to keep it up to date, as this pull request has been waiting for quite a while now.

  • I found a number of things to improve in the code; see the comments that directly refer to the lines concerned. For some issues that occurred multiple times, like the memory leaks and the duplicated code for parsing the environment variables, I only marked them once.

  • Documentation on how to use this would be an asset. I tried to fire up the example code from the deploy directory (and also learned how to fire up the workers in these scripts), but I could not verify that something was actually computed in a distributed fashion.

  • The gRPC-specific code could be further encapsulated to make operations like distribute/broadcast more generic. E.g., the logic of the operators could stay, but the means of transport should be interchangeable, as a broadcast operation is not specific to gRPC.

Regards, Mark

Review comments (resolved) were left on:
  • src/compiler/execution/DaphneIrExecutor.cpp
  • src/runtime/distributed/coordinator/kernels/Broadcast.h (3 comments, outdated)
  • src/runtime/local/io/ReadCsv.h (outdated)
  • src/runtime/local/io/ReadParquet.h (outdated)
  • src/runtime/distributed/proto/DistributedGRPCCaller.h (outdated)
@aristotelis96 (Collaborator, Author) commented:

Thanks @corepointer for taking the time to review this PR. I answered a few of your comments above; I would like to mention a few more things:

  • @psomas implemented a DistributedContext that now holds information regarding worker addresses etc. This will probably be useful for future distributed implementations (e.g. MPI). In addition, it helps avoid unnecessary code duplication.
  • Documentation: You are right, proper documentation is indeed missing. We are planning to add it for the upcoming prototype release (v0.1).
  • I am not really sure how to approach a further encapsulation of gRPC for now. A few ideas that come to mind are:
  1. Moving the code out of the distributed kernels, but that would make DistributedWrapper.h too complex.
  2. Another option would be to split the distributed kernels into logic and transport. E.g. Broadcast.h would be split into two kernels: one that prepares and creates the metadata and one that is responsible only for the communication (the second one being partially specialized for each distributed backend); see the sketch after this list.
  3. We could leave the Distribute.h, Broadcast.h, etc. kernels as they are and add a function inside that handles the logic. Each partial template specialization can then use said function for the logic part. This will probably become clearer once MPI is ready.
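To illustrate idea (2), here is a rough, purely hypothetical sketch of splitting Broadcast into a backend-neutral logic part and a backend-specific transport part (all names are made up for illustration):

```cpp
// Hypothetical sketch of splitting a distributed kernel into logic and transport.
enum class ALLOCATION_TYPE { DIST_GRPC, DIST_MPI };
struct DaphneContext;

// Backend-neutral logic: decide the placement and record it in the metadata.
template<class DT>
void broadcastPrepareMetaData(DT *mat, DaphneContext *ctx) {
    // mark `mat` as broadcast to all workers in its MetaDataObject
}

// Backend-specific transport: only this part is specialized per backend.
template<ALLOCATION_TYPE AT, class DT>
struct BroadcastTransport;

template<class DT>
struct BroadcastTransport<ALLOCATION_TYPE::DIST_GRPC, DT> {
    static void apply(const DT *mat, DaphneContext *ctx) {
        // serialize `mat` once and send it to every worker via gRPC
    }
};
```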

@corepointer corepointer left a comment

All mentioned issues seem to have been addressed. I will squash and rebase to cleanly merge this to main.

@aristotelis96 aristotelis96 marked this pull request as draft September 29, 2022 12:04
@aristotelis96 (Collaborator, Author) commented:

Thanks again @corepointer for reviewing this PR and for your helpful comments. I just pushed two more commits that fix two bugs:

  1. For each object/value the worker receives, it allocates memory and stores a pointer to that address. Because it did not allocate any memory when storing plain values (e.g. a double), the stored pointer was not pointing anywhere. Currently there is no support for free-operations on the distributed workers, which means that we waste memory for data that is no longer used (this applies to both values and objects). I've added a few comments describing this issue (see the sketch after this list).
  2. There was a bug where the Distribute/Broadcast kernels updated the metadata of an object before checking whether that object was already placed at the workers, resulting in unnecessary communication. This should now be fixed.
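A minimal sketch of the fix described in (1); the class and function names are hypothetical and only illustrate taking ownership of a copy of the received value:

```cpp
// Hypothetical illustration of bug (1): the worker must own a copy of a
// received scalar, otherwise the stored address dangles.
#include <string>
#include <unordered_map>

class WorkerStorage {
    std::unordered_map<std::string, void *> stored_;
public:
    void storeValue(const std::string &identifier, double value) {
        // Allocate memory so the address remains valid after this call returns.
        stored_[identifier] = new double(value);
        // Known limitation mentioned above: there is no free-operation yet,
        // so this allocation is never released.
    }
};
```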

@aristotelis96 aristotelis96 marked this pull request as ready for review September 29, 2022 12:42
pdamme and others added 5 commits October 3, 2022 18:07
Basic compiler infrastructure for distributed pipelines.

- DistributedPipelineOp akin to VectorizedPipelineOp
  - main difference: pipeline body given as IR string, not as nested code region
- New DistributePipelinesPass rewriting VectorizedPipelineOp to DistributedPipelineOp.
  - including the generation of the IR string (see the rough sketch after this list)
  - distribution requires vectorization (--vec) to be effective
- RewriteToCallKernelOpPass lowers DistributedPipelineOp to CallKernelOp
  - special rewrite pattern for DistributedPipelineOp; for certain reasons it cannot be treated like any other op here
- distributedPipeline-kernel: currently just a stub that prints the information it receives, including the IR string.
- Limitations:
  - currently, we only support distributed pipelines with 1 or 2 outputs (we still have the same limitation for vectorized pipelines)
  - currently, we only support DenseMatrix<double>
  - currently, all vectorized pipelines are rewritten to distributed pipelines
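A very rough sketch of the rewrite direction described above, using standard MLIR rewrite-pattern machinery; the DAPHNE op names are taken from the commit message, while the serialization helper and builder arguments are assumptions:

```cpp
// Rough sketch only: assumes DAPHNE dialect ops (daphne::VectorizedPipelineOp,
// daphne::DistributedPipelineOp) and a hypothetical helper that serializes the
// pipeline body to a textual IR string; real builders/accessors may differ.
#include <string>
#include "mlir/IR/PatternMatch.h"

std::string serializePipelineBody(daphne::VectorizedPipelineOp op); // hypothetical

struct DistributePipelinesPattern
        : public mlir::OpRewritePattern<daphne::VectorizedPipelineOp> {
    using OpRewritePattern::OpRewritePattern;

    mlir::LogicalResult
    matchAndRewrite(daphne::VectorizedPipelineOp op,
                    mlir::PatternRewriter &rewriter) const override {
        // The distributed pipeline carries its body as an IR string attribute
        // instead of a nested region, so serialize the body first.
        std::string ir = serializePipelineBody(op);
        rewriter.replaceOpWithNewOp<daphne::DistributedPipelineOp>(
            op, op->getResultTypes(), op->getOperands(),
            rewriter.getStringAttr(ir));
        return mlir::success();
    }
};
```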
This commit merges a longer development process to the main branch. The general topic is given in the first line of the commit message and the aggregated individual commit messages are listed below.

Closes #96, Closes #194

Distributed runtime updates:
- Updated project structure
- Moved distributed related kernels and datastructures under runtime/distributed/coordinator/
- Generalized Broadcast&Distribute kernels
- DistributedWrapper implementation
- Updated Worker to support Vectorized execution.

Implementation of vectorizedPipeline local kernel

- Updated distributedCollect Primitive
- TODO generated ir code needs to be fixed
- TODO Additional debugging needed on worker side

Distributed runtime updates:

- Extended parseType for rows/cols

- Updated Distributed Runtime for COLS combine

- Updated DistributedTest

- DistributedDescriptor implementation (metadata for the distributed runtime)
- Distributed allocation descriptor implementation; it simply holds object metadata information
- Distributed kernels (distribute/broadcast/compute, etc.) use template functions for each communication framework.
- MPI implementation missing.

- New enum type for distributed backend implementation

[MINOR] Changes for readCSVFiles after rebasing
This commit merges a longer development process to the main branch. The general topic is given in the first line of the commit message and the aggregated individual commit messages are listed below.

Closes #367

Initial AllocationDescriptor Distributed Implementation

- GRPC implementation

Moved gRPC-related classes and files under "runtime/distributed/proto/" .

- Some files containing gRPC code were located under distributed/worker. Moved class ProtoDataConverter, class CallData
- Some files containing gRPC code were located under distributed/coordinator. Moved class DistributedGRPCCaller
- Updated CMAKE files.

Updated DistributedWorker

- Separated the worker implementation from gRPC.
- The gRPC worker implementation now derives from the base worker implementation class.
- The base class WorkerImpl contains generic functions for storing data, computing pipelines, etc.
- The class WorkerImplGRPC contains the functions for communicating via gRPC and uses the parent class for storing/computing data (see the sketch after this list).
- TODO WorkerImplMPI.
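A sketch of the separation described above; class and method names are assumptions for illustration, not the actual interface:

```cpp
// Illustrative sketch only; the real WorkerImpl interface may differ.
#include <string>
#include <vector>

struct Structure; // any DAPHNE data object

class WorkerImpl {
public:
    virtual ~WorkerImpl() = default;
    // Keep received data and return an identifier for later reference.
    std::string store(Structure *data);
    // Execute a pipeline (given as an MLIR code fragment) on stored inputs.
    std::vector<std::string> compute(const std::vector<std::string> &inputs,
                                     const std::string &mlirCode);
    // Hand a stored result back to the coordinator.
    Structure *transfer(const std::string &identifier);
};

// The gRPC layer only deserializes protobuf messages and forwards them to the
// generic implementation; an MPI variant could be added in the same way.
class WorkerImplGRPC : public WorkerImpl {
    // gRPC service handlers calling store()/compute()/transfer()
};
```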

Distributed pipeline kernel: Support for more than two outputs.

Enabling multiple outputs for distributed pipelines.

- There was already a partial implementation transferring the recent changes from vectorized pipelines to distributed pipelines.

- However, a few pieces were still missing to make it work:

- The CallKernelOp generated for the DistributedPipelineOp in RewriteToCallKernelOpPass must have the attribute "hasVariadicResults" to ensure correct lowering in LowerToLLVMPass.

- The number of outputs must come after the outputs in the kernel, and must not be added as an operand to the CallKernelOp, since it is added automatically for variadic results in LowerToLLVMPass.

[MINOR] Bugfix: gRPC was not throwing an error when handling unsupported types (for now we support only Dense<double>)

- Support for broadcasting single double values.
- Minor fixes.
- Due to current Object Meta Data limitations, we only support unique inputs for a Distributed Pipeline (no duplicate pipeline inputs).

Distributed kernels

- Distributed kernels have specializations for each distributed backend implementation.
- Distributed kernels update the meta data and handle the communication using specific distributed-backend implementation.
- Distributed metadata now only holds information.
- TODO: Add simple transferTo/From functions in the meta data class for the distributed gRPC implementation.
- Various small changes.

Rebased onto main

- main includes the initial Meta Data implementation
- MetaDataObject mdo field of class Structure is now public. Distributed kernels need to access and modify the metadata of an object.
- Various small updates to kernels in order to support the new meta data implementation.

Updated distributed runtime tests

- WorkerTest.cpp now tests the generic WorkerImpl class, instead of the gRPC specific implementation.
- TODO: Add a test for the gRPC WorkerImpl class.
- Removed unused utility function "StartDistributedWorker"
- Disabled "DistributedRead" test. With the new Distributed-Pipeline implementation we do not support distributed read yet, therefore this test does not actually test something significant.
- Updated a few test-scripts for the distributed runtime, due to unique-pipeline-inputs limitations.

Cleanup.

- Added Status nested class to WorkerImpl.
- Renamed and moved AllocationDescriptorGRPC.
- Renamed Worker::StoredInfo::filename to identifier.
- Improved serialization from CSRMatrix to protobuf.
- Changed MetaDataObject mdo in Structure class, from public to private.
- Added getter by reference for modifying MetaDataObject of a Structure.
- Improved CSRMatrix serialization from Daphne object to protobuf.
- Fixed various warnings.
- Minor changes.
Added distributed context and cli argument for distributed execution.

- Added DistributedContext.h containing information about the distributed workers (see the sketch after this list).
- Removed duplicate code.
- Added the command line argument "--distributed" to enable distributed execution for DAPHNE.
- Various small fixes after adding the "--distributed" flag.
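A minimal sketch of what such a context might look like; the environment variable name and the comma-separated address format are assumptions based on the earlier discussion about duplicated address parsing:

```cpp
// Illustrative sketch only; variable name and format are assumptions.
#include <cstdlib>
#include <sstream>
#include <string>
#include <vector>

struct DistributedContext {
    std::vector<std::string> workers; // addresses of the distributed workers

    DistributedContext() {
        // e.g. DISTRIBUTED_WORKERS="host1:5000,host2:5000"
        if (const char *env = std::getenv("DISTRIBUTED_WORKERS")) {
            std::stringstream ss(env);
            std::string addr;
            while (std::getline(ss, addr, ','))
                workers.push_back(addr);
        }
    }
};
```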

Co-authored-by: Stratos Psomadakis <[email protected]>
Co-authored-by: Aristotelis Vontzalidis <[email protected]>
This commit merges a longer development process to the main branch. The general topic is given in the first line of the commit message and the aggregated individual commit messages are listed below.

Closes #428

- Fixed various memory leaks.
- Fixed headers.
- Changed some pass-by-value to const-reference.
- Channel map changed to inline static.

[MINOR] Silenced some warnings

[Bugfix] Worker receiving and storing a value.

When the worker receives a value, we need to allocate memory, store the value, and keep the address for later use.
Since we did not allocate any memory, this resulted in a bug where the stored address did not point to any value.

[Bugfix] Distribute/Broadcast kernels never actually checked if something was already placed at the workers.
@corepointer (Collaborator) commented:

LGTM - Compilation and test suite have no issues. Time to merge. I hope the squashed commits make sense.

Thank you all for the hard work.
