Sometimes, Databricks can be a bit sluggish. Especially when working with many small parquet files on Azure Data Lake. This sluggishness is often due to the security and read/write access requests that the Databricks cluster needs to maintain. This slows down the processing significantly, as the Databricks cluster reserves memory space for every I/O thread that needs to stay open.
To tackle this problem, you often get the advice to merge several smaller parquet files into a few larger ones that have the right size for processing. This is typically around 1GB each. Easier said than done. My source files, for instance, are delivered several times per day on the bronze lake, and I do not wish to touch these files or change them for data lineage reasons. One method to overcome this issue is a Delta Cache Accelerated Storage Type Worker.
The Delta Cache Accelerated Storage Type Worker
What is the Delta Cache exactly? In a nutshell, the worker copies the requested files to the local SSD drive of the worker. Consequently, performance is improved on a wide range of queries on Data Lake files since the data sits very close to the worker. Moreover, no I/O thread to the Data Lake needs to be maintained. Another advantage of storing the source files on the SSD disks is that no memory is being taken away from the cluster. But be careful, this technique only works on parquet files stored on an Azure Blob Storage or Azure Data Lake (both Gen 1 and Gen2). Other storage formats such as CSV, JSON or ORC are not supported.
The Delta Cache will also maintain its file consistency automatically. It will detect changes to the underlying parquet files on the Data Lake and maintain its cache. This functionality is available from Databricks Runtime 5.5 onwards. To activate the Delta Cache, choose a Delta Cache Accelerated worker.
When you rely heavily on parquet files stored on a Data Lake for your processing, you will benefit from this. Using the Delta Cache hardly gives you any disadvantages. Things change, however, when your sources are CSV files on a Data Lake, as they are not supported by the Delta Cache.
Another type of caching in Databricks is the Spark Cache. The difference between Delta and Spark Cache is that the former caches the parquet source files on the Lake, while the latter caches the content of a dataframe. A dataframe can, of course, contain the outcome of a data operation such as ‘join’.
Anytime I want to cache a dataframe, I need to indicate this in the code by writing df2 = df.cache(). Whenever I do this, I interrupt the lazy cache mechanism of Spark by telling it to keep an intermediary result for later use. This can be handy, for instance, in the following situation:
- Perform a RANK on a dataframe and put it into df_ranked
- Use df_ranked in operation 1
- Use df_ranked in operation 2
- Use df_ranked in operation 3
If I would not cache, Spark will perform the rank three times as it did not save the intermediary result. Do this on a large dataset and the performance gain quickly becomes apparent. Keep in mind that even caching is a lazy operation. 😊 So if I call a cache in my code, and then not use the cached dataframe later on in the code, nothing will actually be cached. To come back to the example of the df_ranked:
- I command the RANK and to put it in cache
- I command to use the outcome of the RANK in operation 1
- ONLY NOW THE RANK WILL RUN, AND THE DATA BE STORED IN CACHE
- Operation 2 does not recalculate the RANK again as it can use the cache. Here you benefit from the cache
- Idem for Operation 3, here you also benefit from the cache
Spark stores the persisted data in memory or on its local SSD drives when the cached datadatframe does not fit in memory. This happens on the raw content of the dataframe, so in an uncompressed format. Unlike the compressed parquet format of the Delta Cache.
The downside of Spark cache
This also brings us to the downside of Spark Caching. Pumping your code full of cache commands and assuming it will run fast is often not a good idea. When datasets are large and calculation is light, recalculating an operation can sometimes result in a faster query response, compared to shipping the data to disk and retrieving it again later. You need to test this case by case, and you often need to re-evaluate these choices in your production code as the volume of data you handle might change over time.
Sparks checks the usage of persisted data on each node, and it will automatically drop cached data when not used. This happens by using the Least-Recently-Used algorithm. Alternatively, you can indicate in your code that Spark can drop cached data by using the unpersist() command. This will remove the datablocks from memory and disk.
Combining Delta Cache and Spark Cache
Spark Caching and Delta Caching can be used together as they operate in a different way. However, since Delta caching improves the calculation speed due to the faster retrieval of source data, you might benefit less from Spark Caching then you might expect. In fact, Spark Caching used in combination with Delta Caching can slow down your code, due to the transfer times to and from the SSDs of the raw dataframe.
A parquet file has a compression of around 10x compared to a raw dataframe. So, unless your calculation is very compute intensive and the outcome of the calculation results in a small dataframe, you might want to think twice before introducing Spark Cache where Delta is already active. In the end, it all boils down to the reuse of a cached dataset and the benefits you enjoy from it.