Building a Real-Time medallion architecture using Eventhouse in Microsoft Fabric
Published Apr 10 2024

The inspiration for this article was the realization that Eventhouse aligns perfectly with the requirements of the Medallion architecture.

 

The Medallion architecture is a data design pattern with three layers:

  • The Bronze layer: raw data streamed into the data platform. This maps to Eventhouse’s continuous ingestion capabilities.
  • The Silver layer: a curated, enriched layer of data built by transforming data from the Bronze layer. This can be achieved with Eventhouse’s update policies.
  • The Gold layer: aggregated data for reporting and BI. This can be achieved with Eventhouse’s materialized views.

So, in this article we will explore how to build a real-time analytics platform based on the Medallion architecture, using MS Fabric Eventhouse.

 

Real-Time Analytics

Real-time analytics platforms provide insights as soon as data is ingested, so that better decisions can be made quickly. They handle large amounts of data at high velocity with low response times.

 

With this guide, you'll be able to create an end-to-end real-time analytics platform that can handle large volumes of data and provide valuable insights for your business in real time.

 

We will build the following:

[Diagram: the end-to-end solution we will build]

 

The application

Suppose you own an e-commerce website selling bike accessories.
You have millions of visitors a month, and you want to analyze website traffic, understand consumer patterns, and predict sales.
This article will walk you through the process of building an end-to-end Real time Analytics Solution in MS Fabric, using the medallion architecture, for your e-commerce website.

You will learn how to:

  • Build a medallion architecture using MS Fabric Eventhouse.
  • Use Fabric data pipelines to copy data from an operational DB (SQL Server with the AdventureWorks sample data).
  • Stream events and ingest them into the MS Fabric Eventhouse DB using an Eventstream.
  • Create data transformations in the MS Fabric Eventhouse DB.
  • Create reports for real-time visualizations using RTA (Real-Time Analytics) dashboards.

A detailed step-by-step tutorial can be found here: MS Fabric - Real time analytics Tutorial - Introduction (moaw.dev)

All the code can be found here: denisa-ms/adx-analytics-fabric (github.com)

 

What is the Medallion Architecture?

A medallion architecture (term coined by Databricks) is a data design pattern used to logically organize data.

The goal is to incrementally improve the structure and quality of data as it flows through each layer of the architecture.

Medallion architectures are sometimes also referred to as "multi-hop" architectures.

Raw, often unstructured data is ingested through scalable pipelines and progressively refined into high-quality, enriched data.

 

The medallion architecture consists of three layers.

Bronze layer:

  • Contains raw data, sometimes referred to as the data staging area.
  • May contain PII (personally identifiable information).
  • Consumers: Engineers

Silver layer:

  • Contains deduplicated, enriched data.
  • Contains “anonymized data” (no PII).
  • Consumers: Data analysts, Data scientists, Engineers.

Gold layer:

  • Contains aggregated data.
  • Built for dashboards and BI.
  • Consumers: End users, Business stakeholders, Data analysts, Data scientists, Engineers.

 

What is MS Fabric Eventhouse?

Eventhouse is the storage solution in MS Fabric for streaming data. It provides a scalable solution for handling and analyzing large volumes of structured, semi-structured, and unstructured data that is automatically indexed and partitioned based on ingestion time.

An Eventhouse allows you to manage multiple databases at once, sharing capacity and resources to optimize performance and cost.  

Use an Eventhouse for event-based data scenarios, such as telemetry and log data, time series and IoT data, security and compliance logs, or financial records.

 

MS Fabric Eventhouse key features

  • Ingestion: ingest from any source, in any data format; free text is indexed for full-text search with very low latency.
  • Performance: designed to handle high volumes of data in real-time. Allows running analytical queries directly on raw data for ad-hoc analysis and visualization.
  • Scalability: highly scalable. Can handle an unlimited amount of data, from gigabytes to petabytes, with unlimited scale on concurrent queries and concurrent users. With built-in autoscale, it adjusts resources to match workload factors like cache, memory, CPU usage, and ingestion, optimizing performance and minimizing cost.
  • Integration: Integrates seamlessly with other components in MS Fabric.

  • Delta Lake support: enabling data availability of a KQL database in OneLake means customers can enjoy the best of both worlds: they can query the data with high performance and low latency in their KQL database, and query the same data in Delta Lake format via any other Fabric engine, such as Power BI Direct Lake mode, Warehouse, Lakehouse, Notebooks, and more.

    KQL Database offers a robust mechanism to batch incoming streams of data into one or more Parquet files suitable for analysis. The Delta Lake representation keeps the data open and reusable. This logical copy is managed once and paid for once; users should consider it a single data set.


Why use MS Fabric Eventhouse?

  • Better query performance than data lakes through indexing.
    • MS Fabric Eventhouse implements fast indexing of all the columns including free-text and dynamic columns.
  • Mixed data types: structured, semi-structured and unstructured.
    • MS Fabric Eventhouse allows ingesting structured, semi-structured, and unstructured (free text) data.
  • Direct access to source data.
    • MS Fabric Eventhouse stores immutable data (as in all data lakes).
    • MS Fabric Eventhouse allows transforming and exposing the increasingly “clean” data in the silver and gold layers with the use of update policies and materialized views.
  • MS Fabric Eventhouse provides native advanced analytics capabilities (see the sketch after this list) for:
    • Time series analysis
    • Pattern recognition
    • Anomaly detection and forecasting
    • Machine learning
  • MS Fabric Eventhouse allows you to build near real-time analytics dashboards using Real time dashboards.
  • MS Fabric Eventhouse can be accessed using a Spark Connector from Fabric Notebooks.
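
As a minimal sketch of those analytics capabilities, here is what anomaly detection over daily click counts could look like, assuming the Event table described later in this article (the 1.5 threshold is illustrative):

// Flag anomalous days in click volume using native time series functions.
// Assumes the Event table (eventType, eventDate) built later in this article.
let dailyClicks =
    Event
    | where eventType == "CLICK"
    | summarize Clicks = count() by Day = startofday(todatetime(eventDate));
dailyClicks
| make-series Clicks = sum(Clicks) default = 0 on Day step 1d
| extend (anomalies, score, baseline) = series_decompose_anomalies(Clicks, 1.5)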

The Architecture

[Diagram: the solution architecture]

 

 

  • Azure SQL
    • Our operational DB, accessed with read-only permissions.
    • Contains sample data from the AdventureWorks DB.
  • Fabric
    • Data pipelines: a low-code/no-code solution for ETL. We will use a pipeline to create a job that runs periodically and copies the Azure SQL DB into our Eventhouse DB.
    • Notebook: generates synthetic events and streams them into an Eventstream.
    • Eventstream: receives streaming events from our notebook and writes them to our Eventhouse DB (KQL DB).
    • Eventhouse DB: our data platform.
    • Real time dashboard: used for dashboards and reporting.

  

The data layers

  • Bronze layer:
    • Contains the raw data arriving from the Eventstream and from the Data pipelines copying the Azure SQL DB.
    • The tables in this layer can be named “bronze<tbl-name>” or any other naming convention that suits you. I decided to use the original source names in the bronze layer.
    • Operational database changes can be stored indefinitely in the bronze layer for auditing purposes, or removed after use with a retention policy (see the sketch after this list).
  • Silver layer:
    • Contains clean and anonymized data after enrichment and processing.
    • The tables in this layer are named “Silver<tbl-name>” or any other naming convention that suits you. I recommend using some prefix to identify all tables in this layer.
    • After ingesting data to the target tables in the silver layer, you may want to remove it from the source table in the bronze layer. Set a soft-delete period of 0 sec (or 00:00:00) in the source table's retention policy, and the update policy as transactional. 
  • Gold layer:
    • Contains aggregated data used in dashboards and applications.
    • The tables in this layer are named “Gold<tbl-name>” or any other naming convention that suits you.
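
A minimal sketch of those retention settings, using the bronze Address table as an example (the 365-day period is illustrative):

// Keep raw operational data in the bronze layer for auditing (illustrative period).
.alter-merge table Address policy retention softdelete = 365d recoverability = enabled

// Alternatively, once a transactional update policy has copied the rows to the
// silver layer, drop them from the bronze source table immediately.
.alter-merge table Address policy retention softdelete = 0s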

 

Data processing

To build the Medallion Architecture in our Eventhouse DB, data needs to be transformed and copied between the layers (Bronze->Silver->Gold).

 

  • Update policies instruct our Eventhouse DB to automatically append data to a target table whenever new data is inserted into a source table, based on a transformation query that runs over the newly inserted data. The data transformations between the layers (cleanup, anonymization, etc.) can therefore be expressed as update policies.


 

 


  • Materialized views expose an aggregation query over a source table, or over another materialized view. They always return an up-to-date result of the aggregation query (always fresh), and querying a materialized view is more performant than running the aggregation directly over the source table. Materialized views can be used to calculate aggregations over the Silver layer and store them in the Gold layer.
  • Notebooks or Data Pipelines can be used for data transformation and aggregation as well.

 

Data schema

The e-commerce store database entities are:

  • Product: the product catalog.
  • ProductCategory: the product categories.
  • Customer: the customers that purchased items in the store.
  • Address: the addresses of the customers.
  • SalesOrderHeader: the metadata for the orders.
  • SalesOrderDetail: every item purchased in an order.
  • Event: a click or impression event (a schema sketch follows this list).
    • An impression event is logged when a product appears in the search results.


 

    • A click event is logged when a product is clicked and the customer has viewed its details.

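The Event table receives these synthetic events from the Eventstream; its exact schema is defined in the accompanying repository. As a minimal sketch of what it could look like (eventType and eventDate are referenced by the Gold layer queries below; productId and userId are assumptions):

// Assumed sketch of the Event table fed by the Eventstream. eventType
// ("CLICK" / "IMPRESSION") and eventDate are used by the Gold layer
// materialized views; productId and userId are illustrative.
.create table [Event] (eventDate: string, eventType: string, productId: int, userId: string)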

Eventhouse DB tables

External tables (shortcuts)

Contains referenced tables that are NOT copied into our Eventhouse DB but are used for joins, connecting directly to our operational DB (Azure SQL).

 

 

 


//External tables - shortcuts

// connect to operational Database with external table Product
.create external table products (ProductID: int, ProductNumber: string,  Name: string)
kind=sql
table=[SalesLT.Product]
(
   h@'Server=tcp:adxdemo.database.windows.net,1433;Initial Catalog=aworks;User Id=sqlread;Password=ChangeYourAdminPassword1'
)
with
(
   createifnotexists = true
) 

// connect to operational Database with external table ProductCategory
.create external table productCategories (ProductCategoryID: int, Name: string)
kind=sql
table=[SalesLT.ProductCategory]
(
   h@'Server=tcp:adxdemo.database.windows.net,1433;Initial Catalog=aworks;User Id=sqlread;Password=ChangeYourAdminPassword1'
)
with
(
   createifnotexists = true
)
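
External tables are read with the external_table() function, so product metadata can be joined in queries without ever copying it into Eventhouse. For example, a quick sanity check against the product shortcut:

// Query the shortcut directly; nothing is copied into the Eventhouse DB.
external_table("products")
| project ProductID, ProductNumber, Name
| take 10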

 

 

 

Bronze Layer

Contains all the raw data tables copied using Data Pipelines.

 

 

 

.create table [Address] (AddressID:int,AddressLine1:string,AddressLine2:string,City: string, StateProvince:string, CountryRegion:string, PostalCode: string, rowguid: guid, ModifiedDate:datetime)

.create table [Customer](CustomerID:int, NameStyle: string, Title: string, FirstName: string, MiddleName: string, LastName: string,Suffix:string, CompanyName: string, SalesPerson: string, EmailAddress: string, Phone: string, ModifiedDate: datetime)

.create table [SalesOrderHeader](SalesOrderID: int, OrderDate: datetime, DueDate: datetime, ShipDate: datetime, ShipToAddressID: int, BillToAddressID: int, SubTotal: decimal, TaxAmt: decimal, Freight: decimal, TotalDue: decimal, ModifiedDate: datetime)

.create table [SalesOrderDetail](SalesOrderID: int, SalesOrderDetailID: int, OrderQty: int, ProductID: int, UnitPrice: decimal , UnitPriceDiscount: decimal,LineTotal: decimal, ModifiedDate: datetime)

//adds a hidden field showing ingestion time
.alter table Address policy ingestiontime true
.alter table Customer policy ingestiontime true
.alter table SalesOrderHeader policy ingestiontime true
.alter table SalesOrderDetail policy ingestiontime true
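
With the ingestion-time policy enabled, the hidden timestamp on each record can be read with the ingestion_time() function, which the silver layer update policies below rely on. For example:

// Surface the hidden ingestion timestamp recorded for each row.
Address
| extend IngestionDate = ingestion_time()
| take 10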

 

 

 

Silver Layer

Contains enriched tables.

We will surface the hidden ingestion time as an explicit IngestionDate column in our silver layer tables.

 

 

 

.create table [SilverAddress] (AddressID:int,AddressLine1:string,AddressLine2:string,City: string, StateProvince:string, CountryRegion:string, PostalCode: string, rowguid: guid, ModifiedDate:datetime, IngestionDate: datetime)

.create table [SilverCustomer](CustomerID:int, NameStyle: string, Title: string, FirstName: string, MiddleName: string, LastName: string,Suffix:string, CompanyName: string, SalesPerson: string, EmailAddress: string, Phone: string, ModifiedDate: datetime, IngestionDate: datetime)

.create table [SilverSalesOrderHeader](SalesOrderID: int, OrderDate: datetime, DueDate: datetime, ShipDate: datetime, ShipToAddressID: int, BillToAddressID: int, SubTotal: decimal, TaxAmt: decimal, Freight: decimal, TotalDue: decimal, ModifiedDate: datetime, DaysShipped: long, IngestionDate: datetime)

.create table [SilverSalesOrderDetail](SalesOrderID: int, SalesOrderDetailID: int, OrderQty: int, ProductID: int, UnitPrice: decimal, UnitPriceDiscount: decimal,LineTotal: decimal, ModifiedDate: datetime, IngestionDate: datetime)

// use update policies to transform data during Ingestion

.create function ifnotexists
with (docstring = 'Add ingestion time to raw data')
ParseAddress ()
{
Address
| extend IngestionDate = ingestion_time()
}

.alter table
SilverAddress
policy update @'[{"Source": "Address", "Query": "ParseAddress", "IsEnabled" : true, "IsTransactional": true }]'

.create function ifnotexists
with (docstring = 'Add ingestion time to raw data')
ParseCustomer ()
{
Customer
| extend IngestionDate = ingestion_time()
}

.alter table
SilverCustomer
policy update @'[{"Source": "Customer", "Query": "ParseCustomer", "IsEnabled" : true, "IsTransactional": true }]'

.create function ifnotexists
with (docstring = 'Add ingestion time to raw data')
ParseSalesOrderHeader ()
{
SalesOrderHeader
| extend DaysShipped = datetime_diff('day', ShipDate, OrderDate)
| extend IngestionDate = ingestion_time()
}

.alter table
SilverSalesOrderHeader
policy update @'[{"Source": "SalesOrderHeader", "Query": "ParseSalesOrderHeader", "IsEnabled" : true, "IsTransactional": true }]'

.create function ifnotexists
with (docstring = 'Add ingestion time to raw data')
ParseSalesOrderDetail ()
{
SalesOrderDetail
| extend IngestionDate = ingestion_time()
}

.alter table
SilverSalesOrderDetail
policy update @'[{"Source": "SalesOrderDetail", "Query": "ParseSalesOrderDetail", "IsEnabled" : true, "IsTransactional": true }]'
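
To sanity-check one of these policies, you can ingest a test row into a bronze table and confirm that it lands in the corresponding silver table; a sketch with made-up values:

// Ingest one made-up row into the bronze Address table.
.ingest inline into table Address <|
1,"1 Main St","","Seattle","WA","US","98101",00000000-0000-0000-0000-000000000000,2024-01-01T00:00:00Z

// The transactional update policy should have appended it to SilverAddress.
SilverAddress
| where AddressID == 1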

 

 

 


Note that in the ParseSalesOrderHeader function above we also added a calculated field called “DaysShipped”, which captures the number of days it took to ship the goods in each order.

 

 

 


 

 

 

Gold Layer  

Contains aggregated data.

We use materialized views for deduplication, so this layer exposes only the latest version of each record from the source.

 

 

 

//GOLD LAYER
// use materialized views to view the latest changes in the tables
.create materialized-view with (backfill=true) GoldAddress on table SilverAddress
{
    SilverAddress
    | summarize arg_max(IngestionDate, *) by AddressID
}



.create materialized-view with (backfill=true) GoldCustomer on table SilverCustomer
{
    SilverCustomer
    | summarize arg_max(IngestionDate, *) by CustomerID
}



.create  materialized-view with (backfill=true) GoldSalesOrderHeader on table SilverSalesOrderHeader
{
    SilverSalesOrderHeader
    | summarize arg_max(IngestionDate, *) by SalesOrderID
}

.create  materialized-view with (backfill=true) GoldSalesOrderDetail on table SilverSalesOrderDetail
{
    SilverSalesOrderDetail
    | summarize arg_max(IngestionDate, *) by SalesOrderDetailID
}
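
Queries against the gold layer now see exactly one row per entity. For example, a sketch of revenue per product that combines the deduplicated order details with the external product shortcut:

// Top products by revenue from the deduplicated gold layer, enriched
// with product names from the external table shortcut.
GoldSalesOrderDetail
| summarize Revenue = sum(LineTotal) by ProductID
| join kind=leftouter (external_table("products") | project ProductID, Name) on ProductID
| top 10 by Revenue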

 

 

 

We also use materialized views to summarize clicks and impressions per day.

 

 

 

.create materialized-view with (backfill=true) GoldDailyClicks on table Event
{
  Event
    | where eventType == "CLICK"
    | extend dateOnly = format_datetime(todatetime(eventDate), "yyyy-MM-dd")
    | summarize count() by dateOnly, eventType
}

.create materialized-view with (backfill=true) GoldDailyImpressions on table Event
{
  Event
    | where eventType == "IMPRESSION"
    | extend dateOnly = format_datetime(todatetime(eventDate), "yyyy-MM-dd")
    | summarize count() by dateOnly, eventType
}
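
The two views can then be joined to derive a daily click-through rate; for example:

// Daily click-through rate computed from the two materialized views.
// Note: summarize count() produces a column named count_.
GoldDailyClicks
| project dateOnly, Clicks = count_
| join kind=inner (GoldDailyImpressions | project dateOnly, Impressions = count_) on dateOnly
| extend CTR = todouble(Clicks) / Impressions
| order by dateOnly asc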

 

 

 

Follow the step-by-step tutorial for instructions on how to build Real-Time Dashboards on top of these tables.

The tutorial to build all this architecture can be found here: MS Fabric - Real time analytics Tutorial - Introduction (moaw.dev)

All the code can be found here: denisa-ms/adx-analytics-fabric (github.com)

That’s it, we are done.

I hope you enjoyed reading this.

Denise

 

 
