Skip to content

Commit

Permalink
feat: expose Ls via grpc to enable demo building
Browse files Browse the repository at this point in the history
  • Loading branch information
lucapette committed Jan 25, 2025
1 parent 8aafa8f commit dfdd153
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 39 deletions.
11 changes: 11 additions & 0 deletions protos/src/main/proto/filesystem.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ option go_package = "github.com/typestreamio/typestream/cli/pkg/filesystem_servi
service FileSystemService {
rpc Mount(MountRequest) returns (MountResponse) {}
rpc Unmount(UnmountRequest) returns (UnmountResponse) {}
rpc Ls(LsRequest) returns (LsResponse) {}
}

message MountRequest {
Expand All @@ -31,4 +32,14 @@ message UnmountResponse {
string error = 3;
}

message LsRequest {
string user_id = 1;
string path = 2;
}

message LsResponse {
repeated string files = 1;
string error = 2;
}


15 changes: 8 additions & 7 deletions server/src/main/kotlin/io/typestream/filesystem/FileSystem.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.typestream.compiler.ast.Grep
import io.typestream.compiler.ast.Join
import io.typestream.compiler.ast.Pipeline
import io.typestream.compiler.ast.Wc
import io.typestream.compiler.types.DataStream
import io.typestream.compiler.types.Encoding
import io.typestream.config.Config
import io.typestream.config.MountsConfig
Expand All @@ -38,16 +39,16 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
private val jobs = mutableListOf<Job>()

companion object {
const val KAFKA_CLUSTERS_PREFIX = "/dev/kafka"
const val KAFKA_CLUSTERS_PREFIX: String = "/dev/kafka"
}

init {
config.sources.kafka.forEach { (name, config) ->
logger.info { "starting filesystem for kafka cluster: $name" }
kafkaDir.add(KafkaClusterDirectory(name, config, dispatcher))
}
config.mounts.random.values.forEach { config ->
randomDir.add(Random(config.endpoint.substringAfterLast("/"), config.valueType))
config.mounts.random.values.forEach { (valueType, endpoint) ->
randomDir.add(Random(endpoint.substringAfterLast("/"), valueType))
}
devDir.add(kafkaDir)
mntDir.add(randomDir)
Expand All @@ -60,7 +61,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
return children.map { it.name }.sorted()
}

suspend fun watch() = supervisorScope {
suspend fun watch(): Boolean = supervisorScope {
jobs.add(launch(dispatcher) { root.watch() })
jobs.add(launch(dispatcher) { catalog.watch() })
}
Expand Down Expand Up @@ -95,7 +96,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
}
}

fun stat(path: String) = root.findInode(path)?.stat() ?: error("cannot find $path")
fun stat(path: String): String = root.findInode(path)?.stat() ?: error("cannot find $path")

fun file(path: String, pwd: String): String {
val targetPath = if (path.startsWith("/")) path else "$pwd/$path"
Expand All @@ -118,7 +119,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
return targetNode is Directory
}

fun findDataStream(path: String) = catalog[path]?.dataStream
fun findDataStream(path: String): DataStream? = catalog[path]?.dataStream

private fun findEncoding(path: String) = catalog[path]?.encoding

Expand Down Expand Up @@ -158,7 +159,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
jobs.forEach(Job::cancel)
}

fun completePath(incompletePath: String, pwd: String) = buildList {
fun completePath(incompletePath: String, pwd: String): List<String> = buildList {
val isAbsolute = incompletePath.startsWith("/")
val targetPath = if (incompletePath.startsWith("/")) {
incompletePath
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/kotlin/io/typestream/server/FileSystemService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@ package io.typestream.server

import io.typestream.compiler.vm.Vm
import io.typestream.grpc.filesystem_service.FileSystemServiceGrpcKt
import io.typestream.grpc.filesystem_service.Filesystem
import io.typestream.grpc.filesystem_service.Filesystem.MountRequest
import io.typestream.grpc.filesystem_service.Filesystem.UnmountRequest
import io.typestream.grpc.filesystem_service.lsResponse
import io.typestream.grpc.filesystem_service.mountResponse
import io.typestream.grpc.filesystem_service.unmountResponse

class FileSystemService(private val vm: Vm) :
FileSystemServiceGrpcKt.FileSystemServiceCoroutineImplBase() {

override suspend fun mount(request: MountRequest) = mountResponse {
override suspend fun mount(request: MountRequest): Filesystem.MountResponse = mountResponse {
vm.fileSystem.mount(request.config)
success = true
}

override suspend fun unmount(request: UnmountRequest) = unmountResponse {
override suspend fun unmount(request: UnmountRequest): Filesystem.UnmountResponse = unmountResponse {
vm.fileSystem.unmount(request.endpoint)

success = true
}

override suspend fun ls(request: Filesystem.LsRequest): Filesystem.LsResponse = lsResponse {
vm.fileSystem.ls(request.path).forEach { files += it }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,32 @@ class InteractiveSessionService(private val config: Config, private val vm: Vm)

private val sessions = Collections.synchronizedMap(mutableMapOf<String, Session>())

override suspend fun startSession(request: InteractiveSession.StartSessionRequest) = startSessionResponse {
val sessionId = UUID.randomUUID().toString()
this@InteractiveSessionService.sessions[sessionId] = Session(vm.fileSystem, vm.scheduler, Env(config))
this.sessionId = sessionId
}
override suspend fun startSession(request: InteractiveSession.StartSessionRequest): InteractiveSession.StartSessionResponse =
startSessionResponse {
val sessionId = UUID.randomUUID().toString()
this@InteractiveSessionService.sessions[sessionId] = Session(vm.fileSystem, vm.scheduler, Env(config))
this.sessionId = sessionId
}

override suspend fun runProgram(request: RunProgramRequest) = runProgramResponse {
val session = this@InteractiveSessionService.sessions[request.sessionId]
requireNotNull(session) { "session ${request.sessionId} not found" }
override suspend fun runProgram(request: RunProgramRequest): InteractiveSession.RunProgramResponse =
runProgramResponse {
val session = this@InteractiveSessionService.sessions[request.sessionId]
requireNotNull(session) { "session ${request.sessionId} not found" }

val vmResult = vm.run(request.source, session)
if (vmResult.programOutput.stdErr.isBlank()) {
session.env.addHistoryEntry(request.source)
}
val vmResult = vm.run(request.source, session)
if (vmResult.programOutput.stdErr.isBlank()) {
session.env.addHistoryEntry(request.source)
}

id = vmResult.program.id
hasMoreOutput = vmResult.program.hasMoreOutput()
this.env["PWD"] = session.env.pwd
id = vmResult.program.id
hasMoreOutput = vmResult.program.hasMoreOutput()
this.env["PWD"] = session.env.pwd

this.stdOut = vmResult.programOutput.stdOut
this.stdErr = vmResult.programOutput.stdErr
}
this.stdOut = vmResult.programOutput.stdOut
this.stdErr = vmResult.programOutput.stdErr
}

override suspend fun completeProgram(request: CompleteProgramRequest) = completeProgramResponse {
override suspend fun completeProgram(request: CompleteProgramRequest): InteractiveSession.CompleteProgramResponse = completeProgramResponse {
val session = this@InteractiveSessionService.sessions[request.sessionId]
requireNotNull(session) { "session ${request.sessionId} not found" }

Expand All @@ -68,11 +70,11 @@ class InteractiveSessionService(private val config: Config, private val vm: Vm)
return vm.scheduler.jobOutput(request.id).map { getProgramOutputResponse { stdOut = it } }
}

override suspend fun stopSession(request: StopSessionRequest) = stopSessionResponse {
override suspend fun stopSession(request: StopSessionRequest): InteractiveSession.StopSessionResponse = stopSessionResponse {
val output = StringBuilder()
this@InteractiveSessionService.sessions[request.sessionId]?.runningPrograms?.forEach { program ->
vm.scheduler.kill(program.id)
output.appendLine("killed ${program.id}")
this@InteractiveSessionService.sessions[request.sessionId]?.runningPrograms?.forEach { (id, _) ->
vm.scheduler.kill(id)
output.appendLine("killed $id")
}
this@InteractiveSessionService.sessions.remove(request.sessionId)
this.stdOut = output.toString()
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/kotlin/io/typestream/server/JobService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.typestream.compiler.vm.Env
import io.typestream.compiler.vm.Session
import io.typestream.compiler.vm.Vm
import io.typestream.config.Config
import io.typestream.grpc.job_service.Job
import io.typestream.grpc.job_service.Job.CreateJobRequest
import io.typestream.grpc.job_service.JobServiceGrpcKt
import io.typestream.grpc.job_service.createJobResponse
Expand All @@ -14,7 +15,7 @@ import java.util.UUID
class JobService(private val config: Config, private val vm: Vm) :
JobServiceGrpcKt.JobServiceCoroutineImplBase() {

override suspend fun createJob(request: CreateJobRequest) = createJobResponse {
override suspend fun createJob(request: CreateJobRequest): Job.CreateJobResponse = createJobResponse {
//TODO we may want to generate uuids from the source code fingerprint
// as it is, we'll have running apps on runtime (e.g. kafka streams) with a different id than the
// job id that we create here
Expand Down
64 changes: 64 additions & 0 deletions server/src/test/kotlin/io/typestream/grpc/FileSystemServiceTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.typestream.grpc

import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.testing.GrpcCleanupRule
import io.typestream.Server
import io.typestream.config.testing.testConfig
import io.typestream.grpc.filesystem_service.FileSystemServiceGrpc
import io.typestream.grpc.filesystem_service.lsRequest
import io.typestream.testing.TestKafka
import io.typestream.testing.until
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.Rule
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers

@Testcontainers
internal class FileSystemServiceTest {
private val dispatcher = Dispatchers.IO

private lateinit var app: Server

@get:Rule
val grpcCleanupRule: GrpcCleanupRule = GrpcCleanupRule()

@Container
private val testKafka = TestKafka()

@BeforeEach
fun beforeEach() {
app = Server(testConfig(testKafka), dispatcher)
}

@Test
fun `returns filesystem paths`(): Unit = runBlocking {
app.use {
val serverName = InProcessServerBuilder.generateName()
launch(dispatcher) {
app.run(InProcessServerBuilder.forName(serverName).directExecutor())
}

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server ?: return@use)

val stub = FileSystemServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
)

val path = lsRequest { path = "/" }

assertThat(stub.ls(path))
.extracting("filesList")
.isEqualTo(listOf("dev", "mnt"))

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down Expand Up @@ -117,7 +117,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand All @@ -144,7 +144,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down Expand Up @@ -186,7 +186,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down Expand Up @@ -228,7 +228,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down Expand Up @@ -271,7 +271,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down

0 comments on commit dfdd153

Please sign in to comment.