この記事は hacomono advent calendar 2024 の15日目の記事です
はじめに
こんにちは。SRE部のiwachan(@Diwamoto_)です。
hacomonoに入ってもうすぐ1年が経とうとしています。
記事を書くにあたって、去年と今年でどのくらい活動が変わったかをGithubのContributionの表で調べてみました。
2023年は前職でBitbucketへのコミットだったので記録はありませんが、
2024年はhacomonoに来て最初の年で、ほとんどの時間コードを書いて、書いていない時間もコードを書くためにいっぱい時間を使えました。
さて今回の記事ですが、前回書いた以下の記事の実装をする際に具体的にどういう方法で実現したのか、困った点はどこか等を述べていきたいと思います。
TL;DR
- 開発者がSLOを簡単に設定・管理できる仕組みを作ったよ
- AthenaのSQLと結果取得方法を工夫したよ
- SLOは怖くない!
想定読者
- 前回の記事を見て、結局どうやればDatadogでSLO監視ができるねんとなった方
- terraform / Datadog / AWSの環境下でSLOを管理し、運用したい方
時系列に合わせて述べようと思います。
slo_config.ymlを作った
前回の記事でSLO DocsにSLOの具体的な値をドキュメント化しましたが、その値をプログラムで利用する上で、どこで渡すか?を考える必要がありました。
色々方法はありますが、今回はコード管理することでこれまでの履歴を管理 + エンジニア自身が変更する際に変更しやすくしました。
具体的には下記のようなカラムを持っています。
# どのplatformに送るか send_log_platforms: - 'datadog' # - 'cloudwatch' # sloダッシュボードの名前 slo_dashboard_name: 'pos_slo_dashboard' # latencyのErrorCountを計測するためのカスタムメトリクスの名前 latency_error_count_metric_name: 'pos.elb.latency_error_count' # availabilityのErrorCountを計測するためのカスタムメトリクスの名前 availability_error_count_metric_name: 'pos.elb.availability_error_count' # 全体イベントを記録するためのカスタムメトリクスの名前 total_count_metric_name: 'pos.elb.total_count' # パスに変数が含まれる場合、その変数を置換してグルーピングするためのルール target_path_grouping_rules: - path_pattern: '^/api/users/[^/]+/test$' group_by: 'user_id' path_group: '/api/users/$user_id/test' # 可用性のSLOでエラーとみなすステータスコード availability_error_status_codes: - 403 # Forbidden - 404 # Not Found - 413 # Request Entity Too Large - 429 # Too Many Requests - 460 # ALB Client Closed Request - 500 # Internal Server Error - 502 # Bad Gateway - 503 # Service Unavailable - 504 # Gateway Timeout # SLOの設定 各APIに定めたSLOをここで定義し、プログラムに伝える slo_settings: - path_group: /api/users/$user_id/test description: http_method: GET latency_threshold: 1.0 latency_slo: 99.0 availability_slo: 99.9
エンジニアはこのファイルに対してPRを出すだけでDatadogのSLO周りのリソースが一括でterraformで書き換わるようにしました。
ℹ️ 400と401はバリデーションによって発生することが多いのでエラーコードとしては認識していません。
具体的な手法としては、terraformはyamldecodeでこのファイルを読み込めるため、下記のようにlocalsにconfigファイルを読み込むようにしました。
locals { slo_config = yamldecode(file("${path.module}/slo_config.yml")) } resource "datadog_service_level_objective" "availability" { for_each = { for key, value in local.slo_config["slo_settings"] : value["path_group"] => value } . . .
ALBのログのパーサを作った
今回はAthenaで検索をかけるように作りましたが、構想段階ではS3にあるALBのログファイルをS3のイベント通知を利用してLambdaで処理し、Datadogに送信する想定でした。
その際、Go言語で書かれたログ収集用のLambdaではいい感じのLogのパーサがなかったため、Goのライブラリを自作しました。
EventBridgeがS3のPutObjectを検知し、データからバケットとオブジェクトキーを抽出して、gz形式で圧縮されてるコードを取得し、構造体に落とし込むことができました。
package main import ( "fmt" "compress/gzip" "os" "bufio" albparser "github.com/Diwamoto/alb-log-parser" ) func main() { // Create a new parser instance parser := albparser.NewAlbLogParser() // Open gzipped ALB log file // ALB logs are often in gz format when fetched from S3. f, err := os.Open("alb-logs.gz") if err != nil { fmt.Printf("Error opening file: %v\n", err) return } defer f.Close() // Create gzip reader gz, err := gzip.NewReader(f) if err != nil { fmt.Printf("Error creating gzip reader: %v\n", err) return } defer gz.Close() // Read line by line scanner := bufio.NewScanner(gz) for scanner.Scan() { record, err := parser.ParseAlbLog(scanner.Text()) if err != nil { fmt.Printf("Error parsing log: %v\n", err) continue } // Access the parsed fields fmt.Printf("Request Method: %s\n", record.HttpMethod) fmt.Printf("Status Code: %s\n", record.ElbStatusCode) fmt.Printf("Client IP: %s\n", record.ClientIP) } if err := scanner.Err(); err != nil { fmt.Printf("Error reading file: %v\n", err) } }
Athenaのレスポンスが1000件ずつしか取得できない話
Athenaにはクエリ結果の取得件数に制限があり、1000件ずつしか取得できません。
5分に一回検索をかけるのですが、ログの件数は少ない時でも1000件は超えるため、AthenaのGetQueryResultsだと一回では取れません。
ページングされてるAPIへの対処としてまずはカーソルでの取得が思いつくかと思います。
// slo算出用のクエリを実行し、結果を返す func getLogsFromAthena(ctx context.Context, cfg aws.Config, event Event) ([]AlbLogRecord, error) { client := athena.NewFromConfig(cfg) sloCollectQuery := // slo集計用のクエリを取得してくる // クエリを実行する result, err := client.StartQueryExecution(ctx, &athena.StartQueryExecutionInput{ QueryString: aws.String(sloCollectQuery), QueryExecutionContext: &athena_types.QueryExecutionContext{ Catalog: aws.String(os.Getenv("ATHENA_CATALOG")), Database: aws.String(os.Getenv("ATHENA_DATABASE")), }, ExecutionParameters: []string{ startDate, // Athenaのテーブルのパーティションを指定 startDatetime, // 検索対象の始まり endDatetime // 検索対象の終わり }, WorkGroup: aws.String(os.Getenv("ATHENA_WORKGROUP")), }) if err != nil { return nil, fmt.Errorf("failed to start collect_slo query execution: %v", err) } // athenaのクエリの実行が終わるまでまつ for { status, err := client.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{ QueryExecutionId: result.QueryExecutionId, }) if err != nil { return nil, fmt.Errorf("failed to get query execution status: %v", err) } if status.QueryExecution.Status.State != athena_types.QueryExecutionStateRunning && status.QueryExecution.Status.State != athena_types.QueryExecutionStateQueued { break } } // クエリの結果を取得する ページング含めて resultRows := []AlbLogRecord{} var nextToken *string for { resultData, err := client.GetQueryResults(ctx, &athena.GetQueryResultsInput{ QueryExecutionId: result.QueryExecutionId, NextToken: nextToken, }) if err != nil { return nil, fmt.Errorf("failed to get query results: %v", err) } for _, row := range resultData.ResultSet.Rows { // 取得できた行を使ってゴニョゴニョ } // 取得行数が0になるか、次のページがない場合は終了 if len(resultData.ResultSet.Rows) == 0 || resultData.NextToken == nil { break } nextToken = resultData.NextToken } return resultRows, nil }
上記コードで確かにデータを取得できますが、これだと件数が多くなるにつれてデータ取得がいつまで経っても終わらないという事態になります。
そこで、Athenaのクエリ結果が保存されているS3から直接ファイルを取得することにしました。
CSVファイル形式で保存されているため、これを読み込んでDatadogへ送信できます。
// クエリの結果を取得する ページング含めて の後の部分 s3Client := s3.NewFromConfig(cfg) outputLocation := *status.QueryExecution.ResultConfiguration.OutputLocation bucket := strings.Split(outputLocation, "/")[2] key := strings.Join(strings.Split(outputLocation, "/")[3:], "/") output, err := client.GetObject(ctx, &s3.GetObjectInput{ Bucket: bucket, Key: key, }) if err != nil { return nil, fmt.Errorf("failed to get object from S3: %v", err) } defer output.Body.Close() if output.ContentLength == aws.Int64(0) { return nil, fmt.Errorf("the object is empty") } scanner := bufio.NewScanner(output.Body) var table [][]string for scanner.Scan() { line := scanner.Text() line = strings.ReplaceAll(line, "\"", "") row := strings.Split(line, ",") // さっきやっていたゴニョゴニョ } if err := scanner.Err(); err != nil { return nil, fmt.Errorf("scanner error: %v", err) }
この方法により、Athenaのクエリ実行時間以外のログ取得時間を大幅に削減することができました。
DatadogのSLIの計算式に何を当てはめるのか
SLIの計算は実はかなりシンプルで、「(成功数÷合計数) * 100」で導かれます。
これにより信頼性の指標となるパーセンテージが得られます。
モニタリングの観点からは失敗数をモニタリングした方が都合がいいので、「成功値 = (合計値 - 失敗値)」であることから、
各APIに対して以下の3つのメトリクスを送信することにしました。
- 合計アクセス数:APIへの総リクエスト数
- 可用性違反数:APIのレスポンスコードがslo_config.ymlの中のエラーコードだった数
- レイテンシ違反数:APIのレスポンスタイムがslo_config.ymlの設定値を上回った数
これらのメトリクスをDatadog SLOに設定します。
なぜCloudWatchではなくDatadogを用いたか?
1 元々Datadogも利用していたから
元々マルチテナント環境の監視プラットフォームとしてDatadogを用いており、新たに監視する場所を増やしたくありませんでした。
2 SLO運用に特化した機能が充実
複数ウインドウサイズでのSLO測定やバーンレートの自動計算など、SLO運用に必要な機能が標準搭載されていますが、CloudWatchではこれらの機能を自前で実装する必要があり、実装コスト的にDatadogを用いました。
CloudWatchにもApplication Signalsが追加されSLOに関する機能が追加されつつありますが、言語としてrubyがまだGAされていないため、今回は見送りました。
3 コスト面での優位性
DatadogのProプランでは100カスタムメトリクスまで無料で利用可能です。(参考)
一方、CloudWatchでは1ダッシュボードに月額($3/個)かかることに加え、カスタムメトリクスの料金がDatadogより高い点があり、Datadogを採択しました。
- Datadog → $5 / 100Metric
- CloudWatch → $0.3 / Metric → $30 / 100Metric
4 認証の使いやすさ
- Okta認証による簡単なアクセスが可能
- CloudWatchではAWSアカウントに紐づくため、ダッシュボードを閲覧する際にアカウント切り替えが必要
5 メトリクスの集約、グルーピング
Datadogではタグベースでメトリクスを分類でき、普段は一つのメトリクスをテナントで絞ることができますが、CloudWatchはディメンションとしてメトリクスがそもそも別れて管理することになるので、増えていくテナントに対しては不向きだと考えました。
下記のように、sum by でテナント名を指定することで、一つのメトリクスで複数のメトリクスを管理しているような状態にすることができます。
Datadogのメトリクスで過去との比較を出す方法(小ネタ)
SLOの計測に用いているカスタムメトリクスを用いて、直近の傾向を知るためのダッシュボードを構築しているのですが、メトリクスをそのまま表示しているため、「先週と比べてどうなったんだろう」と言ったシステムの劣化の傾向が掴めないです。
そこで、Datadogのメトリクスのfunctionにあるcalender_shiftを使います。
calender_shiftによって、1週間前(1日前や1年前も出せる)のメトリクスを同時に表示することができます。
これはある日のタイミングの全体アクセス数のグラフですが、先週と比べて傾向にあまり差異がないことがわかります。
もしあるAPIにリリースが入って突然エラーレートが上がったり、レイテンシが悪くなった場合、どこかに障害があってアクセス数が異常値になった時等に、先週との違いがはっきりわかるようになります。
可用性の実際のエラーが起こった際にログをAthenaから検索する方法
今回の実装だと、SLO違反が起こった際にドリルダウンするためのログの検索プラットフォームとしてAthenaを利用する必要があります。
⚠️ Datadog Logsを使ってSLO監視をする場合、この章は使いません。
前回の記事で公開したSLOの具体値決定用SQLを少しカスタマイズし、下記のようなSQLを作成しました。
-- check_availability_error_detail.sql -- 可用性に関するエラー詳細を抽出するSQL -- 出力するカラム: -- service_name (ex. hacomono-xxx) -- time (ex.2024-05-20T02:30:56.888930Z) -- http_method (ex. GET) -- request_uri (ex. /api/auth) -- status_code (ex. 200) -- response_time (ex. 0.123) -- user_agent (ex. Mozilla/5.0 ~~~~~) -- ------------------------------------------------------------------------------- -- parmaeter1 必須: param_date - 日付 : ex. '2024/05/06' : partitionキーなので必ず設定して下さい。 %でワイルドカード指定可能。 -- parmaeter2 必須: param_service_name - サービス名 : ex. 'hacomono-xxx' : サービス名を指定して下さい。 %でワイルドカード指定可能。 -- parmaeter3 必須: param_request_uri - リクエストURI : ex. '/api/auth' : リクエストURIを指定して下さい。 %でワイルドカード指定可能。 WITH params as ( SELECT ? param_date, ? param_service_name, ? param_request_uri ) SELECT { domain_nameをhacomonoの論理テナント名に変更したもの } END as service_name, time, request_verb as http_method, url_extract_path(request_url) as request_uri, elb_status_code as status_code, round(request_processing_time + target_processing_time + response_processing_time, 3) as response_time, user_agent FROM "${table}", params WHERE date LIKE param_date AND request_processing_time >= 0 AND target_processing_time >= 0 AND response_processing_time >= 0 AND request_verb IN ('GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS') AND elb_status_code IN (403,404,413,429,460,500,502,503,504) AND { domain_nameをhacomonoの論理テナント名に変更したもの } LIKE param_service_name AND url_extract_path(request_url) LIKE param_request_uri AND starts_with(url_extract_path(request_url), '/api')
Athenaのパラメータを使うことで、日付 / hacomonoのテナント名 / API名で絞ることができ、検索対象の日にどのテナントでどのAPIがエラーになったかを確認することができます。
SLOのバーンレートが低下した時に、どのテナントでエラーが起こっているのかがこのクエリによって抽出することができるようになりました。
まとめ
今回の記事では、Datadog SLOを使って実際にSLO監視を始める際の実践的な内容を紹介しました。
「SLO サービスレベル目標」の中では、「SLOを求めるシステムは最も信頼性が高い必要がある」と述べられています。
高い信頼性を実現するために、S3のALBのログからの検索にはAthenaを利用し、LambdaやEventBridge等のサーバレスリソースに委譲することで、信頼性を担保しています。
株式会社hacomonoでは一緒に働く仲間を募集しています。
エンジニア採用サイトや採用ウィッシュリストもぜひご覧ください!