diff --git a/CHANGELOG.md b/CHANGELOG.md index 1adb25f3be..99449e06eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # main +# v5.16 + +- New function `offline_run!` allows writing data to file at predefined intervals during `run!` instead of storing it in memory. Currently supports [CSV](https://csv.juliadata.org/stable/) and [Arrow](https://apache.github.io/arrow-julia/stable/) files. + # v5.15 - Agents.jl moved to Julia 1.9+, and now exports visualization and interactive applications automatically once Makie (or Makie backends diff --git a/Project.toml b/Project.toml index 64860fbd9d..1219060b4c 100644 --- a/Project.toml +++ b/Project.toml @@ -1,11 +1,11 @@ name = "Agents" uuid = "46ada45e-f475-11e8-01d0-f70cc89e6671" authors = ["George Datseris", "Tim DuBois", "Aayush Sabharwal", "Ali Vahdati", "Adriano Meligrana"] -version = "5.15.3" +version = "5.16.0" [deps] -CommonSolve = "38540f10-b2f7-11e9-35d8-d573e4eb0ff2" CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +CommonSolve = "38540f10-b2f7-11e9-35d8-d573e4eb0ff2" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" @@ -25,16 +25,19 @@ StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91" [weakdeps] +Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" Makie = "ee78f7c6-11fb-53f2-987a-cfe4a2b5a57a" OSMMakie = "76b6901f-8821-46bb-9129-841bc9cfe677" [extensions] -AgentsVisualizations = "Makie" AgentsOSMVisualizations = "OSMMakie" +AgentsVisualizations = "Makie" +AgentsArrow = "Arrow" [compat] -CommonSolve = "0.2.4" +Arrow = "2" CSV = "0.9.7, 0.10" +CommonSolve = "0.2.4" DataFrames = "0.21, 0.22, 1" DataStructures = "0.18" Distributions = "0.25" @@ -50,9 +53,10 @@ StatsBase = "0.32, 0.33, 0.34" julia = "1.9" [extras] +Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", "BenchmarkTools", "StableRNGs"] +test = ["Test", "BenchmarkTools", "StableRNGs", "Arrow"] diff --git a/docs/src/api.md b/docs/src/api.md index b980b0c3b8..e921a9c7b3 100644 --- a/docs/src/api.md +++ b/docs/src/api.md @@ -328,5 +328,10 @@ AgentsIO.populate_from_csv! AgentsIO.dump_to_csv ``` +It is also possible to write data to file at predefined intervals while running your model, instead of storing it in memory: +```@docs +offline_run! +``` + In case you require custom serialization for model properties, refer to the [Developer Docs](@ref) for details. diff --git a/ext/AgentsArrow/AgentsArrow.jl b/ext/AgentsArrow/AgentsArrow.jl new file mode 100644 index 0000000000..537911bd65 --- /dev/null +++ b/ext/AgentsArrow/AgentsArrow.jl @@ -0,0 +1,25 @@ +module AgentsArrow + +using Agents, Arrow + +function Agents.writer_arrow(filename, data, append) + if append + Arrow.append(filename, data) + else + Arrow.write(filename, data; file = false) + end +end + +# TODO: Implement populate_from and dump_to functions for Arrow.jl + +function AgentsIO.populate_from_arrow!() + @error "Not yet implemented." + return +end + +function AgentsIO.dump_to_arrow() + @error "Not yet implemented." + return +end + +end diff --git a/ext/AgentsOSMVisualizations.jl b/ext/AgentsOSMVisualizations/AgentsOSMVisualizations.jl similarity index 100% rename from ext/AgentsOSMVisualizations.jl rename to ext/AgentsOSMVisualizations/AgentsOSMVisualizations.jl diff --git a/ext/AgentsVisualizations.jl b/ext/AgentsVisualizations/AgentsVisualizations.jl similarity index 100% rename from ext/AgentsVisualizations.jl rename to ext/AgentsVisualizations/AgentsVisualizations.jl diff --git a/ext/src/abmplot.jl b/ext/AgentsVisualizations/src/abmplot.jl similarity index 100% rename from ext/src/abmplot.jl rename to ext/AgentsVisualizations/src/abmplot.jl diff --git a/ext/src/convenience.jl b/ext/AgentsVisualizations/src/convenience.jl similarity index 100% rename from ext/src/convenience.jl rename to ext/AgentsVisualizations/src/convenience.jl diff --git a/ext/src/daisyworld_def.jl b/ext/AgentsVisualizations/src/daisyworld_def.jl similarity index 100% rename from ext/src/daisyworld_def.jl rename to ext/AgentsVisualizations/src/daisyworld_def.jl diff --git a/ext/src/deprecations.jl b/ext/AgentsVisualizations/src/deprecations.jl similarity index 100% rename from ext/src/deprecations.jl rename to ext/AgentsVisualizations/src/deprecations.jl diff --git a/ext/src/inspection.jl b/ext/AgentsVisualizations/src/inspection.jl similarity index 100% rename from ext/src/inspection.jl rename to ext/AgentsVisualizations/src/inspection.jl diff --git a/ext/src/interaction.jl b/ext/AgentsVisualizations/src/interaction.jl similarity index 100% rename from ext/src/interaction.jl rename to ext/AgentsVisualizations/src/interaction.jl diff --git a/ext/src/lifting.jl b/ext/AgentsVisualizations/src/lifting.jl similarity index 100% rename from ext/src/lifting.jl rename to ext/AgentsVisualizations/src/lifting.jl diff --git a/ext/src/model_observable.jl b/ext/AgentsVisualizations/src/model_observable.jl similarity index 100% rename from ext/src/model_observable.jl rename to ext/AgentsVisualizations/src/model_observable.jl diff --git a/ext/src/utils.jl b/ext/AgentsVisualizations/src/utils.jl similarity index 100% rename from ext/src/utils.jl rename to ext/AgentsVisualizations/src/utils.jl diff --git a/src/simulations/collect.jl b/src/simulations/collect.jl index 1b0d1303ff..376f7de793 100644 --- a/src/simulations/collect.jl +++ b/src/simulations/collect.jl @@ -1,4 +1,5 @@ export run!, + offline_run!, collect_agent_data!, collect_model_data!, init_agent_dataframe, @@ -34,6 +35,8 @@ Run the model (step it with the input arguments propagated into [`step!`](@ref)) data specified by the keywords, explained one by one below. Return the data as two `DataFrame`s, one for agent-level data and one for model-level data. +See also [`offline_run!`](@ref) to write data to file while running the model. + ## Data-deciding keywords * `adata::Vector` means "agent data to collect". If an entry is a `Symbol`, e.g. `:weight`, then the data for this entry is agent's field `weight`. If an entry is a `Function`, e.g. @@ -101,27 +104,22 @@ If `a1.weight` but `a2` (type: Agent2) has no `weight`, use or [`deepcopy`](https://docs.julialang.org/en/v1/base/base/#Base.deepcopy) if some data are nested mutable containers. Both of these options have performance penalties. * `agents_first=true` : Whether to update agents first and then the model, or vice versa. -* `showprogress=false` : Whether to show a progress bar. +* `showprogress=false` : Whether to show progress """ function run! end run!(model::ABM, agent_step!, n::Int = 1; kwargs...) = run!(model::ABM, agent_step!, dummystep, n; kwargs...) -function run!( - model, - agent_step!, - model_step!, - n; - when = true, - when_model = when, - mdata = nothing, - adata = nothing, - obtainer = identity, - agents_first = true, - showprogress = false, -) - +function run!(model, agent_step!, model_step!, n; + when = true, + when_model = when, + mdata = nothing, + adata = nothing, + obtainer = identity, + agents_first = true, + showprogress = false, + ) df_agent = init_agent_dataframe(model, adata) df_model = init_model_dataframe(model, mdata) if n isa Integer @@ -164,6 +162,156 @@ function run!( return df_agent, df_model end +""" + offline_run!(model, agent_step! [, model_step!], n::Integer; kwargs...) + offline_run!(model, agent_step!, model_step!, n::Function; kwargs...) + +Do the same as [`run`](@ref), but instead of collecting the whole run into an in-memory +dataframe, write the output to a file after collecting data `writing_interval` times and +empty the dataframe after each write. +Useful when the amount of collected data is expected to exceed the memory available +during execution. + +## Keywords +* `backend=:csv` : backend to use for writing data. + Currently supported backends: `:csv`, `:arrow` +* `adata_filename="adata.\$backend"` : a file to write agent data on. + Appends to the file if it already exists, otherwise creates the file. +* `mdata_filename="mdata.\$backend"`: a file to write the model data on. + Appends to the file if it already exists, otherwise creates the file. +* `writing_interval=1` : write to file every `writing_interval` times data collection + is triggered. If the `when` keyword is not set, this corresponds to writing to file + every `writing_interval` steps; otherwise, the data will be written every + `writing_interval` times the `when` condition is satisfied + (the same applies to `when_model`). +""" +function offline_run! end + +offline_run!(model::ABM, agent_step!, n::Int = 1; kwargs...) = + offline_run!(model::ABM, agent_step!, dummystep, n; kwargs...) + +function offline_run!(model, agent_step!, model_step!, n; + when = true, + when_model = when, + mdata = nothing, + adata = nothing, + obtainer = identity, + agents_first = true, + showprogress = false, + backend::Symbol = :csv, + adata_filename = "adata.$backend", + mdata_filename = "mdata.$backend", + writing_interval = 1, + ) + df_agent = init_agent_dataframe(model, adata) + df_model = init_model_dataframe(model, mdata) + if n isa Integer + if when == true + for c in eachcol(df_agent) + sizehint!(c, n) + end + end + if when_model == true + for c in eachcol(df_model) + sizehint!(c, n) + end + end + end + + writer = get_writer(backend) + run_and_write!(model, agent_step!, model_step!, df_agent, df_model, n; + when, when_model, + mdata, adata, + obtainer, agents_first, + showprogress, + writer, adata_filename, mdata_filename, writing_interval + ) +end + +function run_and_write!(model, agent_step!, model_step!, df_agent, df_model, n; + when, when_model, + mdata, adata, + obtainer, agents_first, + showprogress, + writer, adata_filename, mdata_filename, writing_interval +) + s = 0 + p = if typeof(n) <: Int + ProgressMeter.Progress(n; enabled=showprogress, desc="run! progress: ") + else + ProgressMeter.ProgressUnknown(desc="run! steps done: ", enabled=showprogress) + end + + agent_count_collections = 0 + model_count_collections = 0 + while until(s, n, model) + if should_we_collect(s, model, when) + collect_agent_data!(df_agent, model, adata, s; obtainer) + agent_count_collections += 1 + if agent_count_collections % writing_interval == 0 + writer(adata_filename, df_agent, isfile(adata_filename)) + empty!(df_agent) + end + end + if should_we_collect(s, model, when_model) + collect_model_data!(df_model, model, mdata, s; obtainer) + model_count_collections += 1 + if model_count_collections % writing_interval == 0 + writer(mdata_filename, df_model, isfile(mdata_filename)) + empty!(df_model) + end + end + step!(model, agent_step!, model_step!, 1, agents_first) + s += 1 + ProgressMeter.next!(p) + end + + if should_we_collect(s, model, when) + collect_agent_data!(df_agent, model, adata, s; obtainer) + agent_count_collections += 1 + end + if should_we_collect(s, model, when_model) + collect_model_data!(df_model, model, mdata, s; obtainer) + model_count_collections += 1 + end + # catch collected data that was not yet written to disk + if !isempty(df_agent) + writer(adata_filename, df_agent, isfile(adata_filename)) + empty!(df_agent) + end + if !isempty(df_model) + writer(mdata_filename, df_model, isfile(mdata_filename)) + empty!(df_model) + end + + ProgressMeter.finish!(p) + return nothing +end + +""" + get_writer(backend) +Return a function to write to file using a given `backend`. +The returned writer function will take three arguments: +filename, data to write, whether to append to existing file or not. +""" +function get_writer(backend) + @assert backend in (:csv, :arrow) "Backend $backend not supported." + if backend == :csv + return writer_csv + elseif backend == :arrow + if Sys.iswindows() + error("""Arrow.jl integration currently does not work on Windows. + Please use another backend like `:csv` until the issue has been resolved. + Further info: https://github.com/JuliaDynamics/Agents.jl/issues/826""") + end + return writer_arrow + end +end + +writer_csv(filename, data, append) = AgentsIO.CSV.write(filename, data; append) + +function writer_arrow end + ################################################### # core data collection functions per step ################################################### diff --git a/src/submodules/io/AgentsIO.jl b/src/submodules/io/AgentsIO.jl index 608a5bc563..8742ad75ec 100644 --- a/src/submodules/io/AgentsIO.jl +++ b/src/submodules/io/AgentsIO.jl @@ -11,4 +11,8 @@ using Agents include("csv_integration.jl") include("jld2_integration.jl") + +function dump_to_arrow end +function populate_from_arrow! end + end \ No newline at end of file diff --git a/test/collect_tests.jl b/test/collect_tests.jl index 1fc232e423..3b870ada6d 100644 --- a/test/collect_tests.jl +++ b/test/collect_tests.jl @@ -202,6 +202,67 @@ end mdata = [(m) -> (m.deep.data[i]) for i in 1:length(model.deep.data)], ) @test Array{Float64,1}(model_data[1, 2:end]) == model.deep.data + + @testset "Writing to file while running" begin + + # CSV + offline_run!(model, agent_step!, model_step!, 365 * 5; + when_model = each_year, + when = six_months, + mdata = [:flag, :year], + adata = [(:weight, mean)], + writing_interval = 3 + ) + + adata_saved = CSV.read("adata.csv", DataFrame) + @test size(adata_saved) == (11, 2) + @test propertynames(adata_saved) == [:step, :mean_weight] + + mdata_saved = CSV.read("mdata.csv", DataFrame) + @test size(mdata_saved) == (6, 3) + @test propertynames(mdata_saved) == [:step, :flag, :year] + + rm("adata.csv") + rm("mdata.csv") + @test !isfile("adata.csv") + @test !isfile("mdata.csv") + + # Arrow, fails on Windows (see issue #826 (https://github.com/JuliaDynamics/Agents.jl/issues/826)) + if !(Sys.iswindows()) + offline_run!(model, agent_step!, model_step!, 365 * 5; + when_model = each_year, + when = six_months, + backend = :arrow, + mdata = [:flag, :year], + adata = [(:weight, mean)], + writing_interval = 3 + ) + + adata_saved = DataFrame(Arrow.Table("adata.arrow")) + @test size(adata_saved) == (11, 2) + @test propertynames(adata_saved) == [:step, :mean_weight] + + mdata_saved = DataFrame(Arrow.Table("mdata.arrow")) + @test size(mdata_saved) == (6, 3) + @test propertynames(mdata_saved) == [:step, :flag, :year] + + @test size(vcat(DataFrame.(Arrow.Stream("adata.arrow"))...)) == (11, 2) + @test size(vcat(DataFrame.(Arrow.Stream("mdata.arrow"))...)) == (6, 3) + + rm("adata.arrow") + rm("mdata.arrow") + @test !isfile("adata.arrow") + @test !isfile("mdata.arrow") + end + + # Backends + @test_throws TypeError begin + offline_run!(model, agent_step!, model_step!, 365 * 5; backend = "hdf5") + end + @test_throws AssertionError begin + offline_run!(model, agent_step!, model_step!, 365 * 5; backend = :hdf5) + end + end end @testset "Low-level API for Collections" begin diff --git a/test/runtests.jl b/test/runtests.jl index 64863b935b..c14cbc631e 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,4 +1,5 @@ using Test, Agents, Random, LinearAlgebra +using CSV, Arrow using Agents.Graphs, Agents.DataFrames using StatsBase: mean using StableRNGs @@ -6,7 +7,8 @@ using StableRNGs using Distributed addprocs(2) @everywhere begin - using Test, Agents, Random + using Test, Agents, Random, LinearAlgebra + using CSV, Arrow using Agents.Graphs, Agents.DataFrames using StatsBase: mean using StableRNGs