diff --git a/Project.toml b/Project.toml
index 7050fff..520548d 100644
--- a/Project.toml
+++ b/Project.toml
@@ -6,6 +6,7 @@ version = "0.1.0"
 [deps]
 CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
 DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
+Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
 Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
 Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
 
@@ -13,8 +14,8 @@ Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
 julia = "1"
 
 [extras]
-Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
 DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
+Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
 
 [targets]
 test = ["Test", "DataFrames"]
diff --git a/src/connector.jl b/src/connector.jl
index 6e64fb5..896954b 100644
--- a/src/connector.jl
+++ b/src/connector.jl
@@ -1,4 +1,5 @@
 using Tables
+using Dates
 
 abstract type AbstractConnector end
 
@@ -12,7 +13,7 @@ mutable struct TablesConnector <: AbstractConnector
     args::Dict{Symbol, Any}
 end
 
-function TablesConnector(data; shuffle::Bool = false)
+function TablesConnector(data; shuffle::Bool = false, timestamp::Bool = false)
     if !Tables.istable(data)
         throw(ArgumentError("data must have the Tables.jl interface"))
     end
@@ -21,17 +22,23 @@ function TablesConnector(data; shuffle::Bool = false)
         data = data[Random.shuffle(1:size(data,1)), :]
     end
 
+    if timestamp
+        rightnow = Dates.now();
+        TimestampModifier = EasyStream.AlterDataModifier((data,event)-> data[:, :timestamp] .= rightnow)
+        apply!(TimestampModifier, data, Event(Dict{Symbol, Any}()))
+    end
+
     return TablesConnector(Tables.rows(data), 0, Dict{Symbol, Any}())
 end
 
-function TablesConnector(data, orderBy::Symbol; rev::Bool = false)
+function TablesConnector(data, orderBy::Symbol; rev::Bool = false, timestamp::Bool = false)
     if !(orderBy in propertynames(data))
         throw(ArgumentError("data doesn't have the column $orderBy"))
     end
 
     data = sort(data, orderBy, rev = rev)
 
-    return TablesConnector(data)
+    return TablesConnector(data, timestamp = timestamp)
 end
 
 TablesConnector(filename::String) = TablesConnector(CSV.read(filename; header = false))
@@ -64,6 +71,6 @@ end
 function next(conn::GeneratorConnector)
     total = 100
     data = conn.generator(;n_samples = total, conn.args...)
-    
+
     return DataFrame(data[1 + Int(floor(rand(1,1)[1] .* size(data)[1])), :])
 end
diff --git a/test/modifiers.jl b/test/modifiers.jl
index 2349c34..0cc4f57 100644
--- a/test/modifiers.jl
+++ b/test/modifiers.jl
@@ -24,4 +24,12 @@
 
     @test_logs (:warn, "There are duplicate columns.") EasyStream.FilterModifier([:x, :x, :y])
     @test_logs (:warn, "There are duplicate columns.") EasyStream.FilterModifier(:x, :x, :y)
+
+
+    df = DataFrame(x = [1, 2, 3, 4, 5, 6], y = [6, 5, 4, 3, 2, 1], z = [6, 5, 4, 3, 2, 1])
+    conn_df = EasyStream.TablesConnector(df, timestamp = true);
+    stream = EasyStream.BatchStream(conn_df; batch = 2);
+
+    @test size(EasyStream.listen(stream), 2) == 4
+
 end