BigQuery workflowsとはBigQueryの中に組み込まれたworkflowsのことで従来からあるworkflowsとは異なる機能になる。BigQuery workflowsとはクエリを設定したスケジューリングに従って順番に定期実行することが可能になる。
参考:Introduction to workflows – Google Cloud
BigQuery workflowsとは
クエリを定期実行するだけであれば「クエリのスケジューリング」を使えばよいが、クエリを順番に定期実行するケースにBigQuery workflowsを用いる。順番にスケジューリングを組むのであればDataformでもよいが、Dataformを使うまでもないケース。もっと簡潔に処理を組みたいといったニーズにBigQuery workflowsは最適であろう。つまり、クエリのスケジューリングとDataformの間に位置するのがBigQuery workflowsとなる。
BigQuery workflowsの作成
BigQueryのコンソール画面から、エディターペインのタブバーで+記号の横にある矢印ドロップダウンメニューをクリックし、 [ワークフロー]をクリックする。他、作成手順については操作していけば直感的に分かるため省略する。詳細については公式ヘルプを参照。
参考:Create workflows – Google Cloud
BigQuery workflowsの実行
BigQuery workflowsの作成から実行までの手順の大まかな流れは以下の通り。
- クエリの実行順番を指定
- スケジュール
- デプロイ
クエリの実行順番を指定
クエリを作成した後は、クエリ間の依存関係を定義していくと、その定義したリネージが表示される。
スケジュール
デプロイ
コードを修正した際は必ず「デプロイ」を実行する。Cloudの用語では「デプロイ」という単語はよく出てくるが、要はプログラムを配置して展開するといったような意味で使われる状態にするという意味。
BigQueryのGA4スキーマの履歴をBigQuery workflowsで管理
BigQueryにエクスポートされるGA4のスキーマは度々変更がかかるため、何が変わったのかを検知するためにBigQuery workflowsを活用してログを残す実例を提示する。構成として以下の通りで、テーブルが存在していなければ作成しGA4のスキーマ履歴を残すテーブルと昨日時点のスキーマを残すテーブルの2つを参照し、2日前と前日に変更があったかを検証し結果をログに残す仕組みとなる。
- create_schema_history
- insert_schema_history
- create_schema_yesterday
- create_schema_change_log
- insert_schema_changes_log
1.create_schema_history
CREATE TABLE IF NOT EXISTS `<project>.analytics_schema_log.schema_history` (
table_name STRING,
column_name STRING,
data_type STRING,
is_nullable STRING,
snapshot_time DATETIME
);
2.insert_schema_history
INSERT INTO `<project>.analytics_schema_log.schema_history`
SELECT
table_name,
column_name,
data_type,
is_nullable,
CURRENT_DATETIME('Asia/Tokyo') AS snapshot_time
FROM
`<project>.<dataset>.INFORMATION_SCHEMA.COLUMNS`
WHERE
table_name = FORMAT_TIMESTAMP('events_%Y%m%d', DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY));
3.create_schema_yesterday
CREATE OR REPLACE TABLE `<project>.analytics_schema_log.schema_yesterday` AS
SELECT
table_name,
column_name,
data_type,
is_nullable,
CURRENT_DATETIME('Asia/Tokyo') AS snapshot_time
FROM
`<project>.<dataset>.INFORMATION_SCHEMA.COLUMNS`
WHERE
table_name = FORMAT_TIMESTAMP('events_%Y%m%d', DATE_SUB(CURRENT_DATETIME('Asia/Tokyo'), INTERVAL 1 DAY))
;
4.create_schema_change_log
CREATE TABLE IF NOT EXISTS `<project>.analytics_schema_log.schema_change_log` (
log_id INT64 OPTIONS (description="ログの一意なID。昇順・降順でソート可能"),
change_type STRING OPTIONS (description="スキーマ変更のタイプ(ADDED, REMOVED, MODIFIED)"),
table_name STRING OPTIONS (description="テーブル名"),
column_name STRING OPTIONS (description="カラム名"),
data_type STRING OPTIONS (description="カラムのデータ型"),
is_nullable STRING OPTIONS (description="NULLの許可 (YES/NO)"),
log_time DATETIME OPTIONS (description="変更が記録された時間を日本時間で記録")
);
5.insert_schema_changes_log
BEGIN
DECLARE new_log_id INT64;
SET new_log_id = (SELECT COALESCE(MAX(log_id), 0) + 1 FROM `<project>.analytics_schema_log.schema_change_log`);
BEGIN TRANSACTION;
CREATE TEMP TABLE temp_schema_changes AS
WITH
latest_schema AS (
SELECT
table_name,
column_name,
data_type,
is_nullable
FROM
`<project>.analytics_schema_log.schema_yesterday`
),
previous_schema AS (
SELECT
table_name,
column_name,
data_type,
is_nullable
FROM
`<project>.analytics_schema_log.schema_history`
WHERE
table_name = FORMAT_TIMESTAMP('events_%Y%m%d', DATE_SUB(CURRENT_DATETIME('Asia/Tokyo'), INTERVAL 2 DAY))
AND snapshot_time = (
SELECT
MAX(snapshot_time)
FROM
`<project>.analytics_schema_log.schema_history`
WHERE
table_name = FORMAT_TIMESTAMP('events_%Y%m%d', DATE_SUB(CURRENT_DATETIME('Asia/Tokyo'), INTERVAL 2 DAY))
)
)
SELECT
'New Column' AS change_type,
latest_schema.table_name,
latest_schema.column_name,
latest_schema.data_type,
latest_schema.is_nullable,
CURRENT_DATETIME('Asia/Tokyo') AS log_time
FROM
latest_schema
LEFT JOIN
previous_schema
ON latest_schema.column_name = previous_schema.column_name
WHERE previous_schema.column_name IS NULL
UNION ALL
SELECT
'Removed Column' AS change_type,
previous_schema.table_name,
previous_schema.column_name,
previous_schema.data_type,
previous_schema.is_nullable,
CURRENT_DATETIME('Asia/Tokyo') AS log_time
FROM
previous_schema
LEFT JOIN
latest_schema
ON previous_schema.column_name = latest_schema.column_name
WHERE latest_schema.column_name IS NULL
UNION ALL
SELECT
'Modified Column' AS change_type,
latest_schema.table_name,
latest_schema.column_name,
latest_schema.data_type,
latest_schema.is_nullable,
CURRENT_DATETIME('Asia/Tokyo') AS log_time
FROM
latest_schema
JOIN
previous_schema
ON latest_schema.column_name = previous_schema.column_name
WHERE
latest_schema.data_type != previous_schema.data_type
OR
latest_schema.is_nullable != previous_schema.is_nullable;
INSERT INTO `<project>.analytics_schema_log.schema_change_log`
SELECT
new_log_id AS log_id,
change_type,
table_name,
column_name,
data_type,
is_nullable,
CURRENT_DATETIME('Asia/Tokyo') AS log_time
FROM
temp_schema_changes;
COMMIT TRANSACTION;
END;