Gunosy Tech Blog

Gunosy Tech Blogは株式会社Gunosyのエンジニアが知見を共有する技術ブログです。

goで作るfirehoseのデータ変換lambda

こちらの記事は Gunosy Advent Calendar 2020 の5日目の記事です。 昨日の記事はコウ(@yuanzhi.ke)さんの 新卒入社して半年が経ちました vol.4 〜クーポン改善〜でした!!

おはようございます!こんにちは!こんばんは! 最近は goでlambda ばかり書いている @625 です。 世の中、生きているとlambdaのログをいい感じにaggregateしつつs3に出力しないといけない!みたいな状況、ありますよね。

そんなときには lambda + cloudwatch logs + subscription の firehose が役に立ちます! 全体の説明とかはだいたい以下のawsのドキュメントを見ていただければ良いので、cloudwatch logsのログをgoで書かれたlambdaを使っていい感じの形式に変えてs3に出力したいって部分だけを書きます。 docs.aws.amazon.com

lambdaをgoで書くにあたってどのような入力なのかというところですが、この辺はawsのドキュメント docs.aws.amazon.com を漁るよりも aws-lambda-go のリポジトリを見に行くと幸せになれますね。今回の場合はFirehoseを扱うので下記のREADMEを見ると良さそうです。 github.com

以下で定義されている型を使えば良いことがわかりました。このstructの data の部分にcloudwatchのイベントが入ってきます。 github.com

docs.aws.amazon.comに以下のように書いてあるようにdataの部分にはgzipで圧縮された状態で入ってきます。

You can use a subscription filter with Kinesis, Lambda, or Kinesis Data Firehose. Logs that are sent to a receiving service through a subscription filter are Base64 encoded and compressed with the gzip format.

よしなに以下で解凍しましょう。

func ungzipCloudwatchLogsData(b []byte) (events.CloudwatchLogsData, error) {
    c := events.CloudwatchLogsData{}
    zr, err := gzip.NewReader(bytes.NewBuffer(b))
    if err != nil {
        return c, err
    }
    defer zr.Close()

    dec := json.NewDecoder(zr)
    err = dec.Decode(&c)
    if err != nil {
        return c, err
    }
    return c, nil
}

解凍して変換した後はLogEventsのMessageを好きに加工してresponseに代入するだけです。お手軽ですね!

func convertMessage(c events.CloudwatchLogsData) []byte {
    res := ""
    for _, evt := range c.LogEvents {
        res += evt.Message
    }
    return []byte(res)
}

func handleRequest(ctx context.Context, event events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) {
    response := events.KinesisFirehoseResponse{
        Records: make([]events.KinesisFirehoseResponseRecord, 0, len(event.Records)),
    }

    for _, r := range event.Records {
        d, err := ungzipCloudwatchLogsData(r.Data)
        if err != nil {
            response.Records = append(response.Records, events.KinesisFirehoseResponseRecord{
                RecordID: r.RecordID,
                Result:   events.KinesisFirehoseTransformedStateProcessingFailed,
                Data:     []byte(err.Error()),
            })
            continue
        }

        m := convertMessage(d)
        response.Records = append(response.Records, events.KinesisFirehoseResponseRecord{
            RecordID: r.RecordID,
            Result:   events.KinesisFirehoseTransformedStateOk,
            Data:     m,
        })
    }
    return response, nil
}

func main() {
    lambda.Start(handleRequest)
}

これでcloudwatchに出力したものをfirehoseでaggregateしながら吸い上げてgoで書かれたlambdaを使って変換しs3におくことができました! 世界平和ですね。めでたしめでたし!

次の記事は明日、しゅんけーさんです!おたのしみに!