AIでログ分析を自動化 — エラーパターン検出と原因特定
約5分で読めます
なぜAIでログ分析するのか
1日に数GB発生するログファイルを人間が目視するのは不可能。AIを使えば、エラーパターンの検出、根本原因の特定、改善提案の生成までを自動化できる。grep/awkでは見つけられない「文脈を伴うパターン」を検出できるのがAIの強み。
ログパーサーの実装
まず、各種ログフォーマットを統一的に解析するパーサーを作る。
import json
import re
from datetime import datetime
from pathlib import Path
class LogParser:
"""複数フォーマット対応ログパーサー"""
PATTERNS = {
"nginx": re.compile(
r'(?P<ip>\S+) .+ \[(?P<time>[^\]]+)\] '
r'"(?P<method>\S+) (?P<path>\S+) \S+" '
r'(?P<status>\d+) (?P<size>\d+)'
),
"python": re.compile(
r'(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})'
r',\d+ - (?P<level>\w+) - (?P<message>.+)'
),
"jsonl": None # JSON Lines形式
}
def parse(self, filepath):
entries = []
with open(filepath, "r", encoding="utf-8") as f:
for line in f:
entry = self._parse_line(line.strip())
if entry:
entries.append(entry)
return entries
def _parse_line(self, line):
# JSONL形式を試行
try:
data = json.loads(line)
return {
"time": data.get("timestamp", ""),
"level": data.get("level", "INFO"),
"message": data.get("message", str(data))
}
except json.JSONDecodeError:
pass
# 正規表現パターンで試行
for fmt, pattern in self.PATTERNS.items():
if pattern and (m := pattern.match(line)):
return m.groupdict()
return {"message": line, "level": "UNKNOWN"}
エラーパターン検出
パースしたログからエラーパターンを自動検出する。
from collections import Counter
from anthropic import Anthropic
def detect_patterns(entries, top_n=10):
"""頻出エラーパターンを検出"""
errors = [
e for e in entries
if e.get("level") in ("ERROR", "CRITICAL", "FATAL")
or (e.get("status") and int(e["status"]) >= 500)
]
# エラーメッセージの正規化(数値・IDを除去)
normalized = [
re.sub(r'\b[0-9a-f-]{8,}\b', '<ID>', e["message"])
for e in errors
]
normalized = [
re.sub(r'\d+', '<N>', msg) for msg in normalized
]
# 頻出パターンTOP N
counter = Counter(normalized)
return counter.most_common(top_n)
def analyze_with_ai(patterns, sample_logs):
"""AIによる根本原因分析"""
client = Anthropic()
prompt = f"""以下のログエラーパターンを分析してください。
## 頻出エラーパターン(上位)
{json.dumps(patterns, indent=2, ensure_ascii=False)}
## サンプルログ(直近100件)
{json.dumps(sample_logs[:100], indent=2, ensure_ascii=False)}
## 回答形式
各パターンについて:
1. 推定原因
2. 影響範囲
3. 推奨対策(優先度付き)
"""
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2048,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
時系列異常検知
エラー頻度の急増を検知し、障害の予兆を捉える。
def detect_anomaly(entries, window_minutes=5, threshold=3.0):
"""エラー頻度の異常検知(Z-score方式)"""
from statistics import mean, stdev
# 時間窓ごとのエラー数を集計
buckets = {}
for e in entries:
if e.get("level") in ("ERROR", "CRITICAL"):
ts = e["time"][:16] # 分単位に丸める
buckets[ts] = buckets.get(ts, 0) + 1
counts = list(buckets.values())
if len(counts) < 10:
return []
avg = mean(counts)
std = stdev(counts) or 1
alerts = []
for ts, count in buckets.items():
z_score = (count - avg) / std
if z_score > threshold:
alerts.append({
"time": ts,
"count": count,
"z_score": round(z_score, 2),
"severity": "CRITICAL" if z_score > 5 else "WARNING"
})
return alerts
Discord通知連携
検出した異常をDiscord Webhookでリアルタイム通知する。
import httpx
DISCORD_WEBHOOK_URL = os.environ["DISCORD_WEBHOOK_URL"]
async def notify_discord(alerts, analysis):
"""Discord Webhookで通知"""
if not alerts:
return
embed = {
"title": "Log Alert: エラーパターン検出",
"color": 0xFF0000 if any(
a["severity"] == "CRITICAL" for a in alerts
) else 0xFFA500,
"fields": [
{
"name": f"{a['time']}",
"value": (
f"エラー数: {a['count']}件 "
f"(Z-score: {a['z_score']})"
),
"inline": True
}
for a in alerts[:5]
],
"description": analysis[:500],
"timestamp": datetime.utcnow().isoformat()
}
async with httpx.AsyncClient() as client:
await client.post(
DISCORD_WEBHOOK_URL,
json={"embeds": [embed]}
)
定期実行の設定
cronまたはsystemd timerで定期実行する。
# crontab -e
# 5分ごとにログ分析を実行
*/5 * * * * cd /app && python log_analyzer.py >> /var/log/analyzer.log 2>&1
実用パターン集
| ユースケース | 検出方法 | 通知条件 |
|---|---|---|
| API 5xx急増 | ステータスコード集計 | 5分間で10件以上 |
| メモリリーク | OOMキーワード検出 | 1件でも即通知 |
| 認証失敗連続 | 同一IPの401集計 | 1分間に5回以上 |
| レスポンス遅延 | レイテンシ抽出 | P95が3秒超 |
| ディスク容量 | dfコマンド出力解析 | 使用率90%超 |
関連記事
A
Agentive 編集部
AIエージェントを実際に使い倒す個人開発者。サイト制作の自動化を実践しながら、その知見を発信しています。