How do you store data in a Parquet file from a standard Scala application rather than an Apache Spark job? You can use Parquet4S, a project created by my colleague.

First, I am going to create a case class whose fields are custom value-class types (the first code snippet also includes all of the imports).

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import com.github.mjakubowski84.parquet4s._
import org.apache.parquet.schema.{OriginalType, PrimitiveType}
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver._

import scala.concurrent.duration._
import scala.concurrent.Await

case class Email(value: String) extends AnyVal
case class FirstName(value: String) extends AnyVal
case class LastName(value: String) extends AnyVal

case class User(firstName: FirstName, lastName: LastName, email: Email)

Because the fields are not Scala built-in types, Parquet4S cannot derive their codecs and schemas automatically, so I have to define them myself.

implicit val firstNameTypeCodec: OptionalValueCodec[FirstName] =
  new OptionalValueCodec[FirstName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): FirstName = value match {
      case StringValue(string) => FirstName(string)
    }
    override protected def encodeNonNull(data: FirstName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val firstNameSchema: TypedSchemaDef[FirstName] =
  typedSchemaDef[FirstName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val lastNameTypeCodec: OptionalValueCodec[LastName] =
  new OptionalValueCodec[LastName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): LastName = value match {
      case StringValue(string) => LastName(string)
    }
    override protected def encodeNonNull(data: LastName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val lastNameSchema: TypedSchemaDef[LastName] =
  typedSchemaDef[LastName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val emailTypeCodec: OptionalValueCodec[Email] =
  new OptionalValueCodec[Email] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): Email = value match {
      case StringValue(string) => Email(string)
    }
    override protected def encodeNonNull(data: Email, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val emailSchema: TypedSchemaDef[Email] =
  typedSchemaDef[Email](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

A lot of boilerplate code, but that is the cost of reusing the domain model classes in the data persistence layer ;)
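One way to trim the repetition is to factor the shared logic into a pair of generic helpers. The following is a minimal sketch that reuses only the Parquet4S types already shown in this post (`OptionalValueCodec`, `StringValue`, `typedSchemaDef`, `PrimitiveSchemaDef`). The names `StringWrapperSupport`, `stringCodec`, `stringSchema`, and the sample `Nickname` type are hypothetical, and the sketch assumes the same Parquet4S version as the snippets above.

```scala
import com.github.mjakubowski84.parquet4s._
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver._
import org.apache.parquet.schema.{OriginalType, PrimitiveType}

// Hypothetical helpers (not part of Parquet4S) for types that wrap a single String.
object StringWrapperSupport {

  // Builds a codec from a pair of functions that wrap and unwrap the String.
  def stringCodec[T](wrap: String => T)(unwrap: T => String): OptionalValueCodec[T] =
    new OptionalValueCodec[T] {
      override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): T =
        value match {
          case StringValue(string) => wrap(string)
          // Fail with a readable message instead of a MatchError on unexpected values.
          case other =>
            throw new IllegalArgumentException(s"Expected a string value but got: $other")
        }

      override protected def encodeNonNull(data: T, configuration: ValueCodecConfiguration): Value =
        StringValue(unwrap(data))
    }

  // Same schema definition as in the snippets above: a required UTF-8 binary column.
  def stringSchema[T]: TypedSchemaDef[T] =
    typedSchemaDef[T](
      PrimitiveSchemaDef(
        primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
        required = true,
        originalType = Some(OriginalType.UTF8)
      )
    )
}

// Usage with a hypothetical wrapper type: two short definitions per type.
case class Nickname(value: String) extends AnyVal

object NicknameInstances {
  implicit val nicknameCodec: OptionalValueCodec[Nickname] =
    StringWrapperSupport.stringCodec[Nickname](Nickname(_))(_.value)

  implicit val nicknameSchema: TypedSchemaDef[Nickname] =
    StringWrapperSupport.stringSchema[Nickname]
}
```

With helpers like these, each codec and schema pair for `FirstName`, `LastName`, and `Email` would shrink to two one-line definitions, at the price of one more layer of indirection in the persistence code.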

Now, I can create an Akka Streams source that contains the data to be saved and use the code from the Parquet4S documentation to store the data in Parquet files.

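// A single sample record; any immutable collection of User can back the Source
val data = List(User(FirstName("Test"), LastName("Test"), Email("mail@example.com")))

// Akka Streams needs an actor system and a materializer in implicit scope
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()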

val writeOptions = ParquetWriter.Options(
  writeMode = ParquetFileWriter.Mode.OVERWRITE,
  compressionCodecName = CompressionCodecName.SNAPPY
)

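// runWith starts the stream and returns a Future that completes when writing finishes
val writeDone = Source(data).runWith(ParquetStreams.toParquetParallelUnordered(
  path = "./users",
  parallelism = 4,
  options = writeOptions
))

// Wait for the write to finish before shutting down the actor system;
// terminating right away could stop the stream before the files are complete
Await.ready(writeDone, 2.minutes)
Await.ready(actorSystem.terminate(), 2.minutes)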
