# Data Versioning and Lineage Tracking **Last updated:** 2026-02 **Status:** GA **Category:** Data Engineering for AI --- ## Introduksjon Dataversionskontroll og lineage-sporing er grunnleggende kapabiliteter for pålitelige AI-systemer. Versjonskontroll gjør det mulig å reprodusere eksakt de dataene en modell ble trent på, mens lineage dokumenterer hele datareisen fra kilde til ferdig prediksjon. Sammen gir de sporbarhet, reproduserbarhet og tillitsgrunnlag for AI-beslutninger. For norsk offentlig sektor er dette spesielt viktig gitt kravene i Utredningsinstruksen om etterprøvbarhet, Forvaltningslovens krav til dokumentasjon av vedtak, og EU AI Act sine krav til høyrisiko AI-systemer. En modell som påvirker borgeres rettigheter -- for eksempel NAV-ytelser eller byggetillatelser -- må kunne forklares og dokumenteres fra kilde til prediksjon. Denne referansen dekker Delta Lake versjonskontroll og tidsreise, commit-historikk og audit trails, lineage-visualisering i Purview og Fabric, avhengighetskartlegging, og strategier for rollback og gjenoppretting. --- ## Delta Lake Versioning and Time-Travel ### Versjonskontroll-modell Delta Lake bruker en Write-Ahead Log (WAL) i `_delta_log`-mappen som registrerer alle transaksjoner: ``` Tables/ml_training_data/ ├── _delta_log/ │ ├── 00000000000000000000.json # v0: Initial create (2026-01-01) │ ├── 00000000000000000001.json # v1: Append new data (2026-01-15) │ ├── 00000000000000000002.json # v2: Feature update (2026-02-01) │ ├── 00000000000000000003.json # v3: Delete PII (2026-02-05) │ └── 00000000000000000004.json # v4: Schema evolution (2026-02-10) ├── part-00000-*.snappy.parquet ├── part-00001-*.snappy.parquet └── ... ``` ### Versjonsspørringer ```python from delta.tables import DeltaTable # Les nåværende versjon df_current = spark.read.format("delta").table("gold.ml_training_data") # Les spesifikk versjon df_v2 = spark.read.format("delta") \ .option("versionAsOf", 2) \ .table("gold.ml_training_data") # Les data slik de var på et tidspunkt df_jan = spark.read.format("delta") \ .option("timestampAsOf", "2026-01-15T00:00:00Z") \ .table("gold.ml_training_data") # Sammenlign versjoner for å oppdage endringer from pyspark.sql.functions import col added_rows = df_v2.subtract(df_v1) # Rader i v2 som ikke finnes i v1 removed_rows = df_v1.subtract(df_v2) # Rader i v1 som ikke finnes i v2 print(f"Nye rader: {added_rows.count()}") print(f"Fjernede rader: {removed_rows.count()}") ``` ### Versjonskontroll for ML-eksperimenter ```python import mlflow # Logg data-versjon som del av ML-eksperiment with mlflow.start_run(run_name="churn_model_v3"): # Hent Delta-tabell-versjon dt = DeltaTable.forPath(spark, "Tables/gold/ml_training_data") current_version = dt.history(1).select("version").collect()[0][0] # Logg metadata mlflow.log_param("data_table", "gold.ml_training_data") mlflow.log_param("data_version", current_version) mlflow.log_param("data_timestamp", "2026-02-10T00:00:00Z") mlflow.log_param("row_count", df_current.count()) mlflow.log_param("column_count", len(df_current.columns)) # Tren modell... # mlflow.sklearn.log_model(model, "model") # Senere: Reproduser treningsdata eksakt # df_reproduced = spark.read.format("delta") # .option("versionAsOf", logged_version) # .table("gold.ml_training_data") ``` --- ## Commit History and Audit Trails ### DESCRIBE HISTORY ```sql -- Vis full transaksjonshistorikk DESCRIBE HISTORY gold.ml_training_data; -- Resultat: -- version | timestamp | operation | operationParameters | operationMetrics -- 4 | 2026-02-10 14:30:00 | WRITE | {mode: Append} | {numFiles: 3, numOutputRows: 15000} -- 3 | 2026-02-05 09:15:00 | DELETE | {predicate: [pii_flag=1]} | {numDeletedRows: 250, numRemovedFiles: 2} -- 2 | 2026-02-01 02:00:00 | MERGE | {predicate: ...} | {numUpdatedRows: 3400, numInsertedRows: 1200} -- 1 | 2026-01-15 02:00:00 | WRITE | {mode: Append} | {numFiles: 5, numOutputRows: 50000} -- 0 | 2026-01-01 10:00:00 | CREATE | {partitionBy: [date]} | {numFiles: 10, numOutputRows: 100000} ``` ### PySpark History API ```python from delta.tables import DeltaTable dt = DeltaTable.forPath(spark, "Tables/gold/ml_training_data") # Hent historikk history = dt.history() # Detaljert analyse av endringer display( history.select( "version", "timestamp", "operation", "operationParameters", "operationMetrics", "userName", "notebook.notebookId" ).orderBy("version", ascending=False) ) # Filtrer på spesifikke operasjoner deletes = history.filter("operation = 'DELETE'") merges = history.filter("operation = 'MERGE'") ``` ### Custom Audit Logging ```python from pyspark.sql import functions as F from datetime import datetime def log_data_operation(operation, table_name, details, user="system"): """Logg datapipelineoperasjoner til audit-tabell.""" audit_record = spark.createDataFrame([{ "timestamp": datetime.utcnow().isoformat(), "operation": operation, "table_name": table_name, "user": user, "details": str(details), "pipeline_run_id": spark.conf.get("spark.pipeline.runId", "interactive") }]) audit_record.write.format("delta") \ .mode("append") \ .saveAsTable("governance.data_audit_log") # Bruk i pipeline log_data_operation( operation="FEATURE_UPDATE", table_name="gold.ml_training_data", details={ "source_version": 3, "target_version": 4, "rows_added": 15000, "rows_updated": 3400, "features_modified": ["txn_7d_count", "income_band"] } ) ``` --- ## Data Lineage Visualization in Purview ### Lineage-kilder i Purview Purview fanger automatisk lineage fra flere kilder: | Dataprosesseringssystem | Lineage-omfang | |---|---| | **Azure Data Factory** | Copy Activity, Data Flow, SSIS | | **Fabric Data Factory** | Pipelines, Dataflow Gen2 | | **Fabric Notebooks** | Lakehouse → Lakehouse (item-level) | | **Azure Synapse Analytics** | Copy Activity, Data Flow | | **Power BI** | Semantic Model → Report → Dashboard | ### Lineage-visning i Fabric ``` Lineage-visning for en ML-pipeline: Azure SQL DB Lakehouse (Bronze) Lakehouse (Silver) ┌──────────┐ ┌──────────────┐ ┌──────────────┐ │ customers│──Copy──>│ raw_customers│──Notebook>│ customer_360 │ └──────────┘ └──────────────┘ └──────┬───────┘ │ Blob Storage Lakehouse (Bronze) │ ┌──────────┐ ┌──────────────┐ │ Notebook │ events │──Copy──>│ raw_events │──Notebook──>─────┘ └──────────┘ └──────────────┘ │ ▼ ┌──────────────┐ │ Gold: │ │ ml_features │ └──────┬───────┘ │ ┌──────▼───────┐ │ ML Experiment│ │ (MLflow) │ └──────┬───────┘ │ ┌──────▼───────┐ │ Power BI │ │ Dashboard │ └──────────────┘ ``` ### Tilgang til lineage Lineage i Fabric er tilgjengelig fra: 1. **Workspace toolbar**: Velg "Lineage view" 2. **Item options menu**: Høyreklikk på element → "View lineage" 3. **Item details page**: Under menyelementer øverst 4. **Purview Unified Catalog**: Browse → Microsoft Fabric → Fabric Workspaces ### Materialized Lake Views med auto-lineage ```sql -- Materialized Lake Views genererer automatisk lineage CREATE SCHEMA IF NOT EXISTS silver; CREATE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customer_features AS SELECT c.customer_id, c.name, c.region, COUNT(t.transaction_id) AS total_transactions, SUM(t.amount) AS total_amount, AVG(t.amount) AS avg_transaction_amount FROM bronze.customers c JOIN bronze.transactions t ON c.customer_id = t.customer_id GROUP BY c.customer_id, c.name, c.region; -- Lineage er automatisk sporet: -- bronze.customers + bronze.transactions → silver.customer_features ``` --- ## Upstream/Downstream Dependency Mapping ### Avhengighetsgraf ```python # Kartlegg avhengigheter programmatisk dependency_graph = { "bronze.raw_customers": { "sources": ["Azure SQL DB: customers_table"], "consumers": ["silver.customer_360", "silver.customer_features"] }, "silver.customer_360": { "sources": ["bronze.raw_customers", "bronze.raw_contacts", "bronze.raw_opportunities"], "consumers": ["gold.churn_features", "gold.revenue_predict_features"] }, "gold.churn_features": { "sources": ["silver.customer_360", "silver.transaction_features"], "consumers": ["ML Experiment: churn_model_v3", "Power BI: Churn Dashboard"] } } def get_upstream_dependencies(table_name, graph, depth=0, max_depth=5): """Rekursivt finn alle oppstrøms avhengigheter.""" if depth > max_depth or table_name not in graph: return [] sources = graph[table_name].get("sources", []) all_upstream = list(sources) for source in sources: all_upstream.extend( get_upstream_dependencies(source, graph, depth + 1, max_depth) ) return all_upstream def get_downstream_impact(table_name, graph, depth=0, max_depth=5): """Finn alle nedstrøms konsumenter (impact-analyse).""" if depth > max_depth or table_name not in graph: return [] consumers = graph[table_name].get("consumers", []) all_downstream = list(consumers) for consumer in consumers: all_downstream.extend( get_downstream_impact(consumer, graph, depth + 1, max_depth) ) return all_downstream # Eksempel: Hva påvirkes hvis vi endrer bronze.raw_customers? impact = get_downstream_impact("bronze.raw_customers", dependency_graph) print(f"Påvirkede elementer: {impact}") # ['silver.customer_360', 'silver.customer_features', # 'gold.churn_features', 'gold.revenue_predict_features', # 'ML Experiment: churn_model_v3', 'Power BI: Churn Dashboard'] ``` ### Data Contract Pattern ```python # Definer data contracts mellom lag data_contract = { "table": "silver.customer_360", "version": "2.1", "owner": "data-engineering@example.no", "sla": { "freshness": "24 hours", "completeness": "> 99.5%", "accuracy": "validated against source" }, "schema": { "required_columns": ["customer_id", "name", "region", "total_revenue"], "column_types": { "customer_id": "string", "total_revenue": "double", "region": "string" }, "partitioned_by": ["region"], "row_count_range": [100000, 500000] }, "consumers": [ {"team": "ml-team", "usage": "churn prediction features"}, {"team": "bi-team", "usage": "customer dashboard"} ] } ``` --- ## Rollback and Recovery Strategies ### Delta Lake RESTORE ```sql -- Gjenopprett tabell til spesifikk versjon RESTORE TABLE gold.ml_training_data TO VERSION AS OF 2; -- Gjenopprett til tidspunkt RESTORE TABLE gold.ml_training_data TO TIMESTAMP AS OF '2026-02-01T00:00:00Z'; ``` ```python # PySpark RESTORE dt = DeltaTable.forPath(spark, "Tables/gold/ml_training_data") dt.restoreToVersion(2) # Verifiser print(f"Gjenopprettet til versjon 2") print(f"Rader: {spark.read.format('delta').table('gold.ml_training_data').count()}") ``` ### Recovery-strategier | Scenario | Strategi | Kommando | |---|---|---| | **Feilaktig DELETE** | Restore til forrige versjon | `RESTORE TABLE ... TO VERSION AS OF n-1` | | **Korrupt data lastet** | Restore til pre-load versjon | `RESTORE TABLE ... TO TIMESTAMP AS OF '...'` | | **Schema-feil** | Restore + re-apply korrekt schema | Restore + ALTER TABLE | | **Hel tabell tapt** | Gjenskape fra kildedata + audit log | Kjør pipeline på nytt | | **VACUUM kjørt for tidlig** | Ingen recovery mulig! | Forebygg: minimum 7d retention | ### Rollback av ML-eksperiment ```python import mlflow def rollback_to_experiment(run_id): """Gjenopprett data og modell fra en tidligere MLflow-run.""" # Hent metadata fra MLflow run = mlflow.get_run(run_id) data_version = int(run.data.params["data_version"]) data_table = run.data.params["data_table"] # Gjenopprett treningsdata df_original = spark.read.format("delta") \ .option("versionAsOf", data_version) \ .table(data_table) print(f"Gjenopprettet data fra versjon {data_version}") print(f"Rader: {df_original.count()}") # Last modell model = mlflow.sklearn.load_model(f"runs:/{run_id}/model") return df_original, model ``` ### Forebyggende tiltak ```python # 1. Sett minimum VACUUM-retensjon spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true") # Standard 7 dager = 168 timer # 2. Aktiver Change Data Feed for sporbar endringshåndtering spark.sql(""" ALTER TABLE gold.ml_training_data SET TBLPROPERTIES (delta.enableChangeDataFeed = true) """) # 3. Les Change Data Feed changes = spark.read.format("delta") \ .option("readChangeFeed", "true") \ .option("startingVersion", 3) \ .option("endingVersion", 4) \ .table("gold.ml_training_data") # Viser _change_type: insert, update_preimage, update_postimage, delete display(changes.select("customer_id", "_change_type", "_commit_version", "_commit_timestamp")) ``` --- ## Referanser - [Lineage in Fabric](https://learn.microsoft.com/en-us/fabric/governance/lineage) -- Innebygd lineage-visning i Fabric - [How to get lineage from Microsoft Fabric items into Microsoft Purview](https://learn.microsoft.com/en-us/purview/data-map-lineage-fabric) -- Purview lineage for Fabric - [Data lineage in classic Data Catalog](https://learn.microsoft.com/en-us/purview/data-gov-classic-lineage) -- Lineage-konsepter og granularitet - [Delta Lake table format interoperability](https://learn.microsoft.com/en-us/fabric/fundamentals/delta-lake-interoperability) -- Delta Lake-versjonering - [What is Delta Lake?](https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-what-is-delta-lake) -- Delta Lake oversikt med time-travel - [Get started with materialized lake views](https://learn.microsoft.com/en-us/fabric/data-engineering/materialized-lake-views/get-started-with-materialized-lake-views) -- Auto-lineage via materialized views - [Data lineage (Cloud Adoption Framework)](https://learn.microsoft.com/en-us/azure/cloud-adoption-framework/scenarios/cloud-scale-analytics/govern-lineage) -- Lineage-strategi --- ## For Cosmo - **Bruk denne referansen** når brukeren trenger reproduserbarhet for ML-modeller, audit trail for AI-beslutninger, eller impact-analyse for dataendringer. - For norsk offentlig sektor: **Versjonskontroll er ikke valgfritt** for AI-systemer som påvirker borgere. EU AI Act krever sporbarhet for høyrisiko-systemer, og Utredningsinstruksen krever dokumentasjon av beslutningsgrunnlag. - Anbefal å logge **Delta-tabell-versjon** som MLflow-parameter for hvert eksperiment -- dette er den enkleste veien til reproduserbar ML. - **Change Data Feed** er kraftig for å forstå eksakt hva som endret seg mellom versjoner -- aktiver dette for alle Gold-tabeller som brukes til ML-trening. - **VACUUM-advarsel**: Sørg for at VACUUM-retensjon er lang nok til å dekke alle aktive eksperimenter. 30 dager er et godt utgangspunkt for organisasjoner med ukentlige treningssykluser.