# Azure IoT Hub and AI Pipeline

**Last updated:** 2026-02
**Status:** GA
**Category:** Hybrid Cloud & Edge AI

---

## Introduction

Azure IoT Hub is Microsoft's central PaaS service for bidirectional communication between IoT devices and the cloud. Combined with Azure Stream Analytics for real-time analytics and Azure Machine Learning for model training and scoring, IoT Hub forms the core of a unified AI pipeline from device to insight.

For the Norwegian public sector, this architecture is relevant for scenarios such as smart road infrastructure (real-time measurement of traffic and road conditions), building automation (energy management in public buildings), environmental monitoring (air and water quality), and predictive maintenance of critical infrastructure. IoT Hub provides secure device connectivity, Stream Analytics processes the data in real time, and Azure ML scores models for predictive insights.

The architecture scales from hundreds to millions of devices, with built-in support for message routing, device twins for configuration management, and straightforward integration with the Azure data platform (Fabric, Event Hub, Cosmos DB) for long-term analysis.

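
As a rough illustration of device twins as a configuration channel, the sketch below merges a desired-properties patch into a device's local settings and builds the status that the device would report back. The property name `telemetryInterval`, the bounds, and the helper `apply_desired_patch` are hypothetical and chosen only for this example; on a real device the patch would arrive through the device SDK's twin callback rather than a direct function call.

```python
# Minimal sketch: apply a device-twin desired-properties patch to local config.
# All names and limits here are illustrative assumptions, not part of any SDK.

DEFAULT_CONFIG = {"telemetryInterval": 10}  # seconds; hypothetical property name
MIN_INTERVAL, MAX_INTERVAL = 1, 3600        # illustrative validation bounds


def apply_desired_patch(config: dict, patch: dict) -> tuple[dict, dict]:
    """Return (new_config, reported) after applying a desired-properties patch."""
    new_config = dict(config)  # never mutate the caller's config
    reported = {}
    for key, value in patch.items():
        if key.startswith("$"):
            # Skip twin metadata such as $version
            continue
        if key == "telemetryInterval":
            valid = (
                isinstance(value, int)
                and not isinstance(value, bool)
                and MIN_INTERVAL <= value <= MAX_INTERVAL
            )
            if valid:
                new_config[key] = value
                reported[key] = {"value": value, "status": "applied"}
            else:
                # Keep the previous value and report the rejection
                reported[key] = {"value": config.get(key), "status": "rejected"}
    return new_config, reported


config, reported = apply_desired_patch(
    DEFAULT_CONFIG, {"telemetryInterval": 60, "$version": 4}
)
print(config)
print(reported)
```

Returning a new dictionary instead of mutating the existing one keeps a rejected patch from leaving the device in a half-updated state.
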

---

## Core Components

| Component | Purpose | Technology |
|-----------|---------|------------|
| Azure IoT Hub | Central device communication and management | PaaS |
| Azure Stream Analytics | Real-time stream processing | SQL-based |
| Azure Machine Learning | Model training and online scoring | ML platform |
| Event Hub | High-volume message ingestion | Event streaming |
| Azure Cosmos DB | Real-time operational database | NoSQL |
| Azure Data Lake / Fabric | Long-term data analysis | Analytics |
| Power BI | Real-time dashboards | Visualization |
| IoT Edge | Local processing on devices | Container runtime |

---

## Device-to-Hub Data Flow

### Device-to-cloud data flow architecture

```
┌──────────┐     ┌──────────┐     ┌──────────────┐
│ Sensors  │────→│ IoT Edge │────→│   IoT Hub    │
│ (MQTT)   │     │ Gateway  │     │              │
└──────────┘     └──────────┘     │ - Routing    │
                                  │ - Enrichment │
┌──────────┐                      │ - Twin mgmt  │
│ Direct   │─────────────────────→│              │
│ devices  │                      └──────┬───────┘
│ (AMQP)   │                             │
└──────────┘                  ┌──────────┼──────────┐
                              ↓          ↓          ↓
                        ┌──────────┐ ┌────────┐ ┌────────┐
                        │ Stream   │ │ Event  │ │ Cosmos │
                        │ Analytics│ │ Hub    │ │ DB     │
                        └──────────┘ └────────┘ └────────┘
                              ↓          ↓          ↓
                        ┌──────────┐ ┌────────┐ ┌────────┐
                        │ Azure ML │ │ Fabric │ │ Power  │
                        │ Scoring  │ │        │ │ BI     │
                        └──────────┘ └────────┘ └────────┘
```

### IoT Hub message routing

Message routes for the AI pipeline. Routing queries treat bare names as application properties, so conditions on telemetry fields in the JSON body use the `$body.` prefix; this requires the device to send messages with content type `application/json` and UTF-8 encoding.

```json
{
  "routes": [
    {
      "name": "realtime-to-stream-analytics",
      "source": "DeviceMessages",
      "condition": "$body.temperature > 0 OR $body.vibration > 0",
      "endpointNames": ["stream-analytics-endpoint"],
      "isEnabled": true
    },
    {
      "name": "anomalies-to-event-hub",
      "source": "DeviceMessages",
      "condition": "$body.alert = 'anomaly'",
      "endpointNames": ["anomaly-event-hub"],
      "isEnabled": true
    },
    {
      "name": "all-data-to-storage",
      "source": "DeviceMessages",
      "condition": "true",
      "endpointNames": ["datalake-storage"],
      "isEnabled": true
    },
    {
      "name": "device-lifecycle-to-cosmos",
      "source": "DeviceLifecycleEvents",
      "condition": "true",
      "endpointNames": ["cosmos-db-endpoint"],
"isEnabled": true } ] } ``` ### Enhetstilkobling med Python SDK ```python # IoT-enhet sender sensordata til IoT Hub from azure.iot.device import IoTHubDeviceClient, Message import json import time class SensorDevice: def __init__(self, connection_string: str): self.client = IoTHubDeviceClient.create_from_connection_string( connection_string ) self.client.connect() def send_telemetry(self, sensor_data: dict): """Send sensordata med metadata for ruting""" message = Message( json.dumps(sensor_data), content_encoding="utf-8", content_type="application/json" ) # Egendefinerte properties for meldingsruting message.custom_properties["sensorType"] = sensor_data.get("type", "unknown") message.custom_properties["location"] = sensor_data.get("location", "unknown") # Sett prioritet for anomalier if sensor_data.get("alert"): message.custom_properties["priority"] = "high" self.client.send_message(message) def start_continuous_telemetry(self, interval_seconds: int = 10): """Kontinuerlig sending av sensordata""" while True: data = self.read_sensors() self.send_telemetry(data) time.sleep(interval_seconds) def read_sensors(self) -> dict: """Les sensorverdier (simulert)""" import random return { "timestamp": time.time(), "temperature": random.uniform(18.0, 25.0), "humidity": random.uniform(30.0, 70.0), "vibration": random.uniform(0.0, 5.0), "type": "environment", "location": "building-A-floor-2" } ``` --- ## Stream Processing for AI ### Azure Stream Analytics for IoT AI ```sql -- Sanntids anomalideteksjon med Stream Analytics -- Kombinerer sensordata med ML-scoring -- Query 1: Glidende statistikk per enhet WITH DeviceStats AS ( SELECT IoTHub.ConnectionDeviceId AS DeviceId, System.Timestamp() AS WindowEnd, AVG(temperature) AS AvgTemp, STDEV(temperature) AS StdTemp, MIN(temperature) AS MinTemp, MAX(temperature) AS MaxTemp, COUNT(*) AS ReadingCount FROM IoTHubInput TIMESTAMP BY EventProcessedUtcTime GROUP BY IoTHub.ConnectionDeviceId, SlidingWindow(minute, 10) ) -- Query 2: 
-- Anomaly detection with a statistical threshold
SELECT
    ds.DeviceId,
    ds.WindowEnd,
    ds.AvgTemp,
    ds.StdTemp,
    CASE
        WHEN ds.AvgTemp > (ref.NormalAvg + 3 * ref.NormalStd) THEN 'HIGH_ANOMALY'
        WHEN ds.AvgTemp > (ref.NormalAvg + 2 * ref.NormalStd) THEN 'WARNING'
        WHEN ds.AvgTemp < (ref.NormalAvg - 3 * ref.NormalStd) THEN 'LOW_ANOMALY'
        ELSE 'NORMAL'
    END AS Status,
    ref.DeviceName,
    ref.Location
INTO AnomalyOutput
FROM DeviceStats ds
JOIN ReferenceData ref
    ON ds.DeviceId = ref.DeviceId
WHERE ds.ReadingCount >= 5  -- At least 5 readings for reliability

-- Query 3: Data aggregation for ML training
SELECT
    IoTHub.ConnectionDeviceId AS DeviceId,
    System.Timestamp() AS WindowEnd,
    AVG(temperature) AS AvgTemp,
    AVG(humidity) AS AvgHumidity,
    AVG(vibration) AS AvgVibration,
    STDEV(vibration) AS StdVibration,
    MAX(vibration) AS PeakVibration,
    COUNT(*) AS SampleCount
INTO MLTrainingOutput
FROM IoTHubInput TIMESTAMP BY EventProcessedUtcTime
GROUP BY
    IoTHub.ConnectionDeviceId,
    TumblingWindow(hour, 1)
```

### Stream Analytics with built-in anomaly detection

```sql
-- Use the built-in anomaly detection function
SELECT
    IoTHub.ConnectionDeviceId AS DeviceId,
    temperature,
    AnomalyDetection_SpikeAndDip(
        CAST(temperature AS float),
        95,   -- Confidence level (%)
        120,  -- History size (events in the sliding window)
        'spikesanddips'
    ) OVER (
        PARTITION BY IoTHub.ConnectionDeviceId
        LIMIT DURATION(minute, 120)
    ) AS AnomalyResult
INTO AnomalyAlertOutput
FROM IoTHubInput TIMESTAMP BY EventProcessedUtcTime
```

---

## Real-Time Model Scoring

### Azure ML online endpoint for IoT scoring

```python
# Azure ML endpoint for real-time IoT scoring
from azure.ai.ml import MLClient
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    Model,
)
from azure.identity import DefaultAzureCredential


def deploy_iot_scoring_endpoint(ml_client: MLClient):
    """Deploy a real-time scoring endpoint for IoT data"""
    # Create the endpoint
    endpoint = ManagedOnlineEndpoint(
        name="iot-anomaly-scoring",
        auth_mode="key",
        description="Anomaly detection for IoT sensor data"
    )
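    ml_client.online_endpoints.begin_create_or_update(endpoint).result()

    # Deploy the model
    deployment = ManagedOnlineDeployment(
        name="anomaly-v1",
        endpoint_name="iot-anomaly-scoring",
        model=Model(path="./models/anomaly_model.pkl"),
        code_configuration={
            "code": "./scoring",
            "scoring_script": "score.py"
        },
        instance_type="Standard_DS3_v2",
        instance_count=2,  # Redundancy for reliability
        environment="azureml:sklearn-1.0:1"
    )
    ml_client.online_deployments.begin_create_or_update(deployment).result()

    # Route all traffic to the new deployment; without this the endpoint
    # has no default deployment to serve scoring requests
    endpoint.traffic = {"anomaly-v1": 100}
    ml_client.online_endpoints.begin_create_or_update(endpoint).result()
```

### Scoring script for IoT data

```python
# score.py: Azure ML scoring script for IoT
import json
import os

import joblib
import numpy as np


def init():
    global model
    # AZUREML_MODEL_DIR points to the folder where the deployed model is mounted
    model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "anomaly_model.pkl")
    model = joblib.load(model_path)


def run(raw_data):
    """Score IoT sensor data against the predictive model"""
    data = json.loads(raw_data)

    features = np.array([[
        data["avg_temperature"],
        data["avg_humidity"],
        data["avg_vibration"],
        data["std_vibration"],
        data["peak_vibration"],
        data["sample_count"]
    ]])

    prediction = model.predict(features)[0]
    probabilities = model.predict_proba(features)[0]
    # Probability of the failure class (label 1), not of whichever class won
    failure_index = list(model.classes_).index(1)

    return json.dumps({
        "device_id": data["device_id"],
        "prediction": int(prediction),
        "failure_probability": float(probabilities[failure_index]),
        "recommendation": (
            "SCHEDULE_MAINTENANCE" if prediction == 1
            else "NORMAL_OPERATION"
        ),
        "scored_at": data.get("window_end")
    })
```

### Stream Analytics integrated with Azure ML

```sql
-- Call the Azure ML endpoint from Stream Analytics.
-- Assumes an Azure ML function with the alias AzureMLEndpoint is registered
-- on the job. Job functions are invoked with the udf. prefix, and the
-- argument list must match the signature configured for that function.
WITH ScoringInput AS (
    SELECT
        IoTHub.ConnectionDeviceId AS device_id,
        System.Timestamp() AS window_end,
        AVG(temperature) AS avg_temperature,
        AVG(humidity) AS avg_humidity,
        AVG(vibration) AS avg_vibration,
        STDEV(vibration) AS std_vibration,
        MAX(vibration) AS peak_vibration,
        COUNT(*) AS sample_count
    FROM IoTHubInput TIMESTAMP BY EventProcessedUtcTime
    GROUP BY
        IoTHub.ConnectionDeviceId,
        TumblingWindow(minute, 15)
),
Scored AS (
    SELECT
        si.*,
        udf.AzureMLEndpoint(
            si.device_id, si.window_end,
            si.avg_temperature, si.avg_humidity, si.avg_vibration,
            si.std_vibration, si.peak_vibration, si.sample_count
        ) AS ml
    FROM ScoringInput si
)
SELECT
    device_id,
    window_end,
    avg_temperature,
    avg_humidity,
    avg_vibration,
    std_vibration,
    peak_vibration,
    sample_count,
    ml.prediction AS prediction,
    ml.failure_probability AS failure_probability,
    ml.recommendation AS recommendation
INTO MaintenanceOutput
FROM Scored
WHERE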
    ml.failure_probability > 0.5
```

---

## Scaling Hybrid Ingestion

### Scaling architecture

| Scale | Devices | IoT Hub SKU | Stream Analytics SU | Recommendation |
|-------|---------|-------------|---------------------|----------------|
| Small | < 1,000 | S1 (1 unit) | 6 SU | Standard setup |
| Medium | 1,000 - 100,000 | S2 (2 units) | 12-24 SU | Partitioning |
| Large | 100,000 - 1M | S3 (10 units) | 48+ SU | Event Hub routing |
| Enterprise | > 1M | S3 + Event Hub | Dedicated cluster | Multi-hub architecture |

### Hybrid scaling with edge preprocessing

```python
# Hybrid scaling pattern: the edge reduces cloud load
import math


class HybridScalingConfig:
    """Configuration for hybrid edge-cloud scaling"""

    @staticmethod
    def calculate_cloud_load(
        total_devices: int,
        messages_per_device_per_hour: int,
        edge_aggregation_ratio: float = 0.1  # 10% of data is sent to the cloud
    ) -> dict:
        """Calculate cloud load with edge preprocessing"""
        raw_messages = total_devices * messages_per_device_per_hour
        cloud_messages = int(raw_messages * edge_aggregation_ratio)
        bandwidth_reduction = 1 - edge_aggregation_ratio

        # IoT Hub sizing against the daily message quota per unit
        # (S1: 400,000 messages/day, S2: 6,000,000 messages/day)
        messages_per_day = cloud_messages * 24
        if messages_per_day <= 400_000:
            iot_hub_sku = "S1 (1 unit)"
        elif messages_per_day <= 6_000_000:
            iot_hub_sku = "S2 (1 unit)"
        else:
            units = math.ceil(messages_per_day / 6_000_000)
            iot_hub_sku = f"S2 ({units} units)"

        return {
            "total_devices": total_devices,
            "raw_messages_per_hour": raw_messages,
            "cloud_messages_per_hour": cloud_messages,
            "bandwidth_reduction": f"{bandwidth_reduction*100:.0f}%",
            "iot_hub_sku": iot_hub_sku,
            # 0.001 NOK per message is an illustrative placeholder rate,
            # not an Azure list price
            "estimated_monthly_cost_nok": cloud_messages * 24 * 30 * 0.001
        }
```

---

## Norwegian Public Sector

### Relevant use cases

| Sector | Use case | Devices | AI model |
|--------|----------|---------|----------|
| Transport | Road sensor network | ~5,000 | Traffic prediction, winter maintenance |
| Energy | Smart building management | ~10,000/building | Energy optimization |
| Environment | Air/water quality | ~500 stations | Pollution alerts |
| Health | Equipment monitoring | ~1,000/hospital | Predictive maintenance |
| Coastal | Maritime sensors | ~2,000 | Weather prediction, safety |

### Security requirements

- IoT Hub endpoint in Norway East
- TLS 1.2+ for all device communication
- X.509 certificates for device authentication
- DPS (Device Provisioning Service) for automatic registration
- NSM-compliant network segmentation

---

## Decision Framework

| Scenario | Recommendation | Rationale |
|----------|----------------|-----------|
| < 1,000 devices, simple analysis | IoT Hub S1 + Stream Analytics | Lowest cost and complexity |
| Real-time ML scoring | Stream Analytics + Azure ML endpoint | ML scoring integrated in the stream |
| High volume with edge preprocessing | IoT Edge + IoT Hub S2/S3 | Reduced cloud load and cost |
| Long-term analysis | IoT Hub + Event Hub + Fabric | Scalable historical analysis |
| Predictive maintenance | Full pipeline with retraining loop | Continuous model improvement |
| Anomaly detection | Stream Analytics built-in anomaly detection | Fastest to implement |

---

## For Cosmo

- **IoT Hub + Stream Analytics + Azure ML is the canonical AI pipeline for IoT**: recommend this three-stage architecture as the default for all IoT AI scenarios in the public sector
- **Edge preprocessing can cut cloud load by 90% or more**: let IoT Edge aggregate and filter data before sensor data is sent to the cloud, which substantially reduces both cost and bandwidth requirements
- **Stream Analytics built-in anomaly detection is the fastest to implement**: use the `AnomalyDetection_SpikeAndDip` function to get started quickly before building your own ML models
- **Azure ML online endpoints provide real-time scoring from Stream Analytics**: call the endpoint through an Azure ML function in the query (`AzureMLEndpoint` in the example above) to integrate advanced ML directly into stream processing
- **For the Norwegian public sector, size IoT Hub capacity on the cloud messages that remain after edge aggregation**: with a 90% edge reduction, even large sensor networks can run on the S1/S2 tiers
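
The edge-preprocessing recommendation can be illustrated with a minimal sketch of what an aggregating edge module might do. The helpers `summarize_window` and `aggregate_stream` are hypothetical, the output field names simply mirror the aggregates used in the scoring query, and the window of 10 readings is an illustrative choice that corresponds to a 10% aggregation ratio rather than a measured figure.

```python
# Minimal sketch of edge-side aggregation: many raw readings in, one summary out.
# Function names and window size are illustrative assumptions.
from statistics import mean, pstdev


def summarize_window(readings: list[dict]) -> dict:
    """Collapse the raw readings of one window into a single summary message."""
    if not readings:
        raise ValueError("window must contain at least one reading")
    vibration = [r["vibration"] for r in readings]
    return {
        "avg_temperature": mean(r["temperature"] for r in readings),
        "avg_humidity": mean(r["humidity"] for r in readings),
        "avg_vibration": mean(vibration),
        # Population standard deviation, so a single-reading window still works
        "std_vibration": pstdev(vibration),
        "peak_vibration": max(vibration),
        "sample_count": len(readings),
    }


def aggregate_stream(readings: list[dict], window_size: int = 10) -> list[dict]:
    """Split readings into fixed-size windows; each window becomes one cloud message."""
    return [
        summarize_window(readings[i:i + window_size])
        for i in range(0, len(readings), window_size)
    ]


# Simulated raw readings from one device
raw = [
    {"temperature": 20.0 + i % 3, "humidity": 45.0, "vibration": float(i % 5)}
    for i in range(100)
]
summaries = aggregate_stream(raw, window_size=10)
reduction = 1 - len(summaries) / len(raw)
print(f"{len(raw)} raw readings -> {len(summaries)} cloud messages ({reduction:.0%} reduction)")
```

Keeping the peak and standard deviation alongside the averages preserves the signals that the downstream anomaly and maintenance models depend on, even though the raw samples never leave the edge.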