Agentive
自動化ラボ

AIでログ分析を自動化 — エラーパターン検出と原因特定

約5分で読めます

なぜAIでログ分析するのか

1日に数GB発生するログファイルを人間が目視するのは不可能。AIを使えば、エラーパターンの検出、根本原因の特定、改善提案の生成までを自動化できる。grep/awkでは見つけられない「文脈を伴うパターン」を検出できるのがAIの強み。

ログパーサーの実装

まず、各種ログフォーマットを統一的に解析するパーサーを作る。

import json
import re
from datetime import datetime
from pathlib import Path

class LogParser:
    """複数フォーマット対応ログパーサー"""

    PATTERNS = {
        "nginx": re.compile(
            r'(?P<ip>\S+) .+ \[(?P<time>[^\]]+)\] '
            r'"(?P<method>\S+) (?P<path>\S+) \S+" '
            r'(?P<status>\d+) (?P<size>\d+)'
        ),
        "python": re.compile(
            r'(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})'
            r',\d+ - (?P<level>\w+) - (?P<message>.+)'
        ),
        "jsonl": None  # JSON Lines形式
    }

    def parse(self, filepath):
        entries = []
        with open(filepath, "r", encoding="utf-8") as f:
            for line in f:
                entry = self._parse_line(line.strip())
                if entry:
                    entries.append(entry)
        return entries

    def _parse_line(self, line):
        # JSONL形式を試行
        try:
            data = json.loads(line)
            return {
                "time": data.get("timestamp", ""),
                "level": data.get("level", "INFO"),
                "message": data.get("message", str(data))
            }
        except json.JSONDecodeError:
            pass

        # 正規表現パターンで試行
        for fmt, pattern in self.PATTERNS.items():
            if pattern and (m := pattern.match(line)):
                return m.groupdict()

        return {"message": line, "level": "UNKNOWN"}

エラーパターン検出

パースしたログからエラーパターンを自動検出する。

from collections import Counter
from anthropic import Anthropic

def detect_patterns(entries, top_n=10):
    """頻出エラーパターンを検出"""
    errors = [
        e for e in entries
        if e.get("level") in ("ERROR", "CRITICAL", "FATAL")
        or (e.get("status") and int(e["status"]) >= 500)
    ]

    # エラーメッセージの正規化(数値・IDを除去)
    normalized = [
        re.sub(r'\b[0-9a-f-]{8,}\b', '<ID>', e["message"])
        for e in errors
    ]
    normalized = [
        re.sub(r'\d+', '<N>', msg) for msg in normalized
    ]

    # 頻出パターンTOP N
    counter = Counter(normalized)
    return counter.most_common(top_n)

def analyze_with_ai(patterns, sample_logs):
    """AIによる根本原因分析"""
    client = Anthropic()

    prompt = f"""以下のログエラーパターンを分析してください。

## 頻出エラーパターン(上位)
{json.dumps(patterns, indent=2, ensure_ascii=False)}

## サンプルログ(直近100件)
{json.dumps(sample_logs[:100], indent=2, ensure_ascii=False)}

## 回答形式
各パターンについて:
1. 推定原因
2. 影響範囲
3. 推奨対策(優先度付き)
"""

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text

時系列異常検知

エラー頻度の急増を検知し、障害の予兆を捉える。

def detect_anomaly(entries, window_minutes=5, threshold=3.0):
    """エラー頻度の異常検知(Z-score方式)"""
    from statistics import mean, stdev

    # 時間窓ごとのエラー数を集計
    buckets = {}
    for e in entries:
        if e.get("level") in ("ERROR", "CRITICAL"):
            ts = e["time"][:16]  # 分単位に丸める
            buckets[ts] = buckets.get(ts, 0) + 1

    counts = list(buckets.values())
    if len(counts) < 10:
        return []

    avg = mean(counts)
    std = stdev(counts) or 1

    alerts = []
    for ts, count in buckets.items():
        z_score = (count - avg) / std
        if z_score > threshold:
            alerts.append({
                "time": ts,
                "count": count,
                "z_score": round(z_score, 2),
                "severity": "CRITICAL" if z_score > 5 else "WARNING"
            })

    return alerts

Discord通知連携

検出した異常をDiscord Webhookでリアルタイム通知する。

import httpx

DISCORD_WEBHOOK_URL = os.environ["DISCORD_WEBHOOK_URL"]

async def notify_discord(alerts, analysis):
    """Discord Webhookで通知"""
    if not alerts:
        return

    embed = {
        "title": "Log Alert: エラーパターン検出",
        "color": 0xFF0000 if any(
            a["severity"] == "CRITICAL" for a in alerts
        ) else 0xFFA500,
        "fields": [
            {
                "name": f"{a['time']}",
                "value": (
                    f"エラー数: {a['count']}件 "
                    f"(Z-score: {a['z_score']})"
                ),
                "inline": True
            }
            for a in alerts[:5]
        ],
        "description": analysis[:500],
        "timestamp": datetime.utcnow().isoformat()
    }

    async with httpx.AsyncClient() as client:
        await client.post(
            DISCORD_WEBHOOK_URL,
            json={"embeds": [embed]}
        )

定期実行の設定

cronまたはsystemd timerで定期実行する。

# crontab -e
# 5分ごとにログ分析を実行
*/5 * * * * cd /app && python log_analyzer.py >> /var/log/analyzer.log 2>&1

実用パターン集

ユースケース検出方法通知条件
API 5xx急増ステータスコード集計5分間で10件以上
メモリリークOOMキーワード検出1件でも即通知
認証失敗連続同一IPの401集計1分間に5回以上
レスポンス遅延レイテンシ抽出P95が3秒超
ディスク容量dfコマンド出力解析使用率90%超

関連記事

A

Agentive 編集部

AIエージェントを実際に使い倒す個人開発者。サイト制作の自動化を実践しながら、その知見を発信しています。