Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GraphQL: updating arrays with delta updates #6307

Open
oliver-sanders opened this issue Aug 16, 2024 · 4 comments
Open

GraphQL: updating arrays with delta updates #6307

oliver-sanders opened this issue Aug 16, 2024 · 4 comments
Labels
bug Something is wrong :( question Flag this as a question for the next Cylc project meeting.
Milestone

Comments

@oliver-sanders
Copy link
Member

oliver-sanders commented Aug 16, 2024

[Edit]: In difference to what's written below, it looks like prereqs are working correctly, but outputs aren't, see #6307 (comment) for the deets.


Data stores maintained by GraphQL updates (i.e. cylc-ui) are not presently able to synchronise some array fields.

I think the following fields are affected (all graphene.List type):

  • latest_state_tasks
  • ns_def_order
  • job_log_names
  • states
  • directives
  • environment
  • outputs
  • messages
  • proxies
  • parents
  • namespace
  • conditions
  • cycle_points
  • namespace
  • external_triggers
  • prerequisites
  • jobs
  • ancestors
  • child_tasks
  • child_families
  • edges
  • workflow_polling_tasks
  • leaves
  • feet

This has been discussed before (can't find the issue dammit). We have previously worked around the issue by repeating all elements of updated arrays in the returned deltas.

Example: Outputs

This subscription tracks task outputs:

subscription {
  deltas {
    updated {
      workflow {
        taskProxies {
          id
          outputs {
            label
            satisfied
          }
        }
      }
    }
  }
}

The corresponding added subscription, would initially yield a list of all outputs, i.e:

[
    {'label': 'submitted', 'satisfied': false},
    {'label': 'started', 'satisfied': false},
    {'label': 'succeeded', 'satisfied': false},
    {'label': 'failed', 'satisfied': false},
    {'label': 'submit-failed', 'satisfied': false},
    {'label': 'expired', 'satisfied': false}
]

Whenever there is a change, e.g. when the first job submits, we will receive only the element that was updated e.g:

[
    {'label': 'submitted', 'satisfied': true},
]

And so on:

[
    {'label': 'started', 'satisfied': true},
]
[
    {'label': 'succeeded', 'satisfied': true},
]

This makes sense, we are pushing only the minimal information down the wire.

But how is the remote data store supposed to work with these deltas?

The remote data store needs to be able to associate the updated element back to the corresponding element in the store in order to update it.

In this case it's easy to do by inspection, the label is the unique key representing the object. The remote data store should look through the outputs until it finds one with the same label, and update the satisfied field to match the update. Although this would require the remote data store to know that label is the unique key for outputs arrays which makes updates complicated.

Example: Prerequisites

Edit: Prerequisites aren't currently an issue because we do not update them incrementally (we just re-generate the whole lot whenever any of them change I think).

However, it's not always so easy. Take the prerequisites field for example:

subscription {
  deltas {
    updated {
      workflow {
        taskProxies {
          id
          prerequisites {
            satisfied
            expression
            conditions {
              taskId
              message
              exprAlias
              satisfied
            }
          }
        }
      }
    }
  }
}

There is no unique key here. The expression field (which has values like c0 & c1 or c0 | c1) is not unique. The conditions array isn't unique either since the same conditions could be arranged into different prerequisite expressions.

The only way we can get a unique key is to form a composite of the following fields:

expression
conditions {
    taskId
    message
    exprAlias
}

Problem: Cylc UI

At present, the Cylc UI data store uses Object.assign(data_store, update) to update the data. This will overwrite each key in data_store with the value in update.

The result is that the full list of task outputs that we received in the added delta, will be completely overwritten by the single output we received in the updated delta.

Solution 1: Repeat all array elements in delta updates

One option would be to just repeat all elements in the update.

This isn't ideal efficiency wise, but it is a quick & dirty fix that would allow us to overcome this hurdle quickly without getting too involved in protocols and update functions.

I.E, rather than returning only the updated item in a delta update:

[
    {'label': 'submitted', 'satisfied': true},
]

We would return the whole array:

[
    {'label': 'submitted', 'satisfied': true},  // the updated element
    {'label': 'started', 'satisfied': false},
    {'label': 'succeeded', 'satisfied': false},
    {'label': 'failed', 'satisfied': false},
    {'label': 'submit-failed', 'satisfied': false},
    {'label': 'expired', 'satisfied': false}
]

Whenever there is a change, e.g. when the first job submits, we will receive only the element that was updated e.g:

Solution 2: Replace arrays with objects

We could replace arrays with Objects where the array index is used as the key.

For example, rather than doing this:

[
    {'label': 'submitted', 'satisfied': false},
    {'label': 'started', 'satisfied': false},
    {'label': 'succeeded', 'satisfied': false},
    ...
]

We would do this:

{
    0: {'label': 'submitted', 'satisfied': false},
    1: {'label': 'started', 'satisfied': false},
    2: {'label': 'succeeded', 'satisfied': false},
    ...
}

The downside to this is that it would create a backwards incompatible change. The only alternative to breaking compatibility would be to create a duplicate field for the new object.

Another downside is that there would be no obvious mechanism for removing elements. We can add them, we can update them, but the only way to remove them is to prune the containing object and (re)send and added delta (essentially destroy and rebuild that part of the store).

Solution 3: Develop an array update protocol

The most sophisticated solution would be to enable to UI to update these arrays intelligently by providing a standard "key" for the update algorithm to use for mapping the update back to the original object in the data store.

E.G. we could standardise on using a field called id:

[  // added delta
    {'id': 'submitted', 'label': 'submitted', 'satisfied': false},
    {'id': 'submitted', 'label': 'started', 'satisfied': false},
    {'id': 'submitted', 'label': 'succeeded', 'satisfied': false},
    ...
]

So that updates can be easily mapped onto data store elements:

[  // updated delta
    {'id': 'submitted', 'satisfied': true},
    ...
]

This protocol would need to include a mechanism for "pruning" an element, e.g:

[  // updated delta
    {'id': 'submitted', '_cylc_remove': true}  // special remove flag
    ...
]
@oliver-sanders
Copy link
Member Author

oliver-sanders commented Aug 16, 2024

Sadly this is impacting the Info view (cylc/cylc-ui#1886) which requires both prerequisites and outputs. It will also want to use several other fields in due course.

I'm tempted to lead towards solution (1) at the moment. It's quick and dirty, but it should be enough to get us moving again. We could chalk up solution (3) as a possible efficiency enhancement in the future.

Whatever solution we pick, we need to apply it globally rather than waiting for bug reports and changing the fields one at a time.

@MetRonnie
Copy link
Member

I've tested on 8.3.x and master, and in the networks panel of devtools I see all the prerequisites listed in every update, even if only one of them changed

@oliver-sanders
Copy link
Member Author

oliver-sanders commented Oct 7, 2024

I've tested on 8.3.x and master, and in the networks panel of devtools I see all the prerequisites listed in every update, even if only one of them changed

Strange!

I've had a go, and it looks like all prereqs are included with each update that contains any prereqs (correct). I must have jumped the gun when writing up this issue, however, the same is not true for outputs, where we only get the updated outputs pushed down the wire.

E.G. Using this example workflow:

[scheduling]
    [[graph]]
        R1 = a => b

[runtime]
    [[a]]
        script = """
            sleep 5
            cylc message -- xxx
            sleep 5
        """
        [[[outputs]]]
            x = xxx

    [[b]]

Then opening the info view on 1/a and inspecting the messages sent down the websocket (via browser devtools), I got the following:

Complete message chain as a list:
[
    {
        "id": "9",
        "type": "start",
        "payload": {
            "variables": {
                "workflowId": "~me/tmp.IgNNIrr6jj/run1",
                "taskID": "1/a"
            },
            "extensions": {},
            "operationName": "InfoViewSubscription",
            "query": "subscription InfoViewSubscription($workflowId: ID, $taskID: ID) {\n  deltas(workflows: [$workflowId]) {\n    added {\n      ...AddedDelta\n      __typename\n    }\n    updated(stripNull: true) {\n      ...UpdatedDelta\n      __typename\n    }\n    __typename\n  }\n}\n\nfragment AddedDelta on Added {\n  taskProxies(ids: [$taskID]) {\n    ...TaskProxyData\n    __typename\n  }\n  __typename\n}\n\nfragment UpdatedDelta on Updated {\n  taskProxies(ids: [$taskID]) {\n    ...TaskProxyData\n    __typename\n  }\n  __typename\n}\n\nfragment TaskProxyData on TaskProxy {\n  id\n  state\n  isHeld\n  isQueued\n  isRunahead\n  task {\n    ...TaskDefinitionData\n    __typename\n  }\n  jobs {\n    ...JobData\n    __typename\n  }\n  prerequisites {\n    satisfied\n    expression\n    conditions {\n      taskId\n      reqState\n      exprAlias\n      satisfied\n      __typename\n    }\n    __typename\n  }\n  outputs {\n    label\n    satisfied\n    __typename\n  }\n  __typename\n}\n\nfragment TaskDefinitionData on Task {\n  meanElapsedTime\n  meta {\n    title\n    description\n    URL\n    userDefined\n    __typename\n  }\n  __typename\n}\n\nfragment JobData on Job {\n  id\n  jobId\n  startedTime\n  state\n  __typename\n}"
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [
                            {
                                "id": "~me/tmp.IgNNIrr6jj/run1//1/a",
                                "state": "waiting",
                                "isHeld": false,
                                "isQueued": true,
                                "isRunahead": false,
                                "task": {
                                    "meanElapsedTime": 0.0,
                                    "meta": {
                                        "title": "",
                                        "description": "",
                                        "URL": "",
                                        "userDefined": {},
                                        "__typename": "NodeMeta"
                                    },
                                    "__typename": "Task"
                                },
                                "jobs": [],
                                "prerequisites": [],
                                "outputs": [
                                    {
                                        "label": "expired",
                                        "satisfied": false,
                                        "__typename": "Output"
                                    },
                                    {
                                        "label": "submitted",
                                        "satisfied": false,
                                        "__typename": "Output"
                                    },
                                    {
                                        "label": "submit-failed",
                                        "satisfied": false,
                                        "__typename": "Output"
                                    },
                                    {
                                        "label": "started",
                                        "satisfied": false,
                                        "__typename": "Output"
                                    },
                                    {
                                        "label": "succeeded",
                                        "satisfied": false,
                                        "__typename": "Output"
                                    },
                                    {
                                        "label": "failed",
                                        "satisfied": false,
                                        "__typename": "Output"
                                    },
                                    {
                                        "label": "x",
                                        "satisfied": false,
                                        "__typename": "Output"
                                    }
                                ],
                                "__typename": "TaskProxy"
                            }
                        ],
                        "__typename": "Added"
                    },
                    "updated": {
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "taskProxies": [
                            {
                                "id": "~me/tmp.IgNNIrr6jj/run1//1/a",
                                "state": "preparing",
                                "isQueued": false,
                                "task": {
                                    "meta": {
                                        "title": "",
                                        "description": "",
                                        "URL": "",
                                        "__typename": "NodeMeta"
                                    },
                                    "__typename": "Task"
                                },
                                "__typename": "TaskProxy"
                            }
                        ],
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "taskProxies": [
                            {
                                "id": "~me/tmp.IgNNIrr6jj/run1//1/a",
                                "state": "submitted",
                                "isQueued": false,
                                "task": {
                                    "meta": {
                                        "title": "",
                                        "description": "",
                                        "URL": "",
                                        "__typename": "NodeMeta"
                                    },
                                    "__typename": "Task"
                                },
                                "jobs": [
                                    {
                                        "id": "~me/tmp.IgNNIrr6jj/run1//1/a/01",
                                        "jobId": "123116",
                                        "state": "submitted",
                                        "__typename": "Job"
                                    }
                                ],
                                "outputs": [
                                    {
                                        "label": "submitted",
                                        "satisfied": true,
                                        "__typename": "Output"
                                    }
                                ],
                                "__typename": "TaskProxy"
                            }
                        ],
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "taskProxies": [
                            {
                                "id": "~me/tmp.IgNNIrr6jj/run1//1/a",
                                "state": "running",
                                "task": {
                                    "meta": {
                                        "title": "",
                                        "description": "",
                                        "URL": "",
                                        "__typename": "NodeMeta"
                                    },
                                    "__typename": "Task"
                                },
                                "jobs": [
                                    {
                                        "id": "~me/tmp.IgNNIrr6jj/run1//1/a/01",
                                        "jobId": "123116",
                                        "startedTime": "2024-10-07T16:21:46+01:00",
                                        "state": "running",
                                        "__typename": "Job"
                                    }
                                ],
                                "outputs": [
                                    {
                                        "label": "started",
                                        "satisfied": true,
                                        "__typename": "Output"
                                    }
                                ],
                                "__typename": "TaskProxy"
                            }
                        ],
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "taskProxies": [
                            {
                                "id": "~me/tmp.IgNNIrr6jj/run1//1/a",
                                "task": {
                                    "meta": {
                                        "title": "",
                                        "description": "",
                                        "URL": "",
                                        "__typename": "NodeMeta"
                                    },
                                    "__typename": "Task"
                                },
                                "jobs": [
                                    {
                                        "id": "~me/tmp.IgNNIrr6jj/run1//1/a/01",
                                        "jobId": "123116",
                                        "startedTime": "2024-10-07T16:21:46+01:00",
                                        "state": "running",
                                        "__typename": "Job"
                                    }
                                ],
                                "outputs": [
                                    {
                                        "label": "x",
                                        "satisfied": true,
                                        "__typename": "Output"
                                    }
                                ],
                                "__typename": "TaskProxy"
                            }
                        ],
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "taskProxies": [
                            {
                                "id": "~me/tmp.IgNNIrr6jj/run1//1/a",
                                "state": "succeeded",
                                "task": {
                                    "meanElapsedTime": 12.0,
                                    "meta": {
                                        "title": "",
                                        "description": "",
                                        "URL": "",
                                        "__typename": "NodeMeta"
                                    },
                                    "__typename": "Task"
                                },
                                "jobs": [
                                    {
                                        "id": "~me/tmp.IgNNIrr6jj/run1//1/a/01",
                                        "jobId": "123116",
                                        "startedTime": "2024-10-07T16:21:46+01:00",
                                        "state": "succeeded",
                                        "__typename": "Job"
                                    }
                                ],
                                "outputs": [
                                    {
                                        "label": "succeeded",
                                        "satisfied": true,
                                        "__typename": "Output"
                                    }
                                ],
                                "__typename": "TaskProxy"
                            }
                        ],
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    },
    {
        "id": "9",
        "type": "data",
        "payload": {
            "data": {
                "deltas": {
                    "added": {
                        "taskProxies": [],
                        "__typename": "Added"
                    },
                    "updated": {
                        "__typename": "Updated"
                    },
                    "__typename": "Deltas"
                }
            }
        }
    }
]

Summary:

  • Subscription issued.
  • (message 1) Received added delta which lists all outputs (satisfied=false).
  • (message 2) Received updated delta containing the task state change waiting -> preparing.
  • (message 3) Received only the "submitted" output (satisfied=true).
  • (message 4) Received only the "started" output (satisfied=true).
  • (message 5) Received only the "x" output (satisfied=true).
  • (message 6) Received only the "succeeded" output (satisfied=true).
  • (message 7) Blank.
  • (message 8) Blank.
  • ...
  • (message 13) Blank.

@oliver-sanders
Copy link
Member Author

With this diff, I get all of the outputs, every time any one of them is updated:

diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index 01b43bfe9..fda578e6c 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -2428,11 +2428,25 @@ class DataStoreMgr:
         tp_delta = self.updated[TASK_PROXIES].setdefault(
             tp_id, PbTaskProxy(id=tp_id))
         tp_delta.stamp = f'{tp_id}@{update_time}'
-        output = tp_delta.outputs[label]
-        output.label = label
-        output.message = message
-        output.satisfied = outputs.is_message_complete(message)
-        output.time = update_time
+
+        for _label in tproxy.outputs:
+            output = tp_delta.outputs[_label]
+
+            # set the new output
+            if _label == label:
+                output.label = label
+                output.message = message
+                output.satisfied = outputs.is_message_complete(message)
+
+            # ensure all outputs are included in the delta
+            else:
+                _output = tproxy.outputs[_label]
+                output.label = _output.label
+                output.message = _output.message
+                output.satisfied = _output.satisfied
+
+            output.time = update_time
+
         self.updates_pending = True
 
     def delta_task_outputs(self, itask: TaskProxy) -> None:

So this might be enough for us to press ahead with the metadata view, although there are many other lists out there in the schema, and any one of them could go wrong in this way.

It's a shame as this is a negative change really. The data store code is doing the right thing at both ends, it's just that we don't have any protocol for incrementally updating lists.



Q) So why are prerequisites behaving differently to outputs?

A) Because the prerequsite update is done in a lazier fashion, every time any of the prereqs are updated, we re-generate the all of them from the Cylc Prerequisite object.

def delta_task_prerequisite(self, itask: TaskProxy) -> None:
"""Create delta for change in task proxy prerequisite.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
update_time = time()
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
prereq_list = []
for prereq in itask.state.prerequisites:
# Protobuf messages populated within
prereq_obj = prereq.api_dump()
if prereq_obj:
prereq_list.append(prereq_obj)
del tp_delta.prerequisites[:]
tp_delta.prerequisites.extend(prereq_list)
self.updates_pending = True

@oliver-sanders oliver-sanders added the question Flag this as a question for the next Cylc project meeting. label Oct 16, 2024
oliver-sanders added a commit to oliver-sanders/cylc-flow that referenced this issue Oct 29, 2024
* We don't have a protocol for updating arbitrary lists incrementally.
* See cylc#6307
* This is a temporary patch to disable incremental output updates until
  we have a suitable protocol for doing so.
oliver-sanders added a commit to oliver-sanders/cylc-flow that referenced this issue Oct 29, 2024
* We don't have a protocol for updating arbitrary lists incrementally.
* See cylc#6307
* This is a temporary patch to disable incremental output updates until
  we have a suitable protocol for doing so.
dwsutherland pushed a commit that referenced this issue Oct 30, 2024
* We don't have a protocol for updating arbitrary lists incrementally.
* See #6307
* This is a temporary patch to disable incremental output updates until
  we have a suitable protocol for doing so.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something is wrong :( question Flag this as a question for the next Cylc project meeting.
Projects
None yet
Development

No branches or pull requests

2 participants