What do you do when you want to store data in a Parquet file from a standard Scala application, not an Apache Spark job? You can use Parquet4S, a project created by my colleague.
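To follow along, you need the Akka module of Parquet4S on the classpath. A minimal sbt setup could look like the snippet below; the version number is a placeholder, so check the project's README for the current release.

// build.sbt -- the version below is a placeholder; use the latest
// release listed in the Parquet4S README.
libraryDependencies += "com.github.mjakubowski84" %% "parquet4s-akka" % "1.0.0"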
First, I am going to define a domain class whose fields use custom wrapper types (I have also included all of the imports in the first code snippet).
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import com.github.mjakubowski84.parquet4s._
import org.apache.parquet.schema.{OriginalType, PrimitiveType}
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver._
import scala.concurrent.duration._
import scala.concurrent.Await
case class Email(value: String) extends AnyVal
case class FirstName(value: String) extends AnyVal
case class LastName(value: String) extends AnyVal
case class User(firstName: FirstName, lastName: LastName, email: Email)
Because I don’t use Scala built-in types, I have to define the codecs and schemas of those custom types.
implicit val firstNameTypeCodec: OptionalValueCodec[FirstName] =
  new OptionalValueCodec[FirstName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): FirstName = value match {
      case StringValue(string) => FirstName(string)
    }
    override protected def encodeNonNull(data: FirstName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val firstNameSchema: TypedSchemaDef[FirstName] =
  typedSchemaDef[FirstName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val lastNameTypeCodec: OptionalValueCodec[LastName] =
  new OptionalValueCodec[LastName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): LastName = value match {
      case StringValue(string) => LastName(string)
    }
    override protected def encodeNonNull(data: LastName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val lastNameSchema: TypedSchemaDef[LastName] =
  typedSchemaDef[LastName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val emailTypeCodec: OptionalValueCodec[Email] =
  new OptionalValueCodec[Email] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): Email = value match {
      case StringValue(string) => Email(string)
    }
    override protected def encodeNonNull(data: Email, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val emailSchema: TypedSchemaDef[Email] =
  typedSchemaDef[Email](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )
That is a lot of boilerplate code, but it is the cost of reusing the domain model classes in the data persistence layer ;)
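If the repetition bothers you, the three identical codec/schema pairs can be produced by a small helper. This is my own sketch, not part of Parquet4S: stringWrapperCodec and stringWrapperSchema are hypothetical names, and the helper assumes every wrapper type holds exactly one String field.

// Hypothetical helpers (not part of Parquet4S) for single-field String wrappers.
def stringWrapperCodec[T](wrap: String => T)(unwrap: T => String): OptionalValueCodec[T] =
  new OptionalValueCodec[T] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): T =
      value match { case StringValue(string) => wrap(string) }
    override protected def encodeNonNull(data: T, configuration: ValueCodecConfiguration): Value =
      StringValue(unwrap(data))
  }

def stringWrapperSchema[T]: TypedSchemaDef[T] =
  typedSchemaDef[T](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

With those in scope, each implicit pair above collapses to one line per type, for example:

implicit val emailTypeCodec: OptionalValueCodec[Email] = stringWrapperCodec(Email.apply)(_.value)
implicit val emailSchema: TypedSchemaDef[Email] = stringWrapperSchema[Email]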
Now I can create an Akka stream that contains the data to be saved and use the code from the Parquet4S documentation to store it in Parquet files.
val data = Stream(User(FirstName("Test"), LastName("Test"), Email("mail@example.com")))

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

val writeOptions = ParquetWriter.Options(
  writeMode = ParquetFileWriter.Mode.OVERWRITE,
  compressionCodecName = CompressionCodecName.SNAPPY
)
val result = Source(data).runWith(ParquetStreams.toParquetParallelUnordered(
  path = "./users",
  parallelism = 4,
  options = writeOptions
))

// Wait for the stream to finish writing before shutting down the actor system;
// terminating it right away could stop the stream mid-write.
Await.ready(result, 1.minute)
Await.ready(actorSystem.terminate(), 2.minutes)
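To check that the files actually landed on disk, you can read them back with the same library. A short sketch, assuming the ParquetReader.read API from the same Parquet4S version (the path matches the one passed to the sink above):

// Reads all records under ./users back into User instances.
// ParquetIterable holds open file handles, so close it when done.
val users = ParquetReader.read[User]("./users")
try users.foreach(println)
finally users.close()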