Skip to content

Commit aaa1f1d

Browse files
committed
Add 'stream' type
1 parent dd6bfde commit aaa1f1d

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

design/mvp/canonical-abi/definitions.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ class Own(ValType):
166166
class Borrow(ValType):
167167
rt: ResourceType
168168

169+
@dataclass
170+
class Stream(ValType):
171+
t: ValType
172+
169173
### Despecialization
170174

171175
def despecialize(t):
@@ -193,6 +197,7 @@ def alignment(t):
193197
case Variant(cases) : return alignment_variant(cases)
194198
case Flags(labels) : return alignment_flags(labels)
195199
case Own(_) | Borrow(_) : return 4
200+
case Stream(_) : return 4
196201

197202
def alignment_record(fields):
198203
a = 1
@@ -243,6 +248,7 @@ def elem_size(t):
243248
case Variant(cases) : return elem_size_variant(cases)
244249
case Flags(labels) : return elem_size_flags(labels)
245250
case Own(_) | Borrow(_) : return 4
251+
case Stream(_) : return 4
246252

247253
def elem_size_record(fields):
248254
s = 0
@@ -303,6 +309,8 @@ class ComponentInstance:
303309
active_sync_task: bool
304310
pending_sync_tasks: list[asyncio.Future]
305311
async_subtasks: Table[AsyncSubtask]
312+
readable_streams: Table[ReadableStreamElem]
313+
writable_streams: Table[WritableStreamElem]
306314

307315
def __init__(self):
308316
self.may_leave = True
@@ -314,6 +322,8 @@ def __init__(self):
314322
self.active_sync_task = False
315323
self.pending_sync_tasks = []
316324
self.async_subtasks = Table[AsyncSubtask]()
325+
self.readable_streams = Table[ReadableStreamElem]()
326+
self.writable_streams = Table[WritableStreamElem]()
317327

318328
class HandleTables:
319329
rt_to_table: MutableMapping[ResourceType, Table[HandleElem]]
@@ -388,6 +398,12 @@ def __init__(self, rep, own, scope = None):
388398
self.scope = scope
389399
self.lend_count = 0
390400

401+
class StreamElem:
402+
t: ValType
403+
404+
def __init__(self, t):
405+
self.t = t
406+
391407
class AsyncCallState(IntEnum):
392408
STARTING = 0
393409
STARTED = 1
@@ -400,6 +416,9 @@ class EventCode(IntEnum):
400416
CALL_RETURNED = AsyncCallState.RETURNED
401417
CALL_DONE = AsyncCallState.DONE
402418
YIELDED = 4
419+
STREAM_READ = 5
420+
STREAM_CLOSED = 6
421+
STREAM_WROTE = 7
403422

404423
current_task = asyncio.Lock()
405424

@@ -489,6 +508,12 @@ def async_subtask_made_progress(self, subtask):
489508
subtask.enqueued = True
490509
self.events.put_nowait(subtask)
491510

511+
def stream_new(self, t):
512+
s = StreamElem(t)
513+
rsi = self.inst.readable_streams.add(s)
514+
wsi = self.inst.writable_streams.add(s)
515+
return (rsi, wsi)
516+
492517
def create_borrow(self):
493518
self.borrow_count += 1
494519

@@ -624,6 +649,7 @@ def load(cx, ptr, t):
624649
case Flags(labels) : return load_flags(cx, ptr, labels)
625650
case Own() : return lift_own(cx, load_int(cx, ptr, 4), t)
626651
case Borrow() : return lift_borrow(cx, load_int(cx, ptr, 4), t)
652+
case Stream() : return lift_stream(cx, load_int(cx, ptr, 4), t)
627653

628654
def load_int(cx, ptr, nbytes, signed = False):
629655
return int.from_bytes(cx.opts.memory[ptr : ptr+nbytes], 'little', signed=signed)
@@ -772,6 +798,10 @@ def lift_borrow(cx, i, t):
772798
cx.track_owning_lend(h)
773799
return h.rep
774800

801+
def lift_stream(cx, i, t):
802+
# TODO
803+
pass
804+
775805
### Storing
776806

777807
def store(cx, v, t, ptr):
@@ -797,6 +827,7 @@ def store(cx, v, t, ptr):
797827
case Flags(labels) : store_flags(cx, v, ptr, labels)
798828
case Own() : store_int(cx, lower_own(cx.opts, v, t), ptr, 4)
799829
case Borrow() : store_int(cx, lower_borrow(cx.opts, v, t), ptr, 4)
830+
case Stream() : store_int(cx, lower_stream(cx.opts, v, t), ptr, 4)
800831

801832
def store_int(cx, v, ptr, nbytes, signed = False):
802833
cx.opts.memory[ptr : ptr+nbytes] = int.to_bytes(v, nbytes, 'little', signed=signed)
@@ -1052,6 +1083,10 @@ def lower_borrow(cx, rep, t):
10521083
cx.create_borrow()
10531084
return cx.inst.handles.add(t.rt, h)
10541085

1086+
def lower_stream(cx, rep, t):
1087+
# TODO
1088+
pass
1089+
10551090
### Flattening
10561091

10571092
MAX_FLAT_PARAMS = 16
@@ -1101,6 +1136,7 @@ def flatten_type(t):
11011136
case Variant(cases) : return flatten_variant(cases)
11021137
case Flags(labels) : return ['i32']
11031138
case Own(_) | Borrow(_) : return ['i32']
1139+
case Stream(_) : return ['i32']
11041140

11051141
def flatten_record(fields):
11061142
flat = []
@@ -1162,6 +1198,7 @@ def lift_flat(cx, vi, t):
11621198
case Flags(labels) : return lift_flat_flags(vi, labels)
11631199
case Own() : return lift_own(cx, vi.next('i32'), t)
11641200
case Borrow() : return lift_borrow(cx, vi.next('i32'), t)
1201+
case Stream() : return lift_stream(cx, vi.next('i32'), t)
11651202

11661203
def lift_flat_unsigned(vi, core_width, t_width):
11671204
i = vi.next('i' + str(core_width))
@@ -1248,6 +1285,7 @@ def lower_flat(cx, v, t):
12481285
case Flags(labels) : return lower_flat_flags(v, labels)
12491286
case Own() : return [lower_own(cx, v, t)]
12501287
case Borrow() : return [lower_borrow(cx, v, t)]
1288+
case Stream() : return [lower_stream(cx, v, t)]
12511289

12521290
def lower_flat_signed(i, core_bits):
12531291
if i < 0:
@@ -1523,3 +1561,49 @@ async def canon_task_yield(task):
15231561
trap_if(task.opts.callback is not None)
15241562
await task.yield_()
15251563
return []
1564+
1565+
### 🔀 `canon stream.new`
1566+
1567+
async def canon_stream_new(t, task, ptr):
1568+
rsi, wsi = task.stream_new(t)
1569+
store(task, wsi, U32(), ptr)
1570+
return [rsi]
1571+
1572+
### 🔀 `canon stream.write`
1573+
1574+
async def canon_stream_write(sync, task, wsi, ptr, length, partial):
1575+
trap_if(length == 0)
1576+
s = self.inst.writable_streams.get(wsi)
1577+
if sync:
1578+
trap_if(s.pending_writes)
1579+
while partial > 0 and not canceled:
1580+
if not s.pending_reads:
1581+
task.inst.calling_sync_import = True
1582+
await ...
1583+
task.inst.calling_sync_import = False
1584+
... copy as much as you can
1585+
partial -= ... as much as you could
1586+
return [nwritten]
1587+
if s.pending_reads:
1588+
... do the read
1589+
return [nwritten]
1590+
return [0]
1591+
1592+
### 🔀 `canon stream.close`
1593+
1594+
async def canon_stream_close(task, wsi):
1595+
# TODO
1596+
pass
1597+
1598+
### 🔀 `canon stream.read`
1599+
1600+
async def canon_stream_read(sync, task, rsi, ptr, length):
1601+
trap_if(length == 0)
1602+
# TODO
1603+
pass
1604+
1605+
### 🔀 `canon stream.drop`
1606+
1607+
async def canon_stream_drop(task, rsi):
1608+
# TODO
1609+
pass

0 commit comments

Comments
 (0)