ktg-plugin-marketplace/plugins/ms-ai-architect/skills/ms-ai-engineering/references/data-engineering/data-versioning-lineage.md
Kjell Tore Guttormsen 6a7632146e feat(ms-ai-architect): add plugin to open marketplace (v1.5.0 baseline)
Initial addition of ms-ai-architect plugin to the open-source marketplace.
Private content excluded: orchestrator/ (Linear tooling), docs/utredning/
(client investigation), generated test reports and PDF export script.
skill-gen tooling moved from orchestrator/ to scripts/skill-gen/.

Security scan: WARNING (risk 20/100) — no secrets, no injection found.
False positive fixed: added gitleaks:allow to Python variable reference
in output-validation-grounding-verification.md line 109.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-07 17:17:17 +02:00

16 KiB

Data Versioning and Lineage Tracking

Last updated: 2026-02 Status: GA Category: Data Engineering for AI


Introduksjon

Dataversionskontroll og lineage-sporing er grunnleggende kapabiliteter for pålitelige AI-systemer. Versjonskontroll gjør det mulig å reprodusere eksakt de dataene en modell ble trent på, mens lineage dokumenterer hele datareisen fra kilde til ferdig prediksjon. Sammen gir de sporbarhet, reproduserbarhet og tillitsgrunnlag for AI-beslutninger.

For norsk offentlig sektor er dette spesielt viktig gitt kravene i Utredningsinstruksen om etterprøvbarhet, Forvaltningslovens krav til dokumentasjon av vedtak, og EU AI Act sine krav til høyrisiko AI-systemer. En modell som påvirker borgeres rettigheter -- for eksempel NAV-ytelser eller byggetillatelser -- må kunne forklares og dokumenteres fra kilde til prediksjon.

Denne referansen dekker Delta Lake versjonskontroll og tidsreise, commit-historikk og audit trails, lineage-visualisering i Purview og Fabric, avhengighetskartlegging, og strategier for rollback og gjenoppretting.


Delta Lake Versioning and Time-Travel

Versjonskontroll-modell

Delta Lake bruker en Write-Ahead Log (WAL) i _delta_log-mappen som registrerer alle transaksjoner:

Tables/ml_training_data/
├── _delta_log/
│   ├── 00000000000000000000.json   # v0: Initial create (2026-01-01)
│   ├── 00000000000000000001.json   # v1: Append new data (2026-01-15)
│   ├── 00000000000000000002.json   # v2: Feature update (2026-02-01)
│   ├── 00000000000000000003.json   # v3: Delete PII (2026-02-05)
│   └── 00000000000000000004.json   # v4: Schema evolution (2026-02-10)
├── part-00000-*.snappy.parquet
├── part-00001-*.snappy.parquet
└── ...

Versjonsspørringer

from delta.tables import DeltaTable

# Les nåværende versjon
df_current = spark.read.format("delta").table("gold.ml_training_data")

# Les spesifikk versjon
df_v2 = spark.read.format("delta") \
    .option("versionAsOf", 2) \
    .table("gold.ml_training_data")

# Les data slik de var på et tidspunkt
df_jan = spark.read.format("delta") \
    .option("timestampAsOf", "2026-01-15T00:00:00Z") \
    .table("gold.ml_training_data")

# Sammenlign versjoner for å oppdage endringer
from pyspark.sql.functions import col

added_rows = df_v2.subtract(df_v1)   # Rader i v2 som ikke finnes i v1
removed_rows = df_v1.subtract(df_v2)  # Rader i v1 som ikke finnes i v2

print(f"Nye rader: {added_rows.count()}")
print(f"Fjernede rader: {removed_rows.count()}")

Versjonskontroll for ML-eksperimenter

import mlflow

# Logg data-versjon som del av ML-eksperiment
with mlflow.start_run(run_name="churn_model_v3"):
    # Hent Delta-tabell-versjon
    dt = DeltaTable.forPath(spark, "Tables/gold/ml_training_data")
    current_version = dt.history(1).select("version").collect()[0][0]

    # Logg metadata
    mlflow.log_param("data_table", "gold.ml_training_data")
    mlflow.log_param("data_version", current_version)
    mlflow.log_param("data_timestamp", "2026-02-10T00:00:00Z")
    mlflow.log_param("row_count", df_current.count())
    mlflow.log_param("column_count", len(df_current.columns))

    # Tren modell...
    # mlflow.sklearn.log_model(model, "model")

# Senere: Reproduser treningsdata eksakt
# df_reproduced = spark.read.format("delta")
#     .option("versionAsOf", logged_version)
#     .table("gold.ml_training_data")

Commit History and Audit Trails

DESCRIBE HISTORY

-- Vis full transaksjonshistorikk
DESCRIBE HISTORY gold.ml_training_data;

-- Resultat:
-- version | timestamp           | operation  | operationParameters        | operationMetrics
-- 4       | 2026-02-10 14:30:00 | WRITE      | {mode: Append}             | {numFiles: 3, numOutputRows: 15000}
-- 3       | 2026-02-05 09:15:00 | DELETE     | {predicate: [pii_flag=1]}  | {numDeletedRows: 250, numRemovedFiles: 2}
-- 2       | 2026-02-01 02:00:00 | MERGE      | {predicate: ...}           | {numUpdatedRows: 3400, numInsertedRows: 1200}
-- 1       | 2026-01-15 02:00:00 | WRITE      | {mode: Append}             | {numFiles: 5, numOutputRows: 50000}
-- 0       | 2026-01-01 10:00:00 | CREATE     | {partitionBy: [date]}      | {numFiles: 10, numOutputRows: 100000}

PySpark History API

from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "Tables/gold/ml_training_data")

# Hent historikk
history = dt.history()

# Detaljert analyse av endringer
display(
    history.select(
        "version",
        "timestamp",
        "operation",
        "operationParameters",
        "operationMetrics",
        "userName",
        "notebook.notebookId"
    ).orderBy("version", ascending=False)
)

# Filtrer på spesifikke operasjoner
deletes = history.filter("operation = 'DELETE'")
merges = history.filter("operation = 'MERGE'")

Custom Audit Logging

from pyspark.sql import functions as F
from datetime import datetime

def log_data_operation(operation, table_name, details, user="system"):
    """Logg datapipelineoperasjoner til audit-tabell."""
    audit_record = spark.createDataFrame([{
        "timestamp": datetime.utcnow().isoformat(),
        "operation": operation,
        "table_name": table_name,
        "user": user,
        "details": str(details),
        "pipeline_run_id": spark.conf.get("spark.pipeline.runId", "interactive")
    }])

    audit_record.write.format("delta") \
        .mode("append") \
        .saveAsTable("governance.data_audit_log")

# Bruk i pipeline
log_data_operation(
    operation="FEATURE_UPDATE",
    table_name="gold.ml_training_data",
    details={
        "source_version": 3,
        "target_version": 4,
        "rows_added": 15000,
        "rows_updated": 3400,
        "features_modified": ["txn_7d_count", "income_band"]
    }
)

Data Lineage Visualization in Purview

Lineage-kilder i Purview

Purview fanger automatisk lineage fra flere kilder:

Dataprosesseringssystem Lineage-omfang
Azure Data Factory Copy Activity, Data Flow, SSIS
Fabric Data Factory Pipelines, Dataflow Gen2
Fabric Notebooks Lakehouse → Lakehouse (item-level)
Azure Synapse Analytics Copy Activity, Data Flow
Power BI Semantic Model → Report → Dashboard

Lineage-visning i Fabric

Lineage-visning for en ML-pipeline:

Azure SQL DB           Lakehouse (Bronze)        Lakehouse (Silver)
┌──────────┐          ┌──────────────┐          ┌──────────────┐
│ customers│──Copy──>│ raw_customers│──Notebook>│ customer_360 │
└──────────┘          └──────────────┘          └──────┬───────┘
                                                       │
Blob Storage           Lakehouse (Bronze)               │
┌──────────┐          ┌──────────────┐                  │ Notebook
│ events   │──Copy──>│ raw_events   │──Notebook──>─────┘
└──────────┘          └──────────────┘          │
                                                ▼
                                        ┌──────────────┐
                                        │ Gold:        │
                                        │ ml_features  │
                                        └──────┬───────┘
                                               │
                                        ┌──────▼───────┐
                                        │ ML Experiment│
                                        │ (MLflow)     │
                                        └──────┬───────┘
                                               │
                                        ┌──────▼───────┐
                                        │ Power BI     │
                                        │ Dashboard    │
                                        └──────────────┘

Tilgang til lineage

Lineage i Fabric er tilgjengelig fra:

  1. Workspace toolbar: Velg "Lineage view"
  2. Item options menu: Høyreklikk på element → "View lineage"
  3. Item details page: Under menyelementer øverst
  4. Purview Unified Catalog: Browse → Microsoft Fabric → Fabric Workspaces

Materialized Lake Views med auto-lineage

-- Materialized Lake Views genererer automatisk lineage
CREATE SCHEMA IF NOT EXISTS silver;

CREATE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customer_features AS
SELECT
    c.customer_id,
    c.name,
    c.region,
    COUNT(t.transaction_id) AS total_transactions,
    SUM(t.amount) AS total_amount,
    AVG(t.amount) AS avg_transaction_amount
FROM bronze.customers c
JOIN bronze.transactions t ON c.customer_id = t.customer_id
GROUP BY c.customer_id, c.name, c.region;

-- Lineage er automatisk sporet:
-- bronze.customers + bronze.transactions → silver.customer_features

Upstream/Downstream Dependency Mapping

Avhengighetsgraf

# Kartlegg avhengigheter programmatisk
dependency_graph = {
    "bronze.raw_customers": {
        "sources": ["Azure SQL DB: customers_table"],
        "consumers": ["silver.customer_360", "silver.customer_features"]
    },
    "silver.customer_360": {
        "sources": ["bronze.raw_customers", "bronze.raw_contacts", "bronze.raw_opportunities"],
        "consumers": ["gold.churn_features", "gold.revenue_predict_features"]
    },
    "gold.churn_features": {
        "sources": ["silver.customer_360", "silver.transaction_features"],
        "consumers": ["ML Experiment: churn_model_v3", "Power BI: Churn Dashboard"]
    }
}

def get_upstream_dependencies(table_name, graph, depth=0, max_depth=5):
    """Rekursivt finn alle oppstrøms avhengigheter."""
    if depth > max_depth or table_name not in graph:
        return []

    sources = graph[table_name].get("sources", [])
    all_upstream = list(sources)

    for source in sources:
        all_upstream.extend(
            get_upstream_dependencies(source, graph, depth + 1, max_depth)
        )

    return all_upstream

def get_downstream_impact(table_name, graph, depth=0, max_depth=5):
    """Finn alle nedstrøms konsumenter (impact-analyse)."""
    if depth > max_depth or table_name not in graph:
        return []

    consumers = graph[table_name].get("consumers", [])
    all_downstream = list(consumers)

    for consumer in consumers:
        all_downstream.extend(
            get_downstream_impact(consumer, graph, depth + 1, max_depth)
        )

    return all_downstream

# Eksempel: Hva påvirkes hvis vi endrer bronze.raw_customers?
impact = get_downstream_impact("bronze.raw_customers", dependency_graph)
print(f"Påvirkede elementer: {impact}")
# ['silver.customer_360', 'silver.customer_features',
#  'gold.churn_features', 'gold.revenue_predict_features',
#  'ML Experiment: churn_model_v3', 'Power BI: Churn Dashboard']

Data Contract Pattern

# Definer data contracts mellom lag
data_contract = {
    "table": "silver.customer_360",
    "version": "2.1",
    "owner": "data-engineering@example.no",
    "sla": {
        "freshness": "24 hours",
        "completeness": "> 99.5%",
        "accuracy": "validated against source"
    },
    "schema": {
        "required_columns": ["customer_id", "name", "region", "total_revenue"],
        "column_types": {
            "customer_id": "string",
            "total_revenue": "double",
            "region": "string"
        },
        "partitioned_by": ["region"],
        "row_count_range": [100000, 500000]
    },
    "consumers": [
        {"team": "ml-team", "usage": "churn prediction features"},
        {"team": "bi-team", "usage": "customer dashboard"}
    ]
}

Rollback and Recovery Strategies

Delta Lake RESTORE

-- Gjenopprett tabell til spesifikk versjon
RESTORE TABLE gold.ml_training_data TO VERSION AS OF 2;

-- Gjenopprett til tidspunkt
RESTORE TABLE gold.ml_training_data TO TIMESTAMP AS OF '2026-02-01T00:00:00Z';
# PySpark RESTORE
dt = DeltaTable.forPath(spark, "Tables/gold/ml_training_data")
dt.restoreToVersion(2)

# Verifiser
print(f"Gjenopprettet til versjon 2")
print(f"Rader: {spark.read.format('delta').table('gold.ml_training_data').count()}")

Recovery-strategier

Scenario Strategi Kommando
Feilaktig DELETE Restore til forrige versjon RESTORE TABLE ... TO VERSION AS OF n-1
Korrupt data lastet Restore til pre-load versjon RESTORE TABLE ... TO TIMESTAMP AS OF '...'
Schema-feil Restore + re-apply korrekt schema Restore + ALTER TABLE
Hel tabell tapt Gjenskape fra kildedata + audit log Kjør pipeline på nytt
VACUUM kjørt for tidlig Ingen recovery mulig! Forebygg: minimum 7d retention

Rollback av ML-eksperiment

import mlflow

def rollback_to_experiment(run_id):
    """Gjenopprett data og modell fra en tidligere MLflow-run."""
    # Hent metadata fra MLflow
    run = mlflow.get_run(run_id)
    data_version = int(run.data.params["data_version"])
    data_table = run.data.params["data_table"]

    # Gjenopprett treningsdata
    df_original = spark.read.format("delta") \
        .option("versionAsOf", data_version) \
        .table(data_table)

    print(f"Gjenopprettet data fra versjon {data_version}")
    print(f"Rader: {df_original.count()}")

    # Last modell
    model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")

    return df_original, model

Forebyggende tiltak

# 1. Sett minimum VACUUM-retensjon
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")
# Standard 7 dager = 168 timer

# 2. Aktiver Change Data Feed for sporbar endringshåndtering
spark.sql("""
    ALTER TABLE gold.ml_training_data
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# 3. Les Change Data Feed
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 3) \
    .option("endingVersion", 4) \
    .table("gold.ml_training_data")

# Viser _change_type: insert, update_preimage, update_postimage, delete
display(changes.select("customer_id", "_change_type", "_commit_version", "_commit_timestamp"))

Referanser


For Cosmo

  • Bruk denne referansen når brukeren trenger reproduserbarhet for ML-modeller, audit trail for AI-beslutninger, eller impact-analyse for dataendringer.
  • For norsk offentlig sektor: Versjonskontroll er ikke valgfritt for AI-systemer som påvirker borgere. EU AI Act krever sporbarhet for høyrisiko-systemer, og Utredningsinstruksen krever dokumentasjon av beslutningsgrunnlag.
  • Anbefal å logge Delta-tabell-versjon som MLflow-parameter for hvert eksperiment -- dette er den enkleste veien til reproduserbar ML.
  • Change Data Feed er kraftig for å forstå eksakt hva som endret seg mellom versjoner -- aktiver dette for alle Gold-tabeller som brukes til ML-trening.
  • VACUUM-advarsel: Sørg for at VACUUM-retensjon er lang nok til å dekke alle aktive eksperimenter. 30 dager er et godt utgangspunkt for organisasjoner med ukentlige treningssykluser.