Skip to content

Commit

Permalink
Improve Rojo sync reliability (#739)
Browse files Browse the repository at this point in the history
Uses non-recursive main loop and avoids hanging promises
  • Loading branch information
boatbomber authored Jul 16, 2023
1 parent dd01a9b commit 8662d22
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 32 deletions.
15 changes: 2 additions & 13 deletions plugin/src/ApiContext.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@ local validateApiInfo = Types.ifEnabled(Types.ApiInfoResponse)
local validateApiRead = Types.ifEnabled(Types.ApiReadResponse)
local validateApiSubscribe = Types.ifEnabled(Types.ApiSubscribeResponse)

--[[
Returns a promise that will never resolve nor reject.
]]
local function hangingPromise()
return Promise.new(function() end)
end

local function rejectFailedRequests(response)
if response.code >= 400 then
local message = string.format("HTTP %s:\n%s", tostring(response.code), response.body)
Expand Down Expand Up @@ -212,12 +205,8 @@ function ApiContext:retrieveMessages()
local function sendRequest()
local request = Http.get(url)
:catch(function(err)
if err.type == Http.Error.Kind.Timeout then
if self.__connected then
return sendRequest()
else
return hangingPromise()
end
if err.type == Http.Error.Kind.Timeout and self.__connected then
return sendRequest()
end

return Promise.reject(err)
Expand Down
55 changes: 36 additions & 19 deletions plugin/src/ServeSession.lua
Original file line number Diff line number Diff line change
Expand Up @@ -299,27 +299,44 @@ function ServeSession:__initialSync(serverInfo)
end

function ServeSession:__mainSyncLoop()
return self.__apiContext:retrieveMessages()
:andThen(function(messages)
Log.trace("Serve session {} retrieved {} messages", tostring(self), #messages)

for _, message in ipairs(messages) do
local unappliedPatch = self.__reconciler:applyPatch(message)

if not PatchSet.isEmpty(unappliedPatch) then
Log.warn("Could not apply all changes requested by the Rojo server:\n{}",
PatchSet.humanSummary(self.__instanceMap, unappliedPatch))
end

if self.__patchAppliedCallback then
pcall(self.__patchAppliedCallback, message, unappliedPatch)
end
return Promise.new(function(resolve, reject)
while self.__status == Status.Connected do
local success, result = self.__apiContext:retrieveMessages()
:andThen(function(messages)
if self.__status == Status.Disconnected then
-- In the time it took to retrieve messages, we disconnected
-- so we just resolve immediately without patching anything
return
end

Log.trace("Serve session {} retrieved {} messages", tostring(self), #messages)

for _, message in messages do
local unappliedPatch = self.__reconciler:applyPatch(message)

if not PatchSet.isEmpty(unappliedPatch) then
Log.warn("Could not apply all changes requested by the Rojo server:\n{}",
PatchSet.humanSummary(self.__instanceMap, unappliedPatch))
end

if self.__patchAppliedCallback then
pcall(self.__patchAppliedCallback, message, unappliedPatch)
end
end
end):await()

if self.__status == Status.Disconnected then
-- If we are no longer connected after applying, we stop silently
-- without checking for errors as they are no longer relevant
break
elseif success == false then
reject(result)
end
end

if self.__status ~= Status.Disconnected then
return self:__mainSyncLoop()
end
end)
-- We are no longer connected, so we resolve the promise
resolve()
end)
end

function ServeSession:__stopInternal(err)
Expand Down

0 comments on commit 8662d22

Please sign in to comment.