Objective
You are working with a property management company that tracks landlords and their rental properties.
Task
Compute the total rental income for each landlord who owns at least one property. The total rental income is the sum of the rent values across all properties belonging to that landlord. Create a landlord_name column by combining the landlord's first_name and last_name with a space in between. The final output should contain only landlords who have properties. Save your result as result_df.
File Path
- Properties:
/home/interview/properties.csv
- Landlords:
/home/interview/landlords.csv
- Starter script:
/home/interview/rental_income.py
Schema
properties.csv
| Column | Type |
|---|---|
| property_id | integer |
| landlord_id | integer |
| property_type | string |
| rent | float |
| square_feet | integer |
| city | string |
landlords.csv
| Column | Type |
|---|---|
| landlord_id | integer |
| first_name | string |
| last_name | string |
| email | string |
| phone | string |
Expected output schema
| Column | Type |
|---|---|
| landlord_id | integer |
| landlord_name | string |
| total_rental_income | float |
Example
Given this sample input:
landlords
properties
| property_id | landlord_id | property_type | rent | square_feet | city |
|---|---|---|---|---|---|
| 1 | 101 | Apartment | 1500.0 | 850 | New York |
| 2 | 101 | Condo | 1200.0 | 700 | New York |
| 3 | 102 | House | 2500.0 | 1800 | Los Angeles |
The output would be:
| landlord_id | landlord_name | total_rental_income |
|---|---|---|
| 101 | John Smith | 2700.0 |
| 102 | Maria Garcia | 2500.0 |
Landlord 101 owns two properties with rents of 1500.0 and 1200.0, so their total is 2700.0. Landlord 102 has one property at 2500.0. Landlord 104 has no properties and is excluded from the output.
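The arithmetic in this example can be checked with a small plain-Python model of the whole computation. This is a standard-library sketch, not the Spark solution: the rows are the sample properties above, and the names for 101 and 102 come from the expected output. The example does not show landlord 104's name, so the placeholder used for it is hypothetical.

```python
from collections import defaultdict

# Sample rows from the example: (property_id, landlord_id, rent).
properties = [
    (1, 101, 1500.0),
    (2, 101, 1200.0),
    (3, 102, 2500.0),
]

# Names for 101 and 102 come from the expected output. Landlord 104's real name
# is not shown in the example, so ("Placeholder", "Name") is hypothetical.
landlords = {
    101: ("John", "Smith"),
    102: ("Maria", "Garcia"),
    104: ("Placeholder", "Name"),
}

# Aggregate: sum rent per landlord; only landlords that own a property get a key.
totals = defaultdict(float)
for _property_id, landlord_id, rent in properties:
    totals[landlord_id] += rent

# Inner join on landlord_id, then build "first last" names.
result = []
for landlord_id, total in sorted(totals.items()):
    if landlord_id in landlords:  # IDs missing from landlords are dropped
        first, last = landlords[landlord_id]
        result.append((landlord_id, f"{first} {last}", total))

for row in result:
    print(row)
# (101, 'John Smith', 2700.0)
# (102, 'Maria Garcia', 2500.0)
```

Landlord 104 never appears in totals, so it never reaches the join step, which mirrors why it is absent from the expected output.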
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Load both CSVs, using the header row for column names and inferring column types.
properties = spark.read.csv("/home/interview/properties.csv", header=True, inferSchema=True)
landlords = spark.read.csv("/home/interview/landlords.csv", header=True, inferSchema=True)

# Sum rent per landlord; only landlords that own at least one property appear here.
income_df = properties.groupBy("landlord_id").agg(F.sum("rent").alias("total_rental_income"))

# Inner join (the default) attaches landlord details to each aggregated row.
joined_df = income_df.join(landlords, on="landlord_id")

# Build "first_name last_name" and keep only the three output columns.
result_df = joined_df.select(
    "landlord_id",
    F.concat_ws(" ", "first_name", "last_name").alias("landlord_name"),
    "total_rental_income"
)

# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
```
Explanation
Step 1: Loading the DataFrames
```python
properties = spark.read.csv("/home/interview/properties.csv", header=True, inferSchema=True)
landlords = spark.read.csv("/home/interview/landlords.csv", header=True, inferSchema=True)
```
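Each CSV is loaded as its own DataFrame. header=True takes the column names from the first row, and inferSchema=True tells Spark to detect column types automatically (at the cost of an extra pass over the data), so rent comes in as a double and landlord_id as an integer instead of every column being read as a string.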
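The effect of inferSchema can be modeled with Python's csv module, which, like spark.read.csv without inferSchema, returns every field as text. The infer_column helper below is a hypothetical, simplified stand-in for Spark's inference: it picks the narrowest of int, float, or str that parses every value in a column. Spark's real inference covers more types, such as timestamps and booleans. The rows are the sample properties from the example.

```python
import csv
import io

# The sample rows of properties.csv from the example.
raw = """property_id,landlord_id,property_type,rent,square_feet,city
1,101,Apartment,1500.0,850,New York
2,101,Condo,1200.0,700,New York
3,102,House,2500.0,1800,Los Angeles
"""

rows = list(csv.DictReader(io.StringIO(raw)))
# Without inference, every field is a string.
print(type(rows[0]["rent"]).__name__)  # str


def infer_column(values):
    """Hypothetical helper: narrowest of int, float, str that parses every value."""
    for cast in (int, float):
        try:
            for v in values:
                cast(v)
        except ValueError:
            continue  # some value failed; try the next, wider type
        return cast
    return str


columns = list(rows[0].keys())
schema = {col: infer_column([r[col] for r in rows]) for col in columns}
typed = [{col: schema[col](r[col]) for col in columns} for r in rows]

print({col: cast.__name__ for col, cast in schema.items()})
# {'property_id': 'int', 'landlord_id': 'int', 'property_type': 'str',
#  'rent': 'float', 'square_feet': 'int', 'city': 'str'}
```

Inference is column-wise: a single value like "1500.0" is enough to rule out int for the whole rent column.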
Step 2: Aggregating Rental Income
```python
income_df = properties.groupBy("landlord_id").agg(F.sum("rent").alias("total_rental_income"))
```
groupBy("landlord_id") groups all properties belonging to the same landlord. Then F.sum("rent") adds up the rent values within each group. .alias("total_rental_income") gives the resulting column a meaningful name. This is equivalent to SELECT landlord_id, SUM(rent) AS total_rental_income FROM properties GROUP BY landlord_id in SQL.
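The SQL equivalence can be tried directly with Python's built-in sqlite3 module. This is a sketch, assuming a cut-down properties table with only the columns the query needs and the three sample rows from the example. The ORDER BY is added only to make the printed result deterministic, since Spark's groupBy does not promise any row order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Only the columns the aggregation touches are modeled here.
conn.execute("CREATE TABLE properties (property_id INTEGER, landlord_id INTEGER, rent REAL)")
conn.executemany(
    "INSERT INTO properties VALUES (?, ?, ?)",
    [(1, 101, 1500.0), (2, 101, 1200.0), (3, 102, 2500.0)],
)

# The same aggregation the groupBy/agg call expresses, run as plain SQL.
income = conn.execute(
    "SELECT landlord_id, SUM(rent) AS total_rental_income "
    "FROM properties GROUP BY landlord_id ORDER BY landlord_id"
).fetchall()
print(income)  # [(101, 2700.0), (102, 2500.0)]
```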
Step 3: Joining with Landlords
```python
joined_df = income_df.join(landlords, on="landlord_id")
```
The default join type is inner, which means only landlord IDs that appear in both the aggregated income DataFrame and the landlords DataFrame will be kept. Since the aggregation only includes landlords who have properties, any landlord without properties is naturally excluded.
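To see why the inner join drops landlords without properties, the sketch below runs both an inner join and a left join in an in-memory SQLite database (standard-library sqlite3, not Spark). The income rows are the aggregated totals from the example; landlord 104's name is a hypothetical placeholder because the example does not show it. In PySpark, the left-join variant would correspond to landlords.join(income_df, on="landlord_id", how="left").

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE income (landlord_id INTEGER, total_rental_income REAL)")
conn.execute("CREATE TABLE landlords (landlord_id INTEGER, first_name TEXT, last_name TEXT)")
conn.executemany("INSERT INTO income VALUES (?, ?)", [(101, 2700.0), (102, 2500.0)])
conn.executemany(
    "INSERT INTO landlords VALUES (?, ?, ?)",
    # Landlord 104's name is a hypothetical placeholder.
    [(101, "John", "Smith"), (102, "Maria", "Garcia"), (104, "Placeholder", "Name")],
)

# Inner join: only IDs present in both tables survive.
inner = conn.execute(
    "SELECT l.landlord_id FROM income i "
    "JOIN landlords l ON i.landlord_id = l.landlord_id ORDER BY l.landlord_id"
).fetchall()

# Left join from landlords: 104 is kept, with a NULL total.
outer = conn.execute(
    "SELECT l.landlord_id, i.total_rental_income FROM landlords l "
    "LEFT JOIN income i ON i.landlord_id = l.landlord_id ORDER BY l.landlord_id"
).fetchall()

print(inner)  # [(101,), (102,)]
print(outer)  # [(101, 2700.0), (102, 2500.0), (104, None)]
```

The left join would need an extra filter to remove landlord 104, which is why the inner join is the simpler fit for this task.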
Step 4: Creating the Landlord Name
```python
result_df = joined_df.select(
    "landlord_id",
    F.concat_ws(" ", "first_name", "last_name").alias("landlord_name"),
    "total_rental_income"
)
```
concat_ws (concatenate with separator) joins the first_name and last_name columns with a single space between them. The select() call also acts as a projection: it keeps only the three columns the output requires and drops everything else, including first_name, last_name, email, and phone.
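The choice of concat_ws over concat matters when a name part is missing. The functions below are plain-Python models of the documented null handling, not Spark code: Spark's concat_ws skips null inputs, while concat returns null if any input is null. Python's None stands in for a SQL null, and the ("Maria", None) call is a hypothetical landlord with a missing last name.

```python
def concat_ws(sep, *values):
    """Model of Spark's concat_ws: join the non-null values with sep."""
    return sep.join(v for v in values if v is not None)


def concat(*values):
    """Model of Spark's concat: the result is null if any input is null."""
    if any(v is None for v in values):
        return None
    return "".join(values)


print(concat_ws(" ", "John", "Smith"))  # John Smith
print(concat_ws(" ", "Maria", None))    # Maria
print(concat("Maria", " ", None))       # None
```

With concat_ws, a landlord whose last_name is null still gets a usable name with no trailing separator, whereas concat would turn the whole landlord_name value into null.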