#!/bin/bash
# Pipeline optimizer: identify bottlenecks, excess loops, cost outliers.
# Bash 3.2 compatible. Uses python3 for all analysis.
# Does NOT auto-implement any changes -- produces RECOMMENDATIONS.md only.
#
# Analysis covers:
#   - Bottleneck agents: the top 2 agents by cost-event count, when their
#     count exceeds 1.5x the per-agent average (event count is a cost proxy)
#   - Unnecessary revision loops: agents with 3+ FEEDBACK.md rows whose
#     pattern column is 'loop-excess'
#   - Underutilized agents: agents appearing in < 10% of pipeline runs
#     (only evaluated once at least 10 distinct runs are recorded)
#   - Cost outliers: an agent whose event count on a single date is >= 3x
#     the average per-(agent, date) event count
#
# Output: RECOMMENDATIONS.md with VFM pre-scores for each recommendation.
#
# Usage:
#   ./pipeline-optimizer.sh
#   ./pipeline-optimizer.sh --pipeline {{PIPELINE_NAME}}
#
# Exit status:
#   0 on success, 1 if the working directory is missing or the analysis
#   fails, 2 on a usage error, 127 if python3 is not installed.
#
# Placeholders:
#   {{WORKING_DIR}} - absolute path to project directory

WORKING_DIR="{{WORKING_DIR}}"
PIPELINE_FILTER=""

#######################################
# Parse command-line arguments (bash 3.2 compatible).
# Globals:   PIPELINE_FILTER (written)
# Arguments: the script's arguments; unknown arguments are ignored
# Returns:   0 on success, 2 if --pipeline is given without a value
#######################################
parse_args() {
  PIPELINE_FILTER=""
  while [ "$#" -gt 0 ]; do
    case "$1" in
      --pipeline)
        # 'shift 2' fails and shifts nothing when only one argument is
        # left, which would spin this loop forever -- reject it up front.
        if [ "$#" -lt 2 ]; then
          printf 'error: --pipeline requires a pipeline name\n' >&2
          return 2
        fi
        PIPELINE_FILTER="$2"
        shift 2
        ;;
      *)
        shift
        ;;
    esac
  done
  return 0
}

#######################################
# Run the python3 analysis and write RECOMMENDATIONS.md.
# Globals:   WORKING_DIR (read), PIPELINE_FILTER (read)
# Outputs:   summary on stdout, warnings on stderr
# Returns:   exit status of python3
#######################################
run_analysis() {
  local feedback_file="$WORKING_DIR/FEEDBACK.md"
  local cost_log="$WORKING_DIR/budget/cost-events.jsonl"
  local recommendations_file="$WORKING_DIR/RECOMMENDATIONS.md"

  # Values are handed to Python through the environment and the here-doc
  # delimiter is quoted, so paths or filters containing quotes, backslashes
  # or '$' can neither break nor inject into the Python source.
  OPTIMIZER_FEEDBACK_FILE="$feedback_file" \
  OPTIMIZER_COST_LOG="$cost_log" \
  OPTIMIZER_RECOMMENDATIONS_FILE="$recommendations_file" \
  OPTIMIZER_PIPELINE_FILTER="$PIPELINE_FILTER" \
  python3 - <<'PYEOF'
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone

feedback_file = os.environ['OPTIMIZER_FEEDBACK_FILE']
cost_log = os.environ['OPTIMIZER_COST_LOG']
recommendations_file = os.environ['OPTIMIZER_RECOMMENDATIONS_FILE']
pipeline_filter = os.environ.get('OPTIMIZER_PIPELINE_FILTER', '')

# --- Parse FEEDBACK.md -------------------------------------------------
# Rows are read from the markdown table whose header contains '| Date |'.
# Columns used: 0=date, 1=pipeline, 2=agent, 3=score, 4=issue, 6=pattern.
feedback_rows = []
if os.path.exists(feedback_file):
    with open(feedback_file, encoding='utf-8', errors='replace') as f:
        in_table = False
        for line in f:
            line = line.strip()
            if '| Date |' in line:
                in_table = True
                continue
            if not in_table or line.startswith('|---'):
                continue
            # Rows still containing '{{' are unfilled template placeholders.
            if not line.startswith('|') or '{{' in line:
                continue
            cols = [c.strip() for c in line.strip('|').split('|')]
            if len(cols) < 7:
                continue
            score_m = re.match(r'(\d+)', cols[3])
            feedback_rows.append({
                'date': cols[0],
                'pipeline': cols[1],
                'agent': cols[2],
                'score': int(score_m.group(1)) if score_m else 0,
                'issue': cols[4],
                'pattern': cols[6],
            })

# Filter by pipeline (applies to feedback rows only; cost events are not
# filtered).
if pipeline_filter:
    feedback_rows = [r for r in feedback_rows if r['pipeline'] == pipeline_filter]

# --- Parse cost events -------------------------------------------------
cost_events = []
skipped_lines = 0
if os.path.exists(cost_log):
    with open(cost_log, encoding='utf-8', errors='replace') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except ValueError:
                skipped_lines += 1
                continue
            # Only JSON objects can carry agent/timestamp fields.
            if isinstance(event, dict):
                cost_events.append(event)
            else:
                skipped_lines += 1
if skipped_lines:
    print(f"warning: skipped {skipped_lines} malformed line(s) in {cost_log}",
          file=sys.stderr)

# Each event counts as one unit of cost (event count is the cost proxy).
events_per_agent = defaultdict(int)
# agent -> date (first 10 chars of timestamp) -> event count; one
# (agent, date) pair is treated as one run.
events_per_run = defaultdict(lambda: defaultdict(int))
for event in cost_events:
    agent = event.get('agent', 'unknown')
    if not isinstance(agent, str):
        agent = str(agent)
    timestamp = event.get('timestamp', '')
    date = timestamp[:10] if isinstance(timestamp, str) else ''
    events_per_agent[agent] += 1
    events_per_run[agent][date] += 1

recommendations = []

# 1. Bottleneck agents: top 2 by event count, flagged when above 1.5x the
#    per-agent average.
if events_per_agent:
    agent_totals = sorted(events_per_agent.items(), key=lambda kv: -kv[1])
    avg_cost = sum(total for _, total in agent_totals) / len(agent_totals)
    for agent, total in agent_totals[:2]:
        if total > avg_cost * 1.5:
            recommendations.append({
                'type': 'bottleneck',
                'agent': agent,
                'description': f"Agent '{agent}' accounts for {total} events vs avg {avg_cost:.0f}. "
                               f"Consider batching its tool calls or reducing its task scope.",
                'vfm_prescore': 70
            })

# 2. Unnecessary revision loops: agents with 3+ rows tagged 'loop-excess'.
loop_excess_by_agent = defaultdict(int)
for r in feedback_rows:
    if r['pattern'] == 'loop-excess':
        loop_excess_by_agent[r['agent']] += 1

for agent, count in loop_excess_by_agent.items():
    if count >= 3:
        recommendations.append({
            'type': 'loop-excess',
            'agent': agent,
            'description': f"Agent '{agent}' has {count} feedback rows tagged 'loop-excess'. "
                           f"Review pipeline revision criteria -- tighten acceptance conditions "
                           f"or add a max-iterations guard (see self-healing.sh).",
            'vfm_prescore': 80
        })

# 3. Underutilized agents: present in < 10% of pipeline runs. A run is a
#    distinct (date, pipeline) pair; needs at least 10 runs to be meaningful.
if feedback_rows:
    agent_runs = defaultdict(set)
    for r in feedback_rows:
        agent_runs[r['agent']].add((r['date'], r['pipeline']))
    total_runs = len(set().union(*agent_runs.values()))
    if total_runs >= 10:
        for agent, runs in agent_runs.items():
            utilization = len(runs) / total_runs
            if utilization < 0.1:
                recommendations.append({
                    'type': 'underutilized',
                    'agent': agent,
                    'description': f"Agent '{agent}' appears in only {utilization*100:.0f}% of pipeline runs. "
                                   f"Consider removing from the pipeline or combining with another agent.",
                    'vfm_prescore': 60
                })

# 4. Cost outliers: an agent's single-run cost >= 3x the average run cost.
if events_per_run:
    all_run_totals = [n for runs in events_per_run.values() for n in runs.values()]
    avg_run = sum(all_run_totals) / len(all_run_totals)
    for agent, runs in events_per_run.items():
        run_cost = max(runs.values())
        # One recommendation per agent, reporting its most expensive run.
        if run_cost >= avg_run * 3:
            recommendations.append({
                'type': 'cost-outlier',
                'agent': agent,
                'description': f"Agent '{agent}' had a run costing {run_cost} events "
                               f"vs avg {avg_run:.1f} (3x+ threshold). "
                               f"Add per-run budget cap with budget-hook.sh.",
                'vfm_prescore': 75
            })

# --- Write RECOMMENDATIONS.md -----------------------------------------
timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
pipeline_label = pipeline_filter if pipeline_filter else "all pipelines"

lines = [
    "# Pipeline Optimization Recommendations",
    "",
    f"Generated: {timestamp}",
    f"Scope: {pipeline_label}",
    "",
    "> These are recommendations only. No changes have been made.",
    "> Review each item and implement manually or with team approval.",
    "",
]

if recommendations:
    lines.append(f"## Recommendations ({len(recommendations)} found)")
    lines.append("")
    for i, rec in enumerate(recommendations, 1):
        lines.extend([
            f"### R{i}: {rec['type'].upper()} -- {rec['agent']}",
            "",
            rec['description'],
            "",
            f"**VFM pre-score:** {rec['vfm_prescore']}/100",
            "",
        ])
else:
    lines.extend([
        "## No recommendations",
        "",
        "No bottlenecks, excess loops, underutilized agents, or cost outliers detected.",
        "",
    ])

lines.extend([
    "## Next steps",
    "",
    "1. Review each recommendation with the team",
    "2. Prioritize by VFM pre-score (higher = more value per effort)",
    "3. Implement approved changes one at a time",
    "4. Run feedback-collector.sh for 10+ runs after each change",
    "5. Re-run pipeline-optimizer.sh to confirm improvement",
])

with open(recommendations_file, 'w', encoding='utf-8') as f:
    f.write('\n'.join(lines) + '\n')

print(f"Recommendations written to {recommendations_file}")
print(f"  Found: {len(recommendations)} recommendations")
for rec in recommendations:
    print(f"  - [{rec['type']}] {rec['agent']}: VFM pre-score {rec['vfm_prescore']}")
PYEOF
}

#######################################
# Entry point: validate the environment, parse arguments, run analysis.
# Globals:   WORKING_DIR (read), PIPELINE_FILTER (written)
# Arguments: the script's arguments
# Returns:   0 on success, non-zero on error (see header)
#######################################
main() {
  parse_args "$@" || return $?

  if [ ! -d "$WORKING_DIR" ]; then
    printf 'error: working directory not found: %s\n' "$WORKING_DIR" >&2
    return 1
  fi

  if ! command -v python3 >/dev/null 2>&1; then
    printf 'error: python3 is required but was not found in PATH\n' >&2
    return 127
  fi

  run_analysis
}

# Last command: its status becomes the script's exit status.
main "$@"