Objective
You are working for a mortgage company which holds records of multiple mortgage types from various users. The company maintains two different DataFrames: MortgageDetails (which logs all the mortgage types and their base rates) and UserMortgages (which tracks which user selected which mortgage).
Task
Write a PySpark function that calculates the interest rate for each type of mortgage based on user adoption.
RateOfMortgage is defined as the sum of InterestRate over every user record of a given MortgageType (one rate per user who selected a mortgage of that type), divided by the count of UserID for that MortgageType.
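Written as a formula, the definition reads as follows. The symbols are introduced here only for illustration: J_t is the set of (user, mortgage) pairs whose mortgage has type t, and r_m is the InterestRate of mortgage m. The denominator |J_t| equals the count of UserID for type t, assuming no UserID is null.

```latex
\text{RateOfMortgage}(t) = \frac{\sum_{(u,m) \in J_t} r_m}{\lvert J_t \rvert},
\qquad
J_t = \{ (u, m) : u \text{ selected } m \text{ and } \text{MortgageType}(m) = t \}
```

For the sample in the Example section, J_Fixed contains (U1, M1) and (U2, M1), giving (4.5 + 4.5) / 2 = 4.5.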
Save your resulting DataFrame as result_df. Order the final output alphabetically by MortgageType.
File Path
- Details dataset: /home/interview/mortgage_details.csv
- Users dataset: /home/interview/user_mortgages.csv
- Starter script: /home/interview/mortgage_rates.py
Schema
mortgage_details.csv
| Column Name | Type |
| MortgageID | String |
| MortgageType | String |
| InterestRate | Double |
user_mortgages.csv
| Column Name | Type |
| UserID | String |
| MortgageID | String |
Expected Output Schema
| Column Name | Type |
| MortgageType | String |
| RateOfMortgage | Double |
Example
Given this sample input:
MortgageDetails
| MortgageID | MortgageType | InterestRate |
| M1 | Fixed | 4.5 |
| M2 | Variable | 3.2 |
| M3 | Adjustable | 2.8 |
UserMortgages
| UserID | MortgageID |
| U1 | M1 |
| U2 | M1 |
| U3 | M2 |
| U4 | M3 |
The expected output would be:
| MortgageType | RateOfMortgage |
| Adjustable | 2.8 |
| Fixed | 4.5 |
| Variable | 3.2 |
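Explanation:
* "Fixed" has two users (U1, U2). The sum of their interest rates is 4.5 + 4.5 = 9.0; divided by 2 users, that gives 4.5.
* "Variable" has one user (U3): 3.2 / 1 = 3.2.
* "Adjustable" has one user (U4): 2.8 / 1 = 2.8.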
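As a quick arithmetic cross-check of the expected output, the sketch below recomputes the sample result in plain Python, with no Spark involved. The rows are copied from the example tables; the dictionary lookup stands in for the inner join and the two counters stand in for the grouped sum and count.

```python
# Plain-Python recomputation of the sample; rows copied from the example tables.
from collections import defaultdict

mortgage_details = [
    ("M1", "Fixed", 4.5),
    ("M2", "Variable", 3.2),
    ("M3", "Adjustable", 2.8),
]
user_mortgages = [("U1", "M1"), ("U2", "M1"), ("U3", "M2"), ("U4", "M3")]

# MortgageID -> (MortgageType, InterestRate), used as the join lookup
details_by_id = {mid: (mtype, rate) for mid, mtype, rate in mortgage_details}

rate_sum = defaultdict(float)   # sum of InterestRate per MortgageType
user_count = defaultdict(int)   # count of UserID per MortgageType
for user_id, mortgage_id in user_mortgages:
    if mortgage_id not in details_by_id:  # inner join: skip unmatched users
        continue
    mtype, rate = details_by_id[mortgage_id]
    rate_sum[mtype] += rate
    user_count[mtype] += 1

# Sort by MortgageType, mirroring orderBy("MortgageType")
result = sorted((t, rate_sum[t] / user_count[t]) for t in rate_sum)
print(result)  # [('Adjustable', 2.8), ('Fixed', 4.5), ('Variable', 3.2)]
```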
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
df_details = spark.read.csv("/home/interview/mortgage_details.csv", header=True, inferSchema=True)
df_users = spark.read.csv("/home/interview/user_mortgages.csv", header=True, inferSchema=True)
# Step 1: Join the users table with the mortgage details table
joined_df = df_users.join(df_details, on="MortgageID", how="inner")
# Step 2: Group by the mortgage type and apply the custom formula
result_df = joined_df.groupBy("MortgageType").agg(
(F.sum("InterestRate") / F.count("UserID")).alias("RateOfMortgage")
)
# Step 3: Enforce schema order and sort alphabetically without rounding
result_df = result_df.select(
"MortgageType",
"RateOfMortgage"
).orderBy("MortgageType")
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Joining the DataFrames
joined_df = df_users.join(df_details, on="MortgageID", how="inner")
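To compute metrics that depend on both user counts and interest rates, we must first combine the two tables. MortgageID is the primary key of the details table and a foreign key in the users table, so we perform an inner join on that column. This attaches the MortgageType and InterestRate of the chosen mortgage to every user record. Because the join is inner, a user whose MortgageID has no match in the details table is dropped, and a mortgage that no user selected contributes nothing to the result.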
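To make the inner-join behavior concrete, here is a small plain-Python model of it. The rows are hypothetical and not part of the task data: user U5 points at a MortgageID (M9) missing from the details table, and M4 (type "Balloon") is a mortgage nobody selected. Only pairs whose key exists on both sides survive.

```python
# Hypothetical rows (not from the task data) illustrating inner-join semantics.
mortgage_details = {
    "M1": ("Fixed", 4.5),
    "M4": ("Balloon", 5.1),  # hypothetical mortgage that no user selected
}
user_mortgages = [("U1", "M1"), ("U5", "M9")]  # M9 has no details row

# Inner join on MortgageID: keep a user row only if its key exists in details.
joined = [
    (user_id, mortgage_id, *mortgage_details[mortgage_id])
    for user_id, mortgage_id in user_mortgages
    if mortgage_id in mortgage_details
]
print(joined)  # [('U1', 'M1', 'Fixed', 4.5)]
```

Neither U5 nor the unused "Balloon" type reaches the joined rows, so neither can appear after grouping.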
Step 2: Grouping by Category
joined_df.groupBy("MortgageType")
The output requires one row per MortgageType. By calling .groupBy("MortgageType"), PySpark collects all the joined records and buckets them by their type (e.g., placing all "Fixed" records into one group and all "Variable" records into another).
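Because the grouping key is MortgageType rather than MortgageID, several mortgage products of the same type land in the same bucket, and the formula then weights each base rate by how many users chose it. The hypothetical sketch below adds a second Fixed mortgage (M4 at 5.5, not part of the sample data) and compares the user-weighted result with a plain mean of the distinct base rates. It assumes no null UserID values, so the row count equals F.count("UserID").

```python
# Hypothetical joined rows: M4 is an invented second "Fixed" product.
joined = [
    ("U1", "M1", "Fixed", 4.5),
    ("U2", "M1", "Fixed", 4.5),
    ("U5", "M4", "Fixed", 5.5),
]

# groupBy("MortgageType") puts all three rows into one "Fixed" bucket.
fixed_rates = [rate for _, _, mtype, rate in joined if mtype == "Fixed"]
rate_of_mortgage = sum(fixed_rates) / len(fixed_rates)  # weighted by user count

# Averaging distinct base rates instead ignores how many users chose each one.
distinct_base_rates = {mid: rate for _, mid, _, rate in joined}
mean_of_base_rates = sum(distinct_base_rates.values()) / len(distinct_base_rates)

print(rate_of_mortgage)    # 14.5 / 3, roughly 4.8333
print(mean_of_base_rates)  # 5.0
```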
Step 3: Applying the Custom Math Formula
.agg(
(F.sum("InterestRate") / F.count("UserID")).alias("RateOfMortgage")
)
The prompt specifies a strict custom formula: RateOfMortgage is the sum of InterestRate divided by the count of UserID. Inside .agg(), PySpark lets you combine aggregate expressions with ordinary arithmetic, so we divide F.sum() by F.count() and alias the result. (Note: when neither InterestRate nor UserID contains nulls, this is identical to F.avg("InterestRate"). The two can differ otherwise, because F.sum() and F.avg() skip null rates while F.count("UserID") counts only non-null user IDs, so writing the formula out explicitly matches the specification exactly.)
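Spark's sum, avg, and count(column) aggregates all skip nulls, which is why sum-over-count and avg can disagree. The plain-Python model below assumes a hypothetical Fixed bucket in which one mortgage's InterestRate was left empty in the CSV (Spark's CSV reader reads an empty field as null by default) and mimics the null-skipping by filtering out None values.

```python
# Hypothetical bucket: U2's mortgage has a missing (null) InterestRate.
fixed_rows = [
    ("U1", 4.5),
    ("U2", None),
]

# Mimic Spark: sum/avg ignore null rates, count("UserID") ignores null IDs.
non_null_rates = [rate for _, rate in fixed_rows if rate is not None]
non_null_users = [user for user, _ in fixed_rows if user is not None]

sum_over_count = sum(non_null_rates) / len(non_null_users)  # F.sum / F.count("UserID")
avg_rate = sum(non_null_rates) / len(non_null_rates)        # F.avg("InterestRate")

print(sum_over_count)  # 2.25
print(avg_rate)        # 4.5
```

With clean data (no nulls in either column) the two lists have the same length and the results coincide.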
Step 4: Formatting and Sorting
result_df = result_df.select("MortgageType", "RateOfMortgage").orderBy("MortgageType")
Finally, we chain a .select() statement to enforce the exact column order requested by the schema, and we use .orderBy("MortgageType") to arrange the final report alphabetically. We intentionally avoid rounding the result so we don't obscure the underlying floating-point math.
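"Alphabetically" here means case-sensitive ordering. Spark compares StringType values by their UTF-8 bytes by default, and Python's default str comparison by Unicode code point yields the same order, so the sketch below uses Python's sorted() to illustrate it. The lowercase value is hypothetical; the sample data is all capitalized.

```python
# Python orders str by code point, matching byte-wise UTF-8 order.
sample_types = ["Variable", "Fixed", "Adjustable"]
print(sorted(sample_types))  # ['Adjustable', 'Fixed', 'Variable']

# A hypothetical lowercase value sorts after every capitalized one,
# because 'a' (U+0061) comes after 'V' (U+0056).
mixed_case = ["Variable", "adjustable", "Fixed"]
print(sorted(mixed_case))  # ['Fixed', 'Variable', 'adjustable']
```

If case-insensitive ordering were ever required, sorting on F.lower("MortgageType") would be one option; the task as stated does not ask for it.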