Skip to content

Commit c055029

Browse files
committed
fix(python-runtime): add compliance test for async context callbacks
feat(kernel): process callbacks while EndRequest is awaiting promise
1 parent b0a01e1 commit c055029

File tree

18 files changed

+772
-116
lines changed

18 files changed

+772
-116
lines changed

packages/@jsii/kernel/src/api.ts

+2
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ export function isPropertyOverride(value: Override): value is PropertyOverride {
7373

7474
export interface Callback {
7575
readonly cbid: string;
76+
/** Whether this callback is synchronous. */
77+
readonly sync: boolean;
7678
readonly cookie: string | undefined;
7779
readonly invoke?: InvokeRequest;
7880
readonly get?: GetRequest;

packages/@jsii/kernel/src/kernel.test.ts

+30
Original file line numberDiff line numberDiff line change
@@ -2212,6 +2212,36 @@ defineTest('invokeBinScript() accepts arguments', (sandbox) => {
22122212
});
22132213
});
22142214

2215+
// defineTest('ImplementationFromAsyncContext compliance', async (sandbox) => {
2216+
// const producer = sandbox.create({
2217+
// fqn: 'Object',
2218+
// overrides: [{ method: 'produce', cookie: 'produce1234' }],
2219+
// interfaces: ['jsii-calc.IPromiseProducer'],
2220+
// });
2221+
2222+
// const obj = sandbox.create({
2223+
// fqn: 'jsii-calc.ImplementationFromAsyncContext',
2224+
// args: [producer],
2225+
// });
2226+
2227+
// const promise1 = sandbox.begin({
2228+
// objref: obj,
2229+
// method: 'doAsyncWork',
2230+
// });
2231+
2232+
// const callbacks1 = sandbox.callbacks();
2233+
// expect(callbacks1.callbacks.length).toBe(1);
2234+
// expect(callbacks1.callbacks[0].cookie).toBe('produce1234');
2235+
2236+
// sandbox.complete({
2237+
// cbid: callbacks1.callbacks[0].cbid,
2238+
// result: 'test-string',
2239+
// });
2240+
2241+
// const result = (await sandbox.end(promise1)).result;
2242+
// expect(result).toBe('test-string');
2243+
// });
2244+
22152245
// =================================================================================================
22162246

22172247
const testNames: { [name: string]: boolean } = {};

packages/@jsii/kernel/src/kernel.ts

+52-2
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,52 @@ export class Kernel {
447447

448448
let result;
449449
try {
450-
result = await promise;
450+
let settled = false;
451+
/**
452+
* Poll for new callback requests until the promise is resolved. This is
453+
* to allow any promises necessary for the promise to be able to settle.
454+
* We use setImmediate so the next poll happens on the next run loop tick,
455+
* after other microtasks might have been paused on a pending callback.
456+
*/
457+
// eslint-disable-next-line no-inner-declarations
458+
function pollForCallbacks(kernel: Kernel) {
459+
// Promise has settled already, not going any further...
460+
if (settled) {
461+
return;
462+
}
463+
464+
for (const [cbid, cb] of kernel.cbs.entries()) {
465+
kernel.waiting.set(cbid, cb);
466+
kernel.cbs.delete(cbid);
467+
try {
468+
cb.succeed(
469+
kernel.callbackHandler({
470+
cbid,
471+
sync: false,
472+
cookie: cb.override.cookie,
473+
invoke: {
474+
objref: cb.objref,
475+
method: cb.override.method,
476+
args: cb.args,
477+
},
478+
}),
479+
);
480+
} catch (err) {
481+
cb.fail(err);
482+
} finally {
483+
kernel.waiting.delete(cbid);
484+
}
485+
}
486+
if (!settled) {
487+
setImmediate(pollForCallbacks, kernel);
488+
}
489+
}
490+
pollForCallbacks(this);
491+
492+
result = await promise.finally(() => {
493+
settled = true;
494+
});
495+
451496
this._debug('promise result:', result);
452497
} catch (e: any) {
453498
this._debug('promise error:', e);
@@ -475,14 +520,16 @@ export class Kernel {
475520
};
476521
}
477522

523+
/** @deprecated the flow should be handled directly by "end" */
478524
public callbacks(_req?: api.CallbacksRequest): api.CallbacksResponse {
479525
this._debug('callbacks');
480526
const ret = Array.from(this.cbs.entries()).map(([cbid, cb]) => {
481527
this.waiting.set(cbid, cb); // move to waiting
482528
this.cbs.delete(cbid); // remove from created
483529
const callback: api.Callback = {
484-
cbid,
485530
cookie: cb.override.cookie,
531+
cbid,
532+
sync: false,
486533
invoke: {
487534
objref: cb.objref,
488535
method: cb.override.method,
@@ -758,6 +805,7 @@ export class Kernel {
758805
const result = this.callbackHandler({
759806
cookie: override.cookie,
760807
cbid: this._makecbid(),
808+
sync: true,
761809
get: { objref, property: propertyName },
762810
});
763811
this._debug('callback returned', result);
@@ -774,6 +822,7 @@ export class Kernel {
774822
this.callbackHandler({
775823
cookie: override.cookie,
776824
cbid: this._makecbid(),
825+
sync: true,
777826
set: {
778827
objref,
779828
property: propertyName,
@@ -910,6 +959,7 @@ export class Kernel {
910959
const result = this.callbackHandler({
911960
cookie: override.cookie,
912961
cbid: this._makecbid(),
962+
sync: true,
913963
invoke: {
914964
objref,
915965
method: methodName,

packages/@jsii/python-runtime/src/jsii/_kernel/__init__.py

+7-20
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import datetime
22
import inspect
33
import itertools
4+
import time
45
from types import FunctionType, MethodType, BuiltinFunctionType, LambdaType
56

67
from typing import Callable, cast, Any, List, Optional, Sequence, Type
@@ -28,6 +29,7 @@
2829
CreateResponse,
2930
DeleteRequest,
3031
EndRequest,
32+
EndResponse,
3133
EnumRef,
3234
GetRequest,
3335
GetResponse,
@@ -464,26 +466,11 @@ def ainvoke(self, obj: Any, method: str, args: Optional[List[Any]] = None) -> An
464466
if isinstance(promise, Callback):
465467
promise = _callback_till_result(self, promise, BeginResponse)
466468

467-
callbacks = self.provider.callbacks(CallbacksRequest()).callbacks
468-
while callbacks:
469-
for callback in callbacks:
470-
try:
471-
result = _handle_callback(self, callback)
472-
except Exception as exc:
473-
# TODO: Maybe we want to print the whole traceback here?
474-
complete = self.provider.complete(
475-
CompleteRequest(cbid=callback.cbid, err=str(exc))
476-
)
477-
else:
478-
complete = self.provider.complete(
479-
CompleteRequest(cbid=callback.cbid, result=result)
480-
)
481-
482-
assert complete.cbid == callback.cbid
483-
484-
callbacks = self.provider.callbacks(CallbacksRequest()).callbacks
485-
486-
return self.provider.end(EndRequest(promiseid=promise.promiseid)).result
469+
response = self.provider.end(EndRequest(promiseid=promise.promiseid))
470+
if isinstance(response, Callback):
471+
return _callback_till_result(self, response, EndResponse).result
472+
else:
473+
return response.result
487474

488475
def stats(self):
489476
resp = self.provider.stats(StatsRequest())

packages/@jsii/python-runtime/src/jsii/_kernel/providers/process.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ def stop(self) -> None:
286286
assert self._process.stdin is not None
287287
if not self._process.stdin.closed:
288288
self._process.stdin.write(b'{"exit":0}\n')
289-
# Close the process' STDIN, singaling we are done with it
289+
# Close the process' STDIN, signaling we are done with it
290290
self._process.stdin.close()
291291

292292
try:

packages/@jsii/python-runtime/src/jsii/_kernel/types.py

+1
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ class StatsResponse:
234234
LoadResponse,
235235
CreateResponse,
236236
DeleteResponse,
237+
EndResponse,
237238
GetResponse,
238239
InvokeResponse,
239240
InvokeScriptResponse,

packages/@jsii/python-runtime/tests/test_compliance.py

+18-1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@
7777
AnonymousImplementationProvider,
7878
UpcasingReflectable,
7979
PromiseNothing,
80+
IPromiseProducer,
81+
ImplementationFromAsyncContext,
8082
)
8183
from jsii_calc.cdk16625 import Cdk16625
8284
from jsii_calc.cdk22369 import AcceptsPath
@@ -527,6 +529,7 @@ def test_asyncOverrides_overrideAsyncMethodByParentClass():
527529
assert obj.call_me() == 4452
528530

529531

532+
# fails for no reason and poisons the runtime
530533
def test_asyncOverrides_overrideCallsSuper():
531534
obj = OverrideCallsSuper()
532535
assert obj.override_me(12) == 1441
@@ -1358,5 +1361,19 @@ def test_void_returning_async():
13581361
"""Verifies it's okay to return a Promise<void>."""
13591362

13601363
assert PromiseNothing().instance_promise_it() is None
1361-
## TODO: This is currently broken as code-gen is incorrect for static async.
1364+
# TODO: This is currently broken as code-gen is incorrect for static async.
13621365
# assert PromiseNothing.promise_it() is None
1366+
1367+
1368+
def test_calling_implementation_from_async_context():
1369+
@jsii.implements(IPromiseProducer)
1370+
class ConcreteProducer:
1371+
def produce(self) -> str:
1372+
return "result"
1373+
1374+
producer = ConcreteProducer()
1375+
1376+
assert producer.produce() == "result"
1377+
1378+
worker = ImplementationFromAsyncContext(producer)
1379+
assert worker.do_async_work() == "result"

packages/@jsii/runtime/lib/host.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ export class KernelHost {
8282
return this.processRequest(
8383
req,
8484
completeCallback.bind(this),
85-
/* sync */ true,
85+
callback.sync,
8686
);
8787
}
8888
}
@@ -126,7 +126,7 @@ export class KernelHost {
126126
// promises. see the kernel test 'async overrides: two overrides'
127127
// for an example for this use case.
128128
if (apiReq.api === 'begin' || apiReq.api === 'complete') {
129-
checkIfAsyncIsAllowed();
129+
assertAsyncIsAllowed();
130130

131131
this.debug('processing pending promises before responding');
132132

@@ -141,7 +141,7 @@ export class KernelHost {
141141
// if this is an async method, return immediately and
142142
// call next only when the promise is fulfilled.
143143
if (this.isPromise(ret)) {
144-
checkIfAsyncIsAllowed();
144+
assertAsyncIsAllowed();
145145

146146
this.debug('waiting for promise to be fulfilled');
147147

@@ -169,7 +169,7 @@ export class KernelHost {
169169
// indicate this request was processed (synchronously).
170170
return next();
171171

172-
function checkIfAsyncIsAllowed() {
172+
function assertAsyncIsAllowed() {
173173
if (sync) {
174174
throw new JsiiFault(
175175
'Cannot handle async operations while waiting for a sync callback to return',

packages/@jsii/runtime/test/__snapshots__/kernel-host.test.js.snap

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/jsii-calc/lib/compliance.ts

+25
Original file line numberDiff line numberDiff line change
@@ -3141,3 +3141,28 @@ export class PromiseNothing {
31413141
return PromiseNothing.promiseIt();
31423142
}
31433143
}
3144+
3145+
/**
3146+
* Async Context operations
3147+
* Validates features work when run from within an async context
3148+
*
3149+
* @see https://github.com/aws/jsii/issues/3917
3150+
*/
3151+
export interface IPromiseProducer {
3152+
produce(): Promise<string>;
3153+
}
3154+
3155+
export class ImplementationFromAsyncContext {
3156+
public constructor(private readonly producer: IPromiseProducer) {}
3157+
3158+
public async doAsyncWork(): Promise<string> {
3159+
await this.sleep(200);
3160+
return this.producer.produce();
3161+
}
3162+
3163+
private async sleep(ms: number) {
3164+
return new Promise((resolve) => {
3165+
setTimeout(resolve, ms);
3166+
});
3167+
}
3168+
}

0 commit comments

Comments
 (0)