Agentive
AIエージェント活用

AIエージェントのレート制限対策 — API制限を賢く回避する

約5分で読めます

レート制限とは

APIプロバイダーは過負荷を防ぐためにリクエスト数を制限している。Anthropic APIの場合、Tier別にRPM(リクエスト/分)とTPM(トークン/分)の上限がある。エージェントを安定稼働させるには、この制限を理解した上で対策を実装する必要がある。

主要APIのレート制限一覧

プロバイダーTierRPMTPM
AnthropicTier 15040,000
AnthropicTier 44,000400,000
OpenAITier 16060,000
OpenAITier 410,0002,000,000

エージェントが複数のサブエージェントを並列実行すると、あっという間にRPM上限に到達する。

指数バックオフの実装

レート制限に引っかかったら、待機時間を指数的に増やしてリトライする。

import time
import random
from anthropic import Anthropic, RateLimitError

client = Anthropic()

def call_with_backoff(messages, max_retries=5):
    """指数バックオフ付きAPI呼び出し"""
    for attempt in range(max_retries):
        try:
            return client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=1024,
                messages=messages
            )
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise

            # 指数バックオフ + ジッター
            wait = min(2 ** attempt + random.uniform(0, 1), 60)
            print(f"Rate limited. Retry in {wait:.1f}s "
                  f"(attempt {attempt + 1}/{max_retries})")
            time.sleep(wait)

    raise Exception("Max retries exceeded")

ジッター(ランダムな揺らぎ)を加えることで、複数エージェントが同時にリトライして再び制限に引っかかる「thundering herd」問題を防ぐ。

リクエストキューの実装

並列実行するサブエージェントのリクエストを、キューで制御する。

import asyncio
from collections import deque
from time import time

class RequestQueue:
    """RPM制限を守るリクエストキュー"""

    def __init__(self, max_rpm=50):
        self.max_rpm = max_rpm
        self.timestamps = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """スロットが空くまで待機"""
        async with self.lock:
            now = time()
            # 60秒より古いタイムスタンプを除去
            while self.timestamps and \
                  self.timestamps[0] < now - 60:
                self.timestamps.popleft()

            if len(self.timestamps) >= self.max_rpm:
                # 最古のリクエストから60秒後まで待機
                wait = 60 - (now - self.timestamps[0])
                await asyncio.sleep(max(wait, 0.1))

            self.timestamps.append(time())

    async def call_api(self, messages):
        """キュー制御付きAPI呼び出し"""
        await self.acquire()
        return await async_api_call(messages)

# 使用例
queue = RequestQueue(max_rpm=45)  # 余裕を持って45に設定

async def run_agents():
    tasks = [
        queue.call_api(msg) for msg in agent_messages
    ]
    return await asyncio.gather(*tasks)

max_rpmは公式上限より少し低く設定するのがコツ。バッファを持たせることで、制限到達を予防する。

モデルフォールバック戦略

高性能モデルが制限に達したら、軽量モデルに自動切替する。

MODEL_FALLBACK_CHAIN = [
    {
        "model": "claude-opus-4-20250514",
        "use_for": "complex_reasoning",
        "rpm_limit": 50
    },
    {
        "model": "claude-sonnet-4-20250514",
        "use_for": "general_tasks",
        "rpm_limit": 100
    },
    {
        "model": "claude-haiku-35-20241022",
        "use_for": "simple_tasks",
        "rpm_limit": 200
    }
]

class ModelRouter:
    def __init__(self):
        self.queues = {
            m["model"]: RequestQueue(m["rpm_limit"])
            for m in MODEL_FALLBACK_CHAIN
        }
        self.blocked = set()

    async def call(self, messages, complexity="general"):
        """複雑度に応じたモデル選択 + フォールバック"""
        chain = self._get_chain(complexity)

        for model_config in chain:
            model = model_config["model"]
            if model in self.blocked:
                continue
            try:
                return await self.queues[model].call_api(
                    messages, model=model
                )
            except RateLimitError:
                self.blocked.add(model)
                # 60秒後にブロック解除
                asyncio.create_task(
                    self._unblock(model, 60)
                )
                continue

        raise Exception("All models exhausted")

複雑なタスクはOpusから試行し、制限に達したらSonnet、さらにHaikuへフォールバックする。タスクの複雑度に応じて開始モデルを変えることで、コストも最適化できる。

レスポンスヘッダの活用

APIレスポンスには残りリクエスト数の情報が含まれる。これを監視して事前に制御する。

def check_rate_headers(response):
    """レスポンスヘッダからレート状況を確認"""
    remaining = int(response.headers.get(
        "x-ratelimit-remaining", 0
    ))
    reset_at = response.headers.get(
        "x-ratelimit-reset"
    )

    if remaining < 5:
        print(f"Warning: Only {remaining} requests left. "
              f"Resets at {reset_at}")
        return "throttle"
    return "ok"

実運用でのベストプラクティス

対策効果実装難度
指数バックオフリトライ成功率95%以上
リクエストキューRPM超過を完全防止
モデルフォールバック可用性99.9%達成
レスポンスヘッダ監視事前警告で予防
プロンプトキャッシュリクエスト数を50%削減
バッチAPI活用RPM消費を大幅削減

関連記事

A

Agentive 編集部

AIエージェントを実際に使い倒す個人開発者。サイト制作の自動化を実践しながら、その知見を発信しています。