agent-builder/scripts/templates/heartbeat/heartbeat-runner.sh

201 lines
5.8 KiB
Bash

#!/bin/bash
# Heartbeat runner for Claude Code agents.
# Reads HEARTBEAT.md, checks which tasks are due, invokes claude -p for each.
#
# Bash 3.2 compatible: no associative arrays, no mapfile, no |&
# Uses python3 for all JSON/YAML/date operations.
#
# Usage: ./heartbeat-runner.sh [--catchup]
# --catchup: run missed tasks on first invocation (max 5, 5s stagger)
#
# Placeholders:
# {{AGENT_NAME}} - name of the agent
# {{WORKING_DIR}} - absolute path to project directory
#
# Environment variables (optional):
# MAX_TURNS - max turns per heartbeat (default: 10)
# ACK_MAX_CHARS - responses containing HEARTBEAT_OK that are at most this many
#                 characters long are logged as suppressed (default: 300)
AGENT_NAME="{{AGENT_NAME}}"
WORKING_DIR="{{WORKING_DIR}}"

#######################################
# Print $1 when it is a non-negative integer, otherwise print the fallback.
# Arguments:
#   $1 - candidate value (may be empty)
#   $2 - fallback value
# Outputs:
#   the chosen value on stdout
#######################################
_heartbeat_int_or_default() {
  case "$1" in
    '' | *[!0-9]*) printf '%s\n' "$2" ;;
    *) printf '%s\n' "$1" ;;
  esac
}

# Both limits may be overridden from the environment. A non-numeric override
# falls back to the default here, because the task runner later converts the
# ack limit with int() and would otherwise abort before running anything.
MAX_TURNS=$(_heartbeat_int_or_default "${MAX_TURNS:-}" 10)
ACK_MAX_CHARS=$(_heartbeat_int_or_default "${ACK_MAX_CHARS:-}" 300)

HEARTBEAT_FILE="$WORKING_DIR/HEARTBEAT.md"
STATE_FILE="$WORKING_DIR/.heartbeat-state.json"
LOG_DIR="$WORKING_DIR/logs"

# --catchup is the only recognised argument.
CATCHUP_MODE=false
if [ "${1:-}" = "--catchup" ]; then
  CATCHUP_MODE=true
fi

# Ensure directories exist. Every later step appends to files under LOG_DIR,
# so say so on stderr if it cannot be created (the run itself still proceeds).
mkdir -p "$LOG_DIR" \
  || echo "heartbeat: cannot create log directory: $LOG_DIR" >&2
# --- Emptiness detection (OpenClaw pattern) ---
# Skip API calls if heartbeat file has only headers/empty items
HEARTBEAT_FILE_ACTUAL="$HEARTBEAT_FILE"
# The check prints "true" when the file cannot be opened, or when fewer than
# 20 characters remain after dropping Markdown heading lines and blank lines.
# The path travels through the environment and the Python source is
# single-quoted, so the shell never touches the script text (note the regex
# "$" anchors). Python stderr is discarded on purpose: if the check itself
# fails, EMPTY_CHECK stays empty and the run simply continues.
EMPTY_CHECK=$(HEARTBEAT_FILE="$HEARTBEAT_FILE_ACTUAL" python3 -c '
import os
import re
import sys

path = os.environ.get("HEARTBEAT_FILE", "")
try:
    # Decode leniently so one stray byte does not make the file look empty.
    with open(path, encoding="utf-8", errors="replace") as fh:
        content = fh.read()
except OSError:
    print("true")
    sys.exit(0)

body = re.sub(r"^#+.*$", "", content, flags=re.MULTILINE)
body = re.sub(r"^\s*$", "", body, flags=re.MULTILINE).strip()
print("true" if len(body) < 20 else "false")
' 2>/dev/null)
if [ "$EMPTY_CHECK" = "true" ]; then
  echo "$(date -u +%Y-%m-%dT%H:%M:%SZ) | heartbeat | SKIP (empty heartbeat file)" >> "$LOG_DIR/heartbeat.log"
  exit 0
fi
# --- Parse tasks and check due times ---

#######################################
# Print the tasks of a heartbeat file that are due to run, as a JSON list.
# Arguments:
#   $1 - path to the heartbeat file (HEARTBEAT.md)
#   $2 - path to the JSON state file: {task name: {"last_run": epoch secs}}
#   $3 - "true" for catch-up mode (the result is capped at 5 tasks)
# Outputs:
#   JSON list on stdout. Each element has "name" plus "interval" and "prompt"
#   when the file provides them. Prints "[]" if the file cannot be read.
#######################################
_heartbeat_due_tasks() {
  # Inputs are passed as argv and the heredoc delimiter is quoted, so no shell
  # value is ever spliced into the Python source (a path containing a quote or
  # backslash used to corrupt the generated script).
  python3 - "$1" "$2" "$3" <<'PYEOF'
import json
import re
import sys
import time

heartbeat_file, state_file, catchup_flag = sys.argv[1:4]
catchup = catchup_flag == "true"

# Parse tasks from HEARTBEAT.md
try:
    with open(heartbeat_file, encoding="utf-8", errors="replace") as fh:
        content = fh.read()
except OSError:
    print("[]")
    sys.exit(0)

# Simple YAML-like task parsing: a "- name:" line starts a task, and the
# following "interval:" / "prompt:" lines attach to it.
tasks = []
current_task = {}
for line in content.split("\n"):
    line = line.strip()
    m_name = re.match(r'-\s*name:\s*(.+)', line)
    m_interval = re.match(r'interval:\s*(.+)', line)
    m_prompt = re.match(r'prompt:\s*"(.+)"', line)
    if m_name:
        if current_task.get("name"):
            tasks.append(current_task)
        current_task = {"name": m_name.group(1).strip()}
    elif m_interval and current_task:
        current_task["interval"] = m_interval.group(1).strip()
    elif m_prompt and current_task:
        current_task["prompt"] = m_prompt.group(1).strip()
if current_task.get("name"):
    tasks.append(current_task)

# Load state; a missing, unreadable or malformed file means nothing has run.
try:
    with open(state_file, encoding="utf-8") as fh:
        state = json.load(fh)
except (OSError, ValueError):
    state = {}
if not isinstance(state, dict):
    state = {}


def last_run_of(name):
    """Return the recorded last-run epoch for a task, or 0 if none is usable."""
    entry = state.get(name)
    if not isinstance(entry, dict):
        return 0
    value = entry.get("last_run", 0)
    return value if isinstance(value, (int, float)) else 0


def parse_interval(s):
    """Convert '30m', '2h', '1d' and similar to seconds; default is one hour."""
    m = re.match(r'(\d+)\s*(m|min|h|hr|d)', s.strip())
    if not m:
        return 3600
    val, unit = int(m.group(1)), m.group(2)
    if unit in ("m", "min"):
        return val * 60
    if unit in ("h", "hr"):
        return val * 3600
    if unit == "d":
        return val * 86400
    return 3600


# Check which tasks are due
now = time.time()
due = []
for task in tasks:
    name = task.get("name", "")
    interval_sec = parse_interval(task.get("interval", "1h"))
    last_run = last_run_of(name)
    # In catch-up mode a task with no recorded run is due regardless of its
    # interval.
    if now - last_run >= interval_sec or (catchup and last_run == 0):
        due.append(task)

# Limit catchup to 5 tasks
if catchup:
    due = due[:5]
print(json.dumps(due))
PYEOF
}

DUE_TASKS=$(_heartbeat_due_tasks "$HEARTBEAT_FILE_ACTUAL" "$STATE_FILE" "$CATCHUP_MODE")
# --- Run due tasks ---
# Count the due tasks. DUE_TASKS must hold a JSON list; anything else means
# the parsing step failed, so stop here rather than hand bad input to the
# runner. Python stderr is discarded because the failure is reported below.
TASK_COUNT=$(printf '%s' "$DUE_TASKS" | python3 -c '
import json
import sys

tasks = json.load(sys.stdin)
if not isinstance(tasks, list):
    sys.exit(1)
print(len(tasks))
' 2>/dev/null)
case "$TASK_COUNT" in
  '' | *[!0-9]*)
    echo "$(date -u +%Y-%m-%dT%H:%M:%SZ) | heartbeat | ERROR (could not parse due-task list)" >> "$LOG_DIR/heartbeat.log"
    echo "heartbeat: could not parse due-task list" >&2
    exit 1
    ;;
  0)
    echo "$(date -u +%Y-%m-%dT%H:%M:%SZ) | heartbeat | HEARTBEAT_OK (no tasks due)" >> "$LOG_DIR/heartbeat.log"
    exit 0
    ;;
esac
#######################################
# Run every task of a JSON task list through `claude -p`, write the results
# to the logs and record each task's last-run time.
# Arguments:
#   $1 - JSON list of task objects (uses their "name" and "prompt" keys)
#   $2 - path to the JSON state file
#   $3 - log directory (heartbeat.log plus one dated log per task)
#   $4 - working directory in which claude is started
#   $5 - value passed to --max-turns
#   $6 - ack limit: output containing HEARTBEAT_OK that is at most this many
#        characters long is logged as suppressed and not saved
#   $7 - "true" for catch-up mode (5 second pause between tasks)
# Outputs:
#   one "RUNNING" line per started task on stdout; diagnostics on stderr
# Returns:
#   0 when the list was processed (individual task failures are logged, not
#   fatal); non-zero when the task list is not a JSON list or the state file
#   could not be written
#######################################
_heartbeat_run_tasks() {
  # Inputs are passed as argv and the heredoc delimiter is quoted, so paths or
  # prompts containing quotes can never alter the Python source.
  python3 - "$1" "$2" "$3" "$4" "$5" "$6" "$7" <<'PYEOF'
import json
import os
import re
import subprocess
import sys
import time

(tasks_json, state_file, log_dir, working_dir,
 max_turns, ack_max_raw, catchup_flag) = sys.argv[1:8]
catchup = catchup_flag == "true"
try:
    ack_max = int(ack_max_raw)
except ValueError:
    ack_max = 300  # same default the script uses when the variable is unset


def utc_stamp():
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def append_log(line):
    """Append one line to heartbeat.log; report, but never raise, on failure."""
    try:
        with open(os.path.join(log_dir, "heartbeat.log"), "a", encoding="utf-8") as fh:
            fh.write(line + "\n")
    except OSError as exc:
        print("heartbeat: cannot write heartbeat.log: {}".format(exc), file=sys.stderr)


def save_state(state):
    """Write the state file via a temp file and rename, so a kill mid-write
    cannot leave a truncated file behind."""
    tmp_path = state_file + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(state, fh, indent=2)
    os.replace(tmp_path, state_file)


def task_log_path(name):
    """Per-task log path; characters that could leave log_dir are replaced."""
    safe = re.sub(r"[^\w.\- ]+", "_", name).strip(". ") or "task"
    return os.path.join(log_dir, "heartbeat-{}-{}.log".format(safe, time.strftime("%Y-%m-%d")))


try:
    tasks = json.loads(tasks_json)
    if not isinstance(tasks, list):
        raise ValueError("expected a JSON list")
except ValueError as exc:
    print("heartbeat: invalid task list: {}".format(exc), file=sys.stderr)
    if os.path.isdir(log_dir):
        append_log("{} | heartbeat | ERROR: invalid task list".format(utc_stamp()))
    sys.exit(1)

# Load state
try:
    with open(state_file, encoding="utf-8") as fh:
        state = json.load(fh)
except (OSError, ValueError):
    state = {}
if not isinstance(state, dict):
    state = {}

state_write_failed = False
for i, task in enumerate(tasks):
    if not isinstance(task, dict):
        continue
    name = str(task.get("name") or "unknown")
    prompt = task.get("prompt", "")
    if not prompt:
        # Make the misconfiguration visible instead of skipping silently.
        append_log("{} | heartbeat | {} | SKIPPED (no prompt)".format(utc_stamp(), name))
        continue

    # Stagger catchup tasks
    if catchup and i > 0:
        time.sleep(5)

    ts = utc_stamp()
    print("{} | heartbeat | RUNNING: {}".format(ts, name), flush=True)
    try:
        result = subprocess.run(
            ["claude", "-p", prompt, "--output-format", "text", "--max-turns", str(max_turns)],
            stdin=subprocess.DEVNULL,  # do not let claude inherit the runner's stdin
            capture_output=True, encoding="utf-8", errors="replace",
            timeout=600, cwd=working_dir,
        )
        output = result.stdout.strip()
        if result.returncode != 0:
            log_line = "{} | heartbeat | {} | FAILED (exit {})".format(ts, name, result.returncode)
            with open(task_log_path(name), "a", encoding="utf-8") as fh:
                fh.write("--- {} (exit {}) ---\n{}\n{}\n\n".format(
                    ts, result.returncode, output, result.stderr.strip()))
        elif len(output) <= ack_max and "HEARTBEAT_OK" in output:
            # Suppress short ack responses (OpenClaw ackMaxChars pattern)
            log_line = "{} | heartbeat | {} | HEARTBEAT_OK (suppressed)".format(ts, name)
        else:
            log_line = "{} | heartbeat | {} | completed ({} chars)".format(ts, name, len(output))
            # Save full output
            with open(task_log_path(name), "a", encoding="utf-8") as fh:
                fh.write("--- {} ---\n{}\n\n".format(ts, output))
    except subprocess.TimeoutExpired:
        log_line = "{} | heartbeat | {} | TIMEOUT".format(ts, name)
    except Exception as e:
        log_line = "{} | heartbeat | {} | ERROR: {}".format(ts, name, str(e))
    append_log(log_line)

    # Update state after every attempt (successful or not) and persist it
    # right away, so a run that is killed part-way keeps what it finished.
    state[name] = {"last_run": time.time()}
    try:
        save_state(state)
    except OSError as exc:
        state_write_failed = True
        append_log("{} | heartbeat | ERROR: cannot save state: {}".format(utc_stamp(), exc))

sys.exit(1 if state_write_failed else 0)
PYEOF
}

if _heartbeat_run_tasks "$DUE_TASKS" "$STATE_FILE" "$LOG_DIR" "$WORKING_DIR" \
  "$MAX_TURNS" "$ACK_MAX_CHARS" "$CATCHUP_MODE"; then
  echo "Heartbeat complete: $TASK_COUNT tasks processed"
else
  echo "Heartbeat runner reported an error; see $LOG_DIR/heartbeat.log" >&2
fi