Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: change interactive_one to read/write from/to FS device. #330

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cibuild/linkimports.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ _getenv
_interactive_one
_interactive_read
_interactive_write
_use_socketfile
_lowerstr
_main
_main_repl
Expand Down
9 changes: 7 additions & 2 deletions packages/pglite/src/fs/tarUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ export async function loadTar(
const files = untar(tarball)
for (const file of files) {
const filePath = pgDataDir + file.name
const dirPath = filePath.split('/')
const fileName = dirPath.pop()

if (fileName?.startsWith('.')) {
continue
}

// Ensure the directory structure exists
const dirPath = filePath.split('/').slice(0, -1)
for (let i = 1; i <= dirPath.length; i++) {
const dir = dirPath.slice(0, i).join('/')
if (!FS.analyzePath(dir).exists) {
Expand Down Expand Up @@ -80,7 +85,7 @@ function readDirectory(FS: FS, path: string) {
const traverseDirectory = (currentPath: string) => {
const entries = FS.readdir(currentPath)
entries.forEach((entry) => {
if (entry === '.' || entry === '..') {
if (entry === '.' || entry === '..' || entry.startsWith('.')) {
return
}
const fullPath = currentPath + '/' + entry
Expand Down
179 changes: 154 additions & 25 deletions packages/pglite/src/pglite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import {
NotificationResponseMessage,
} from '@electric-sql/pg-protocol/messages'

const SOCKET_FILE = {
ILOCK: '/tmp/pglite/.s.PGSQL.5432.lock.in',
IN: '/tmp/pglite/.s.PGSQL.5432.in',
OLOCK: '/tmp/pglite/.s.PGSQL.5432.lock.out',
OUT: '/tmp/pglite/.s.PGSQL.5432.out',
}
const PAGE_SIZE = 8192

export class PGlite
extends BasePGlite
implements PGliteInterface, AsyncDisposable
Expand Down Expand Up @@ -54,14 +62,20 @@ export class PGlite

#protocolParser = new ProtocolParser()

// These are the current ArrayBuffer that is being read or written to
#queryInBuffer?: ArrayBuffer
#queryOutChunks?: Uint8Array[]

// These are the current /dev/blob ArrayBuffer that is being read or written to
// during a query, such as COPY FROM or COPY TO.
#queryReadBuffer?: ArrayBuffer
#queryWriteChunks?: Uint8Array[]
#devBlobReadBuffer?: ArrayBuffer
#devBlobWriteChunks?: Uint8Array[]

#notifyListeners = new Map<string, Set<(payload: string) => void>>()
#globalNotifyListeners = new Set<(channel: string, payload: string) => void>()

#socketInDevId?: number
#socketOutDevId?: number

/**
* Create a new PGlite instance
* @param dataDir The directory to store the database files
Expand Down Expand Up @@ -243,7 +257,7 @@ export class PGlite
length: number,
position: number,
) => {
const buf = this.#queryReadBuffer
const buf = this.#devBlobReadBuffer
if (!buf) {
throw new Error(
'No /dev/blob File or Blob provided to read from',
Expand All @@ -264,12 +278,14 @@ export class PGlite
length: number,
_position: number,
) => {
this.#queryWriteChunks ??= []
this.#queryWriteChunks.push(buffer.slice(offset, offset + length))
this.#devBlobWriteChunks ??= []
this.#devBlobWriteChunks.push(
buffer.slice(offset, offset + length),
)
return length
},
llseek: (stream: any, offset: number, whence: number) => {
const buf = this.#queryReadBuffer
const buf = this.#devBlobReadBuffer
if (!buf) {
throw new Error('No /dev/blob File or Blob provided to llseek')
}
Expand Down Expand Up @@ -412,6 +428,9 @@ export class PGlite
// TODO: only sync here if initdb did init db.
await this.syncToFs()

// Setup the socket files for the query protocol IO
this.#setupSocketDevices()

this.#ready = true

// Set the search path to public for this connection
Expand All @@ -426,6 +445,112 @@ export class PGlite
}
}

#setupSocketDevices() {
const mod = this.mod!

// Register SOCKET_FILE.IN device
this.#socketInDevId = mod.FS.makedev(63, 0)
const inDevOpt = {
open: (_stream: any) => {},
close: (_stream: any) => {},
read: (
_stream: any,
buffer: Uint8Array,
offset: number,
length: number,
position: number,
) => {
const buf = this.#queryInBuffer
if (!buf) {
throw new Error(`No ${SOCKET_FILE.IN} Buffer provided to read from`)
}
const contents = new Uint8Array(buf)
if (position >= contents.length) return 0
const size = Math.min(
PAGE_SIZE,
Math.min(contents.length - position, length),
)
for (let i = 0; i < size; i++) {
buffer[offset + i] = contents[position + i]
}
return size
},
write: (
_stream: any,
_buffer: Uint8Array,
_offset: number,
_length: number,
_position: number,
) => {
throw new Error('Not implemented')
},
llseek: (stream: any, offset: number, whence: number) => {
const buf = this.#queryInBuffer
if (!buf) {
throw new Error(`No ${SOCKET_FILE.IN} Buffer provided to llseek`)
}
let position = offset
if (whence === 1) {
position += stream.position
} else if (whence === 2) {
position = new Uint8Array(buf).length
}
if (position < 0) {
throw new mod.FS.ErrnoError(28)
}
return position
},
}
mod.FS.registerDevice(this.#socketInDevId!, inDevOpt)

// Register SOCKET_FILE.OUT devicex
this.#socketOutDevId = mod.FS.makedev(62, 0)
const outDevOpt = {
open: (_stream: any) => {},
close: (_stream: any) => {},
read: (
_stream: any,
_buffer: Uint8Array,
_offset: number,
_length: number,
_position: number,
) => {
throw new Error('Not implemented')
},
write: (
_stream: any,
buffer: Uint8Array,
offset: number,
length: number,
_position: number,
) => {
this.#queryOutChunks ??= []
if (length > 0) {
this.#queryOutChunks.push(buffer.slice(offset, offset + length))
}
return length
},
llseek: (_stream: any, _offset: number, _whence: number) => {
throw new Error('Not implemented')
},
}
mod.FS.registerDevice(this.#socketOutDevId!, outDevOpt)

this.#makeSocketFiles()

mod._use_socketfile()
}

#makeSocketFiles() {
const mod = this.mod!
if (!mod.FS.analyzePath(SOCKET_FILE.IN).exists) {
mod.FS.mkdev(SOCKET_FILE.IN, this.#socketInDevId!)
}
if (!mod.FS.analyzePath(SOCKET_FILE.OUT).exists) {
mod.FS.mkdev(SOCKET_FILE.OUT, this.#socketOutDevId!)
}
}

/**
* The Postgres Emscripten Module
*/
Expand Down Expand Up @@ -496,26 +621,26 @@ export class PGlite
* @param file The file to handle
*/
async _handleBlob(blob?: File | Blob) {
this.#queryReadBuffer = blob ? await blob.arrayBuffer() : undefined
this.#devBlobReadBuffer = blob ? await blob.arrayBuffer() : undefined
}

/**
* Cleanup the current file
*/
async _cleanupBlob() {
this.#queryReadBuffer = undefined
this.#devBlobReadBuffer = undefined
}

/**
* Get the written blob from the current query
* @returns The written blob
*/
async _getWrittenBlob(): Promise<Blob | undefined> {
if (!this.#queryWriteChunks) {
if (!this.#devBlobWriteChunks) {
return undefined
}
const blob = new Blob(this.#queryWriteChunks)
this.#queryWriteChunks = undefined
const blob = new Blob(this.#devBlobWriteChunks)
this.#devBlobWriteChunks = undefined
return blob
}

Expand Down Expand Up @@ -551,29 +676,33 @@ export class PGlite
message: Uint8Array,
{ syncToFs = true }: ExecProtocolOptions = {},
) {
const msg_len = message.length
const mod = this.mod!
// Make query available at /dev/query-in
this.#queryInBuffer = message

// >0 set buffer content type to wire protocol
// set buffer size so answer will be at size+0x2 pointer addr
mod._interactive_write(msg_len)
// Remove the lock files if they exist
const mod = this.mod!
if (mod.FS.analyzePath(SOCKET_FILE.OLOCK).exists) {
mod.FS.unlink(SOCKET_FILE.OLOCK)
}
if (mod.FS.analyzePath(SOCKET_FILE.OLOCK).exists) {
mod.FS.unlink(SOCKET_FILE.OLOCK)
}

// copy whole buffer at addr 0x1
mod.HEAPU8.set(message, 1)
this.#makeSocketFiles()

// execute the message
mod._interactive_one()
this.#queryOutChunks = []
this.mod!._interactive_one()

// Read responses from the buffer
const msg_start = msg_len + 2
const msg_end = msg_start + mod._interactive_read()
const data = mod.HEAPU8.subarray(msg_start, msg_end)
// Read responses from SOCKET_FILE.OUT
const data = await new Blob(this.#queryOutChunks).arrayBuffer()
this.#queryOutChunks = undefined

if (syncToFs) {
await this.syncToFs()
}

return data
return new Uint8Array(data)
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/pglite/src/postgresMod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export interface PostgresMod
_interactive_write: (msgLength: number) => void
_interactive_one: () => void
_interactive_read: () => number
_use_socketfile: () => void
}

type PostgresFactory<T extends PostgresMod = PostgresMod> = (
Expand Down
Loading
Loading