agent-builder/scripts/templates/optimization/pipeline-optimizer.sh
Kjell Tore Guttormsen fa8bc86897 feat(templates): add pipeline optimization and self-healing templates
Session 5 step 21 — pipeline-optimizer writes RECOMMENDATIONS.md with
VFM pre-scores (never modifies pipeline files directly). self-healing
categorizes errors and applies recovery strategies with 5-attempt hard
cap, logging to healing-log.jsonl.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 06:51:38 +02:00

221 lines
8.1 KiB
Bash

#!/bin/bash
# Pipeline optimizer: identify bottlenecks, excess loops, cost outliers.
# Bash 3.2 compatible. Uses python3 for all analysis.
# Does NOT auto-implement any changes -- produces RECOMMENDATIONS.md only.
#
# Analysis covers:
# - Bottleneck agents (highest avg duration or cost per run)
# - Unnecessary revision loops (agents that loop 3+ times on average)
# - Underutilized agents (invoked < 10% of pipeline runs)
# - Cost outliers (single run cost >= 3x average)
#
# Output: RECOMMENDATIONS.md with VFM pre-scores for each recommendation.
#
# Usage:
# ./pipeline-optimizer.sh
# ./pipeline-optimizer.sh --pipeline {{PIPELINE_NAME}}
#
# Placeholders:
# {{WORKING_DIR}} - absolute path to project directory
WORKING_DIR="{{WORKING_DIR}}"
FEEDBACK_FILE="$WORKING_DIR/FEEDBACK.md"
COST_LOG="$WORKING_DIR/budget/cost-events.jsonl"
RECOMMENDATIONS_FILE="$WORKING_DIR/RECOMMENDATIONS.md"
PIPELINE_FILTER=""
# Parse arguments (bash 3.2 compatible)
while [ "$#" -gt 0 ]; do
case "$1" in
--pipeline) PIPELINE_FILTER="$2"; shift 2 ;;
*) shift ;;
esac
done
python3 << PYEOF
import re, json, os, sys
from collections import defaultdict
from datetime import datetime
feedback_file = "$FEEDBACK_FILE"
cost_log = "$COST_LOG"
recommendations_file = "$RECOMMENDATIONS_FILE"
pipeline_filter = "$PIPELINE_FILTER"
# Parse FEEDBACK.md
feedback_rows = []
if os.path.exists(feedback_file):
with open(feedback_file) as f:
in_table = False
for line in f:
line = line.strip()
if '| Date |' in line:
in_table = True
continue
if in_table and line.startswith('|---'):
continue
if in_table and line.startswith('|') and '{{' not in line:
cols = [c.strip() for c in line.strip('|').split('|')]
if len(cols) >= 7:
try:
score_m = re.match(r'(\d+)', cols[3])
score = int(score_m.group(1)) if score_m else 0
feedback_rows.append({
'date': cols[0],
'pipeline': cols[1],
'agent': cols[2],
'score': score,
'issue': cols[4],
'pattern': cols[6]
})
except (ValueError, IndexError):
pass
# Filter by pipeline
if pipeline_filter:
feedback_rows = [r for r in feedback_rows if r['pipeline'] == pipeline_filter]
# Parse cost events
cost_events = []
if os.path.exists(cost_log):
with open(cost_log) as f:
for line in f:
line = line.strip()
if line:
try:
cost_events.append(json.loads(line))
except Exception:
pass
# Per-agent event counts (cost proxy)
cost_by_agent = defaultdict(list)
# Group by agent+date for per-run cost
run_costs = defaultdict(list)
for e in cost_events:
agent = e.get('agent', 'unknown')
date = e.get('timestamp', '')[:10]
run_key = f"{agent}:{date}"
cost_by_agent[agent].append(1)
run_costs[agent].append(1)
# Build recommendations
recommendations = []
# 1. Bottleneck agents: top 2 by event count
if cost_by_agent:
agent_totals = [(a, len(events)) for a, events in cost_by_agent.items()]
agent_totals.sort(key=lambda x: -x[1])
for agent, total in agent_totals[:2]:
all_costs = [len(v) for v in run_costs.values()]
avg_cost = sum(all_costs) / len(all_costs) if all_costs else 1
if total > avg_cost * 1.5:
recommendations.append({
'type': 'bottleneck',
'agent': agent,
'description': f"Agent '{agent}' accounts for {total} events vs avg {avg_cost:.0f}. "
f"Consider batching its tool calls or reducing its task scope.",
'vfm_prescore': 70
})
# 2. Unnecessary revision loops: agents with loop-excess pattern >= 3 times
pattern_by_agent = defaultdict(lambda: defaultdict(int))
for r in feedback_rows:
if r['pattern']:
pattern_by_agent[r['agent']][r['pattern']] += 1
for agent, patterns in pattern_by_agent.items():
if patterns.get('loop-excess', 0) >= 3:
count = patterns['loop-excess']
recommendations.append({
'type': 'loop-excess',
'agent': agent,
'description': f"Agent '{agent}' has {count} feedback rows tagged 'loop-excess'. "
f"Review pipeline revision criteria -- tighten acceptance conditions "
f"or add a max-iterations guard (see self-healing.sh).",
'vfm_prescore': 80
})
# 3. Underutilized agents: invoked in < 10% of pipeline runs
if feedback_rows:
all_runs = set(r['date'] + ':' + r['pipeline'] for r in feedback_rows)
total_runs = len(all_runs) if all_runs else 1
agent_runs = defaultdict(set)
for r in feedback_rows:
agent_runs[r['agent']].add(r['date'] + ':' + r['pipeline'])
for agent, runs in agent_runs.items():
utilization = len(runs) / total_runs
if utilization < 0.1 and total_runs >= 10:
recommendations.append({
'type': 'underutilized',
'agent': agent,
'description': f"Agent '{agent}' appears in only {utilization*100:.0f}% of pipeline runs. "
f"Consider removing from the pipeline or combining with another agent.",
'vfm_prescore': 60
})
# 4. Cost outliers: single-run cost >= 3x average
if run_costs:
all_run_totals = []
for agent, runs in run_costs.items():
all_run_totals.extend(runs)
avg_run = sum(all_run_totals) / len(all_run_totals) if all_run_totals else 1
for agent, runs in run_costs.items():
for run_cost in runs:
if run_cost >= avg_run * 3:
recommendations.append({
'type': 'cost-outlier',
'agent': agent,
'description': f"Agent '{agent}' had a run costing {run_cost} events "
f"vs avg {avg_run:.1f} (3x+ threshold). "
f"Add per-run budget cap with budget-hook.sh.",
'vfm_prescore': 75
})
break # one recommendation per agent
# Write RECOMMENDATIONS.md
timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
pipeline_label = pipeline_filter if pipeline_filter else "all pipelines"
lines = [
f"# Pipeline Optimization Recommendations",
f"",
f"Generated: {timestamp}",
f"Scope: {pipeline_label}",
f"",
f"> These are recommendations only. No changes have been made.",
f"> Review each item and implement manually or with team approval.",
f"",
]
if recommendations:
lines.append(f"## Recommendations ({len(recommendations)} found)")
lines.append("")
for i, rec in enumerate(recommendations, 1):
lines.append(f"### R{i}: {rec['type'].upper()} -- {rec['agent']}")
lines.append("")
lines.append(rec['description'])
lines.append("")
lines.append(f"**VFM pre-score:** {rec['vfm_prescore']}/100")
lines.append("")
else:
lines.append("## No recommendations")
lines.append("")
lines.append("No bottlenecks, excess loops, underutilized agents, or cost outliers detected.")
lines.append("")
lines.append("## Next steps")
lines.append("")
lines.append("1. Review each recommendation with the team")
lines.append("2. Prioritize by VFM pre-score (higher = more value per effort)")
lines.append("3. Implement approved changes one at a time")
lines.append("4. Run feedback-collector.sh for 10+ runs after each change")
lines.append("5. Re-run pipeline-optimizer.sh to confirm improvement")
with open(recommendations_file, 'w') as f:
f.write('\n'.join(lines) + '\n')
print(f"Recommendations written to {recommendations_file}")
print(f" Found: {len(recommendations)} recommendations")
for rec in recommendations:
print(f" - [{rec['type']}] {rec['agent']}: VFM pre-score {rec['vfm_prescore']}")
PYEOF