Limit parallel Scala.js linking jobs to avoid high memory pressure #6260
base: main
ParallelismLimiter.scala (new file)

@@ -0,0 +1,22 @@
+package mill.scalajslib.worker
+
+import java.util.concurrent.Semaphore
+
+/**
+ * Limit the parallelism of jobs run via [[runLimited]].
+ * @param maxJobs The maximal parallelism
+ */
+class ParallelismLimiter(maxJobs: Int) {
+
+  private val linkerJobsSemaphore = Semaphore(maxJobs)
+
+  def runLimited[T](thunk: => T): T = {
+    linkerJobsSemaphore.acquire()
+    try {
+      thunk
+    } finally {
+      linkerJobsSemaphore.release()
+    }
+  }
+
+}
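For readers skimming the diff, here is a minimal usage sketch of the limiter above. It is illustrative only and not part of the PR; the job bodies, the limit of 2, and the thread count are made up for the example.

```scala
// Illustrative sketch: at most 2 of the submitted bodies run at the same time.
val limiter = ParallelismLimiter(2)

val threads = (1 to 8).map { i =>
  new Thread(() =>
    limiter.runLimited {
      // stand-in for an expensive linking job
      println(s"job $i running on ${Thread.currentThread().getName}")
      Thread.sleep(100)
    }
  )
}
threads.foreach(_.start())
threads.foreach(_.join())
```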
ScalaJSWorker.scala

@@ -2,7 +2,7 @@ package mill.scalajslib.worker
 import mill.*
 import mill.scalajslib.api
-import mill.scalajslib.worker.{api => workerApi}
+import mill.scalajslib.worker.api as workerApi
 import mill.api.TaskCtx
 import mill.api.Result
 import mill.api.daemon.internal.internal

@@ -13,7 +13,7 @@ import java.io.File
 import java.net.URLClassLoader

 @internal
-private[scalajslib] class ScalaJSWorker(jobs: Int)
+private[scalajslib] class ScalaJSWorker(jobs: Int, linkerJobs: Int)
     extends CachedFactory[Seq[mill.PathRef], (URLClassLoader, workerApi.ScalaJSWorkerApi)] {

   override def setup(key: Seq[PathRef]) = {

@@ -190,6 +190,8 @@ private[scalajslib] class ScalaJSWorker(jobs: Int)
     }
   }

+  private val linkerJobLimiter = ParallelismLimiter(linkerJobs)
+
   def link(
       toolsClasspath: Seq[mill.PathRef],
       runClasspath: Seq[mill.PathRef],

@@ -207,7 +209,7 @@ private[scalajslib] class ScalaJSWorker(jobs: Int)
       minify: Boolean,
       importMap: Seq[api.ESModuleImportMapping],
       experimentalUseWebAssembly: Boolean
-  ): Result[api.Report] = {
+  ): Result[api.Report] = linkerJobLimiter.runLimited {
     withValue(toolsClasspath) { case (_, bridge) =>
       bridge.link(
         runClasspath = runClasspath.iterator.map(_.path.toNIO).toSeq,

@@ -258,7 +260,11 @@ private[scalajslib] class ScalaJSWorker(jobs: Int)
 @internal
 private[scalajslib] object ScalaJSWorkerExternalModule extends mill.api.ExternalModule {

-  def scalaJSWorker: Worker[ScalaJSWorker] =
-    Task.Worker { new ScalaJSWorker(Task.ctx().jobs) }
+  def scalaJSWorker: Worker[ScalaJSWorker] = Task.Worker {
+    new ScalaJSWorker(
+      jobs = Task.ctx().jobs,
+      linkerJobs = 2
Inline review comments on the `linkerJobs = 2` line:

Contributor: Just a suggestion: perhaps this value could be exposed on [...]? Exposing it on the API also slightly improves the transparency around what's going on here, but perhaps this will need to be documented somehow?

Member (Author): Yeah, I already thought about how to configure it, but didn't want to over-engineer it. The natural place for a config task would be the [...]. What would be somewhat easier is accepting an environment variable. Also, we should converge on a "sensible default". I don't work with Scala.js often, so I have no "feeling" for what a good value might be. We might also apply some logic based on heuristics, which I don't have.

Contributor: I'm not sure what reasonable heuristics you could sensibly apply, and I suspect attempting to do that might be a lot of work for not a lot of reward. 🤷 FWIW, @lolgab was suggesting a concurrency of [...].
(diff continues)

+    )
+  }
   lazy val millDiscover = Discover[this.type]
 }
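Following up on the review thread above, here is a hedged sketch of the environment-variable approach mentioned there, keeping 2 as the fallback default. The variable name `MILL_SCALAJS_LINKER_JOBS` is hypothetical and not part of this PR or Mill's API; `Task.Worker`, `Task.ctx().jobs`, and the `ScalaJSWorker` constructor are taken from the diff above.

```scala
def scalaJSWorker: Worker[ScalaJSWorker] = Task.Worker {
  // Hypothetical: allow overriding the linker-job limit via an environment variable.
  val linkerJobs = sys.env
    .get("MILL_SCALAJS_LINKER_JOBS")
    .flatMap(_.toIntOption)
    .filter(_ > 0)
    .getOrElse(2) // hard-coded default proposed in this PR
  new ScalaJSWorker(jobs = Task.ctx().jobs, linkerJobs = linkerJobs)
}
```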
ParallelismLimiterTests.scala (new file)

@@ -0,0 +1,44 @@
+package mill.scalajslib.worker
+
+import utest.*
+
+import java.util.concurrent.atomic.AtomicInteger
+
+class ParallelismLimiterTests extends TestSuite {
+
+  override def tests = Tests {
+    test("limitedJobs") {
+
+      val maxJobs = 3
+      val limiter = ParallelismLimiter(maxJobs)
+
+      val concurrentCount = new AtomicInteger(0)
+      val maxObserved = new AtomicInteger(0)
+
+      def work(i: Int, workTimeMs: Int): Unit = {
+        val before = concurrentCount.incrementAndGet()
+        maxObserved.updateAndGet(v => Math.max(v, before))
+
+        Thread.sleep(workTimeMs)
+
+        val after = concurrentCount.decrementAndGet()
+        assert(after >= 0)
+      }
+
+      val tasks = (1 to 10).map { i =>
+        new Thread(() =>
+          limiter.runLimited {
+            work(i, 50)
+          }
+        )
+      }
+
+      tasks.foreach(_.start())
+      tasks.foreach(_.join())
+
+      assert(maxObserved.get() <= maxJobs)
+    }
+
+  }
+
+}
ScalaJSWorkerImpl.scala

@@ -297,7 +297,12 @@ class ScalaJSWorkerImpl(jobs: Int) extends ScalaJSWorkerApi {
         Left(e.getMessage)
       }

-    Await.result(resultFuture, Duration.Inf)
+    val res = Await.result(resultFuture, Duration.Inf)
+    linker match {
+      case cl: ClearableLinker => cl.clear()
+      case _ => // no-op
+    }
+    res
   }

   def run(config: JsEnvConfig, report: Report): Unit = {
Review comments on this change:

I think you should pass `linkerJobs` instead of `jobs` to [...]. Since we are running two linker jobs at a time to save memory, if we store 8 different ones in memory, we aren't saving as much memory as we want.
My working hypothesis was that the high memory usage is needed while linking is in progress, but most of it gets freed afterwards. That means that by delaying/synchronizing linking jobs, we already reduce the memory pressure. #6260 (comment) seems to support, or at least not contradict, this hypothesis.
Consider that the test was performed with the code that clears the linker after every link step, which makes sense to free the linker's memory afterwards. If we keep the linkers in memory and do not clear them, the test could give different results.
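As a rough, illustrative aside (not from the PR): the "memory is mostly reclaimable after linking" hypothesis could be spot-checked by sampling heap usage on a background thread while a single link job runs and comparing the peak against the level after it finishes. Heap readings around `System.gc()` are only indicative, and the link-job placeholder below is hypothetical.

```scala
import java.util.concurrent.atomic.AtomicLong

def usedHeapMb(): Long = {
  val rt = Runtime.getRuntime
  (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024)
}

val peak = new AtomicLong(0L)
@volatile var running = true

// Sample heap usage every 100 ms while the link job is running.
val sampler = new Thread(() => {
  while (running) {
    peak.updateAndGet(p => math.max(p, usedHeapMb()))
    Thread.sleep(100)
  }
})
sampler.start()

// ... run a single Scala.js link job here (placeholder) ...

running = false
sampler.join()
System.gc()
println(s"peak during link: ${peak.get()} MB, after link: ${usedHeapMb()} MB")
```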