Solution 1:

I have a CloudWatch log group (log-group-1), a Kinesis Data Firehose delivery stream, a Lambda function, and an S3 bucket.

log-group-1 sends its logs to Kinesis Firehose via a subscription filter. Firehose invokes the Lambda to transform the logs, the Lambda returns the transformed records to Firehose, and Firehose delivers them to S3.
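
If you still need to wire up the subscription filter, here is a minimal sketch using the AWS SDK for JavaScript (v2). The filter name and role ARN are placeholders (you need a role that allows CloudWatch Logs to call firehose:PutRecord/PutRecordBatch); the delivery stream ARN is the one from the example below.

const AWS = require('aws-sdk');

const cloudWatchLogs = new AWS.CloudWatchLogs({ region: 'eu-central-1' });

// Subscribe log-group-1 to the Firehose delivery stream
cloudWatchLogs.putSubscriptionFilter({
  logGroupName: 'log-group-1',
  filterName: 'to-firehose',      // placeholder name
  filterPattern: '',              // empty pattern = forward every log event
  destinationArn: 'arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09',
  roleArn: 'arn:aws:iam::123456789123:role/cwl-to-firehose' // placeholder role
}).promise()
  .then(() => console.log('subscription filter created'))
  .catch((err) => console.error(err));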

The Lambda receives the following input:

{
  "invocationId": "000ac99...",
  "deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
  "region": "eu-central-1",
  "records": [
    {
      "recordId": "496199814216613477...",
      "approximateArrivalTimestamp": 1625854080200,
      "data": "H4sIAAAAAAAAADWOwQrCM......"
    },
    {
      "recordId": "4961998142166134...",
      "approximateArrivalTimestamp": 1625854100311,
      "data": "H4sIAAAAAAAAADVPy07DMB......"
    }
  ]
}
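
Each data field is a base64-encoded, gzipped CloudWatch Logs subscription payload. To inspect one record locally you can decode it with Node's built-in zlib (a sketch; the data string here is shortened just like in the example above, so paste the full value before running it):

const zlib = require('zlib');

// Paste the complete "data" value from one record here (truncated in this example)
const data = 'H4sIAAAAAAAAADWOwQrCM......';

// Base64-decode, then gunzip to get the CloudWatch Logs subscription message
const payload = JSON.parse(zlib.gunzipSync(Buffer.from(data, 'base64')).toString('utf8'));

// payload contains fields such as logGroup, logStream, messageType and logEvents
console.log(JSON.stringify(payload, null, 2));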

To return the transformed messages you must replace the records list. Each output record needs exactly these three fields:

"records": [
  {
    "recordId": "you better take it from the input",
    "result": "can be Ok, Dropped, ProcessingFailed",
    "data": "must be an encoded base-64 string"
  }
]
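
For example, a small helper (hypothetical, not part of the code below) that builds one such entry could look like this:

// Builds one output record for the Firehose transformation response.
// "transformed" is any JSON-serializable object; the trailing newline keeps
// the objects on separate lines in the resulting S3 object.
// result could also be 'Dropped' or 'ProcessingFailed' instead of 'Ok'.
function buildOutputRecord(recordId, transformed) {
  return {
    recordId: recordId,
    result: 'Ok',
    data: Buffer.from(JSON.stringify(transformed) + '\n').toString('base64')
  };
}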

The code below is written in JavaScript; you can copy-paste it into the Lambda. Note that node-gzip is a third-party npm package, so it has to be bundled with the deployment package (or you can switch to Node's built-in zlib).

const { ungzip } = require('node-gzip');

async function handler(event) {
  console.log('event: ' + JSON.stringify(event, undefined, 3));
  const result = [];

  // Iterate through the records list
  for (const record of event.records) {
    const recordId = record.recordId;

    // The record data is base64-encoded and gzipped; turn it into a human-readable string
    const decodedData = Buffer.from(record.data, 'base64');
    const ungzipped = await ungzip(decodedData);
    console.log('ungzipped: ' + ungzipped);

    // Parse the record data to JSON (a CloudWatch Logs subscription message)
    const dataJson = JSON.parse(ungzipped.toString());

    // Transform every log event contained in this record
    const transformedLines = dataJson.logEvents.map((logEvent) => {
      // Get the message which was saved in CloudWatch
      const messageString = logEvent.message;

      // Create the transformed result
      const transformedResultJson = {
        someRandomNumber: Math.random(), // Some random variable I decided to put in the result
        message: messageString + '-my-custom-change' // Edit the message
      };

      // The trailing newline is optional; it just makes the S3 object easier to read
      return JSON.stringify(transformedResultJson) + '\n';
    });

    // Firehose expects exactly one output record per input record, so all
    // transformed log events are joined into a single base64-encoded payload
    const messageBase64 = Buffer.from(transformedLines.join('')).toString('base64');
    console.log('messageBase64: ' + messageBase64);

    // Save the transformed record
    result.push({
      recordId: recordId,
      result: 'Ok',
      data: messageBase64
    });
  }

  // Replace the initial records list with the transformed list
  event.records = result;
  console.log('new event: ' + JSON.stringify(event, undefined, 2));

  // The returned value goes back to Kinesis Firehose and then to S3
  return event;
}
exports.handler = handler;
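
To try the handler without deploying it, you can append a small test harness to the bottom of the same file (a sketch; the log group, stream name and log message are made up for the test):

const { gzip } = require('node-gzip');

async function test() {
  // A minimal CloudWatch Logs subscription payload with one log event
  const cloudWatchPayload = {
    messageType: 'DATA_MESSAGE',
    logGroup: 'log-group-1',   // made-up values for the test
    logStream: 'test-stream',
    logEvents: [{ id: '1', timestamp: Date.now(), message: 'hello from cloudwatch' }]
  };

  // Gzip and base64-encode it, the same way Firehose delivers it to the Lambda
  const gzipped = await gzip(JSON.stringify(cloudWatchPayload));
  const event = {
    invocationId: 'test-invocation',
    deliveryStreamArn: 'arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09',
    region: 'eu-central-1',
    records: [{ recordId: '1', approximateArrivalTimestamp: Date.now(), data: gzipped.toString('base64') }]
  };

  const response = await handler(event);
  console.log(JSON.stringify(response, null, 2));
}

test();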

The Lambda's return value is:

{
  "invocationId": "000ac99...",
  "deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
  "region": "eu-central-1",
  "records": [
    {
      "recordId": "496199814216613477...",
      "result": "Ok",
      "data": "eyJzb21lUmF..."
    },
    {
      "recordId": "4961998142166134...",
      "result": "Ok",
      "data": "eyJzb21lUmFuZG9..."
    }
  ]
}

You can also start from the Lambda blueprint kinesis-firehose-syslog-to-json.

Also see:

  • https://docs.amazonaws.cn/en_us/firehose/latest/dev/data-transformation.html
  • Kinesis Firehose putting JSON objects in S3 without seperator comma