“Distributed computing (Apache Hadoop, Spark, …) Advent Calendar 2016” の 12/19 担当ということで、Spark 2.0 on EMR で Spark Streaming と Structured Streaming をやってみた結果を書きます。
この記事では Spark 2.0 で、現在アルファ版の Structured Streaming をやってみます。
Structured Streaming とは、Spark SQL エンジンで実現されたストリーム処理の仕組みです。
従来型の Spark Streaming は RDD を DStream とよばれる Spark Streaming 特有のモデルを導入して扱うのに対して、Structured Streaming では Spark SQL で広く使用されている Dataset/DataFrame API により同一のプログラミングモデルで扱うことができ、Spark SQL のバッチジョブと同じ書き方で Streaming 処理を実現できます。
また、”Structured” ということでスキーマをもっているため、例えばデータの到着時刻ではなく出力時刻ベースで集計することができたりします。
Spark 2.0 時点ではアルファリリースということでプロダクションへの適用は推奨されていませんが、将来的には Spark のストリーム処理の標準となっていくようです。
この記事の流れとしては、まずは従来型の Spark Streaming をやってみて、その次に Structured Streaming をやってみます。
Spark 2.0 を準備します。
といっても、EMR 5.2.0 (現在の最新バージョン) で Application に Spark 2.0.2 を選択して起動するだけです。
今回は、マスターノード m4.xlarge、コアノード m4.xlarge x 2 の構成で検証しています。
SSH でマスターノードに接続するとこんな感じになります。
Using username "hadoop". Authenticating with public key "imported-openssh-key" Last login: Sat Dec 17 14:35:45 2016 __| __|_ ) _| ( / Amazon Linux AMI ___|\___|___| https://aws.amazon.com/amazon-linux-ami/2016.09-release-notes/ 19 package(s) needed for security, out of 30 available Run "sudo yum update" to apply all updates. EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R E::::E M::::::M:::M M:::M::::::M R:::R R::::R E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R E::::E M:::::M M:::M M:::::M R:::R R::::R E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR [hadoop@ip-172-31-28-103 ~]$
今回は REPL で検証しますので、spark-shell を起動します。
[hadoop@ip-172-31-28-103 ~]$ spark-shell Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). 16/12/17 14:52:20 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME. 16/12/17 14:52:31 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect. Spark context Web UI available at http://172.31.28.103:4040 Spark context available as 'sc' (master = yarn, app id = application_1481792128181_0003). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.0.2 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111) Type in expressions to have them evaluated. Type :help for more information. scala>
まず、spark-shell に以下のようなソースコードを投入します。
import org.apache.spark._ import org.apache.spark.streaming._ val ssc = new StreamingContext(sc, Seconds(15)) val lines = ssc.socketTextStream("ip-172-31-28-103", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print()
このとき以下に注意。
ここまで準備できたら、別の端末を開いて Netcat でつなぎます。
$ nc -lk 9999
では、spark-shell の端末に戻って ssc.start します。
spark-shell> ssc.start()
Netcat 側の端末で例えば “If you want to test Spark on EMR please launch EMR and choose Spark then you can get Spark environment quickly.” と入力すると、spark-shell 側の端末には以下のように表示されます。
------------------------------------------- Time: 1481988480000 ms ------------------------------------------- (can,1) (quickly.,1) (get,1) (,1) (EMR,2) (please,1) (want,1) (test,1) (Spark,3) (you,2) ...
ちゃんと WordCount できてますね。
次に、本日のお題の Structured Streaming を試します。
spark-shell に以下を流し込みます。
import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate() import spark.implicits._ val lines = spark.readStream.format("socket").option("host", "ip-172-31-28-103").option("port", 9999).load() val words = lines.as[String].flatMap(_.split(" ")) val wordCounts = words.groupBy("value").count() val query = wordCounts.writeStream.outputMode("complete").format("console").start() query.awaitTermination()
Spark Streaming のときと同じように Netcat でつないで文字列を入力すると、以下のようになります。
------------------------------------------- Batch: 2 ------------------------------------------- 16/12/17 15:40:10 WARN TextSocketSource: Stream closed by ip-172-31-28-103:9999 +-----------+-----+ | value|count| +-----------+-----+ | launch| 1| | If| 1| | quickly.| 1| | you| 2| | can| 1| | please| 1| | EMR| 2| | hello| 1| | on| 1| | choose| 1| | want| 1| | and| 1| | world| 1| | get| 1| | Spark| 3| | | 2| | then| 1| |environment| 1| | test| 1| | to| 1| +-----------+-----+
WordCountできました。
今回は従来型の Spark Streaming と Structured Streaming を試しました。
Structured Streaming は従来型の Spark Streaming の DStream を使った方法に比べて Dataset/DataFrame API で記述できる分、ソースも書きやすいし可読性も上がるしいい感じです。
まだアルファリリースとのことで、これから様々な改善が入るかと思います。
Spark JIRA の Structured Streaming の Issue はこちらから確認できますので動向をウォッチしつつ何かあれば報告したりできるとよいかと。
AWSのログで遊ぼうシリーズ第4弾 – Data Pipeline x Redshift。
Redshiftは第1弾~第3弾で説明した通り、COPYコマンドでデータをロードすることができるので、単純にロードだけを考えるならRedshiftを単独で使うだけで十分です。
でも、Data Pipelineと連携して使うと、スケジューリングやエラー処理、通知など高度なデータ連携が可能です。
今回はData Pipelineを使って、ELB、S3、CloudFrontのログをRedshiftにロードします。
Data Pipelineは簡単に説明するとデータ連携のためのサービスです。
GUIで表現されたマップ上でアイコンを配置していき、簡単にデータ連携を定義できます。
GUIで定義したPipelineは内部的にJSONで表現され、1クリックでJSONをExportできます。
ただ、個人的にはGUIではなく生のJSONを直接扱うほうが好きなので、今回はJSONベースで説明していきます。
Data Pipelineは2013年11月にRedshiftに対応し、直近では2014年10月にさらなる機能改善が実施されました。
Data PipelineでRedshiftを扱おうとすると下記の3種類のオブジェクトが必要になります。
RedshiftDataNodeではRedshift Cluster上のテーブルを設定します。
各種ログをロードする対象のRedshift Clusterを定義します。
RedshiftCopyActivityでは、CSV/TSVのファイルをRedshiftにCOPYします。
ちなみに、今回ターゲットにしているログのうち、ELBログとS3ログが空白区切りであるため、そのままでは使用できません。
ただし、2014年10月から新たに導入されたcommandOptionsを使えばRedshiftのCOPY文で使用する各種オプションを利用できるので、delimiterを指定したCOPYが可能になります。
データロード先のRedshift Clusterをあらかじめ作成します。
テーブルはPipeline処理の内部で作成するため、事前に作成する必要はありません。
{ "id": "S3DataNodeElbLog", "name": "S3DataNodeElbLog", "type": "S3DataNode", "directoryPath": "s3://path_to_elb_log/", "schedule": { "ref": "ScheduleId1" } },
{ "id": "S3DataNodeS3Log", "name": "S3DataNodeS3Log", "type": "S3DataNode", "directoryPath": "s3://path_to_s3_log/", "schedule": { "ref": "ScheduleId1" } },
{ "id": "S3DataNodeCloudFrontLog", "name": "S3DataNodeCloudFrontLog", "type": "S3DataNode", "directoryPath": "s3://path_to_cloudfront_log/", "schedule": { "ref": "ScheduleId1" } },
ポイントはdataFormatを指定していない点です。
CloudFrontのログはTSV形式で出力されるため、本来S3DataNodeではビルトインのdataFormatであるTSVのオブジェクトを指定すればよいはずです。
しかし、後述のRedshiftCopyActivityでcommandOptionsを使用したいため、あえてdataFormatの指定を外しています。
各種ログをロードする対象のRedshift Clusterを定義します。
今回は3種類いずれも同一のデータベースにロードするため、共用のRedshiftDatabaseを下記のように定義します。
{ "id": "RedshiftDatabaseId1", "name": "DefaultRedshiftDatabase1", "type": "RedshiftDatabase", "clusterId": "hoge", "databaseName": "dev", "username": "hogehoge", "*password": "fugafuga" },
clusterId, databaseName, username, passwordを指定していることからも、Redshift ClusterがData PipelineのオブジェクトとしてのRedshiftDatabaseに対応することがわかるかと思います。
ELBログ用に、RedshiftDataNodeとRedshiftCopyActivityを設定します。
RedshiftDataNodeではRedshift Cluster上のテーブルを設定し、RedshiftCopyActivityでは実際にロードする際に使用するCOPYコマンドを設定するイメージです。
{ "id": "RedshiftDataNodeElbLog", "name": "RedshiftDataNodeElbLog", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" }, "tableName": "elb_access_logs", "createTableSql": "CREATE TABLE elb_access_logs ( request_timestamp DateTime encode lzo, elb varchar(30) encode lzo, client_port varchar(22) encode lzo, backend_port varchar(22) encode lzo, request_processing_time FLOAT encode bytedict, backend_processing_time FLOAT encode bytedict, response_prosessing_time FLOAT encode bytedict, elb_status_code varchar(3) encode lzo, backend_status_code varchar(3) encode lzo, received_bytes BIGINT encode lzo, sent_bytes BIGINT encode lzo, request varchar(MAX) encode lzo)sortkey(request_timestamp) ;", "schedule": { "ref": "ScheduleId1" } },
{ "id": "RedshiftCopyActivityElbLog", "name": "RedshiftCopyActivityElbLog", "type": "RedshiftCopyActivity", "input": { "ref": "S3DataNodeElbLog" }, "commandOptions": [ "DELIMITER ' '", "TRUNCATECOLUMNS", "TRIMBLANKS", "REMOVEQUOTES", "TIMEFORMAT as 'auto'", "ACCEPTINVCHARS", "MAXERROR as 100000" ], "insertMode": "TRUNCATE", "runsOn": { "ref": "Ec2ResourceId1" }, "schedule": { "ref": "ScheduleId1" }, "output": { "ref": "RedshiftDataNodeElbLog" } },
ポイントはcommandOptionsでdelimiterを明示的に指定している点です。
S3ログ用に、RedshiftDataNodeとRedshiftCopyActivityを設定します。
大体、ELBログの定義と同じようなイメージになります。
{ "id": "RedshiftDataNodeS3Log", "name": "RedshiftDataNodeS3Log", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" }, "tableName": "s3_access_logs", "createTableSql": "CREATE TABLE s3_access_logs ( bucket_owner VARCHAR(MAX) ENCODE RUNLENGTH, bucket VARCHAR(255) ENCODE LZO, request_timestamp VARCHAR(MAX) SORTKEY ENCODE LZO, request_timestamp_delta VARCHAR(MAX) ENCODE LZO, remote_ip VARCHAR(50) ENCODE LZO, requestor VARCHAR(MAX) ENCODE LZO, request_id VARCHAR(MAX) ENCODE LZO, operation VARCHAR(MAX) ENCODE LZO, key VARCHAR(MAX) ENCODE LZO, request_uri VARCHAR(MAX) DISTKEY ENCODE LZO, http_status_code VARCHAR(MAX) ENCODE LZO, error_code VARCHAR(MAX) ENCODE LZO, sent_bytes VARCHAR(MAX) ENCODE LZO, object_size VARCHAR(MAX) ENCODE LZO, total_time VARCHAR(MAX) ENCODE LZO, turn_around_time VARCHAR(MAX) ENCODE LZO, referer VARCHAR(MAX) ENCODE LZO, user_agent VARCHAR(MAX) ENCODE LZO, version_id VARCHAR(10) ENCODE LZO);", "schedule": { "ref": "ScheduleId1" } },
{ "id": "RedshiftCopyActivityS3Log", "name": "RedshiftCopyActivityS3Log", "type": "RedshiftCopyActivity", "input": { "ref": "S3DataNodeS3Log" }, "commandOptions": [ "DELIMITER ' '", "TRUNCATECOLUMNS", "TRIMBLANKS", "REMOVEQUOTES", "ACCEPTINVCHARS", "MAXERROR as 100000" ], "insertMode": "TRUNCATE", "runsOn": { "ref": "Ec2ResourceId1" }, "schedule": { "ref": "ScheduleId1" }, "output": { "ref": "RedshiftDataNodeS3Log" } },
ポイントはcommandOptionsでdelimiterを明示的に指定している点です。
CloudFrontログ用に、RedshiftDataNodeとRedshiftCopyActivityを設定します。
そろそろ慣れてきますね。
{ "id": "RedshiftDataNodeCloudFrontLog", "name": "RedshiftDataNodeCloudFrontLog", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" }, "tableName": "cloudfront_access_logs", "createTableSql": "CREATE TABLE cloudfront_access_logs ( request_date VARCHAR(MAX) SORTKEY, request_time VARCHAR(MAX) ENCODE LZO, x_edge_location VARCHAR(40) ENCODE LZO, sc_bytes INT ENCODE LZO, remote_ip VARCHAR(50) ENCODE LZO, cs_method VARCHAR(50) ENCODE LZO, cs_host VARCHAR(MAX) ENCODE LZO, cs_uri_stem VARCHAR(MAX) DISTKEY ENCODE LZO, sc_status VARCHAR(20) ENCODE LZO, cs_referrer VARCHAR(MAX) ENCODE LZO, cs_useragent VARCHAR(MAX) ENCODE LZO, cs_uri_query VARCHAR(MAX) ENCODE LZO, cs_cookie VARCHAR(MAX) ENCODE LZO, x_edge_result_type VARCHAR(MAX) ENCODE LZO, x_edge_request_id VARCHAR(MAX) ENCODE LZO, x_host_header VARCHAR(MAX) ENCODE LZO, cs_protocol VARCHAR(10) ENCODE LZO, cs_bytes INT ENCODE LZO, time_taken VARCHAR(MAX) ENCODE LZO);", "schedule": { "ref": "ScheduleId1" } },
{ "id": "RedshiftCopyActivityCloudFrontLog", "name": "RedshiftCopyActivityCloudFrontLog", "type": "RedshiftCopyActivity", "input": { "ref": "S3DataNodeCloudFrontLog" }, "commandOptions": [ "DELIMITER '\t'", "IGNOREHEADER 2", "TRUNCATECOLUMNS", "TRIMBLANKS", "ACCEPTINVCHARS", "MAXERROR as 100000", "gzip" ], "insertMode": "TRUNCATE", "runsOn": { "ref": "Ec2ResourceId1" }, "schedule": { "ref": "ScheduleId1" }, "output": { "ref": "RedshiftDataNodeCloudFrontLog" } },
ポイントはELBやS3の場合と同じように、commandOptionsを指定している点です。
CloudFrontのログはTSV形式で出力されるため、inputに指定するS3DataNodeにビルトインのdataFormatであるTSVのオブジェクトが指定されていれば、commandOptionsは不要です。
逆に、inputのS3DataNodeに何らかのdataFormatが指定されていると、commandOptionsが無効となります。
今回は、CloudFrontのログをパースするためにcommandOptions(特にIGNOREHEADERとgzip)を使用したいため、あえてこのようにしています。
{ "id": "Default", "scheduleType": "TIMESERIES", "failureAndRerunMode": "CASCADE", "name": "Default", "pipelineLogUri": "s3://path_to_log", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" },
Pipeline定義には必須です。
{ "id": "ScheduleId1", "name": "DefaultSchedule1", "startAt": "FIRST_ACTIVATION_DATE_TIME", "type": "Schedule", "period": "15 Minutes" },
Pipeline定義には必須です。
{ "id": "Ec2ResourceId1", "name": "DefaultEc2Resource1", "type": "Ec2Resource", "terminateAfter": "1 HOURS", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "default", "logUri": "s3://path_to_log", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" },
RedshiftCopyActivityを使う上で、Activityを実行するResourceとして必要になります。
ハマりがちなポイントとして、Ec2ResourceのsecurityGroupsに指定するのはSecurity Group IDではなくSecurity Group Nameなのでご注意を。
以上、長々と説明してきましたが、最終的に完成したPipeline定義をGistに用意しました。
$ aws datapipeline create-pipeline --name "elb2redshift" --unique-id "elb2redshift" { "pipelineId": "df-00901072TQD44FKNKAR3" } $ aws datapipeline put-pipeline-definition --pipeline-id df-00901072TQD44FKNKAR3 --pipeline-definition file://./elblog2redshift.json $ aws datapipeline activate-pipeline --pipeline-id df-00901072TQD44FKNKAR3
あとは実際にPipelineが実行されるのを15分ほど待ちます。
Data PipelineコンソールでPipelineの実行状況を確認します。
こんな感じで、StatusがFINISHEDになっていればPipelineの実行は正常に完了しています。
あとはインポート先のRedshiftクラスタにアクセスして煮るなり焼くなり好きにクエリすればいいかと。
この記事はPostgreSQL Advent Calendar 2014の12/15のエントリです。
今日15日目は、PostgreSQLといっても特にRDS for PostgreSQLをターゲットに、運用では欠かせない”監視”に焦点を当ててみます。
通常のサーバーで動作するPostgreSQLでは、サーバーのOSレベルの死活・パフォーマンスの監視に加え、RDBMSサービスとしての正常性の監視をするのが一般的かと思われます。
一方、RDS for PostgreSQLではサーバーのOSレベルのアクセスは制限されているため、RDSならではのちょっとしたコツが必要になります。
今回は、RDS for PostgreSQLならではの監視について考えていきます。
RDS for PostgreSQLの監視では、次の2層の監視がポイントです。
それぞれ考えてみましょう。
DBインスタンスレイヤーの監視では、次の4つのポイントがあります。
DBインスタンスの現在の状態は、DescribeDBInstances APIでDBInstanceStatusとして取得できます。
AWS CLIでdescribe-db-instancesコマンド経由でAPIを発行するとこんな感じになります。
$ aws rds describe-db-instances | grep DBInstanceStatus "DBInstanceStatus": "available",
DBInstanceStatusの一覧はこちらのページにあります。
Statusの名前だけ列挙するとこんな感じです。
このように、14種類の状態が定義されています。
手っ取り早く、availableであるかどうかを見るのもよいかもしれません。
次に、DBインスタンス上で発生したフェイルオーバーやバックアップなどのイベントを監視する方法を考えます。
これらは、RDSではEventという項目で記録されていきます。
Amazon RDS のイベントの表示 – Amazon Relational Database Service ユーザーガイド
RDSでは、イベントが発生した際にSNSで通知を発行することができます。
これを監視することでイベントの発生を把握することができます。
本エントリでは詳細は省きますが、詳細はこちら。
Amazon RDS イベント通知の使用 – Amazon Relational Database Service ユーザーガイド
DBインスタンスのパフォーマンスはCloudWatchで監視できます。
DB インスタンスのメトリックスの表示 – Amazon Relational Database Service ユーザーガイド
現在RDSで提供されているCloudWatch Metricsはこちら。
項目だけ抜粋すると下記があります。
RDSはCloudTrailをサポートしているので、RDSに関するAPI callはCloudTrailで記録されます。
そこで、CloudTrailに記録された、DBインスタンスに関連するAPI callを監視することで、意図せぬ操作等を監視することができます。
監視方法としておすすめなのは、CloudWatch Logsですね。
CloudTrailはCloudWatch Logsに連携しているので、こちらを利用することで手間なく監視が可能です。
詳細はこちら。
【AWS発表】CloudTrail と CloudWatch Logs が連携。2つのパートナーソリューションの紹介 – Amazon Web Services ブログ
DBエンジン・データベースレイヤーの監視では、次の3つのポイントがあります。
DBエンジン・データベースの現在の死活状況を確認するのに最も手っ取り早く確実な手段は、実際にそのプロトコルで接続しSQLを実行して結果が問題ないか確認する方法です。
DBインスタンスのパフォーマンスで省略されているような、個別のDBエンジン固有の値等についても、要件やユースケースに応じて監視を検討する必要があるでしょう。
DBエンジン・データベースのログは、トラブルシューティングのために重要です。
RDS for PostgreSQLでは、error/postgresql.log.YYYY-MM-DD-nn が該当します。
これまで挙げてきた項目のうち下記の項目の監視について、CloudWatch カスタムメトリクスを使って監視するためのスクリプト”aws-mon-pgsql“を作ってみました。
スクリプト自体は今年の4月に作ったものを改善する形で今回のエントリ用に準備しました。
詳細はGitHubのREADME.mdに書きましたが、このエントリでも簡単に説明していきます。
まずは、aws-mon-pgsql.shをダウンロードします。
git clone https://github.com/moomindani/aws-mon-pgsql.git cd aws-mon-pgsql
ちなみに、今回はgit cloneしていますが、ただダウンロードするだけでもオッケーです。
次に、PostgreSQLにパスワードなしで接続できるように、.pgpassを用意します。
echo "postgres.xxx.ap-northeast-1.rds.amazonaws.com:5432:postgres:postgres:password" > ~/.pgpass chmod 600 ~/.pgpass
あとはスクリプトを実行するだけです。
DBInstanceStatusがavailableかどうかチェックするには、–statusオプションを指定します。
$ ~/aws-mon-pgsql/aws-mon-pgsql.sh --id postgresql -h postgresql.ca6md0qizbcz.ap-northeast-1.rds.amazonaws.com -p 5432 -U test -d test --status --verbose status:0
“available”の場合、結果が”0″となります。
DBエンジン・データベースがクエリ実行可能な状態かチェックするには、–query-executionオプションを指定します。
$ ~/aws-mon-pgsql/aws-mon-pgsql.sh --id postgresql -h postgresql.ca6md0qizbcz.ap-northeast-1.rds.amazonaws.com -p 5432 -U test -d test --query-execution --verbose query_execution:0
SQL”SELECT 1 for UPDATE”の結果が正常に得られた場合、結果が”0″となります。
DBエンジン・データベースのキャッシュヒット率をチェックするには、–cache-hiitオプションを指定します。
$ ~/aws-mon-pgsql/aws-mon-pgsql.sh --id postgresql -h postgresql.ca6md0qizbcz.ap-northeast-1.rds.amazonaws.com -p 5432 -U test -d test --cache-hit --verbose cache_hit:99.000
crontabを設定します。
crontab -e
全項目をチェックするには、–all-itemsオプションを指定します。
*/5 * * * * ~/aws-mon-pgsql/aws-mon-pgsql.sh --id postgresql -h postgresql.ca6md0qizbcz.ap-northeast-1.rds.amazonaws.com -p 5432 -U test -d test --all-items --from-cron
これで、5分ごとに全項目を監視し、結果をCloudWatch カスタムメトリクスとしてアップロードするようになります。
(利用料金にはご注意を!)
関連エントリ: Amazon RDS for PostgreSQLのキャッシュヒット率, セッション数等をCloudWatchで監視するスクリプトを作ってみた
]]>この記事はAWS Lambda Advent Calendar 2014の12/8のエントリです。
昨日7日目のエントリはmaroon1stさんの「今後のLambdaの機能拡張に対する要望を挙げてみた – AWS Lambda Advent Calendar 2014:7日目」でした。
今日8日目は、LambdaでS3上に出力されたログをCloudWatch Logsに取り込んで監視してみます。
CloudWatch Logsはログの蓄積や監視を実現するためのサービスとして、2014年7月にリリースされました。
現時点では、Linuxでは任意のログファイルの監視に、Windowsでは任意のログファイルに加えてWindows Eventlog・Event Tracing for Windowsの監視にも対応し、CloudWatch Logsは既にEC2インスタンス内部のログの大部分に対応していると言えるでしょう。
また、2014年11月にはCloudTrailで記録したAPI callのログ(Trail)をCloudWatch Logsに投入することができるようになりました。
でもAWSでは他にもログがあります。AWSのサービス自体が出力するログです。
ELBやS3、CloudFrontなど、AWSのサービス自体が出力するログの多くはS3上に出力されます。
今回はこれらのS3上に出力されるログをターゲットに、Lambdaを使ってCloudWatch Logsに取り込んで監視する方法を考えてみます。
何はともあれ、まずはLambdaコードです。
var aws = require('aws-sdk'); var s3 = new aws.S3({apiVersion: '2006-03-01'}); var cloudwatchlogs = new aws.CloudWatchLogs({region: 'us-east-1'}); exports.handler = function(event, context) { var bucket = event.Records[0].s3.bucket.name; var key = event.Records[0].s3.object.key; s3.getObject({Bucket:bucket, Key:key}, function(err,data) { if (err) { console.log('error getting object ' + key + ' from bucket ' + bucket + '. Make sure they exist and your bucket is in the same region as this function.'); context.done('error','error getting file'+err); } else { var contentEncoding = data.ContentEncoding; var logEvents = []; var contentBody = data.Body.toString(contentEncoding); contentBody.split('\n').forEach(function (line) { var logEvent = { message: line, timestamp: new Date().getTime() } if (line.length > 0){ logEvents.push(logEvent); } }); var param = { logEvents: logEvents, logGroupName: 'hoge-group', logStreamName: 'fuga-stream', sequenceToken: null }; var request = cloudwatchlogs.putLogEvents(param, function(err, data) { if (err) { if (err['code'] == 'InvalidSequenceTokenException'){ var nextSequenceToken = err['message'].match(/[0-9]+/).pop(); console.log ('nextSequenceToken = '+nextSequenceToken); param['sequenceToken'] = nextSequenceToken; var retryRequest = cloudwatchlogs.putLogEvents(param, function(err, data) { if (err) { context.done("retryRequest error", err); } else { context.done(null,'put retry succeeded'); } }); } else { context.done("unknown error", err); } } else { context.done(null,'put succeeded'); } }); } } ); };
簡単に処理を解説すると、下記のような感じ。
このような処理の都合上、改行区切りのログにのみ対応しています。
ELBやS3、CloudFrontのログは皆改行区切りなので大丈夫ですね。
また、Log EventにはLog Eventをつくるときのタイムスタンプを使っていて、ログの行に含まれるタイムスタンプをパースして使っているわけではありません。
このため、Log Eventのタイムスタンプにログの正確な日時を挿入したい場合は、ログに合わせてパースするなどの工夫が必要になります。
試しに、S3のアクセスログを出力するように設定したS3バケットをEvent sourceに設定し、動作を確認してみます。
ガンガン蓄積されているのがわかります。
(S3に出力されるまでにしばらくかかるのでご注意を。)
あとは、Log GroupにMetric Filterを用意して、CloudWatch Metricsにアラート設定すれば、
S3に出力されたログをイベントドリブンで監視できますね。
今回の記事を書くにあたり、いくつかハマったポイントがあったので、いくつかまとめてみます。
当初、Lambdaのイベントハンドラ内で実行したリクエストが完了していないにも関わらずプロセスが終了してしまいました。
コンソールにはこんなログが表示されていました。たしかにリクエストが正常に完了している形跡がありません。
Process exited before completing request
これを回避するには、リクエストが完了したタイミングでちゃんとcontext.doneが呼ばれるようにしてあげます。
今回のLambdaコードを例にあげて説明します。
var request = cloudwatchlogs.putLogEvents(param, function(err, data) { // main }
var request = cloudwatchlogs.putLogEvents(param, function(err, data) { // main } context.done(null,'success');
こうすると、putLogEventsのリクエストが完了してcall back functionが実行される前にcontext.doneが実行されてしまいプロセスが停止してしまいます。
var request = cloudwatchlogs.putLogEvents(param, function(err, data) { // main context.done(null,'success'); }
このようにすることで、ちゃんとcall back functionが実行されてからcontext.doneが呼ばれるようにしましょう。
if-else文等を使っている場合等、すべてのパスでcontext.doneが正しく実行されるようにご注意を。
これはLambdaというよりはnode.jsのノウハウになります。
今回のLambdaコードでもdataやerrなどいくつかオブジェクトが登場します。
デバッグ時にはオブジェクトの中身を全部見たいですよね。そこでこんなコードを書きがちです。
console.log ('error : '+err);
これを実行するとerrの中身が全部見える・・・かと思いきや、”Object”と省略されてしまい深い階層のプロパティが見えないことに気づきます。
そんなとき、util.inspectを使いましょう。
まずはrequire句で読み込みます。(Lambda環境で追加でモジュールをインストールする必要はありません。)
var util = require('util')
ログ出力時はこんな感じです。
console.log ('error : '+util.inspect(err,false,null));
これを実行するとerrの中身が全部見えます。トラブルシューティングに便利ですね。
]]>AWSのログで遊ぼうシリーズ第3弾 – CloudFront x Redshift。
今回はCloudFrontのアクセスログを対象に、Redshiftにロードしてみます。
方法は下記のドキュメント参照。
CloudFrontのアクセスログは指定したS3上の下記のパスに格納されます。
{bucket-name}.s3.amazonaws.com/{optional-prefix/}{distribution-ID}.{YYYY}-{MM}-{DD}-{HH}.{unique-ID}.gz
ELB/S3のアクセスログとは異なり、圧縮済み(.gz)で出力されますのでご注意ください。
詳細は下記のドキュメント参照。
CloudFrontのアクセスログは、タブ区切りで1行に1リクエストが記録されます。
ウェブディストリビューションの場合、各行には下記のフィールドが含まれます。(公式ドキュメントの抜粋です。)
フィールド名 | 説明 |
---|---|
request_date | イベントが発生した日付。yyyy-mm-dd 形式です(例: 2014-05-23)。日付と時刻は協定世界時(UTC)です。 |
request_time | サーバーがリクエストの処理を完了した時刻(UTC)(例: 01:42:39)。 |
x_edge_location | リクエストを処理したエッジロケーション。各エッジロケーションは、3 文字コードと、割り当てられた任意の数字で識別されます(例: DFW3)。 |
sc_bytes | サーバーからクライアントへのバイト数(ヘッダーを含む。例: 1045619)。 |
remote_ip | クライアントの IP(例: 192.0.2.183)。 |
cs_method | HTTP アクセス方式。 |
cs_host | DNS 名(リクエストに指定された CloudFront ディストリビューション名)。CNAME に対するリクエストを実行した場合、DNS 名フィールドには、CNAME ではなく、基礎となるディストリビューション DNS 名が含まれます。 |
cs_uri_stem | URI ステム(例: /images/daily-ad.jpg)。 |
sc_status | HTTP ステータスコード(例: 200) or 000 (CloudFront がリクエストに応答する前に、ビューアが接続をクローズしたことを示す) |
cs_referer | リファラー。 |
cs_user_agent | ユーザーエージェント |
cs_uri_query | 接続文字列に含まれる URI のクエリ文字列部分。URI にクエリ文字列が含まれない場合、ログファイルにあるその要求の cs-uri-query フィールドには 1 つのハイフン(-)が含まれます。 |
cs_cookie | 名前値のペアおよび関連属性を含む、リクエスト内の Cookie ヘッダー。 |
x_edge_result_type | リクエストの結果タイプ。 |
x_edge_request_id | 要求を一意に識別する暗号化された文字列。 |
x_host_header | ビューアによってこのリクエストの Host ヘッダーに組み込まれた値。 |
cs_protocol | ビューアによってリクエストに指定されたプロトコル。http または https。 |
cs_bytes | ビューアによってリクエストに組み込まれたデータのバイト数(クライアントからサーバーへの送信時のバイト数。ヘッダーを含む)。 |
time_taken | CloudFront エッジサーバーがビューアのリクエストを受け取った時間と、CloudFront がレスポンスの最終バイトをサーバーの出力キューに書き込んだ時間との差(秒数)をサーバー側で測定。 |
サンプルはこんな感じです。
#Version: 1.0 #Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken 2014-05-23 01:13:11 FRA2 182 192.0.2.10 GET d111111abcdef8.cloudfront.net /view/my/file.html 200 www.displaymyfiles.com Mozilla/4.0%20(compatible;%20MSIE%205.0b1;%20Mac_PowerPC) - zip=98101 RefreshHit MRVMF7KydIvxMWfJIglgwHQwZsbG2IhRJ07sn9AkKUFSHS9EXAMPLE== d111111abcdef8.cloudfront.net http - 0.001 2014-05-23 01:13:12 LAX1 2390282 192.0.2.202 GET d111111abcdef8.cloudfront.net /soundtrack/happy.mp3 304 www.unknownsingers.com Mozilla/4.0%20(compatible;%20MSIE%207.0;%20Windows%20NT%205.1) a=b&c=d zip=50158 Hit xGN7KWpVEmB9Dp7ctcVFQC4E-nrcOcEKS3QyAez--06dV7TEXAMPLE== d111111abcdef8.cloudfront.net http - 0.002
あらかじめ、CloudFrontのアクセスログを格納するためのテーブルをRedshift上に作成しておきます。
CloudFrontのアクセスログのフォーマットを考えて、cloudfront_access_logsテーブルを下記のように定義し、CREATE TABLE文を実行します。
CREATE TABLE cloudfront_access_logs ( request_date VARCHAR(MAX) SORTKEY, request_time VARCHAR(MAX) ENCODE LZO, x_edge_location VARCHAR(40) ENCODE LZO, sc_bytes INT ENCODE LZO, remote_ip VARCHAR(50) ENCODE LZO, cs_method VARCHAR(50) ENCODE LZO, cs_host VARCHAR(MAX) ENCODE LZO, cs_uri_stem VARCHAR(MAX) DISTKEY ENCODE LZO, sc_status VARCHAR(20) ENCODE LZO, cs_referrer VARCHAR(MAX) ENCODE LZO, cs_useragent VARCHAR(MAX) ENCODE LZO, cs_uri_query VARCHAR(MAX) ENCODE LZO, cs_cookie VARCHAR(MAX) ENCODE LZO, x_edge_result_type VARCHAR(MAX) ENCODE LZO, x_edge_request_id VARCHAR(MAX) ENCODE LZO, x_host_header VARCHAR(MAX) ENCODE LZO, cs_protocol VARCHAR(10) ENCODE LZO, cs_bytes INT ENCODE LZO, time_taken VARCHAR(MAX) ENCODE LZO );
COPYコマンドを実行して、S3上に蓄積されたCloudFrontのアクセスログをRedshiftにコピーします。
COPY cloudfront_access_logs FROM 's3://hogehoge' COMPUPDATE OFF CREDENTIALS 'aws_access_key_id=yourkey;aws_secret_access_key=yoursecretkey' DELIMITER '\t' IGNOREHEADER 2 TRUNCATECOLUMNS TRIMBLANKS ACCEPTINVCHARS MAXERROR as 100000 gzip
インポートが完了したら下記のクエリを実行して、意図した通りにログの各フィールドがテーブルの各列にマッピングされていることを確認しましょう。
SELECT * FROM cloudfront_access_logs LIMIT 10;
4つのパターンでクエリ実行してみました。
SELECT * FROM cloudfront_access_logs WHERE to_date(request_date || ' ' || request_time,'YYYY-MM-DD HH:MI:SS') >= '2014-11-17 12:00:00' AND to_date(request_date || ' ' || request_time,'YYYY-MM-DD HH:MI:SS') < '2014-11-17 13:00:00'
SELECT request_date, count(*) as lines FROM cloudfront_access_logs GROUP BY request_date ORDER BY request_date
SELECT top 10 * FROM cloudfront_access_logs ORDER BY time_taken
SELECT cs_host, count(*) as lines FROM cloudfront_access_logs WHERE sc_status <> 200 GROUP BY cs_host
s
AWSのログで遊ぼうシリーズ第2弾 – S3 x Redshift。
今回はS3のアクセスログを対象に、Redshiftにロードしてみます。
方法は下記のドキュメント参照。
S3のアクセスログは、スペース区切りで1行に1リクエストが記録されます。
各行には下記のフィールドが含まれます。(公式ドキュメントの抜粋です。)
フィールド名 | 説明 |
---|---|
bucket_owner | 配信元バケット所有者の正規ユーザー ID。 |
bucket | リクエストの処理対象のバケットの名前。システムで受け取ったリクエストの形式に誤りがあり、バケットを特定できない場合、そのリクエストはサーバーアクセスログに表示されません。 |
request_timestamp | リクエストを受け取った時刻。形式は strftime() の用語を使用し、[%d/%b/%Y:%H:%M:%S %z] になります。 |
remote_ip | リクエスタの表面上のインターネットアドレス。中間プロキシやファイアウォールにより、リクエストを作成したマシンの実際のアドレスが不明確になる場合があります。 |
requester | リクエスタの正規ユーザー ID。未認証リクエストの場合は「Anonymous」という文字列。この識別子は、アクセスコントロールに使用されるものと同じです。 |
request_id | リクエスト ID は、各リクエストを一意に識別するために Amazon S3 によって生成される文字列です。 |
operation | SOAP.operation、REST.HTTP_method.resource_type、または WEBSITE.HTTP_method.resource_type |
key | リクエストの URL エンコードされた「key」部分、オペレーションがキーパラメータを取らない場合は「-」。 |
request_uri | HTTP リクエストメッセージの Request-URI の部分。 |
http_status_code | レスポンスの HTTP ステータスの数値。 |
error_code | Amazon S3 エラーコード。エラーがない場合は「-」。 |
sent_bytes | 送信されたレスポンスのバイト数(HTTP プロトコルオーバーヘッドを除きます)。ゼロの場合は「-」。 |
object_size | 該当するオブジェクトの合計サイズ。 |
total_time | サーバーから見た、リクエストの転送中の時間数(ミリ秒単位)。これは、リクエストが受信されてから、レスポンスの最終バイトが送信されるまでの時間を計測した値です。クライアント側での計測値は、ネットワークレイテンシーにより長くなる場合があります。 |
turn_around_time | Amazon S3 でリクエストの処理に要した時間数(ミリ秒単位)。これは、リクエストの最終バイトが受信されてから、レスポンスの先頭バイトが送信されるまでの時間を計測した値です。 |
referer | HTTP Referrer ヘッダーの値(存在する場合)。一般に、HTTP ユーザーエージェント(例: ブラウザ)はこのヘッダーをリクエスト作成時のリンクまたは埋め込みページの URL に設定します。 |
user_agent | HTTP User-Agent ヘッダーの値。 |
version_id | リクエストのバージョン ID。オペレーションが versionId パラメータを取らない場合は「-」。 |
サンプルは下記。
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 3E57427F3EXAMPLE REST.GET.VERSIONING - "GET /mybucket?versioning HTTP/1.1" 200 - 113 - 7 - "-" "S3Console/0.4" - 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 891CE47D2EXAMPLE REST.GET.LOGGING_STATUS - "GET /mybucket?logging HTTP/1.1" 200 - 242 - 11 - "-" "S3Console/0.4" - 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be A1206F460EXAMPLE REST.GET.BUCKETPOLICY - "GET /mybucket?policy HTTP/1.1" 404 NoSuchBucketPolicy 297 - 38 - "-" "S3Console/0.4" - 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:01:00 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 7B4A0FABBEXAMPLE REST.GET.VERSIONING - "GET /mybucket?versioning HTTP/1.1" 200 - 113 - 33 - "-" "S3Console/0.4" - 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:01:57 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be DD6CC733AEXAMPLE REST.PUT.OBJECT s3-dg.pdf "PUT /mybucket/s3-dg.pdf HTTP/1.1" 200 - - 4406583 41754 28 "-" "S3Console/0.4" - 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be mybucket [06/Feb/2014:00:03:21 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be BC3C074D0EXAMPLE REST.GET.VERSIONING - "GET /mybucket?versioning HTTP/1.1" 200 - 113 - 28 - "-" "S3Console/0.4" -
あらかじめ、S3のアクセスログを格納するためのテーブルをRedshift上に作成しておきます。
S3のアクセスログのフォーマットを考えて、s3_access_logsテーブルを下記のように定義し、CREATE TABLE文を実行します。
CREATE TABLE s3_access_logs ( bucket_owner VARCHAR(MAX) ENCODE RUNLENGTH, bucket VARCHAR(255) ENCODE LZO, request_timestamp VARCHAR(MAX) SORTKEY ENCODE LZO, request_timestamp_delta VARCHAR(MAX) ENCODE LZO, remote_ip VARCHAR(50) ENCODE LZO, requestor VARCHAR(MAX) ENCODE LZO, request_id VARCHAR(MAX) ENCODE LZO, operation VARCHAR(MAX) ENCODE LZO, key VARCHAR(MAX) ENCODE LZO, request_uri VARCHAR(MAX) DISTKEY ENCODE LZO, http_status_code VARCHAR(MAX) ENCODE LZO, error_code VARCHAR(MAX) ENCODE LZO, sent_bytes VARCHAR(MAX) ENCODE LZO, object_size VARCHAR(MAX) ENCODE LZO, total_time VARCHAR(MAX) ENCODE LZO, turn_around_time VARCHAR(MAX) ENCODE LZO, referer VARCHAR(MAX) ENCODE LZO, user_agent VARCHAR(MAX) ENCODE LZO, version_id VARCHAR(10) ENCODE LZO );
COPYコマンドを実行して、S3上に蓄積されたS3のアクセスログをRedshiftにコピーします。
COPY s3_access_logs FROM 's3://hogehoge' COMPUPDATE OFF CREDENTIALS 'aws_access_key_id=yourkey;aws_secret_access_key=yoursecretkey' DELIMITER ' ' TRUNCATECOLUMNS TRIMBLANKS REMOVEQUOTES ACCEPTINVCHARS MAXERROR as 100000
REMOVEQUOTESを指定すると、S3のアクセスログにおけるrequest_uriのように、ダブルクォートで囲まれた文字列に含まれるスペースを区切り文字とみなさず、ひとかたまりで扱うことができます。
例えば、
"GET /mybucket?versioning HTTP/1.1"
というrequest_uriの場合、REMOVEQUOTESを指定しないと「”GET」「/mybucket?versioning」「HTTP/1.1″」に分割されて認識されます。
一方、REMOVEQUOTESを指定すると、「GET /mybucket?versioning HTTP/1.1」として認識されます。
便利ですね。
インポートが完了したら下記のクエリを実行して、意図した通りにログの各フィールドがテーブルの各列にマッピングされていることを確認しましょう。
SELECT * FROM s3_access_logs LIMIT 10;
4つのパターンでクエリ実行してみました。
SELECT * FROM s3_access_logs WHERE to_date(btrim(request_timestamp,'['),'DD/MON/YYYY:HH:MI:SS') >= '2014-11-17 12:00:00' AND to_date(btrim(request_timestamp,'['),'DD/MON/YYYY:HH:MI:SS') < '2014-11-17 13:00:00'
SELECT top 10 * FROM s3_access_logs ORDER BY turn_around_time
SELECT bucket, count(*) as lines FROM s3_access_logs WHERE http_status_code <> 200 GROUP BY bucket
SELECT user_agent, count(*) as lines FROM s3_access_logs WHERE error_code <> '-' GROUP BY user_agent ORDER BY lines DESC
AWSのログで遊ぼうシリーズ第1弾 – ELB x Redshift。
今回はELBのアクセスログを対象に、Redshiftにロードしてみます。
方法は下記のドキュメント参照。
ELBのアクセスログは、指定したS3の下記のパスに5分毎あるいは60分毎に(設定依存)出力されます。
{Bucket}/{Prefix}/AWSLogs/{AWS AccountID}/elasticloadbalancing/{Region}/{Year}/{Month}/{Day}/{AWS Account ID}_elasticloadbalancing_{Region}_{Load Balancer Name}_{End Time}_{Load Balancer IP}_{Random String}.log
出力されるログファイルは非圧縮です
ELBのアクセスログは、スペース区切りで1行に1リクエストが記録されます。
各行には下記のフィールドが含まれます。(公式ドキュメントの抜粋です。)
フィールド名 | 説明 |
---|---|
request_timestamp | クライアントに応答が返された時間(UTC)。ISO 8601 形式が使用されます。 |
elb | ロードバランサーの名前 |
client_port | リクエストを送信したクライアントの IP アドレスとポート番号。 |
backend_port | このリクエストを処理した登録済みインスタンスの IP アドレスとポート番号。 |
request_processing_time | ロードバランサーがリクエストを受け取り、そのリクエストを登録済みインスタンスに送信した時点からの合計経過時間(秒単位) |
backend_processing_time | ロードバランサーが登録済みインスタンスにリクエストを送信し、そのインスタンスが応答ヘッダーの送信を開始した時点から合計経過時間(秒単位)。 |
response_processing_time | ロードバランサーが登録済みインスタンスから応答ヘッダーを受け取り、クライアントへの応答の送信を開始した時点からの合計経過時間(秒単位)。この処理時間には、ロードバランサーでの待機時間と、ロードバランサーからバックエンドへの接続の取得時間の両方が含まれます。 |
elb_status_code | ロードバランサー(HTTP のみ)からの応答のステータスコード |
backend_status_code | 登録済みインスタンス(HTTP のみ)からの応答のステータスコード |
received_bytes | クライアント(リクエスタ)から受け取ったリクエストのサイズ(バイト単位)。HTTP リクエストの場合、受信したバイト数はリクエストの本文を表し、ヘッダーを含みません。TCP の場合、バイト数はヘッダーを含みます。 |
sent_bytes | クライアント(リクエスタ)に返される応答のサイズ(バイト単位)。HTTP レスポンスの場合、送信バイト数は応答の本文を表し、ヘッダーを含みません。TCP の場合、バイト数はヘッダーを含みます。 |
request | クライアントからのリクエスト行は二重引用符で囲まれており、次の形式でログに記録されます。HTTP メソッド + プロトコル://ホストヘッダー:ポート + パス + HTTP バージョン。TCP リクエストの場合、URL が入力されない代わりに、スペースで区切られた 3 個のダッシュが引用符で囲まれて表示されます(例: “- – -“)。 |
HTTPのアクセスログはこんな感じです。
2014-03-04T02:20:21.212932Z my-elb 192.22.123.169:21029 172.16.93.0:80 0.000066 0.000651 0.000044 404 404 0 315 "GET http://example.com:80/index2.html HTTP/1.1" 2014-03-04T02:20:21.273241Z my-elb 192.22.127.225:28376 172.16.93.1:80 0.000047 0.000701 0.000032 200 200 0 1085 "GET http://example.com:80/index.html HTTP/1.1" 2014-03-04T02:20:21.444392Z my-elb 192.22.165.43:13503 172.16.93.1:80 0.000067 0.00067 0.000053 200 200 0 1085 "GET http://example.com:80/index.html HTTP/1.1" 2014-03-04T02:20:21.977025Z my-elb 192.22.121.162:30815 172.16.93.1:80 0.00007 0.000867 0.000069 200 200 0 1097 "GET http://example.com:80/sample.html HTTP/1.1"
TCPのアクセスログはこんな感じです。
2014-03-04T02:15:43.959433Z my-elb 192.22.81.240:24599 172.16.93.1:80 0.000497 0.000015 0.000017 - - 200 1362 "- - - " 2014-03-04T02:15:44.001637Z my-elb 192.22.152.221:27889 172.16.93.0:80 0.000574 0.000015 0.000017 - - 200 1362 "- - - " 2014-03-04T02:15:44.196433Z my-elb 192.22.165.251:49539 172.16.93.1:80 0.00053 0.000012 0.000017 - - 200 1362 "- - - " 2014-03-04T02:15:44.634838Z my-elb 192.22.32.224:57443 172.16.93.0:80 0.000587 0.000015 0.000017 - - 200 1362 "- - - " 2014-03-04T02:15:44.667070Z my-elb 192.22.68.165:33606 172.16.93.0:80 0.000506 0.000015 0.000017 - - 200 1350 "- - - " 2014-03-04T02:15:44.764904Z my-elb 192.22.250.2:32954 172.16.93.0:80 0.000483 0.000011 0.000018 - - 200 1350 "- - - "
あらかじめ、ELBのアクセスログを格納するためのテーブルをRedshift上に作成しておきます。
ELBのアクセスログのフォーマットを考えて、elb_access_logsテーブルを下記のように定義し、CREATE TABLE文を実行します。
CREATE TABLE elb_access_logs ( request_timestamp DateTime encode lzo, elb varchar(30) encode lzo, client_port varchar(22) encode lzo, backend_port varchar(22) encode lzo, request_processing_time FLOAT encode bytedict, backend_processing_time FLOAT encode bytedict, response_prosessing_time FLOAT encode bytedict, elb_status_code varchar(3) encode lzo, backend_status_code varchar(3) encode lzo, received_bytes BIGINT encode lzo, sent_bytes BIGINT encode lzo, request varchar(MAX) encode lzo ) sortkey(request_timestamp) ;
COPYコマンドを実行して、S3上に蓄積されたELBのアクセスログをRedshiftにコピーします。
COPY elb_access_logs FROM 's3://hogehoge' COMPUPDATE OFF CREDENTIALS 'aws_access_key_id=yourkey;aws_secret_access_key=yoursecretkey' DELIMITER ' ' TRUNCATECOLUMNS TRIMBLANKS REMOVEQUOTES TIMEFORMAT as 'auto' ACCEPTINVCHARS MAXERROR as 100000
インポートが完了したら下記のクエリを実行して、意図した通りにログの各フィールドがテーブルの各列にマッピングされていることを確認しましょう。
SELECT * FROM elb_access_logs LIMIT 10;
4つのパターンでクエリ実行してみた結果、下記のようになりました。
SELECT * FROM elb_access_logs WHERE request_timestamp >= '2014-11-17 12:00:00' AND request_timestamp < '2014-11-17 13:00:00'
SELECT top 10 * FROM elb_access_logs ORDER BY backend_processing_time
SELECT to_char(request_timestamp, 'YYYY-MM') as month, count(*) as lines FROM elb_access_logs WHERE elb_status_code <> 200 GROUP BY to_char(request_timestamp, 'YYYY-MM') ORDER BY to_char(request_timestamp, 'YYYY-MM');
結果例:
month | lines |
---|---|
2014-09 | 414 |
2014-10 | 256 |
2014-11 | 383 |
SELECT elb, count(*) as lines FROM elb_access_logs WHERE request_processing_time > 1 GROUP BY elb;
結果例:
elb | lines |
---|---|
testA | 1835 |
testB | 932 |
testC | 24 |
非常に細かい小ネタ。
ツールやスクリプトを書いていると、自サーバーのIPアドレスの文字列部分だけをパースして取得したくなることは結構あります。
ただ、パースとかめんどくさいので毎回考えるのが億劫になってしまいます。
今回はIPアドレス文字列を抽出するためのコマンドを13個紹介します。
自身のホスト名で名前解決できる場合、ホスト名からIPアドレスをひいてくると手っ取り早いです。
ただし、NICを指定してIPアドレスを取得したい場合、環境の制約で名前解決できない場合は不向きです。
$ hostname -i
// eth0 $ hostname -I | cut -f1 -d' ' // eth1 $ hostname -I | cut -f2 -d' '
$ hostname -I | awk -F" " '{print $1}'
$ host `hostname` | awk '{print $4}'
経験あるインフラエンジニアなら、昔から慣れたifconfigコマンドを使う方法がなじみ深いでしょう。
$ ifconfig eth0 | awk '/inet / {print $2}' | awk -F: '{print $2}'
$ ifconfig | grep 'inet addr:' | grep -v '127.0.0.1' | awk -F: '{print $2}' | awk '{print $1}' | head -1
$ ifconfig | grep -A 1 'eth0' | tail -1 | cut -d ':' -f 2 | cut -d ' ' -f 1
ifconfigとか使っているのはオッサンだけだってばよ とのことなので、若者向けにipコマンドを使ったパターンをば。
$ ip -f inet -o addr show eth0|cut -d\ -f 7 | cut -d/ -f 1
$ ip addr show eth0 | grep -o 'inet [0-9]\+\.[0-9]\+\.[0-9]\+\.[0-9]\+' | grep -o [0-9].*
$ wget -q -O - http://169.254.169.254/latest/meta-data/local-ipv4
$ wget -q -O - http://169.254.169.254/latest/meta-data/public-ipv4
$ wget -q -O - http://checkip.amazonaws.com/
$ wget -q -O - http://ipecho.net/plain]]>
またもやHubotネタです。
#最近Hubot弄ってるだけで他のこと全然できてない・・・
今回は、投稿されたURLのTitleをつぶやくスクリプトを実装します。
早速スクリプト本体です。Gistにもおいておきました。
下記を参考にさせていただきました。
request = require 'request' cheerio = require 'cheerio' iconv = require 'iconv' convertEncode = (body) -> charset = body.toString('ascii').match /<meta[^>]*charset\s*=\s*["']?([-\w]+)["']?/i return new iconv.Iconv(charset[1], 'UTF-8//TRANSLIT//IGNORE').convert(body) if charset body urlBlackList = { 'ignore hoge.com' : 'https://hoge.com', 'ignore hoge.co.jp' : 'https://hoge.co.jp' } module.exports = (robot) -> robot.hear /(h?ttps?:\/\/[-\w@:%\+.~#?&\/=]+)/i, (msg)-> uri = msg.match[1] for key, value of urlBlackList if uri.indexOf(value) != -1 return options = { uri: uri, followRedirect: true, followAllRedirects: true, encoding: null, strictSSL: false } request options, (error, response, body)-> return if error return if response.statusCode != 200 $ = cheerio.load convertEncode(body).toString().replace(/<!\[CDATA\[([^\]]+)]\]>/ig, "$1") title = $("title").first() if title titleText = title.text().replace(/^[\s\n]+/, '').replace(/[\s\n]+$/, '') msg.send "title: #{titleText}" if titleText != ''
requestモジュールは、Node.js で HTTP リクエストを簡単にハンドリングするためのモジュールです。
optionsにて様々な挙動を設定することができますので、followRedirectとかstrictSSLとかユースケースに合わせてカスタマイズしてます。
その他オプションについてはGithubのREADME参照。
一部無視したいURLがあったのでブラックリストで除外するように作りこんでます。
画像ファイル等、titleがないものについてはそもそもメッセージを送信しないようにしました。
URL喋るだけ。
(moomindani) https://moomindani.wordpress.com (moobot) title: mooapp | AWSやJava、iPhone App、OpenFlowなど、様々なトピックを扱う技術系ブログです。]]>
今回も引き続きHubotネタです。
++, –のような命令で、誰かに対して投票するようなスクリプトを実装します。
早速スクリプト本体です。Gistsにおいておきました。
末尾のgihyo.jpのサンプルスクリプトを参考にしてます。
# Description: # Utility commands for voting someone. # # Commands: # <name>++, <name>--, !vote-list, !vote-clear module.exports = (robot) -> KEY_SCORE = 'key_score' getScores = () -> return robot.brain.get(KEY_SCORE) or {} changeScore = (name, diff) -> source = getScores() score = source[name] or 0 new_score = score + diff source[name] = new_score robot.brain.set KEY_SCORE, source return new_score robot.hear /!vote-list/i, (msg) -> source = getScores() console.log source for name, score of source msg.send "#{name}: #{score}" robot.hear /!vote-clear/i, (msg) -> robot.brain.set KEY_SCORE, null robot.hear /([A-z]+)\+\+$/i, (msg) -> name = msg.match[1] new_score = changeScore(name, 1) msg.send "#{name}: #{new_score}" robot.hear /([A-z]+)--$/i, (msg) -> name = msg.match[1] new_score = changeScore(name, -1) msg.send "#{name}: #{new_score}"
HubotにはBotがリアクティブにアクションするための方法として、respondとhearが用意されています。
respondを使うと、Botに対して名前付きで話しかけてきた発言に反応します。
(moomindani) moobot inoshishi++ (moobot) inoshishi: 1
一方、hearを使うと、チャット上に流れてきた発言に反応します。
(moomindani) inoshishi++ (moobot) inoshishi: 1
個人的にはBotの存在をあまり意識したくないので、hearを使うことが多いです。
robot.brainはデータを永続化するためのインタフェースです。
これを使うと、Redisにデータを読み書きすることができます。
robot.brain.get('key')
robot.brain.set 'key' 'value'
データ消したければnullをSETしちゃいます。
robot.brain.set 'key' null
(moomindani) inoshishi++ (moobot) inoshishi: 1 (moomindani) inoshishi++ (moobot) inoshishi: 2
(moomindani) inoshishi-- (moobot) inoshishi: 1
(moomindani) !vote-list (moobot) inoshishi: 1 (moobot) ushi: 1 (moobot) hituji: 1
(moomindani) !vote-clear
下記に参考にした資料を記載します。
特に、hubot-plusplusは便利そうだったのですが、自分の用途にはオーバースペックだったので今回は使わず。