UPSIDER Tech Blog

BigQuery から Salesforce への Reverse ETL パイプラインの紹介

こんにちは、UPSIDER のデータチームです。

UPSIDER では、複数の事業を展開する中で、事業間のシナジーを最大化を目指し、昨年から Salesforce の導入を進めています。このプロジェクトは、単なるツール導入にとどまらず、営業やカスタマーサポート、事業開発といった各チームのデータを一元管理し、情報共有をスムーズにすることで、業務効率化や事業成長を後押しする重要な取り組みです。

Salesforce の導入に伴い、データチーム側では次のようなタスクに取り組みました。

  • 既存クエリの参照テーブルを以前に使用していた CRM から Salesforce のものへ書き換え
  • Salesforce から BigQuery へのデータパイプライン構築
  • BigQuery から Salesforce への Reverse ETL パイプライン構築

今回は、 これらデータチームのタスクのうち3つ目の、「BigQuery から Salesforce への Reverse ETL パイプライン構築」について紹介します。

Salesforce にはレポート機能があるため、集計データを直接 Salesforce 上に載せることで、ビジネスメンバーがデータチームを介さずダッシュボード作成や分析ができるようになります。これにより、「ダッシュボードを作ったけど見られない」「ダッシュボードを作るためのリードタイムが長い」といった課題の解消が期待できます。

以下では、 Reverse ETL パイプライン全体の構成や、構築にあたっての課題とその対策についてご紹介します。

構成

UPSIDER では全社的に Google Cloud をインフラとして利用していて、データチームでも Cloud Run Functions/Service/Job や Workflows を活用し、 Google Sheets や GitHub 等の API からデータを取得する データパイプラインを構築してきました。

今回の BigQuery → Salesforce のパイプラインも他のジョブとあわせ、Python の Cloud Run Job で実装しました。

パイプラインの構成図は下図の通りで、 Cloud Run Job と BigQuery 以外には

  • 定時実行するための Cloud Scheduler Job
  • Salesforce の認証情報を格納する Secrets Manager Secret
  • CI/CD 用の GitHub Actions workflows
  • アラートを通知するための Slack チャンネル

などのリソースがあります。

Salesforce には「Bulk API」という複数のレコードを一括で変更するための API が用意されていますが、今回は Bulk API のエンドポイントに直接 requests 等でアクセスするのではなく、 simple-salesforce というライブラリを使うことにしました。

simple-salesforce は公式のライブラリではないものの、スター数が多め (2024年12月現在 1.7k)・ドキュメントがある程度わかりやすい・データチームのユースケースに対応していそう、といったの理由から採用しました。

たとえば、Bulk API で Account オブジェクトの update をする場合、以下のようなコードになり、コードの記述が簡潔になります。

data = [
      {'Id': '0000000000AAAAA', 'Email': 'examplenew@example.com'},
      {'Id': '0000000000BBBBB', 'Email': 'testnew@test.com'}
    ]

sf.bulk.Contact.update(data,batch_size=10000,use_serial=True)

(https://github.com/simple-salesforce/simple-salesforce?tab=readme-ov-file#using-bulk より引用)

認証

Salesforce API には、いくつか認証の方法がありますが、今回は “Connected App” というものを使いました。Connected App のイメージとしては Slack のアプリに近く、アプリを作成して、アプリを認可した後に、アプリの認証情報で API リクエストが叩けるようになります。

認証を通すまでの手順は、大まかに以下の通りです。

  1. Salesforce 上に Connected App を作成する
  2. Connected App の client ID, client secret を取得する
  3. ブラウザから認可コードを取得する
  4. 認可コードから access token を取得する
  5. access token を使って API を叩く

なお、 Access token の有効期限が切れた場合は、ジョブに access token とあわせて client ID, client secret, refresh token も格納しておくと、プロブラムから access token の再発行ができます。

Connected App を設定する際は、以下の設定がポイントになります。

  • Enable OAuth Settings: on
  • Callback URL: https://login.salesforce.com/ など
    • 認可コード取得時の redirect_uri と完全一致する必要あり
  • Selected OAuth scopes
    • Manage user data via APIs (api)
    • Perform requests at any time (refresh_token, offline_access)

権限の設定

前提: Connected App が持つ権限

単純に SalesforceAPI 経由でデータの更新をするだけなら、上記の手順で実現できますが、このままだとパイプラインに必要以上の権限が与えられる可能性があります。

というのも、チーム内で実験したところ、アクセストークンに与えられるオブジェクトへのアクセス権限は、 Connected App を認可したユーザの権限になっていそうなことがわかりました。したがって、管理者権限を持ったユーザが Connected App を認可すると、広範囲のオブジェクトの編集・削除権限が与えられる一方、オブジェクトへのアクセス権限が足りていないユーザが発行したアクセストークンを使用すると、API リクエストが 404 や 403 で返ってきました。

この点はドキュメントにあまりはっきりとは書かれていませんが、 “Manage user data via APIs” OAuth スコープの説明)を見ると、

Allows access to the current, logged-in user’s account using APIs, such as REST API and Bulk API 2.0. This scope also includes chatter_api, which allows access to Connect REST API resources.

と記載があり、 ”access to the current, logged-in user’s account” というのが「認可したユーザの権限でオブジェクトにアクセスする」という挙動に該当すると考えられます。

このままだと最小権限の原則をまもれておらず、パイプラインにバグが潜んでいた際に意図しないデータが書き換えられるなどのリスクがあるため、データチームで Connected App に与える権限を最小限にする方法を探しました。

専用ユーザ・プロファイルの作成

まずできることとして、「Connected App が認可したユーザの権限を使ってオブジェクトにアクセスする」という点を利用して、

  • オブジェクトへの最小権限を持った Salesforce プロファイルを作成する
  • Salesforce ユーザを作成する
  • ユーザにプロファイルを割り当てる

という設定をして、新規にアクセストークンを発行する場合は、このユーザにログインしてから認可する運用にすることができます。

Salesforce のプロファイルはユーザに与える

  • オブジェクトの権限
  • オブジェクトのフィールドレベル権限
    • フィールド単位の read/write 権限

などを設定することができ、プロファイルをユーザに割り当てると、ユーザにその権限が適用されます。

オブジェクト単位の権限を設定するセクション

フィールド単位の権限を設定するセクション

(フィールドはオブジェクトごとに独立したページになっていて、フィールドごとに Read/Edit Access を設定できます)

ただ、この方法ではうっかり自分のアカウントで認可してしまうリスクがあるため、もう一つ対策を入れます。

Connected App でプロファイルを制限する

Connected App には、特定のプロファイルを持ったユーザしかアプリを認可できないように設定できます。

そこで、

  • 最小権限をもつプロファイルを作成
  • Connected App を認可できるプロファイルを制限

の2つをあわせると、 「Connected App を認可できるのは最小権限のプロファイルがあたったユーザのみ」となり、権限をシステム的に絞ることができます。

Connected App 、 ユーザ、プロファイルの関係

Connected App を使用できる Profile 設定をするためには、以下のスクリーンショットのように、

  • OAuth Policies > Permitted Users: Admin approved users are pre-authorized を選択
  • Profiles に作成したプロファイルを選択

と設定します。

さいごに

今回は、Python で書いた BigQuery → Salesforce の Reverse ETL パイプラインの概要と、より安全に運用するための権限の絞り込みについてご紹介しました。Salesforce 権限については、データパイプライン以外でも Salesforce API を活用する際に出てくる内容なので、広く Salesforce まわりの仕組みを開発・運用中の皆さまにとって少しでも参考になりましたら幸いです!

なお、今回触れなかった Salesforce → BigQuery のパイプラインについては、 Google Cloud の Data Transfer に Salesforce のコネクタが用意されていて、認証情報を設定さえすれば定期的なデータの転送ができるようになっています。2025年1月現在はまだ Preview ですが、要件に合う場合は、あわせてご検討いただくと良いかもしれません。

https://cloud.google.com/bigquery/docs/salesforce-transfer

UPSIDER ではデータサイエンティストやデータエンジニアをはじめ、各チームで絶賛採用中なので、興味を持っていただけましたらぜひ下記のリンクをご覧ください!

herp.careers

UPSIDER Engineering Deckはこちら📣

speakerdeck.com