Skip to content

Commit

Permalink
Merge pull request #325 from bmx-ng/task/timed-blockingqueue
Browse files Browse the repository at this point in the history
Added time Enqueue and Dequeue methods.
  • Loading branch information
woollybah authored Jul 19, 2024
2 parents 89e84b3 + 4b30c50 commit 2a3dff6
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 29 deletions.
46 changes: 46 additions & 0 deletions collections.mod/examples/blockingqueue_02.bmx
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'
' Demonstrates how to use a blocking queue to synchronize threads.
'
SuperStrict

Framework Brl.StandardIO
Import Brl.Threads
Import Brl.Collections


Function Producer:Object(data:Object)
Local queue:TBlockingQueue<Int> = TBlockingQueue<Int>(data)

For Local i:Int = 1 To 10
Try
Print "Producing " + i
queue.Enqueue(i, 100, ETimeUnit.Milliseconds) ' 100 milliseconds timeout
Delay 100 ' Simulate work
Catch ex:TTimeoutException
Print "Enqueue timed out: " + ex.ToString()
End Try
Next
End Function

Function Consumer:Object(data:Object)
Local queue:TBlockingQueue<Int> = TBlockingQueue<Int>(data)

For Local i:Int = 1 To 10
Try
Local item:Int = queue.Dequeue(1500, ETimeUnit.Milliseconds) ' 1.5 second timeout
Print "Consuming " + item
Delay 1000 ' Simulate work
Catch ex:TTimeoutException
Print "Dequeue timed out: " + ex.ToString()
End Try
Next
End Function

Local queue:TBlockingQueue<Int> = New TBlockingQueue<Int>(5)
Local producerThread:TThread = CreateThread(Producer, queue)
Local consumerThread:TThread = CreateThread(Consumer, queue)

WaitThread(producerThread)
WaitThread(consumerThread)

Print "All tasks are done."
48 changes: 48 additions & 0 deletions collections.mod/queue.bmx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ SuperStrict
Import "collection.bmx"
?threaded
Import BRL.threads
Import BRL.Time
?

Rem
Expand Down Expand Up @@ -350,6 +351,53 @@ Type TBlockingQueue<T> Extends TQueue<T>
lock.Unlock()
End Method

Rem
bbdoc: Adds an element to the end of the #TBlockingQueue, waiting up to the specified wait time if necessary for space to become available
about: If the queue is full, the operation will block until space becomes available or the specified timeout elapses.
Throws a #TTimeoutException if the operation times out.
End Rem
Method Enqueue(element:T, timeout:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
Local timeoutMs:ULong = TimeUnitToMillis(timeout, unit)

Local startTime:ULong = CurrentUnixTime()
lock.Lock()
While full
Local now:ULong = CurrentUnixTime()
If timeout > 0 And now - startTime >= timeoutMs
lock.Unlock()
Throw New TTimeoutException("The operation timed out after " + timeoutMs + "ms")
End If
notFull.TimedWait(lock, Int(timeoutMs - (now - startTime)))
Wend
Super.Enqueue(element)
notEmpty.Signal()
lock.Unlock()
End Method

Rem
bbdoc: Removes and returns the element at the beginning of the #TBlockingQueue, waiting up to the specified wait time if necessary for an element to become available.
about: If the queue is empty, the operation will block until an element becomes available or the specified timeout elapses.
Throws a #TTimeoutException if the operation times out.
End Rem
Method Dequeue:T(timeout:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
Local timeoutMs:ULong = TimeUnitToMillis(timeout, unit)

Local startTime:Long = CurrentUnixTime()
lock.Lock()
While IsEmpty()
Local now:ULong = CurrentUnixTime()
If timeout > 0 And now - startTime >= timeoutMs
lock.Unlock()
Throw New TTimeoutException("The operation timed out after " + timeoutMs + "ms")
End If
notEmpty.TimedWait(lock, Int(timeoutMs - (now - startTime)))
Wend
Local element:T = Super.Dequeue()
notFull.Signal()
lock.Unlock()
Return element
End Method

Method Dequeue:T()
lock.Lock()
While IsEmpty()
Expand Down
2 changes: 1 addition & 1 deletion threadpool.mod/examples/scheduled_02.bmx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Import BRL.ThreadPool

Local pool:TScheduledThreadPoolExecutor = TScheduledThreadPoolExecutor.newFixedThreadPool(11)

pool.schedule(New TTask("One-shot Task"), 5, ETimeUnit.Seconds) ' after 5 sceonds
pool.schedule(New TTask("One-shot Task"), 5, ETimeUnit.Seconds) ' after 5 seconds
pool.schedule(New TTask("Recurring Task"), 3, 5, ETimeUnit.Seconds) ' after 3 seconds and then every 5 seconds

Delay(10 * 1000) ' wait for 10 seconds and then shutdown the pool
Expand Down
32 changes: 4 additions & 28 deletions threadpool.mod/threadpool.bmx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ModuleInfo "History: Initial Release"

Import BRL.Threads
Import BRL.LinkedList
Import BRL.Time
Import pub.stdc

Rem
Expand Down Expand Up @@ -287,17 +288,6 @@ Public
end method
End Type

Rem
bbdoc: A unit of date-time, such as Days or Hours.
End Rem
Enum ETimeUnit
Milliseconds
Seconds
Minutes
Hours
Days
End Enum

Rem
bbdoc: An executor that can be used to schedule commands to run after a given delay, or to execute commands periodically.
End Rem
Expand Down Expand Up @@ -337,26 +327,12 @@ Type TScheduledThreadPoolExecutor Extends TThreadPoolExecutor
bbdoc: Schedules a recurring command to run after a given initial delay, and subsequently with the given period.
End Rem
Method schedule(command:TRunnable, initialDelay:ULong, period:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
Local now:ULong = CurrentUnixTime()
Local now:ULong = CurrentUnixTime()

Local newTask:TScheduledTask = New TScheduledTask

Local delayMs:ULong = initialDelay
Local periodMs:ULong = period
Select unit
Case ETimeUnit.Seconds
delayMs :* 1000
periodMs :* 1000
Case ETimeUnit.Minutes
delayMs :* 60000
periodMs :* 60000
Case ETimeUnit.Hours
delayMs :* 3600000
periodMs :* 3600000
Case ETimeUnit.Days
delayMs :* 86400000
periodMs :* 86400000
End Select
Local delayMs:ULong = TimeUnitToMillis(initialDelay, unit)
Local periodMs:ULong = TimeUnitToMillis(period, unit)

newTask.executeAt = now + delayMs
newTask.intervalMs = periodMs
Expand Down
70 changes: 70 additions & 0 deletions time.mod/time.bmx
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
' Copyright (c)2024 Bruce A Henderson
'
' This software is provided 'as-is', without any express or implied
' warranty. In no event will the authors be held liable for any damages
' arising from the use of this software.
'
' Permission is granted to anyone to use this software for any purpose,
' including commercial applications, and to alter it and redistribute it
' freely, subject to the following restrictions:
'
' 1. The origin of this software must not be misrepresented; you must not
' claim that you wrote the original software. If you use this software
' in a product, an acknowledgment in the product documentation would be
' appreciated but is not required.
'
' 2. Altered source versions must be plainly marked as such, and must not be
' misrepresented as being the original software.
'
' 3. This notice may not be removed or altered from any source
' distribution.
'
SuperStrict

Module BRL.Time

ModuleInfo "Version: 1.0"
ModuleInfo "Author: Bruce A Henderson"
ModuleInfo "License: zlib/libpng"
ModuleInfo "Copyright: Bruce A Henderson"

ModuleInfo "History: 1.00"
ModuleInfo "History: Initial Release"


Rem
bbdoc: A unit of date-time, such as Days or Hours.
End Rem
Enum ETimeUnit
Milliseconds
Seconds
Minutes
Hours
Days
End Enum

Rem
bbdoc: Converts a time value to milliseconds.
End Rem
Function TimeUnitToMillis:ULong( value:ULong, unit:ETimeUnit )
Select unit
Case ETimeUnit.Milliseconds
Return value
Case ETimeUnit.Seconds
Return value * 1000
Case ETimeUnit.Minutes
Return value * 60000
Case ETimeUnit.Hours
Return value * 3600000
Case ETimeUnit.Days
Return value * 86400000
End Select
End Function

Type TTimeoutException Extends TRuntimeException

Method New(message:String)
Super.New(message)
End Method

End Type

1 comment on commit 2a3dff6

@thareh
Copy link
Contributor

@thareh thareh commented on 2a3dff6 Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good stuff! I actually have a very similar TimeUnitToMillis function in my own modules using the code from brl.timer -> TChrono.GetTimestamp()

I see you're using Generics, there's still one "game breaking" bug with generics and imports/modules that I found if you feel inclined, I tried fixing it myself some time ago but I couldn't wrap my head around it: bmx-ng/bcc#657

Thanks!

Please sign in to comment.