@@ -0,0 +1,22 @@
package mill.scalajslib.worker

import java.util.concurrent.Semaphore

/**
* Limit the parallelism of jobs run via [[runLimited]].
* @param maxJobs The maximal parallelism
*/
class ParallelismLimiter(maxJobs: Int) {

private val linkerJobsSemaphore = Semaphore(maxJobs)

def runLimited[T](thunk: => T): T = {
linkerJobsSemaphore.acquire()
try {
thunk
} finally {
linkerJobsSemaphore.release()
}
}

}
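
A minimal usage sketch (illustrative only, not part of this diff), assuming plain Thread-based callers: at most maxJobs bodies passed to runLimited execute concurrently, while the remaining callers block inside acquire() until a permit is released.

    // Illustrative sketch: cap concurrent work at 2 jobs.
    val limiter = ParallelismLimiter(2)

    val threads = (1 to 8).map { i =>
      new Thread(() =>
        limiter.runLimited {
          // At most 2 of these bodies run at the same time;
          // the other threads wait inside acquire().
          println(s"job $i running on ${Thread.currentThread().getName}")
        }
      )
    }
    threads.foreach(_.start())
    threads.foreach(_.join())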
libs/scalajslib/src/mill/scalajslib/worker/ScalaJSWorker.scala (16 changes: 11 additions & 5 deletions)
@@ -2,7 +2,7 @@ package mill.scalajslib.worker

import mill.*
import mill.scalajslib.api
-import mill.scalajslib.worker.{api => workerApi}
+import mill.scalajslib.worker.api as workerApi
import mill.api.TaskCtx
import mill.api.Result
import mill.api.daemon.internal.internal
@@ -13,7 +13,7 @@ import java.io.File
import java.net.URLClassLoader

@internal
-private[scalajslib] class ScalaJSWorker(jobs: Int)
+private[scalajslib] class ScalaJSWorker(jobs: Int, linkerJobs: Int)
extends CachedFactory[Seq[mill.PathRef], (URLClassLoader, workerApi.ScalaJSWorkerApi)] {

override def setup(key: Seq[PathRef]) = {
@@ -190,6 +190,8 @@ private[scalajslib] class ScalaJSWorker(jobs: Int)
}
}

+private val linkerJobLimiter = ParallelismLimiter(linkerJobs)
@lolgab (Member) commented on Dec 5, 2025:
I think you should pass linkerJobs instead of jobs to

    val bridge = cl
      .loadClass("mill.scalajslib.worker.ScalaJSWorkerImpl")
      .getDeclaredConstructor(classOf[Int])
      .newInstance(jobs)
      .asInstanceOf[workerApi.ScalaJSWorkerApi]

Since we are running only two linker jobs at a time to save memory, if we still store 8 different linkers in memory, we aren't saving as much memory as we want.
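
In other words, the suggested change would amount to the following sketch (assuming the constructor signature stays the same; how ScalaJSWorkerImpl uses the value internally is up to its implementation):

    val bridge = cl
      .loadClass("mill.scalajslib.worker.ScalaJSWorkerImpl")
      .getDeclaredConstructor(classOf[Int])
      .newInstance(linkerJobs) // pass the linker-job cap instead of the global jobs count
      .asInstanceOf[workerApi.ScalaJSWorkerApi]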

The PR author (Member) replied:

My working hypothesis was that the high memory usage is required while the linking is in progress, but most of it gets freed afterwards. That means that by delaying/synchronizing linking jobs, we already reduce the memory pressure. #6260 (comment) seems to support, or at least not contradict, this hypothesis.

The reviewer (Member) replied:

Consider that the test was performed with the code that clears the linker after every link step, which makes sense in order to free the linker's memory afterwards. If we keep the linkers in memory and do not clear them, the test could give different results.


def link(
toolsClasspath: Seq[mill.PathRef],
runClasspath: Seq[mill.PathRef],
@@ -207,7 +209,7 @@
minify: Boolean,
importMap: Seq[api.ESModuleImportMapping],
experimentalUseWebAssembly: Boolean
-): Result[api.Report] = {
+): Result[api.Report] = linkerJobLimiter.runLimited {
withValue(toolsClasspath) { case (_, bridge) =>
bridge.link(
runClasspath = runClasspath.iterator.map(_.path.toNIO).toSeq,
@@ -258,7 +260,11 @@ private[scalajslib] class ScalaJSWorker(jobs: Int)
@internal
private[scalajslib] object ScalaJSWorkerExternalModule extends mill.api.ExternalModule {

-def scalaJSWorker: Worker[ScalaJSWorker] =
-Task.Worker { new ScalaJSWorker(Task.ctx().jobs) }
+def scalaJSWorker: Worker[ScalaJSWorker] = Task.Worker {
+new ScalaJSWorker(
+jobs = Task.ctx().jobs,
+linkerJobs = 2
+)
+}
lazy val millDiscover = Discover[this.type]
}
@@ -0,0 +1,44 @@
package mill.scalajslib.worker

import utest.*

import java.util.concurrent.atomic.AtomicInteger

class ParallelismLimiterTests extends TestSuite {

override def tests = Tests {
test("limitedJobs") {

val maxJobs = 3
val limiter = ParallelismLimiter(maxJobs)

val concurrentCount = new AtomicInteger(0)
val maxObserved = new AtomicInteger(0)

def work(i: Int, workTimeMs: Int): Unit = {
val before = concurrentCount.incrementAndGet()
maxObserved.updateAndGet(v => Math.max(v, before))

Thread.sleep(workTimeMs)

val after = concurrentCount.decrementAndGet()
assert(after >= 0)
}

val tasks = (1 to 10).map { i =>
new Thread(() =>
limiter.runLimited {
work(i, 50)
}
)
}

tasks.foreach(_.start())
tasks.foreach(_.join())

assert(maxObserved.get() <= maxJobs)
}

}

}