From eb9dfd1dae37f2f6c9baf6a20477a0ba98dee05b Mon Sep 17 00:00:00 2001 From: Felippe Alves Date: Mon, 15 Jan 2024 13:52:53 +0000 Subject: [PATCH] Checkpoint --- Manifest.toml | 35 +++- Project.toml | 3 + scripts/run_gbp_algorithm_multi_process.jl | 206 ++++++++++++++++++++ scripts/run_gbp_algorithm_single_process.jl | 2 +- src/GBPQAMDecoder.jl | 183 ----------------- src/decoding.jl | 31 ++- 6 files changed, 267 insertions(+), 193 deletions(-) create mode 100644 scripts/run_gbp_algorithm_multi_process.jl diff --git a/Manifest.toml b/Manifest.toml index 7f0bbee..8972e5a 100644 --- a/Manifest.toml +++ b/Manifest.toml @@ -2,7 +2,7 @@ julia_version = "1.9.3" manifest_format = "2.0" -project_hash = "5902aa4cbdc90f37fe4ffcd1edcfda0f659c4971" +project_hash = "fcaf1a03f43730f617e1ecbadf378171f51cfb07" [[deps.AbstractPlutoDingetjes]] deps = ["Pkg"] @@ -198,7 +198,7 @@ version = "1.6.1" deps = ["Chain", "DataFrames", "Distributions", "LinearAlgebra", "NamedTupleTools", "Transducers"] path = "src/DataPrep" uuid = "d40174be-394b-476a-94dd-cdf19e683a9d" -version = "0.1.0" +version = "1.0.0" [[deps.DataStructures]] deps = ["Compat", "InteractiveUtils", "OrderedCollections"] @@ -339,7 +339,7 @@ uuid = "9fa8497b-333b-5362-9e8d-4d0656e87820" deps = ["FGenerators", "LinearAlgebra", "StatsBase", "Transducers", "Tullio"] path = "src/GBPAlgorithm" uuid = "52f5dfbb-b0c2-49ff-99e4-0d4f43f103e5" -version = "0.1.0" +version = "1.0.0" [[deps.HypergeometricFunctions]] deps = ["DualNumbers", "LinearAlgebra", "OpenLibm_jll", "SpecialFunctions"] @@ -446,6 +446,12 @@ weakdeps = ["StaticArrays"] [deps.LazyArrays.extensions] LazyArraysStaticArraysExt = "StaticArrays" +[[deps.LeftChildRightSiblingTrees]] +deps = ["AbstractTrees"] +git-tree-sha1 = "fb6803dafae4a5d62ea5cab204b1e657d9737e7f" +uuid = "1d6d02ad-be62-4b6b-8a6d-2f90e265016e" +version = "0.2.0" + [[deps.LibCURL]] deps = ["LibCURL_jll", "MozillaCACerts_jll"] uuid = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21" @@ -563,6 +569,17 @@ version = "0.14.3" uuid = "ca575930-c2e3-43a9-ace4-1e988b2c1908" version = "1.2.0" +[[deps.ObservablePmap]] +deps = ["Distributed", "Logging", "Observables"] +git-tree-sha1 = "8afaa8d730c922e7d3304f51eb7f1036fcb0c89d" +uuid = "471f3579-fe7c-4d8d-9c38-de9ac94d91b4" +version = "0.2.1" + +[[deps.Observables]] +git-tree-sha1 = "7438a59546cf62428fc9d1bc94729146d37a7225" +uuid = "510215fc-4207-5dde-b226-833fc4488ee2" +version = "0.5.5" + [[deps.OpenBLAS_jll]] deps = ["Artifacts", "CompilerSupportLibraries_jll", "Libdl"] uuid = "4536629a-c528-5b80-bd46-f80d51c5b363" @@ -641,6 +658,12 @@ version = "2.3.1" deps = ["Unicode"] uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7" +[[deps.ProgressLogging]] +deps = ["Logging", "SHA", "UUIDs"] +git-tree-sha1 = "80d919dee55b9c50e8d9e2da5eeafff3fe58b539" +uuid = "33c8b6b6-d38a-422a-b730-caa89a2f386c" +version = "0.1.4" + [[deps.ProgressMeter]] deps = ["Distributed", "Printf"] git-tree-sha1 = "00099623ffee15972c16111bcf84c58a0051257c" @@ -845,6 +868,12 @@ deps = ["ArgTools", "SHA"] uuid = "a4e569a6-e804-4fa4-b0f3-eef7a1d5b13e" version = "1.10.0" +[[deps.TerminalLoggers]] +deps = ["LeftChildRightSiblingTrees", "Logging", "Markdown", "Printf", "ProgressLogging", "UUIDs"] +git-tree-sha1 = "f133fab380933d042f6796eda4e130272ba520ca" +uuid = "5d786b92-1e48-4d6f-9151-6b4477ca9bed" +version = "0.1.7" + [[deps.Test]] deps = ["InteractiveUtils", "Logging", "Random", "Serialization"] uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40" diff --git a/Project.toml b/Project.toml index c83fe05..1f5db99 100644 --- a/Project.toml +++ b/Project.toml @@ -14,11 +14,14 @@ GBPAlgorithm = "52f5dfbb-b0c2-49ff-99e4-0d4f43f103e5" JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" NamedTupleTools = "d9ec5142-1e00-5aa0-9d6a-321866360f50" +ObservablePmap = "471f3579-fe7c-4d8d-9c38-de9ac94d91b4" +Observables = "510215fc-4207-5dde-b226-833fc4488ee2" Parquet2 = "98572fba-bba0-415d-956f-fa77e587d26d" PlutoUI = "7f904dfe-b85e-4ff6-b463-dae2292396a8" ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91" +TerminalLoggers = "5d786b92-1e48-4d6f-9151-6b4477ca9bed" Transducers = "28d57a85-8fef-5791-bfe6-a80928e7c999" Tullio = "bc48ee85-29a4-5162-ae0b-a64e1601d4bc" diff --git a/scripts/run_gbp_algorithm_multi_process.jl b/scripts/run_gbp_algorithm_multi_process.jl new file mode 100644 index 0000000..fa2eb76 --- /dev/null +++ b/scripts/run_gbp_algorithm_multi_process.jl @@ -0,0 +1,206 @@ +using Distributed +addprocs(4) + +@everywhere using DrWatson + +@everywhere begin + @quickactivate :GBPQAMDecoder + + using Chain, + DataFrames, + Dates, + JLD2, + LinearAlgebra, + Observables, + ObservablePmap, + Parquet2, + ProgressMeter, + Random, + StatsBase, + TerminalLoggers, + Transducers +end + +function move_cursor_up_while_clearing_lines(io,numlinesup) + for _ in 1:numlinesup + print(io,"\r\u1b[K\u1b[A") + end +end +move_cursor_up_while_clearing_lines(numlinesup) = Base.Fix1(move_cursor_up_while_clearing_lines,stderr) + +function printover(io::IO,s::AbstractString, color::Symbol = :color_normal) + print(io,"\r") + printstyled(io,s; color=color) + # print(io,s) + print(io,"\u1b[K") # clear the rest of the line +end + +printover(s::AbstractString,color::Symbol=:color_normal) = printover(stderr,s,color) + +let + test_tasks = decoding_tasks_list(; + power=[-1,1],with_noise=[false,true],pilots=0,k=2, + load_batches=1#,alg_params=(;T=10,step=8,showprogressinfo=false) + ) + + @info "Warming up..." + obs, task = ologpmap(test_tasks; schedule_now=true, logger_f=TerminalLogger) do task_info + produce_or_load( + solve_decoding_task,task_info,datadir("results","testing_script"); + force=true,verbose=true,tag=false + ) + GC.gc() + end + + sleep(45) + + obs_func = on(obs; update=false, weak=true) do val + n = split(val,'\n') + s = val*"\n"^(21-n) + move_cursor_up_while_clearing_lines(21) + printover(s) + flush(stderr) + end +end; + +GC.gc() + +let + test_tasks = decoding_tasks_list(; + power=[-1,1],with_noise=[false,true],pilots=0,k=2, + load_batches=250#,alg_params=(;T=10,step=8,showprogressinfo=false) + ) + + @info "...One more..." + obs, task = ologpmap(test_tasks; schedule_now=true, logger_f=TerminalLogger) do task_info + produce_or_load( + solve_decoding_task,task_info,datadir("results","testing_script"); + force=true,verbose=true,tag=false + ) + GC.gc() + end + + sleep(1) + + # obs[] = join([rpad("",80) for _ in 1:nprocs()], '\n') + obs_func = on(obs; update=false, weak=true) do val + n = split(val,'\n') + s = val*"\n"^(21-n) + move_cursor_up_while_clearing_lines(21) + printover(s) + flush(stderr) + end +end; + +let + test_tasks = decoding_tasks_list(; + power=collect(-2:8),with_noise=[false,true],pilots=collect(0:3),k=collect(1:3), + load_batches=2600#,alg_params=(;T=10,step=8,showprogressinfo=false) + ) + + @info "Running GBP for all cases" + obs, task = ologpmap(test_tasks; schedule_now=true, logger_f=TerminalLogger) do task_info + produce_or_load( + solve_decoding_task,task_info,datadir("results","gbp_all"); + force=true,verbose=true,tag=false + ) + GC.gc() + end + + sleep(1800) + + obs_func = on(obs; update=false, weak=true) do val + n = split(val,'\n') + s = val*"\n"^(21-n) + move_cursor_up_while_clearing_lines(21) + printover(s) + flush(stderr) + end +end; + + +# let +# test_tasks = decoding_tasks_list(; +# power=[-1,1],with_noise=[false,true],pilots=0,k=2, +# load_batches=1,alg_params=(;T=10,step=8,showprogressinfo=false) +# ) + +# p = Progress(length(test_tasks); desc="Warming up...", dt=10^-3, showspeed=true) +# channel = RemoteChannel(() -> Channel{Bool}(), 1) + +# @sync begin # start two tasks which will be synced in the very end +# # the first task updates the progress bar +# @async while take!(channel) +# next!(p) +# end + +# # the second task does the computation +# @async begin +# @distributed (+) for task_info in test_tasks +# produce_or_load( +# solve_decoding_task,task_info,datadir("results","testing_script"); +# force=true,verbose=false,tag=false +# ) +# GC.gc() +# put!(channel, true) # trigger a progress bar update +# 1 +# end +# put!(channel, false) # this tells the printing task to finish +# end +# end + +# # progress_pmap(test_tasks; progress) do task_info +# # GC.gc() +# # produce_or_load( +# # solve_decoding_task,task_info,datadir("results","testing_script"); +# # force=true,verbose=false,tag=false +# # ) +# # end + +# GC.gc() +# end; + +# GC.gc() + +# let +# test_tasks = decoding_tasks_list(; +# power=[-1,1],with_noise=[false,true],pilots=0,k=2, +# load_batches=25,alg_params=(;T=10,step=8,showprogressinfo=false) +# ) + +# p = Progress(length(test_tasks); desc="... one more...", dt=10^-3, showspeed=true) +# channel = RemoteChannel(() -> Channel{Bool}(), 1) + +# @sync begin # start two tasks which will be synced in the very end +# # the first task updates the progress bar +# @async while take!(channel) +# next!(p) +# end + +# # the second task does the computation +# @async begin +# @distributed (+) for task_info in test_tasks +# produce_or_load( +# solve_decoding_task,task_info,datadir("results","testing_script"); +# force=true,verbose=false,tag=false +# ) +# GC.gc() +# put!(channel, true) # trigger a progress bar update +# 1 +# end +# put!(channel, false) # this tells the printing task to finish +# end +# end + +# # progress_pmap(test_tasks; progress) do task_info +# # GC.gc() +# # produce_or_load( +# # solve_decoding_task,task_info,datadir("results","testing_script"); +# # force=true,verbose=false,tag=false +# # ) +# # end + +# GC.gc() +# end; + +println("") \ No newline at end of file diff --git a/scripts/run_gbp_algorithm_single_process.jl b/scripts/run_gbp_algorithm_single_process.jl index f9fc1ea..c3b050a 100644 --- a/scripts/run_gbp_algorithm_single_process.jl +++ b/scripts/run_gbp_algorithm_single_process.jl @@ -18,7 +18,7 @@ warm_up_results = let load_batches=2 ) - map(test_tasks) do task_info + results = map(test_tasks) do task_info produce_or_load(solve_decoding_task,task_info,datadir("results","testing_script"); force=true) end end; diff --git a/src/GBPQAMDecoder.jl b/src/GBPQAMDecoder.jl index c984d66..8d239aa 100644 --- a/src/GBPQAMDecoder.jl +++ b/src/GBPQAMDecoder.jl @@ -22,187 +22,4 @@ include(srcdir("decoding_tasks.jl")) include(srcdir("signal_simulator.jl")) include(srcdir("decoding.jl")) -# model_data_file(power) = datadir( -# "qam_data", -# "model_power_$(power)_wo_noise.parquet" -# ) - -# signal_data_file(power::Int,with_noise::Bool,pilots::Int) = datadir( -# "qam_data", -# "points_power_$(power)_pilots_$(pilots)_$(with_noise ? "w" : "wo")_noise.parquet" -# ) - -# noise_data_file() = datadir("noise_info.parquet") - -# encoding_data_file() = datadir("constellation_info.parquet") - -# dataset(fn) = Parquet2.Dataset(fn)|>DataFrame - -# Base.@kwdef struct AlgorithmParams -# T::Int -# step::Int -# w::Float64=0.25 -# niter::Int=10 -# showprogressinfo::Bool=true -# end - -# AlgorithmParams(d) = AlgorithmParams(;d...) - -# Base.iterate(a::AlgorithmParams,args...) = iterate(struct2dict(a),args...) -# DrWatson.allaccess(::AlgorithmParams) = (:T,:step,:w,:niter) -# DrWatson.default_allowed(::AlgorithmParams) = (Real,) - -# Base.@kwdef struct DecodingTask -# power::Int -# with_noise::Bool -# pilots::Int -# k::Int -# pilots_period::Int=100 -# skip_batches::Int=0 -# load_batches::Int=1_000_000 -# is_simulation::Bool=false -# alg_params::AlgorithmParams=AlgorithmParams(T=10,step=8) -# pilot_point::Tuple{Int,Int}=(1,1) -# qam_encoding::Dict{Tuple{Int,Int},Int}=DataPrep.get_qam_encoding(dataset(encoding_data_file())) -# noise_info::NamedTuple=get_noise_info(dataset(noise_data_file()),power,with_noise) -# model_file::String=model_data_file(power) -# signal_file::String=signal_data_file(power,with_noise,pilots) -# end - -# DecodingTask(d) = DecodingTask(;d...) - -# DrWatson.default_expand(::DecodingTask) = ["alg_params"] -# DrWatson.default_allowed(::DecodingTask) = (Real,Bool,AlgorithmParams) -# DrWatson.allaccess(::DecodingTask) = (:power,:with_noise,:pilots,:pilots_period,:k,:skip_batches,:load_batches,:alg_params) -# DrWatson.default_prefix(t::DecodingTask) = t.is_simulation ? "simulation-decoded" : "points-decoded" - -# function decodingtask2dict(t::DecodingTask) -# d = struct2dict(t) -# d[:alg_params] = struct2dict(t.alg_params) -# return d -# end - -# function decoding_tasks_list(; alg_params=nothing,kw...) -# d = Dict(kw) -# if !isnothing(alg_params) -# d[:alg_params] = alg_params|>pairs|>Dict|>dict_list.|>AlgorithmParams -# end -# return d|>dict_list.|>DecodingTask -# end - -# function add_pilots_info!(signal_table,task_info) -# (;pilots,pilots_period) = task_info -# @chain signal_table begin -# transform!(eachindex=>:t) -# transform!(:t=>ByRow(t->mod(t,1:pilots_period)<=pilots)=>:is_pilot) -# end -# end - -# function collapse_prior!(factors,sequence,T,step,part_idx) -# overlap = part_idx > 1 ? pairs(last(factors.Rs,T-step)) : () -# pilots = (r.i=>r.Ts for r in eachrow(sequence) if r.is_pilot) -# GBPAlgorithm.collapse_prior!(factors,overlap...,pilots...) -# return factors -# end - -# function load_problem_data(task_info) -# (;k,qam_encoding) = task_info -# (;noise_var) = task_info.noise_info -# model_table = @chain task_info.model_file begin -# dataset -# DataPrep.parse_model_data(k,noise_var,qam_encoding) -# end - -# (;load_batches,skip_batches,pilots_period) = task_info -# signal_table = if !task_info.is_simulation -# @chain task_info.signal_file begin -# Parquet2.Dataset -# Tables.rows -# _|>Drop(skip_batches*pilots_period)|>Take(load_batches*pilots_period)|>DataFrame -# DataPrep.parse_signal_data(qam_encoding) -# add_pilots_info!(task_info) -# end -# else -# @warn "Simulating signal..." -# symbol_sequence_length=min(load_batches*pilots_period,size(dataset(task_info.signal_file),1)) -# add_pilots_info!( -# simulate_signal_table( -# task_info, -# symbol_sequence_length, -# model_table, -# qam_encoding -# ), -# task_info -# ) -# end - -# return (;model_table,signal_table) -# end - -# progress_info_msg(prog,showinfo) = showinfo && next!(prog) - -# function task_info_msg(task_info) -# (;T,step) = task_info.alg_params -# (;power,with_noise,pilots,k) = task_info -# @info "Running algorithm:" "Avg. Power [dBm]"=power "With 4.5dB noise"=with_noise "Pilots"=pilots "Number of mixture components"=k "Sequence length"=T "Step"=step -# end - -# function save_info_msg(dir) -# @info "Saved decoded points" "Directory"=dir -# end - -# function get_noise_info(noise_data,power,with_noise) -# @chain noise_data begin -# subset(:pdbm=>ByRow(p->p==power)) -# map(eachrow(_)) do r -# noise_sigma = with_noise ? r.sigma*r.scale : 0.0 -# noise_var = noise_sigma^2 -# (;noise_sigma,noise_var,noise_scale=r.scale) -# end -# only -# end -# end - -# getresults(sequence,step,part_idx) = part_idx > 1 ? last(sequence,step) : sequence - -# maxparts(nsyms,partlen,step) = floor(Int,1 + (nsyms - partlen)/step) -# maxparts(seqdf::DataFrame,partlen,step) = maxparts(size(seqdf,1),partlen,step) - -# function solve_decoding_task(task_info) -# task_info_msg(task_info) -# (;model_table,signal_table) = load_problem_data(task_info) -# (;T,w,niter,step,showprogressinfo) = task_info.alg_params -# N = length(task_info.qam_encoding) -# decode_iter! = GBPAlgorithm.GBPDecoder(N,T,w) -# factors = GBPAlgorithm.Factors(N,T) -# n = maxparts(size(signal_table,1),T,step) -# prog = Progress(n; showspeed=true) -# R = withprogress(eachrow(signal_table); interval=10^-2)|> -# Partition(T,step)|> -# Enumerate()|> -# Map() do (part_idx,part) -# sequence = DataFrame(part) -# transform!(sequence,eachindex=>:i) -# GBPAlgorithm.reset_msg!(decode_iter!) -# DataPrep.memory_factor!(factors.M,model_table,sequence.Rx) -# collapse_prior!(factors,sequence,T,step,part_idx) -# decode_iter!(factors)|>Drop(niter-1)|>Take(1)|>collect|>only -# GBPAlgorithm.beliefs!(factors,decode_iter!) -# transform!(sequence,:Ts=>(x->copy(factors.Rs))=>:Rs) -# transform!(sequence,[:Ts,:Rs]=>((t,r)->t.!=r)=>:error) -# progress_info_msg(prog,showprogressinfo) -# getresults(sequence,step,part_idx) -# end|> -# Take(n)|> -# foldxl(vcat) -# results = select!(R, -# :Tx=>ByRow(((x,y),)->(;Tx_x=x,Tx_y=y))=>AsTable, -# :Rx=>ByRow(((x,y),)->(;Rx_x=x,Rx_y=y))=>AsTable, -# :Ts,:Rs,:error,:is_pilot -# ) - -# decoding_task = decodingtask2dict(task_info) -# return @strdict decoding_task results -# end - end # end Decoding module \ No newline at end of file diff --git a/src/decoding.jl b/src/decoding.jl index 4a0eb9b..48dcc25 100644 --- a/src/decoding.jl +++ b/src/decoding.jl @@ -32,7 +32,7 @@ function load_problem_data(task_info) end else @warn "Simulating signal..." - symbol_sequence_length=min(load_batches*pilots_period,size(dataset(task_info.signal_file),1)) + symbol_sequence_length=min(load_batches*pilots_period,262_000) add_pilots_info!( simulate_signal_table( task_info, @@ -47,12 +47,25 @@ function load_problem_data(task_info) return (;model_table,signal_table) end -progress_info_msg(prog,showinfo) = showinfo && next!(prog) +function progress_info_msg(prog,task_info) + (;power,with_noise,pilots,k) = task_info + (;T,step,w,niter) = task_info.alg_params + showvalues = [(:power,power),(:with_noise,with_noise),(:pilots,pilots), + (:k,k),(:T,T),(:step,step),(:w,w),(:niter,niter)] + task_info.alg_params.showprogressinfo && next!(prog; showvalues) +end + +# function task_info_msg(task_info) +# (;T,step) = task_info.alg_params +# (;power,with_noise,pilots,k) = task_info +# @info "Running algorithm:" "Avg. Power [dBm]"=power "With 4.5dB noise"=with_noise "Pilots"=pilots "Number of mixture components"=k "Sequence length"=T "Step"=step +# end function task_info_msg(task_info) (;T,step) = task_info.alg_params (;power,with_noise,pilots,k) = task_info - @info "Running algorithm:" "Avg. Power [dBm]"=power "With 4.5dB noise"=with_noise "Pilots"=pilots "Number of mixture components"=k "Sequence length"=T "Step"=step + msg = "Deocding: $(power)dBm with$(with_noise ? " " : "out ")noise, $(pilots) pilots,k=$(k),T=$(T),step=$(step)" + rpad(msg,60) end function save_info_msg(dir) @@ -77,14 +90,19 @@ maxparts(nsyms,partlen,step) = floor(Int,1 + (nsyms - partlen)/step) maxparts(seqdf::DataFrame,partlen,step) = maxparts(size(seqdf,1),partlen,step) function solve_decoding_task(task_info) - task_info_msg(task_info) (;model_table,signal_table) = load_problem_data(task_info) (;T,w,niter,step,showprogressinfo) = task_info.alg_params + # showprogressinfo && task_info_msg(task_info) N = length(task_info.qam_encoding) decode_iter! = GBPAlgorithm.GBPDecoder(N,T,w) factors = GBPAlgorithm.Factors(N,T) n = maxparts(size(signal_table,1),T,step) - prog = Progress(n; showspeed=true) + prog = Progress(n; + desc=task_info_msg(task_info), + barglyphs=BarGlyphs('|','█', ['▁' ,'▂' ,'▃' ,'▄' ,'▅' ,'▆', '▇'],' ','|',), + barlen=16, + showspeed=true + ) R = withprogress(eachrow(signal_table); interval=10^-2)|> Partition(T,step)|> Enumerate()|> @@ -98,7 +116,8 @@ function solve_decoding_task(task_info) GBPAlgorithm.beliefs!(factors,decode_iter!) transform!(sequence,:Ts=>(x->copy(factors.Rs))=>:Rs) transform!(sequence,[:Ts,:Rs]=>((t,r)->t.!=r)=>:error) - progress_info_msg(prog,showprogressinfo) + # progress_info_msg(prog,task_info) + showprogressinfo && ProgressMeter.next!(prog) getresults(sequence,step,part_idx) end|> Take(n)|>