# Real-Time Streaming for AI Applications **Last updated:** 2026-02 **Status:** GA **Category:** Data Engineering for AI --- ## Introduksjon Sanntidsdatastrømming er en fundamental byggestein for AI-applikasjoner som krever umiddelbar respons på hendelser -- fra IoT-sensorer og transaksjoner til brukeratferd og systemmetrikker. Microsoft Fabric Real-Time Intelligence kombinert med Azure Event Hubs og Apache Kafka gir en komplett plattform for inntak, transformasjon og analyse av strømmedata som mater AI-modeller med oppdatert informasjon. For norsk offentlig sektor er sanntidsarkitektur særlig relevant for trafikkmonitorering (Direktoratet for digital tjenesteutvikling), helseovervåking, energistyring og beredskapsrespons. Evnen til å oppdage avvik i sanntid og utløse automatiserte handlinger basert på AI-prediksjoner kan redusere responstider dramatisk og forbedre tjenestekvalitet. Denne referansen dekker arkitekturmønstre for å integrere Event Hubs, Kafka og Fabric Eventstream med AI-applikasjoner, inkludert Spark Structured Streaming, KQL Database for tidsserieanalyse, og mønster for hendelsesfiltrering og avledede strømmer. --- ## Eventstream Connectors and Topologies ### Fabric Eventstream Overview Microsoft Fabric Eventstream er en fullstendig administrert hendelsesinntak- og strømmetjeneste som muliggjør sanntidsdatabehandling uten kode. | Kilde-type | Eksempler | Autentisering | |---|---|---| | Microsoft-kilder | Azure Event Hubs, Azure IoT Hub, Azure Service Bus | Managed identity, SAS | | Database CDC | Azure SQL DB, PostgreSQL, MySQL, Cosmos DB, SQL MI | Connection string | | Kafka-kilder | Confluent Cloud, Apache Kafka, Amazon MSK | SASL/PLAIN, OAuth | | Andre skyer | Amazon Kinesis, Google Cloud Pub/Sub | IAM credentials | | Fabric-hendelser | Workspace item events, Blob Storage events | Built-in | ### Topology Patterns ``` ┌──────────────┐ IoT Hub ────────>│ │────> KQL Database (tidsserier) │ │ Event Hubs ─────>│ Eventstream │────> Lakehouse (Delta tables) │ │ Kafka ──────────>│ (Filter + │────> Spark Notebook (ML) │ Transform) │ CDC (SQL) ──────>│ │────> Derived Stream (Real-Time Hub) └──────────────┘ ``` ### Konfigurere Event Hubs som kilde ```python # Eventstream configuration via Fabric UI or API # Event Hub connection parameters event_hub_config = { "namespace": "my-eventhub-ns.servicebus.windows.net", "event_hub": "ai-telemetry", "consumer_group": "$Default", "data_format": "Json", "authentication": "SharedAccessKey" } ``` ### Destinasjoner Eventstream støtter flere destinasjoner parallelt: | Destinasjon | Bruksområde | Latens | |---|---|---| | **Eventhouse (KQL Database)** | Tidsserieanalyse, ad-hoc-spørringer | Sekunder | | **Lakehouse** | Historisk analyse, Delta Lake lagring | Minutter | | **Spark Notebook** | Sanntids ML-inferens | Sekunder | | **Derived Stream** | Viderefordeling til andre forbrukere | Sub-sekund | | **Fabric Activator** | Automatiserte handlinger og varsler | Sekunder | | **Custom Endpoint** | Ekstern applikasjonsintegrasjon | Variabel | --- ## Structured Streaming with Spark ### Spark Structured Streaming i Fabric Fabric Notebooks kan lese direkte fra Eventstream via Spark Structured Streaming uten manuell tilkoblingskonfigurasjon. ```python # Les strømmende data fra Eventstream i Fabric Notebook # Parameter-verdier settes automatisk via "Read with Spark" i UI df_stream = ( spark.readStream .format("fabricEventStream") .option("eventstream.itemid", "") .option("eventstream.datasourceid", "") .load() ) # Vis skjema df_stream.printSchema() ``` ### Transformasjoner på strømmende data ```python from pyspark.sql.functions import col, window, avg, count, from_json from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType # Definer skjema for innkommende JSON schema = StructType() \ .add("sensorId", StringType()) \ .add("temperature", DoubleType()) \ .add("humidity", DoubleType()) \ .add("timestamp", TimestampType()) # Parse JSON og beregn vindusaggregater parsed_stream = ( df_stream .select(from_json(col("body").cast("string"), schema).alias("data")) .select("data.*") ) # 5-minutters glidende vindu med aggregater windowed_aggregates = ( parsed_stream .withWatermark("timestamp", "10 minutes") .groupBy( window(col("timestamp"), "5 minutes", "1 minute"), col("sensorId") ) .agg( avg("temperature").alias("avg_temp"), avg("humidity").alias("avg_humidity"), count("*").alias("event_count") ) ) ``` ### Skrive til Delta Lake (Lakehouse) ```python # Skriv strømmede data til Delta-tabell med optimalisering query = ( windowed_aggregates .writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "Tables/_checkpoints/sensor_agg") .trigger(processingTime="1 minute") # Batch hvert minutt .toTable("sensor_aggregates") ) query.awaitTermination() ``` ### Optimalisering av strømmeskrivinger | Teknikk | Beskrivelse | Anbefalt bruk | |---|---|---| | **Trigger interval** | `processingTime="1 minute"` batches hendelser | Reduserer små filer | | **Optimized Write** | `spark.databricks.delta.optimizeWrite.enabled` | Automatisk filstørrelsesoptimalisering | | **Partitioning** | `partitionBy("date", "sensorId")` | Når filtrering på partisjonsnøkler er vanlig | | **Repartition** | `repartition(48)` før skriving | Parallellisering over CPU-kjerner | | **Coalesce** | `coalesce(4)` for lav throughput | Unngår for mange små filer | --- ## KQL Database for Time-Series Analytics ### Eventhouse og KQL Database KQL Database i Fabric er optimalisert for tidsseriedata og gir sub-sekund spørringsrespons over milliarder av rader. ```kql // Tidsserieanalyse med KQL // Beregn glidende gjennomsnitt for sensortemperatur SensorData | where Timestamp > ago(24h) | summarize AvgTemp = avg(Temperature) by bin(Timestamp, 5m), SensorId | render timechart ``` ```kql // Anomalideteksjon med innebygd series_decompose_anomalies let min_t = ago(7d); let max_t = now(); SensorData | make-series AvgTemp = avg(Temperature) on Timestamp from min_t to max_t step 1h by SensorId | extend (anomalies, score, baseline) = series_decompose_anomalies(AvgTemp, 1.5, -1, 'linefit') | mv-expand Timestamp to typeof(datetime), AvgTemp to typeof(double), anomalies to typeof(int), score to typeof(double), baseline to typeof(double) | where anomalies != 0 ``` ### Sammenligning: KQL Database vs Lakehouse for strømmedata | Egenskap | KQL Database | Lakehouse (Delta) | |---|---|---| | **Optimal for** | Tidsserier, logdata, IoT | Strukturert analyse, ML-trening | | **Spørrespråk** | KQL | SQL, PySpark | | **Latens** | Sub-sekund | Sekunder til minutter | | **Retensjon** | Konfigurerbar policy | Ubegrenset (manuell VACUUM) | | **Innebygd ML** | Anomalideteksjon, forecasting | Via notebooks | | **Format** | Proprietært (optimalisert) | Delta Lake (åpent) | | **One Logical Copy** | Ja, til OneLake | Native | --- ## Event Filtering and Derived Streams ### Filtrering i Eventstream Eventstream støtter no-code transformasjoner direkte i strømmen: - **Filter**: Fjern hendelser basert på betingelser - **Manage Fields**: Velg, omdøp, fjern felt - **Group By**: Aggreger over tidsvindu - **Union**: Kombiner flere strømmer - **Expand**: Flatten nestede strukturer ### Derived Streams (avledede strømmer) ``` Eventstream (rå data) │ ├── Filter: temperature > 50 ──> Derived Stream: "high-temp-alerts" │ │ │ ├──> Activator (varsling) │ └──> KQL Database │ ├── Group By: 5min avg ────────> Derived Stream: "sensor-aggregates" │ │ │ └──> Lakehouse │ └── All events ────────────────> KQL Database (rå logging) ``` ### Content-Based Routing ```python # Pseudo-kode for content-based routing via Spark from pyspark.sql.functions import col # Les fra Eventstream raw_stream = spark.readStream.format("fabricEventStream").load() # Route basert på hendelsestype critical_events = raw_stream.filter(col("severity") == "CRITICAL") info_events = raw_stream.filter(col("severity") == "INFO") # Skriv til forskjellige destinasjoner critical_query = ( critical_events.writeStream .format("delta") .toTable("critical_alerts") ) info_query = ( info_events.writeStream .format("delta") .toTable("info_logs") ) ``` --- ## Streaming SLAs and Backpressure Handling ### SLA-dimensjoner for strømmesystemer | Dimensjon | Mål | Metric | |---|---|---| | **End-to-end latens** | < 5 sekunder for varsler | P99 latens | | **Throughput** | Minimum events/sek som må håndteres | Events per second | | **Data completeness** | Ingen tapte hendelser | Missing event rate | | **Processing guarantee** | At-least-once eller exactly-once | Delivery semantics | | **Recovery time** | Tid fra feil til normal drift | RTO | ### Backpressure-strategier ```python # Spark Structured Streaming med rate limiting query = ( df_stream .writeStream .format("delta") .option("maxOffsetsPerTrigger", 10000) # Begrens per batch .trigger(processingTime="30 seconds") .toTable("processed_events") ) ``` ### Event Hubs Partisjonering for skalering ```python # Event Hubs partisjonskonfigurasjon # Anbefalt: 4-32 partisjoner avhengig av throughput # Hver partisjon støtter opptil 1 MB/s inntak, 2 MB/s uttak # Fabric Eventstream håndterer automatisk partisjonskonsumering # For manuell Kafka-tilgang: kafka_config = { "kafka.bootstrap.servers": "eventstream-xxx.servicebus.windows.net:9093", "subscribe": "es_topic", "kafka.sasl.mechanism": "PLAIN", "kafka.security.protocol": "SASL_SSL", "startingOffsets": "latest", "maxOffsetsPerTrigger": 50000 } df = spark.readStream.format("kafka").options(**kafka_config).load() ``` ### Retry Policy for Spark Job Definitions For produksjonsmiljøer anbefales Spark Job Definitions over Notebooks: | Parameter | Anbefalt verdi | Begrunnelse | |---|---|---| | **Retry enabled** | Ja | Automatisk gjenstart ved feil | | **Max retries** | Ubegrenset | For kontinuerlige strømmejobber | | **Retry interval** | 60 sekunder | Unngå storm of retries | | **Checkpoint** | Alltid konfigurert | Gjenoppta fra siste posisjon | ### Monitoring Spark Structured Streaming UI gir innebygde metrikker: - Input Rate (hendelser/sekund) - Process Rate (hendelser/sekund) - Batch Duration (ms) - Input Rows per batch - Operation Duration breakdown --- ## Arkitekturmonstre for AI med sanntidsdata ### Lambda Architecture (hybrid batch + streaming) ``` ┌───────────────────┐ │ Event Hubs / │ │ Kafka Source │ └─────┬────┬────────┘ │ │ ┌───────────┘ └──────────────┐ │ │ ┌────────▼────────┐ ┌─────────▼──────────┐ │ Speed Layer │ │ Batch Layer │ │ (Eventstream │ │ (Data Factory + │ │ + KQL DB) │ │ Lakehouse) │ └────────┬─────────┘ └─────────┬──────────┘ │ │ ┌────────▼───────────────────────────────▼──────────┐ │ Serving Layer │ │ (Power BI, AI Models, REST APIs) │ └───────────────────────────────────────────────────┘ ``` ### Kappa Architecture (rent strømmende) Forenklet arkitektur der all data behandles som strømmer: ``` Event Source ──> Eventstream ──> Spark Structured Streaming │ ├──> Delta Table (append-only) ├──> ML Inference (real-time) └──> KQL Database (analytics) ``` --- ## Referanser - [Overview of Microsoft Fabric Eventstreams](https://learn.microsoft.com/en-us/fabric/real-time-intelligence/event-streams/overview) -- Komplett oversikt over Eventstream-kapabiliteter - [Query Eventstream from Notebook with Spark Structured Streaming](https://learn.microsoft.com/en-us/fabric/data-engineering/notebook-with-event-stream) -- Integrasjon mellom Notebook og Eventstream - [Get streaming data into Lakehouse with Spark Structured Streaming](https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-streaming-data) -- Optimalisering av Delta Lake-skrivinger - [What is Real-Time Intelligence?](https://learn.microsoft.com/en-us/fabric/real-time-intelligence/overview) -- Oversikt over Real-Time Intelligence - [Introduction to Fabric Real-Time Hub](https://learn.microsoft.com/en-us/fabric/real-time-hub/real-time-hub-overview) -- Real-Time Hub connectors og integrasjon - [Get events from Azure Event Hubs into Real-Time hub](https://learn.microsoft.com/en-us/fabric/real-time-hub/add-source-azure-event-hubs) -- Event Hubs-kobling - [Add and manage destinations in an Eventstream](https://learn.microsoft.com/en-us/fabric/real-time-intelligence/event-streams/add-manage-eventstream-destinations) -- Alle støttede destinasjoner - [Get data from Azure Event Hubs](https://learn.microsoft.com/en-us/fabric/real-time-intelligence/get-data-event-hub) -- Direct ingestion til KQL Database --- ## For Cosmo - **Bruk denne referansen** når brukeren trenger sanntidsdatabehandling for AI-modeller, IoT-integrasjon, eller hendelsesdrevet arkitektur på Microsoft Fabric. - Anbefal **Eventstream + KQL Database** for tidsserieanalyse og anomalideteksjon med sub-sekund latens, og **Eventstream + Lakehouse** for historisk ML-trening. - For norsk offentlig sektor: Fremhev at Fabric Eventstream kjorer som SaaS i europeisk region (Norway East/West Europe), noe som forenkler personvernhensyn sammenlignet med selvhostede Kafka-clustere. - Bruk **Spark Job Definitions** (ikke Notebooks) i produksjon for strømmejobber, med retry-policy aktivert for høy tilgjengelighet. - Anbefal **Derived Streams** for å dele sanntidsdata mellom team uten å duplisere infrastruktur -- en enkelt Eventstream kan mate flere destinasjoner med forskjellige transformasjoner.