Examples of Calling Another Custom Python Function from a PySpark UDF
Now, let us see a few different examples of calling custom Python functions from a PySpark UDF.
Example 1: Converting a DataFrame column to uppercase
In this example, we will create a Spark DataFrame ‘df’ using PySpark that contains the names and ages of people. Then we will define a custom Python function ‘to_uppercase()’ that takes a string as an argument, converts it to upper case, and stores the result in a new column of the DataFrame. Finally, we create the PySpark UDF using PySpark’s ‘udf()‘ function.
Python3
# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# Defining the custom Python function here
def to_uppercase(string):
    return string.upper()

# Now we will create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Now create a DataFrame
data = [("Marry", 25), ("Sunny", 30), ("Ram", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# Make a PySpark UDF now
to_uppercase_udf = udf(to_uppercase)

# Now apply the UDF to the 'name' column
df = df.withColumn("name_uppercase", to_uppercase_udf(df["name"]))

# Show the DataFrame
df.show()
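One thing to keep in mind: the UDF runs ‘to_uppercase()’ on every row, so a null in the ‘name’ column would reach the function as None and raise an AttributeError inside the executor. Below is a null-safe sketch of the same custom function (the None guard is our addition, not part of the original example); because it is ordinary Python, it can be checked without a SparkSession before being wrapped in a UDF.

```python
# Null-safe version of the custom function: PySpark passes None for null
# cells, and None has no .upper() method, so we guard against it first.
def to_uppercase(string):
    if string is None:
        return None
    return string.upper()

# Plain-Python check, no Spark required
print(to_uppercase("Marry"))  # MARRY
print(to_uppercase(None))     # None
```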
Output:
Example 2: Calling a custom Python function that combines multiple DataFrame columns
In this example, we will create a DataFrame that contains two columns – ‘first_name‘ and ‘last_name‘. Then we define a custom Python function ‘combine_columns‘ that takes ‘first_name’ and ‘last_name’ as parameters and returns a ‘full_name’ column that joins them together.
Python3
# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# Defining the custom Python function
def combine_columns(col1, col2):
    return col1 + " " + col2

# Now create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Now create a DataFrame
data = [("John", "Doe"), ("Ram", "Kumar"), ("Smith", "Jones")]
df = spark.createDataFrame(data, ["first_name", "last_name"])

# Make a PySpark UDF
combine_columns_udf = udf(combine_columns)

# Apply the UDF to the 'first_name' and 'last_name' columns
df = df.withColumn("full_name",
                   combine_columns_udf(df["first_name"], df["last_name"]))

# Show the DataFrame
df.show()
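Because ‘combine_columns’ is plain Python, it can be unit-tested in isolation before it is wrapped in a UDF; a minimal sketch:

```python
# The same custom function the UDF wraps, testable without Spark
def combine_columns(col1, col2):
    return col1 + " " + col2

print(combine_columns("John", "Doe"))  # John Doe
```

For a simple concatenation like this, the built-in ‘pyspark.sql.functions.concat_ws(" ", df["first_name"], df["last_name"])’ would do the same job without the serialization overhead a Python UDF adds; the UDF route is worthwhile when the logic cannot be expressed with built-in column functions.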
Output:
Example 3: Calling a Custom Python Function from PySpark UDF with External Libraries
For more complex calculations, PySpark lets us use external Python libraries inside custom functions. Assume we wish to use the fuzzy matching library ‘fuzzywuzzy’ and a custom Python function named ‘calculate_similarity’ to compare the similarity between two strings.
In this example, we import the ‘fuzz’ module from the fuzzywuzzy library and use the ‘fuzz.ratio()‘ function to determine the degree of similarity between two strings. We create the custom Python function ‘calculate_similarity()‘, which invokes ‘fuzz.ratio()’ on its input strings. Using the ‘udf()’ function, we build a UDF named ‘similarity_udf’ and declare its return type. Finally, we use the ‘withColumn()’ method to apply the UDF to the ‘string1’ and ‘string2’ columns, and the resulting DataFrame with the similarity ratios is displayed.
Python3
# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from fuzzywuzzy import fuzz

# Creating a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample DataFrame with columns 'string1' and 'string2'
data = [("apple", "apples"), ("banana", "bananas"), ("cat", "dog")]
df = spark.createDataFrame(data, ["string1", "string2"])

# Creating a custom Python function
def calculate_similarity(str1, str2):
    return fuzz.ratio(str1, str2)

# Creating a UDF from the custom function
similarity_udf = udf(calculate_similarity, IntegerType())

# Apply the UDF to calculate similarity
df.withColumn("similarity",
              similarity_udf(df["string1"], df["string2"])).show()
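Note that fuzzywuzzy must be installed (for example with ‘pip install fuzzywuzzy’) on every executor, since the UDF body runs on the workers. If adding a dependency is not an option, the standard library’s ‘difflib.SequenceMatcher’ computes a closely related similarity score; the sketch below scales it to the same 0–100 integer range that ‘fuzz.ratio()’ uses (the exact rounding is our assumption and may differ from fuzzywuzzy for some inputs):

```python
from difflib import SequenceMatcher

# Dependency-free similarity: SequenceMatcher.ratio() returns a float in
# [0, 1]; scaling by 100 and rounding gives a fuzz.ratio-style integer score.
def calculate_similarity(str1, str2):
    return int(round(SequenceMatcher(None, str1, str2).ratio() * 100))

print(calculate_similarity("apple", "apples"))  # 91
print(calculate_similarity("cat", "dog"))       # 0
```

This stdlib version can be wrapped in a UDF exactly the same way, with ‘udf(calculate_similarity, IntegerType())’.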
Output:
Example 4: Applying a Custom Python Function with Complex Logic
Let’s look at an example where we have a DataFrame with a column of strings representing sentences, and we want to use a custom Python function called ‘count_words’ to determine how many words each sentence contains.
In this illustration, the custom Python function ‘count_words’ breaks the input sentence into words with the ‘split()’ method and counts them with the ‘len()’ function. Using the ‘udf()’ function, we build a UDF named ‘count_udf’ and declare its return type as IntegerType. Finally, we use the ‘withColumn()’ method to apply the UDF to the ‘sentence’ column, and the resulting DataFrame with the word counts is displayed.
Python3
# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Creating a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample DataFrame with a column 'sentence'
data = [("Hello, PySpark!",),
        ("PySpark is great in today's world",),
        ("Spark DataFrames are powerful in python to work on",)]
df = spark.createDataFrame(data, ["sentence"])

# Creating a custom Python function
def count_words(sentence):
    return len(sentence.split())

# Creating a UDF from the custom function
count_udf = udf(count_words, IntegerType())

# Apply the UDF to count words in each sentence
df.withColumn("word_count", count_udf(df["sentence"])).show()
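It is worth noting what ‘split()’ counts as a word: with no argument it splits on any run of whitespace, so punctuation stays attached to the adjacent word. Testing the plain function shows exactly what the UDF will report for the sample rows:

```python
# The custom function from the example, checked without Spark
def count_words(sentence):
    return len(sentence.split())

print(count_words("Hello, PySpark!"))                    # 2
print(count_words("PySpark is great in today's world"))  # 6
```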
Output:
Calling another custom Python function from a PySpark UDF
PySpark, the Python API for Apache Spark, was created for distributed data processing. It lets users perform complex computations and transformations on large datasets efficiently and at scale. One of PySpark’s main features is User-Defined Functions (UDFs), which let users write their own functions and apply them to Spark DataFrames or RDDs. Using UDFs, PySpark’s capabilities can be extended and customized to meet specific needs. As the examples above show, any custom Python function can be called from a PySpark UDF simply by wrapping it with ‘udf()’ and applying it to DataFrame columns.