Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Rojo sync reliability #739

Merged
merged 2 commits into from
Jul 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions plugin/src/ApiContext.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@ local validateApiInfo = Types.ifEnabled(Types.ApiInfoResponse)
local validateApiRead = Types.ifEnabled(Types.ApiReadResponse)
local validateApiSubscribe = Types.ifEnabled(Types.ApiSubscribeResponse)

--[[
Returns a promise that will never resolve nor reject.
]]
local function hangingPromise()
return Promise.new(function() end)
end

local function rejectFailedRequests(response)
if response.code >= 400 then
local message = string.format("HTTP %s:\n%s", tostring(response.code), response.body)
Expand Down Expand Up @@ -212,12 +205,8 @@ function ApiContext:retrieveMessages()
local function sendRequest()
local request = Http.get(url)
:catch(function(err)
if err.type == Http.Error.Kind.Timeout then
if self.__connected then
return sendRequest()
else
return hangingPromise()
end
if err.type == Http.Error.Kind.Timeout and self.__connected then
return sendRequest()
end

return Promise.reject(err)
Expand Down
55 changes: 36 additions & 19 deletions plugin/src/ServeSession.lua
Original file line number Diff line number Diff line change
Expand Up @@ -299,27 +299,44 @@ function ServeSession:__initialSync(serverInfo)
end

function ServeSession:__mainSyncLoop()
return self.__apiContext:retrieveMessages()
:andThen(function(messages)
Log.trace("Serve session {} retrieved {} messages", tostring(self), #messages)

for _, message in ipairs(messages) do
local unappliedPatch = self.__reconciler:applyPatch(message)

if not PatchSet.isEmpty(unappliedPatch) then
Log.warn("Could not apply all changes requested by the Rojo server:\n{}",
PatchSet.humanSummary(self.__instanceMap, unappliedPatch))
end

if self.__patchAppliedCallback then
pcall(self.__patchAppliedCallback, message, unappliedPatch)
end
return Promise.new(function(resolve, reject)
while self.__status == Status.Connected do
local success, result = self.__apiContext:retrieveMessages()
:andThen(function(messages)
if self.__status == Status.Disconnected then
-- In the time it took to retrieve messages, we disconnected
-- so we just resolve immediately without patching anything
return
end

Log.trace("Serve session {} retrieved {} messages", tostring(self), #messages)

for _, message in messages do
local unappliedPatch = self.__reconciler:applyPatch(message)

if not PatchSet.isEmpty(unappliedPatch) then
Log.warn("Could not apply all changes requested by the Rojo server:\n{}",
PatchSet.humanSummary(self.__instanceMap, unappliedPatch))
end

if self.__patchAppliedCallback then
pcall(self.__patchAppliedCallback, message, unappliedPatch)
end
end
end):await()

if self.__status == Status.Disconnected then
-- If we are no longer connected after applying, we stop silently
-- without checking for errors as they are no longer relevant
break
elseif success == false then
reject(result)
end
end

if self.__status ~= Status.Disconnected then
return self:__mainSyncLoop()
end
end)
-- We are no longer connected, so we resolve the promise
resolve()
end)
end

function ServeSession:__stopInternal(err)
Expand Down
Loading