仕事で BigQuery のクエリの結果を共有用のフォルダに Google Sheets 形式で保存して、Slack で URL を共有する、ということを頻繁にやっているのですが、
- WebUI だと、クエリ結果を Google Sheets 形式で保存できるけど、ファイル名とフォルダが指定できないので、いちいち名前を変えて、フォルダを移動しないといけない。Slack 通知はできない。
- Google Apps Scirpt (GAS) だと、全部実現できるけど、コードの管理を GitHub でやるのが面倒
と定形処理なのに、イマイチ効率の良い方法がなかったのを、tumugi で Ruby スクリプト化しました。
tumugi について詳細に知りたい方は、Ruby と tumugi によるデータパイプライン構築 をご覧ください。
タスクを分割して、定義する
「BigQuery のクエリ結果を Google Sheets に保存し、URL を Slack に通知」するという複合タスクは、以下の3つのタスクに分解できます。
- クエリを実行して、その結果を一時テーブルとして保存する
- 一時テーブルのデータを CSV 形式でダウンロードして、Google Drive に Sheets 形式に変換していアップロードする
- 作成された Sheets の URL を Incoming Webhooks を利用して、Slack に通知する
これを tumugi の DSL で定義すると次のようになります。上記のタスクが、ほぼ 1:1 に tumugi の task として記述できているのがわかると思います。
######################################################
# 1. クエリを実行して、その結果を一時テーブルとして保存する
######################################################
task :create_dataset, type: :bigquery_dataset do
dataset_id "tmp"
end
task :run_query, type: :bigquery_query do
requires :create_dataset
dataset_id { input.dataset_id }
table_id "dest_table_#{Time.now.strftime("%Y%m%d%H%M%S")}"
query <<~SQL
SELECT
word,
word_count
FROM
publicdata:samples.shakespeare
WHERE
corpus = "hamlet"
ORDER BY
word_count DESC
LIMIT
100
SQL
end
######################################################
# 2. 一時テーブルのデータを CSV 形式でダウンロードして、
# Google Drive に Sheets 形式に変換していアップロードする
######################################################
task :export_to_google_drive, type: :bigquery_export do
requires :run_query
dataset_id { input.dataset_id }
table_id { input.table_id }
output {
target :google_drive_file,
name: "#{input.table_id}.csv",
parents: ENV['GOOGLE_DRIVE_FOLDER_ID'],
mime_type: "application/vnd.google-apps.spreadsheet"
}
end
######################################################
# 3. 作成された Sheets の URL を Incoming Webhooks を
# 利用して、Slack に通知する
######################################################
task :notify_to_slack, type: :webhook do
requires :export_to_google_drive
url ENV['SLACK_WEBHOOK_URL']
body { { text: "#{input.name} export success.\n<#{input.url}|Click here> to get file" } }
end
####################################################
# Main Task
####################################################
task :main do
requires :notify_to_slack
run { log "Finished" }
end
DAG で見るとこんな感じです。
では、上記のワークフローを実際に動作させる手順を以下で説明します。
前提条件
以下のソフトがインストールされていること
- Ruby >= 2.3.0
- Bundler
tumugi のインストール
Bundler を使って、tumugi と必要なプラグインをインストールします。
以下の内容の Gemfile
を作成し、bundle install
します。
source 'https://rubygems.org'
gem "tumugi", "~> 0.6.3"
gem "tumugi-plugin-bigquery", "~> 0.3.0"
gem "tumugi-plugin-google_drive", "~> 0.4.0"
gem "tumugi-plugin-webhook", "~> 0.1.1"
Google Cloud Platform API の設定
BigQuery と Google Drive の Google Cloud Platform API を呼び出すので、その設定をします。
- サービスアカウントの作成と JSON 形式の秘密鍵の取得
- Google Drive API の有効化
- Google Drive API はデフォルトでは無効化されています
- 設定は公式ガイドを参照 -> Step 1: Turn on the Drive API in official guide
Google Drive の設定
- Google Drive にクエリ結果を保存するためのフォルダを作成
- GCPの設定の所で作成したサービスアカウントに、編集者権限を与える
- サービスアカウント ID がメールアドレスの形式をしているので、共有設定からサービスアカウント ID を入力すれば OK
-
[service-account-name]@[gcp-project-id].iam.gserviceaccount.com
という形式です
Slack の Incoming Webhooks の作成
https://[SlackチームID].slack.com/apps/A0F7XDUAZ-incoming-webhooks
で Incoming Webhooks を作成し、通知用の URL を取得します。
tumugi_config.rb の作成
tumugi ではプラグインで共通に使うような設定、例えば、APIキーや秘密鍵のパスなどは、tumugi_config.rb
というファイルで設定します。
tumugi コマンドを実行するディレクトリの直下に tumugi_config.rb
という名前のファイルを置いておくと、自動で読み込まれます。
今回は、サービスアカウントの秘密鍵の設定を tumugi_config.rb
に記述します。
Tumugi.configure do |config|
private_key_file = ENV['GCP_PRIVATE_KEY_FILE']
config.section("bigquery") do |section|
section.private_key_file = private_key_file
end
config.section("google_drive") do |section|
section.private_key_file = private_key_file
end
end
tumugi_config.rb
は拡張子の通り、Ruby ファイルなので環境変数の読み込みや、変数を利用して、秘密鍵のパスの共有化ができていることがわかりますね。
実行する
それでは、準備が整ったので実行してみましょう。
GCPの秘密鍵のパス、Google Drive のフォルダの ID、Slack Incoming Webhooks の URL は環境変数経由で設定します。
$ export GCP_PRIVATE_KEY_FILE="/path/to/gcp/private/key.json"
$ export GOOGLE_DRIVE_FOLDER_ID="folder_id"
$ export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/xxx"
$ bundle exec tumugi run -f workflow.rb main
ログがでて、成功すると以下のような通知が Slack に来ます。
Click here
のところをクリックしてみると、
Google Sheets が開けました。
補足: 各タスクの解説
各タスクの機能について興味がある人がいるかもしれないので、補足で解説を書いておきます。
bigquery_datatset
task :create_dataset, type: :bigquery_dataset do
dataset_id "tmp"
end
dataset_id
で指定した BigQuery のデータセットが存在しなければ作成します。
bigquery_query
task :run_query, type: :bigquery_query do
requires :create_dataset
dataset_id { input.dataset_id }
table_id "dest_table_#{Time.now.strftime("%Y%m%d%H%M%S")}"
query <<~SQL
<SQL文>
SQL
end
query
で指定した SQL を実行し、dataset_id
.table_id
で指定したテーブルの結果を保存します。query
には文字列を指定するのですが、Ruby 2.3 から導入された前方の空白を無視するヒアドキュメント記法 <<~
を使うと改行を含んでいてもすっきり書けますね。
bigquery_export
task :export_to_google_drive, type: :bigquery_export do
requires :run_query
dataset_id { input.dataset_id }
table_id { input.table_id }
output {
target :google_drive_file,
name: "#{input.table_id}.csv",
parents: ENV['GOOGLE_DRIVE_FOLDER_ID'],
mime_type: "application/vnd.google-apps.spreadsheet"
}
end
dataset_id
.table_id
テーブルのデータを output
で指定した場所にエクスポートします。
Google Drive に出力する場合、mime_type
を指定することで、CSVファイルを Google Sheets に変換してアップロードすることができます。
全てのファイルが変換できるわけではないので、サポートしているファイルフォーマットは、公式ドキュメントを参照してください。
なお、BigQuery API では GCS への export (extract) しかサポートしていませんが、tumugi の FileSystemTarget に準拠した場所へならどこへでも export 可能です。現在は、Google Drive の他に、GCS、ローカルファイルをサポートしています。Amazon S3 も将来的にサポートする予定です。
webhook
task :notify_to_slack, type: :webhook do
requires :export_to_google_drive
url ENV['SLACK_WEBHOOK_URL']
body { { text: "#{input.name} export success.\n<#{input.url}|Click here> to get file" } }
end
url
で指定した URL に、body
で指定したデータを POST します。
HTTP メソッドを GET にしたり、content-type を application/json
から application/x-www-form-urlencoded
に変更したりすることもできます。
まとめ
今回は tumugi をワークフローエンジンというよりは、スクリプトを便利に書くためのツールとしても使える例として、BigQuery のクエリ結果を Google Sheets に保存し、URL を Slack に通知する方法を紹介しました。
今回の記事の例も含めて、サンプルは以下のリポジトリに格納されていますので、興味があればご覧ください。