This package contains classes (suffixed InputPreprocessing) that can be used to perform preprocessing of streaming data before ingesting into Kinesis Analytics applications. This package also contains classes (suffixed OutputDelivery) that can be used to send processing results from Kinesis Analytics applications to a Lambda function.
Sample function to perform preprocessing of streaming data before ingesting into Kinesis Analytics Applications
public class Function
{
public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context)
{
context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
context.Logger.LogLine($"StreamArn: {evnt.StreamArn}");
context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");
var response = new KinesisAnalyticsInputPreprocessingResponse
{
Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>()
};
foreach (var record in evnt.Records)
{
context.Logger.LogLine($"\tRecordId: {record.RecordId}");
context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}");
context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}");
context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}");
context.Logger.LogLine($"\tData: {record.DecodeData()}");
// Add your record preprocessig logic here.
var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record
{
RecordId = record.RecordId,
Result = KinesisAnalyticsInputPreprocessingResponse.OK
};
preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
response.Records.Add(preprocessedRecord);
}
return response;
}
}
public class Function
{
public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context)
{
context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");
var response = new KinesisAnalyticsOutputDeliveryResponse
{
Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>()
};
foreach (var record in evnt.Records)
{
context.Logger.LogLine($"\tRecordId: {record.RecordId}");
context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}");
context.Logger.LogLine($"\tData: {record.DecodeData()}");
// Add logic here to send to the record to final destination of your choice.
var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record
{
RecordId = record.RecordId,
Result = KinesisAnalyticsOutputDeliveryResponse.OK
};
response.Records.Add(deliveredRecord);
}
return response;
}
}