Skip to content

Commit

Permalink
drivers: fix concurrency with mvcc
Browse files Browse the repository at this point in the history
By default idx:max() or idx:min() have read confirmed isolation level.
It could lead to a task duplication or double task take when we
already insert or update a task, commited, but it is not yet
confirmed.

See also:

1. tarantool/queue#207
2. tarantool/queue#211
  • Loading branch information
oleg-jukovec committed Aug 10, 2023
1 parent 2f8a283 commit 12c6414
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
4 changes: 2 additions & 2 deletions sharded_queue/drivers/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ end

-- put task in space
function method.put(args)
local task = box.atomic(function()
local task = utils.atomic(function()
local idx = get_index(args)
local task_id = utils.pack_task_id(
args.bucket_id,
Expand All @@ -96,7 +96,7 @@ end

-- take task
function method.take(args)
local task = box.atomic(function()
local task = utils.atomic(function()
local task = get_space(args).index.status:min { state.READY }
if task == nil or task[3] ~= state.READY then
return
Expand Down
4 changes: 2 additions & 2 deletions sharded_queue/drivers/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ function method.put(args)
local ttr = args.ttr or args.options.ttr or ttl
local priority = args.priority or args.options.priority or 0

local task = box.atomic(function()
local task = utils.atomic(function()
local idx = get_index(args.tube_name, args.bucket_id)

local next_event
Expand Down Expand Up @@ -301,7 +301,7 @@ end

function method.take(args)

local task = box.atomic(take, args)
local task = utils.atomic(take, args)
if task == nil then return end

if args.extra and args.extra.log_request then
Expand Down
18 changes: 18 additions & 0 deletions sharded_queue/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,24 @@ local fiber = require('fiber')

local utils = {}

local function atomic_tail(status, ...)
if not status then
box.rollback()
error((...), 2)
end
box.commit()
return ...
end

function utils.atomic(fun, ...)
--if box.cfg.memtx_use_mvcc_engine then
box.begin({txn_isolation = 'read-committed'})
--else
-- box.begin()
--end
return atomic_tail(pcall(fun, ...))
end

function utils.array_shuffle(array)
if not array then return nil end
math.randomseed(tonumber(0ULL + fiber.time64()))
Expand Down

0 comments on commit 12c6414

Please sign in to comment.