近頃はどこもかしこもIT業界はAIに関する話題で浮き足立っている。
自分もまたそのひとりであり、比較的大量のリクエストをAIに投げて処理する機会に恵まれたのでそのときのメモ。
Anthropic の Messages API には Batch API というバッチ処理用のエンドポイントがある。
複数のリクエストをまとめて投げて、非同期で結果を受け取る仕組み。
料金が通常の50%OFFになるので、リアルタイム性が不要な処理には積極的に使いたい。
この記事では anthropic-sdk-ruby(anthropic gem)を使った実装例を紹介します。
最初ruby-anthropicというのを使っていたんだけど、いつの間にやら公式のgemができていたのでそちらに乗り換えました。
多分、今後新機能とかの追従速度も公式のほうが早いはず。
全体の流れ
Batch API の処理は3つのフェーズに分けて考えました。
1. 登録 — リクエストをDBに溜める(status: pending)
2. アップロード — pending をまとめて Batch API に送信(status: uploaded)
3. ダウンロード — 処理完了を確認し、結果を取得して後続処理を実行
このフェーズを分けをすることで、登録はアプリケーションの任意のタイミングで行い、アップロードとダウンロードは定期的なrakeタスクやcronで実行する、という運用ができる。
DBスキーマ
まず、バッチリクエストを管理するテーブルを用意する。
# バッチリクエストのキュー
create_table :batch_prompts do |t|
t.string :prompt_type, null: false # ハンドラの識別子(例: 'review_analysis')
t.string :record_type, null: false # 対象モデル名(例: 'Product')
t.bigint :record_id, null: false # 対象レコードID
t.string :model, null: false # Claude モデル名
t.text :params_json, null: false # Messages API パラメータ(JSON)
t.string :custom_id, null: false # バッチ内の一意識別子
t.string :batch_api_id # Anthropic が返すバッチID
t.string :status, default: 'pending' # pending → uploaded → completed / failed
t.timestamps
end
# 結果の保存
create_table :batch_downloads do |t|
t.string :batch_api_id, null: false
t.string :custom_id, null: false
t.string :result_type # succeeded / errored / canceled / expired
t.text :response_body # レスポンス全体(JSON)
t.boolean :processed, default: false
t.timestamps
end
モデルはシンプルに。
class BatchPrompt < ApplicationRecord
scope :pending, -> { where(status: 'pending') }
scope :uploaded, -> { where(status: 'uploaded') }
def parsed_params
JSON.parse(params_json, symbolize_names: true)
end
end
Phase 1: リクエストの登録
アプリケーション側からバッチリクエストを登録する。
同じレコードに対して、いくつもリクエストが発生してしまうと、お金と時間がもったいない。
重複登録を防ぐチェックを入れておくと安心。
class BatchRegistrar
def self.call(prompt_type:, record_type:, record_id:, model:, params:)
custom_id = "#{prompt_type}-#{record_id}-#{SecureRandom.hex(4)}"
return if BatchPrompt.exists?(prompt_type:, record_type:, record_id:)
BatchPrompt.create!(
prompt_type:,
record_type:,
record_id:,
model:,
params_json: params.to_json,
custom_id:,
status: 'pending'
)
end
end
呼び出し側はこんな感じ。
BatchRegistrar.call(
prompt_type: 'review_analysis',
record_type: 'Product',
record_id: product.id,
model: 'claude-sonnet-4-6',
params: {
model: 'claude-sonnet-4-6',
system: [{ type: "text", text: system_prompt }],
messages: [{ role: "user", content: [{ type: "text", text: user_prompt }] }],
max_tokens: 4096
}
)
params には通常の Messages API と同じパラメータをそのまま渡す。tools や output_config(構造化出力)を含めることもできる。
Phase 2: アップロード
pending 状態のリクエストをまとめて Batch API に送信する。
class BatchUploader
def self.call
prompts = BatchPrompt.pending.to_a
return if prompts.empty?
requests = prompts.map do |prompt|
{ custom_id: prompt.custom_id, params: prompt.parsed_params }
end
batch = client.messages.batches.create(requests:)
BatchPrompt.where(id: prompts.map(&:id)).update_all(
batch_api_id: batch.id,
status: 'uploaded'
)
end
private
def self.client
Anthropic::Client.new(api_key: ENV['ANTHROPIC_API_KEY'])
end
end
messages.batches.create に requests 配列を渡すと、Anthropic 側でキューイングされてバッチ処理が始まる。レスポンスに含まれる batch.id を保存しておく。
Claude Consoleの画面を見ると、Batchesのところでも経過を確認できる。
Phase 3: ダウンロード
アップロード済みバッチの処理状況を確認し、完了していれば結果を取得する。
class BatchDownloader
HANDLER_MAP = {
'review_analysis' => 'Handlers::ReviewAnalysis',
'product_summary' => 'Handlers::ProductSummary'
}.freeze
def self.call
batch_ids = BatchPrompt.uploaded.distinct.pluck(:batch_api_id).compact
batch_ids.each do |batch_id|
batch = client.messages.batches.retrieve(batch_id)
case batch.processing_status
when :ended
download_results(batch_id)
when :in_progress, :canceling
# まだ処理中 — 次回のダウンロードで再チェック
puts "batch #{batch_id}: #{batch.processing_status}"
end
rescue => e
Rails.logger.error("BatchDownloader: #{batch_id} #{e.message}")
end
end
private
def self.download_results(batch_id)
stream = client.messages.batches.results_streaming(batch_id)
stream.each do |line|
parsed = line.is_a?(String) ? JSON.parse(line) : line
custom_id = parsed['custom_id']
result = parsed['result']
download = BatchDownload.create!(
batch_api_id: batch_id,
custom_id:,
result_type: result['type'],
response_body: result.to_json
)
dispatch(download)
end
end
def self.dispatch(download)
prompt = BatchPrompt.find_by(custom_id: download.custom_id)
return if prompt.nil?
handler = HANDLER_MAP[prompt.prompt_type]
if handler.nil?
prompt.update!(status: 'failed')
return
end
if download.result_type == 'succeeded'
handler.constantize.call(prompt, download)
download.update!(processed: true)
prompt.update!(status: 'completed')
else
prompt.update!(status: 'failed')
end
end
def self.client
Anthropic::Client.new(api_key: ENV['ANTHROPIC_API_KEY'])
end
end
results_streaming はストリーミングで1件ずつ結果を返してくれるので、メモリ効率が良い。各結果の custom_id で元のリクエストと紐付けて、HANDLER_MAP で対応するハンドラに処理を委譲する。
ハンドラの実装例
結果を受け取って後続処理を行うハンドラの例。
アップロード、ダウンロードのルートはひとつにしておいて、そこを通る処理は複数種類あっても大丈夫な作りとなると、こんな感じで対応するハンドラが適宜呼び出される形が良いと思う。
module Handlers
class ReviewAnalysis
def self.call(prompt, download)
product = Product.find_by(id: prompt.record_id)
return if product.nil?
# レスポンスからテキストを取り出す
response = JSON.parse(download.response_body)
content = response.dig('message', 'content')
text_block = content&.reverse&.find { |b| b['type'] == 'text' }
json_text = text_block&.dig('text')
return if json_text.blank?
ProductReview.create!(
product_id: product.id,
ai_model: prompt.model,
analysis_json: json_text
)
end
end
end
response_body の構造は通常の Messages API レスポンスと同じで、message.content 配列の中に text ブロックがある。構造化出力(output_config で JSON Schema を指定した場合)なら、そのまま JSON.parse できるテキストが返ってくる。
定期実行
rakeタスクを定義して、cronや定期実行の仕組みで回す。
namespace :batch do
task upload: :environment do
BatchUploader.call
end
task download: :environment do
BatchDownloader.call
end
end
活用Tips
プロンプトキャッシュ
system プロンプトに cache_control を付けると、同じ内容のシステムプロンプトがキャッシュされて入力トークンのコストをさらに削減できる。
バッチ処理では同じシステムプロンプトを大量に送ることが多いので、効果が大きくなるはず。
params: {
model: 'claude-sonnet-4-6',
system: [{
type: "text",
text: system_prompt,
cache_control: { type: "ephemeral" }
}],
messages: [{ role: "user", content: [{ type: "text", text: user_prompt }] }],
max_tokens: 4096
}
構造化出力
output_config を指定すると、JSON Schema に従った構造化レスポンスが保証される。
後続の処理でパースエラーを気にしなくて済む。プログラム書く側の都合としては大変良い。
これを知ったときは大変便利だと思った。OpenAIにもあるのだろうか。
ただし、後述するWeb検索ツールとは相性が悪く、うまく動作しない。
多分、時間の問題で後々は解決に向かっていくと思うけど今は併用しないほうがいいみたい。
params: {
# ...
output_config: {
format: {
type: :json_schema,
schema: {
type: "object",
properties: {
summary: { type: "string" },
score: { type: "integer", minimum: 1, maximum: 10 },
tags: { type: "array", items: { type: "string" } }
},
required: ["summary", "score", "tags"]
}
}
}
}
Web検索ツール
バッチリクエストでもツール利用が可能。web_search を含めれば、最新情報を踏まえた回答を得られる。
web_search_20260209という最新バージョンもあって、そっちを最初使ってみたけど、こっちはhtmlをAIがパースしてテキストを抜き出したりするみたいで無駄な処理を延々とやっているように見えたので採用を見送った。
上手に使えば、多分最新版のほうが良いのだろう。これは後々の課題だ。
params: {
# ...
tools: [{ type: "web_search_20250305", name: "web_search", max_uses: 5 }]
}
まとめ
Batch API は「リアルタイム性は不要だが、大量のリクエストを安く処理したい」というユースケースに最適。
DBキューを介した3フェーズ構成にしておくと、登録・実行・結果処理がきれいに分離できて運用しやすい。
通常の Messages API と同じパラメータがそのまま使えるので、既存のプロンプトをバッチに移行するのもそれほど手間ではなかった。
料金半分の恩恵を受けつつ、プロンプトキャッシュや構造化出力と組み合わせれば、さらにコスト効率を上げることができると思う。
あとは、Batch APIを使って大丈夫な処理でも、緊急用に即実行が可能な同期処理用のエントリポイントも設けておくと運用面ではより安心できるはず。