Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add automatic swap-to-disk support #60

Merged
merged 9 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
*.jl.*.cov
*.jl.mem
.mempool
Manifest.toml
Manifest.toml
*.swp
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name = "MemPool"
uuid = "f9f48841-c794-520a-933b-121f7ba6ed94"
license = "MIT"
desc = "a simple distributed data store"
version = "0.3.9"
version = "0.4.0"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand All @@ -14,7 +14,7 @@ Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
DataStructures = "0.18"
julia = "1.4"
julia = "1.7"

[extras]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Expand Down
48 changes: 44 additions & 4 deletions src/MemPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module MemPool

using Serialization, Sockets, Random
import Serialization: serialize, deserialize
export DRef, FileRef, poolset, poolget, pooldelete, destroyonevict,
export DRef, FileRef, poolset, poolget, pooldelete,
movetodisk, copytodisk, savetodisk, mmwrite, mmread, cleanup,
deletefromdisk, poolref, poolunref
import .Threads: ReentrantLock
Expand Down Expand Up @@ -109,16 +109,56 @@ function approx_size(s::Symbol)
end

function __init__()
global session = "sess-" * randstring(5)
global session = "sess-" * randstring(6)
try
global host = getipaddr()
catch err
global host = Sockets.localhost
end
datastore_lock[] = NonReentrantLock()
id_counter[] = Atomic{Int}(0)
if parse(Bool, get(ENV, "JULIA_MEMPOOL_EXPERIMENTAL_FANCY_ALLOCATOR", "0"))
membound = parse(Int, get(ENV, "JULIA_MEMPOOL_EXPERIMENTAL_MEMORY_BOUND", repr(8*(1024^3))))
diskpath = get(ENV, "JULIA_MEMPOOL_EXPERIMENTAL_DISK_CACHE", joinpath(default_dir(), randstring(6)))
diskdevice = SerializationFileDevice(FilesystemResource(), diskpath)
diskbound = parse(Int, get(ENV, "JULIA_MEMPOOL_EXPERIMENTAL_DISK_BOUND", repr(32*(1024^3))))
kind = get(ENV, "JULIA_MEMPOOL_EXPERIMENTAL_ALLOCATOR_KIND", "MRU")
if !(kind in ("LRU", "MRU"))
@warn "Unknown allocator kind: $kind\nDefaulting to MRU"
kind = "MRU"
end
GLOBAL_DEVICE[] = SimpleRecencyAllocator(membound, diskdevice, diskbound, Symbol(kind))
end

# Ensure we cleanup all references
atexit() do
exit_flag[] = true
kill_counter = 10
empty!(file_to_dref)
empty!(who_has_read)
while kill_counter > 0 && with_lock(()->!isempty(datastore), datastore_lock)
GC.gc()
sleep(1)
kill_counter -= 1
end
with_lock(datastore_lock) do
if length(datastore) > 0
@warn "Failed to cleanup datastore after 10 seconds\nForcibly evicting all entries"
for id in collect(keys(datastore))
state = MemPool.datastore[id]
device = storage_read(state).root
if device !== nothing
@debug "Evicting ref $id with device $device"
try
delete_from_device!(device, state, id)
catch
end
end
delete!(MemPool.datastore, id)
end
end
end
if ispath(default_dir())
rm(default_dir(); recursive=true)
end
end
end

Expand Down
Loading