|
1 | 1 | package com.malinskiy.marathon.worker
|
2 | 2 |
|
| 3 | +import com.malinskiy.marathon.Marathon |
| 4 | +import com.malinskiy.marathon.di.marathonStartKoin |
3 | 5 | import com.malinskiy.marathon.execution.ComponentInfo
|
4 | 6 | import com.malinskiy.marathon.execution.Configuration
|
5 | 7 | import kotlinx.coroutines.channels.Channel
|
6 | 8 | import java.util.concurrent.CountDownLatch
|
7 |
| -import java.util.concurrent.ExecutorService |
8 | 9 | import java.util.concurrent.Executors
|
9 | 10 | import java.util.concurrent.Future
|
10 | 11 | import java.util.concurrent.TimeUnit
|
11 | 12 | import java.util.concurrent.atomic.AtomicBoolean
|
12 | 13 |
|
13 |
| -class WorkerContext : WorkerHandler { |
14 |
| - |
15 |
| - private lateinit var configuration: Configuration |
| 14 | +internal class WorkerContext(configuration: Configuration) : WorkerHandler { |
| 15 | + private val executor = Executors.newSingleThreadExecutor() |
16 | 16 | private val componentsChannel: Channel<ComponentInfo> = Channel(capacity = Channel.UNLIMITED)
|
17 | 17 |
|
| 18 | + private val application = marathonStartKoin(configuration) |
| 19 | + private val marathon = application.koin.get<Marathon>() |
18 | 20 | private val isRunning = AtomicBoolean(false)
|
19 | 21 | private val startedLatch = CountDownLatch(1)
|
20 | 22 |
|
21 |
| - private lateinit var executor: ExecutorService |
22 | 23 | private lateinit var finishFuture: Future<*>
|
23 | 24 |
|
24 |
| - override fun initialize(configuration: Configuration) { |
25 |
| - this.configuration = configuration |
| 25 | + override fun scheduleTests(componentInfo: ComponentInfo) { |
| 26 | + ensureStarted() |
| 27 | + componentsChannel.trySend(componentInfo) |
26 | 28 | }
|
27 | 29 |
|
28 |
| - override fun ensureStarted() { |
29 |
| - if (isRunning.getAndSet(true)) return |
30 |
| - |
31 |
| - val runnable = WorkerRunnable(componentsChannel, configuration) |
| 30 | + override fun await() { |
| 31 | + if (!isRunning.getAndSet(false)) return |
32 | 32 |
|
33 |
| - executor = Executors.newSingleThreadExecutor() |
34 |
| - finishFuture = executor.submit(runnable) |
| 33 | + startedLatch.await(WAITING_FOR_START_TIMEOUT_MINUTES, TimeUnit.MINUTES) |
| 34 | + componentsChannel.close() |
35 | 35 |
|
36 |
| - startedLatch.countDown() |
| 36 | + try { |
| 37 | + // Use future to propagate all exceptions from runnable |
| 38 | + finishFuture.get() |
| 39 | + } finally { |
| 40 | + executor.shutdown() |
| 41 | + } |
37 | 42 | }
|
38 | 43 |
|
39 |
| - override fun scheduleTests(componentInfo: ComponentInfo) { |
40 |
| - componentsChannel.trySend(componentInfo) |
| 44 | + override fun close() { |
| 45 | + isRunning.set(false) |
| 46 | + componentsChannel.close() |
| 47 | + executor.shutdown() |
| 48 | + application.close() |
41 | 49 | }
|
42 | 50 |
|
43 |
| - override fun await() { |
44 |
| - if (isRunning.get()) { |
45 |
| - startedLatch.await(WAITING_FOR_START_TIMEOUT_MINUTES, TimeUnit.MINUTES) |
46 |
| - componentsChannel.close() |
47 |
| - |
48 |
| - try { |
49 |
| - // Use future to propagate all exceptions from runnable |
50 |
| - finishFuture.get() |
51 |
| - } finally { |
52 |
| - executor.shutdown() |
53 |
| - } |
54 |
| - } |
| 51 | + private fun ensureStarted() { |
| 52 | + if (isRunning.getAndSet(true)) return |
| 53 | + |
| 54 | + val runnable = WorkerRunnable(marathon, componentsChannel) |
| 55 | + finishFuture = executor.submit(runnable) |
| 56 | + |
| 57 | + startedLatch.countDown() |
55 | 58 | }
|
56 | 59 |
|
57 | 60 | private companion object {
|
|
0 commit comments