Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose Ls via grpc to enable demo building #108

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions protos/src/main/proto/filesystem.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ option go_package = "github.com/typestreamio/typestream/cli/pkg/filesystem_servi
service FileSystemService {
rpc Mount(MountRequest) returns (MountResponse) {}
rpc Unmount(UnmountRequest) returns (UnmountResponse) {}
rpc Ls(LsRequest) returns (LsResponse) {}
}

message MountRequest {
Expand All @@ -31,4 +32,14 @@ message UnmountResponse {
string error = 3;
}

message LsRequest {
string user_id = 1;
string path = 2;
}

message LsResponse {
repeated string files = 1;
string error = 2;
}


15 changes: 8 additions & 7 deletions server/src/main/kotlin/io/typestream/filesystem/FileSystem.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.typestream.compiler.ast.Grep
import io.typestream.compiler.ast.Join
import io.typestream.compiler.ast.Pipeline
import io.typestream.compiler.ast.Wc
import io.typestream.compiler.types.DataStream
import io.typestream.compiler.types.Encoding
import io.typestream.config.Config
import io.typestream.config.MountsConfig
Expand All @@ -38,16 +39,16 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
private val jobs = mutableListOf<Job>()

companion object {
const val KAFKA_CLUSTERS_PREFIX = "/dev/kafka"
const val KAFKA_CLUSTERS_PREFIX: String = "/dev/kafka"
}

init {
config.sources.kafka.forEach { (name, config) ->
logger.info { "starting filesystem for kafka cluster: $name" }
kafkaDir.add(KafkaClusterDirectory(name, config, dispatcher))
}
config.mounts.random.values.forEach { config ->
randomDir.add(Random(config.endpoint.substringAfterLast("/"), config.valueType))
config.mounts.random.values.forEach { (valueType, endpoint) ->
randomDir.add(Random(endpoint.substringAfterLast("/"), valueType))
}
devDir.add(kafkaDir)
mntDir.add(randomDir)
Expand All @@ -60,7 +61,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
return children.map { it.name }.sorted()
}

suspend fun watch() = supervisorScope {
suspend fun watch(): Boolean = supervisorScope {
jobs.add(launch(dispatcher) { root.watch() })
jobs.add(launch(dispatcher) { catalog.watch() })
}
Expand Down Expand Up @@ -95,7 +96,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
}
}

fun stat(path: String) = root.findInode(path)?.stat() ?: error("cannot find $path")
fun stat(path: String): String = root.findInode(path)?.stat() ?: error("cannot find $path")

fun file(path: String, pwd: String): String {
val targetPath = if (path.startsWith("/")) path else "$pwd/$path"
Expand All @@ -118,7 +119,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
return targetNode is Directory
}

fun findDataStream(path: String) = catalog[path]?.dataStream
fun findDataStream(path: String): DataStream? = catalog[path]?.dataStream

private fun findEncoding(path: String) = catalog[path]?.encoding

Expand Down Expand Up @@ -158,7 +159,7 @@ class FileSystem(val config: Config, private val dispatcher: CoroutineDispatcher
jobs.forEach(Job::cancel)
}

fun completePath(incompletePath: String, pwd: String) = buildList {
fun completePath(incompletePath: String, pwd: String): List<String> = buildList {
val isAbsolute = incompletePath.startsWith("/")
val targetPath = if (incompletePath.startsWith("/")) {
incompletePath
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/kotlin/io/typestream/server/FileSystemService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@ package io.typestream.server

import io.typestream.compiler.vm.Vm
import io.typestream.grpc.filesystem_service.FileSystemServiceGrpcKt
import io.typestream.grpc.filesystem_service.Filesystem
import io.typestream.grpc.filesystem_service.Filesystem.MountRequest
import io.typestream.grpc.filesystem_service.Filesystem.UnmountRequest
import io.typestream.grpc.filesystem_service.lsResponse
import io.typestream.grpc.filesystem_service.mountResponse
import io.typestream.grpc.filesystem_service.unmountResponse

class FileSystemService(private val vm: Vm) :
FileSystemServiceGrpcKt.FileSystemServiceCoroutineImplBase() {

override suspend fun mount(request: MountRequest) = mountResponse {
override suspend fun mount(request: MountRequest): Filesystem.MountResponse = mountResponse {
vm.fileSystem.mount(request.config)
success = true
}

override suspend fun unmount(request: UnmountRequest) = unmountResponse {
override suspend fun unmount(request: UnmountRequest): Filesystem.UnmountResponse = unmountResponse {
vm.fileSystem.unmount(request.endpoint)

success = true
}

override suspend fun ls(request: Filesystem.LsRequest): Filesystem.LsResponse = lsResponse {
vm.fileSystem.ls(request.path).forEach { files += it }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,32 @@ class InteractiveSessionService(private val config: Config, private val vm: Vm)

private val sessions = Collections.synchronizedMap(mutableMapOf<String, Session>())

override suspend fun startSession(request: InteractiveSession.StartSessionRequest) = startSessionResponse {
val sessionId = UUID.randomUUID().toString()
[email protected][sessionId] = Session(vm.fileSystem, vm.scheduler, Env(config))
this.sessionId = sessionId
}
override suspend fun startSession(request: InteractiveSession.StartSessionRequest): InteractiveSession.StartSessionResponse =
startSessionResponse {
val sessionId = UUID.randomUUID().toString()
[email protected][sessionId] = Session(vm.fileSystem, vm.scheduler, Env(config))
this.sessionId = sessionId
}

override suspend fun runProgram(request: RunProgramRequest) = runProgramResponse {
val session = [email protected][request.sessionId]
requireNotNull(session) { "session ${request.sessionId} not found" }
override suspend fun runProgram(request: RunProgramRequest): InteractiveSession.RunProgramResponse =
runProgramResponse {
val session = [email protected][request.sessionId]
requireNotNull(session) { "session ${request.sessionId} not found" }

val vmResult = vm.run(request.source, session)
if (vmResult.programOutput.stdErr.isBlank()) {
session.env.addHistoryEntry(request.source)
}
val vmResult = vm.run(request.source, session)
if (vmResult.programOutput.stdErr.isBlank()) {
session.env.addHistoryEntry(request.source)
}

id = vmResult.program.id
hasMoreOutput = vmResult.program.hasMoreOutput()
this.env["PWD"] = session.env.pwd
id = vmResult.program.id
hasMoreOutput = vmResult.program.hasMoreOutput()
this.env["PWD"] = session.env.pwd

this.stdOut = vmResult.programOutput.stdOut
this.stdErr = vmResult.programOutput.stdErr
}
this.stdOut = vmResult.programOutput.stdOut
this.stdErr = vmResult.programOutput.stdErr
}

override suspend fun completeProgram(request: CompleteProgramRequest) = completeProgramResponse {
override suspend fun completeProgram(request: CompleteProgramRequest): InteractiveSession.CompleteProgramResponse = completeProgramResponse {
val session = [email protected][request.sessionId]
requireNotNull(session) { "session ${request.sessionId} not found" }

Expand All @@ -68,11 +70,11 @@ class InteractiveSessionService(private val config: Config, private val vm: Vm)
return vm.scheduler.jobOutput(request.id).map { getProgramOutputResponse { stdOut = it } }
}

override suspend fun stopSession(request: StopSessionRequest) = stopSessionResponse {
override suspend fun stopSession(request: StopSessionRequest): InteractiveSession.StopSessionResponse = stopSessionResponse {
val output = StringBuilder()
[email protected][request.sessionId]?.runningPrograms?.forEach { program ->
vm.scheduler.kill(program.id)
output.appendLine("killed ${program.id}")
[email protected][request.sessionId]?.runningPrograms?.forEach { (id, _) ->
vm.scheduler.kill(id)
output.appendLine("killed $id")
}
[email protected](request.sessionId)
this.stdOut = output.toString()
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/kotlin/io/typestream/server/JobService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.typestream.compiler.vm.Env
import io.typestream.compiler.vm.Session
import io.typestream.compiler.vm.Vm
import io.typestream.config.Config
import io.typestream.grpc.job_service.Job
import io.typestream.grpc.job_service.Job.CreateJobRequest
import io.typestream.grpc.job_service.JobServiceGrpcKt
import io.typestream.grpc.job_service.createJobResponse
Expand All @@ -14,7 +15,7 @@ import java.util.UUID
class JobService(private val config: Config, private val vm: Vm) :
JobServiceGrpcKt.JobServiceCoroutineImplBase() {

override suspend fun createJob(request: CreateJobRequest) = createJobResponse {
override suspend fun createJob(request: CreateJobRequest): Job.CreateJobResponse = createJobResponse {
//TODO we may want to generate uuids from the source code fingerprint
// as it is, we'll have running apps on runtime (e.g. kafka streams) with a different id than the
// job id that we create here
Expand Down
64 changes: 64 additions & 0 deletions server/src/test/kotlin/io/typestream/grpc/FileSystemServiceTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.typestream.grpc

import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.testing.GrpcCleanupRule
import io.typestream.Server
import io.typestream.config.testing.testConfig
import io.typestream.grpc.filesystem_service.FileSystemServiceGrpc
import io.typestream.grpc.filesystem_service.lsRequest
import io.typestream.testing.TestKafka
import io.typestream.testing.until
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.Rule
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers

@Testcontainers
internal class FileSystemServiceTest {
private val dispatcher = Dispatchers.IO

private lateinit var app: Server

@get:Rule
val grpcCleanupRule: GrpcCleanupRule = GrpcCleanupRule()

@Container
private val testKafka = TestKafka()

@BeforeEach
fun beforeEach() {
app = Server(testConfig(testKafka), dispatcher)
}

@Test
fun `returns filesystem paths`(): Unit = runBlocking {
app.use {
val serverName = InProcessServerBuilder.generateName()
launch(dispatcher) {
app.run(InProcessServerBuilder.forName(serverName).directExecutor())
}

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server ?: return@use)

val stub = FileSystemServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
)

val path = lsRequest { path = "/" }

assertThat(stub.ls(path))
.extracting("filesList")
.isEqualTo(listOf("dev", "mnt"))

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down Expand Up @@ -117,7 +117,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand All @@ -144,7 +144,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down Expand Up @@ -186,7 +186,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down Expand Up @@ -228,7 +228,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down Expand Up @@ -271,7 +271,7 @@ internal class InteractiveSessionServiceTest {

until { requireNotNull(app.server) }

grpcCleanupRule.register(app.server!!)
grpcCleanupRule.register(app.server ?: return@use)

val stub = InteractiveSessionServiceGrpc.newBlockingStub(
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
Expand Down
Loading