diff --git a/protos/src/main/proto/filesystem.proto b/protos/src/main/proto/filesystem.proto index c504068..e688f1d 100644 --- a/protos/src/main/proto/filesystem.proto +++ b/protos/src/main/proto/filesystem.proto @@ -9,6 +9,7 @@ option go_package = "github.com/typestreamio/typestream/cli/pkg/filesystem_servi service FileSystemService { rpc Mount(MountRequest) returns (MountResponse) {} rpc Unmount(UnmountRequest) returns (UnmountResponse) {} + rpc Ls(LsRequest) returns (LsResponse) {} } message MountRequest { @@ -31,4 +32,14 @@ message UnmountResponse { string error = 3; } +message LsRequest { + string user_id = 1; + string path = 2; +} + +message LsResponse { + repeated string files = 1; + string error = 2; +} + diff --git a/server/src/main/kotlin/io/typestream/filesystem/FileSystem.kt b/server/src/main/kotlin/io/typestream/filesystem/FileSystem.kt index ac0a196..a6ac276 100644 --- a/server/src/main/kotlin/io/typestream/filesystem/FileSystem.kt +++ b/server/src/main/kotlin/io/typestream/filesystem/FileSystem.kt @@ -12,6 +12,7 @@ import io.typestream.compiler.ast.Grep import io.typestream.compiler.ast.Join import io.typestream.compiler.ast.Pipeline import io.typestream.compiler.ast.Wc +import io.typestream.compiler.types.DataStream import io.typestream.compiler.types.Encoding import io.typestream.config.Config import io.typestream.config.MountsConfig @@ -38,7 +39,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher private val jobs = mutableListOf() companion object { - const val KAFKA_CLUSTERS_PREFIX = "/dev/kafka" + const val KAFKA_CLUSTERS_PREFIX: String = "/dev/kafka" } init { @@ -46,8 +47,8 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher logger.info { "starting filesystem for kafka cluster: $name" } kafkaDir.add(KafkaClusterDirectory(name, config, dispatcher)) } - config.mounts.random.values.forEach { config -> - randomDir.add(Random(config.endpoint.substringAfterLast("/"), config.valueType)) + config.mounts.random.values.forEach { (valueType, endpoint) -> + randomDir.add(Random(endpoint.substringAfterLast("/"), valueType)) } devDir.add(kafkaDir) mntDir.add(randomDir) @@ -60,7 +61,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher return children.map { it.name }.sorted() } - suspend fun watch() = supervisorScope { + suspend fun watch(): Boolean = supervisorScope { jobs.add(launch(dispatcher) { root.watch() }) jobs.add(launch(dispatcher) { catalog.watch() }) } @@ -95,7 +96,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher } } - fun stat(path: String) = root.findInode(path)?.stat() ?: error("cannot find $path") + fun stat(path: String): String = root.findInode(path)?.stat() ?: error("cannot find $path") fun file(path: String, pwd: String): String { val targetPath = if (path.startsWith("/")) path else "$pwd/$path" @@ -118,7 +119,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher return targetNode is Directory } - fun findDataStream(path: String) = catalog[path]?.dataStream + fun findDataStream(path: String): DataStream? = catalog[path]?.dataStream private fun findEncoding(path: String) = catalog[path]?.encoding @@ -158,7 +159,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher jobs.forEach(Job::cancel) } - fun completePath(incompletePath: String, pwd: String) = buildList { + fun completePath(incompletePath: String, pwd: String): List = buildList { val isAbsolute = incompletePath.startsWith("/") val targetPath = if (incompletePath.startsWith("/")) { incompletePath diff --git a/server/src/main/kotlin/io/typestream/server/FileSystemService.kt b/server/src/main/kotlin/io/typestream/server/FileSystemService.kt index 6c04ca0..b0a1f86 100644 --- a/server/src/main/kotlin/io/typestream/server/FileSystemService.kt +++ b/server/src/main/kotlin/io/typestream/server/FileSystemService.kt @@ -2,22 +2,28 @@ package io.typestream.server import io.typestream.compiler.vm.Vm import io.typestream.grpc.filesystem_service.FileSystemServiceGrpcKt +import io.typestream.grpc.filesystem_service.Filesystem import io.typestream.grpc.filesystem_service.Filesystem.MountRequest import io.typestream.grpc.filesystem_service.Filesystem.UnmountRequest +import io.typestream.grpc.filesystem_service.lsResponse import io.typestream.grpc.filesystem_service.mountResponse import io.typestream.grpc.filesystem_service.unmountResponse class FileSystemService(private val vm: Vm) : FileSystemServiceGrpcKt.FileSystemServiceCoroutineImplBase() { - override suspend fun mount(request: MountRequest) = mountResponse { + override suspend fun mount(request: MountRequest): Filesystem.MountResponse = mountResponse { vm.fileSystem.mount(request.config) success = true } - override suspend fun unmount(request: UnmountRequest) = unmountResponse { + override suspend fun unmount(request: UnmountRequest): Filesystem.UnmountResponse = unmountResponse { vm.fileSystem.unmount(request.endpoint) success = true } + + override suspend fun ls(request: Filesystem.LsRequest): Filesystem.LsResponse = lsResponse { + vm.fileSystem.ls(request.path).forEach { files += it } + } } diff --git a/server/src/main/kotlin/io/typestream/server/InteractiveSessionService.kt b/server/src/main/kotlin/io/typestream/server/InteractiveSessionService.kt index 496332e..6330611 100644 --- a/server/src/main/kotlin/io/typestream/server/InteractiveSessionService.kt +++ b/server/src/main/kotlin/io/typestream/server/InteractiveSessionService.kt @@ -28,30 +28,32 @@ class InteractiveSessionService(private val config: Config, private val vm: Vm) private val sessions = Collections.synchronizedMap(mutableMapOf()) - override suspend fun startSession(request: InteractiveSession.StartSessionRequest) = startSessionResponse { - val sessionId = UUID.randomUUID().toString() - this@InteractiveSessionService.sessions[sessionId] = Session(vm.fileSystem, vm.scheduler, Env(config)) - this.sessionId = sessionId - } + override suspend fun startSession(request: InteractiveSession.StartSessionRequest): InteractiveSession.StartSessionResponse = + startSessionResponse { + val sessionId = UUID.randomUUID().toString() + this@InteractiveSessionService.sessions[sessionId] = Session(vm.fileSystem, vm.scheduler, Env(config)) + this.sessionId = sessionId + } - override suspend fun runProgram(request: RunProgramRequest) = runProgramResponse { - val session = this@InteractiveSessionService.sessions[request.sessionId] - requireNotNull(session) { "session ${request.sessionId} not found" } + override suspend fun runProgram(request: RunProgramRequest): InteractiveSession.RunProgramResponse = + runProgramResponse { + val session = this@InteractiveSessionService.sessions[request.sessionId] + requireNotNull(session) { "session ${request.sessionId} not found" } - val vmResult = vm.run(request.source, session) - if (vmResult.programOutput.stdErr.isBlank()) { - session.env.addHistoryEntry(request.source) - } + val vmResult = vm.run(request.source, session) + if (vmResult.programOutput.stdErr.isBlank()) { + session.env.addHistoryEntry(request.source) + } - id = vmResult.program.id - hasMoreOutput = vmResult.program.hasMoreOutput() - this.env["PWD"] = session.env.pwd + id = vmResult.program.id + hasMoreOutput = vmResult.program.hasMoreOutput() + this.env["PWD"] = session.env.pwd - this.stdOut = vmResult.programOutput.stdOut - this.stdErr = vmResult.programOutput.stdErr - } + this.stdOut = vmResult.programOutput.stdOut + this.stdErr = vmResult.programOutput.stdErr + } - override suspend fun completeProgram(request: CompleteProgramRequest) = completeProgramResponse { + override suspend fun completeProgram(request: CompleteProgramRequest): InteractiveSession.CompleteProgramResponse = completeProgramResponse { val session = this@InteractiveSessionService.sessions[request.sessionId] requireNotNull(session) { "session ${request.sessionId} not found" } @@ -68,11 +70,11 @@ class InteractiveSessionService(private val config: Config, private val vm: Vm) return vm.scheduler.jobOutput(request.id).map { getProgramOutputResponse { stdOut = it } } } - override suspend fun stopSession(request: StopSessionRequest) = stopSessionResponse { + override suspend fun stopSession(request: StopSessionRequest): InteractiveSession.StopSessionResponse = stopSessionResponse { val output = StringBuilder() - this@InteractiveSessionService.sessions[request.sessionId]?.runningPrograms?.forEach { program -> - vm.scheduler.kill(program.id) - output.appendLine("killed ${program.id}") + this@InteractiveSessionService.sessions[request.sessionId]?.runningPrograms?.forEach { (id, _) -> + vm.scheduler.kill(id) + output.appendLine("killed $id") } this@InteractiveSessionService.sessions.remove(request.sessionId) this.stdOut = output.toString() diff --git a/server/src/main/kotlin/io/typestream/server/JobService.kt b/server/src/main/kotlin/io/typestream/server/JobService.kt index 4289b17..76458de 100644 --- a/server/src/main/kotlin/io/typestream/server/JobService.kt +++ b/server/src/main/kotlin/io/typestream/server/JobService.kt @@ -5,6 +5,7 @@ import io.typestream.compiler.vm.Env import io.typestream.compiler.vm.Session import io.typestream.compiler.vm.Vm import io.typestream.config.Config +import io.typestream.grpc.job_service.Job import io.typestream.grpc.job_service.Job.CreateJobRequest import io.typestream.grpc.job_service.JobServiceGrpcKt import io.typestream.grpc.job_service.createJobResponse @@ -14,7 +15,7 @@ import java.util.UUID class JobService(private val config: Config, private val vm: Vm) : JobServiceGrpcKt.JobServiceCoroutineImplBase() { - override suspend fun createJob(request: CreateJobRequest) = createJobResponse { + override suspend fun createJob(request: CreateJobRequest): Job.CreateJobResponse = createJobResponse { //TODO we may want to generate uuids from the source code fingerprint // as it is, we'll have running apps on runtime (e.g. kafka streams) with a different id than the // job id that we create here diff --git a/server/src/test/kotlin/io/typestream/grpc/FileSystemServiceTest.kt b/server/src/test/kotlin/io/typestream/grpc/FileSystemServiceTest.kt new file mode 100644 index 0000000..bacb28c --- /dev/null +++ b/server/src/test/kotlin/io/typestream/grpc/FileSystemServiceTest.kt @@ -0,0 +1,64 @@ +package io.typestream.grpc + +import io.grpc.inprocess.InProcessChannelBuilder +import io.grpc.inprocess.InProcessServerBuilder +import io.grpc.testing.GrpcCleanupRule +import io.typestream.Server +import io.typestream.config.testing.testConfig +import io.typestream.grpc.filesystem_service.FileSystemServiceGrpc +import io.typestream.grpc.filesystem_service.lsRequest +import io.typestream.testing.TestKafka +import io.typestream.testing.until +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.Rule +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.junit.jupiter.Testcontainers + +@Testcontainers +internal class FileSystemServiceTest { + private val dispatcher = Dispatchers.IO + + private lateinit var app: Server + + @get:Rule + val grpcCleanupRule: GrpcCleanupRule = GrpcCleanupRule() + + @Container + private val testKafka = TestKafka() + + @BeforeEach + fun beforeEach() { + app = Server(testConfig(testKafka), dispatcher) + } + + @Test + fun `returns filesystem paths`(): Unit = runBlocking { + app.use { + val serverName = InProcessServerBuilder.generateName() + launch(dispatcher) { + app.run(InProcessServerBuilder.forName(serverName).directExecutor()) + } + + until { requireNotNull(app.server) } + + grpcCleanupRule.register(app.server ?: return@use) + + val stub = FileSystemServiceGrpc.newBlockingStub( + grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()) + ) + + val path = lsRequest { path = "/" } + + assertThat(stub.ls(path)) + .extracting("filesList") + .isEqualTo(listOf("dev", "mnt")) + + } + } + +} diff --git a/server/src/test/kotlin/io/typestream/grpc/InteractiveSessionServiceTest.kt b/server/src/test/kotlin/io/typestream/grpc/InteractiveSessionServiceTest.kt index 42805c0..aede37b 100644 --- a/server/src/test/kotlin/io/typestream/grpc/InteractiveSessionServiceTest.kt +++ b/server/src/test/kotlin/io/typestream/grpc/InteractiveSessionServiceTest.kt @@ -56,7 +56,7 @@ internal class InteractiveSessionServiceTest { until { requireNotNull(app.server) } - grpcCleanupRule.register(app.server!!) + grpcCleanupRule.register(app.server ?: return@use) val stub = InteractiveSessionServiceGrpc.newBlockingStub( grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()) @@ -117,7 +117,7 @@ internal class InteractiveSessionServiceTest { until { requireNotNull(app.server) } - grpcCleanupRule.register(app.server!!) + grpcCleanupRule.register(app.server ?: return@use) val stub = InteractiveSessionServiceGrpc.newBlockingStub( grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()) @@ -144,7 +144,7 @@ internal class InteractiveSessionServiceTest { until { requireNotNull(app.server) } - grpcCleanupRule.register(app.server!!) + grpcCleanupRule.register(app.server ?: return@use) val stub = InteractiveSessionServiceGrpc.newBlockingStub( grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()) @@ -186,7 +186,7 @@ internal class InteractiveSessionServiceTest { until { requireNotNull(app.server) } - grpcCleanupRule.register(app.server!!) + grpcCleanupRule.register(app.server ?: return@use) val stub = InteractiveSessionServiceGrpc.newBlockingStub( grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()) @@ -228,7 +228,7 @@ internal class InteractiveSessionServiceTest { until { requireNotNull(app.server) } - grpcCleanupRule.register(app.server!!) + grpcCleanupRule.register(app.server ?: return@use) val stub = InteractiveSessionServiceGrpc.newBlockingStub( grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()) @@ -271,7 +271,7 @@ internal class InteractiveSessionServiceTest { until { requireNotNull(app.server) } - grpcCleanupRule.register(app.server!!) + grpcCleanupRule.register(app.server ?: return@use) val stub = InteractiveSessionServiceGrpc.newBlockingStub( grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())