Skip to content

Commit

Permalink
Second try for “Async IO for HttpsDataVault, Fox Error Handling” (#7155)
Browse files Browse the repository at this point in the history
* Revert "Revert "Async IO for HttpsDataVault, Fox Error Handling (#7137)" (#7154)"

This reverts commit 723e207.

* separate RemoteSourceDescriptorService and DataVaultService

* remove unused imports
  • Loading branch information
fm3 authored Jun 19, 2023
1 parent 723e207 commit bdc2828
Show file tree
Hide file tree
Showing 46 changed files with 509 additions and 528 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
- Creating bounding boxes can now be done by dragging the left mouse button (when the bounding box tool is selected). To move around in the dataset while this tool is active, keep ALT pressed. [#7118](https://github.com/scalableminds/webknossos/pull/7118)
- Agglomerate skeletons can only be modified if the proofreading tool is active so they stay in sync with the underlying segmentation and agglomerate graph. Agglomerate skeletons cannot be modified using any other means. They are marked in the skeleton list using the clipboard icon of the proofreading tool. When exporting skeletons in the NML format, trees ("things") now have a `type` property which is either "DEFAULT" or "AGGLOMERATE". [#7086](https://github.com/scalableminds/webknossos/pull/7086)
- The cache for remote dataset array contents can now have a configured size in bytes. New config option `datastore.cache.imageArrayChunks.maxSizeBytes`. Default is 2 GB, consider increasing for production. [#7067](https://github.com/scalableminds/webknossos/pull/7067)
- Optimized processing of parallel requests for remote datasets, improving performance and reducing idle waiting. [#7137](https://github.com/scalableminds/webknossos/pull/7137)

### Fixed
- Fixed a bug where some volume annotations could not be downloaded. [#7115](https://github.com/scalableminds/webknossos/pull/7115)
Expand Down
2 changes: 2 additions & 0 deletions app/WebKnossosModule.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import com.google.inject.AbstractModule
import com.scalableminds.webknossos.datastore.storage.DataVaultService
import controllers.InitialDataService
import models.analytics.AnalyticsSessionService
import models.annotation.{AnnotationMutexService, AnnotationStore}
Expand Down Expand Up @@ -28,6 +29,7 @@ class WebKnossosModule extends AbstractModule {
bind(classOf[AnnotationMutexService]).asEagerSingleton()
bind(classOf[DataSetService]).asEagerSingleton()
bind(classOf[TimeSpanService]).asEagerSingleton()
bind(classOf[DataVaultService]).asEagerSingleton()
bind(classOf[TempFileService]).asEagerSingleton()
bind(classOf[MailchimpTicker]).asEagerSingleton()
bind(classOf[JobService]).asEagerSingleton()
Expand Down
8 changes: 4 additions & 4 deletions app/models/binary/credential/CredentialService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package models.binary.credential
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.storage.{
DataVaultCredential,
DataVaultsHolder,
DataVaultService,
GoogleServiceAccountCredential,
HttpBasicAuthCredential,
S3AccessKeyCredential
Expand All @@ -24,21 +24,21 @@ class CredentialService @Inject()(credentialDAO: CredentialDAO) {
userId: ObjectId,
organizationId: ObjectId): Option[DataVaultCredential] =
uri.getScheme match {
case DataVaultsHolder.schemeHttps | DataVaultsHolder.schemeHttp =>
case DataVaultService.schemeHttps | DataVaultService.schemeHttp =>
credentialIdentifier.map(
username =>
HttpBasicAuthCredential(uri.toString,
username,
credentialSecret.getOrElse(""),
userId.toString,
organizationId.toString))
case DataVaultsHolder.schemeS3 =>
case DataVaultService.schemeS3 =>
(credentialIdentifier, credentialSecret) match {
case (Some(keyId), Some(secretKey)) =>
Some(S3AccessKeyCredential(uri.toString, keyId, secretKey, userId.toString, organizationId.toString))
case _ => None
}
case DataVaultsHolder.schemeGS =>
case DataVaultService.schemeGS =>
for {
secret <- credentialSecret
secretJson <- tryo(Json.parse(secret)).toOption
Expand Down
8 changes: 5 additions & 3 deletions app/models/binary/explore/ExploreRemoteLayerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import com.scalableminds.webknossos.datastore.datareaders.zarr._
import com.scalableminds.webknossos.datastore.datareaders.zarr3.Zarr3ArrayHeader
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.scalableminds.webknossos.datastore.models.datasource._
import com.scalableminds.webknossos.datastore.storage.{DataVaultsHolder, RemoteSourceDescriptor}
import com.scalableminds.webknossos.datastore.storage.{DataVaultService, RemoteSourceDescriptor}
import com.typesafe.scalalogging.LazyLogging
import models.binary.credential.CredentialService
import models.user.User
Expand All @@ -38,7 +38,9 @@ object ExploreRemoteDatasetParameters {
implicit val jsonFormat: OFormat[ExploreRemoteDatasetParameters] = Json.format[ExploreRemoteDatasetParameters]
}

class ExploreRemoteLayerService @Inject()(credentialService: CredentialService) extends FoxImplicits with LazyLogging {
class ExploreRemoteLayerService @Inject()(credentialService: CredentialService, dataVaultService: DataVaultService)
extends FoxImplicits
with LazyLogging {

def exploreRemoteDatasource(
urisWithCredentials: List[ExploreRemoteDatasetParameters],
Expand Down Expand Up @@ -172,7 +174,7 @@ class ExploreRemoteLayerService @Inject()(credentialService: CredentialService)
requestingUser._organization)
remoteSource = RemoteSourceDescriptor(uri, credentialOpt)
credentialId <- Fox.runOptional(credentialOpt)(c => credentialService.insertOne(c)) ?~> "dataVault.credential.insert.failed"
remotePath <- DataVaultsHolder.getVaultPath(remoteSource) ?~> "dataVault.setup.failed"
remotePath <- dataVaultService.getVaultPath(remoteSource) ?~> "dataVault.setup.failed"
layersWithVoxelSizes <- exploreRemoteLayersForRemotePath(
remotePath,
credentialId.map(_.toString),
Expand Down
2 changes: 0 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ object Dependencies {
private val jhdf = "cisd" % "jhdf5" % "19.04.0"
private val ucarCdm = "edu.ucar" % "cdm-core" % "5.3.3"
private val jblosc = "org.lasersonlab" % "jblosc" % "1.0.1"
private val sttp = "com.softwaremill.sttp.client3" %% "core" % "3.8.11"
private val guava = "com.google.guava" % "guava" % "18.0"
private val awsS3 = "com.amazonaws" % "aws-java-sdk-s3" % "1.12.288"
private val tika = "org.apache.tika" % "tika-core" % "1.5"
Expand Down Expand Up @@ -107,7 +106,6 @@ object Dependencies {
awsS3,
tika,
jblosc,
sttp,
commonsCompress,
googleCloudStorage,
googleCloudStorageNio,
Expand Down
47 changes: 31 additions & 16 deletions test/backend/DataVaultTestSuite.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package backend

import com.scalableminds.util.tools.Fox
import org.scalatestplus.play.PlaySpec

import java.net.URI
Expand All @@ -12,33 +13,42 @@ import com.scalableminds.webknossos.datastore.datavault.{
VaultPath
}
import com.scalableminds.webknossos.datastore.storage.RemoteSourceDescriptor
import play.api.test.WsTestClient

import scala.collection.immutable.NumericRange
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.{global => globalExecutionContext}

class DataVaultTestSuite extends PlaySpec {

val openFoxJustification = "Opening Fox in Unit Test Context"

"Data vault" when {
"using Range requests" when {
val range: NumericRange[Long] = Range.Long(0, 1024, 1)
val dataKey = "32_32_40/15360-15424_8384-8448_3520-3584" // when accessed via range request, the response body is 1024 bytes long, otherwise 124.8 KB

"with HTTP Vault" should {
"return correct response" in {
val uri = new URI("http://storage.googleapis.com/")
val vaultPath = new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes =
(vaultPath / s"neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey").readBytes(Some(range)).get

assert(bytes.length == range.length)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
WsTestClient.withClient { ws =>
val uri = new URI("http://storage.googleapis.com/")
val vaultPath = new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None), ws))
val bytes =
(vaultPath / s"neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey")
.readBytes(Some(range))(globalExecutionContext)
.get(openFoxJustification)

assert(bytes.length == range.length)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
}
}
}

"with Google Cloud Storage Vault" should {
"return correct response" in {
val uri = new URI("gs://neuroglancer-fafb-data/fafb_v14/fafb_v14_orig")
val vaultPath = new VaultPath(uri, GoogleCloudDataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes = (vaultPath / dataKey).readBytes(Some(range)).get
val bytes = (vaultPath / dataKey).readBytes(Some(range))(globalExecutionContext).get(openFoxJustification)

assert(bytes.length == range.length)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
Expand All @@ -51,20 +61,24 @@ class DataVaultTestSuite extends PlaySpec {

"with HTTP Vault" should {
"return correct response" in {
val uri = new URI("http://storage.googleapis.com/")
val vaultPath = new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes = (vaultPath / s"neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey").readBytes().get

assert(bytes.length == dataLength)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
WsTestClient.withClient { ws =>
val uri = new URI("http://storage.googleapis.com/")
val vaultPath = new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None), ws))
val bytes = (vaultPath / s"neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey")
.readBytes()(globalExecutionContext)
.get(openFoxJustification)

assert(bytes.length == dataLength)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
}
}
}

"with Google Cloud Storage Vault" should {
"return correct response" in {
val uri = new URI("gs://neuroglancer-fafb-data/fafb_v14/fafb_v14_orig")
val vaultPath = new VaultPath(uri, GoogleCloudDataVault.create(RemoteSourceDescriptor(uri, None)))
val bytes = (vaultPath / dataKey).readBytes().get
val bytes = (vaultPath / dataKey).readBytes()(globalExecutionContext).get(openFoxJustification)

assert(bytes.length == dataLength)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
Expand All @@ -74,7 +88,8 @@ class DataVaultTestSuite extends PlaySpec {

"using vault path" when {
class MockDataVault extends DataVault {
override def readBytes(path: VaultPath, range: RangeSpecifier): (Array[Byte], Encoding.Value) = ???
override def readBytesAndEncoding(path: VaultPath, range: RangeSpecifier)(
implicit ec: ExecutionContext): Fox[(Array[Byte], Encoding.Value)] = ???
}

"Uri has no trailing slash" should {
Expand Down
22 changes: 21 additions & 1 deletion util/src/main/scala/com/scalableminds/util/tools/Fox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package com.scalableminds.util.tools
import net.liftweb.common.{Box, Empty, Failure, Full}
import play.api.libs.json.{JsError, JsResult, JsSuccess}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.reflect.ClassTag
import scala.util.{Success, Try}

Expand Down Expand Up @@ -246,9 +247,20 @@ object Fox extends FoxImplicits {
class Fox[+A](val futureBox: Future[Box[A]])(implicit ec: ExecutionContext) {
val self: Fox[A] = this

// Adds an error message in case of Failure; an Empty is wrapped in a Failure
// carrying the message (delegates to Box's `?~!` message-chaining operator).
def ?~>(s: String): Fox[A] =
  new Fox(futureBox.map(_ ?~! s))

// Adds an error message only in case of Failure; an Empty result is passed
// through unchanged (unlike `?~>`, which wraps Empty in a Failure).
// NOTE(review): the branches yield Fox values while `flatMap` is invoked on
// a Future[Box[A]] — this presumably relies on implicit Fox/Future
// conversions (FoxImplicits) being in scope; confirm.
def ?=>(s: String): Fox[A] =
  futureBox.flatMap {
    case f: Failure =>
      // Re-wrap the Failure in a Fox and chain the message via ?~>
      new Fox(Future.successful(f)) ?~> s
    case Full(value) => Fox.successful(value)
    case Empty => Fox.empty
  }

// Adds an http error code in case of Failure; an Empty is wrapped in a
// Failure carrying the code (delegates to Box's `~>` operator).
def ~>[T](errorCode: => T): Fox[A] =
  new Fox(futureBox.map(_ ~> errorCode))

Expand Down Expand Up @@ -311,6 +323,14 @@ class Fox[+A](val futureBox: Future[Box[A]])(implicit ec: ExecutionContext) {
}
}).flatMap(identity)

/*
 Synchronously awaits the wrapped future (up to awaitTimeout) and opens the
 resulting box, throwing an exception annotated with the given justification
 unless it is Full. Test helper only — do not use this in production code!
 */
def get(justification: String, awaitTimeout: FiniteDuration = 10 seconds): A =
  Await.result(futureBox, awaitTimeout).openOrThrowException(justification)

/**
* Helper to force an implicit conversation
*/
Expand Down
15 changes: 5 additions & 10 deletions util/src/main/scala/com/scalableminds/util/tools/JsonHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import play.api.libs.json._
import play.api.libs.json.Reads._
import play.api.libs.json.Writes._

import scala.concurrent.ExecutionContext.Implicits._
import java.nio.charset.StandardCharsets
import scala.concurrent.duration._
import scala.io.{BufferedSource, Source}

object JsonHelper extends BoxImplicits with LazyLogging {

def jsonToFile[A: Writes](path: Path, value: A) =
// Serializes the given value to pretty-printed JSON and writes it to the
// file at `path`. Returns the Box produced by the project-local
// FileIO.printToFile helper (presumably Full(()) on success and a Failure
// on IO error — confirm against FileIO's implementation).
def jsonToFile[A: Writes](path: Path, value: A): Box[Unit] =
  FileIO.printToFile(path.toFile) { printer =>
    printer.print(Json.prettyPrint(Json.toJson(value)))
  }
Expand Down Expand Up @@ -101,6 +101,9 @@ object JsonHelper extends BoxImplicits with LazyLogging {
}
}

// Decodes the byte array as UTF-8 and delegates to the string-based
// overload for parsing and schema validation.
def parseAndValidateJson[T: Reads](bytes: Array[Byte]): Box[T] = {
  val asString = new String(bytes, StandardCharsets.UTF_8)
  parseAndValidateJson[T](asString)
}

// Parses the string as JSON and validates it against the implicit Reads[T].
// A parse exception is captured as a Failure by `tryo`; both parse and
// validation failures are annotated with a generic error message via the
// `~>` Box extension (presumably from the BoxImplicits this object mixes
// in — confirm).
def parseAndValidateJson[T: Reads](s: String): Box[T] =
  tryo(Json.parse(s))
    .flatMap(parsed => validateJsValue[T](parsed)) ~> "Failed to parse or validate json against data schema"
Expand All @@ -113,14 +116,6 @@ object JsonHelper extends BoxImplicits with LazyLogging {
Failure("Validating Json Failed: " + JsError.toJson(errors).toString())
}

def jsResultToFox[T](result: JsResult[T]): Fox[T] =
result match {
case JsSuccess(parsed, _) =>
Fox.successful(parsed)
case errors: JsError =>
Fox.failure("Validating Json Failed: " + JsError.toJson(errors).toString())
}

def jsResultToOpt[T](result: JsResult[T]): Option[T] =
result match {
case JsSuccess(parsed, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.ActorSystem
import com.google.inject.AbstractModule
import com.google.inject.name.Names
import com.scalableminds.webknossos.datastore.services._
import com.scalableminds.webknossos.datastore.storage.DataVaultService

class DataStoreModule extends AbstractModule {

Expand All @@ -16,6 +17,7 @@ class DataStoreModule extends AbstractModule {
bind(classOf[DataSourceRepository]).asEagerSingleton()
bind(classOf[UploadService]).asEagerSingleton()
bind(classOf[DataSourceService]).asEagerSingleton()
bind(classOf[DataVaultService]).asEagerSingleton()
bind(classOf[DSRemoteWebKnossosClient]).asEagerSingleton()
bind(classOf[BinaryDataServiceHolder]).asEagerSingleton()
bind(classOf[MappingService]).asEagerSingleton()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.datavault.{FileSystemVaultPath, VaultPath}
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.{DataCubeCache, DataVaultService}
import com.scalableminds.webknossos.datastore.storage.{DataCubeCache, RemoteSourceDescriptorService}
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.Empty

import scala.concurrent.ExecutionContext

trait BucketProvider extends FoxImplicits with LazyLogging {

def dataVaultServiceOpt: Option[DataVaultService]
def remoteSourceDescriptorServiceOpt: Option[RemoteSourceDescriptorService]

// To be defined in subclass.
def loadFromUnderlying(readInstruction: DataReadInstruction)(implicit ec: ExecutionContext): Fox[DataCubeHandle] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.scalableminds.webknossos.datastore.dataformats
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.webknossos.datastore.datareaders.AxisOrder
import com.scalableminds.webknossos.datastore.models.datasource.ResolutionFormatHelper
import com.scalableminds.webknossos.datastore.storage.{DataVaultsHolder, LegacyDataVaultCredential}
import com.scalableminds.webknossos.datastore.storage.{DataVaultService, LegacyDataVaultCredential}
import play.api.libs.json.{Json, OFormat}

import java.net.URI
Expand All @@ -17,7 +17,7 @@ case class MagLocator(mag: Vec3Int,

lazy val pathWithFallback: String = path.getOrElse(mag.toMagLiteral(allowScalar = true))
lazy val uri: URI = new URI(pathWithFallback)
lazy val isRemote: Boolean = DataVaultsHolder.isSupportedRemoteScheme(uri.getScheme)
lazy val isRemote: Boolean = DataVaultService.isSupportedRemoteScheme(uri.getScheme)
}

object MagLocator extends ResolutionFormatHelper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package com.scalableminds.webknossos.datastore.dataformats.n5

import com.scalableminds.util.cache.AlfuCache
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.tools.{Fox, TextUtils}
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.{BucketProvider, DataCubeHandle, MagLocator}
import com.scalableminds.webknossos.datastore.datareaders.n5.N5Array
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.DataVaultService
import com.scalableminds.webknossos.datastore.storage.RemoteSourceDescriptorService
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.Empty
import net.liftweb.util.Helpers.tryo
import ucar.ma2.{Array => MultiArray}

import scala.concurrent.ExecutionContext
Expand All @@ -31,7 +30,7 @@ class N5CubeHandle(n5Array: N5Array) extends DataCubeHandle with LazyLogging {

class N5BucketProvider(layer: N5Layer,
dataSourceId: DataSourceId,
val dataVaultServiceOpt: Option[DataVaultService],
val remoteSourceDescriptorServiceOpt: Option[RemoteSourceDescriptorService],
sharedChunkContentsCache: Option[AlfuCache[String, MultiArray]])
extends BucketProvider
with LazyLogging {
Expand All @@ -44,15 +43,15 @@ class N5BucketProvider(layer: N5Layer,
n5MagOpt match {
case None => Fox.empty
case Some(n5Mag) =>
dataVaultServiceOpt match {
case Some(dataVaultService: DataVaultService) =>
remoteSourceDescriptorServiceOpt match {
case Some(remoteSourceDescriptorService: RemoteSourceDescriptorService) =>
for {
magPath: VaultPath <- if (n5Mag.isRemote) {
dataVaultService.vaultPathFor(n5Mag)
remoteSourceDescriptorService.vaultPathFor(n5Mag)
} else localPathFrom(readInstruction, n5Mag.pathWithFallback)
chunkContentsCache <- sharedChunkContentsCache
cubeHandle <- tryo(onError = (e: Throwable) => logger.error(TextUtils.stackTraceAsString(e)))(N5Array
.open(magPath, dataSourceId, layer.name, n5Mag.axisOrder, n5Mag.channelIndex, chunkContentsCache))
chunkContentsCache <- sharedChunkContentsCache.toFox
cubeHandle <- N5Array
.open(magPath, dataSourceId, layer.name, n5Mag.axisOrder, n5Mag.channelIndex, chunkContentsCache)
.map(new N5CubeHandle(_))
} yield cubeHandle
case None => Empty
Expand Down
Loading

0 comments on commit bdc2828

Please sign in to comment.