# chore(pipes): update integration test, prune readme #31670
packages/@aws-cdk/aws-pipes-alpha/README.md (23 additions, 65 deletions)


EventBridge Pipes let you create source to target connections between several
AWS services. While transporting messages from a source to a target, the messages
can be filtered, transformed and enriched.

![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png)

For more details see the [service documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).

## Pipe

Amazon EventBridge Pipes is a fully managed service that enables point-to-point integrations between
event producers and consumers. Pipes can be used to connect several AWS services
to each other, or to connect AWS services to external services.

A pipe has a source and a target. The source events can be filtered and enriched
before reaching the target.

## Example - pipe usage

> The following code examples use an example implementation of a [source](#source) and [target](#target).

To define a pipe you need to create a new `Pipe` construct. The `Pipe` construct needs a source and a target.

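A minimal sketch of wiring a pipe, assuming the `SqsSource` and `SqsTarget` implementations from the separate `@aws-cdk/aws-pipes-sources-alpha` and `@aws-cdk/aws-pipes-targets-alpha` packages:

```ts
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as sources from '@aws-cdk/aws-pipes-sources-alpha';
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';

declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// A pipe needs at least a source and a target.
const pipe = new pipes.Pipe(this, 'Pipe', {
  source: new sources.SqsSource(sourceQueue),
  target: new targets.SqsTarget(targetQueue),
});
```
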
## Source

The following sources are possible:

- [Self managed Apache Kafka stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kafka.html)
- [Amazon SQS queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html)

### Example source

```ts
import * as sources from '@aws-cdk/aws-pipes-sources-alpha';

declare const sourceQueue: sqs.Queue;
const pipeSource = new sources.SqsSource(sourceQueue);
```

A source implementation needs to provide the `sourceArn` and `sourceParameters`, and grant the pipe role read access to the source.
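
For illustration, a custom source could be sketched roughly like this; the `CustomSqsSource` name is made up here, and maintained implementations live in `@aws-cdk/aws-pipes-sources-alpha`:

```ts
import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';

class CustomSqsSource implements pipes.ISource {
  sourceArn: string;
  sourceParameters = undefined;

  constructor(private readonly queue: sqs.Queue) {
    this.sourceArn = queue.queueArn;
  }

  // Provide the source-specific parameters to the pipe.
  bind(_pipe: pipes.IPipe): pipes.SourceConfig {
    return {
      sourceParameters: this.sourceParameters,
    };
  }

  // The pipe role needs permission to read from the source.
  grantRead(pipeRole: cdk.aws_iam.IRole): void {
    this.queue.grantConsumeMessages(pipeRole);
  }
}
```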

## Filter

A filter can be used to filter the events from the source before they are
forwarded to the enrichment or, if no enrichment is present, the target step. Multiple filter expressions are possible.
If one of the filter expressions matches, the event is forwarded to the enrichment or target step.

### Example - filter usage

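A sketch of a filter, assuming the `Filter` class, the `FilterPattern.fromObject` helper, and a `filter` property on the `Pipe` construct, together with the example source and target packages:

```ts
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as sources from '@aws-cdk/aws-pipes-sources-alpha';
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';

declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// Only events whose body contains customerType 'B2B' or 'B2C' pass the filter.
const sourceFilter = new pipes.Filter([
  pipes.FilterPattern.fromObject({
    body: {
      customerType: ['B2B', 'B2C'],
    },
  }),
]);

const pipe = new pipes.Pipe(this, 'FilterPipe', {
  source: new sources.SqsSource(sourceQueue),
  target: new targets.SqsTarget(targetQueue),
  filter: sourceFilter,
});
```
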
This example shows a filter that only forwards events with the `customerType` B2B or B2C.

You can define multiple filter patterns, which are combined with a logical `OR`.

Additional filter patterns and details can be found in the EventBridge Pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html).

## Input transformation

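As a rough sketch, assuming the `InputTransformation.fromObject` and `DynamicInput.fromEventPath` helpers of this package and an `SqsTarget` that accepts an `inputTransformation` parameter, a transformation can combine static values with fields of the incoming event:

```ts
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as sources from '@aws-cdk/aws-pipes-sources-alpha';
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';

declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// Static template values are combined with dynamic values taken from the event.
const targetInputTransformation = pipes.InputTransformation.fromObject({
  dataType: 'custom',
  body: pipes.DynamicInput.fromEventPath('$.body'),
});

const pipe = new pipes.Pipe(this, 'TransformPipe', {
  source: new sources.SqsSource(sourceQueue),
  target: new targets.SqsTarget(targetQueue, {
    inputTransformation: targetInputTransformation,
  }),
});
```
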
When a batch of input events is processed by the pipe, each event is converted into the transformed payload before it is passed to the enrichment or target step. If the transformation is applied to a target, the payload might additionally be converted to a string representation, for example as the body of the resulting SQS message.

## Target

The target event can be transformed before it is forwarded to the target using
the same input transformation as in the enrichment step.

### Example target

```ts
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';

declare const targetQueue: sqs.Queue;

const pipeTarget = new targets.SqsTarget(targetQueue);
```

A target implementation needs to provide the `targetArn` and `targetParameters`, and grant the pipe role push access to the target.
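
For illustration, a custom target could be sketched along these lines; the `CustomSqsTarget` name is made up, and maintained implementations live in `@aws-cdk/aws-pipes-targets-alpha`:

```ts
import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';

class CustomSqsTarget implements pipes.ITarget {
  targetArn: string;
  private readonly inputTransformation?: pipes.InputTransformation;

  constructor(
    private readonly queue: sqs.Queue,
    props: { inputTransformation?: pipes.InputTransformation } = {},
  ) {
    this.targetArn = queue.queueArn;
    this.inputTransformation = props.inputTransformation;
  }

  // Provide the target-specific parameters, including an optional input template.
  bind(pipe: pipes.Pipe): pipes.TargetConfig {
    return {
      targetParameters: {
        inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate,
      },
    };
  }

  // The pipe role needs permission to push messages to the target.
  grantPush(pipeRole: cdk.aws_iam.IRole): void {
    this.queue.grantSendMessages(pipeRole);
  }
}
```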

## Log destination

A pipe can produce log events that are forwarded to different log destinations.
The log level and the data included in the log events are configured on the pipe itself, whereas the actual destination is defined independently.

### Example log destination implementation

> Currently no implementation exists for any of the supported log configurations. The following example shows what an implementation can look like. The actual implementation is not part of this package and will be in a separate one.

```ts fixture=pipes-imports
class CloudwatchDestination implements pipes.ILogDestination {
  // ...
}

const pipe = new pipes.Pipe(this, 'Pipe', {
  // ...
});
```

This example uses a CloudWatch Logs log group to store the logs emitted during a pipe execution.
The log level is set to `TRACE`, so all steps of the pipe are logged.
Additionally, all execution data is logged.
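
A sketch of how this logging configuration might be expressed on the pipe; the property names `logLevel`, `logIncludeExecutionData` and `logDestinations` are assumptions, not verified API:

```ts
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as sources from '@aws-cdk/aws-pipes-sources-alpha';
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';

declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;
declare const cloudwatchDestination: pipes.ILogDestination;

// Assumed property names for the logging configuration of a pipe.
const pipe = new pipes.Pipe(this, 'LoggedPipe', {
  source: new sources.SqsSource(sourceQueue),
  target: new targets.SqsTarget(targetQueue),
  logLevel: pipes.LogLevel.TRACE,
  logIncludeExecutionData: [pipes.IncludeExecutionData.ALL],
  logDestinations: [cloudwatchDestination],
});
```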