Join vertex - temporary PR on my own fork #1

Closed
juliev0 wants to merge 64 commits

juliev0 (Owner) commented Jun 29, 2023

Fixes numaproj#719

Enables Join Vertex (multiple Vertices publish to a single Vertex). This covers all three cases: the Join Vertex can be a Map, Sink, or Reduce Vertex.

The implementation is based on this design document. (Sorry, it's an internal link for now.)

Changes

The design document explains the need for an EdgeFetcherSet: the set of EdgeFetchers from which a Vertex consumes Watermarks, one per incoming Edge. EdgeFetcherSet traverses the EdgeFetcher for each incoming Edge.
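To make the idea concrete, here is a minimal sketch of a set that traverses one fetcher per incoming Edge. It assumes the set reports the smallest watermark seen across its Edges, which is the usual way to combine watermarks but is not spelled out in this description. The names `edgeFetcher`, `fixedFetcher`, and `edgeFetcherSet` are hypothetical stand-ins, not the types in this PR.

```go
package main

import (
	"fmt"
	"math"
)

// edgeFetcher is a hypothetical stand-in for the per-Edge fetcher.
// The real interface in the PR may look different.
type edgeFetcher interface {
	// GetWatermark returns the watermark (epoch millis) of one incoming Edge.
	GetWatermark() int64
}

// fixedFetcher is a toy fetcher that always reports the same watermark.
type fixedFetcher struct{ wm int64 }

func (f fixedFetcher) GetWatermark() int64 { return f.wm }

// edgeFetcherSet holds one fetcher per incoming Edge, keyed by the name of
// the publishing Vertex.
type edgeFetcherSet map[string]edgeFetcher

// GetWatermark traverses every fetcher and returns the smallest watermark.
// Assumption: the slowest incoming Edge bounds the progress of the Vertex.
// An empty set returns -1.
func (s edgeFetcherSet) GetWatermark() int64 {
	if len(s) == 0 {
		return -1
	}
	lowest := int64(math.MaxInt64)
	for _, f := range s {
		if wm := f.GetWatermark(); wm < lowest {
			lowest = wm
		}
	}
	return lowest
}

func main() {
	set := edgeFetcherSet{
		"map-a": fixedFetcher{wm: 1000},
		"map-b": fixedFetcher{wm: 700},
	}
	fmt.Println(set.GetWatermark()) // prints 700
}
```

Keying the set by publishing Vertex name is only a convenience for the sketch; any collection with one entry per incoming Edge would do.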

In addition to that, some other changes made include:

  • setting the message ID so that it incorporates the publishing Vertex name, which keeps messages from different publishing Vertices from being treated as duplicates
  • having all Vertices that publish to a given Reduce Vertex use the same seed for the partitioning algorithm, so that they all partition messages the same way
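The first bullet can be sketched as follows. The ID layout here (`vertexName-partition-offset`) and the function name `messageID` are assumptions for illustration only; the PR states just that the ID now includes the vertex name, partition, etc.

```go
package main

import "fmt"

// messageID builds a hypothetical message ID from the publishing Vertex
// name, the partition, and the offset. The exact layout used by the PR is
// not shown in this description; this one is an assumption.
func messageID(vertexName string, partition int32, offset string) string {
	return fmt.Sprintf("%s-%d-%s", vertexName, partition, offset)
}

func main() {
	// Two Vertices publish a message with the same partition and offset
	// to the same Join Vertex.
	a := messageID("map-a", 0, "42")
	b := messageID("map-b", 0, "42")
	fmt.Println(a, b, a == b) // prints: map-a-0-42 map-b-0-42 false
}
```

Without the Vertex name, both messages would carry the same ID and one could be dropped as a duplicate by the shared buffer.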

Cycles

The plan is to enable Cycles in the pipeline as well, as long as there is no Reduce Vertex at the point of the cycle or to its right (downstream). A Reduce Vertex relies on the Watermark, and Cycles create a scenario in which late data arrives after Close-of-Book. Cycles have not been tested yet, and validation still needs to be added to reject the Cycles described above.
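One possible shape for that validation is sketched below. It is not part of this PR: the `edge` type, the `isReduce` map, and `validateCycles` are hypothetical. The check rejects a pipeline when a Reduce Vertex lies on a cycle or is reachable from a Vertex on a cycle.

```go
package main

import "fmt"

// edge is a hypothetical, simplified pipeline edge.
type edge struct{ From, To string }

// validateCycles returns an error if any Reduce Vertex lies on a cycle or
// downstream of one. isReduce marks which Vertices are Reduce Vertices.
func validateCycles(edges []edge, isReduce map[string]bool) error {
	next := make(map[string][]string)
	for _, e := range edges {
		next[e.From] = append(next[e.From], e.To)
	}
	// reachable returns every Vertex reachable from start via at least one edge.
	reachable := func(start string) map[string]bool {
		seen := make(map[string]bool)
		stack := append([]string(nil), next[start]...)
		for len(stack) > 0 {
			v := stack[len(stack)-1]
			stack = stack[:len(stack)-1]
			if seen[v] {
				continue
			}
			seen[v] = true
			stack = append(stack, next[v]...)
		}
		return seen
	}
	for v := range next {
		r := reachable(v)
		if !r[v] {
			continue // v cannot reach itself, so it is not on a cycle
		}
		// v is on a cycle; r holds v plus everything downstream of it.
		for u := range r {
			if isReduce[u] {
				return fmt.Errorf("reduce vertex %q is on or downstream of a cycle through %q", u, v)
			}
		}
	}
	return nil
}

func main() {
	// "map" has a self-loop, which is the simplest cycle.
	edges := []edge{{"in", "map"}, {"map", "map"}, {"map", "out"}}
	fmt.Println(validateCycles(edges, map[string]bool{}) == nil)            // true
	fmt.Println(validateCycles(edges, map[string]bool{"out": true}) == nil) // false
}
```

The sketch runs one traversal per Vertex, which is quadratic in the worst case but simple to read; pipelines are small enough that this is unlikely to matter.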

juliev0 added 30 commits May 30, 2023 14:39
Signed-off-by: Julie Vogelman <[email protected]>
…ishing to the same buffer

@@ -261,7 +261,7 @@ func Test_batchSyncWithMaxBatchSize(t *testing.T) {
 	assert.NoError(t, err)
 	err = wal.Write(&message)
 	assert.NoError(t, err)
-	assert.Equal(t, int64(222), tempWAL.prevSyncedWOffset)
+	assert.Equal(t, int64(252), tempWAL.prevSyncedWOffset)

juliev0 (Owner, Author) commented:

The expected offset changed because the message ID now includes the vertex name, partition, etc., which makes the overall message longer.

@@ -137,7 +137,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
 	shuffleFuncMap := make(map[string]*shuffle.Shuffle)
 	for _, edge := range sp.VertexInstance.Vertex.Spec.ToEdges {
 		if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 {
-			s := shuffle.NewShuffle(sp.VertexInstance.Vertex.GetName(), edge.GetToVertexPartitionCount())
+			s := shuffle.NewShuffle(edge.To, edge.GetToVertexPartitionCount())

juliev0 (Owner, Author) commented Jul 18, 2023:

All Vertices publishing to a given Reduce Vertex need to use the same hashing seed when distributing messages, so the seed is now the name of the receiving Vertex.
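A minimal sketch of why the seed matters is shown below. It does not reproduce the `shuffle` package: `partitionFor` is a hypothetical helper that hashes the seed together with the message key using FNV-1a from the Go standard library, which is an assumption about the hashing scheme.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor picks a partition by hashing the seed and the key together.
// Hypothetical helper; the hashing scheme in the real shuffle package may differ.
func partitionFor(seed, key string, partitions int) int {
	h := fnv.New32a()
	h.Write([]byte(seed))
	h.Write([]byte{0}) // separator so ("ab", "c") and ("a", "bc") hash differently
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(partitions))
}

func main() {
	const toVertex = "reduce" // the receiving Reduce Vertex (edge.To)
	const partitions = 3

	// Both publishers seed with the receiving Vertex name, so the same key
	// always lands on the same partition regardless of who publishes it.
	fromA := partitionFor(toVertex, "user-1", partitions)
	fromB := partitionFor(toVertex, "user-1", partitions)
	fmt.Println(fromA == fromB) // prints true

	// Seeding with each publisher's own name gives no such guarantee.
	fmt.Println(partitionFor("map-a", "user-1", partitions),
		partitionFor("map-b", "user-1", partitions))
}
```

With per-publisher seeds, messages for one key could be spread over several Reduce partitions, and no single partition would see all the data for that key.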

// we only consider idle watermark if it is smaller than or equal to min of all the last processed watermarks.
if headWMB.Watermark > e.getMinFromLastProcessed(headWMB.Watermark) {
	return wmb.WMB{}
}

juliev0 (Owner, Author) commented:

This check needed to be moved into EdgeFetcherSet.GetHeadWMB().

@@ -67,7 +67,7 @@ metadata:
 spec:
   redis:
     native:
-      version: 6.2.6`
+      version: 7.0.11`

juliev0 (Owner, Author) commented:

Maybe this should move into a separate PR. I just noticed the Redis version was outdated.

containerTemplate:
  env:
    - name: NUMAFLOW_DEBUG
      value: "true"

juliev0 (Owner, Author) commented:

Maybe the NUMAFLOW_DEBUG environment variable should be taken out, for parity with the other tests.

juliev0 closed this Jul 18, 2023
Development

Successfully merging this pull request may close these issues.

JOIN Vertex to join data from more than one Vertices