Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
samwillis committed Oct 7, 2024
1 parent 3d637d1 commit 9fbeae8
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 107 deletions.
1 change: 1 addition & 0 deletions cibuild/linkimports.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ _getenv
_interactive_one
_interactive_read
_interactive_write
_use_socketfile
_lowerstr
_main
_main_repl
Expand Down
213 changes: 110 additions & 103 deletions packages/pglite/src/pglite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,109 +299,8 @@ export class PGlite
}
mod.FS.registerDevice(devId, devOpt)
mod.FS.mkdev('/dev/blob', devId)
},
(mod: any) => {
console.log('registering SOCKET_FILE.IN device')
// Register SOCKET_FILE.IN device
const devId = mod.FS.makedev(63, 0)
const devOpt = {
open: (_stream: any) => {},
close: (_stream: any) => {},
read: (
_stream: any,
buffer: Uint8Array,
offset: number,
length: number,
position: number,
) => {
const buf = this.#queryInBuffer
if (!buf) {
throw new Error(`No ${SOCKET_FILE.IN} Buffer provided to read from`)
}
const contents = new Uint8Array(buf)
if (position >= contents.length) return 0
const size = Math.min(contents.length - position, length)
for (let i = 0; i < size; i++) {
buffer[offset + i] = contents[position + i]
}
return size
},
write: (
_stream: any,
_buffer: Uint8Array,
_offset: number,
_length: number,
_position: number,
) => {
throw new Error('Not implemented')
},
llseek: (stream: any, offset: number, whence: number) => {
const buf = this.#queryInBuffer
if (!buf) {
throw new Error(`No ${SOCKET_FILE.IN} Buffer provided to llseek`)
}
let position = offset
if (whence === 1) {
position += stream.position
} else if (whence === 2) {
position = new Uint8Array(buf).length
}
if (position < 0) {
throw new mod.FS.ErrnoError(28)
}
return position
},
}
console.log('registering SOCKET_FILE.IN device 2')
mod.FS.registerDevice(devId, devOpt)
console.log('registering SOCKET_FILE.IN device 3')
mod.FS.mkdir('/tmp/pglite/base/')
mod.FS.mkdev(SOCKET_FILE.IN, devId)
console.log('registering SOCKET_FILE.IN device 4')
// mod.FS.mkdev(SOCKET_FILE.OLOCK, devId)
console.log('registered SOCKET_FILE.IN device')
},
(mod: any) => {
console.log('registering SOCKET_FILE.OUT device')
// Register SOCKET_FILE.OUT device
const devId = mod.FS.makedev(62, 0)
const devOpt = {
open: (_stream: any) => {},
close: (_stream: any) => {},
read: (
_stream: any,
_buffer: Uint8Array,
_offset: number,
_length: number,
_position: number,
) => {
throw new Error('Not implemented')
},
write: (
_stream: any,
buffer: Uint8Array,
offset: number,
length: number,
_position: number,
) => {
this.#queryOutChunks ??= []
this.#queryOutChunks.push(buffer.slice(offset, offset + length))
return length
},
llseek: (_stream: any, _offset: number, _whence: number) => {
throw new Error('Not implemented')
},
}
console.log('registering SOCKET_FILE.OUT device 2')
mod.FS.registerDevice(devId, devOpt)
console.log('registering SOCKET_FILE.OUT device 3')
mod.FS.mkdir('/tmp/pglite/base/')
mod.FS.mkdev(SOCKET_FILE.OUT, devId)
console.log('registering SOCKET_FILE.OUT device 4')
// mod.FS.mkdev(SOCKET_FILE.OLOCK, devId)
console.log('registered SOCKET_FILE.OUT device')
},
],
}
]
}

emscriptenOpts = await this.fs!.emscriptenOpts(emscriptenOpts)
Expand Down Expand Up @@ -521,6 +420,9 @@ export class PGlite
// TODO: only sync here if initdb did init db.
await this.syncToFs()

// Setup the socket files for the query protocol IO
await this.setupSocketFiles()

this.#ready = true

// Set the search path to public for this connection
Expand All @@ -535,6 +437,111 @@ export class PGlite
}
}

async setupSocketFiles() {
const mod = this.mod!
// Register SOCKET_FILE.IN device
console.log('registering SOCKET_FILE.IN device')
const inDevId = mod.FS.makedev(63, 0)
const inDevOpt = {
open: (_stream: any) => {},
close: (_stream: any) => {},
read: (
_stream: any,
buffer: Uint8Array,
offset: number,
length: number,
position: number,
) => {
console.log('reading SOCKET_FILE.IN device')
const buf = this.#queryInBuffer
if (!buf) {
throw new Error(`No ${SOCKET_FILE.IN} Buffer provided to read from`)
}
const contents = new Uint8Array(buf)
if (position >= contents.length) return 0
const size = Math.min(contents.length - position, length)
for (let i = 0; i < size; i++) {
buffer[offset + i] = contents[position + i]
}
return size
},
write: (
_stream: any,
_buffer: Uint8Array,
_offset: number,
_length: number,
_position: number,
) => {
throw new Error('Not implemented')
},
llseek: (stream: any, offset: number, whence: number) => {
console.log('seeking SOCKET_FILE.IN device')
const buf = this.#queryInBuffer
if (!buf) {
throw new Error(`No ${SOCKET_FILE.IN} Buffer provided to llseek`)
}
let position = offset
if (whence === 1) {
position += stream.position
} else if (whence === 2) {
position = new Uint8Array(buf).length
}
if (position < 0) {
throw new mod.FS.ErrnoError(28)
}
return position
},
}
console.log('registering SOCKET_FILE.IN device 2')
mod.FS.registerDevice(inDevId, inDevOpt)
console.log('registering SOCKET_FILE.IN device 3')
mod.FS.mkdev(SOCKET_FILE.IN, inDevId)
console.log('registering SOCKET_FILE.IN device 4')
// mod.FS.mkdev(SOCKET_FILE.OLOCK, devId)
console.log('registered SOCKET_FILE.IN device')

// Register SOCKET_FILE.OUT devicex
console.log('registering SOCKET_FILE.OUT device')
const outDevId = mod.FS.makedev(62, 0)
const outDevOpt = {
open: (_stream: any) => {},
close: (_stream: any) => {},
read: (
_stream: any,
_buffer: Uint8Array,
_offset: number,
_length: number,
_position: number,
) => {
throw new Error('Not implemented')
},
write: (
_stream: any,
buffer: Uint8Array,
offset: number,
length: number,
_position: number,
) => {
console.log('writing SOCKET_FILE.OUT device')
this.#queryOutChunks ??= []
this.#queryOutChunks.push(buffer.slice(offset, offset + length))
return length
},
llseek: (_stream: any, _offset: number, _whence: number) => {
throw new Error('Not implemented')
},
}
console.log('registering SOCKET_FILE.OUT device 2')
mod.FS.registerDevice(outDevId, outDevOpt)
console.log('registering SOCKET_FILE.OUT device 3')
mod.FS.mkdev(SOCKET_FILE.OUT, outDevId)
console.log('registering SOCKET_FILE.OUT device 4')
// mod.FS.mkdev(SOCKET_FILE.OLOCK, devId)
console.log('registered SOCKET_FILE.OUT device')

mod._use_socketfile()
}

/**
* The Postgres Emscripten Module
*/
Expand Down
1 change: 1 addition & 0 deletions packages/pglite/src/postgresMod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export interface PostgresMod
_interactive_write: (msgLength: number) => void
_interactive_one: () => void
_interactive_read: () => number
_use_socketfile: () => void
}

type PostgresFactory<T extends PostgresMod = PostgresMod> = (
Expand Down
10 changes: 6 additions & 4 deletions patches/pg_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -1182,13 +1182,15 @@ extern void AsyncPostgresSingleUserMain(int single_argc, char *single_argv[], co

#include "../postgresql/src/bin/initdb/initdb.c"

void use_socketfile(void) {
is_repl = true;
is_node = true;
}
#undef PG_INITDB_MAIN
#undef PG_MAIN
#endif // __wasi__

EMSCRIPTEN_KEEPALIVE void use_socketfile(void) {
is_repl = true;
is_node = true;
}

EMSCRIPTEN_KEEPALIVE int main_repl();
EMSCRIPTEN_KEEPALIVE int
main_repl() {
Expand Down

0 comments on commit 9fbeae8

Please sign in to comment.