Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SYSTEMDS-3729] Add roll reorg operations in FED #2126

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

min-guk
Copy link
Contributor

@min-guk min-guk commented Sep 28, 2024

I haven't fully completed the implementation of the federated version yet, but I am creating this PR along with the code to ask some questions.

@min-guk
Copy link
Contributor Author

min-guk commented Sep 28, 2024

Here, I have outlined my implementation intentions, current progress, issues, and questions.

I realize that the content might be a bit extensive, but after thinking it through on my own for a long time, I still find myself uncertain. Therefore, I kindly ask for guidance.


1. Original Implementation Intent

I aimed to implement the Federated Roll Function as follows:

  1. Coordinator Role:

    • Shift the begindims and enddims of the fedworker matrices stored in fedmap by a specified shift value.
  2. Matrix Splitting Due to Shift:

    • If the shift causes a specific fedworker matrix to be split into two, dividing the entire matrix between the end and the beginning:
      • Existing Element: Modify fedrange to point to the end of the entire matrix, keeping feddata the same (data from a non-split fedworker).
      • Add New Element: Add a new entry with fedrange pointing to the beginning of the entire matrix, keeping feddata the same (data from a non-split fedworker).
  3. Handling Split fedworkers:

    • Send an EXEC_INST(rightindex) fedrequest only to the fedworkers that are being split, to divide and store their feddata.

2. Current Implementation Method

Due to uncertainties in implementing step (3) as intended, I had to deviate and implement it as follows:

  1. Sending EXEC_UDP(SplitRow) fedrequest to All fedworkers:

    • Non-split fedworkers: Set the existing matrix as output.
    • Split fedworkers: Perform two fedrequests:
      1. Slice the existing matrix to become the initial part of the entire matrix and set it as output.
      2. Slice the existing matrix to become the final part of the entire matrix and set it as output.
  2. Collecting Results:

    • Retrieve the results using MatrixObject out = ec.getMatrixObject(output);.

3. Problems and Questions

Problem 1: Data Interference Issues in Split fedworkers

  • Situation:

    • Split fedworkers perform two fedrequests through a fedmap that points to the same data but with different ranges.
    • The results are saved using ec.setMatrixOutput(String.valueOf(_outputID), resBlock), resulting in only one of the execution results being stored.
  • Question:

    • Is there a way to access the same feddata during fedrequest execution and save it as different feddata?

Problem 2: Avoiding fedrequest Communication with Non-split fedworkers

  • Situation:

    • Currently, results are aggregated using MatrixObject out = ec.getMatrixObject(output);, which requires communicating with non-split fedworkers to set their existing matrices as output.
  • Question:

    • Is there a method to set the input as output within the coordinator without communicating with specific fedworkers?

Problem 3: Using EXEC_UDP Instead of EXEC_INST

  • Situation:

    • The federated rightindex slices the entire matrix rather than a specific fedworker's matrix.
    • Therefore, the current approach of splitting a specific fedworker's matrix into two different feddata entries does not seem compatible.
  • Questions:

    • Can federated rightindex be applied to the current situation?
    • Are there potential issues or significant reductions in code reliability when using EXEC_UDP?

@min-guk
Copy link
Contributor Author

min-guk commented Sep 29, 2024

I'm planning to reimplement the first problem using the rightindex function and the third problem with broadcastslice function.

For the second problem, I haven't found a suitable reference yet, but I'll study more to resolve it.

@mboehm7
Copy link
Contributor

mboehm7 commented Sep 29, 2024

sorry for the delay, ad 3: you can send a single CP rightindex instruction to the specific fed worker which we need to handle; ad 2: I would recommend to do something as follows:

MatrixObject out = ec.getMatrixObject(output);
FederationMap outputFedMap = //copy old fedmap, add ranges, execute federated request for relevant range only 
out.setFedMapping(outputFedMap);

@min-guk
Copy link
Contributor Author

min-guk commented Sep 30, 2024

Not at all. Thank you very much for your advice!

I will try to proceed according to your suggestion.

@min-guk
Copy link
Contributor Author

min-guk commented Sep 30, 2024

@mboehm7 I sincerely apologize for asking again, but I would like to inquire once more about problem 1.

When performing a fedmap shift, a new fedmap element is added and shifted as shown below code. However, the newly added fedmap element points to a different range but refers to the same file (data), which seems to cause a problem.

/*Common*/
FederationMap outFedMapID1 = mo1.getFedMapping().copyWithNewID(outID1);
// outFedMap=[<[(0,10),(25,10)], fileA_ID1>, <[(25,10),(50,10)], fileB_ID1>, <[(50,10),(75,10)], fileC_ID1>,
//            <[(75,10),(100,10)], fileD_ID1>]

outFedMap.rollFedMap(shift=5)(;
// outFedMap=[<[(5,10),(30,10)], fileA_ID1>, <[(30,10),(55,10)], fileB_ID1>, <[(55,10),(80,10)], fileC_ID1>, 
//            <[(80,10),(100,10)], fileD_ID1>, <[(0,10),(5,10)], fileD_ID1>]

Based on my understanding, there seem to be two possible approaches, but each has its own issue:

image

/*CASE 1*/
Future<FederatedResponse>[] ffr = outFedMap.executeRollEnd(getTID(), true, fr, frEndID1, frStartID1);
// 1) frEnd:  Slice only <[(75,10),(100,10)], fileD_ID0> into [(80,10),(100,10)], fileD_ID1>
// 2) ftStart: Slice only <[(75,10),(100,10)], fileD_ID0> into <[(0,10),(5,10)], fileD_ID1> (Overwrite)

MatrixObject out = ec.getMatrixObject(output);
out.setFedMapping(outFedMapID1);

In CASE 1, fr is performed on two fedmap elements with the same ID, but since the file is the same, one of the fr results is overwritten and lost.
-> Is there a way for the fedworker to store the output differently after performing fr?

image

/*CASE 2*/
Future<FederatedResponse>[] ffr1 = outFedMap.executeRollEnd(getTID(), true, frID1, frEndID1);
// 1) frEnd:  Slice only <[(75,10),(100,10)], fileD_ID0> into [(80,10),(100,10)], fileD_ID1>

FederationMap outFedMapID2 = outFedMapID1.copyWithNewID(outID2);
Future<FederatedResponse>[] ffr2 = outFedMap.executeRollEnd(getTID(), true, frID2, frStartID2);
// 2) ftStart: Slice only <[(75,10),(100,10)], fileD_ID0> into <[(0,10),(5,10)], fileD_ID2>

MatrixObject out = ec.getMatrixObject(output);
out.setFedMapping(outFedMapID1); // lost <[(0,10),(5,10)], fileD_ID2>

In CASE 2, fr is performed on two fedmap elements with different IDs, but elements with different IDs cannot be mapped together.
-> Is there a way to map elements with different IDs together?

@mboehm7
Copy link
Contributor

mboehm7 commented Sep 30, 2024

yes we need to send two rightindex instructions to the federated worker and bind them to different IDs (which will automatically create temporary file names).

EXEC_INST rightindex var1 1 20 1 100 var2
EXEC_INST rightindex var1 20 25 1 100 var3

and then update the federated ranges accordingly at the coordinator. This requires a custom handling instead of copyWithNewID. Try to get as far as you get, and I would then do the rest during the merge.

@min-guk
Copy link
Contributor Author

min-guk commented Sep 30, 2024

Thank you for the quick response. I now have a clear understanding of the problem.

I had hoped there might be an existing API to handle this, but it seems a new implementation is needed.

I’ll give it my best shot and see it through!

@min-guk
Copy link
Contributor Author

min-guk commented Oct 13, 2024

I am submitting this question through the PR to provide better context by including the relevant code. Currently, only FED using CP INST works correctly, while FED using SP INST encounters errors. The detailed implementation of federatedRollFunction discussed in the previous meeting is as follows:

  1. For non-partitioned fedData, the modified range and the same varID as the input are registered in fedMap.
  2. For partitioned fedData, fedRange is modified, and two fedRequests are called with two different varIDs.
  3. fedRequest calls EXEC_INST instead of EXEC_UDF, reusing the already implemented instructions. (Not implemented yet, future work)

Question 1.

Although step 2 was implemented, the result of FederationUtils.getResults(ffr) in the roll function differed from the final result checked by readBlobFromFederated() in the test. This is because, in the federated roll function, the fedData in fedMap uses different varIDs, but the requestFederatedData function used by readBlobFromFederated() requests fedData using the same single _ID. In most cases, fedData belonging to the fedMap uses the single varID, so it seems to have been implemented this way previously.

Therefore, I modified the requestFederatedData function as follows to perform requests based on the varID of each fedData. Since this function is used in multiple places, I would like to ask if it’s okay to modify it this way. Additionally, if this modification is correct, should other functions like forEachParallel be modified similarly?

public List<Pair<FederatedRange, Future<FederatedResponse>>> requestFederatedData() {
	// FederatedRequest request = new FederatedRequest(RequestType.GET_VAR, _ID); // previous
	for(Pair<FederatedRange, FederatedData> e : _fedMap)
		FederatedRequest request = new FederatedRequest(RequestType.GET_VAR, e.getValue().getVarID());
		readResponses.add(Pair.of(e.getKey(), e.getValue().executeFederatedOperation(request)));
	return readResponses;
}

Question 2.

The current issue with FED using SP INST is that it fails to read fedData corresponding to step 1 in readBlobFromFederated(). However, if the fedData in step 1 is simply copied as output using a output varID, it is successfully read.

Since both CP and SP proceed using the same EXEC_UDF in the processInstruction() function, it seems likely that there is a task outside of processInstruction() that requests the fedWorker to clean the input varID data. I would like to confirm if this assumption is correct, and whether there are any additional considerations specifically for SP.

Below are the fedMap, readResponse, and error messages that I checked with the debugger when running readBlobFromFederated().

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: In Progress
Development

Successfully merging this pull request may close these issues.

2 participants