Kinesis Firehose to ES using a lambda transformation
Solution 1:
I have a CloudWatch log-group-1, kinesis firehose, lambda, S3.
log-group-1 sends logs to kinesis firehose (using subscription filter). Kinesis firehose triggers lambda to process the logs. Lambda returns the logs back to kinesis firehose and kinesis firehose saves transformed logs to S3.
Lambda gets the following input:
{
"invocationId": "000ac99...",
"deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
"region": "eu-central-1",
"records": [
{
"recordId": "496199814216613477...",
"approximateArrivalTimestamp": 1625854080200,
"data": "H4sIAAAAAAAAADWOwQrCM......"
},
{
"recordId": "4961998142166134...",
"approximateArrivalTimestamp": 1625854100311,
"data": "H4sIAAAAAAAAADVPy07DMB......"
}
]
}
To return the transformed messages, the lambda must return a records list containing exactly one entry for each input recordId, in the following shape. See example:
"records": [
{
"recordId": "you better take it from the input",
"result": "can be Ok, Dropped, ProcessingFailed",
"data": "must be an encoded base-64 string"
}
]
Below is the code, written in JavaScript. You can copy-paste it into the lambda; note that it depends on the node-gzip package, which must be bundled with the function.
const node_gzip_1 = require("node-gzip");
/**
 * Kinesis Data Firehose transformation handler for CloudWatch Logs
 * subscription data.
 *
 * Every incoming record holds a base64-encoded, gzip-compressed JSON payload
 * with a `logEvents` array. Firehose expects exactly ONE result entry per
 * incoming `recordId`, so all log events of a record are transformed and
 * concatenated (newline-delimited JSON) into a single output record instead
 * of emitting one output record per log event.
 *
 * Result statuses used:
 *  - `Ok`               the record was transformed; `data` is the new payload.
 *  - `Dropped`          the payload is a CloudWatch `CONTROL_MESSAGE` or has
 *                       no log events, so there is nothing to deliver.
 *  - `ProcessingFailed` the payload could not be decoded/parsed; the original
 *                       `data` is passed back unchanged.
 *
 * @param {object} event - Firehose transformation event.
 * @param {Array<{recordId: string, data: string}>} event.records - Records to transform.
 * @returns {Promise<object>} A copy of `event` whose `records` list is replaced
 *   by the transformation results (same order and recordIds as the input).
 */
async function handler(event) {
    console.log('event: ' + JSON.stringify(event, undefined, 3));
    // A missing or non-array records list is treated as an empty batch.
    const inputRecords = Array.isArray(event.records) ? event.records : [];
    const result = [];
    for (const record of inputRecords) {
        const recordId = record.recordId;
        try {
            // Transform record data to a human readable string
            const decodedData = Buffer.from(record.data, 'base64');
            const ungziped = await node_gzip_1.ungzip(decodedData);
            console.log('ungziped: ' + ungziped);
            // Parse record data to JSON
            const dataJson = JSON.parse(ungziped.toString());
            // Control messages are destination health checks, not log data.
            if (dataJson.messageType === 'CONTROL_MESSAGE') {
                result.push({ recordId: recordId, result: 'Dropped' });
                continue;
            }
            const logEventsList = Array.isArray(dataJson.logEvents) ? dataJson.logEvents : [];
            if (logEventsList.length === 0) {
                result.push({ recordId: recordId, result: 'Dropped' });
                continue;
            }
            // Build one newline-delimited JSON document per log event. The
            // trailing new line keeps the objects separated once Firehose
            // concatenates them in S3.
            const transformedLines = logEventsList.map((logEventValue) => {
                const transformedResultJson = {
                    someRandomNumber: Math.random(), // Some random variable put in the result
                    message: logEventValue.message + '-my-custom-change' // Edit the message
                };
                return JSON.stringify(transformedResultJson) + '\n';
            });
            // Final data must be encoded to base 64
            const messageBase64 = Buffer.from(transformedLines.join('')).toString('base64');
            console.log('messageBase64: ' + messageBase64);
            result.push({
                recordId: recordId,
                result: 'Ok',
                data: messageBase64
            });
        } catch (err) {
            // Fail only this record; the rest of the batch is still processed.
            console.error('failed to transform record ' + recordId + ': ' + err);
            result.push({
                recordId: recordId,
                result: 'ProcessingFailed',
                data: record.data
            });
        }
    }
    // Return a copy with the transformed list rather than mutating the input event.
    const response = Object.assign({}, event, { records: result });
    console.log('new event: ' + JSON.stringify(response, undefined, 2));
    // Returned value will go back to kinesis firehose, then S3
    return response;
}
// Expose `handler` on the CommonJS exports object under the name `handler`.
exports.handler = handler;
Lambda return value is:
{
"invocationId": "000ac99...",
"deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
"region": "eu-central-1",
"records": [
{
"recordId": "496199814216613477...",
"result": "Ok",
"data": "eyJzb21lUmF..."
},
{
"recordId": "4961998142166134...",
"result": "Ok",
"data": "eyJzb21lUmFuZG9..."
}
]
}
You can also use a lambda blueprint kinesis-firehose-syslog-to-json.
Also see:
- https://docs.amazonaws.cn/en_us/firehose/latest/dev/data-transformation.html
- Kinesis Firehose putting JSON objects in S3 without seperator comma