Initial addition of ms-ai-architect plugin to the open-source marketplace. Private content excluded: orchestrator/ (Linear tooling), docs/utredning/ (client investigation), generated test reports and PDF export script. skill-gen tooling moved from orchestrator/ to scripts/skill-gen/. Security scan: WARNING (risk 20/100) — no secrets, no injection found. False positive fixed: added gitleaks:allow to Python variable reference in output-validation-grounding-verification.md line 109. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
15 KiB
IoT Operations and AI Integration
Last updated: 2026-02 Status: GA Category: Hybrid Cloud & Edge AI
Introduksjon
Azure IoT Operations er Microsofts edge runtime-plattform for industrielle IoT-scenarier, bygget pa Azure Arc-enabled Kubernetes. Den kombinerer datainnsamling fra sensorer og utstyr med AI-inferens direkte pa edge, noe som muliggjor sanntidsanalyse uten avhengighet av skytilkobling for tidskritiske beslutninger.
For norsk offentlig sektor er IoT-integrasjon med AI relevant i scenarier som smart infrastruktur (broer, tunneler, veier), miljooverkaking, energistyring i offentlige bygg, og transportlogistikk. Azure IoT Operations gir en standardisert plattform for a samle sensordata, normalisere dem, og kjore AI-modeller lokalt for prediktiv vedlikehold og anomalideteksjon.
Plattformen bygger pa MQTT-protokollen for enhetskommunikasjon, Data Flows for datatransformasjon og kontekstualisering, og Azure Arc for sentralisert administrasjon. AI-modeller kan deployes som containere pa edge-klyngen, med Azure ML for modelltrenings- og oppdateringspipeliner mellom sky og edge.
Kjernekomponenter
| Komponent | Formal | Teknologi |
|---|---|---|
| Azure IoT Operations | Edge runtime for IoT-datainnsamling og -prosessering | Arc-enabled Kubernetes |
| MQTT Broker | Meldingsinfrastruktur for enhets-til-edge-kommunikasjon | MQTT v3.1.1/v5 |
| Data Flows | Datatransformasjon, kontekstualisering og ruting | Pipelinekonfigurasjon |
| OPC UA Connector | Industriprotokoll for tilkobling til PLC-er og SCADA | OPC UA standard |
| Azure IoT Hub | Sky-endepunkt for telemetri og device management | PaaS |
| Azure Stream Analytics | Sanntids stromprosessering av IoT-data | SQL-basert query |
| Azure ML on Arc | Edge AI-modelltrenings- og inferenspipeline | Kubernetes ML |
Sensor Data Normalization
Utfordringer med sensordata
Sensordata fra industrielle miljoer er ofte heterogene — ulike protokoller (Modbus, OPC UA, MQTT), forskjellige dataformater, inkonsistente tidsserier, og varierende kvalitet. Normalisering er kritisk for at AI-modeller skal fungere palitelig.
Normaliseringsarkitektur
Sensorer → OPC UA / MQTT → Azure IoT Operations → Data Flows → Normalisert output
↓
AI-inferensmodul
↓
Azure IoT Hub (sky)
Data Flow-konfigurasjon for normalisering
# Eksempel: Data Flow for temperatursensor-normalisering
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataFlow
metadata:
name: temperature-normalization
spec:
sources:
- type: mqtt
topic: "sensors/temperature/#"
transformations:
- type: compute
expression: |
{
"deviceId": $.topic.split('/')[2],
"timestamp": $.systemProperties.enqueuedTime,
"temperature_celsius": $.payload.value * ($.payload.unit == 'F' ? 5/9 - 32*5/9 : 1),
"quality": $.payload.quality ?? 'unknown',
"location": $.payload.metadata.location
}
- type: filter
expression: "$.temperature_celsius >= -50 AND $.temperature_celsius <= 100"
destinations:
- type: mqtt
topic: "normalized/temperature"
- type: dataLakeStorage
endpoint: "edge-datalake"
Strategier for datakvalitet
| Strategi | Beskrivelse | Implementering |
|---|---|---|
| Range-validering | Filtrer verdier utenfor forventet omrade | Data Flow filter-transformasjon |
| Interpolering | Fyll manglende verdier i tidsserier | Edge-modul med pandas/numpy |
| Deduplisering | Fjern duplikate meldinger | MQTT broker QoS + dedup-logikk |
| Tidsstempelsynkronisering | Juster klokkeforskjeller mellom enheter | NTP + Data Flow timestamp-mapping |
| Enhetskonvertering | Standardiser til SI-enheter | Data Flow compute-transformasjon |
Edge Gateway AI Preprocessing
Gateway-arkitektur
Edge gateways fungerer som intelligente mellomledd mellom sensorer og sky. De utforer forbehandling, filtrering, aggregering og initial AI-inferens for a redusere datavolum og latens.
# Eksempel: Edge gateway AI-forbehandling med Azure IoT Edge
import asyncio
from azure.iot.device.aio import IoTHubModuleClient
import numpy as np
import onnxruntime as ort
class AIPreprocessingGateway:
def __init__(self):
self.module_client = None
self.anomaly_model = ort.InferenceSession("anomaly_detector.onnx")
self.buffer = []
self.buffer_size = 100
async def initialize(self):
self.module_client = IoTHubModuleClient.create_from_edge_environment()
await self.module_client.connect()
self.module_client.on_message_received = self.process_message
async def process_message(self, message):
"""Forbehandling pipeline: normalisering → anomalideteksjon → aggregering"""
data = message.data
# Trinn 1: Normalisering
normalized = self.normalize(data)
# Trinn 2: Anomalideteksjon (lokal inferens)
is_anomaly = self.detect_anomaly(normalized)
if is_anomaly:
# Send anomalier umiddelbart til sky
await self.module_client.send_message_to_output(
{"type": "anomaly", "data": normalized, "confidence": 0.95},
"alertOutput"
)
# Trinn 3: Buffer og aggreger normaldata
self.buffer.append(normalized)
if len(self.buffer) >= self.buffer_size:
aggregated = self.aggregate(self.buffer)
await self.module_client.send_message_to_output(
{"type": "aggregated", "data": aggregated},
"telemetryOutput"
)
self.buffer.clear()
def detect_anomaly(self, data):
"""ONNX-basert anomalideteksjon"""
input_array = np.array([data["values"]], dtype=np.float32)
result = self.anomaly_model.run(None, {"input": input_array})
return result[0][0] > 0.8 # Anomali-terskel
def aggregate(self, buffer):
"""Aggreger buffer til statistisk sammendrag"""
values = [item["value"] for item in buffer]
return {
"mean": np.mean(values),
"std": np.std(values),
"min": np.min(values),
"max": np.max(values),
"count": len(values),
"period_start": buffer[0]["timestamp"],
"period_end": buffer[-1]["timestamp"]
}
Fordeler med gateway-forbehandling
| Fordel | Beskrivelse | Effekt |
|---|---|---|
| Redusert bandwidth | Aggregering reduserer datamengde 10-100x | Lavere kostnader |
| Lavere latens | Anomalideteksjon pa millisekunder lokalt | Raskere respons |
| Offline-kapabilitet | Fortsetter drift uten skytilkobling | Hoyere tilgjengelighet |
| Datakvalitet | Validering og rensing for sky-inntak | Bedre analyser |
Time-Series Analytics at Edge
Azure Stream Analytics pa edge
Azure Stream Analytics kan deployes som IoT Edge-modul for sanntids tidsserieanalyse:
-- Stream Analytics edge-query: Glidende gjennomsnitt med anomalideteksjon
SELECT
IoTHub.ConnectionDeviceId AS DeviceId,
System.Timestamp() AS WindowEnd,
AVG(temperature) AS AvgTemperature,
STDEV(temperature) AS StdDevTemperature,
COUNT(*) AS ReadingCount,
CASE
WHEN AVG(temperature) >
LAG(AVG(temperature), 1) OVER (PARTITION BY IoTHub.ConnectionDeviceId LIMIT DURATION(minute, 30))
+ 3 * STDEV(temperature)
THEN 'ANOMALY'
ELSE 'NORMAL'
END AS Status
INTO
alertOutput
FROM
sensorInput TIMESTAMP BY EventProcessedUtcTime
GROUP BY
IoTHub.ConnectionDeviceId,
TumblingWindow(minute, 5)
HAVING
COUNT(*) > 10
Edge-basert prediktiv vedlikehold
# Prediktiv vedlikehold med tidsserieanalyse pa edge
import onnxruntime as ort
import numpy as np
from collections import deque
class PredictiveMaintenanceEdge:
def __init__(self, model_path: str, window_size: int = 100):
self.session = ort.InferenceSession(model_path)
self.window = deque(maxlen=window_size)
self.feature_names = ["vibration", "temperature", "pressure", "rpm"]
def add_reading(self, reading: dict) -> dict:
"""Legg til ny maling og returner prediksjon om bufferen er full"""
features = [reading.get(f, 0.0) for f in self.feature_names]
self.window.append(features)
if len(self.window) == self.window.maxlen:
return self.predict()
return {"status": "collecting", "readings": len(self.window)}
def predict(self) -> dict:
"""Kjor RUL-prediksjon (Remaining Useful Life)"""
input_data = np.array([list(self.window)], dtype=np.float32)
# Modell-inferens
rul_prediction = self.session.run(
["remaining_useful_life", "failure_probability"],
{"sensor_sequence": input_data}
)
rul_hours = float(rul_prediction[0][0])
failure_prob = float(rul_prediction[1][0])
return {
"remaining_useful_life_hours": rul_hours,
"failure_probability": failure_prob,
"recommendation": self._get_recommendation(rul_hours, failure_prob),
"confidence": self._calculate_confidence()
}
def _get_recommendation(self, rul: float, prob: float) -> str:
if prob > 0.8 or rul < 24:
return "IMMEDIATE_MAINTENANCE"
elif prob > 0.5 or rul < 168:
return "SCHEDULE_MAINTENANCE"
return "NORMAL_OPERATION"
Device-to-Cloud AI Pipelines
Pipeline-arkitektur
[Sensorer] → [Edge Gateway] → [Azure IoT Operations] → [IoT Hub] → [Stream Analytics]
↓ ↓ ↓
[Lokal inferens] [Edge AI-modell] [Azure ML / Fabric]
↓ ↓ ↓
[Sanntidsvarsler] [Kontekstualisert data] [Modelloppdatering]
↓ ↓
[Cloud feedback] ←←←←←←←← [Ny modellversjon]
Implementering med Azure ML og IoT Hub
# Device-to-Cloud AI Pipeline med modelloppdatering
from azure.iot.hub import IoTHubRegistryManager
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
class AIEdgePipeline:
def __init__(self, hub_connection_string: str, ml_workspace: str):
self.registry_manager = IoTHubRegistryManager(hub_connection_string)
self.ml_client = MLClient(
DefaultAzureCredential(),
subscription_id="<sub-id>",
resource_group_name="<rg>",
workspace_name=ml_workspace
)
def deploy_model_to_edge(self, device_id: str, model_name: str, model_version: str):
"""Deploy oppdatert AI-modell til edge-enhet via IoT Hub device twin"""
twin = self.registry_manager.get_twin(device_id)
# Oppdater desired properties med ny modellinfo
twin_patch = {
"properties": {
"desired": {
"ai_model": {
"name": model_name,
"version": model_version,
"download_url": self._get_model_sas_url(model_name, model_version),
"checksum": self._get_model_checksum(model_name, model_version),
"updated_at": datetime.utcnow().isoformat()
}
}
}
}
self.registry_manager.update_twin(device_id, twin_patch)
def collect_edge_metrics(self, device_id: str) -> dict:
"""Hent ytelsesmetrikker fra edge AI-modul"""
twin = self.registry_manager.get_twin(device_id)
reported = twin.properties.reported.get("ai_metrics", {})
return {
"inference_count": reported.get("total_inferences", 0),
"avg_latency_ms": reported.get("avg_latency_ms", 0),
"model_version": reported.get("current_model_version", "unknown"),
"accuracy_drift": reported.get("accuracy_drift", 0),
"last_updated": reported.get("last_report_time")
}
Modell-feedback-loop
| Fase | Lokasjon | Handling | Verktoy |
|---|---|---|---|
| Datainnsamling | Edge | Samle inferensresultater og ground truth | IoT Edge modul |
| Dataaggregering | Edge | Komprimere og batche data | Data Flows |
| Dataoverfoering | Edge → Sky | Sende treningsdata til sky | IoT Hub / Event Hub |
| Modelltrening | Sky | Retrain/fine-tune modell | Azure ML |
| Modellvalidering | Sky | Evaluere ny modell mot baseline | Azure ML Endpoints |
| Modelldistribusjon | Sky → Edge | Pushe ny modell til edge | IoT Hub device twin |
| A/B-testing | Edge | Sammenligne modellversjoner | Edge-modul |
Norsk offentlig sektor
Relevante use cases
- Statens vegvesen: Sanntids verkontrollovervaking med AI-basert analyse av vaerdata, trafikkmonstre og veiforhold fra veistasjonssensorer
- Kystverket: Autonome sensorsystemer langs kysten for miljooverkaking og sikkerhet, med begrenset tilkobling
- Energisektoren: Smart styring av offentlige bygg med prediktiv vedlikeholdsanalyse av HVAC-systemer
- Helsesektoren: IoT-basert pasientovervaking pa sykehus med lokal AI for tidlig varsling
Regulatoriske hensyn
- Data fra sensorer i offentlig infrastruktur kan inneholde personopplysninger (kameradata, lokasjon)
- Schrems II krever at persondata prosesseres innenfor EOS
- NSM Grunnprinsipper gjelder for kritisk infrastruktur-systemer
- Personvernkonsekvensvurdering (DPIA) pakrevd for AI-basert overvaking
Beslutningsrammeverk
| Scenario | Anbefaling | Begrunnelse |
|---|---|---|
| < 100 sensorer, stabil tilkobling | Azure IoT Hub direkte | Enklest, lavest kostnad |
| 100-10 000 sensorer, variabel tilkobling | Azure IoT Operations pa AKS Edge | Lokal buffring og forbehandling |
| Kritisk sanntidsanalyse | Edge AI med Stream Analytics | Sub-sekund latens |
| Prediktiv vedlikehold | ONNX-modell pa edge gateway | Offline-kapabel, lav latens |
| Regulert miljoo (helse, forsvar) | Azure IoT Operations + Confidential Computing | Dataskydd i prosessering |
| Store datamengder, periodisk tilkobling | Edge-aggregering + batch-upload | Bandbreddesparing |
For Cosmo
- Azure IoT Operations er den foretrukne plattformen for industrielle IoT-AI-scenarier pa edge, med MQTT-basert kommunikasjon og Data Flows for datatransformasjon — anbefal dette fremfor eldre IoT Edge-moenstre
- Sensor data normalization er fundamentalt — uten standardisert datakvalitet og enhetskonvertering vil AI-modeller gi upanalitelige resultater, sa invester i Data Flow-transformasjoner for normalisering
- Gateway AI-forbehandling reduserer skyavhengighet dramatisk — anomalideteksjon og aggregering pa edge kan kutte bandwidth med 90%+ og gi sub-sekund responstid for kritiske hendelser
- Modelloppdatering via device twin er en etablert pattern for a holde edge AI-modeller oppdatert uten manuell intervensjon — bruk IoT Hub device twin for versjonsstyring og SAS-basert nedlasting
- For norsk offentlig sektor: Vurder alltid DPIA for sensor-AI-losninger som kan prosessere persondata, og sorg for at edge-prosessering begrenser hvilke data som forlater det lokale nettverket