Skip to content

Commit

Permalink
[JS] feat: Generated by AI Label, Feedback Loop, Streaming Buffer, Er…
Browse files Browse the repository at this point in the history
…ror Propagation, Entities Metadata (#2135)

## Linked issues

closes: #1970

## Details

- Added temporary 1.5 second buffer to adhere to 1RPS backend service
requirement
- Added support for Feedback Loop
- Added support for Generated by AI label
- Added reject/catch handling for errors
- Added `entities` metadata to match GA requirements

**screenshots**:

![image](https://github.com/user-attachments/assets/2b5d576e-c00a-4f20-90e9-ba499bff8a7b)


## Attestation Checklist

- [x] My code follows the style guidelines of this project

- I have checked for/fixed spelling, linting, and other errors
- I have commented my code for clarity
- I have made corresponding changes to the documentation (updating the
doc strings in the code is sufficient)
- My changes generate no new warnings
- I have added tests that validates my changes, and provides sufficient
test coverage. I have tested with:
  - Local testing
  - E2E testing in Teams
- New and existing unit tests pass locally with my changes

---------

Co-authored-by: lilydu <[email protected]>
  • Loading branch information
lilyydu and lilydu authored Oct 22, 2024
1 parent feeb230 commit e2ab8f3
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 14 deletions.
6 changes: 5 additions & 1 deletion getting-started/CONCEPTS/STREAMING.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ Once `endStream()` is called, the stream is considered ended and no further upda

### Current Limitations:
- Streaming is only available in 1:1 chats.
- SendActivity requests are restricted to 1 RPS. Our SDK buffers to 1.5 seconds.
- For Powered by AI features, only the Feedback Loop and Generated by AI Label is currently supported.
- Only rich text can be streamed.
- Due to future GA protocol changes, the `channelData` metadata must be included in the `entities` object as well.
- Only one informative message can be set. This is reused for each message.
- Examples include:
- “Scanning through documents”
Expand All @@ -70,7 +73,8 @@ You can configure streaming with your bot by following these steps:


#### Optional additions:
- Set the informative message in the `ActionPlanner` declaration via the `StartStreamingMessage` config.
- Set the informative message in the `ActionPlanner` declaration via the `StartStreamingMessage` config.
- As previously, set the feedback loop toggle in the `AIOptions` object in the `app` declaration and specify a handler.
- Set attachments in the final chunk via the `EndStreamHandler` in the `ActionPlanner` declaration.

#### C#
Expand Down
7 changes: 7 additions & 0 deletions js/packages/teams-ai/src/AI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ export class AI<TState extends TurnState = TurnState> {
return this._options.planner;
}

/**
* @returns {boolean} Returns the feedback loop flag.
*/
public get enableFeedbackLoop(): boolean {
return this._options.enable_feedback_loop;
}

/**
* Registers a handler for a named action.
* @remarks
Expand Down
64 changes: 61 additions & 3 deletions js/packages/teams-ai/src/StreamingResponse.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ describe('StreamingResponse', function () {
const activity = adapter.getNextReply();
assert.equal(activity.type, 'message', 'activity.type should be "message"');
assert.equal(activity.text, '', 'activity.text should be ""');
assert.deepEqual(activity.channelData, { streamType: 'final' }, 'activity.channelData should match');
assert.deepEqual(
activity.channelData,
{ streamType: 'final', feedbackLoopEnabled: false },
'activity.channelData should match'
);
});
});

Expand All @@ -153,9 +157,63 @@ describe('StreamingResponse', function () {
assert.equal(activities[2].text, 'firstsecond', 'final activity text should be "firstsecond"');
assert.deepEqual(
activities[2].channelData,
{ streamType: 'final', streamId: response.streamId },
{ streamType: 'final', streamId: response.streamId, feedbackLoopEnabled: false },
'final activity channelData should match'
);
});
});

it('should send a final message with powered by AI features', async () => {
const adapter = new TestAdapter();
await adapter.sendTextToBot('test', async (context) => {
const response = new StreamingResponse(context);
response.queueTextChunk('first');
response.queueTextChunk('second');
response.setFeedbackLoop(true);
response.setGeneratedByAILabel(true);
await response.waitForQueue();
await response.endStream();
assert(response.updatesSent == 2, 'updatesSent should be 2');

// Validate sent activities
const activities = adapter.activeQueue;
assert.equal(activities.length, 3, 'should have sent 3 activities');
assert.equal(activities[0].channelData.streamSequence, 1, 'first activity streamSequence should be 1');
assert.equal(activities[0].entities!.length, 1, 'length of first activity entities should be 1');
assert.deepEqual(
activities[0].entities,
[{ type: 'streaminfo', properties: { ...activities[0].channelData } }],
'first activity entities should match'
);
assert.equal(activities[1].channelData.streamSequence, 2, 'second activity streamSequence should be 2');
assert.equal(activities[1].entities!.length, 1, 'length of second activity entities should be 1');
assert.deepEqual(
activities[1].entities,
[{ type: 'streaminfo', properties: { ...activities[1].channelData } }],
'second activity entities should match'
);
assert.equal(activities[2].type, 'message', 'final activity type should be "message"');
assert.equal(activities[2].text, 'firstsecond', 'final activity text should be "firstsecond"');

assert.deepEqual(
activities[2].channelData,
{ streamType: 'final', streamId: response.streamId, feedbackLoopEnabled: true },
'final activity channelData should match'
);
assert.deepEqual(
activities[2].entities,
[
{ type: 'streaminfo', properties: { streamType: 'final', streamId: response.streamId } },
{
type: 'https://schema.org/Message',
'@type': 'Message',
'@context': 'https://schema.org',
'@id': '',
additionalType: ['AIGeneratedContent']
}
],
'final activity entities obj should match'
);
});
});

Expand Down Expand Up @@ -191,7 +249,7 @@ describe('StreamingResponse', function () {
assert.equal(activities[2].text, 'firstsecond', 'final activity text should be "firstsecond"');
assert.deepEqual(
activities[2].channelData,
{ streamType: 'final', streamId: response.streamId },
{ streamType: 'final', streamId: response.streamId, feedbackLoopEnabled: false },
'final activity channelData should match'
);
assert.notEqual(activities[2].attachments, null);
Expand Down
68 changes: 63 additions & 5 deletions js/packages/teams-ai/src/StreamingResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
* Licensed under the MIT License.
*/

import { Activity, Attachment, TurnContext } from 'botbuilder-core';
import { Activity, Attachment, TurnContext, Entity } from 'botbuilder-core';
import { AIEntity } from './types';

/**
* A helper class for streaming responses to the client.
Expand All @@ -31,6 +32,10 @@ export class StreamingResponse {
private _queueSync: Promise<void> | undefined;
private _chunkQueued = false;

// Powered by AI feature flags
private _enableFeedbackLoop = false;
private _enableGeneratedByAILabel = false;

/**
* Creates a new StreamingResponse instance.
* @param {TurnContext} context - Context for the current turn of conversation with the user.
Expand Down Expand Up @@ -79,7 +84,7 @@ export class StreamingResponse {
}

/**
* Queues a chunk of partial message text to be sent to the client.
* Queues a chunk of partial message text to be sent to the client
* @remarks
* The text we be sent as quickly as possible to the client. Chunks may be combined before
* delivery to the client.
Expand Down Expand Up @@ -111,7 +116,7 @@ export class StreamingResponse {
this.queueNextChunk();

// Wait for the queue to drain
return this._queueSync!;
return this.waitForQueue();
}

/**
Expand All @@ -122,6 +127,25 @@ export class StreamingResponse {
this._attachments = attachments;
}

/**
* Sets the Feedback Loop in Teams that allows a user to
* give thumbs up or down to a response.
* Default is `false`.
* @param enableFeedbackLoop If true, the feedback loop is enabled.
*/
public setFeedbackLoop(enableFeedbackLoop: boolean): void {
this._enableFeedbackLoop = enableFeedbackLoop;
}

/**
* Sets the the Generated by AI label in Teams
* Default is `false`.
* @param enableGeneratedByAILabel If true, the label is added.
*/
public setGeneratedByAILabel(enableGeneratedByAILabel: boolean): void {
this._enableGeneratedByAILabel = enableGeneratedByAILabel;
}

/**
* Returns the most recently streamed message.
* @returns The streamed message.
Expand Down Expand Up @@ -185,7 +209,10 @@ export class StreamingResponse {

// If there's no sync in progress, start one
if (!this._queueSync) {
this._queueSync = this.drainQueue();
this._queueSync = this.drainQueue().catch((err) => {
console.error(`Error occured when sending activity while streaming: "${err}".`);
throw err;
});
}
}

Expand All @@ -195,7 +222,7 @@ export class StreamingResponse {
* @private
*/
private drainQueue(): Promise<void> {
return new Promise<void>(async (resolve) => {
return new Promise<void>(async (resolve, reject) => {
try {
while (this._queue.length > 0) {
// Get next activity from queue
Expand All @@ -207,6 +234,8 @@ export class StreamingResponse {
}

resolve();
} catch (err) {
reject(err);
} finally {
// Queue is empty, mark as idle
this._queueSync = undefined;
Expand All @@ -227,8 +256,37 @@ export class StreamingResponse {
activity.channelData = Object.assign({}, activity.channelData, { streamId: this._streamId });
}

activity.entities = [
{
type: 'streaminfo',
properties: {
...activity.channelData
}
} as Entity
];

// Add in Powered by AI feature flags
if (this._ended) {
// Add in feedback loop
activity.channelData = Object.assign({}, activity.channelData, {
feedbackLoopEnabled: this._enableFeedbackLoop
});

// Add in Generated by AI
if (this._enableGeneratedByAILabel) {
activity.entities.push({
type: 'https://schema.org/Message',
'@type': 'Message',
'@context': 'https://schema.org',
'@id': '',
additionalType: ['AIGeneratedContent']
} as AIEntity);
}
}

// Send activity
const response = await this._context.sendActivity(activity);
await new Promise((resolve) => setTimeout(resolve, 1.5));

// Save assigned stream ID
if (!this._streamId) {
Expand Down
13 changes: 12 additions & 1 deletion js/packages/teams-ai/src/planners/ActionPlanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ export interface ActionPlannerOptions<TState extends TurnState = TurnState> {
* Optional handler to run when a stream is about to conclude.
*/
endStreamHandler?: PromptCompletionModelResponseReceivedEvent;

/**
* If true, the feedback loop will be enabled for streaming responses.
*/
enableFeedbackLoop?: boolean;
}

/**
Expand All @@ -116,6 +121,7 @@ export class ActionPlanner<TState extends TurnState = TurnState> implements Plan
private readonly _options: ActionPlannerOptions<TState>;
private readonly _promptFactory: ActionPlannerPromptFactory<TState>;
private readonly _defaultPrompt?: string;
private _enableFeedbackLoop: boolean | undefined;

/**
* Creates a new `ActionPlanner` instance.
Expand Down Expand Up @@ -187,6 +193,10 @@ export class ActionPlanner<TState extends TurnState = TurnState> implements Plan
// Identify the augmentation to use
const augmentation = template.augmentation ?? new DefaultAugmentation();

if (ai.enableFeedbackLoop != null) {
this._enableFeedbackLoop = ai.enableFeedbackLoop;
}

// Complete prompt
const result = await this.completePrompt(context, state, template, augmentation);
if (result.status != 'success') {
Expand Down Expand Up @@ -265,7 +275,8 @@ export class ActionPlanner<TState extends TurnState = TurnState> implements Plan
max_repair_attempts: this._options.max_repair_attempts,
logRepairs: this._options.logRepairs,
startStreamingMessage: this._options.startStreamingMessage,
endStreamHandler: this._options.endStreamHandler
endStreamHandler: this._options.endStreamHandler,
enableFeedbackLoop: this._enableFeedbackLoop
});

// Complete prompt
Expand Down
3 changes: 2 additions & 1 deletion js/packages/teams-ai/src/planners/LLMClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ describe('LLMClient', function () {
const client = new LLMClient({
model: streamingModel,
template,
startStreamingMessage: 'start'
startStreamingMessage: 'start',
enableFeedbackLoop: true
});
const response = await client.completePrompt(context, state, functions);
assert.equal(adapter.activeQueue.length, 4, 'adapter should have 4 messages in the queue');
Expand Down
14 changes: 14 additions & 0 deletions js/packages/teams-ai/src/planners/LLMClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ export interface LLMClientOptions<TContent = any> {
* Optional handler to run when a stream is about to conclude.
*/
endStreamHandler?: PromptCompletionModelResponseReceivedEvent;

/**
* If true, the feedback loop will be enabled for streaming responses.
*/
enableFeedbackLoop?: boolean;
}

/**
Expand Down Expand Up @@ -200,6 +205,7 @@ export interface ConfiguredLLMClientOptions<TContent = any> {
export class LLMClient<TContent = any> {
private readonly _startStreamingMessage: string | undefined;
private readonly _endStreamHandler: PromptCompletionModelResponseReceivedEvent | undefined;
private readonly _enableFeedbackLoop: boolean | undefined;

/**
* Configured options for this LLMClient instance.
Expand Down Expand Up @@ -234,6 +240,7 @@ export class LLMClient<TContent = any> {

this._startStreamingMessage = options.startStreamingMessage;
this._endStreamHandler = options.endStreamHandler;
this._enableFeedbackLoop = options.enableFeedbackLoop;
}

/**
Expand Down Expand Up @@ -299,6 +306,13 @@ export class LLMClient<TContent = any> {
// Create streamer and send initial message
streamer = new StreamingResponse(context);
memory.setValue('temp.streamer', streamer);

if (this._enableFeedbackLoop != null) {
streamer.setFeedbackLoop(this._enableFeedbackLoop);
}

streamer.setGeneratedByAILabel(true);

if (this._startStreamingMessage) {
streamer.queueInformativeUpdate(this._startStreamingMessage);
}
Expand Down
11 changes: 8 additions & 3 deletions js/samples/04.ai-apps/i.teamsChefBot-streaming/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const model = new OpenAIModel({

// Azure OpenAI Support
// azureApiKey: process.env.AZURE_OPENAI_KEY!,
// azureDefaultDeployment: 'gpt-3.5-turbo',
// azureDefaultDeployment: 'gpt-4o',
// azureEndpoint: process.env.AZURE_OPENAI_ENDPOINT!,
// azureApiVersion: '2023-03-15-preview',

Expand Down Expand Up @@ -141,8 +141,9 @@ const storage = new MemoryStorage();
const app = new Application<ApplicationTurnState>({
storage,
ai: {
planner
}
planner,
enable_feedback_loop: true,
},
});

// Register your data source with planner
Expand Down Expand Up @@ -173,6 +174,10 @@ app.ai.action(AI.FlaggedOutputActionName, async (context: TurnContext, state: Ap
return AI.StopCommandName;
});

app.feedbackLoop(async (context, state, feedbackLoopData) => {
console.log("Feedback loop triggered");
});

// Listen for incoming server requests.
server.post('/api/messages', async (req, res) => {
// Route received a request to adapter for processing
Expand Down

0 comments on commit e2ab8f3

Please sign in to comment.