32
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

S3 LoggingのログをLambda event nortificationで取得・パースしてBigQueryに突っ込む

Last updated at Posted at 2015-11-26

概要

Static Website Hostingを使っている場合にはアクセスログ的な役割を果たす、S3へのリクエストのログを指定したバケットへ吐き出すLogging機能で、対象のバケットに更新通知(Event nortification)を仕込み、Lambdaを起動して取得・パースし、分析できるようにするためにBigQueryに突っ込みます。

事前準備

Lambda実行環境

  • ランタイムはPython2.7
  • 必要なモジュール (カッコ内は実際に使用したバージョン)
    • boto3 (Lambdaデフォルト)
    • pytz (2015.7)
    • gcloud (0.8.0)

S3 LoggingのログフォーマットとBigQueryのスキーマ

ログフォーマット

こちらに書いてます。
https://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html

ログをパースするための正規表現

各ラベルがそのまま次のスキーマにおけるカラム名に対応する感じ。

^(?P<owner>[^ ]+) (?P<bucket>[^ ]+) \[(?P<datetime>.+)\] (?P<remote_ip>[^ ]+) (?P<requester>[^ ]+) (?P<request_id>[^ ]+) (?P<operation>[^ ]+) (?P<key>[^ ]+) "(?P<method>[^ ]+) (?P<uri>[^ ]+) (?P<proto>.+)" (?P<status>[^ ]+) (?P<error>[^ ]+) (?P<bytes>[^ ]+) (?P<size>[^ ]+) (?P<total_time>[^ ]+) (?P<ta_time>[^ ]+) "(?P<referrer>.+)" "(?P<user_agent>.+)" (?P<version>.+)$

スキーマ

フォーマットに合わせてこんな感じ。
datetime カラムはTIMESTAMP型の方が良さそうですが、詳しくは後述しますが事情があり今回はSTRINGにしました。

カラム名
owner STRING
bucket STRING
datetime STRING
remote_ip STRING
requester STRING
request_id STRING
operation STRING
key STRING
method STRING
uri STRING
proto STRING
status STRING
error STRING
bytes INTEGER
size INTEGER
total_time INTEGER
ta_time INTEGER
referrer STRING
user_agent STRING
version STRING

ソース

<your-*>な部分は適宜置き換えてください。

import os
import json
import urllib
import boto3
import re
import datetime
import pytz
from gcloud import bigquery

BQ_PROJECT = '<your-project-id>'
BQ_DATASET = '<your-dataset-name>'
BQ_TABLE = '<your-table-name>'

s3 = boto3.client('s3')
bq = bigquery.Client.from_service_account_json(
    os.path.join(os.path.dirname(__file__), 'bq.json'),
    project=BQ_PROJECT)
dataset = bq.dataset(BQ_DATASET)
table = dataset.table(name=BQ_TABLE)
table.reload()

pattern = ' '.join([
    '^(?P<owner>[^ ]+)',
    '(?P<bucket>[^ ]+)',
    '\[(?P<datetime>.+)\]',
    '(?P<remote_ip>[^ ]+)',
    '(?P<requester>[^ ]+)',
    '(?P<request_id>[^ ]+)',
    '(?P<operation>[^ ]+)',
    '(?P<key>[^ ]+)',
    '"(?P<method>[^ ]+) (?P<uri>[^ ]+) (?P<proto>.+)"',
    '(?P<status>[^ ]+)',
    '(?P<error>[^ ]+)',
    '(?P<bytes>[^ ]+)',
    '(?P<size>[^ ]+)',
    '(?P<total_time>[^ ]+)',
    '(?P<ta_time>[^ ]+)',
    '"(?P<referrer>.+)"',
    '"(?P<user_agent>.+)"',
    '(?P<version>.+)$'])
log_pattern = re.compile(pattern)

def to_int(val):
    try:
        ret = int(val)
    except ValueError:
        ret = None
    return ret

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    res = s3.get_object(Bucket=bucket, Key=key)
    body = res['Body'].read()
    rows = []

    for line in body.splitlines():
        matches = log_pattern.match(line)
        dt_str = matches.group('datetime').split(' ')[0]
        timestamp = datetime.datetime.strptime(
            dt_str, '%d/%b/%Y:%H:%M:%S').replace(tzinfo=pytz.utc)

        rows.append((
            matches.group('owner'),
            matches.group('bucket'),
            timestamp.strftime('%Y-%m-%d %H:%M:%S'),
            matches.group('remote_ip'),
            matches.group('requester'),
            matches.group('request_id'),
            matches.group('operation'),
            matches.group('key'),
            matches.group('method'),
            matches.group('uri'),
            matches.group('proto'),
            matches.group('status'),
            matches.group('error'),
            to_int(matches.group('bytes')),
            to_int(matches.group('size')),
            to_int(matches.group('total_time')),
            to_int(matches.group('ta_time')),
            matches.group('referrer'),
            matches.group('user_agent'),
            matches.group('version'),))
    print(table.insert_data(rows))

注意事項など

  • GCPのAPI ManagerからJSON形式の認証情報を取得してbq.jsonという名前でLambda functionのデプロイパッケージに含める 1
  • 本体ならdatetimeの部分は%d/%b/%Y:%H:%M:%S %zでパースできるはずだが、Python < 3.2のバグで%zがエラーで使えないのでpytzを絡めた変則的な方法になっている 2
  • 本来はTIMESTAMP型カラムにdatetimeオブジェクトを指定するのが正だが、gcloud (0.8.0)のバグ?なのかBigQueryが秒単位なのに対してミリ秒で保存しようとしてありえない年月日になってしまうので注意
  • ログの各項目は出力すべきデータがない場合は-になる。intに変換したい項目などはそのままint('-')と変換すると例外になるので注意
  • Google Cloud Client for Python(gcloud)の公式ドキュメントは所々古いのでソース見ながらの実装が必要だった(2015.11.26現在)

ちなみに関係ないですが、Lambda for Pythonのデプロイは拙作ですが下記を使っています。
まだまだ開発中ですが普通に使ってて便利な感じにはなってきてるので良かったらお試しください。
https://github.com/marcy-terui/lamvery

  1. Lambdaにおける機密情報の気の利いた受け渡し方求む

  2. 早くPython3対応して欲しいところ

32
30
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
32
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?