New function offline_run! to write data during run! #815

Merged 43 commits on Jul 10, 2023
Changes from 41 commits
1f2241d
Allow writing data during `run!`
kavir1698 Jun 8, 2022
eef1e0c
fix conflicts
mastrof Jun 3, 2023
aa46620
define new `offline_run!` function
mastrof Jun 3, 2023
893aaa8
fix tests
mastrof Jun 3, 2023
a1a9361
cleanup `run!`
mastrof Jun 3, 2023
1267d2d
cleanup `run!` (again)
mastrof Jun 3, 2023
13fb707
fix typo
mastrof Jun 3, 2023
4c30cd0
fix docstrings and use `empty!` instead of reinitializing dataframes
mastrof Jun 10, 2023
3c42bba
remove duplicate CSV from project extras
mastrof Jun 10, 2023
15a6584
use semicolon for kwargs and fix docstring
mastrof Jun 11, 2023
c21d2f0
mention `offline_run!` in the `run!` docstring
mastrof Jun 11, 2023
03487ac
select writing backend via kwarg and update test
mastrof Jun 12, 2023
9207cc0
update project and changelog
mastrof Jun 12, 2023
088a956
add `offline_run!` to docs
mastrof Jun 12, 2023
dd0156c
introduce function barrier
mastrof Jun 15, 2023
e5a6457
Merge branch 'main' of https://github.com/JuliaDynamics/Agents.jl int…
mastrof Jun 15, 2023
6b2cab5
Backends as symbols; isolate writer functions
fbanning Jun 20, 2023
a5168c7
Add Arrow dependency
fbanning Jun 20, 2023
3368582
Add stub for arrow integration in AgentsIO
fbanning Jun 20, 2023
ab617f6
Simplify flow in functions
fbanning Jun 20, 2023
7c9d968
Update docstring
fbanning Jun 20, 2023
8958696
Add tests for Arrow
fbanning Jun 21, 2023
7728d81
Condense function defs
fbanning Jun 21, 2023
7630fa4
Convert ArgumentError to AssertionError
fbanning Jun 21, 2023
962d4e3
Fix docstring formatting
fbanning Jun 21, 2023
1073722
Condense append check
fbanning Jun 21, 2023
82b0617
Fix test for Arrow.Stream
fbanning Jun 21, 2023
9dbc101
Catch collected data not yet written to disk
fbanning Jun 21, 2023
86788bb
Update CHANGELOG
fbanning Jun 21, 2023
7336c91
Reintroduce function barrier
fbanning Jun 21, 2023
800b5c8
Move extensions into separate folders
fbanning Jun 21, 2023
b4f4a62
Add AgentsArrow extension
fbanning Jun 21, 2023
7992063
Remove AgentsIO for Arrow for now
fbanning Jun 21, 2023
875fb8c
Turn Arrow into weak dep for ext
fbanning Jun 21, 2023
fdc63fb
Create stub function for extension
fbanning Jun 21, 2023
2930f57
Directly use Arrow in tests
fbanning Jun 21, 2023
216f48a
Add Arrow as test dependency
fbanning Jun 21, 2023
3971829
Merge AgentsIO placeholder funcs into AgentsArrow
fbanning Jun 21, 2023
0f308cd
Add placeholders for AgentsIO with Arrow
fbanning Jun 21, 2023
c137a7e
maybe
Tortar Jul 10, 2023
80f9adc
Merge branch 'main' into offline_run
Tortar Jul 10, 2023
28c75b0
revert last change
Tortar Jul 10, 2023
b5a25ee
Handle Arrow.jl integration on Windows
fbanning Jul 10, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# main

# v5.16

- New function `offline_run!` allows writing data to file at predefined intervals during `run!` instead of storing it in memory. Currently supports [CSV](https://csv.juliadata.org/stable/) and [Arrow](https://apache.github.io/arrow-julia/stable/) files.

# v5.15
- Agents.jl moved to Julia 1.9+, and now exports visualization
and interactive applications automatically once Makie (or Makie backends
14 changes: 9 additions & 5 deletions Project.toml
@@ -1,11 +1,11 @@
name = "Agents"
uuid = "46ada45e-f475-11e8-01d0-f70cc89e6671"
authors = ["George Datseris", "Tim DuBois", "Aayush Sabharwal", "Ali Vahdati", "Adriano Meligrana"]
version = "5.15.3"
version = "5.16.0"

[deps]
CommonSolve = "38540f10-b2f7-11e9-35d8-d573e4eb0ff2"
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
CommonSolve = "38540f10-b2f7-11e9-35d8-d573e4eb0ff2"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
@@ -25,16 +25,19 @@ StaticArrays = "90137ffa-7385-5640-81b9-e52037218182"
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"

[weakdeps]
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
Makie = "ee78f7c6-11fb-53f2-987a-cfe4a2b5a57a"
OSMMakie = "76b6901f-8821-46bb-9129-841bc9cfe677"

[extensions]
AgentsVisualizations = "Makie"
AgentsOSMVisualizations = "OSMMakie"
AgentsVisualizations = "Makie"
AgentsArrow = "Arrow"

[compat]
CommonSolve = "0.2.4"
Arrow = "2"
CSV = "0.9.7, 0.10"
CommonSolve = "0.2.4"
DataFrames = "0.21, 0.22, 1"
DataStructures = "0.18"
Distributions = "0.25"
@@ -50,9 +53,10 @@ StatsBase = "0.32, 0.33, 0.34"
julia = "1.9"

[extras]
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test", "BenchmarkTools", "StableRNGs"]
test = ["Test", "BenchmarkTools", "StableRNGs", "Arrow"]
5 changes: 5 additions & 0 deletions docs/src/api.md
@@ -328,5 +328,10 @@ AgentsIO.populate_from_csv!
AgentsIO.dump_to_csv
```

It is also possible to write data to file at predefined intervals while running your model, instead of storing it in memory:
```@docs
offline_run!
```

In case you require custom serialization for model properties, refer to the [Developer Docs](@ref)
for details.
25 changes: 25 additions & 0 deletions ext/AgentsArrow/AgentsArrow.jl
@@ -0,0 +1,25 @@
module AgentsArrow

using Agents, Arrow

function Agents.writer_arrow(filename, data, append)
    if append
        # Add a record batch to the existing stream-format file.
        Arrow.append(filename, data)
    else
        # Write in stream format (`file = false`) so that later calls can append.
        Arrow.write(filename, data; file = false)
    end
end

# TODO: Implement populate_from and dump_to functions for Arrow.jl

function AgentsIO.populate_from_arrow!()
    @error "Not yet implemented."
    return
end

function AgentsIO.dump_to_arrow()
    @error "Not yet implemented."
    return
end

end
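
The extension above relies on `src/simulations/collect.jl` declaring `function writer_arrow end` with no methods, so that `AgentsArrow` can add the method once Arrow is loaded. A minimal sketch of that stub-plus-extension pattern in plain Julia follows. The module names `HostPkg` and `HostPkgExt` and the toy `writer_fancy` are hypothetical and stand in for Agents, AgentsArrow, and the real Arrow writer.

```julia
# Hypothetical host package: declares the generic function but defines no methods.
module HostPkg
export write_with

function writer_fancy end  # stub, filled in by the "extension" module below

# Dispatch to the writer named by the backend symbol.
function write_with(backend::Symbol, io::IO, rows)
    backend === :fancy || error("Backend $backend not supported.")
    return writer_fancy(io, rows)
end
end

# Hypothetical extension module: adds the missing method to the host's function.
module HostPkgExt
import ..HostPkg

function HostPkg.writer_fancy(io::IO, rows)
    for row in rows
        println(io, join(row, ','))
    end
    return length(rows)
end
end

using .HostPkg

io = IOBuffer()
n = write_with(:fancy, io, [(1, 2.0), (2, 3.5)])
output = String(take!(io))
```

If the extension module were never loaded, calling `write_with(:fancy, ...)` would raise a `MethodError`, because the stub has no methods of its own.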
8 files renamed without changes.
173 changes: 158 additions & 15 deletions src/simulations/collect.jl
@@ -1,4 +1,5 @@
export run!,
offline_run!,
collect_agent_data!,
collect_model_data!,
init_agent_dataframe,
@@ -34,6 +35,8 @@ Run the model (step it with the input arguments propagated into [`step!`](@ref))
data specified by the keywords, explained one by one below. Return the data as
two `DataFrame`s, one for agent-level data and one for model-level data.

See also [`offline_run!`](@ref) to write data to file while running the model.

## Data-deciding keywords
* `adata::Vector` means "agent data to collect". If an entry is a `Symbol`, e.g. `:weight`,
then the data for this entry is agent's field `weight`. If an entry is a `Function`, e.g.
@@ -101,27 +104,22 @@ If `a1.weight` but `a2` (type: Agent2) has no `weight`, use
or [`deepcopy`](https://docs.julialang.org/en/v1/base/base/#Base.deepcopy) if some data are
nested mutable containers. Both of these options have performance penalties.
* `agents_first=true` : Whether to update agents first and then the model, or vice versa.
* `showprogress=false` : Whether to show a progress bar.
* `showprogress=false` : Whether to show progress
"""
function run! end

run!(model::ABM, agent_step!, n::Int = 1; kwargs...) =
run!(model::ABM, agent_step!, dummystep, n; kwargs...)

function run!(
model,
agent_step!,
model_step!,
n;
when = true,
when_model = when,
mdata = nothing,
adata = nothing,
obtainer = identity,
agents_first = true,
showprogress = false,
)

function run!(model, agent_step!, model_step!, n;
when = true,
when_model = when,
mdata = nothing,
adata = nothing,
obtainer = identity,
agents_first = true,
showprogress = false,
)
df_agent = init_agent_dataframe(model, adata)
df_model = init_model_dataframe(model, mdata)
if n isa Integer
@@ -164,6 +162,151 @@ function run!(
return df_agent, df_model
end

"""
offline_run!(model, agent_step! [, model_step!], n::Integer; kwargs...)
offline_run!(model, agent_step!, model_step!, n::Function; kwargs...)

Do the same as [`run!`](@ref), but instead of collecting the whole run into an in-memory
dataframe, write the output to a file after collecting data `writing_interval` times and
empty the dataframe after each write.
Useful when the amount of collected data is expected to exceed the memory available
during execution.

## Keywords
* `backend=:csv` : backend to use for writing data.
Currently supported backends: `:csv`, `:arrow`
* `adata_filename="adata.\$backend"` : the file to write agent data to.
Appends to the file if it already exists, otherwise creates the file.
* `mdata_filename="mdata.\$backend"` : the file to write model data to.
Appends to the file if it already exists, otherwise creates the file.
* `writing_interval=1` : write to file every `writing_interval` times data collection
is triggered. If the `when` keyword is not set, this corresponds to writing to file
every `writing_interval` steps; otherwise, the data will be written every
`writing_interval` times the `when` condition is satisfied
(the same applies to `when_model`).
"""
function offline_run! end

offline_run!(model::ABM, agent_step!, n::Int = 1; kwargs...) =
offline_run!(model::ABM, agent_step!, dummystep, n; kwargs...)

function offline_run!(model, agent_step!, model_step!, n;
when = true,
when_model = when,
mdata = nothing,
adata = nothing,
obtainer = identity,
agents_first = true,
showprogress = false,
backend::Symbol = :csv,
adata_filename = "adata.$backend",
mdata_filename = "mdata.$backend",
writing_interval = 1,
)
df_agent = init_agent_dataframe(model, adata)
df_model = init_model_dataframe(model, mdata)
if n isa Integer
if when == true
for c in eachcol(df_agent)
sizehint!(c, n)
end
end
if when_model == true
for c in eachcol(df_model)
sizehint!(c, n)
end
end
end

writer = get_writer(backend)
run_and_write!(model, agent_step!, model_step!, df_agent, df_model, n;
when, when_model,
mdata, adata,
obtainer, agents_first,
showprogress,
writer, adata_filename, mdata_filename, writing_interval
)
end

function run_and_write!(model, agent_step!, model_step!, df_agent, df_model, n;
when, when_model,
mdata, adata,
obtainer, agents_first,
showprogress,
writer, adata_filename, mdata_filename, writing_interval
)
s = 0
p = if n isa Integer
ProgressMeter.Progress(n; enabled=showprogress, desc="run! progress: ")
else
ProgressMeter.ProgressUnknown(desc="run! steps done: ", enabled=showprogress)
end

agent_count_collections = 0
model_count_collections = 0
while until(s, n, model)
if should_we_collect(s, model, when)
collect_agent_data!(df_agent, model, adata, s; obtainer)
agent_count_collections += 1
if agent_count_collections % writing_interval == 0
writer(adata_filename, df_agent, isfile(adata_filename))
empty!(df_agent)
end
end
if should_we_collect(s, model, when_model)
collect_model_data!(df_model, model, mdata, s; obtainer)
model_count_collections += 1
if model_count_collections % writing_interval == 0
writer(mdata_filename, df_model, isfile(mdata_filename))
empty!(df_model)
end
end
step!(model, agent_step!, model_step!, 1, agents_first)
s += 1
ProgressMeter.next!(p)
end

if should_we_collect(s, model, when)
collect_agent_data!(df_agent, model, adata, s; obtainer)
agent_count_collections += 1
end
if should_we_collect(s, model, when_model)
collect_model_data!(df_model, model, mdata, s; obtainer)
model_count_collections += 1
end
# catch collected data that was not yet written to disk
if !isempty(df_agent)
writer(adata_filename, df_agent, isfile(adata_filename))
empty!(df_agent)
end
if !isempty(df_model)
writer(mdata_filename, df_model, isfile(mdata_filename))
empty!(df_model)
end

ProgressMeter.finish!(p)
return nothing
end

"""
get_writer(backend)
Return a function to write to file using a given `backend`.
The returned writer function will take three arguments:
filename, data to write, whether to append to existing file or not.
"""
function get_writer(backend)
@assert backend in (:csv, :arrow) "Backend $backend not supported."
if backend == :csv
return writer_csv
elseif backend == :arrow
return writer_arrow
end
end

writer_csv(filename, data, append) = AgentsIO.CSV.write(filename, data; append)

function writer_arrow end

###################################################
# core data collection functions per step
###################################################
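
The buffering logic in `run_and_write!` can be sketched without Agents.jl: collect one row per step, write to disk every `writing_interval` collections, and write whatever remains at the end. This is only an illustration. The names `flush_rows!` and `buffered_write!` are hypothetical, and plain text lines stand in for the `DataFrame` and the CSV/Arrow writers.

```julia
# Append `rows` to `filename` (creating it if needed), then empty the buffer.
function flush_rows!(filename, rows)
    open(filename, "a") do io
        foreach(r -> println(io, r), rows)
    end
    empty!(rows)
    return nothing
end

# Collect one row per step and write every `writing_interval` collections.
# Returns the number of times the file was written.
function buffered_write!(filename, nsteps; writing_interval = 1)
    buffer = String[]
    collections = 0
    writes = 0
    for s in 0:nsteps  # initial state plus one row per step: nsteps + 1 rows
        push!(buffer, "step=$s")
        collections += 1
        if collections % writing_interval == 0
            flush_rows!(filename, buffer)
            writes += 1
        end
    end
    if !isempty(buffer)  # catch rows that were collected but not yet written
        flush_rows!(filename, buffer)
        writes += 1
    end
    return writes
end

path = tempname()
writes = buffered_write!(path, 10; writing_interval = 3)
nrows = countlines(path)
rm(path)
```

With 10 steps and `writing_interval = 3`, the sketch collects 11 rows, writes after the 3rd, 6th, and 9th collection, and then writes the remaining 2 rows in a final pass. Memory use is bounded by the interval rather than by the length of the run.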
4 changes: 4 additions & 0 deletions src/submodules/io/AgentsIO.jl
@@ -11,4 +11,8 @@ using Agents

include("csv_integration.jl")
include("jld2_integration.jl")

function dump_to_arrow end
function populate_from_arrow! end

end
59 changes: 59 additions & 0 deletions test/collect_tests.jl
@@ -202,6 +202,65 @@ end
mdata = [(m) -> (m.deep.data[i]) for i in 1:length(model.deep.data)],
)
@test Array{Float64,1}(model_data[1, 2:end]) == model.deep.data

@testset "Writing to file while running" begin

# CSV
offline_run!(model, agent_step!, model_step!, 365 * 5;
when_model = each_year,
when = six_months,
mdata = [:flag, :year],
adata = [(:weight, mean)],
writing_interval = 3
)

adata_saved = CSV.read("adata.csv", DataFrame)
@test size(adata_saved) == (11, 2)
@test propertynames(adata_saved) == [:step, :mean_weight]

mdata_saved = CSV.read("mdata.csv", DataFrame)
@test size(mdata_saved) == (6, 3)
@test propertynames(mdata_saved) == [:step, :flag, :year]

rm("adata.csv")
rm("mdata.csv")
@test !isfile("adata.csv")
@test !isfile("mdata.csv")

# Arrow
offline_run!(model, agent_step!, model_step!, 365 * 5;
when_model = each_year,
when = six_months,
backend = :arrow,
mdata = [:flag, :year],
adata = [(:weight, mean)],
writing_interval = 3
)

adata_saved = DataFrame(Arrow.Table("adata.arrow"))
@test size(adata_saved) == (11, 2)
@test propertynames(adata_saved) == [:step, :mean_weight]

mdata_saved = DataFrame(Arrow.Table("mdata.arrow"))
@test size(mdata_saved) == (6, 3)
@test propertynames(mdata_saved) == [:step, :flag, :year]

@test size(vcat(DataFrame.(Arrow.Stream("adata.arrow"))...)) == (11, 2)
@test size(vcat(DataFrame.(Arrow.Stream("mdata.arrow"))...)) == (6, 3)

rm("adata.arrow")
rm("mdata.arrow")
@test !isfile("adata.arrow")
@test !isfile("mdata.arrow")

# Backends
@test_throws TypeError begin
offline_run!(model, agent_step!, model_step!, 365 * 5; backend = "hdf5")
end
@test_throws AssertionError begin
offline_run!(model, agent_step!, model_step!, 365 * 5; backend = :hdf5)
end
end
end

@testset "Low-level API for Collections" begin
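
The two `@test_throws` cases in the "Backends" block of the new testset rely on different mechanisms. The `backend::Symbol` annotation rejects a `String` before the function body runs, which raises a `TypeError`. An unsupported symbol passes the annotation and then fails the `@assert` in `get_writer`, which raises an `AssertionError`. A self-contained sketch, with `pick_backend` and `thrown_type` as hypothetical stand-ins rather than Agents.jl functions:

```julia
# Hypothetical stand-in for the keyword handling in offline_run! and get_writer.
function pick_backend(; backend::Symbol = :csv)
    @assert backend in (:csv, :arrow) "Backend $backend not supported."
    return backend
end

# Return the type of the exception thrown by `f`, or `nothing` if it succeeds.
function thrown_type(f)
    try
        f()
        return nothing
    catch err
        return typeof(err)
    end
end

string_err = thrown_type(() -> pick_backend(backend = "hdf5"))  # wrong type
symbol_err = thrown_type(() -> pick_backend(backend = :hdf5))   # right type, unsupported value
ok_backend = pick_backend(backend = :arrow)
```

Here `string_err` is `TypeError` and `symbol_err` is `AssertionError`, matching the two expectations in the test.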
4 changes: 3 additions & 1 deletion test/runtests.jl
@@ -1,12 +1,14 @@
using Test, Agents, Random, LinearAlgebra
using CSV, Arrow
using Agents.Graphs, Agents.DataFrames
using StatsBase: mean
using StableRNGs

using Distributed
addprocs(2)
@everywhere begin
using Test, Agents, Random
using Test, Agents, Random, LinearAlgebra
using CSV, Arrow
using Agents.Graphs, Agents.DataFrames
using StatsBase: mean
using StableRNGs