Friday, February 27, 2026

Processing Nested JSON with PySpark in Microsoft Fabric

I have been working on a project in Microsoft Fabric where I call an API from a pipeline and land JSON files in the Files section of the Lakehouse. These JSON files track every insert, update, and delete that happens in the source database.

In this post, I will walk through this real-world example step by step. The pipeline copies these JSON files into the Lakehouse Files folder, each file containing nested CDC data.

Our goal was to:

Read each JSON file using PySpark inside a Fabric Notebook

Navigate the nested structure to extract column names and their values

Pivot those values so each column name becomes its own table column

Add metadata columns (a timestamp and a readable operation type)

Save the result as a Delta table — automatically named after the source file

Loop through all files in the folder so the whole process is automated


By the end of this post you will have a fully parameterised, reusable notebook that you can trigger from a Fabric Pipeline.

Understanding the Source Data

Before we write any code, it helps to understand what our data looks like. Each JSON file has a structure like this:


 {
  "data": {
    "currentItemCount": 20,
    "items": [
      {
        "changes": [
          {
            "__$operation": 3,
            "data": [
              { "columnName": "PKID",  "value": "3834523" },
              { "columnName": "Name", "value": "John Doe" }
            ]
          }
        ]
      }
    ]
  }
}

When CDC files are involved, the key things to notice are:
  • data.items — an array of change sets
  • items[].changes — an array of individual changes, each with an operation code
  • __$operation — a number representing the CDC operation (1 = Delete, 2 = Insert, 3 = Update Before, 4 = Update After)
  • changes[].data — an array of key-value pairs where columnName is the field name and value is the data

Our task is to turn those key-value pairs into proper columns in a Delta table.
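Before reaching for Spark, it can help to see the whole transformation in plain Python. This sketch (using the illustrative sample values from above) walks data.items → changes → data and pivots each change into one wide row — exactly what the PySpark code later in this post does at scale:

```python
import json

# A minimal CDC payload matching the structure shown above
raw = json.loads("""
{
  "data": {
    "items": [
      {
        "changes": [
          {
            "__$operation": 3,
            "data": [
              {"columnName": "PKID", "value": "3834523"},
              {"columnName": "Name", "value": "John Doe"}
            ]
          }
        ]
      }
    ]
  }
}
""")

# Walk data.items -> changes -> data and build one wide row per change
rows = []
for item in raw["data"]["items"]:
    for change in item["changes"]:
        row = {"operation": change["__$operation"]}
        for pair in change["data"]:
            row[pair["columnName"]] = pair["value"]
        rows.append(row)

print(rows)
# [{'operation': 3, 'PKID': '3834523', 'Name': 'John Doe'}]
```

This is fine for one small file, but Spark gives us the same result in a distributed, declarative way — which is what the rest of the post builds.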

Setting Up Your Fabric Notebook

Open a new Notebook in your Fabric workspace and make sure it is connected to your default Lakehouse. The notebook will be organised into clearly labelled cells.

 

💡 Tip: In Fabric Notebooks you can mark a cell as a Parameters cell by clicking the three-dot menu (...) on the cell and selecting Toggle Parameter Cell. This lets a Pipeline pass in values at runtime without changing your code.

 

Cell 1: Parameters

This cell defines the folder path where your JSON files are stored. Mark this as a Parameters cell so a Pipeline can override it.

 

# Default parameter - override this from a Fabric Pipeline

source_folder = "Files/your_folder_name"

 

💡 Tip: The source_folder value is a relative path inside the Files section of your Lakehouse. Right-click the folder in your Lakehouse explorer and choose Properties to confirm the exact path.

 

Cell 2: Imports

We need a small set of PySpark functions. Import them all upfront to keep things tidy.

 

from pyspark.sql.functions import (

    col, explode, monotonically_increasing_id,

    current_timestamp, when

)

import os


Cell 3: List the JSON Files

We use mssparkutils — a utility built into Fabric Notebooks — to list all the files in our folder. We then filter to only JSON files.

 

# List all files in the source folder

files = mssparkutils.fs.ls(f'/{source_folder}')

 

# Keep only .json files

json_files = [f for f in files if f.name.endswith('.json')]

 

print(f'Found {len(json_files)} JSON files:')

for f in json_files:

    print(f'  {f.name}')

 

💡 Tip: mssparkutils.fs.ls() returns a list of file objects. Each object has a .name property (just the filename) and a .path property (the full path you need for spark.read).

 

Cell 4: Operation Code Helper Function

The __$operation field in CDC data is a number. This helper function maps those numbers to human-readable labels, which makes the final table much easier to understand.

 

def map_operation(df_in):

    return df_in.withColumn('operation',

        when(col('operation') == 1, 'Delete')

       .when(col('operation') == 2, 'Insert')

       .when(col('operation') == 3, 'Update Before')

       .when(col('operation') == 4, 'Update After')

       .otherwise('Unknown')

    )

 

Cell 5: The Main Loop

This is the heart of the notebook. For each JSON file we:

1. Derive the Delta table name from the filename
2. Read the JSON file into a Spark DataFrame
3. Explode the nested arrays step by step
4. Pivot the key-value pairs into wide-format columns
5. Add inserted_date and operation columns
6. Append the result to a Delta table

 

for file in json_files:

    # Derive table name from filename (strip .json extension)

    table_name = os.path.splitext(file.name)[0]

 

    # Sanitise: replace hyphens and spaces with underscores

    table_name_clean = table_name.replace('-', '_').replace(' ', '_')

 

    print(f'\nProcessing: {file.name} -> Table: {table_name_clean}')

 

    try:

        # Step 1: Read the JSON file

        df = spark.read.option('multiline', 'true').json(file.path)

 

        # Step 2: Explode the items array

        df_items = df.select(explode(col('data.items')).alias('item'))

 

        # Step 3: Explode changes, then select __$operation

        #         Note: backticks are required around __$operation

        #         because the $ sign is a special character in Spark

        df_changes = df_items.select(

            explode(col('item.changes')).alias('change')

        ).select(

            monotonically_increasing_id().alias('change_id'),

            col('change.`__$operation`').alias('operation'),

            col('change.data').alias('change_data')

        )

 

        # Step 4: Explode the data array (columnName + value pairs)

        df_data = df_changes.select(

            col('change_id'),

            col('operation'),

            explode(col('change_data')).alias('record')

        )

 

        # Step 5: Pull out the columnName and value fields

        df_kv = df_data.select(

            col('change_id'),

            col('operation'),

            col('record.columnName').alias('column_name'),

            col('record.value').alias('value')

        )

 

        # Step 6: Pivot so each unique columnName becomes its own column

        df_wide = df_kv.groupBy('change_id', 'operation') \

                       .pivot('column_name') \

                       .agg({'value': 'first'})

 

        # Step 7: Add metadata columns and drop the surrogate key

        df_final = df_wide \

            .withColumn('inserted_date', current_timestamp()) \

            .drop('change_id')

 

        # Step 8: Map operation codes to readable labels

        df_final = map_operation(df_final)

 

        # Step 9: Append to Delta table named after the file

        df_final.write \

            .format('delta') \

            .mode('append') \

            .option('mergeSchema', 'true') \

            .saveAsTable(table_name_clean)

 

        row_count = df_final.count()

        print(f'  Done: {row_count} rows appended to {table_name_clean}')

 

    except Exception as e:

        print(f'  ERROR processing {file.name}: {str(e)}')

        continue

 

print('\nAll files processed.')

 

Understanding Each Step in Detail

If you are new to PySpark, the explode and pivot steps can look a little daunting. Here is a plain-English explanation of what is happening at each stage.

 

What does explode() do?

When your data has an array (a list of items inside a field), explode() turns each element of that array into its own separate row. Think of it like unzipping a zip file — each item gets its own space.

In our data, we need to call explode() three times because arrays are nested inside arrays:

1. First on data.items to get one row per change set
2. Then on item.changes to get one row per individual change
3. Then on change.data to get one row per column name/value pair
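If you picture a DataFrame as a list of row dictionaries, explode() behaves like this plain-Python flattening (a rough analogy with made-up sample rows, not Spark itself):

```python
# Each input row carries an 'items' array
rows = [
    {"id": 1, "items": ["a", "b"]},
    {"id": 2, "items": ["c"]},
]

# "Explode": one output row per array element, other columns repeated
exploded = [
    {"id": row["id"], "item": element}
    for row in rows
    for element in row["items"]
]

print(exploded)
# [{'id': 1, 'item': 'a'}, {'id': 1, 'item': 'b'}, {'id': 2, 'item': 'c'}]
```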

 

What does pivot() do?

After exploding, our data is in a long format — each row has a column_name and a value. The pivot() function rotates these rows so that each unique column_name becomes its own column and the value moves into the corresponding cell. This is exactly the wide format you would expect in a database table.
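Again as a rough plain-Python analogy (with hypothetical long-format rows, not Spark itself), pivoting is just grouping by the key and turning each column_name into a dictionary key:

```python
# Long format: one row per (change_id, column_name, value)
long_rows = [
    {"change_id": 0, "column_name": "PKID", "value": "3834523"},
    {"change_id": 0, "column_name": "Name", "value": "John Doe"},
    {"change_id": 1, "column_name": "PKID", "value": "3834524"},
]

# "Pivot": group by change_id; each column_name becomes a key in the wide row
wide = {}
for r in long_rows:
    wide.setdefault(r["change_id"], {})[r["column_name"]] = r["value"]

print(wide)
# {0: {'PKID': '3834523', 'Name': 'John Doe'}, 1: {'PKID': '3834524'}}
```

Note that change 1 has no Name — in the Spark pivot that cell simply comes out as null, which is why every change needs its own change_id to group on.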

 

Why use backticks around __$operation?

The __$operation field name contains a dollar sign, which Spark's column expression parser treats as a special character and would misinterpret. Wrapping the name in backticks tells Spark to treat the whole string as a single column name.

 

What is mergeSchema?

When appending to a Delta table, if a new JSON file introduces a new column that did not exist before, Spark would normally throw an error. The mergeSchema option tells Delta to automatically add the new column to the table schema rather than failing. This makes the notebook robust to schema changes in your source data.

 

Tips and lessons learnt

 

⚠️ 1: Do not call from_json() on a column that is already a struct. When Spark reads a JSON file, nested objects are automatically parsed into structs. Calling from_json() on a struct causes an AnalysisException because it expects a string. Navigate the struct directly using dot notation like col('data.items').

 

⚠️ 2: You cannot reference a column created by explode() in the same select() call. Always chain a second .select() after the explode so that the new column is fully resolved before you try to access its fields.

 

⚠️ 3: Column names with special characters like $ or spaces must be wrapped in backticks inside col() expressions. Without backticks, Spark will fail to parse the column name and throw an UNRESOLVED_COLUMN error.

 

⚠️ 4: Use mode('append') with option('mergeSchema', 'true') — not option('overwriteSchema', 'true'). overwriteSchema only applies when you are using overwrite mode. Using it with append mode will cause an error.

 

💡 5: Use os.path.splitext(file.name)[0] to cleanly strip the .json extension from the filename. This is more reliable than .replace('.json', ''), which removes every occurrence of the substring '.json' anywhere in the name — splitext only splits off the final extension.
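A quick illustration of the difference, using a deliberately awkward (hypothetical) filename:

```python
import os

name = "export.json.backup.json"

# splitext only removes the final extension
clean = os.path.splitext(name)[0]
print(clean)  # export.json.backup

# replace removes every '.json' substring, mangling the name
naive = name.replace(".json", "")
print(naive)  # export.backup
```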

 

Triggering from a Fabric Pipeline

Because the source_folder cell is marked as a Parameters cell, this notebook can be called from a Fabric Data Pipeline. In the Pipeline, add a Notebook activity and under the Settings tab you will find a Base parameters section where you can pass in the folder path dynamically.

 

Parameter Name : source_folder

Value          : Files/cdc_data/2024-01-15

 

This means you can have a Pipeline that processes a different date folder every day simply by passing the folder name as a parameter — no code changes needed.
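In the Pipeline itself the dated value would typically come from a pipeline expression (something like formatDateTime(utcNow(), 'yyyy-MM-dd') in the Data Factory expression language). For clarity, here is the same idea in plain Python — note that the Files/cdc_data/<date> layout is just the example value above, not a requirement:

```python
from datetime import date

def daily_folder(d: date) -> str:
    """Build the dated source_folder value a Pipeline would pass in."""
    return f"Files/cdc_data/{d.isoformat()}"

print(daily_folder(date(2024, 1, 15)))
# Files/cdc_data/2024-01-15
```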

 

Summary

In this post I have walked through a complete, beginner-friendly solution for processing nested CDC JSON files in Microsoft Fabric. Here is a quick recap of what we covered:

  • How to use mssparkutils.fs.ls() to list files in your Lakehouse
  • How to navigate nested JSON structures using dot notation and explode()
  • Why backticks are needed for column names with special characters
  • How to pivot key-value pairs into a wide table format
  • How to add inserted_date and a human-readable operation label
  • How to append to Delta tables with automatic schema evolution using mergeSchema
  • How to parameterise the notebook so it works with any folder from a Pipeline

 

The full notebook is reusable — just drop new JSON files into the folder and run the Pipeline again. Each file gets its own Delta table named after the source file, making it easy to trace data back to its origin.


