Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add divide-and-conquer histogram benchmark #54

Merged
merged 5 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ using BenchmarkTools
using ConcurrentCollections

"""
    generate(; datasize = 2^19, nkeys = datasize) -> Vector{String}

Return `datasize` random string keys for the histogram benchmarks.

Each key is the decimal representation of an integer drawn uniformly from
`1:nkeys`, left-padded with zeros to the width of `string(nkeys)` so that all
keys have the same length.

# Keywords
- `datasize`: number of elements in the returned vector.
- `nkeys`: number of distinct keys that can appear.
"""
function generate(; datasize = 2^19, nkeys = datasize)
    ints = rand(1:nkeys, datasize)
    lastkey = string(nkeys)
    # Optional decorations for experimenting with longer keys; empty by default.
    prefix = suffix = ""
    # prefix = "9" ^ 30  # adding works for hashing and comparison
    # suffix = "0" ^ 30  # adding works for hashing (but not for comparison)
    # Build the keys directly from the sampled integers.  (An earlier version
    # first materialized all `nkeys` candidate strings and sampled from them;
    # that result was discarded, so the extra allocation has been removed.)
    data = prefix .* string.(ints; pad = length(lastkey)) .* suffix
    return data
end

Expand Down Expand Up @@ -46,6 +46,30 @@ function hist_parallel!(dict::ConcurrentDict, data; ntasks = Threads.nthreads())
return dict
end

"""
    hist_dac_impl(data, chunk_starts, basesize) -> Dict{String,Int}

Recursive divide-and-conquer kernel behind `hist_parallel_dac`.

`chunk_starts` holds the first index of every chunk of `data`; each chunk spans
at most `basesize` elements (the last one is clipped to the end of `data`).
A single chunk is handed to `hist_seq!` with a fresh `Dict{String,Int}`.
Several chunks are split in two halves: the upper half is processed in a
spawned task, the lower half on the current task, and the two partial results
are combined with `mergewith!(+, ...)` into the lower half's dictionary.
"""
function hist_dac_impl(data, chunk_starts, basesize)
    nchunks = length(chunk_starts)
    # No chunk at all: nothing to count.
    nchunks == 0 && return Dict{String,Int}()
    if nchunks == 1
        # Base case: count one contiguous chunk sequentially.
        start = @inbounds chunk_starts[begin]
        stop = min(start + basesize - 1, lastindex(data))
        piece = @inbounds data[start:stop]
        return hist_seq!(Dict{String,Int}(), piece)
    end
    # Recursive case: split the list of chunk starts in the middle.
    split = firstindex(chunk_starts) + nchunks ÷ 2
    lower = @view chunk_starts[begin:split-1]
    upper = @view chunk_starts[split:end]
    upper_task = Threads.@spawn hist_dac_impl(data, upper, basesize)
    lower_hist = hist_dac_impl(data, lower, basesize)
    # The assertion pins the otherwise-uninferred result type of `fetch`.
    upper_hist = fetch(upper_task)::typeof(lower_hist)
    return mergewith!(+, lower_hist, upper_hist)
end

"""
    hist_parallel_dac(data; ntasks = Threads.nthreads()) -> Dict{String,Int}

Compute a histogram of `data` by splitting it into roughly `ntasks` contiguous
chunks and reducing them with the divide-and-conquer kernel `hist_dac_impl`.

# Keywords
- `ntasks`: target number of chunks; must be at least 1.

# Throws
- `ArgumentError` if `ntasks < 1`.
"""
function hist_parallel_dac(data; ntasks = Threads.nthreads())
    ntasks >= 1 || throw(ArgumentError("ntasks must be positive, got $ntasks"))
    # `cld` yields 0 for empty `data`; a zero step is not a valid range step, so
    # clamp to 1.  The resulting range is then simply empty.
    basesize = max(1, cld(length(data), ntasks))
    chunk_starts = firstindex(data):basesize:lastindex(data)
    return hist_dac_impl(data, chunk_starts, basesize)
end

function default_ntasks_list()
ntasks_list = [Threads.nthreads()]
if Threads.nthreads() > 2
Expand All @@ -56,30 +80,50 @@ end

# Module-level slot for the benchmark input data.  It is a `Ref{Any}`, so code
# reading it (see `setup`) type-asserts the payload at the use site.
const CACHE = Ref{Any}()

"""
    setup(; ntasks_list, datasize, nkeys_list) -> BenchmarkGroup

Build the histogram benchmark suite.

For every `nkeys` in `nkeys_list` a data set of `datasize` keys is generated and
stored in `CACHE[]` (a dictionary keyed by `nkeys`).  The returned suite is
nested as `"nkeys=<nkeys>"` / `"alg=<alg>"` / `"ntasks=<ntasks>"`, where `alg`
is one of `:base_seq`, `:cdict_seq`, `:base_par`, `:cdict_par`.  The sequential
algorithms only have an `"ntasks=1"` entry; the parallel ones have one entry
per element of `ntasks_list`.

# Keywords
- `ntasks_list`: task counts to benchmark for the parallel algorithms.
- `datasize`: number of elements in each generated data set.
- `nkeys_list`: numbers of distinct keys to benchmark.
"""
function setup(;
    ntasks_list = default_ntasks_list(),
    datasize = 2^19,
    nkeys_list = [datasize],
)
    # Create the cache with one dummy entry so that its concrete type is known,
    # then drop the entry again.
    CACHE[] = data = Dict(0 => generate(; nkeys = 1)) # dummy data for allocation
    empty!(data)
    # `CACHE` is a `Ref{Any}`; interpolating `T` into the benchmarks turns each
    # `CACHE[]` access into a type assertion on the concrete dictionary type.
    T = typeof(data)

    suite = BenchmarkGroup()
    for nkeys in nkeys_list
        data[nkeys] = generate(; datasize = datasize, nkeys = nkeys)

        s0 = suite["nkeys=$nkeys"] = BenchmarkGroup()

        sbs = s0["alg=:base_seq"] = BenchmarkGroup()
        sbs["ntasks=1"] = @benchmarkable(
            # Base.Dict, sequential
            hist_seq!(dict, (CACHE[]::$T)[$nkeys]),
            setup = (dict = Dict{String,Int}()),
            evals = 1,
        )
        scs = s0["alg=:cdict_seq"] = BenchmarkGroup()
        scs["ntasks=1"] = @benchmarkable(
            # ConcurrentDict, sequential
            hist_seq!(dict, (CACHE[]::$T)[$nkeys]),
            setup = (dict = ConcurrentDict{String,Int}()),
            evals = 1,
        )
        sbp = s0["alg=:base_par"] = BenchmarkGroup()
        scp = s0["alg=:cdict_par"] = BenchmarkGroup()
        for ntasks in ntasks_list
            sbp["ntasks=$ntasks"] = @benchmarkable(
                # Base.Dict, parallel
                hist_parallel_dac((CACHE[]::$T)[$nkeys]; ntasks = $ntasks),
                evals = 1,
            )
            scp["ntasks=$ntasks"] = @benchmarkable(
                # ConcurrentDict, parallel
                hist_parallel!(dict, (CACHE[]::$T)[$nkeys]; ntasks = $ntasks),
                setup = (dict = ConcurrentDict{String,Int}()),
                evals = 1,
            )
        end
    end
    return suite
end
Expand Down
3 changes: 3 additions & 0 deletions benchmark/dict_histogram/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/backup
/build
/tmp
25 changes: 25 additions & 0 deletions benchmark/dict_histogram/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Driver for the dict_histogram benchmark: run it, clean results, archive results.

# Julia executable and the base command line used to invoke it.
JULIA = julia1.7
JULIA_CMD ?= $(JULIA) --color=yes --startup-file=no

# Exported so that the Julia process picks up the test project as its environment.
export JULIA_PROJECT = $(shell pwd)/../../test/ConcurrentCollectionsTests
# export JULIA_LOAD_PATH = @

.PHONY: benchmark clean backup

# Directory that receives the benchmark output.
BUILD = build

benchmark: $(BUILD)/results.json

# The results file has no prerequisites, so it is only produced when it does
# not exist yet (run `make clean` first to regenerate).  `-t16` starts Julia
# with 16 threads.
$(BUILD)/results.json:
$(JULIA_CMD) -t16 run.jl

# Remove the JSON files in the build directory.
clean:
rm -fv $(BUILD)/*.json

# Archive the current build directory contents under backup/backup-<timestamp>.
# Fails early (test -e) when there is no results file; the files are staged in
# tmp/backup first and the staged directory is then moved into backup/.
backup:
test -e $(BUILD)/results.json
mkdir -pv backup
rm -rf tmp/backup
mkdir -pv tmp/backup/build
mv $(BUILD)/* tmp/backup/build/
mv tmp/backup backup/backup-$$(date +%Y-%m-%d-%H%M%S)
70 changes: 70 additions & 0 deletions benchmark/dict_histogram/plot.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import BenchmarkConfigSweeps
import BenchmarkTools
using DataFrames
using FileIO
using JSON
using Statistics
using VegaLite

# Load the saved benchmark results (the file holds a single object) and flatten
# them into a table with one row per trial.
results_file = joinpath(@__DIR__, "build/results.json")
results = only(BenchmarkTools.load(results_file))
df_raw = DataFrame(BenchmarkConfigSweeps.flattable(results))

# Column name (and legend title) for the derived datasize / nkeys ratio.
access_param = "Access density"

# Derive the analysis table `df` from the raw results:
# drop the trial objects and any `JULIA_*` columns, then add
#   * `ms`             - mean trial time divided by 1e6 (BenchmarkTools reports
#                        times in nanoseconds, so this is milliseconds),
#   * `Implementation` - human-readable label for the algorithm family,
#   * access_param     - ratio datasize / nkeys.
begin
    df = select(df_raw, Not(:trial))
    df = select(df, Not(r"JULIA_.*"))
    df[:, :ms] = map(t -> mean(t).time, df_raw.trial) ./ 1e6
    df[:, :Implementation] = map(df.alg) do alg
        # Symbols are compared by identity throughout.
        if alg === :base_seq || alg === :base_par
            Symbol("Base.Dict + Divide-and-Conquer")
        elseif alg === :cdict_seq || alg === :cdict_par
            :ConcurrentDict
        else
            error("unknown alg = ", alg)
        end
    end
    # NOTE: hard-coded; must match the `datasize` the benchmarks were generated
    # with (2^19 is the default of `BenchDictHistogram.setup`).
    datasize = 2^19
    df[:, access_param] = datasize ./ df.nkeys
    df
end
#-

# Within every group of rows that share all columns except the timing, the
# task count and the algorithm, add a `speedup` column: the time of the single
# `:base_seq` row of the group divided by each row's time.
df_speedup = combine(groupby(df, Not([:ms, :ntasks, :alg, :Implementation]))) do group
    reference_ms = only(group.ms[group.alg .=== :base_seq])
    speedup = reference_ms ./ group.ms
    return hcat(group, DataFrame((; speedup)))
end
#-

"""
    parallel_algorithms(df)

Return the rows of `df` whose `alg` column is `:base_par` or `:cdict_par`.
"""
function parallel_algorithms(df)
    parallel_algs = (:base_par, :cdict_par)
    keep = in.(df.alg, Ref(parallel_algs))
    return df[keep, :]
end

# Speedup plot for the parallel algorithms only: one facet column per
# `Implementation`; inside each facet a line (with point markers) of `speedup`
# against `ntasks`, colored by the access-density column, plus a horizontal
# rule at y = 1 marking parity with the sequential baseline.
plt = @vlplot(
facet = {column = {field = :Implementation}},
spec = {
layer = [
{
# :line,
mark = {:line, point = true},
encoding = {
x = {:ntasks, type = :quantitative, title = "Number of Tasks"},
y = {
:speedup,
type = :quantitative,
title = "Speedup wrt sequential program",
},
# One color per distinct access-density value.
color = {field = access_param, type = :ordinal},
},
},
# Reference line at speedup == 1.
{mark = :rule, encoding = {y = {datum = 1}}},
],
},
data = parallel_algorithms(df_speedup),
)

# Write the figure next to the benchmark results, once per output format.
for output_name in ("build/results.png", "build/results.svg")
    save(joinpath(@__DIR__, output_name), plt)
end

# Leave the plot as the value of the script.
plt
52 changes: 52 additions & 0 deletions benchmark/dict_histogram/run.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import BenchmarkTools
import ConcurrentCollectionsBenchmarks
import JSON

"""
    git_info(dir = @__DIR__)

Return a named tuple `(; revision, status)` describing the git checkout that
contains `dir`: the output of `git rev-parse HEAD` and of
`git status --short --untracked-files=no --porcelain`, both stripped of
surrounding whitespace.  The commands are run with `dir` as working directory.
"""
function git_info(dir = @__DIR__)
    # Run one git subcommand inside `dir` and return its trimmed output.
    function git(cmd)
        output = read(setenv(`git $cmd`; dir = dir), String)
        return strip(output)
    end
    revision = git(`rev-parse HEAD`)
    status = git(`status --short --untracked-files=no --porcelain`)
    return (; revision, status)
end

"""
    julia_info()

Return a named tuple describing the running Julia process: its version, the
commit and branch it was built from, whether it is a debug build, the LLVM
version, a few `Sys` constants, and all environment variables whose name
starts with `JULIA_`.
"""
function julia_info()
    git = (
        commit = Base.GIT_VERSION_INFO.commit,
        branch = Base.GIT_VERSION_INFO.branch,
    )
    sysinfo = (
        WORD_SIZE = Sys.WORD_SIZE,
        JIT = Sys.JIT,
        # CPU_NAME = Sys.CPU_NAME,
        # CPU_THREADS = Sys.CPU_THREADS,
    )
    env = Dict(name => value for (name, value) in ENV if startswith(name, "JULIA_"))
    return (;
        version = string(VERSION),
        git,
        is_debugbuild = ccall(:jl_is_debugbuild, Cint, ()) != 0,
        libllvm_version = string(Base.libllvm_version),
        Sys = sysinfo,
        env,
    )
end

"""
    main(args = ARGS)

Run the histogram benchmarks and save the results.

The output path is the first element of `args` if present, otherwise
`build/results.json` next to this script.  Before the benchmarks run, an
`info.json` file with git and Julia metadata is written into the same
directory.  The benchmark results are returned after being saved.
"""
function main(args = ARGS)
    default_output = joinpath(@__DIR__, "build", "results.json")
    output = get(args, 1, default_output)
    outdir = dirname(output)
    mkpath(outdir)

    # Record provenance next to the results.
    info = (; git = git_info(), julia = julia_info())
    open(joinpath(outdir, "info.json"), "w") do io
        JSON.print(io, info)
    end

    # Sweep every task count up to the number of threads and several key counts.
    suite = ConcurrentCollectionsBenchmarks.BenchDictHistogram.setup(;
        ntasks_list = 1:Threads.nthreads(),
        nkeys_list = [2^13, 2^16, 2^19, 2^25],
    )
    results = run(suite; verbose = true)
    BenchmarkTools.save(output, results)
    return results
end

# Run the benchmarks only when this file is executed as the main script.
abspath(PROGRAM_FILE) == @__FILE__ && main()
40 changes: 23 additions & 17 deletions test/ConcurrentCollectionsTests/src/test_bench_dict_histogram.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,20 @@ module TestBenchDictHistogram

using ConcurrentCollections
using ConcurrentCollectionsBenchmarks.BenchDictHistogram:
default_ntasks_list, generate, hist_parallel!, hist_seq!
default_ntasks_list, generate, hist_parallel!, hist_parallel_dac, hist_seq!
using Test

"""
    diffvalues(actual, expected) -> Vector{Any}

Collect the entries on which `actual` disagrees with `expected`.

For every `key => value` pair of `expected`, `actual[key]` is looked up; when
the two values differ, a named tuple `(; key, actual, expected)` is recorded.
An empty vector means that `actual` matches `expected` on all of its keys.
"""
function diffvalues(actual, expected)
    mismatches = Any[]
    for (key, want) in expected
        got = actual[key]
        want == got && continue
        push!(mismatches, (; key, actual = got, expected = want))
    end
    return mismatches
end

function test()
datasize_list = [10, 2^5, 2^10, 2^20]
fulldata = generate(datasize = datasize_list[end])
Expand All @@ -20,14 +31,7 @@ function test(data)
cdseq = hist_seq!(ConcurrentDict{String,Int}(), data)
@test sort(collect(setdiff(keys(dbase), keys(cdseq)))) == []
@test sort(collect(setdiff(keys(cdseq), keys(dbase)))) == []
diffvalues = []
for (key, expected) in dbase
actual = cdseq[key]
if actual != expected
push!(diffvalues, (; key, actual, expected))
end
end
@test diffvalues == []
@test diffvalues(cdseq, dbase) == []
@test Dict(cdseq) == dbase
end
@testset for ntasks in default_ntasks_list()
Expand All @@ -39,16 +43,18 @@ function test(data)
=#
@test sort(collect(setdiff(keys(dbase), keys(cdpar)))) == []
@test sort(collect(setdiff(keys(cdpar), keys(dbase)))) == []
diffvalues = []
for (key, expected) in dbase
actual = cdpar[key]
if actual != expected
push!(diffvalues, (; key, actual, expected))
end
end
@test diffvalues == []
@test diffvalues(cdpar, dbase) == []
@test Dict(cdpar) == dbase
end
@testset "dac" begin
@testset for ntasks in default_ntasks_list()
dpar = hist_parallel_dac(data; ntasks = ntasks)
@test sort(collect(setdiff(keys(dbase), keys(dpar)))) == []
@test sort(collect(setdiff(keys(dpar), keys(dbase)))) == []
@test diffvalues(dpar, dbase) == []
@test dpar == dbase
end
end
end

end # module