In this article, I will show you how to extract multiple columns from a single column in a PySpark DataFrame. I am going to use two methods. In the first example, I will use the withColumn function twice to create two new columns. In the second example, I will implement a UDF that extracts both columns at once.
In both examples, I will use the following example DataFrame:
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType

df_schema = StructType([StructField('to_be_extracted', StringType())])
test_list = [
['1183 Amsterdam'],
['06123 Ankara'],
['08067 Barcelona'],
['3030 Bern'],
['75116 Paris'],
['1149-014 Lisbon'],
['00-999 Warsaw'],
['00199 Rome'],
['HR-10 040 Zagreb']
]
df: DataFrame = spark_session.createDataFrame(test_list, schema=df_schema)
+----------------+
| to_be_extracted|
+----------------+
| 1183 Amsterdam|
| 06123 Ankara|
| 08067 Barcelona|
| 3030 Bern|
| 75116 Paris|
| 1149-014 Lisbon|
| 00-999 Warsaw|
| 00199 Rome|
|HR-10 040 Zagreb|
+----------------+
Note that it contains only one column, to_be_extracted, and that column contains both the postal code and the name of a European city. I want to create separate columns for those two values.
Using the withColumn Function
To separate the postal code from the city name, I need a regular expression that splits the data into two groups. To handle my example data, the following expression is sufficient: r'^(.*?)\s(\w*?)$'. It puts the last word in the text into the second group and everything else (without the separating space) into the first group.
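To see exactly what the two groups capture, I can test the pattern on one of the rows with Python's re module (a quick sanity check outside of Spark):

import re

regex = r'^(.*?)\s(\w*?)$'
match = re.search(regex, 'HR-10 040 Zagreb')
print(match.group(1))  # HR-10 040 (everything before the last word)
print(match.group(2))  # Zagreb (the last word)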
After defining the regular expression, I can use the withColumn function together with the regexp_extract function to separate the postal code from the city name:
from pyspark.sql.functions import col, regexp_extract

regex = r'^(.*?)\s(\w*?)$'

df \
    .withColumn(
        'postal_code',
        regexp_extract(col('to_be_extracted'), regex, 1)
    ) \
    .withColumn(
        'city',
        regexp_extract(col('to_be_extracted'), regex, 2)
    ) \
    .show()
+----------------+-----------+---------+
| to_be_extracted|postal_code| city|
+----------------+-----------+---------+
| 1183 Amsterdam| 1183|Amsterdam|
| 06123 Ankara| 06123| Ankara|
| 08067 Barcelona| 08067|Barcelona|
| 3030 Bern| 3030| Bern|
| 75116 Paris| 75116| Paris|
| 1149-014 Lisbon| 1149-014| Lisbon|
| 00-999 Warsaw| 00-999| Warsaw|
| 00199 Rome| 00199| Rome|
|HR-10 040 Zagreb| HR-10 040| Zagreb|
+----------------+-----------+---------+
In this case, the obvious disadvantage is the need to run the regexp_extract function twice.
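If the duplication bothers you, both columns can also be defined in a single select. This is a sketch of a purely stylistic variant; Spark still evaluates the regular expression once per column:

from pyspark.sql.functions import col, regexp_extract

df.select(
    col('to_be_extracted'),
    regexp_extract(col('to_be_extracted'), regex, 1).alias('postal_code'),
    regexp_extract(col('to_be_extracted'), regex, 2).alias('city')
).show()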
Using a UDF
If I want to use a UDF, the code gets a little bit more complicated. First, I have to define the schema of the value returned by the UDF. My return value is going to be a struct containing two text fields:
schema = StructType([
    # both fields are nullable because the UDF returns None when the regex does not match
    StructField("postal_code", StringType(), True),
    StructField("city", StringType(), True)
])
After defining the schema, I have to write the function code:
import re

def extract_in_python(content):
    regex = r'^(.*?)\s(\w*?)$'
    search_result = re.search(regex, content)
    if search_result:
        postal_code = search_result.group(1)
        city = search_result.group(2)
        return postal_code, city
    else:
        return None, None
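Before wrapping the function in a UDF, I can check that it behaves as expected in plain Python:

print(extract_in_python('08067 Barcelona'))   # ('08067', 'Barcelona')
print(extract_in_python('HR-10 040 Zagreb'))  # ('HR-10 040', 'Zagreb')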
Now, I can turn my Python function into a PySpark UDF and use it to extract the data from the column:
from pyspark.sql.functions import udf

extract_udf = udf(extract_in_python, schema)
df \
    .withColumn('extracted', extract_udf(col('to_be_extracted'))) \
    .show()
+----------------+-------------------+
| to_be_extracted| extracted|
+----------------+-------------------+
| 1183 Amsterdam| [1183, Amsterdam]|
| 06123 Ankara| [06123, Ankara]|
| 08067 Barcelona| [08067, Barcelona]|
| 3030 Bern| [3030, Bern]|
| 75116 Paris| [75116, Paris]|
| 1149-014 Lisbon| [1149-014, Lisbon]|
| 00-999 Warsaw| [00-999, Warsaw]|
| 00199 Rome| [00199, Rome]|
|HR-10 040 Zagreb|[HR-10 040, Zagreb]|
+----------------+-------------------+
I didn’t get the result that I wanted. Instead of separate columns, I have a single column with the struct inside. Fortunately, I can easily flatten the struct using the select function:
df \
    .withColumn('extracted', extract_udf(col('to_be_extracted'))) \
    .select(col('to_be_extracted'), col('extracted.*')) \
    .show()
+----------------+-----------+---------+
| to_be_extracted|postal_code| city|
+----------------+-----------+---------+
| 1183 Amsterdam| 1183|Amsterdam|
| 06123 Ankara| 06123| Ankara|
| 08067 Barcelona| 08067|Barcelona|
| 3030 Bern| 3030| Bern|
| 75116 Paris| 75116| Paris|
| 1149-014 Lisbon| 1149-014| Lisbon|
| 00-999 Warsaw| 00-999| Warsaw|
| 00199 Rome| 00199| Rome|
|HR-10 040 Zagreb| HR-10 040| Zagreb|
+----------------+-----------+---------+
Are UDFs Better Than Multiple withColumn Calls?
The short answer is: no. Using a PySpark UDF requires Spark to serialize the data on the JVM (Scala) side, send it to a Python worker process, deserialize it in Python, run the function, serialize the results, and deserialize them back on the JVM side. This round trip causes a considerable performance penalty, so I recommend avoiding UDFs in PySpark whenever built-in functions can do the job.
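If you like the struct-and-flatten pattern but want to stay with built-in functions, a similar result can be obtained with struct and regexp_extract. The snippet below is a sketch that combines the two approaches shown above, not a separate method from the original example:

from pyspark.sql.functions import col, regexp_extract, struct

regex = r'^(.*?)\s(\w*?)$'
df \
    .withColumn('extracted', struct(
        regexp_extract(col('to_be_extracted'), regex, 1).alias('postal_code'),
        regexp_extract(col('to_be_extracted'), regex, 2).alias('city')
    )) \
    .select(col('to_be_extracted'), col('extracted.*')) \
    .show()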