In this article, I will show you how to extract multiple columns from a single column in a PySpark DataFrame. I am going to use two methods. First, I will use the withColumn function to create a new column twice. In the second example, I will implement a UDF that extracts both columns at once.
In both examples, I will use the following example DataFrame:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark_session = SparkSession.builder.getOrCreate()

# A single-column schema: every row holds a postal code followed by a city name
df_schema = StructType([StructField('to_be_extracted', StringType())])

test_list = [
    ['1183 Amsterdam'],
    ['06123 Ankara'],
    ['08067 Barcelona'],
    ['3030 Bern'],
    ['75116 Paris'],
    ['1149-014 Lisbon'],
    ['00-999 Warsaw'],
    ['00199 Rome'],
    ['HR-10 040 Zagreb']
]

df: DataFrame = spark_session.createDataFrame(test_list, schema=df_schema)
df.show()
+----------------+
| to_be_extracted|
+----------------+
| 1183 Amsterdam|
| 06123 Ankara|
| 08067 Barcelona|
| 3030 Bern|
| 75116 Paris|
| 1149-014 Lisbon|
| 00-999 Warsaw|
| 00199 Rome|
|HR-10 040 Zagreb|
+----------------+
Note that it contains only one column, to_be_extracted, and that column contains both the postal code and the name of a European city. I want to create separate columns for those two values.
Using the withColumn Function
To separate the postal code from the city name, I need a regular expression that splits the data into two groups. For my example data, the following expression is sufficient: r'^(.*?)\s(\w*?)$'. It puts the last word of the text into the second group and everything else (except the separating space) into the first group.
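To make the grouping concrete, here is a quick plain-Python check of the expression (a minimal sketch using the standard re module; it is not part of the Spark job):
import re

regex = r'^(.*?)\s(\w*?)$'

# Group 1 captures everything before the last space, group 2 captures the final word
match = re.search(regex, 'HR-10 040 Zagreb')
print(match.group(1))  # HR-10 040
print(match.group(2))  # Zagreb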
After defining the regular expression, I can use the withColumn function together with the regexp_extract function to separate the postal code from the city name:
from pyspark.sql.functions import col, regexp_extract

regex = r'^(.*?)\s(\w*?)$'

df \
    .withColumn(
        'postal_code',
        regexp_extract(col('to_be_extracted'), regex, 1)
    ) \
    .withColumn(
        'city',
        regexp_extract(col('to_be_extracted'), regex, 2)
    ) \
    .show()
+----------------+-----------+---------+
| to_be_extracted|postal_code| city|
+----------------+-----------+---------+
| 1183 Amsterdam| 1183|Amsterdam|
| 06123 Ankara| 06123| Ankara|
| 08067 Barcelona| 08067|Barcelona|
| 3030 Bern| 3030| Bern|
| 75116 Paris| 75116| Paris|
| 1149-014 Lisbon| 1149-014| Lisbon|
| 00-999 Warsaw| 00-999| Warsaw|
| 00199 Rome| 00199| Rome|
|HR-10 040 Zagreb| HR-10 040| Zagreb|
+----------------+-----------+---------+
In this case, the obvious disadvantage is the need to run the regexp_extract function twice.
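If you are on Spark 3.3 or newer, the two withColumn calls can at least be collapsed into a single withColumns call. The regex is still evaluated twice, but only one projection is added; a minimal sketch, assuming the df and regex defined above:
# Assumes Spark >= 3.3, which introduced DataFrame.withColumns
df \
    .withColumns({
        'postal_code': regexp_extract(col('to_be_extracted'), regex, 1),
        'city': regexp_extract(col('to_be_extracted'), regex, 2)
    }) \
    .show()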
Using a UDF
If I want to use a UDF, the code gets a little bit more complicated. First, I have to define the schema of the value returned by the UDF. My return value is going to be a struct containing two text fields:
schema = StructType([
    StructField("postal_code", StringType(), False),
    StructField("city", StringType(), False)
])
After defining the schema, I have to write the function code:
import re

def extract_in_python(content):
    regex = r'^(.*?)\s(\w*?)$'
    search_result = re.search(regex, content)
    if search_result:
        postal_code = search_result.group(1)
        city = search_result.group(2)
        return postal_code, city
    else:
        return None, None
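Before turning the function into a UDF, it can be sanity-checked with plain Python (a quick local test, not part of the Spark job):
print(extract_in_python('1149-014 Lisbon'))  # ('1149-014', 'Lisbon')
print(extract_in_python('Lisbon'))           # no whitespace, so no match: (None, None)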
Now, I can turn my Python function into a PySpark UDF and use it to extract the data from the column:
from pyspark.sql.functions import udf

extract_udf = udf(extract_in_python, schema)
df \
    .withColumn('extracted', extract_udf(col('to_be_extracted'))) \
    .show()
+----------------+-------------------+
| to_be_extracted| extracted|
+----------------+-------------------+
| 1183 Amsterdam| [1183, Amsterdam]|
| 06123 Ankara| [06123, Ankara]|
| 08067 Barcelona| [08067, Barcelona]|
| 3030 Bern| [3030, Bern]|
| 75116 Paris| [75116, Paris]|
| 1149-014 Lisbon| [1149-014, Lisbon]|
| 00-999 Warsaw| [00-999, Warsaw]|
| 00199 Rome| [00199, Rome]|
|HR-10 040 Zagreb|[HR-10 040, Zagreb]|
+----------------+-------------------+
This isn’t quite the result I wanted. Instead of separate columns, I have a single column with the struct inside. Fortunately, I can easily flatten the struct using the select function:
df \
    .withColumn('extracted', extract_udf(col('to_be_extracted'))) \
    .select(col('to_be_extracted'), col("extracted.*")) \
    .show()
+----------------+-----------+---------+
| to_be_extracted|postal_code| city|
+----------------+-----------+---------+
| 1183 Amsterdam| 1183|Amsterdam|
| 06123 Ankara| 06123| Ankara|
| 08067 Barcelona| 08067|Barcelona|
| 3030 Bern| 3030| Bern|
| 75116 Paris| 75116| Paris|
| 1149-014 Lisbon| 1149-014| Lisbon|
| 00-999 Warsaw| 00-999| Warsaw|
| 00199 Rome| 00199| Rome|
|HR-10 040 Zagreb| HR-10 040| Zagreb|
+----------------+-----------+---------+
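If the original column is no longer needed, the struct fields can also be picked out by name instead of using the extracted.* wildcard; a small variation on the snippet above:
df \
    .withColumn('extracted', extract_udf(col('to_be_extracted'))) \
    .select(
        col('extracted.postal_code'),
        col('extracted.city')
    ) \
    .show()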
Are UDFs Better Than Multiple withColumn Calls?
The short answer is: no. Using a PySpark UDF requires Spark to serialize the Scala objects, run a Python process, deserialize the data in Python, run the function, serialize the results, and deserialize them again in Scala. This causes a considerable performance penalty, so I recommend avoiding UDFs in PySpark.