When we use Apache Spark or PySpark, we can cache a snapshot of a DataFrame and reuse it across multiple computations after the first time it is computed. For example, if we join two DataFrames with the same DataFrame, as in the example below, we can cache the DataFrame used on the right side of both join operations to avoid computing the same values twice:
```python
# Example PySpark code that uses the cache operation:
from pyspark.sql.functions import col

to_be_joined = spark.table('source_table') \
    .where(col('column') >= threshold) \
    .select('id_column', 'column', 'another_column') \
    .cache()

joined_with_first = first_dataframe.join(to_be_joined, 'id_column', how='left')
joined_with_second = second_dataframe.join(to_be_joined, 'id_column', how='inner')
```
Cache vs. Persist
The only difference between the persist and the cache functions is that persist allows us to specify the storage level explicitly, while the cache function takes no parameters and uses the default storage level (currently MEMORY_AND_DISK).
The storage level consists of five configuration parameters. We may instruct Spark to persist the data on disk, keep it in memory, keep it in memory not managed by the JVM that runs the Spark jobs (off-heap cache), or store the data in deserialized form. We can also specify replication to keep a copy of the data on multiple worker nodes, so we don't have to recompute the cached DataFrame if a worker fails.
When we cache a
DataFrame only in memory, Spark will NOT fail when there is not enough memory to store the whole DataFrame. Instead, it will cache as many partitions as possible and recompute the remaining ones when required.
Still, this is the fastest cache: if the DataFrame fits in memory, we will not need to recompute anything, and we avoid the cost of loading data from disk and deserializing it.
Note that to use the off-heap memory, we must set two Spark configuration parameters. First, we have to enable off-heap memory usage by setting the spark.memory.offHeap.enabled parameter to true. We also must specify the amount of memory available for off-heap storage using the spark.memory.offHeap.size parameter.
Using off-heap memory reduces garbage collection overhead, but it increases the number of computations Spark must do. To store data off-heap, it must be serialized. When we need it again, we must deserialize it and turn it into Java objects again.
Therefore, off-heap memory usage seems to be such a low-level optimization that I suggest trying everything else before you even start thinking about using the off-heap cache.
The only storage level that does not require any serialization is
MEMORY_ONLY. Serialization not only allows Spark to use off-heap memory and to store data on a disk, but it also packs the objects in a smaller amount of memory. Of course, when we want to use those objects again, they must be deserialized, and the corresponding Java objects are created again.
The StorageLevel class lets us specify the number of nodes that will store a copy of every partition of the cached DataFrame. If we create our own instance of the class, we can use any replication factor we want, but the built-in levels offer only two options: no replication or two copies of every partition.
Spark comes with a bunch of predefined storage levels that we can use out-of-the-box. There is no single best storage level that you should use in all cases. The following table shows which storage levels use the most RAM (RAM used) or require additional computation (CPU used).
| Storage level | RAM used | CPU used |
| --- | --- | --- |
| MEMORY_ONLY | The highest RAM usage | No |
| MEMORY_ONLY_SER | Less than MEMORY_ONLY | Uses CPU to serialize and deserialize the data |
| MEMORY_AND_DISK | Stores as many partitions as possible in RAM | Must serialize and deserialize the partitions that don't fit in memory and get stored on the disk |
| MEMORY_AND_DISK_SER | Serializes the objects, so it needs less memory to store the partitions; hence, more of them can be cached in RAM | Uses CPU to serialize and deserialize data even if nothing is stored on the disk |
| DISK_ONLY | The lowest RAM usage | Must serialize and deserialize the whole DataFrame |