BigQuery workflowsでGA4スキーマ履歴を管理

BigQuery workflowsとはBigQueryの中に組み込まれたworkflowsのことで従来からあるworkflowsとは異なる機能になる。BigQuery workflowsとはクエリを設定したスケジューリングに従って順番に定期実行することが可能になる。

参考:Introduction to workflows – Google Cloud

BigQuery workflowsとは

クエリを定期実行するだけであれば「クエリのスケジューリング」を使えばよいが、クエリを順番に定期実行するケースにBigQuery workflowsを用いる。順番にスケジューリングを組むのであればDataformでもよいが、Dataformを使うまでもないケース。もっと簡潔に処理を組みたいといったニーズにBigQuery workflowsは最適であろう。つまり、クエリのスケジューリングとDataformの間に位置するのがBigQuery workflowsとなる。

BigQuery workflowsの作成

BigQueryのコンソール画面から、エディターペインのタブバーで+記号の横にある矢印ドロップダウンメニューをクリックし、 [ワークフロー]をクリックする。他、作成手順については操作していけば直感的に分かるため省略する。詳細については公式ヘルプを参照。

参考:Create workflows – Google Cloud

BigQuery workflowsの実行

BigQuery workflowsの作成から実行までの手順の大まかな流れは以下の通り。

  1. クエリの実行順番を指定
  2. スケジュール
  3. デプロイ

クエリの実行順番を指定

クエリを作成した後は、クエリ間の依存関係を定義していくと、その定義したリネージが表示される。

スケジュール

デプロイ

コードを修正した際は必ず「デプロイ」を実行する。Cloudの用語では「デプロイ」という単語はよく出てくるが、要はプログラムを配置して展開するといったような意味で使われる状態にするという意味。

BigQueryのGA4スキーマの履歴をBigQuery workflowsで管理

BigQueryにエクスポートされるGA4のスキーマは度々変更がかかるため、何が変わったのかを検知するためにBigQuery workflowsを活用してログを残す実例を提示する。構成として以下の通りで、テーブルが存在していなければ作成しGA4のスキーマ履歴を残すテーブルと昨日時点のスキーマを残すテーブルの2つを参照し、2日前と前日に変更があったかを検証し結果をログに残す仕組みとなる。

  1. create_schema_history
  2. insert_schema_history
  3. create_schema_yesterday
  4. create_schema_change_log
  5. insert_schema_changes_log

1.create_schema_history

CREATE TABLE IF NOT EXISTS `<project>.analytics_schema_log.schema_history` (
  table_name STRING,
  column_name STRING,
  data_type STRING,
  is_nullable STRING,
  snapshot_time DATETIME 
);

2.insert_schema_history

INSERT INTO `<project>.analytics_schema_log.schema_history`
SELECT
  table_name,
  column_name,
  data_type,
  is_nullable,
  CURRENT_DATETIME('Asia/Tokyo') AS snapshot_time
FROM
  `<project>.<dataset>.INFORMATION_SCHEMA.COLUMNS`
WHERE
  table_name = FORMAT_TIMESTAMP('events_%Y%m%d', DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)); 

3.create_schema_yesterday

CREATE OR REPLACE TABLE `<project>.analytics_schema_log.schema_yesterday` AS
SELECT
  table_name,
  column_name,
  data_type,
  is_nullable,
  CURRENT_DATETIME('Asia/Tokyo') AS snapshot_time
FROM
  `<project>.<dataset>.INFORMATION_SCHEMA.COLUMNS`
WHERE
  table_name = FORMAT_TIMESTAMP('events_%Y%m%d', DATE_SUB(CURRENT_DATETIME('Asia/Tokyo'), INTERVAL 1 DAY))
;

4.create_schema_change_log

CREATE TABLE IF NOT EXISTS `<project>.analytics_schema_log.schema_change_log` (
  log_id INT64 OPTIONS (description="ログの一意なID。昇順・降順でソート可能"), 
  change_type STRING OPTIONS (description="スキーマ変更のタイプ(ADDED, REMOVED, MODIFIED)"),
  table_name STRING OPTIONS (description="テーブル名"),
  column_name STRING OPTIONS (description="カラム名"),
  data_type STRING OPTIONS (description="カラムのデータ型"),
  is_nullable STRING OPTIONS (description="NULLの許可 (YES/NO)"),
  log_time DATETIME OPTIONS (description="変更が記録された時間を日本時間で記録")  
);

5.insert_schema_changes_log

BEGIN

DECLARE new_log_id INT64;
SET new_log_id = (SELECT COALESCE(MAX(log_id), 0) + 1 FROM `<project>.analytics_schema_log.schema_change_log`);

BEGIN TRANSACTION;

CREATE TEMP TABLE temp_schema_changes AS
WITH 
  latest_schema AS (
    SELECT
      table_name,
      column_name,
      data_type,
      is_nullable
    FROM
      `<project>.analytics_schema_log.schema_yesterday`
  ),
  previous_schema AS (
    SELECT
      table_name,
      column_name,
      data_type,
      is_nullable
    FROM
      `<project>.analytics_schema_log.schema_history`
    WHERE
      table_name = FORMAT_TIMESTAMP('events_%Y%m%d', DATE_SUB(CURRENT_DATETIME('Asia/Tokyo'), INTERVAL 2 DAY)) 
    AND snapshot_time = (
      SELECT
        MAX(snapshot_time) 
      FROM
        `<project>.analytics_schema_log.schema_history`
      WHERE
        table_name = FORMAT_TIMESTAMP('events_%Y%m%d', DATE_SUB(CURRENT_DATETIME('Asia/Tokyo'), INTERVAL 2 DAY)) 
    )
  )

  SELECT
    'New Column' AS change_type,
    latest_schema.table_name,
    latest_schema.column_name,
    latest_schema.data_type,
    latest_schema.is_nullable,
    CURRENT_DATETIME('Asia/Tokyo') AS log_time
  FROM
    latest_schema
  LEFT JOIN
    previous_schema
  ON latest_schema.column_name = previous_schema.column_name
  WHERE previous_schema.column_name IS NULL

  UNION ALL

  SELECT
    'Removed Column' AS change_type,
    previous_schema.table_name,
    previous_schema.column_name,
    previous_schema.data_type,
    previous_schema.is_nullable,
    CURRENT_DATETIME('Asia/Tokyo') AS log_time 
  FROM
    previous_schema
  LEFT JOIN
    latest_schema
  ON previous_schema.column_name = latest_schema.column_name
  WHERE latest_schema.column_name IS NULL

  UNION ALL

  SELECT
    'Modified Column' AS change_type,
    latest_schema.table_name,
    latest_schema.column_name,
    latest_schema.data_type,
    latest_schema.is_nullable,
    CURRENT_DATETIME('Asia/Tokyo') AS log_time  
  FROM
    latest_schema
  JOIN
    previous_schema
  ON latest_schema.column_name = previous_schema.column_name
  WHERE
    latest_schema.data_type != previous_schema.data_type
  OR
    latest_schema.is_nullable != previous_schema.is_nullable;

INSERT INTO `<project>.analytics_schema_log.schema_change_log`
SELECT
  new_log_id AS log_id,  
  change_type,
  table_name,
  column_name,
  data_type,
  is_nullable,
  CURRENT_DATETIME('Asia/Tokyo') AS log_time
FROM
  temp_schema_changes;

COMMIT TRANSACTION;

END;

関連記事

BigQueryのWITH句(CTE)とサブクエリの使い分け

クエリのスケジューリングで最新データをLooker Studioで可視化

BigQueryでJavaScript UDFを活用

おすすめ記事

最近の記事
おすすめ記事
  1. サーチコンソールでキーワードの検索順位を確認する方法

  2. GRCがエラーで使えない?代替ツールはこれ一択『The Auto 順位チェッカー』

  3. 「GA4でイベント数を確認する方法」 イベント・セッション・ユーザー単位の解説

  1. GRCがエラーで使えない?代替ツールはこれ一択『The Auto 順位チェッカー』

  2. 【GA4 SQL】ページの前後をBigQueryから取得

  3. 【検証】GA4 entrancesパラメータ

PAGE TOP