feat(ms-ai-architect): add plugin to open marketplace (v1.5.0 baseline)
Initial addition of ms-ai-architect plugin to the open-source marketplace. Private content excluded: orchestrator/ (Linear tooling), docs/utredning/ (client investigation), generated test reports and PDF export script. skill-gen tooling moved from orchestrator/ to scripts/skill-gen/. Security scan: WARNING (risk 20/100) — no secrets, no injection found. False positive fixed: added gitleaks:allow to Python variable reference in output-validation-grounding-verification.md line 109. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
parent a8d79e4484
commit 6a7632146e
490 changed files with 213249 additions and 2 deletions
@@ -0,0 +1,403 @@
# State Management and Consistency During Failover

**Last updated:** 2026-02
**Status:** GA
**Category:** Business Continuity & Disaster Recovery

---

## Introduction

Handling application state during failover scenarios is one of the most challenging aspects of BCDR for AI systems. AI applications typically carry several kinds of state that must be preserved: user sessions, conversation history, intermediate results from long-running operations (fine-tuning, batch indexing), and application configuration.

During a failover, in-flight requests can be lost, session state can become inconsistent between regions, and operations that were only partly completed can leave the system in an unknown state. Handling this requires distributed state management patterns, idempotent operations, and robust request retry logic.

For the Norwegian public sector, loss of state is especially problematic when the AI system supports case processing or decision-making. The Norwegian Public Administration Act (Forvaltningsloven) requires traceability and accountability, which means that conversation history and AI recommendations must be preserved consistently through a failover.

## Distributed state management patterns

### State categories for AI systems

| State type | Example | Lifetime | Criticality | Storage |
|-----------|---------|----------|-------------|---------|
| Session state | Authentication token, user preferences | Hours | Medium | Redis Cache / Cosmos DB |
| Conversation state | Chat history, context window | Days | High | Cosmos DB |
| Operation state | Fine-tuning progress, batch status | Hours to days | Medium | Queue + Cosmos DB |
| Configuration state | Model deployments, system prompts | Permanent | Critical | App Configuration / Git |
| Cache state | Search results, embeddings | Minutes to hours | Low | Redis Cache |

### Distributed state with Azure Cosmos DB

```python
# Distributed state management for an AI chatbot
import os
from datetime import datetime, timezone

from azure.core import MatchConditions
from azure.cosmos import exceptions
from azure.cosmos.aio import CosmosClient


class DistributedStateManager:
    """Manage AI application state across regions with Cosmos DB.

    Assumes both containers are partitioned on /userId and that TTL is
    enabled on the sessions container (otherwise the "ttl" field is ignored).
    """

    def __init__(self, connection_string, database_name="ai-state"):
        self.client = CosmosClient.from_connection_string(connection_string)
        self.database = self.client.get_database_client(database_name)
        self.sessions = self.database.get_container_client("sessions")
        self.conversations = self.database.get_container_client("conversations")

    async def save_session(self, session_id: str, user_id: str, data: dict):
        """Save session state with TTL and version tracking."""
        document = {
            "id": session_id,
            "userId": user_id,
            "data": data,
            "version": data.get("version", 0) + 1,
            "lastUpdated": datetime.now(timezone.utc).isoformat(),
            "ttl": 3600 * 24,  # 24-hour TTL
            "region": self._get_current_region(),
        }
        await self.sessions.upsert_item(document)
        return document["version"]

    async def get_session(self, session_id: str, user_id: str):
        """Point-read a session by id and partition key."""
        try:
            return await self.sessions.read_item(
                item=session_id,
                partition_key=user_id,
            )
        except exceptions.CosmosResourceNotFoundError:
            return None  # Session not found

    async def save_conversation_turn(
        self, conversation_id: str, user_id: str, turn: dict
    ):
        """Append a conversation turn using a conditional (ETag) update."""
        conversation = await self._get_or_create_conversation(
            conversation_id, user_id
        )

        # Add the turn with a unique ID for idempotency
        now = datetime.now(timezone.utc).isoformat()
        turn["turnId"] = f"{conversation_id}-{len(conversation['turns'])}"
        turn["timestamp"] = now
        conversation["turns"].append(turn)
        conversation["lastUpdated"] = now

        # Optimistic locking: the replace fails with HTTP 412 if another
        # writer changed the document after it was read.
        await self.conversations.replace_item(
            item=conversation_id,
            body=conversation,
            etag=conversation["_etag"],
            match_condition=MatchConditions.IfNotModified,
        )

    async def _get_or_create_conversation(self, conversation_id: str, user_id: str):
        """Minimal sketch: read the conversation or create an empty one."""
        try:
            return await self.conversations.read_item(
                item=conversation_id, partition_key=user_id
            )
        except exceptions.CosmosResourceNotFoundError:
            return await self.conversations.create_item(
                {"id": conversation_id, "userId": user_id, "turns": []}
            )

    def _get_current_region(self):
        return os.environ.get("AZURE_REGION", "unknown")
```
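
A conditional replace fails with a precondition error (HTTP 412) when another writer has changed the document since it was read, so callers usually re-read and retry. The sketch below illustrates that loop against a small in-memory stand-in for the container. `InMemoryContainer`, `PreconditionFailed`, and `append_turn` are hypothetical names used only for illustration, and the client-supplied `turnId` is an assumption that turns a retried append into a no-op.

```python
import copy
import uuid


class PreconditionFailed(Exception):
    """Stand-in for the HTTP 412 a store returns on an ETag mismatch."""


class InMemoryContainer:
    """Hypothetical single-document store that mimics ETag semantics."""

    def __init__(self):
        self._doc = {"id": "conv-1", "turns": [], "_etag": uuid.uuid4().hex}

    def read(self):
        return copy.deepcopy(self._doc)

    def replace(self, body, etag):
        if etag != self._doc["_etag"]:
            raise PreconditionFailed("document changed since it was read")
        body = copy.deepcopy(body)
        body["_etag"] = uuid.uuid4().hex  # every successful write gets a new ETag
        self._doc = body


def append_turn(container, turn, max_attempts=5, before_write=None):
    """Append a turn with optimistic concurrency; returns False for a duplicate."""
    for _ in range(max_attempts):
        doc = container.read()
        if any(t["turnId"] == turn["turnId"] for t in doc["turns"]):
            return False  # already stored by an earlier attempt
        doc["turns"].append(turn)
        if before_write is not None:
            before_write()  # hook used only to simulate a concurrent writer
        try:
            container.replace(doc, etag=doc["_etag"])
            return True
        except PreconditionFailed:
            continue  # re-read the latest version and try again
    raise RuntimeError("gave up after repeated write conflicts")
```

With a real Cosmos DB container, the same shape would catch the SDK's precondition-failure error instead of the stand-in exception and re-read the item before each new attempt.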

### Redis Cache for Session State

```bash
# Azure Cache for Redis with geo-replication
# Note: the non-SSL port is disabled by default. --enable-non-ssl-port is a
# switch that turns it ON, so it is omitted here.

# Primary region
az redis create \
  --name "redis-ai-norwayeast" \
  --resource-group "rg-ai-prod" \
  --location "norwayeast" \
  --sku "Premium" \
  --vm-size "P1" \
  --minimum-tls-version "1.2"

# Secondary region (geo-replica), same SKU, size and TLS floor as the primary
az redis create \
  --name "redis-ai-swedencentral" \
  --resource-group "rg-ai-dr" \
  --location "swedencentral" \
  --sku "Premium" \
  --vm-size "P1" \
  --minimum-tls-version "1.2"

# Create the geo-replication link
az redis server-link create \
  --name "redis-ai-norwayeast" \
  --resource-group "rg-ai-prod" \
  --server-to-link "/subscriptions/{sub}/resourceGroups/rg-ai-dr/providers/Microsoft.Cache/Redis/redis-ai-swedencentral" \
  --replication-role Secondary
```

## Session state replication and synchronization

### Session Affinity vs. Shared State

| Approach | Advantage | Drawback | Recommended for |
|-----------|--------|--------|-------------|
| Session affinity (sticky) | Simple, no replication | Session lost on node failure | Dev/test |
| Shared state (Redis) | Fast failover | Replication lag | Production |
| Shared state (Cosmos DB) | Global replication | Higher latency than Redis | Multi-region |
| Stateless (JWT) | No server-side state | Limited data volume | API-first design |
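
To make the stateless row concrete, the sketch below issues and verifies an HS256-signed token using only the Python standard library. It is an illustration under assumptions: the `issue_token` and `verify_token` helpers and the claim names are hypothetical, and a production system would normally rely on a maintained JWT library and managed keys. Because all session data travels inside the token, any region can validate it without a shared store, which is also why the payload has to stay small.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: bytes) -> bytes:
    return hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()


def issue_token(claims: dict, secret: bytes, ttl_seconds: int = 3600, now=None) -> str:
    """Create a compact HS256 token carrying the session claims."""
    issued = int(time.time() if now is None else now)
    payload = dict(claims, iat=issued, exp=issued + ttl_seconds)
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":"), sort_keys=True).encode())
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def verify_token(token: str, secret: bytes, now=None):
    """Return the claims if signature and expiry are valid, otherwise None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _sign(f"{header_b64}.{payload_b64}", secret)
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        claims = json.loads(_b64url_decode(payload_b64))
    except (ValueError, TypeError):
        return None  # malformed token
    current = time.time() if now is None else now
    return claims if claims.get("exp", 0) > current else None
```

The trade-off noted in the table still applies: a token cannot be revoked or updated server-side without adding some shared state back in.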

### Session migration during failover

```csharp
// C# session migration strategy
using System.Text.Json;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Caching.Distributed;
using StackExchange.Redis;

public class ResilientSessionStore : ISessionStore
{
    private readonly IDistributedCache _primaryCache;
    private readonly IDistributedCache _secondaryCache;
    private readonly CosmosClient _cosmosClient;
    // volatile: read and updated by concurrent requests
    private volatile bool _usingPrimary = true;

    public ResilientSessionStore(
        IDistributedCache primaryCache,
        IDistributedCache secondaryCache,
        CosmosClient cosmosClient)
    {
        _primaryCache = primaryCache;
        _secondaryCache = secondaryCache;
        _cosmosClient = cosmosClient;
    }

    public async Task<SessionData?> GetSessionAsync(string sessionId)
    {
        var usePrimary = _usingPrimary;
        var cache = usePrimary ? _primaryCache : _secondaryCache;

        try
        {
            var data = await cache.GetStringAsync(sessionId);
            if (data != null)
                return JsonSerializer.Deserialize<SessionData>(data);
        }
        catch (RedisConnectionException)
        {
            // Redis failover: switch to the other cache. The flag is set from
            // the local snapshot so concurrent failures cannot toggle it back.
            _usingPrimary = !usePrimary;
            cache = usePrimary ? _secondaryCache : _primaryCache;

            try
            {
                var data = await cache.GetStringAsync(sessionId);
                if (data != null)
                    return JsonSerializer.Deserialize<SessionData>(data);
            }
            catch (Exception)
            {
                // Both Redis instances are down: fall back to Cosmos DB
            }
        }

        // Fallback: read from Cosmos DB (persistent store)
        return await GetFromCosmosAsync(sessionId);
    }

    public async Task SaveSessionAsync(string sessionId, SessionData data)
    {
        // Write to Redis AND Cosmos DB (write-through)
        var json = JsonSerializer.Serialize(data);
        var options = new DistributedCacheEntryOptions
        {
            AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24)
        };

        // Redis (fast, but may fail)
        try
        {
            var cache = _usingPrimary ? _primaryCache : _secondaryCache;
            await cache.SetStringAsync(sessionId, json, options);
        }
        catch (Exception)
        {
            // A Redis failure is not critical; log it and continue
        }

        // Cosmos DB (persistent, geo-replicated)
        await SaveToCosmosAsync(sessionId, data);
    }

    // GetFromCosmosAsync and SaveToCosmosAsync wrap the Cosmos DB container
    // calls and are omitted here.
}
```

## Handling in-flight requests during failover

### Request draining

```python
# Graceful request draining during failover
import asyncio
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)


class ServiceUnavailableError(Exception):
    """Raised while the instance is draining; map it to HTTP 503 in the web layer."""


class GracefulFailoverManager:
    """Manage in-flight requests during failover."""

    def __init__(self, drain_timeout_seconds=30):
        self.drain_timeout = drain_timeout_seconds
        self.active_requests = 0
        self.accepting_requests = True
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def track_request(self):
        """Context manager to track active requests."""
        async with self._lock:
            if not self.accepting_requests:
                raise ServiceUnavailableError(
                    "Service is draining for failover. "
                    "Please retry against the new endpoint."
                )
            self.active_requests += 1

        try:
            yield
        finally:
            async with self._lock:
                self.active_requests -= 1

    async def initiate_drain(self):
        """Stop accepting new requests and wait for in-flight to complete."""
        async with self._lock:
            self.accepting_requests = False

        # Wait for active requests to finish, up to the drain timeout
        loop = asyncio.get_running_loop()
        start = loop.time()
        while self.active_requests > 0:
            if loop.time() - start > self.drain_timeout:
                logger.warning(
                    "Drain timeout: %d requests still active", self.active_requests
                )
                break
            await asyncio.sleep(0.5)

        return self.active_requests == 0


# Usage in the application
failover_mgr = GracefulFailoverManager(drain_timeout_seconds=30)


async def handle_chat_request(request):
    async with failover_mgr.track_request():
        # process_ai_request is the application's own handler (not shown)
        return await process_ai_request(request)
```
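
On the client side, acting on the "retry against the new endpoint" message could look like the following sketch. `EndpointDraining`, `call_with_failover`, and the `send` callable are hypothetical; the example assumes the caller maps a draining response (for example HTTP 503) to that exception and that the request is safe to repeat.

```python
import asyncio


class EndpointDraining(Exception):
    """Hypothetical signal that an endpoint refuses new requests while draining."""


async def call_with_failover(endpoints, send, payload):
    """Try each endpoint in order and return the first successful response."""
    last_error = None
    for endpoint in endpoints:
        try:
            return await send(endpoint, payload)
        except EndpointDraining as exc:
            last_error = exc  # this region is draining; move on to the next one
    raise RuntimeError("all endpoints are draining") from last_error
```

Only the draining signal triggers the switch here; other errors propagate unchanged so that they can be handled by the regular retry policy.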

## Idempotency and request retry strategies

### Idempotent design for AI operations

```python
# Idempotent AI operations with deduplication
import hashlib
import json


class IdempotentAIService:
    """Ensure AI operations are idempotent using request IDs.

    state_store is any async key-value store exposing get(key) and
    set(key, value, ttl=seconds), for example a thin wrapper around Redis.
    """

    def __init__(self, state_store, cache_ttl_seconds=3600):
        self.state_store = state_store
        self.cache_ttl = cache_ttl_seconds

    def generate_idempotency_key(self, operation: str, params: dict) -> str:
        """Generate deterministic key for deduplication."""
        canonical = json.dumps(params, sort_keys=True)
        return hashlib.sha256(f"{operation}:{canonical}".encode()).hexdigest()

    async def execute_idempotent(
        self, operation: str, params: dict, execute_fn
    ):
        """Execute an operation once per key within the TTL.

        The check and the write are not atomic, so two identical requests
        that arrive at the same time can still both execute.
        """
        key = self.generate_idempotency_key(operation, params)

        # Check whether the operation has already been executed
        existing = await self.state_store.get(f"idempotent:{key}")
        if existing is not None:
            return json.loads(existing)  # Return the cached result

        # Execute the operation
        result = await execute_fn(params)

        # Store the result for deduplication (must be JSON-serializable)
        await self.state_store.set(
            f"idempotent:{key}",
            json.dumps(result),
            ttl=self.cache_ttl,
        )

        return result


# Example: idempotent embedding generation
# redis_store and openai_client (an async OpenAI client) are created elsewhere.
service = IdempotentAIService(redis_store)


async def _embed(p):
    response = await openai_client.embeddings.create(
        input=p["text"], model=p["model"]
    )
    # Return the plain vector so the result can be stored with json.dumps
    return response.data[0].embedding


async def generate_embedding(text):
    return await service.execute_idempotent(
        operation="embed",
        params={"text": text, "model": "text-embedding-3-large"},
        execute_fn=_embed,
    )
```
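
A look-up followed by a separate write is not atomic: two identical requests arriving together can both miss the cache and both run the operation. One way to close that gap within a single process is to let concurrent callers share one in-flight task per key, as in this sketch. `SingleFlight` is a hypothetical helper; across several instances an atomic set-if-absent in the shared store would be needed instead.

```python
import asyncio


class SingleFlight:
    """Share one in-flight execution among concurrent callers with the same key."""

    def __init__(self):
        self._inflight = {}

    async def run(self, key, execute_fn):
        task = self._inflight.get(key)
        if task is None:
            # The first caller for this key starts the work. There is no await
            # between the lookup and the registration, so no other coroutine
            # can slip in and start a second execution.
            task = asyncio.ensure_future(execute_fn())
            self._inflight[key] = task
            task.add_done_callback(lambda _t: self._inflight.pop(key, None))
        # shield keeps the shared task running if one waiting caller is cancelled
        return await asyncio.shield(task)
```

Once the task completes, its key is removed, so a later request falls through to the stored result rather than to a stale in-memory entry.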

### Retry strategy with idempotency

| Operation type | Idempotent? | Retry strategy | Max retries |
|---------------|-------------|---------------|-------------|
| Chat completion | Yes (with seed; determinism is best effort) | Exponential backoff | 3 |
| Embedding generation | Yes (deterministic) | Fast retry | 3 |
| Search query | Yes (read-only) | Fast retry | 5 |
| Index update | Yes (upsert) | Exponential backoff | 3 |
| Fine-tuning start | No | No retry | 0 |
| Conversation save | Conditional (ETag) | Exponential backoff | 3 |
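
The table can be encoded as a small policy map that a retry helper consults. In this sketch only the strategy and max-retry values come from the table; the base delay, the jitter scheme, the choice of retryable exceptions, and the names `RetryPolicy`, `POLICIES`, `delay_for`, and `retry_async` are illustrative assumptions, and "fast retry" is interpreted as a short fixed delay.

```python
import asyncio
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class RetryPolicy:
    max_retries: int
    exponential: bool
    base_delay: float = 0.2  # seconds; illustrative value


POLICIES = {
    "chat_completion": RetryPolicy(3, exponential=True),
    "embedding": RetryPolicy(3, exponential=False),
    "search_query": RetryPolicy(5, exponential=False),
    "index_update": RetryPolicy(3, exponential=True),
    "fine_tuning_start": RetryPolicy(0, exponential=False),  # never retried
    "conversation_save": RetryPolicy(3, exponential=True),
}


def delay_for(policy, attempt, rng=random.random):
    """Delay before retry number `attempt` (1-based); jittered when exponential."""
    if not policy.exponential:
        return policy.base_delay
    return rng() * policy.base_delay * (2 ** attempt)


async def retry_async(operation, fn, sleep=asyncio.sleep,
                      retry_on=(ConnectionError, TimeoutError)):
    """Run fn(), retrying transient errors according to the operation's policy."""
    policy = POLICIES[operation]
    for attempt in range(policy.max_retries + 1):
        try:
            return await fn()
        except retry_on:
            if attempt == policy.max_retries:
                raise  # retries exhausted, or not allowed for this operation
            await sleep(delay_for(policy, attempt + 1))
```

Keeping the policy in data rather than in each call site makes it easy to review the retry behavior against the table and to keep non-idempotent operations at zero retries.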

## State validation and verification procedures

### Post-failover validation

```python
# Post-failover state validation checklist
async def validate_state_after_failover(primary_region, dr_region):
    """Validate state consistency after failover.

    get_recent_sessions, get_recent_conversations, get_app_configuration and
    dr_state_store are application-specific helpers that are not shown here.
    """
    results = {}

    # 1. Verify session state (the DR copy may lag by at most one version)
    sample_sessions = await get_recent_sessions(limit=100)
    session_ok = 0
    for session in sample_sessions:
        dr_session = await dr_state_store.get_session(session["id"])
        if dr_session and dr_session["version"] >= session["version"] - 1:
            session_ok += 1
    results["sessions"] = {
        "total": len(sample_sessions),
        "consistent": session_ok,
        "pct": round(session_ok / max(len(sample_sessions), 1) * 100, 1),
    }

    # 2. Verify conversation state (the DR copy may lag by at most one turn)
    sample_convs = await get_recent_conversations(limit=50)
    conv_ok = 0
    for conv in sample_convs:
        dr_conv = await dr_state_store.get_conversation(conv["id"])
        if dr_conv and len(dr_conv["turns"]) >= len(conv["turns"]) - 1:
            conv_ok += 1
    results["conversations"] = {
        "total": len(sample_convs),
        "consistent": conv_ok,
        "pct": round(conv_ok / max(len(sample_convs), 1) * 100, 1),
    }

    # 3. Verify configuration state
    primary_config = await get_app_configuration(primary_region)
    dr_config = await get_app_configuration(dr_region)
    config_match = primary_config == dr_config
    results["configuration"] = {"consistent": config_match}

    # 4. Overall assessment
    all_ok = (
        results["sessions"]["pct"] > 95
        and results["conversations"]["pct"] > 95
        and results["configuration"]["consistent"]
    )
    results["overall"] = "PASS" if all_ok else "FAIL"

    return results
```

## References

- [Recommendations for handling transient faults](https://learn.microsoft.com/en-us/azure/well-architected/design-guides/handle-transient-faults): Retries and idempotency
- [Retry pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/retry): The retry pattern
- [Designing Azure Functions for identical input](https://learn.microsoft.com/en-us/azure/azure-functions/functions-idempotent): Idempotent design
- [Compensating Transaction pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/compensating-transaction): Compensating transactions
- [Azure Cosmos DB consistency levels](https://learn.microsoft.com/en-us/azure/cosmos-db/consistency-levels): Consistency models
- [Azure Cache for Redis geo-replication](https://learn.microsoft.com/en-us/azure/azure-cache-for-redis/cache-how-to-geo-replication): Redis geo-replication

## For Cosmo

- **Use this reference** when the customer needs help with state management during failover for AI applications.
- Always recommend write-through to Cosmos DB even when Redis is the primary session store, because Redis data can be lost during failover.
- Idempotency is MANDATORY for every AI operation that can be retried: use request IDs and conditional updates.
- For conversation history: use an append-only pattern with a unique turnId to avoid duplicates on retry.
- Graceful request draining should be implemented in every production application, since abrupt termination of in-flight requests gives a poor user experience.