Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support releasing/freeing actors #30

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions examples/stopping-actors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
let { Actor, ActorSystem, ActorSystemConfigurationBuilder } = require('../dist/index')

class Clock extends Actor {
constructor(times, message) {
super()

this.times = times
this.timer = this.schedule(1000, this.tick, [message])
}

async tick(message) {
console.log(this.times, "From clock ", this.id, " message:", message)
if (--this.times <= 0) {
this.cancel(this.timer)
}
}
}

let system = ActorSystem.for(ActorSystemConfigurationBuilder.define()
.done())

const clock = system.actorOf(Clock, [10, "Actors are cool"])

setTimeout(() => {
system.releaseActor(clock)
console.log('Actor released, no more messages. Wait for 3 more seconds.')
}, 3000)

setTimeout(() => {
system.free()
}, 6000)
10 changes: 10 additions & 0 deletions lib/actor-system/actor-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
* LICENSE file in the root directory of this source tree.
*/

const RELEASE_ACTOR_MESSAGE = '<release>'

type AnswerFunction = (r: any) => void

export default class ActorMessage {
Expand All @@ -23,4 +25,12 @@ export default class ActorMessage {
this.resolve = resolve
this.reject = reject
}

public isAReleaseMessage(): boolean {
return this.methodName == RELEASE_ACTOR_MESSAGE
}

public static releaseActor(): ActorMessage {
return new ActorMessage(RELEASE_ACTOR_MESSAGE, [], null, null)
}
}
29 changes: 24 additions & 5 deletions lib/actor-system/actor-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,35 @@ import Message from '../mailbox/message'
import Actor, { IActor } from './actor'
import ActorMessage from './actor-message'

class ActorHasBeenReleased {
private readonly id: string
private readonly message: ActorMessage
public readonly isActorReleased: boolean = true

public constructor(id: string, message: ActorMessage) {
this.id = id
this.message = message
}

public getMessage(): string {
return 'Actor ' + this.id + ' has been already released, but received message ' + JSON.stringify(this.message)
}
}

export default class ActorProxy {
public static sendAndReturn(
mailbox: Mailbox<ActorMessage>,
actorId: string,
actor: any,
methodName: string,
args: any[],
): Promise<object> {
return new Promise((resolve, reject) =>
mailbox.push(Message.of(actorId, ActorMessage.of(methodName, args, resolve, reject))),
)
return new Promise((resolve, reject) => {
if (actor.isBeingReleased) {
reject(new ActorHasBeenReleased(actor.id, ActorMessage.of(methodName, args, null, null)))
} else {
return mailbox.push(Message.of(actor.id, ActorMessage.of(methodName, args, resolve, reject)))
}
})
}

public static of<T extends IActor>(mailbox: Mailbox<ActorMessage>, actor: T): T {
Expand All @@ -33,7 +52,7 @@ export default class ActorProxy {
return allNames
.map((name: string): [string, any] => [
name,
(...args: any[]): any => ActorProxy.sendAndReturn(mailbox, actor.id, name, args),
(...args: any[]): any => ActorProxy.sendAndReturn(mailbox, actor, name, args),
])
.reduce((result, [member, method]) => ({ ...result, [member]: method }), { ref: actor }) as any
}
Expand Down
24 changes: 24 additions & 0 deletions lib/actor-system/actor-system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import Fiber from '../fiber/fiber'
import IProcessor from '../fiber/processor'
import Mailbox from '../mailbox/mailbox'
import Message from '../mailbox/message'
import Topic from '../pubsub/topic'
import { IActor } from './actor'
import ActorMessage from './actor-message'
import ActorProxy from './actor-proxy'
Expand Down Expand Up @@ -77,6 +79,11 @@ export default class ActorSystem implements IProcessor {
return proxy
}

public releaseActor(actor: IActor): void {
const actorProxy = actor as any
this.mailbox.push(Message.of(actorProxy.ref.id, ActorMessage.releaseActor()))
}

/**
* Looks for an existing actor in this actor system. If none exist,
* it will try to resolve the actor, by id, with the provided resolvers.
Expand Down Expand Up @@ -147,4 +154,21 @@ export default class ActorSystem implements IProcessor {
instance.supervisor = this.supervisor
instance.initialized()
}

private async releaseActorInternal(actor: IActor): Promise<void> {
this.actors.delete(actor.id) // first remove, we don't want more references

const actorSubscription = this.subscriptions.get(actor.id)
this.mailbox.removeSubscription(actorSubscription) // do not receive more messages

this.subscriptions.delete(actor.id)
// unsubscribe from all topics
const subscribedTopics = (actor as any).topicSubscriptions as Map<string, string>
const allUnsubs = Array.from(subscribedTopics.entries()).map(async ([value, key]) => {
const topic = await this.actorFor<Topic<any>>(key)
topic.unsubscribe(value)
})

await Promise.all(allUnsubs)
}
}
37 changes: 32 additions & 5 deletions lib/actor-system/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export default abstract class Actor implements IActor {
private readonly scheduled: Map<Cancellable, Timer> = new Map()
private readonly topicSubscriptions: Map<string, string> = new Map()
private busy = false
private isBeingReleased = false

protected constructor(id?: string) {
this.id = id || uuid()
Expand All @@ -56,10 +57,19 @@ export default abstract class Actor implements IActor {

const actorMessage = message.content
try {
this.materializers.forEach((materializer) => materializer.onBeforeMessage(this, actorMessage))
const result = await this.dispatchAndPromisify(actorMessage)
if (actorMessage.isAReleaseMessage()) {
this.isBeingReleased = true

actorMessage.resolve(result)
this.materializers.forEach((materializer) => materializer.onBeforeRelease(this))
this.cancelAll()
await (this.system! as any).releaseActorInternal(this)
this.materializers.forEach((materializer) => materializer.onAfterRelease(this))
} else {
this.materializers.forEach((materializer) => materializer.onBeforeMessage(this, actorMessage))
const result = await this.dispatchAndPromisify(actorMessage)

actorMessage.resolve(result)
}
} catch (ex) {
this.materializers.forEach((materializer) => materializer.onError(this, actorMessage, ex))
const strategy = await this.supervisor!.supervise(this.self, ex, actorMessage)
Expand All @@ -74,8 +84,10 @@ export default abstract class Actor implements IActor {
return true
}
} finally {
this.busy = false
this.materializers.forEach((materializer) => materializer.onAfterMessage(this, actorMessage))
if (!this.isBeingReleased) {
this.busy = false
this.materializers.forEach((materializer) => materializer.onAfterMessage(this, actorMessage))
}
}

return true
Expand Down Expand Up @@ -157,6 +169,21 @@ export default abstract class Actor implements IActor {
}, 0)
}

/**
* Cancel all scheduled actions created by #schedule or #scheduleOnce
*
* @see Actor#schedule
* @see Actor#scheduleOnce
*/
protected cancelAll(): void {
this.scheduled.forEach((value, key) => {
clearTimeout(value)
clearInterval(value)

this.scheduled.delete(key)
})
}

/**
* Creates a child actor of this actor. The current actor will behave as the supervisor
* of the created actor.
Expand Down
2 changes: 2 additions & 0 deletions lib/actor-system/materializer/materializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ export default interface IMaterializer {
onBeforeMessage(actor: Actor, message: ActorMessage): void
onAfterMessage(actor: Actor, message: ActorMessage): void
onError(actor: Actor, message: ActorMessage, error: any): void
onBeforeRelease(actor: Actor): void
onAfterRelease(actor: Actor): void
}
9 changes: 4 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@
"license": "MIT",
"devDependencies": {
"@faker-js/faker": "^7.6.0",
"@types/faker": "6.6.9",
"@types/jest": "29.2.5",
"@types/jest": "29.2.6",
"@types/node": "18.11.18",
"@typescript-eslint/eslint-plugin": "5.48.1",
"@typescript-eslint/parser": "5.48.1",
"@typescript-eslint/eslint-plugin": "5.48.2",
"@typescript-eslint/parser": "5.48.2",
"coveralls": "3.1.1",
"esbuild": "0.17.0",
"esbuild": "0.17.3",
"eslint": "8.32.0",
"eslint-config-standard": "17.0.0",
"eslint-plugin-import": "^2.27.4",
Expand Down
6 changes: 4 additions & 2 deletions test/actor-system/actor-proxy.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@ describe('actor proxy', () => {
const resultMessage = faker.datatype.uuid()
const expectedResult = faker.datatype.uuid()
const args = [faker.datatype.uuid(), faker.datatype.uuid()]
const actor = { id: actorId }

actorMessageMock.of.mockImplementation((_, __, resolve, ___) => {
resolve(expectedResult)
return resultActorMessage
})
messageMock.of.mockReturnValue(resultMessage)

const result = await ActorProxy.sendAndReturn(mailbox, actorId, methodName, args)
const result = await ActorProxy.sendAndReturn(mailbox, actor, methodName, args)

expect(actorMessageMock.of).toBeCalledWith(methodName, args, expect.any(Function), expect.any(Function))
expect(messageMock.of).toBeCalledWith(actorId, resultActorMessage)
Expand All @@ -88,14 +89,15 @@ describe('actor proxy', () => {
const resultMessage = faker.datatype.uuid()
const expectedResult = faker.datatype.uuid()
const args = [faker.datatype.uuid(), faker.datatype.uuid()]
const actor = { id: actorId }

actorMessageMock.of.mockImplementation((_, __, ___, reject) => {
reject(expectedResult)
})
messageMock.of.mockReturnValue(resultMessage)

try {
await ActorProxy.sendAndReturn(mailbox, actorId, methodName, args)
await ActorProxy.sendAndReturn(mailbox, actor, methodName, args)
fail()
} catch (_) {
//
Expand Down
39 changes: 39 additions & 0 deletions test/actor-system/actor-system.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import IMaterializer from '../../lib/actor-system/materializer/materializer'
import NamedActor from './fixtures/named-actor'
import SemaphoreActor from './fixtures/semaphore-actor'
import waitFor from './fixtures/wait-for'
import sleep from './fixtures/sleep'
import IResolver from '../../lib/actor-system/resolver/resolver'
import { Actor } from '../../lib'

Expand All @@ -30,12 +31,16 @@ describe('Actor System', () => {
onBeforeMessage: jest.fn(),
onError: jest.fn(),
onInitialize: jest.fn(),
onBeforeRelease: jest.fn(),
onAfterRelease: jest.fn(),
}
secondMaterializer = {
onAfterMessage: jest.fn(),
onBeforeMessage: jest.fn(),
onError: jest.fn(),
onInitialize: jest.fn(),
onBeforeRelease: jest.fn(),
onAfterRelease: jest.fn(),
}
firstResolver = {
resolveActorById: jest.fn(),
Expand Down Expand Up @@ -183,4 +188,38 @@ describe('Actor System', () => {
await waitFor(() => fnActor(5, 15))
expect(fn).toBeCalledWith(5, 15)
})

describe('released actors', () => {
test('should call materializers before releasing', async () => {
const actor: NamedActor = actorSystem.actorOf(NamedActor, ['myReleasedActor'])

actorSystem.releaseActor(actor)
actorSystem.process()
await waitFor(() => sleep(10))

expect(firstMaterializer.onBeforeRelease).toHaveBeenCalled()
expect(secondMaterializer.onBeforeRelease).toHaveBeenCalled()
expect(firstMaterializer.onAfterRelease).toHaveBeenCalled()
expect(secondMaterializer.onAfterRelease).toHaveBeenCalled()
})

test('should not received more messages once it has been released', async () => {
const actor: NamedActor = actorSystem.actorOf(NamedActor, ['myReleasedActor'])

actorSystem.releaseActor(actor)
actorSystem.process()

try {
console.log('first wait')
await waitFor(() => sleep(10))
await waitFor(() => actor.sayHi())
} catch (ex) {
expect(ex.getMessage()).toContain('has been already released, but received message')
expect(ex.isActorReleased).toBeTruthy()
return
}

fail('Should have not been dispatched the message')
})
})
})
Loading