ktg-plugin-marketplace/plugins/ms-ai-architect/skills/ms-ai-infrastructure/references/hybrid-edge/edge-to-cloud-data-synchronization.md
Kjell Tore Guttormsen 6a7632146e feat(ms-ai-architect): add plugin to open marketplace (v1.5.0 baseline)
Initial addition of ms-ai-architect plugin to the open-source marketplace.
Private content excluded: orchestrator/ (Linear tooling), docs/utredning/
(client investigation), generated test reports and PDF export script.
skill-gen tooling moved from orchestrator/ to scripts/skill-gen/.

Security scan: WARNING (risk 20/100) — no secrets, no injection found.
False positive fixed: added gitleaks:allow to Python variable reference
in output-validation-grounding-verification.md line 109.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-07 17:17:17 +02:00

16 KiB

Edge-to-Cloud Data Synchronization

Last updated: 2026-02 Status: GA Category: Hybrid Cloud & Edge AI


Introduksjon

Palitelig datasynkronisering mellom edge og sky er en av de mest komplekse utfordringene i hybrid AI-arkitekturer. Data ma flyte i begge retninger — sensordata og inferensresultater fra edge til sky for langsiktig analyse og modelltrening, og oppdaterte modeller og konfigurasjoner fra sky tilbake til edge. Alt dette ma handtere nettverksavbrudd, konflikter og dataintegritet.

For norsk offentlig sektor er palitelig synkronisering kritisk: inspeksjonsdata fra felt ma garantert na sentrale systemer, AI-modellsoppdateringer ma distribueres til alle edge-stasjoner uten manuell intervensjon, og logging for compliance-formal ma vaere komplett — selv etter langvarige offline-perioder.

Microsoft tilbyr flere synkroniseringsmekanismer: Azure IoT Edge med utvidet offline-stoette (ubegrenset offline-tid med lokal buffring), Azure Container Storage med automatisk sky-sync, Azure Cosmos DB med multi-region replikering, og Azure IoT Hub device twins for konfigurasjonssynkronisering. Valget avhenger av datamengde, konsistenskrav og tilkoblingsprofil.


Kjernekomponenter

Komponent Formal Teknologi
Azure IoT Edge Utvidet offline med meldingsbuffring Container runtime
Azure Container Storage Lokal lagring med automatisk sky-sync Arc-enabled
Azure Cosmos DB Multi-region database med konflikthondtering NoSQL / SQL API
IoT Hub Device Twins Konfigurasjonssynkronisering enhet-sky PaaS
Event Hub Hoeyvolum event-inntak Event streaming
Azure Data Lake Langsiktig datalagring Storage Gen2
Delta Lake ACID-transaksjoner pa datalake Open source

Eventual Consistency Patterns

Konsistensmodeller for edge-sky

Modell Beskrivelse Latens Brukstilfelle
Sterk konsistens Alle noder ser samme data samtidig Hoey Kritiske transaksjoner
Bounded staleness Data er konsistent innen et tidsvindu Medium Nesten-sanntid dashboards
Session konsistens Konsistens innen en enhet-session Lav Brukerinteraksjon
Eventuell konsistens Data konvergerer over tid Lavest Sensordata, logger

Azure Cosmos DB for edge-sky-synkronisering

# Azure Cosmos DB med konfigurerbar konsistens for edge-sky sync
from azure.cosmos import CosmosClient, PartitionKey
from azure.cosmos.documents import ConsistencyLevel

class EdgeCloudSyncStore:
    def __init__(self, endpoint: str, key: str):
        self.client = CosmosClient(
            endpoint, key,
            consistency_level=ConsistencyLevel.Session  # Bra for edge-sky
        )
        self.database = self.client.get_database_client("edge-ai-data")

    def setup_containers(self):
        """Opprett containere for synkronisert data"""

        # Sensordata: Eventuell konsistens, hoey throughput
        self.sensor_container = self.database.create_container_if_not_exists(
            id="sensor-data",
            partition_key=PartitionKey(path="/deviceId"),
            default_ttl=86400 * 30  # 30 dagers retention
        )

        # AI-resultater: Session konsistens
        self.ai_results = self.database.create_container_if_not_exists(
            id="ai-results",
            partition_key=PartitionKey(path="/deviceId"),
            default_ttl=86400 * 365  # 1 aars retention
        )

        # Modellkonfigurasjon: Sterkere konsistens
        self.model_config = self.database.create_container_if_not_exists(
            id="model-config",
            partition_key=PartitionKey(path="/region")
        )

    def upsert_sensor_data(self, device_id: str, readings: list[dict]):
        """Skriv sensordata med idempotensnokkel for a haandtere re-sync"""
        for reading in readings:
            reading["id"] = f"{device_id}_{reading['timestamp']}"
            reading["deviceId"] = device_id
            reading["_etag"] = None  # Cosmos DB haandterer versjonering

            self.sensor_container.upsert_item(
                body=reading,
                pre_trigger_include=None,
                post_trigger_include=None
            )

    def get_latest_model_config(self, region: str) -> dict:
        """Hent siste modellkonfigurasjon for en region"""
        query = """
            SELECT TOP 1 *
            FROM c
            WHERE c.region = @region
            ORDER BY c.updatedAt DESC
        """
        items = list(self.model_config.query_items(
            query=query,
            parameters=[{"name": "@region", "value": region}],
            enable_cross_partition_query=False
        ))
        return items[0] if items else None

Event-basert synkronisering

# Event-basert edge-to-cloud synkronisering
import asyncio
import json
import gzip
from datetime import datetime, timedelta

class EventBasedSync:
    def __init__(self, local_store, cloud_endpoint: str):
        self.local_store = local_store
        self.cloud_endpoint = cloud_endpoint
        self.sync_log = []
        self.last_sync = None

    async def sync_outbound(self, max_batch_size: int = 100) -> dict:
        """Synkroniser lokale hendelser til sky"""
        # Hent usynkroniserte hendelser
        pending = self.local_store.get_unsynced_events(limit=max_batch_size)

        if not pending:
            return {"status": "up_to_date", "synced": 0}

        # Komprimer for overfoering
        payload = gzip.compress(
            json.dumps([e.__dict__ for e in pending]).encode()
        )

        try:
            # Send til sky-endpoint
            response = await self._send_to_cloud(payload)

            if response["status"] == "accepted":
                # Marker som synkronisert
                event_ids = [e.id for e in pending]
                self.local_store.mark_synced(event_ids)

                self.last_sync = datetime.utcnow()
                self.sync_log.append({
                    "direction": "outbound",
                    "events": len(event_ids),
                    "size_bytes": len(payload),
                    "timestamp": self.last_sync.isoformat()
                })

                return {
                    "status": "synced",
                    "synced": len(event_ids),
                    "remaining": self.local_store.count_unsynced(),
                    "compressed_size": len(payload)
                }

        except ConnectionError:
            return {
                "status": "offline",
                "pending": len(pending),
                "retry_after": "next_connectivity"
            }

    async def sync_inbound(self) -> dict:
        """Hent oppdateringer fra sky (modeller, konfigurasjon)"""
        try:
            since = self.last_sync or datetime.utcnow() - timedelta(days=7)
            updates = await self._fetch_from_cloud(since)

            applied = 0
            for update in updates:
                if update["type"] == "model_update":
                    await self._apply_model_update(update)
                    applied += 1
                elif update["type"] == "config_change":
                    await self._apply_config_change(update)
                    applied += 1

            return {"status": "updated", "applied": applied}

        except ConnectionError:
            return {"status": "offline", "using_cached": True}

Delta Sync Optimization

Inkrementell synkronisering

# Delta-synkronisering for effektiv dataoverfoering
import hashlib
import json
from typing import Optional

class DeltaSyncEngine:
    def __init__(self):
        self.local_checksums: dict[str, str] = {}
        self.sync_watermark: Optional[str] = None

    def calculate_delta(self, current_data: dict,
                        last_synced_data: dict) -> dict:
        """Beregn delta mellom navaerende og sist synkronisert tilstand"""
        delta = {
            "added": {},
            "modified": {},
            "deleted": []
        }

        # Finn nye og endrede elementer
        for key, value in current_data.items():
            current_hash = self._hash_value(value)
            if key not in last_synced_data:
                delta["added"][key] = value
            elif self._hash_value(last_synced_data[key]) != current_hash:
                delta["modified"][key] = value

        # Finn slettede elementer
        for key in last_synced_data:
            if key not in current_data:
                delta["deleted"].append(key)

        return delta

    def apply_delta(self, base_data: dict, delta: dict) -> dict:
        """Anvend delta pa basisdatasettet"""
        result = dict(base_data)

        # Legg til nye
        result.update(delta.get("added", {}))

        # Oppdater endrede
        result.update(delta.get("modified", {}))

        # Fjern slettede
        for key in delta.get("deleted", []):
            result.pop(key, None)

        return result

    def compress_delta(self, delta: dict) -> bytes:
        """Komprimer delta for overfoering"""
        import gzip
        json_bytes = json.dumps(delta, separators=(',', ':')).encode()
        compressed = gzip.compress(json_bytes, compresslevel=9)

        return compressed

    def get_sync_stats(self, delta: dict, compressed: bytes) -> dict:
        """Beregn synkroniseringsstatistikk"""
        full_size = len(json.dumps(delta).encode())
        return {
            "items_changed": (
                len(delta.get("added", {})) +
                len(delta.get("modified", {})) +
                len(delta.get("deleted", []))
            ),
            "full_size_bytes": full_size,
            "compressed_size_bytes": len(compressed),
            "compression_ratio": f"{(1 - len(compressed)/full_size)*100:.1f}%"
                if full_size > 0 else "N/A"
        }

    def _hash_value(self, value) -> str:
        return hashlib.sha256(json.dumps(value, sort_keys=True).encode()).hexdigest()[:16]

Conflict Resolution Strategies

Konflikttyper i edge-sky-synkronisering

Konflikttype Arsak Losningsstrategi
Write-Write Samme data endret pa bade edge og sky LWW eller custom merge
Delete-Update Data slettet pa en side, oppdatert pa annen Policy-basert (behold eller slett)
Schema-conflict Modellversjon ulik pa edge og sky Versjonert schema med migrasjon
Ordering-conflict Hendelser mottat i feil rekkefolge Timestamp-basert reordering

Cosmos DB konflikthondtering

# Cosmos DB konflikthondterings-policy
from azure.cosmos import ContainerProxy

class CosmosConflictHandler:
    def __init__(self, container: ContainerProxy):
        self.container = container

    def setup_lww_policy(self):
        """Last-Write-Wins basert pa egendefinert felt"""
        # Konfigureres ved container-oppretting
        # Cosmos DB bruker _ts (timestamp) som default
        pass

    def setup_custom_resolution(self):
        """Custom konflikthondtering med stored procedure"""
        sproc_body = """
        function resolve(incomingItem, existingItem, isTombstone, conflictingItems) {
            // For AI-resultater: Behold den med hoeyest confidence
            if (incomingItem.ai_confidence > existingItem.ai_confidence) {
                return incomingItem;
            }
            return existingItem;
        }
        """
        self.container.scripts.create_stored_procedure({
            "id": "resolveConflict",
            "body": sproc_body
        })

    def read_conflict_feed(self) -> list[dict]:
        """Les konflikter som krever manuell losning"""
        conflicts = list(self.container.list_conflicts())
        return [{
            "id": c["id"],
            "resource_id": c.get("resourceId"),
            "conflict_type": c.get("operationType"),
            "source_region": c.get("sourceResourceId")
        } for c in conflicts]

Data Deduplication at Scale

Dedupliseringsstrategier

# Skalerbar deduplisering for edge-sky-data
import hashlib
from bloom_filter2 import BloomFilter

class EdgeDeduplication:
    def __init__(self, expected_items: int = 1_000_000):
        # Bloom-filter for hurtig duplikat-sjekk (minneeffektivt)
        self.bloom = BloomFilter(
            max_elements=expected_items,
            error_rate=0.01  # 1% falsk-positiv rate
        )
        # Eksakt sjekk for bloom-positive
        self.recent_hashes: set = set()
        self.max_recent = 100_000

    def is_duplicate(self, data: dict) -> bool:
        """Sjekk om dataelementet allerede er prosessert"""
        data_hash = self._compute_hash(data)

        # Hurtig bloom-filter-sjekk
        if data_hash not in self.bloom:
            return False

        # Eksakt sjekk for bekreftelse
        return data_hash in self.recent_hashes

    def mark_processed(self, data: dict):
        """Marker dataelement som prosessert"""
        data_hash = self._compute_hash(data)
        self.bloom.add(data_hash)
        self.recent_hashes.add(data_hash)

        # Begrens minnebruk
        if len(self.recent_hashes) > self.max_recent:
            # Fjern eldste 20%
            to_remove = len(self.recent_hashes) - int(self.max_recent * 0.8)
            for _ in range(to_remove):
                self.recent_hashes.pop()

    def _compute_hash(self, data: dict) -> str:
        """Beregn deterministisk hash av dataelementet"""
        # Bruk innholds-hash (ekskluder metadata som timestamp)
        content_keys = sorted(k for k in data.keys()
                             if k not in ("_ts", "synced_at", "sync_id"))
        content = {k: data[k] for k in content_keys}
        return hashlib.sha256(
            json.dumps(content, sort_keys=True).encode()
        ).hexdigest()

    def get_stats(self) -> dict:
        return {
            "bloom_filter_items": len(self.bloom),
            "recent_exact_items": len(self.recent_hashes),
            "estimated_memory_mb": (
                self.bloom.bitarray.nbytes / 1024 / 1024 +
                len(self.recent_hashes) * 64 / 1024 / 1024
            )
        }

Norsk offentlig sektor

Synkroniseringskrav for offentlig sektor

Krav Beskrivelse Losning
Dataintegritet Ingen datatap ved offline/sync Event sourcing + idempotens
Sporbarhet All synkronisering ma logges Sync audit log
Personvern Sensitive data ma krypteres i transit TLS 1.3 + end-to-end
Compliance 7 ars retention for visse datatyper Immutable storage
Konflikthondtering Sporbar og deterministisk Policy-basert med audit trail

Anbefalte Azure-tjenester per scenario

Scenario Primaer-tjeneste Sekundaer Konsistens
IoT-sensordata IoT Hub + Event Hub Data Lake Eventuell
AI-resultater Cosmos DB Data Lake backup Session
Modellkonfig IoT Hub Device Twin Git (GitOps) Sterk
Inspeksjonsdata Cosmos DB Blob Storage Bounded staleness

Beslutningsrammeverk

Scenario Anbefaling Begrunnelse
Hoeyvolum sensorer, enveis IoT Hub → Event Hub → Data Lake Skalerbart, rimelig
Toveis med konfliktfare Cosmos DB med session-konsistens Innebygd konflikthondtering
Kritisk data, null tap Event sourcing + Cosmos DB Idempotent, sporbar
Periodisk bulk-sync Delta sync + Azure Blob Minimal bandwidth
Multi-edge koordinering Cosmos DB multi-write Automatisk konflikthondtering
Modellpush til edge IoT Hub Device Twin + Blob SAS Etablert monster

For Cosmo

  • Event sourcing med idempotens er gullstandarden for edge-sky-synkronisering — alle dataelementer faar en unik ID og kan trygt re-sendes uten duplikater
  • Delta-synkronisering reduserer datavolum med 80-95% sammenlignet med full sync — beregn kun endringer og komprimer med gzip for minimal bandbreddebruk
  • Cosmos DB med session-konsistens er den beste balansen mellom ytelse og dataintegritet for de fleste edge-sky-scenarier i offentlig sektor
  • Bloom-filter gir O(1) dedupliseringssjekk med minimal minnebruk — implementer dette pa bade edge og sky-siden for a hindre duplikat-inntak
  • For norsk offentlig sektor: Krav til sporbarhet og retention betyr at ALL synkronisering ma logges — implementer sync audit log med 7 ars immutable retention for compliance med arkivloven