*日本語版は下記にあります。

1. Introduction: Why Data Pipelines Exist (and Why You Care)
Let’s start with a situation most teams run into sooner or later.
Your app stores fruit inventory in Firestore.
The app is happy. Firestore is happy.
Then the analytics team shows up and asks:
“Can we get this data in BigQuery… in real time?”
Suddenly, life is less happy.
Firestore is great for operational workloads—fast reads, writes, and transactions.
BigQuery is great for analytics—aggregations, dashboards, and reports.
But they speak very different languages.
That’s where data pipelines come in.
A data pipeline’s job is simple in theory:
- listen for data changes
- move data somewhere else
- reshape it so it’s actually useful
In this post, we’ll build a real-time Firestore → BigQuery pipeline using Google Cloud’s managed services. It’s fully serverless, production-friendly, and can be up and running in about 15 minutes.
(There is one small piece of custom code—a Cloud Run “bridge.” Don’t worry, it’s boring code, which is exactly what you want.)
Code & Repository
All infrastructure glue code and pipeline logic lives in a single Git repository.
GitHub Repository: https://github.com/pranavnijampurkar33/ml-pipeline
The repository contains:
- a minimal Cloud Run service to bridge Eventarc → Pub/Sub
- an Apache Beam (Dataflow) streaming pipeline
- deployment scripts and configuration examples
2. Data Pipelines, Without the Buzzwords
Before we draw boxes and arrows, let’s get aligned on terms.
What Is a Data Pipeline, Really?
At its core, a data pipeline does three things:
- Ingests data (events, records, files)
- Transforms data (format, structure, types)
- Delivers data to another system
There are two common flavors:
- Batch pipelines
- Run on a schedule.
- Great for nightly reports.
- Streaming pipelines
- React to events as they happen.
- Great when “now” actually means now.
In this article, we’re firmly in streaming territory.
Where Do Pipelines Show Up?
Pretty much everywhere:
- Orders flowing into analytics
- Inventory updates powering dashboards
- User activity feeding product insights
- IoT sensors writing time-series data
- Audit logs stored for compliance
Any time you want to decouple your application from analytics, a pipeline is your friend.
3. The Architecture: Firestore → BigQuery
Now let’s look at what we’re building.
High-Level Flow

Yes, that’s a few components—but each one has a very specific job. No passengers, no fluff.
Let’s walk through them quickly.
Firestore (Source)
Firestore is where our operational data lives.
Whenever a document is created, updated, or deleted, Firestore emits an event.
Eventarc (Event Detection)
Eventarc listens for those Firestore changes and delivers them as CloudEvents over HTTP.
Think of Eventarc as the messenger that says:
“Hey, something changed. Here’s what happened.”
Cloud Run Bridge (The Translator)
Here’s the slightly unusual part.
- Eventarc delivers events over HTTP
- Pub/Sub expects events to be published
- Eventarc can’t publish directly to Pub/Sub
So we add a tiny Cloud Run service whose only job is to:
- receive the HTTP CloudEvent
- normalize the payload
- publish it to Pub/Sub
This service doesn’t do business logic. It doesn’t store state.
It’s basically a translator standing between two systems that don’t speak the same protocol.
Pub/Sub (The Buffer)
Pub/Sub gives us:
- buffering
- retries
- decoupling
- the ability to add more consumers later
Firestore doesn’t need to know how fast Dataflow is. Pub/Sub handles the awkward timing.
Dataflow (The Worker)
Dataflow is where the real work happens:
- parsing document paths
- converting Firestore’s typed fields
- flattening nested data
- validating schemas
If this pipeline were a factory, Dataflow would be the assembly line.
BigQuery (Destination)
Finally, the data lands in BigQuery:
- structured
- queryable
- optimized for analytics
This is where dashboards, reports, and ad-hoc queries live happily ever after.
4. Understanding the Data (Before Writing Any Code)
Before we deploy anything, let’s look at what the data actually looks like.
Firestore Input
Firestore events arrive as protobuf-encoded documents with typed fields:
{ "name":"projects/.../documents/fruits/apple-123", "fields":{ "name":{"stringValue":"Apple"}, "color":{"stringValue":"Red"}, "taste":{"stringValue":"Sweet"}, "price":{"doubleValue":1.5}, "origin":{"stringValue":"Japan"}, "in_stock":{"booleanValue":true} }, "updateTime":"2025-11-13T06:00:00Z" }
This format is excellent for Firestore…
and absolutely not what you want to run analytics on.
Diagram: Firestore Document Event Format
Below diagram shows the structure of a Firestore document change event, including the document path, typed fields, and metadata.

BigQuery Output
In BigQuery, we want something much simpler:
- one row per event
- flat columns
- consistent types
- extra metadata for analysis
For example:
"event_id" STRING "event_type" STRING "fruit_id" STRING "name" STRING "color" STRING "price" NUMERIC "in_stock" BOOLEAN "timestamp" TIMESTAMP "processed_at" TIMESTAMP
5. Quick PoC: Wiring Everything Together
At this point, we understand:
- the architecture
- the data formats
- why each component exists
Now we build.
The goal here isn’t perfection—it’s an end-to-end working pipeline. Each step takes only a minute or two.
Prerequisites
- GCP project with billing enabled
- APIs enabled: Firestore, Eventarc, Cloud Run, Pub/Sub, Dataflow, BigQuery
gcloudCLI authenticated
Steps (Unchanged, Just Less Stressful)
- Deploy Cloud Run bridge
- Create Pub/Sub topic & subscription
- Create Eventarc trigger
- Create BigQuery dataset & table
- Deploy Dataflow pipeline
- Write a test document to Firestore
If data shows up in BigQuery, you win 🎉
If not, the checklist tells you exactly where to look.

6. Key Implementation Details (The Only Code You Actually Write)
GitHub Repository: https://github.com/pranavnijampurkar33/ml-pipeline
Let’s zoom in on the two places where code exists.
Before reading anything, keep this mental model:
Cloud Run normalizes events
Pub/Sub buffers them
Dataflow flattens them
BigQuery stores them
If you remember that, the code makes sense.
Cloud Run Bridge (main.py)
This service is intentionally boring. That’s a compliment.
Its entire job:
- receive a CloudEvent
- extract metadata
- decode the Firestore payload
- publish JSON to Pub/Sub
Pseudocode: Main Handler
@app.route("/", methods=["POST"]) defhandle_event(): event = from_http(request.headers, request.data) firestore_payload = parse_firestore_event_body(request.data) normalized = { "type": event.get("type"), "source": event.get("source"), "id": event.get("id"), "time": event.get("time",""), "data": firestore_payload, } publisher.publish(TOPIC_PATH, json.dumps(normalized).encode()) return {"status":"ok"},200
What This Service Is Not
- Not stateful
- Not doing transformations
- Not business logic
- Not clever
And that’s exactly why it’s safe and easy to maintain.
Dataflow Pipeline (dataflow_pipeline.py)
Dataflow does the heavy lifting.
For each Pub/Sub message, it:
- figures out whether this was a create, update, or delete
- extracts the document ID from the path
- unwraps Firestore’s typed fields
- builds a BigQuery-ready row
Pseudocode: Core Transformation
classParseCloudEvent(beam.DoFn):
defprocess(self, element):
msg = json.loads(element.decode())
document = extract_document(msg)
fruit_id = extract_id(document["name"])
fields = convert_firestore_fields(document["fields"])
yield {
"event_type": detect_action(msg),
"fruit_id": fruit_id,
"name": fields.get("name"),
"price": fields.get("price"),
"in_stock": fields.get("in_stock"),
"processed_at": now(),
}
Pipeline Definition
(pipeline | ReadFromPubSub(...) | beam.ParDo(ParseCloudEvent()) | WriteToBigQuery(...) )
That’s the pipeline. No magic, just plumbing.
7. Why This Architecture Works Well
Every component exists for a reason:
- Cloud Run keeps Eventarc and Pub/Sub loosely coupled
- Pub/Sub absorbs spikes and retries safely
- Dataflow handles schema evolution and transformation
- BigQuery stays clean and analytics-friendly
This pattern scales from:
- “tiny PoC”
- to “millions of events per day”
…without changing the core design.
8. Monitoring and Troubleshooting (Because Things Break)
Here’s what to watch:
- Cloud Run: request count, errors
- Pub/Sub: backlog size
- Dataflow: worker count, throughput
- BigQuery: insert errors
Example failure pattern:
Pub/Sub backlog growing + Dataflow workers flat
→ auto-scaling is probably disabled
Most issues show up clearly if you know where to look.
9. Going Beyond the PoC
Once this is running, it’s easy to extend:
- dead-letter queues
- validation rules
- alerts
- cost budgets
- fan-out to multiple consumers
- real-time ML inference
The hard part—wiring the pipeline—is already done.
10. Final Thoughts
We built a real-time Firestore → BigQuery pipeline that is:
- serverless
- scalable
- production-ready
- easy to reason about
More importantly, it’s a pattern you can reuse for many use cases—not just fruit 🍎
If you’ve ever struggled to get operational data into analytics cleanly, this setup will save you a lot of future pain.
GitHub Repository: https://github.com/pranavnijampurkar33/ml-pipeline
References
- Google Cloud Eventarc Documentation
- Apache Beam Documentation
- Dataflow Best Practices
- BigQuery Streaming Inserts
- Cloud Run Documentation
- Pub/Sub Documentation
日本語版:
約15分で構築する、GCPのリアルタイムデータパイプライン

1. なぜデータパイプラインが必要なのか?( そして、なぜ気にするべきなのか)
まずは、多くのチームがいずれ必ず直面する状況から始めましょう。
私のアプリは、Firestore に果物の在庫データを保存しています。 アプリは満足。Firestore も満足。 そこに、分析チームが現れてこう言います。
「このデータ、BigQuery に…リアルタイムで入れられる?」
その瞬間、世界は少しだけ厳しくなります。
Firestore は 業務用途(オペレーショナルワークロード) にとても優れています。
高速な読み書き、トランザクション処理——文句なしです。
一方で、BigQuery は 分析用途 のプロ。
集計、ダッシュボード、レポート作成はお手のものです。
問題は、この2つがまったく違う言語を話していること。
そこで登場するのが データパイプライン です。
データパイプラインの役割は、理論上とてもシンプルです。
- データの変更を検知する
- データを別の場所へ運ぶ
- 分析に使える形に整える
本記事では、Google Cloud のマネージドサービスを使って
Firestore → BigQuery のリアルタイムデータパイプライン を構築します。完全にサーバーレスで、プロダクションにも耐えられ、しかも 約15分 で動かせます。
(カスタムコードはひとつだけ。Cloud Run の「ブリッジ」です。安心してください、とても退屈なコードです。そして、こういうコードほど信頼できるものはありません。)
コードとリポジトリについて
本記事で使用するインフラ構成用のグルーコードおよびパイプラインの実装は、1つの Git リポジトリにまとめています。
GitHub Repository: https://github.com/pranavnijampurkar33/ml-pipeline
このリポジトリには、以下の内容が含まれています。
- Eventarc から Pub/Sub への連携を行う、最小構成の Cloud Run サービス
- Apache Beam(Dataflow)によるストリーミングパイプライン
- デプロイ用スクリプトおよび各種設定例
2. バズワード抜きで理解するデータパイプライン
図やアーキテクチャの話に入る前に、まずは用語の整理をしておきましょう。
そもそも、データパイプラインとは何か?
本質的に、データパイプラインが行うことは次の3つです。
- データを取り込む(イベント、レコード、ファイルなど)
- データを変換する(フォーマット、構造、型の変換)
- 別のシステムへ届ける
一般的に、データパイプラインには次の2つの種類があります。
- バッチパイプライン
- 定期実行されるパイプライン
- 日次・時間単位のレポートに適している
- ストリーミングパイプライン
- イベント発生と同時に処理を行う
- 「今すぐ」データが必要なケースに向いている
本記事で扱うのは、後者の ストリーミングパイプライン です。
データパイプラインはどこで使われているのか?
データパイプラインは、実際にはさまざまな場面で利用されています。
- 注文データを分析基盤へ連携するケース
- 在庫更新をダッシュボードに反映するケース
- ユーザー行動をプロダクト分析に活用するケース
- IoT センサーのデータを時系列データとして保存するケース
- 監査ログをコンプライアンス用途で保管するケース
アプリケーションと分析基盤を疎結合にしたい場合、データパイプラインは非常に有効な選択肢になります。
3. アーキテクチャ:Firestore から BigQuery へ
ここからは、今回構築するパイプラインの全体像を見ていきます。
全体のデータフロー

一見するとコンポーネントが多く見えるかもしれませんが、それぞれが明確な役割を持っており、不要な要素は含まれていません。
以下で、各コンポーネントを順に見ていきます。
Firestore(Source)
Firestore は、アプリケーションの オペレーショナルデータ を保持するデータベースです。
ドキュメントが作成・更新・削除されるたびに、Firestore は対応するイベントを発行します。このイベントが、今回のパイプラインの起点になります。
Eventarc (Event Detection)
Eventarc は Firestore の変更イベントを検知し、それらを CloudEvent として配信します。配信は HTTP 経由で行われ、「どのドキュメントに、どのような変更が起きたか」といった情報が含まれます。
Eventarc は、
「何かが変わった」という事実を確実に通知する役割を担っています。
Cloud Run Bridge (The Translator)
ここが、今回のアーキテクチャでやや特徴的な部分です。
- Eventarc は HTTP 経由 でイベントを配信します
- Pub/Sub は メッセージを publish する 形式を前提としています
- Eventarc から Pub/Sub へ直接イベントを送ることはできません
そのため、間に Cloud Run の小さなサービス を配置します。この Cloud Run サービスの役割は非常に限定的です。
- HTTP で CloudEvent を受信する
- ペイロードを正規化する
- Pub/Sub にメッセージとして publish する
ビジネスロジックは持たず、状態も保持しません。
異なるプロトコル間をつなぐ 変換用のブリッジ として機能します。
Pub/Sub (The Buffer)
Pub/Sub は、イベントを一時的に保持し、下流の処理と疎結合にする役割を担います。これにより、以下のメリットが得られます。
- 一時的な負荷増加に対するバッファリング
- 自動リトライ
- 将来的なコンシューマー追加(ファンアウト)
Firestore 側は、
「Dataflow がどれくらいの速度で処理しているか」を意識する必要がありません。この調整を Pub/Sub が吸収します。
Dataflow (The Worker)
Dataflow は、パイプラインの中で 最も多くの処理を行うコンポーネント です。具体的には、次のような処理を担当します。
- Firestore ドキュメントパスの解析
- Firestore の型付きフィールドの変換
- ネストされたデータのフラット化
- BigQuery スキーマへのマッピングおよび検証
もしこのパイプラインを工場に例えるなら、Dataflow は 実際に組み立て作業を行う生産ライン に相当します。
BigQuery(Destination)
最終的に、変換されたデータは BigQuery に書き込まれます。BigQuery では、データは次のような形で利用できます。
- 構造化されている
- SQL でクエリ可能
- 分析・ダッシュボード用途に最適化されている
ここで、レポート作成やアドホック分析が行われることになります。
4. コードを書く前に、データを理解する
ここまででアーキテクチャの全体像は把握できました。次に重要なのは、パイプラインを流れるデータがどのような形をしているか を理解することです。
実装に入る前にこれを確認しておくと、後のコードや設定が格段に理解しやすくなります。
Firestore Input
Firestore の変更イベントは、型情報を含んだ protobuf 形式のドキュメント として届きます。
例として、次のようなデータが送られてきます。
{ "name":"projects/.../documents/fruits/apple-123", "fields":{ "name":{"stringValue":"Apple"}, "color":{"stringValue":"Red"}, "taste":{"stringValue":"Sweet"}, "price":{"doubleValue":1.5}, "origin":{"stringValue":"Japan"}, "in_stock":{"booleanValue":true} }, "updateTime":"2025-11-13T06:00:00Z" }
この形式は Firestore にとっては非常に都合が良いものです。
各フィールドの型が明示されており、正確なデータ表現が可能です。
一方で、このままの形で 分析用途に使うのは適していません。
Diagram: Firestore Document Event Format
この図は、Firestore のドキュメント変更イベントがどのような構造を持っているかを示しています。

- ドキュメントの完全なパス
- 型付きのフィールド(
stringValue、doubleValueなど) - 更新時刻などのメタデータ
この時点では、データはまだ オペレーショナル寄りの表現 になっています。
BigQuery Output
BigQuery 側では、まったく異なる形のデータが求められます。求められるのは、次のような特徴を持つデータです。
- イベントごとに 1行
- ネストされていない フラットなカラム構造
- 一貫したデータ型
- 分析に使いやすいメタデータの付加
例えば、以下のようなスキーマになります。
"event_id" STRING "event_type" STRING "fruit_id" STRING "name" STRING "color" STRING "price" NUMERIC "in_stock" BOOLEAN "timestamp" TIMESTAMP "processed_at" TIMESTAMP
5. クイック PoC:全体をつなぐ
ここまでで、次の点はすでに把握できています。
- 全体のアーキテクチャ
- パイプラインを流れるデータの形式
- 各コンポーネントが存在する理由
それでは、実際に構築していきましょう。
このセクションの目的は、完璧な実装を目指すことではありません。まずは、エンドツーエンドで動作するパイプラインを短時間で作ること がゴールです。
各ステップは 1〜2 分程度で完了し、途中で動作確認もしながら進められる構成になっています。
Prerequisites
- 課金が有効化された GCP プロジェクト
- 以下の API が有効になっていること
- Cloud Firestore API
- Eventarc API
- Cloud Run API
- Pub/Sub API
- Dataflow API
- BigQuery API
gcloudCLI がインストールされ、認証済みであること
手順:内容はそのまま、気持ちは少し軽く
以下の手順で進めます。
- Cloud Run ブリッジをデプロイ
- Pub/Sub のトピックとサブスクリプションを作成
- Eventarc トリガーを作成
- BigQuery のデータセットとテーブルを作成
- Dataflow パイプラインをデプロイ
- Firestore にテスト用ドキュメントを書き込む
すべて完了した後、
BigQuery にデータが表示されていれば成功です 🎉もし表示されない場合でも、
チェックポイントが明確なので、どこで問題が起きているかを簡単に切り分けることができます。

6. 実際に書くコードは、ここだけ
本セクションでは、このパイプラインの中で実際にコードを書く必要がある部分 にフォーカスします。
GitHub Repository: https://github.com/pranavnijampurkar33/ml-pipeline
インフラ構成やマネージドサービスの設定が中心となる中で、コードが必要なのは、次の 2 箇所だけです。
- Cloud Run ブリッジ
- Dataflow パイプライン
まず最初に:コードを読むためのメンタルモデル
コードを見る前に、次の整理を頭に入れておくと理解がスムーズです。
Cloud Run:イベントを正規化する Pub/Sub:イベントをバッファリングす Dataflow:データをフラット化・変換する BigQuery:分析用に保存する
この流れを意識しておくと、それぞれのコードが「なぜ存在しているのか」が明確になります。
Cloud Run Bridge (main.py)
Cloud Run ブリッジは、意図的にシンプルに作られたサービス です。
複雑なロジックは持たず、役割は明確に限定されています。
Cloud Run ブリッジの役割
- Eventarc から送られてくる CloudEvent を受信する
- イベントのメタデータを抽出する
- Firestore のペイロードをデコードする
- JSON 形式に正規化し、Pub/Sub に publish する
Pseudocode: Main Handler
@app.route("/", methods=["POST"]) defhandle_event(): event = from_http(request.headers, request.data) firestore_payload = parse_firestore_event_body(request.data) normalized = { "type": event.get("type"), "source": event.get("source"), "id": event.get("id"), "time": event.get("time",""), "data": firestore_payload, } publisher.publish(TOPIC_PATH, json.dumps(normalized).encode()) return {"status":"ok"},200
このサービスが「やらないこと」
Cloud Run ブリッジは、次のことを 意図的に行いません。
- 状態管理
- データ変換や集計
- ビジネスロジックの実装
- 複雑な処理
こうした責務を持たせないことで、
サービスは小さく、壊れにくく、保守しやすくなります。
Dataflow Pipeline (dataflow_pipeline.py)
Dataflow は、このパイプラインの中で
最も多くの処理を担当するコンポーネント です。
Pub/Sub から受信したメッセージに対して、
次の処理を順に行います。
- イベントが create / update / delete のどれかを判定
- ドキュメントパスから ID を抽出
- Firestore の型付きフィールドを通常の値に変換
- BigQuery に書き込めるレコードを生成
Pseudocode: Core Transformation
classParseCloudEvent(beam.DoFn):
defprocess(self, element):
msg = json.loads(element.decode())
document = extract_document(msg)
fruit_id = extract_id(document["name"])
fields = convert_firestore_fields(document["fields"])
yield {
"event_type": detect_action(msg),
"fruit_id": fruit_id,
"name": fields.get("name"),
"price": fields.get("price"),
"in_stock": fields.get("in_stock"),
"processed_at": now(),
}
Pipeline Definition
(pipeline | ReadFromPubSub(...) | beam.ParDo(ParseCloudEvent()) | WriteToBigQuery(...) )
この構成が、Dataflow パイプラインの全体像です。
特別な魔法はなく、イベントを読み取り、変換し、書き込むという非常に素直な処理になっています。
7. なぜこのアーキテクチャがうまく機能するのか?
このアーキテクチャでは、すべてのコンポーネントが明確な理由を持って配置されています。
- Cloud Run は、Eventarc と Pub/Sub を疎結合に保つ役割を担います
- Pub/Sub は、スパイク的な負荷を吸収し、安全にリトライを行います
- Dataflow は、スキーマの変化への対応やデータ変換を担当します
- BigQuery は、分析に適したクリーンな状態を維持します
このパターンは、
- 「小さな PoC」から
- 「1 日に数百万イベントを処理する規模」まで
コアとなる設計を変更することなく スケールさせることが可能です。
8. 監視とトラブルシューティング
監視すべき主なポイントは以下のとおりです。
- Cloud Run:リクエスト数、エラー数
- Pub/Sub:バックログのサイズ
- Dataflow:ワーカー数、スループット
- BigQuery:インサートエラー
代表的な障害パターンの一例は次のとおりです。
Pub/Sub のバックログが増加している
+ Dataflow のワーカー数が増えていない
→ オートスケーリングが無効になっている可能性があります
9. PoC のその先へ
このパイプラインが一度動き始めれば、拡張は比較的容易です。
- デッドレターキューの追加
- バリデーションルールの実装
- アラートの設定
- コストバジェットの設定
- 複数コンシューマーへのファンアウト
- リアルタイム ML 推論
10. まとめ
本記事では、Firestore → BigQuery のリアルタイムデータパイプライン を構築しました。このパイプラインは、次の特徴を持っています。
- サーバーレス
- スケーラブル
- プロダクション利用を前提とした設計
- 理解しやすい構成
さらに重要なのは、これが 特定のユースケースに限定された実装ではなく、再利用可能な「パターン」 であるという点です。
果物の在庫管理 🍎 に限らず、さまざまなケースに応用できます。
オペレーショナルデータを分析基盤へきれいに連携することに苦労した経験がある場合、
この構成は今後の開発において、多くの手間やトラブルを減らしてくれるはずです。
参考資料
- Google Cloud Eventarc ドキュメント
- Apache Beam ドキュメント
- Dataflow ベストプラクティス
- BigQuery ストリーミングインサート
- Cloud Run ドキュメント
- Pub/Sub ドキュメント
We Are Hiring !!
UPSIDERでは現在積極採用をしています。 ぜひお気軽にご応募ください。
UPSIDER Engineering Deckはこちら📣