
こんに
目次
目次を開く
- 1. ミライCRMとは
- 2. 全体
アーキテクチャ - 3. 技術スタックの
選定理由 - 3.1 バックエンド: Go
- 3.2 バックエンド: TypeScript
(Cloudflare Workers / Analytics Agent) - 3.3 フロントエンド: React + TanStack Router
- 3.4 API: Connect RPC
(gRPC互換) + Protocol Buffers - 3.5 データストア: PostgreSQL + ClickHouse + Redis
- 3.6 SQL → Go の
橋渡し: sqlc - 3.7 非同期処理: Cloud Pub/Sub と
Temporal の 使い分け - 3.8 認証: Clerk
- 3.9 エッジ: Cloudflare Workers
- 3.10 インフラ: Google Cloud / Cloud Run
- 4. これから
1. ミライCRMとは
ミライCRM は、「次世代AI LINE CRMツール」 を
プロダクトと
EC事業者を
- 配信業務の
売上 最大化 - テナントの
業務効率化 - AIに
よる 施策提案・運用支援
を
主な機能
機能は
メッセージ配信
- セグメント配信: ECや
LINE上の データを 元に、 特定の セグメントに のみ 配信する - シナリオ配信: ユーザーの
アクションを トリガーに、 メッセージを 自動配信する
顧客管理
- 1対1チャット: 友だちとの
個別チャットを ミライCRM内で 完結させる - タグ管理: 手動/ユーザー行動に
応じた 自動付与で タグを 管理する - 回答フォーム機能: 自由記述・アンケート用の
フォームを 作成して 配信する - ランク機能: ECでの
購入金額に 応じた 会員ランク付与と、 ランク連動の クーポン発行
LINE内コンテンツ
- Flexメッセージ: ブロックを
組み合わせた 自由な レイアウトの メッセージ。生成AIに よる をFlexメッセージ 自動生成 搭載 - リッチメニュー: LINE公式アカウント単体では
できない 「複数タブ表示」 「セグメント配信」に 対応 - シナリオ設定: 選択肢に
応じて シナリオが 分岐する 応答botを 作成する
分析
- 友だち分析: 有効友だち・ブロック数などを
時系列で 確認 - 配信分析: 配信ごとに
開封率・CTR・CVR・売上・ROASを ファネル形式で 一覧 表示。 前月比の 変動も 自動算出 - 流入経路分析URL: 流入経路別に
友だち追加URLを 発行・タグ付与 - CSV出力: 友だち・配信データを
CSVで 抽出 - AI分析:
「直近1ヶ月の 配信効果を 分析してください」と いった 自然言語の 問い 合わせに 対し、 LINE × ECデータを 横断して インサイトと ネクストアクションを 返す
AIによる業務効率化
ミライCRMの
- AI Flexメッセージ生成: AIが
配信用Flexメッセージの デザインを 自動生成 - AIチャットアシスタント: 問い
合わせ内容を 理解し、 最適な 返信案を 提示 - AI配信分析: 配信の
パフォーマンスを 解析し、 インサイトと ネクストアクションを 提示
「専任の
これらの
2. 全体アーキテクチャ
現状の
- フロントエンド: 管理者向けWebアプリ/LIFFアプリ/Shopifyアプリ
- バックエンド: APIサーバー/ワーカー類
- データストア: PostgreSQL/ClickHouse/Redis
- 非同期処理基盤: Cloud Pub/Sub と
Temporal - エッジ/トラッキング基盤: Cloudflare Workers
- インフラ: Google Cloud
(Cloud Run中心)
全体
flowchart LR
subgraph Clients["Clients"]
Tenant[テナント運用者]
Customer[顧客<br/>LINE / Webサイト]
end
subgraph External["External Services"]
LINE[LINE Messaging API]
Ecforce[ecforce]
Clerk[Clerk]
Shopify[Shopify]
end
subgraph Edge["Cloudflare Edge"]
CFW[Tracking Edge Workers]
CFQueue[(Cloudflare Queues)]
end
subgraph GCP["Google Cloud"]
Web[Web Frontend]
LIFF[LIFF Web App]
ShopifyApp[Shopify App]
API[Main API Server]
Broker[Webhook Broker]
Worker[Temporal Worker]
Agent[Analytics Agent]
PubSub[(Cloud Pub/Sub)]
Temporal[(Temporal Cloud)]
PG[(PostgreSQL)]
CH[(ClickHouse)]
Redis[(Redis)]
end
Tenant --> Web
Customer --> LIFF
Customer --> CFW
Tenant --> ShopifyApp
Tenant --> Agent
Web --> API
LIFF --> API
ShopifyApp --> API
CFW --> CFQueue
CFQueue --> CH
LINE -- Webhook --> Broker
Ecforce -- Webhook --> Broker
Clerk -- Webhook --> Broker
Broker -- publish --> PubSub
Shopify -- ネイティブPub/Sub配送 --> PubSub
PubSub --> API
API --> PG
API --> CH
API --> Redis
API --> Temporal
Agent --> CH
Temporal --> Worker
Worker --> PG
Worker --> CH
Worker --> LINE
2.1 サービス構成
ミライCRMは
| サービス | 役割 |
|---|---|
| Main API Server | メインのAPIサーバー。Connect RPCでフロントエンドからのリクエストを受ける |
| Webhook Broker | LINE / Clerk / ecforce などからのWebhookを受け、Pub/Subにルーティングする |
| Temporal Worker | キャンペーン配信・データ同期などの長時間ワークフローを実行するワーカー |
| Web Frontend | テナント向けの管理画面(Reactアプリ) |
| LIFF Web App | LINE上で動く顧客向け画面(LIFFアプリ) |
| Shopify App | Shopify組み込みアプリ/Flowアクション拡張 |
| Analytics Agent | LINE × ECデータを横断で分析する、AI分析エージェント |
| Tracking Edge Workers | Cloudflare Workers上で動くトラッキング系エッジ関数群 |
2.2 マルチテナンシー
ミライCRMのorg_id カラムで
この
- 運用コストが
線形に : テナントごとに増えない DBを 切る サイロモデルは、 N個の マイグレーション実行・N個の バックアップ・N個の 接続プールを 管理する ことになる。 SaaSと して テナント数が 伸びる 前提に 立つと、 ここを 線形に しない 設計が 必須 - 横断クエリ・横断機能を
作りやすい : 全テナントを 横断した 内部分析・運用ダッシュボード・課金集計を、 追加の 集約パイプラインなしで 素直に 書ける - デプロイ・スキーマ変更が
シンプル : マイグレーションは1度流せば よく、 「ある テナントだけ古い スキーマで 動いている」 状態が 起きない
代わりに、
具体的には、org_id カラムをSET LOCAL app.org_id = ... でtenant_isolation_policy) で
これにWHERE org_id = ? をorg_id を
RLSをテストで強制する
ただし、
そこで、public スキーマのorg_id カラム・RLS有効・tenant_isolation_policy の
具体的には、pg_tables / pg_policies / information_schema.columns) を
func TestRLSEnabledOnAllRequiredTables(t *testing.T) {
t.Parallel()
// Arrange
db := NewTestDB(t)
ctx := context.Background()
// Act - Query to get all tables with their RLS status and org_id column existence
query := `
WITH table_info AS (
SELECT
t.tablename,
t.rowsecurity AS rls_enabled,
EXISTS (
SELECT 1
FROM information_schema.columns c
WHERE c.table_schema = 'public'
AND c.table_name = t.tablename
AND c.column_name = 'org_id'
) AS has_org_id,
EXISTS (
SELECT 1
FROM pg_policies p
WHERE p.schemaname = 'public'
AND p.tablename = t.tablename
AND p.policyname = 'tenant_isolation_policy'
) AS has_tenant_isolation_policy
FROM pg_tables t
WHERE t.schemaname = 'public'
ORDER BY t.tablename
)
SELECT * FROM table_info;
`
rows, err := db.QueryContext(ctx, query)
require.NoError(t, err)
defer rows.Close()
var failedTables []string
var tablesChecked int
// Assert
for rows.Next() {
var tableName string
var rlsEnabled bool
var hasOrgID bool
var hasTenantIsolationPolicy bool
err := rows.Scan(&tableName, &rlsEnabled, &hasOrgID, &hasTenantIsolationPolicy)
require.NoError(t, err)
// Skip excluded tables (e.g. organizations table itself)
if rlsExcludedTables[tableName] {
continue
}
tablesChecked++
if !hasOrgID {
failedTables = append(failedTables,
"Table '"+tableName+"' is missing 'org_id' column")
}
if !rlsEnabled {
failedTables = append(failedTables,
"Table '"+tableName+"' does not have RLS enabled")
}
if !hasTenantIsolationPolicy {
failedTables = append(failedTables,
"Table '"+tableName+"' does not have 'tenant_isolation_policy'")
}
}
require.NoError(t, rows.Err())
if len(failedTables) > 0 {
t.Errorf("RLS verification failed for the following tables:\n")
for _, failure := range failedTables {
t.Errorf(" - %s\n", failure)
}
}
assert.Greater(t, tablesChecked, 0, "No tables were checked")
}
ポイントは
- 対象を
「特定の : 新しいテーブル」ではなく 「全テーブル」に している テーブルを 足した とき、 テスト側で 何も 書き換えなくても 自動的に 検査対象に なる。 RLSを 張り忘れた 瞬間に テストが 落ちる - 検査を
PostgreSQL自身に : スキーマ定義ファイルを問い合わせている パースするのではなく、 pg_tables/pg_policiesという 実態に 問い合わせるので、 「マイグレーションは 書いたが DDLが 流れていない」のような ずれも 検知できる - 意図的に
除外したい : テナント横断のテーブルだけ rlsExcludedTablesで明示する メタテーブル ( organizations自身など、ごく 少数)は allowlistに 入れる。「除外する 判断は のが明示的に 書かないと 通らない」 大事で、 暗黙的に 抜ける ことを 許さない
さらに、
func TestRLSPolicyDefinition(t *testing.T) {
t.Parallel()
// Arrange
db := NewTestDB(t)
ctx := context.Background()
// Act
query := `
SELECT tablename, policyname, permissive, roles, cmd, qual
FROM pg_policies
WHERE schemaname = 'public'
AND policyname = 'tenant_isolation_policy'
ORDER BY tablename;
`
rows, err := db.QueryContext(ctx, query)
require.NoError(t, err)
defer rows.Close()
expectedQual := "(org_id = current_setting('app.org_id'::text))"
policiesChecked := 0
// Assert
for rows.Next() {
var tableName, policyName, permissive, cmd string
var roles []byte
var qual *string
err := rows.Scan(&tableName, &policyName, &permissive, &roles, &cmd, &qual)
require.NoError(t, err)
if rlsExcludedTables[tableName] {
continue
}
policiesChecked++
assert.Equal(t, "tenant_isolation_policy", policyName, "Policy name mismatch for table %s", tableName)
assert.Equal(t, "PERMISSIVE", permissive, "Policy should be PERMISSIVE for table %s", tableName)
assert.Equal(t, "ALL", cmd, "Policy command should be ALL for table %s", tableName)
if qual != nil {
assert.Equal(t, expectedQual, *qual, "Policy qualification mismatch for table %s", tableName)
}
}
require.NoError(t, rows.Err())
assert.Greater(t, policiesChecked, 0, "No policies were checked")
}
こちらは(org_id = current_setting('app.org_id'::text)) で
「RLSを
2.3 Webhook Broker を分離している理由
ミライCRMでは、
な
flowchart LR
LINE[LINE Messaging API]
Ecforce[ecforce]
Clerk[Clerk]
Shopify[Shopify]
Broker[Webhook Broker<br/>署名検証 / ルーティング]
PubSub[(Cloud Pub/Sub)]
API[Main API Server]
Worker[Temporal Worker]
LINE -- Webhook --> Broker
Ecforce -- Webhook --> Broker
Clerk -- Webhook --> Broker
Shopify -- ネイティブPub/Sub配送 --> PubSub
Broker -- publish --> PubSub
PubSub --> API
PubSub --> Worker
あえて
- 外部
サービスの : WebhookはSLAと 自社APIの SLAを 切り離す LINEや ecforce側から 見ると 「受信が 成功する こと」が 最優先で、 受信に 失敗すると リトライ・ 最悪は 配送停止に つながる。 Main APIの デプロイ中・障害中・スロークエリの 巻き添えで Webhook を 取りこぼすと、 LINEの 友だち追加イベントや ECの 注文通知が 欠損し、 後段の 業務データが 恒久的に 壊れる。「Webhookを 取り逃さない をこと」だけを 責務に した、 極力薄くて 壊れにくいサービス 独立に 置く 価値が 大きい - スパイクの
吸収レイヤーに : LINEのなる 一斉メッセージ送信後の 既読・クリックイベントや、 ecforceの キャンペーンセール時の 注文Webhookは、 瞬間的に 通常の 数十〜数百倍の リクエストが 飛んでくる。 Brokerは 受けたら 即Pub/Subに 積むだけなので、 後段の Main APIや Workerは 自分の ペースで 消化できる。外部からの 突発トラフィックが Main APIの レイテンシに 直接影響しない - 配送先の
ファンアウトを : 1本のPub/Sub に 任せられる Webhookを 「PostgreSQLへの 反映」 「ClickHouseへの イベント記録」 「Temporalワークフローの トリガー」など 複数の 用途に 流したい ケースが 多い。 Brokerが publish した 時点で、 購読側を 増やしても Broker 側の コード変更は 不要。新しい 用途を 足すたびに Main API の エンドポイントを 増やす必要が ない - デプロイ・スケーリングを
独立させる : Main API は機能追加で 頻繁に デプロイされるが、 Brokerは 安定して 動き続けて ほしい。 逆に Brokerは 外部トラフィックの 量だけで 水平に スケールしたい。 両者の ライフサイクルを 切り離す ことで、 Main APIの デプロイ中でも Webhook受信が 継続される
要するに、Webhook Broker は
3. 技術スタックの選定理由
まず、
| 用途 | Technology / Service |
|---|---|
| API インターフェース | Connect, Protobuf |
| フロントエンド | TypeScript, React, TanStack Router, TanStack Query, connect-web, shadcn/ui, Tailwind CSS, Vitest, happy-dom, GCS |
| E2E テスト | Playwright |
| API サーバー | Go, connect-go, sqlc, Atlas, Cloud Run |
| ワークフローエンジン | Temporal Cloud |
| データベース | Cloud SQL for PostgreSQL / ClickHouse |
| WAF | Cloud Armor |
| 認証 | Clerk |
| DB踏み台サーバー | GCE |
| IaC | Terraform |
| ソースコード管理 | GitHub |
| CI / CD | GitHub Actions, Blacksmith |
| ロギング | Grafana Cloud |
| モニタリング / エラートラッキング | Grafana Cloud, Sentry, Slack |
| 分析基盤 | ClickHouse, dbt, lightdash |
| Feature Flag / プロダクトアナリティクス | Statsig |
ここからは、
3.1 バックエンド: Go
メインAPI・Webhook Broker・Temporal Worker と
- 静的型付け+シンプルな
言語仕様で、 新しく 入った メンバーが コードを 読み 進めやすい - 並行処理
(goroutine / channel)が 言語レベルで 自然に 書ける。 ミライCRMでは 「LINEに 対して 数十万通の メッセージを 並行して 送る」ような 処理が 日常的に 必要で、 ここが Goの 得意領域と 相性が 良い - バイナリが
単一で デプロイが 軽く、 Cloud Run のような Serverless container との 組み合わせが そのまま 素直に ハマる - gRPC / Connect / sqlc / Temporal SDK など、
必要な 周辺ライブラリの エコシステムが 厚い
また
3.2 バックエンド: TypeScript(Cloudflare Workers / Analytics Agent)
メインの
- Tracking Edge Workers: Cloudflare Workers 上で
動く Webトラッキング/リンククリック計測の エッジ関数群 - Analytics Agent: LINE × ECデータを
横断で 問い 合わせる AI分析エージェント
それぞれGoではなく
Tracking Edge Workers
- ランタイムが
V8 isolates なので、 第一級の 言語サポートは JavaScript / TypeScript。 Goを WASM経由で 動かすことも できるが、 コールドスタートと バンドルサイズで 素直に 不利 - Hono / Wrangler / Cloudflare 公式SDKと
いった エッジ周辺の エコシステムが TypeScript前提で 揃っており、 wrangler deployするだけで型付きの バインディング (KV, Queues, Secrets など)が そのまま 使える - ロジック自体が
薄く (受け取って Cloudflare Queues に 積むだけ)、 Goに 揃える ことに よる 運用上の メリットよりも、 エッジネイティブな 書き方が できる メリットの 方が 大きい
Analytics Agent
- LLMエージェント周辺の
ツーリング (Mastra・MCPクライアント・ストリーミングUIプロトコルなど)が TypeScript / Pythonに 集中している。 Goから 扱うと 自分で ラッパーを 書き続ける 羽目になり、 エコシステムへの 追従コストが 線形に 効いてくる - AI機能は
プロンプト・ツール定義・レスポンス整形の 変更サイクルが 非常に 速い 領域で、 エコシステムの 最新版に すぐ 乗れる ことが そのまま 開発速度に なる
「Goに
3.3 フロントエンド: React + TanStack Router
フロントエンドの
- React: フロントエンドの
デファクトスタンダード。 エコシステム・知見・採用市場の いずれも 厚く、 長期で 運用する プロダクトの フロントエンド基盤と して 安全な 選択 - TanStack Router: SPA前提の
ルーターと して、 ファイルベースルーティング・型安全な search params・ルートローダーでの データ先読み・Suspense/Error境界の 宣言的な 扱いが 揃っており現状もっとも 噛み合う。 CRMの 画面では 「URLに フィルタ・ページネーション状態を 載せる」 「ルート遷移時に データを まとめて 取得する」のが 日常的なので、 ここの 型 安全性が 効く - TanStack Query: サーバーステートは
これに 集約。 Connect RPCの 生成クライアントと 組み合わせると、 createQueryOptions(rpcFn, params, { transport })のようにクエリキー管理まで 含めて 自然に 書ける - TanStack Form + valibot: ノーコードフォームビルダーや
キャンペーン編集など、 フォームの 状態管理が 複雑な 画面が 多い。 TanStack Form の フィールドAPIは 入れ子構造に 強く、 valibotとの 組み合わせで 型安全な バリデーションが 書ける - Vite: SPA前提なら、
開発サーバーの 起動・HMRが 圧倒的に 速い Viteを 選ばない 理由が ない - Tailwind CSS v4 + shadcn/ui (Base UI): 業務画面のように
「同じ パターンの 画面を 量産する」用途では、 デザインシステムを トークンと プリミティブで 管理した方が 早い。 DESIGN.mdでセマンティックトークン・スペーシング・コンポーネント方 針を 厳格に 定義し、 ぶれない 密度の UIを 保っている - oxlint + oxfmt: ESLint + Prettierから
移行。 Rustベースで 速く、 tsgolintで 型を 見た lintも 回せる
3.4 API: Connect RPC(gRPC互換)+ Protocol Buffers
APIインターフェースには
Connect を
- Protocol Buffersで
スキーマを 書けば、 Go と TypeScript の 両方の 型・クライアント・サーバースタブが 一気に 生成される - API定義が
OpenAPI YAMLより : Protobufは圧倒的に 読みやすい 型・サービス・メソッドが 専用構文で 構造化されているので、 スキーマ その ものが そのまま 仕様書と して 読める。 OpenAPIの 冗長な YAMLと 比べて、 PRレビューでも 差分が 追いやすい - HTTP/1.1に
ネイティブ対応している ので、gRPCと 違って ブラウザから 素直に 喋れる (Protobuf binary と JSON の どちらも first-class な エンコーディングと して 選べる)。 「ブラウザの ために 別途REST層を 維持する」と いう 典型的な 二重実装の 手間が ない - 同じ
ハンドラで Connect / gRPC / gRPC-Web を 同時に 受けられるので、 サーバー間 通信で gRPC を 使いたくなっても 別実装を 用意する 必要が ない - protovalidateに
よる : スキーマに宣言的な バリデーションが 柔軟 制約を 直接書けて、 サーバー側は インターセプター1つで 全エンドポイントに 自動適用できる。 クライアント側にも 同じ 制約を 配れるので、 バリデーションロジックの 二重実装が 要らない - ストリーミング・エラーコード・インターセプターが
標準化されているので、 認可・ロギング・監査ログの ミドルウェアが 書きやすい
「OpenAPIと
3.5 データストア: PostgreSQL + ClickHouse + Redis
データの
PostgreSQL(メインデータ)
- テナント・顧客・キャンペーン・オーディエンス・フォームなど、「正」に
なる は業務データ PostgreSQLに 置く - トランザクションが
必要、 JOINが 頻繁、 強い 整合性が 要る、と いう CRUD的な ドメインの ほぼ 全てを 引き受ける - スキーマ管理は
Atlas で 宣言的に 行い、 SQLから Goコードへの 変換は 次節で 触れる sqlc に 任せている
ClickHouse(イベント・顧客プロフィール・セグメント配信のデータソース)
ミライCRMに
具体的には、
- 顧客イベント
(Webトラッキング、 LINEイベント、 メッセージ開封・クリックなど) - 顧客プロフィール
(属性の 最新値) - セグメント所属
(顧客 × セグメントの メンバーシップ・履歴) - 監査・変更ログ
( 「いつ・誰の ・どの 属性が 変わったか」)
そして、
この
- 数千万〜数億行の
イベント/プロフィールから、 : PostgreSQLでは数秒以内に セグメント対象を 返す必要が ある 現実的でない 件数の 絞り込みを、 テナント運用者が 画面操作で 繰り返す。 列指向+MergeTreeエンジンの 圧倒的な 集計性能が そのまま 機能要件に なる - 「分析用DB」と
「配信用DB」を :二重化したくない 「Postgresで 配信用、 ClickHouseで 分析用」のように 二系統を 持つと、 片方だけ 更新が 遅れたり、 定義が ズレたりする。配信に 使った ことに絞り込み条件と、 その 配信の 分析が、 同じ テーブル・ 同じ 定義で 説明できる 価値が ある - イベントの
書き込みスループットを : トラッキングや犠牲に しない LINEイベントは 数十万〜数百万 events/day レベルで 流れ込む。 ClickHouseは この 書き込みを 直接受けつつ、 同時に 配信用の クエリにも 応えられる - マルチテナント前提の
スキーマ設計 : すべてのテーブルで ソートキーに org_idを含め、 テナントごとに クエリが パーティション・スキップで 効率化されるように している
テナントが
Incremental Materialized View で event log から「最新状態」を導出する
この
ClickHouseのReplacingMergeTree やAggregatingMergeTree などの
具体例: 「顧客属性ログ」から「最新顧客属性」を導出する
ミライCRMでは、
- 顧
客属性ログ (MergeTree): 属性が 書かれる たびに 1行追記される イベントログ。過去に どう を変化したか すべて 残す - 最新顧
客属性 (ReplacingMergeTree): 顧客 × 属性キーごとに 最新値だけ を 持つテーブル。 セグメント配信は ここを 読む
この
CREATE MATERIALIZED VIEW customer_traits_mv TO customer_traits AS
SELECT
org_id,
customer_id,
source,
key,
argMax(value, timestamp) AS value,
argMax(value_type, timestamp) AS value_type,
argMax(timestamp, timestamp) AS last_timestamp
FROM customer_trait_logs
GROUP BY org_id, customer_id, source, key;
顧argMax 集計がFROM 句もReplacingMergeTree(last_timestamp) とlast_timestamp をFINAL もargMax で
flowchart LR
Source[(LINE / Webトラッキング / ECイベント)]
Logs[("顧客属性ログ<br/>MergeTree / append-only log")]
MV{{顧客属性集約MV<br/>argMax by timestamp}}
Latest[("最新顧客属性<br/>ReplacingMergeTree<br/>最新値のみ")]
Segment[セグメント配信の絞り込みクエリ]
Source -- 属性書き込みイベント --> Logs
Logs -. INSERTブロックごとにtrigger .-> MV
MV -- 集計結果を流し込む --> Latest
Latest -- 最新値を読む --> Segment
結果と
- 書き込み側は
イベントログに でappend するだけ よい (イベントと しての 履歴も 残る) - 読み込み側は
いつでも 最新顧 客属性テーブルを 読めば 最新値が 手に 入る
とargMax を
同じパターンを別の用途にも展開している
「追記専用ログ → 最新状態テーブル」の
-
セグメント所属の
最新状態 :「いつ・誰が セグメントに 入った /出た」を 追記する セグメント出入りログ から、 argMaxで最新の 所属状態だけを 持つ 最新セグメント所属テーブル に 畳み込む。 配信時の 宛先リスト生成は この テーブルを 叩くので、 過去の 出入りログを scan する 必要が ない -
Webトラッキング → 顧客イベントへの
identity resolution : Webサイト上の行動は、 ログイン前の 訪問者では 匿名ID でしか 識別できないため、 生イベントは 一旦 匿名イベントログ に 溜まる。 これを 匿名ID → 顧客ID 対応テーブル と JOIN する MV を2本動かして、 顧客イベントの 正本テーブルに 流し込んでいる - リアルタイムMV: 匿名イベントが
入った 瞬間、 すでに 対応が 存在すれば 顧客IDを 解決して 顧客イベントテーブルに 流す - バック
フィルMV : identify で新しい 対応が 作られた 瞬間、 その 匿名IDに 紐づく 過去の 匿名イベントを まとめて 顧客イベントテーブルに 流し込む
この
MV2本の おかげで、 identify が 先か 後か を気に せず、 後段の セグメント配信・分析からは 「同一顧客の イベント」と して 一貫して 見える 状態が、 別パイプライン無しに 保てる - リアルタイムMV: 匿名イベントが
ポイントは
- 「バッチ集計ジョブ」を
運用しなくていい : AirflowやCloud Schedulerで 動く 「夜間集計ジョブ」を 一切持たず、 INSERT その ものが 派生テーブルを 更新する。 失敗・遅延・再実行と いった 夜間バッチ特有の 運用コストが 消える - 「集計」と
「紐付け (JOIN)」の :両方を INSERT契機で 宣言的に 書ける 「最新値への 畳み込み (最新顧 客属性 / 最新セグメント所属)」も 「identity resolution 付きの 転送 (Webトラッキング → 顧客イベント)」も、 同じ MVの 仕組みで 表現できる。 新しい 派生テーブル・新しい 結合 先が 必要に なっても、 別パイプラインや 夜間ジョブを 増や さずに 済む
「イベントは
私たちがCRMの
Redis(キャッシュ/セッション)
- RPCの
レスポンスキャッシュなどで 利用 - 「壊れても
復元できる」 ものに 限定して 使う ルールで 使用
3.6 SQL → Go の橋渡し: sqlc
PostgreSQL との
選定の
- 「SQLが
書ければ、 : ORMのそのまま 型付きGoコードに なる」と いう モデルの 素直さ 抽象化を 覚え直す必要が ない。 新しく 入った エンジニアでも、 SQLが 読めれば 即戦力に なる - クエリの
実態を : ORMだとレビューできる 最終的に 発行される SQLが 隠れが ちだが、 sqlcは 「人間が 書いた SQLが そのまま DBに 飛ぶ」 モデル。 N+1・インデックス活用・実行計画と いった 性能議論が、 PRレビューの 中で そのまま 行える - スキーマと
型の : 列をずれが 必ずコンパイルエラーに なる 増減した /型を 変えた 瞬間に、 それを 使っている 全クエリ・ 全G oコードが コンパイルエラーで 一気に 浮かび 上がる。 マイグレーションの リスクが 目に 見える 形で 減る
「ORMのdatabase/sqlの
3.7 非同期処理: Cloud Pub/Sub と Temporal の使い分け
非同期処理は
flowchart LR
subgraph Inputs["入力"]
Webhook[LINE / Shopify / ecforce / Clerk<br/>Webhook]
Track[Tracking events]
UI[配信開始ボタン<br/>同期API初期化]
end
subgraph Light["軽量・ステートレス → Pub/Sub"]
PubSub[(Cloud Pub/Sub)]
Subs["Subscribers<br/>(at-least-once / idempotent)"]
CH[(ClickHouse)]
PG1[(PostgreSQL)]
end
subgraph Heavy["長時間・多段階・要リトライ → Temporal"]
Temporal[(Temporal Cloud)]
WF["Workflow<br/>キャンペーン配信 / 大量同期"]
Acts["Activities<br/>(LINE送信 / Shopify fetch)"]
PG2[(PostgreSQL)]
LINEAPI[LINE Messaging API]
end
Webhook --> PubSub
Track --> PubSub
PubSub --> Subs
Subs --> CH
Subs --> PG1
UI --> Temporal
Temporal --> WF
WF --> Acts
Acts --> LINEAPI
Acts --> PG2
Cloud Pub/Sub を
- 単発の
軽いイベント配信 (Webhook受信 → ファンアウト、 トラッキングイベント → ClickHouse 書き込み) - 「at-least-onceで
投げて、 消費側が 冪等に 処理すれば よい」 もの - 高スループットが
要るが、 ステートレスな 処理
Temporal を
- LINEへの
キャンペーン配信のような、長時間・ 多段階・部 分失敗の リトライが 必要な 処理 - Shopifyや
ecforceなど ECプラットフォームからの 大量データ同期 (顧客・注文の 初回ロード)などの バッチ処理 - シナリオなど
「途中で クラッシュしても、 決定論的に 最後まで 進む」ことが 求められる ワークフロー
LINEへの
3.8 認証: Clerk
認証・組織管理には
- マルチテナント前提の
CRMでは、 ユーザー × 組織 × ロールの 管理が 初日から 必要で、 ここを 自前で 組むのは 費用対効果が 悪い - Clerkは
Organizations / Invitations / Roles を マネージドで 提供していて、 ミライCRMの 「組織」 モデルと そのまま 噛み合った - Webhookで
ユーザーや Organization の 変化を webhook_broker経由で受け取り、 内部の organizations/usersテーブルと整合性を 取っている
「認証は
3.9 エッジ: Cloudflare Workers
Webトラッキングやリンククリックの
- ユーザーに
最も 近い 場所で 受けたい (レイテンシが 顧客体験 その もの) - 1リクエストの
処理が 軽い (受け取って Cloudflare Queues に 積むだけ)ので、 Workers の コストモデル・ コールドスタートの 薄さが 効く
「重い
3.10 インフラ: Google Cloud / Cloud Run
インフラは
- Cloud Run: Goバイナリ単体で
動く ワークロードが 多く、 Kubernetesを 抱える ほどの 運用余力は 割きたくない。 Cloud Runは 「コンテナを そのまま デプロイ」 「リクエストベース課金」 「スケールを 意識しない」の バランスが 良い - Cloud Pub/Sub: 上で
書いた 通り、 軽い 非同期メッセージング基盤と して - Cloud Storage: 画像・添付ファイルなどの
永続オブジェクト - Cloud Scheduler: 定期ジョブの
起動 (ローカルでは cron で 代替)
「マネージドに
4. これから
ここまでが
- 顧客イベントを
使った レコメンデーションや、 CRMの AIエージェント化を 本格化する
と
技術選定の