Skip to content

Commit

Permalink
pooltransfer: Support non-ClusterSerializer refcounting
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Mar 7, 2022
1 parent 13b8dd9 commit a7299db
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions src/datastore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,25 @@ function Serialization.serialize(io::AbstractSerializer, d::DRef)
serialize(io, d.id)
serialize(io, d.size)

_pooltransfer_send(io, d)
end
function _pooltransfer_send(io::Distributed.ClusterSerializer, d::DRef)
pid = Distributed.worker_id_from_socket(io.io)
if pid != -1
pooltransfer_send(d, pid)
else
@warn "Couldn't determine destination for DRef serialization\nRefcounting will be broken"
return
end
pid = Distributed.worker_id_from_socket(io)
if pid != -1
pooltransfer_send(d, pid)
return
end
@warn "Couldn't determine destination for DRef serialization\nRefcounting will be broken"
end
function _pooltransfer_send(io::AbstractSerializer, d::DRef)
# We assume that we're not making copies of the serialized DRef
# N.B. This is not guaranteed to be correct
poolref(d)
end
function Serialization.deserialize(io::AbstractSerializer, dt::Type{DRef})
# Construct the object
Expand All @@ -41,10 +54,21 @@ function Serialization.deserialize(io::AbstractSerializer, dt::Type{DRef})
ccall(:jl_set_nth_field, Cvoid, (Any, Csize_t, Any), d, i-1, Serialization.handle_deserialize(io, tag))
end
end

_pooltransfer_recv(io, d)
return d
end
function _pooltransfer_recv(io::Distributed.ClusterSerializer, d)
# Add a new reference manually, and unref on finalization
poolref(d, true)
finalizer(poolunref, d)
d
end
function _pooltransfer_recv(io::AbstractSerializer, d)
# N.B. This is not guaranteed to be correct
poolref(d)
finalizer(poolunref, d)
# Matches the poolref during serialization
poolunref(d)
end

# Ensure we call the DRef ctor
Expand Down

0 comments on commit a7299db

Please sign in to comment.