I have been working on a project in Microsoft Fabric, where I am calling an API using a pipeline and getting JSON files into the Files section of the Lakehouse. These JSON files track every insert, update, and delete that happens in the source database.
In this post, I will walk through this real-world example step by step. The pipeline lands these JSON files in the Lakehouse Files folder, each containing nested CDC data.
Our goal was to:
• Read each JSON file using PySpark inside a Fabric Notebook
• Navigate the nested structure to extract column names and their values
• Pivot those values so each column name becomes its own table column
• Add metadata columns (timestamp, start_lsn and operation type)
• Save the result as a Delta table — automatically named after the source file
• Loop through all files in the folder so the whole process is automated
By the end of this post you will have a fully parameterised, reusable notebook that you can trigger from a Fabric Pipeline.
Understanding the Source Data
Before we write any code, it helps to understand what our data looks like. Each JSON file has a structure like this:
{
  "data": {
    "currentItemCount": 20,
    "items": [
      {
        "changes": [
          {
            "__$operation": 3,
            "data": [
              { "columnName": "PKID", "value": "3834523" },
              { "columnName": "Name", "value": "John Doe" }
            ]
          }
        ]
      }
    ]
  }
}
The key things to notice in these CDC files are:
• data.items — an array of change sets
• items[].changes — an array of individual changes, each with an operation code
• __$operation — a number representing the CDC operation (1 = Delete, 2 = Insert, 3 = Update Before, 4 = Update After)
• changes[].data — an array of key-value pairs where columnName is the field name and value is the data
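To make that structure concrete, here is a small pure-Python sketch (no Spark needed) that walks the sample payload above and flattens each change into a dict — the same shape our PySpark pipeline will produce at scale. The function name flatten_changes is illustrative, not part of the notebook:

```python
# Plain-Python walk of the sample CDC payload shown above.
# The operation-code mapping mirrors the table in the text.
OPERATIONS = {1: 'Delete', 2: 'Insert', 3: 'Update Before', 4: 'Update After'}

payload = {
    "data": {
        "currentItemCount": 20,
        "items": [
            {"changes": [
                {"__$operation": 3,
                 "data": [
                     {"columnName": "PKID", "value": "3834523"},
                     {"columnName": "Name", "value": "John Doe"},
                 ]}
            ]}
        ]
    }
}

def flatten_changes(payload):
    """Yield one flat dict per change: column/value pairs plus an operation label."""
    for item in payload["data"]["items"]:
        for change in item["changes"]:
            row = {kv["columnName"]: kv["value"] for kv in change["data"]}
            row["operation"] = OPERATIONS.get(change["__$operation"], "Unknown")
            yield row

rows = list(flatten_changes(payload))
print(rows)
# [{'PKID': '3834523', 'Name': 'John Doe', 'operation': 'Update Before'}]
```

The two nested for loops here correspond exactly to the two explode() calls we will chain in Spark, and the dict comprehension plays the role of the pivot.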
Setting Up Your Fabric Notebook
Open a new Notebook in your Fabric workspace and make sure it
is connected to your default Lakehouse. The notebook will be organised into
clearly labelled cells.
💡 Tip: In Fabric Notebooks you can mark a cell as a Parameters cell by clicking the three-dot menu (...) on the cell and selecting Toggle Parameter Cell. This lets a Pipeline pass in values at runtime without changing your code.
Cell 1: Parameters
This cell defines the folder path where your JSON files are
stored. Mark this as a Parameters cell so a Pipeline can override it.
# Default parameter - override this from a Fabric Pipeline
source_folder = "Files/your_folder_name"
💡 Tip: The source_folder value is a relative path inside the Files section of your Lakehouse. Right-click the folder in your Lakehouse explorer and choose Properties to confirm the exact path.
Cell 2: Imports
We need a small set of PySpark functions. Import them all upfront to keep things tidy.
from pyspark.sql.functions import (
col, explode, monotonically_increasing_id,
current_timestamp, when
)
import os
Cell 3: List the JSON Files
We use mssparkutils — a utility built into Fabric Notebooks —
to list all the files in our folder. We then filter to only JSON files.
# List all files in the source folder
files = mssparkutils.fs.ls(f'/{source_folder}')

# Keep only .json files
json_files = [f for f in files if f.name.endswith('.json')]

print(f'Found {len(json_files)} JSON files:')
for f in json_files:
    print(f'  {f.name}')
💡 Tip: mssparkutils.fs.ls() returns a list of file objects. Each object has a .name property (just the filename) and a .path property (the full path you need for spark.read).
Cell 4: Operation Code Helper Function
The __$operation field in CDC data is a number. This helper
function maps those numbers to human-readable labels, which makes the final
table much easier to understand.
def map_operation(df_in):
    return df_in.withColumn('operation',
        when(col('operation') == 1, 'Delete')
        .when(col('operation') == 2, 'Insert')
        .when(col('operation') == 3, 'Update Before')
        .when(col('operation') == 4, 'Update After')
        .otherwise('Unknown')
    )
Cell 5: The Main Loop
This is the heart of the notebook. For each JSON file, we:
• Derive the Delta table name from the filename
• Read the JSON file into a Spark DataFrame
• Explode the nested arrays step by step
• Pivot the key-value pairs into wide-format columns
• Add inserted_date and operation columns
• Append the result to a Delta table
for file in json_files:
    # Derive table name from filename (strip .json extension)
    table_name = os.path.splitext(file.name)[0]

    # Sanitise: replace hyphens and spaces with underscores
    table_name_clean = table_name.replace('-', '_').replace(' ', '_')

    print(f'\nProcessing: {file.name} -> Table: {table_name_clean}')

    try:
        # Step 1: Read the JSON file
        df = spark.read.option('multiline', 'true').json(file.path)

        # Step 2: Explode the items array
        df_items = df.select(explode(col('data.items')).alias('item'))

        # Step 3: Explode changes, then select __$operation
        # Note: backticks are required around __$operation
        # because the $ sign is a special character in Spark
        df_changes = df_items.select(
            explode(col('item.changes')).alias('change')
        ).select(
            monotonically_increasing_id().alias('change_id'),
            col('change.`__$operation`').alias('operation'),
            col('change.data').alias('change_data')
        )

        # Step 4: Explode the data array (columnName + value pairs)
        df_data = df_changes.select(
            col('change_id'),
            col('operation'),
            explode(col('change_data')).alias('record')
        )

        # Step 5: Pull out the columnName and value fields
        df_kv = df_data.select(
            col('change_id'),
            col('operation'),
            col('record.columnName').alias('column_name'),
            col('record.value').alias('value')
        )

        # Step 6: Pivot so each unique columnName becomes its own column
        df_wide = df_kv.groupBy('change_id', 'operation') \
            .pivot('column_name') \
            .agg({'value': 'first'})

        # Step 7: Add metadata columns and drop the surrogate key
        df_final = df_wide \
            .withColumn('inserted_date', current_timestamp()) \
            .drop('change_id')

        # Step 8: Map operation codes to readable labels
        df_final = map_operation(df_final)

        # Step 9: Append to Delta table named after the file
        df_final.write \
            .format('delta') \
            .mode('append') \
            .option('mergeSchema', 'true') \
            .saveAsTable(table_name_clean)

        row_count = df_final.count()
        print(f'  Done: {row_count} rows appended to {table_name_clean}')

    except Exception as e:
        print(f'  ERROR processing {file.name}: {str(e)}')
        continue

print('\nAll files processed.')
Understanding Each Step in Detail
If you are new to PySpark, the explode and pivot steps can
look a little daunting. Here is a plain-English explanation of what is
happening at each stage.
What does explode() do?
When your data has an array (a list of items inside a field),
explode() turns each element of that array into its own separate row. Think of
it like unzipping a zip file — each item gets its own space.
In our data, we need to call explode() three times because there are arrays nested inside arrays:
• First on data.items to get one row per change set
• Then on item.changes to get one row per individual change
• Then on change.data to get one row per column name/value pair
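If Spark's behaviour still feels abstract, this plain-Python sketch mimics what a single explode() does — one output row per array element, with all other fields copied along. The helper name and sample rows are illustrative, not part of the notebook:

```python
def explode_rows(rows, array_field):
    """Mimic Spark's explode(): emit one output row per element of the array field,
    copying every other field unchanged."""
    out = []
    for row in rows:
        for element in row[array_field]:
            new_row = {k: v for k, v in row.items() if k != array_field}
            new_row[array_field] = element
            out.append(new_row)
    return out

rows = [{"id": 1, "changes": ["a", "b"]}, {"id": 2, "changes": ["c"]}]
exploded = explode_rows(rows, "changes")
print(exploded)
# [{'id': 1, 'changes': 'a'}, {'id': 1, 'changes': 'b'}, {'id': 2, 'changes': 'c'}]
```

Chaining this three times over items, changes, and data is exactly what Steps 2 to 4 of the main loop do.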
What does pivot() do?
After exploding, our data is in a long format — each row has a
column_name and a value. The pivot() function rotates these rows so that each
unique column_name becomes its own column and the value moves into the
corresponding cell. This is exactly the wide format you would expect in a
database table.
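The same long-to-wide rotation can be sketched in plain Python. This toy pivot_first mimics groupBy('change_id').pivot('column_name').agg(first('value')) from the main loop; the function name and sample rows are illustrative only:

```python
def pivot_first(rows, key_field, name_field, value_field):
    """Mimic Spark's groupBy(key).pivot(name).agg(first(value)):
    rotate long-format rows into one wide row per key."""
    wide = {}
    for row in rows:
        bucket = wide.setdefault(row[key_field], {key_field: row[key_field]})
        # 'first' semantics: keep the first value seen for each column name
        bucket.setdefault(row[name_field], row[value_field])
    return list(wide.values())

long_rows = [
    {"change_id": 0, "column_name": "PKID", "value": "3834523"},
    {"change_id": 0, "column_name": "Name", "value": "John Doe"},
]
wide_rows = pivot_first(long_rows, "change_id", "column_name", "value")
print(wide_rows)
# [{'change_id': 0, 'PKID': '3834523', 'Name': 'John Doe'}]
```

Two long rows sharing change_id 0 collapse into a single wide row, which is why the surrogate change_id key is needed during the pivot and can be dropped afterwards.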
Why use backticks around __$operation?
The __$operation field name contains special characters
(underscores and a dollar sign). Spark's column expression parser would
misinterpret these, so wrap the name in backtick characters to tell Spark to
treat the whole string as a single column name.
What is mergeSchema?
When appending to a Delta table, if a new JSON file introduces
a new column that did not exist before, Spark would normally throw an error.
The mergeSchema option tells Delta to automatically add the new column to the
table schema rather than failing. This makes the notebook robust to schema
changes in your source data.
Tips and lessons learnt
⚠️ 1: Do not call from_json() on a column that is already a struct. When Spark reads a JSON file, nested objects are automatically parsed into structs. Calling from_json() on a struct causes an AnalysisException because it expects a string. Navigate the struct directly using dot notation like col('data.items').

⚠️ 2: You cannot reference a column created by explode() in the same select() call. Always chain a second .select() after the explode so that the new column is fully resolved before you try to access its fields.

⚠️ 3: Column names with special characters like $ or spaces must be wrapped in backticks inside col() expressions. Without backticks, Spark will fail to parse the column name and throw an UNRESOLVED_COLUMN error.

⚠️ 4: Use mode('append') with option('mergeSchema', 'true') — not option('overwriteSchema', 'true'). overwriteSchema only applies when you are using overwrite mode. Using it with append mode will cause an error.

💡 5: Use os.path.splitext(file.name)[0] to cleanly strip the .json extension from the filename. This is more reliable than using .replace('.json', '') because splitext handles edge cases like filenames that contain the word json elsewhere.
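To see why tip 5 matters, compare the two approaches on a filename (invented here for illustration) where '.json' also appears in the stem:

```python
import os

name = "daily.json.v2.json"

# .replace() strips every occurrence of '.json', mangling the stem
stripped = name.replace('.json', '')
print(stripped)  # daily.v2  (wrong table name)

# splitext removes only the final extension
stem = os.path.splitext(name)[0]
print(stem)  # daily.json.v2  (what we want)
```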
Triggering from a Fabric Pipeline
Because the source_folder cell is marked as a Parameters cell,
this notebook can be called from a Fabric Data Pipeline. In the Pipeline, add a
Notebook activity and under the Settings tab you will find a Base parameters
section where you can pass in the folder path dynamically.
Parameter Name: source_folder
Value: Files/cdc_data/2024-01-15
This means you can have a Pipeline that processes a different
date folder every day simply by passing the folder name as a parameter — no
code changes needed.
Summary
In this post I have walked through a complete, beginner-friendly solution for processing nested CDC JSON files in Microsoft Fabric. Here is a quick recap of what we covered:
• How to use mssparkutils.fs.ls() to list files in your Lakehouse
• How to navigate nested JSON structures using dot notation and explode()
• Why backticks are needed for column names with special characters
• How to pivot key-value pairs into a wide table format
• How to add inserted_date and a human-readable operation label
• How to append to Delta tables with automatic schema evolution using mergeSchema
• How to parameterise the notebook so it works with any folder from a Pipeline
The full notebook is reusable — just drop new JSON files into
the folder and run the Pipeline again. Each file gets its own Delta table named
after the source file, making it easy to trace data back to its origin.