martes, 11 de octubre de 2022

Merge in Synapse Spark Pool using delta lake tables (For example with deltas from SAP)

Merge does requires only row per key to work. Imagine the following scenario:


You are loading at intervals deltas or incremental from an SAP ERP system (ECC; S/4, etc.) related to purchase orders.


In your target table (in delta lake format for example), you already have a this rows:








The "PrimaryKey" field is self-explanatory. Then the other fields are just example fields, but the last field, "RowNumber", is something the extraction tool usually provides; it can be just an integer number that defines a sequence or a timestamp.


Let's understand that field or column. 

The following table depicts a classic delta or incremental load.





It has two kinds of records, the light blue ones, which are new records (new primary keys), and the white ones which are updates to existing records.


K2 and K9 appear twice. One reason for this could be because of the following:

The user changed purchase order K2; for example, he changed one condition of the payment Field1 = "NEWVALUE2".

Then he made a second change; he altered how many units he wanted to order Vfield3 = 22.

Now we can see that we have two records for K2, and we need to pick the last one. In this case, we identify the last one using the PrimaryKey and RowNumber. 



If we try to merge this delta without telling the merge how to pick the last and only one record per primary key, it will fail with the following error:

(This screenshot is from a Synapse Notebook)





We can use a Window Function like row_number to filter the last record for each primary key; in this way, we guarantee that the Merge statement runs successfully.


We can create a view:

%%sql
create temp view T_DeltaPurchaseOrders as
select PrimaryKey, Field1, Field2, Vfield3, row_number() over (PARTITION
BY PrimaryKey ORDER BY deltapurchaseorders.RowNumber desc) AS frank FROM DeltaPurchaseOrders

This view will return the following result:





Then we can add a condition "where frank = 1"

We will get the following result:





As we can see, now we have only one row per primary key, we can merge that into our table and it won't fail.



Heads-up: Before trying this technique, verify with someone who knows the source system; this is how you should handle "duplicate" records in incremental or delta loads. There may be other cases.


References:

https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge



  • merge operation can fail if multiple rows of the source dataset match and the merge attempts to update the same rows of the target Delta table. According to the SQL semantics of merge, such an update operation is ambiguous as it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches. See the change data capture example—it shows how to preprocess the change dataset (that is, the source dataset) to retain only the latest change for each key before applying that change into the target Delta table.

martes, 26 de julio de 2022

From data warehouses to lakehouses

In this article we're going to discuss the differences between data warehouses, data lakes and lakehouses, as well as take a deeper look at some of their benefits and drawbacks.


Data warehouses

Back in the 80s, data warehouses were the solution that provided an architectural model for the flow of data from systems of records like ERPs to decision support environments like SAP Business Warehouse, Oracle, and others.

BI and reporting tools where able to connect to the data warehouses to generate dashboards and reports, and also to support decision makers. 

As data volumes grew and the complexity of the data increased (data became semi-structured, nested, etc.), data warehouses had some challenges:

  • High maintenance costs, mainly storage costs (high performance proprietary storage for a proprietary format).

  • There was no support for machine learning cases, as it was only meant for BI and reporting.

  • Lack of scalability and flexibility for handling different data complexities.

These challenges, the cloud and other factors started to shape into something new: the data lake.

Data lakes

In the earlier days of the data lake movement, Hadoop was the main component. There were success stories, there were failure stories, nevertheless a new era had begun: 

  • In many cases, companies have been able to replace expensive data warehouse software with in-house computing clusters running open source Hadoop.

  • It allowed companies to analyse massive amounts of unstructured data (also called big data) in a way that wasn’t possible before.

Nowadays it seems that Spark is the one running the show as the engine for data lakes. But what are the new challenges the data lakes yield?

They lack some basic features that were available for decades in RDBMS and data warehouses such as:

  • support for transactions

  • enforcement of data quality (like formal data types)

  • support of appends and updates without having to re-process multiple files

  • support of locking (or similar mechanism) to avoid inconsistency of updates coming from batch or stream

But on the other hand data lakes offer an extremely affordable and durable storage like Azure Storage or AWS S3, and are enablers of machine learning use cases.

Because of this, companies ended up having a mix of technologies: data lakes, data warehouses, streaming solutions, and others. But having multiple technologies increases the complexity, increases costs, and creates the need to copy data to multiple locations, which in turn increases latency and again costs, and from a security perspective it also increases the attack surface.

Lakehouses

Lakehouse’s promise is to combine the best features of both data warehouses and data lakes. They are trying to enable data management features of data warehouses directly on low-cost storage (e.g Azure Storage, AWS S3) used by delta lakes.

Now I will get a bit more technical and more detailed, because this is the interesting part.

Imagine you are a retailer, you have a table called Items. This table has 9 million records. How would you store it in a data lake? One commonly used approach is with parquet files:

  • Maybe 18 files holding aprox. 0.5 MM records each. In a best case scenario, for this one table you’ll have to handle around 20 such files. There could even be 1,000 files representing the Items table in a suboptimal implementation. The point is that in a data lake you will end up with the “too many files” problem, sooner or later. Remember that in real life you will have more entities than just the items, which will contribute to having more files.

  • Now imagine you need to update 10,000 items (for example to add 1% to their price). You will need to find in which files those products are located, then delete the original files and write new ones instead. While this is happening, some processes could be trying to read those files but will fail. In other words, modifying existing data is very costly, complex and unreliable. 

  • At some point you want to look back and determine when the price was changed and to which articles. In the data lake world, to keep a history of changes, you will need to create and maintain copies of the original files, which creates a significant overhead.

  • You may also want a streaming data flow to update the items, but at the same time there is a batch processes doing an update. And since there are no mechanism for locking, this is not feasible.

  • Since the computing that respond to queries goes “blindly” (without hints like indexes) and scan multiple or all the files, performance is not great in data lakes.

Lastly, all these pain points contribute to having poor data quality issues.

Lakehouse answers to the challenges above are:

  • ACID transactions: every operation is transactional. This means that every operation either fully succeeds or is aborted. When aborted, it is logged, and any residue is cleaned so you can retry later. Modification of existing data is possible because transactions allow you to do fine-grained updates. Real-time operations are consistent, and the historical data versions are automatically stored. The lakehouse also provides snapshots of data to allow developers to easily access and revert to earlier versions for audits, rollbacks, or experiment reproductions.

  • Indexing: a mechanism that helps the compute to read less files to find the needed information.

  • Schema validation: the data you put in the lakehouse must adhere to a defined schema. If data doesn’t adhere it can be moved to a quarantine and an automated notification sent out.

But how do you provide these features on top of cheap storage like the one of data lakes?

With file formats and frameworks like:

You combine any of these formats with a compute engine like Spark and you get ACID transactions, indexing, versioning, etc. on top of automatically managed files stored in cheap storage from the data lake.

Going back to our Items table, if for example you choose Delta Lake, you will get a folder X containing the files with Items data. No need to worry about how many files, or nothing like that. 

You can also have a catalog that will match table Items and the contents of folder X. Thanks to this, you can use SQL, or your preferred language, to query, update, insert or delete records without knowing the specific files. Now you have an actual table (like in RDBMS) but the storage is in a cheap and durable data lake.

There are other SaaS offerings like Snowflake, BigQuery, Etc. that provide same or even more features with storages prices as low as the ones offered by Datalakes.

This article is part I, in part II I will dig deeper in the different engines and formats.


sábado, 8 de enero de 2022

Power BI Hybrid tables - Freedom to choose what you load

Autor: Miguel Peredo Zurcher


(Update 30.January.2022, Hybrid tables now work with Snowflake) (see here)


As an introduction, let's agree that we want dashboards with response times of less than one second. Nobody enjoys (or uses) dashboards that take two or more seconds to load.


How do you achieve fast-loading dashboards in BI?


BI tools have two access modes: Load Mode and Live or Direct Query Mode. In Load Mode, the data is copied (loaded, imported) from the source to the in-memory layer of the BI tool, which for specific volumes provides a sub-second response time. In Direct Query mode, the response time of the dashboard depends on the response time of the source where the tool is connected, which usually is not a sub-second time. There are other aspects to consider to Load Mode (import) or Direct Query mode (live); check your BI tool documentation.


Most cloud Dataware house-oriented products have an incredible response time, they can process vast amounts of data, and the response time will be just a couple of seconds, but not below one second. 


Because of this, you must first load the data to the BI tool (load mode), which is usually an in-memory layer that will respond in less than one second for most cases. And then, query the data from this layer.


So far, so good; we can load our data to the in-memory layer of the BI tool, and our sub-second response time challenge is solved, but can we really load all our data to the in-memory layer?


The answer is a qualified yes. Loading the data to BI tool in-memory engine had some challenges:

  • In-memory is expensive 
  • The complexity of doing incremental loads 
  • Stale dashboards accentuated by streaming and frequent updates into the source data


These challenges have had workarounds, such as Power BI aggregates, allowing you to load aggregated data into the in-memory capacity. But as you can see, there is still the need to load from the data warehouse to the in-memory layer. 


I must mention that several vendors are working on putting an ultra-fast layer on top of their data warehouse solution, for example: What is BigQuery BI Engine? | Google CloudUsing the Search Optimization Service — Snowflake Documentation, etc.


Making an extra load impacts the freshness of the data in your dashboard, meaning the data that has been written (maybe streamed) to the data warehouse is not visible on the dashboard until you do the next load.


Power BI has now a great feature (in public preview) that allows you to tackle the challenges mentioned before; here is the official announcement: Announcing Public Preview of Hybrid Tables in Power BI Premium | Microsoft Power BI Blog | Microsoft Power BI


What can you do with Power BI Hybrid Tables?


It allows you to decide which parts of a table (usually a fact table, like sales for the last ten years) to load in the in-memory layer and which parts you keep in the source but still accessible in the dashboards with different response times.


For example, your Sales table could look like this:





In part A, you have the data has been inserted into the source after the last load to B.

In part B, you have sales data from the previous two years, which resides in memory. You load this data in a scheduled way. (For example, every 8 hours).

In part C, you have historical data.



The objective is to make all the data available for the BI dashboards. 


The access modes for different parts of the Sales table are: 

For A - Direct Query Mode. Response time is acceptable when querying from the source since it's a small amount of data. 

For B - Load Mode. The performance of querying from memory will be top of the line. Most dashboard queries will hit only the last two years of data (as agreed with the business). 

For C - Direct Query Mode. For queries that span more than the last two years, the performance will depend on the source. But again, this is what the business requested.


All this is transparent to the report creators; for them is just another table, but behind the scenes, it is a union of in-memory and direct queries, automatically managed by Power BI.  


If that layout does not fit your scenario, you have the freedom to choose what goes to the in-memory (load mode) and what stays in the source. 


Do you want to try it out?

Remember that you need Power BI Premium (can be PPU or Capacity)


And I suggest following the instructions from the blog mentioned above: Announcing Public Preview of Hybrid Tables in Power BI Premium | Microsoft Power BI Blog | Microsoft Power BI


When experimenting, I first tried Snowflake, but it did not work. I suppose because the feature is still in preview. I posted the issue in the Power BI community:

Hybrid Tables, Snowflake and Query Folding - Microsoft Power BI Community

(Update 30.January.2022, Hybrid tables now work with Snowflake) 


Then I tried Synapse, and it worked.


After publishing to the Power BI premium and doing one refresh that created the partitions, I used Tabular Model Editor to see the partitions and check for each partition if those are in Load Mode or Direct Query Mode. The Tabular Model Editor also allowed me to change the settings for the partitions, but as mentioned in the Blog from Power BI, you can also make the changes programmatically. 


Some screenshots: