こちらの記事は Gunosy Advent Calendar 2020 の5日目の記事です。 昨日の記事はコウ(@yuanzhi.ke)さんの 新卒入社して半年が経ちました vol.4 〜クーポン改善〜でした!!
おはようございます!こんにちは!こんばんは! 最近は goでlambda ばかり書いている @625 です。 世の中、生きているとlambdaのログをいい感じにaggregateしつつs3に出力しないといけない!みたいな状況、ありますよね。
そんなときには lambda + cloudwatch logs + subscription の firehose が役に立ちます! 全体の説明とかはだいたい以下のawsのドキュメントを見ていただければ良いので、cloudwatch logsのログをgoで書かれたlambdaを使っていい感じの形式に変えてs3に出力したいって部分だけを書きます。 docs.aws.amazon.com
lambdaをgoで書くにあたってどのような入力なのかというところですが、この辺はawsのドキュメント docs.aws.amazon.com を漁るよりも aws-lambda-go のリポジトリを見に行くと幸せになれますね。今回の場合はFirehoseを扱うので下記のREADMEを見ると良さそうです。 github.com
以下で定義されている型を使えば良いことがわかりました。このstructの data
の部分にcloudwatchのイベントが入ってきます。
github.com
docs.aws.amazon.comに以下のように書いてあるようにdataの部分にはgzipで圧縮された状態で入ってきます。
You can use a subscription filter with Kinesis, Lambda, or Kinesis Data Firehose. Logs that are sent to a receiving service through a subscription filter are Base64 encoded and compressed with the gzip format.
よしなに以下で解凍しましょう。
func ungzipCloudwatchLogsData(b []byte) (events.CloudwatchLogsData, error) { c := events.CloudwatchLogsData{} zr, err := gzip.NewReader(bytes.NewBuffer(b)) if err != nil { return c, err } defer zr.Close() dec := json.NewDecoder(zr) err = dec.Decode(&c) if err != nil { return c, err } return c, nil }
解凍して変換した後はLogEventsのMessageを好きに加工してresponseに代入するだけです。お手軽ですね!
func convertMessage(c events.CloudwatchLogsData) []byte { res := "" for _, evt := range c.LogEvents { res += evt.Message } return []byte(res) } func handleRequest(ctx context.Context, event events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) { response := events.KinesisFirehoseResponse{ Records: make([]events.KinesisFirehoseResponseRecord, 0, len(event.Records)), } for _, r := range event.Records { d, err := ungzipCloudwatchLogsData(r.Data) if err != nil { response.Records = append(response.Records, events.KinesisFirehoseResponseRecord{ RecordID: r.RecordID, Result: events.KinesisFirehoseTransformedStateProcessingFailed, Data: []byte(err.Error()), }) continue } m := convertMessage(d) response.Records = append(response.Records, events.KinesisFirehoseResponseRecord{ RecordID: r.RecordID, Result: events.KinesisFirehoseTransformedStateOk, Data: m, }) } return response, nil } func main() { lambda.Start(handleRequest) }
これでcloudwatchに出力したものをfirehoseでaggregateしながら吸い上げてgoで書かれたlambdaを使って変換しs3におくことができました! 世界平和ですね。めでたしめでたし!
次の記事は明日、しゅんけーさんです!おたのしみに!