Objective
You are working as a data analyst for a large construction company. Your task is to analyze data spread across three DataFrames covering the company's projects, workforce, and machinery.
Task
Write a PySpark function that aggregates the following information for each project:
- The project's duration in days.
- The total number of employees working on the project.
- The number of unique roles across all employees in the project.
- The total cost of equipment allocated to the project.
Save the resulting DataFrame as result_df and make sure its columns appear in the exact order given in the Expected Output Schema. Use left joins so that no project is dropped when it has no employees or no equipment. Order the final output by project_id in ascending order.
File Path
- Projects Dataset: /home/interview/projects.csv
- Employees Dataset: /home/interview/employees.csv
- Equipment Dataset: /home/interview/equipment.csv
- Starter script: /home/interview/project_metrics.py
Schema
projects.csv
| Column Name | Data Type |
|---|---|
| project_id | int |
| project_name | string |
| start_date | date |
| end_date | date |
| budget | int |
employees.csv
| Column Name | Data Type |
|---|---|
| employee_id | int |
| first_name | string |
| last_name | string |
| role | string |
| project_id | int |
equipment.csv
| Column Name | Data Type |
|---|---|
| equipment_id | int |
| equipment_name | string |
| project_id | int |
| cost | int |
Expected Output Schema
| Column Name | Data Type |
|---|---|
| project_id | int |
| project_name | string |
| start_date | date |
| end_date | date |
| duration_days | int |
| total_employees | int |
| unique_roles | int |
| total_equipment_cost | int |
Example
Given this sample input:
projects
| project_id | project_name | start_date | end_date | budget |
|---|---|---|---|---|
| 1 | Skyscraper | 2022-01-01 | 2022-12-31 | 15000000 |
| 2 | Bridge | 2022-03-01 | 2022-08-31 | 5000000 |
| 3 | Tunnel | 2022-06-01 | 2023-01-31 | 10000000 |
employees
| employee_id | first_name | last_name | role | project_id |
|---|---|---|---|---|
| 1 | John | Doe | Engineer | 1 |
| 2 | Jane | Smith | Architect | 1 |
| 3 | Jim | Brown | Project Manager | 1 |
| 4 | Emily | Davis | Engineer | 2 |
| 5 | Alan | Johnson | Architect | 2 |
equipment
| equipment_id | equipment_name | project_id | cost |
|---|---|---|---|
| 1 | Crane | 1 | 25000 |
| 2 | Excavator | 1 | 15000 |
| 3 | Bulldozer | 2 | 20000 |
| 4 | Loader | 2 | 10000 |
| 5 | Crane | 3 | 25000 |
The output would be:
| project_id | project_name | start_date | end_date | duration_days | total_employees | unique_roles | total_equipment_cost |
|---|---|---|---|---|---|---|---|
| 1 | Skyscraper | 2022-01-01 | 2022-12-31 | 364 | 3 | 3 | 40000 |
| 2 | Bridge | 2022-03-01 | 2022-08-31 | 183 | 2 | 2 | 30000 |
| 3 | Tunnel | 2022-06-01 | 2023-01-31 | 244 | null | null | 25000 |
(Note: Tunnel has equipment but no employees, so total_employees and unique_roles are null for that project.)
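One way to sanity-check the expected output without starting Spark is to recompute it in plain Python from the sample rows. This is a minimal sketch, not the Spark solution: the tuple layouts and the `expected_rows` helper are hypothetical, `(end - start).days` stands in for `F.datediff`, and `None` stands in for the nulls a left join produces.

```python
from datetime import date

# Sample rows copied from the example tables (only the columns we need).
projects = [  # (project_id, project_name, start_date, end_date)
    (1, "Skyscraper", date(2022, 1, 1), date(2022, 12, 31)),
    (2, "Bridge", date(2022, 3, 1), date(2022, 8, 31)),
    (3, "Tunnel", date(2022, 6, 1), date(2023, 1, 31)),
]
employees = [  # (employee_id, role, project_id)
    (1, "Engineer", 1),
    (2, "Architect", 1),
    (3, "Project Manager", 1),
    (4, "Engineer", 2),
    (5, "Architect", 2),
]
equipment = [  # (equipment_id, project_id, cost)
    (1, 1, 25000),
    (2, 1, 15000),
    (3, 2, 20000),
    (4, 2, 10000),
    (5, 3, 25000),
]

def expected_rows():
    """Hypothetical helper: rebuild each output row, ordered by project_id."""
    rows = []
    for pid, name, start, end in sorted(projects):
        roles = [role for _, role, p in employees if p == pid]
        costs = [cost for _, p, cost in equipment if p == pid]
        rows.append((
            pid, name, start, end,
            (end - start).days,                  # mirrors F.datediff(end, start)
            len(roles) if roles else None,       # None plays the role of null
            len(set(roles)) if roles else None,  # distinct roles per project
            sum(costs) if costs else None,       # total equipment cost
        ))
    return rows

for row in expected_rows():
    print(row)
```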
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

projects = spark.read.csv("/home/interview/projects.csv", header=True, inferSchema=True)
employees = spark.read.csv("/home/interview/employees.csv", header=True, inferSchema=True)
equipment = spark.read.csv("/home/interview/equipment.csv", header=True, inferSchema=True)

# Step 1: Pre-aggregate employees to one row per project_id
emp_agg = employees.groupBy("project_id").agg(
    F.count("employee_id").cast("int").alias("total_employees"),
    F.countDistinct("role").cast("int").alias("unique_roles")
)

# Step 2: Pre-aggregate equipment to one row per project_id
equip_agg = equipment.groupBy("project_id").agg(
    F.sum("cost").cast("int").alias("total_equipment_cost")
)

# Step 3 & 4: Calculate duration, join DataFrames, and arrange schema.
# Depending on the Spark version, inferSchema may read the date columns as
# strings or timestamps, so F.to_date normalises them to DateType first.
result_df = projects \
    .withColumn("start_date", F.to_date("start_date")) \
    .withColumn("end_date", F.to_date("end_date")) \
    .withColumn("duration_days", F.datediff("end_date", "start_date")) \
    .join(emp_agg, on="project_id", how="left") \
    .join(equip_agg, on="project_id", how="left") \
    .select(
        "project_id", "project_name", "start_date", "end_date",
        "duration_days", "total_employees", "unique_roles", "total_equipment_cost"
    ).orderBy("project_id")

# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
```
Explanation
Step 1: The Danger of Joining Before Aggregating
If you join projects, employees, and equipment together before aggregating, every employee row for a project is paired with every equipment row for that same project, a per-project Cartesian product. For example, a project with 5 employees and 4 pieces of equipment produces 20 rows (5 x 4). Summing cost afterward counts each piece of equipment 5 times, and counting employee_id returns 20 instead of 5. To prevent this, aggregate each child table on its own first, then join the one-row-per-project results back to the parent projects table.
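The fan-out can be reproduced in a few lines of plain Python. This sketch assumes a single hypothetical project (project_id 1) with 5 employees and 4 pieces of equipment costing 1000, 2000, 3000 and 4000; it compares a naive join-then-aggregate against aggregate-then-join. No Spark is involved.

```python
from collections import defaultdict

# Hypothetical data: one project with 5 employees and 4 pieces of equipment.
employees = [(e, 1) for e in range(1, 6)]            # (employee_id, project_id)
equipment = [(q, 1, 1000 * q) for q in range(1, 5)]  # (equipment_id, project_id, cost)

def join_then_aggregate():
    """Join first: every employee pairs with every equipment row of the project."""
    joined = [(e, cost)
              for e, pe in employees
              for _, pq, cost in equipment
              if pe == pq]
    # Row count and summed cost are both inflated by the fan-out.
    return len(joined), sum(cost for _, cost in joined)

def aggregate_then_join():
    """Aggregate each child table to one value per project_id first."""
    emp_count = defaultdict(int)
    for _, pid in employees:
        emp_count[pid] += 1
    equip_cost = defaultdict(int)
    for _, pid, cost in equipment:
        equip_cost[pid] += cost
    return emp_count[1], equip_cost[1]

print(join_then_aggregate())   # inflated: 20 rows, cost counted 5 times
print(aggregate_then_join())   # correct: 5 employees, true total cost
```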
Step 2: Pre-Aggregating Employees & Equipment
```python
emp_agg = employees.groupBy("project_id").agg(
    F.count("employee_id").cast("int").alias("total_employees"),
    F.countDistinct("role").cast("int").alias("unique_roles")
)
equip_agg = equipment.groupBy("project_id").agg(
    F.sum("cost").cast("int").alias("total_equipment_cost")
)
```
We group employees by project_id, then use F.count() for the total workforce and F.countDistinct() for the number of unique roles. Equipment is grouped the same way, with F.sum() totaling the cost. Both count functions return LongType (bigint), and summing an int column also produces a bigint, so we apply .cast("int") to match the int types in the required schema. The cast is only safe while each total fits in a 32-bit int (at most 2,147,483,647).
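To see why the 32-bit limit matters: as we understand Spark's cast semantics, with spark.sql.ansi.enabled=false (the default before Spark 4.0) casting an out-of-range bigint to int keeps only the low 32 bits, so the value silently wraps, while with ANSI mode enabled the cast fails instead. The sketch below models both behaviors in plain Python; the function names are hypothetical and nothing here calls Spark.

```python
INT_MIN, INT_MAX = -2**31, 2**31 - 1

def fits_in_int(value):
    """True if value is representable as a 32-bit signed int."""
    return INT_MIN <= value <= INT_MAX

def legacy_cast_to_int(value):
    """Model of a non-ANSI bigint -> int cast: keep the low 32 bits (two's complement)."""
    return (value - INT_MIN) % 2**32 + INT_MIN

def ansi_cast_to_int(value):
    """Model of an ANSI-mode cast: refuse out-of-range values instead of wrapping."""
    if not fits_in_int(value):
        raise ValueError(f"{value} is out of range for int")
    return value

print(legacy_cast_to_int(40000))          # small totals pass through unchanged
print(legacy_cast_to_int(3_000_000_000))  # a large total wraps to a negative number
```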
Step 3: Calculating Date Differences
```python
projects.withColumn("duration_days", F.datediff("end_date", "start_date"))
```
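To find each project's duration, we use PySpark's built-in F.datediff(end, start), which returns the number of days from start to end as an int. Argument order matters: the end date comes first, and swapping the arguments yields a negative value. The result does not count both endpoints, which is why 2022-01-01 to 2022-12-31 gives 364 rather than 365.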
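The semantics can be checked with Python's datetime module. This is a plain-Python model of F.datediff, not a call into Spark; `datediff` and `inclusive_days` are hypothetical helpers. It shows the argument order, the negative result when the arguments are swapped, and how an inclusive day count would differ by one.

```python
from datetime import date

def datediff(end, start):
    """Model of F.datediff(end, start): whole days from start to end."""
    return (end - start).days

def inclusive_days(end, start):
    """Calendar days counting both the start and the end date."""
    return datediff(end, start) + 1

skyscraper = (date(2022, 12, 31), date(2022, 1, 1))
print(datediff(*skyscraper))                          # matches duration_days in the example
print(inclusive_days(*skyscraper))                    # one more than datediff
print(datediff(date(2022, 1, 1), date(2022, 12, 31)))  # swapped arguments give a negative value
```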
Step 4: Left Joining Everything Together
```python
.join(emp_agg, on="project_id", how="left") \
.join(equip_agg, on="project_id", how="left") \
.select(...) \
.orderBy("project_id")
```
Because emp_agg and equip_agg now contain exactly one row per project_id, joining them onto the projects DataFrame cannot duplicate rows. We use how="left" because some projects may not have employees or equipment assigned yet (like the Tunnel in the example, which has no employees); an inner join would drop those projects from the report entirely, while a left join keeps them and fills the missing metrics with null. Finally, .orderBy("project_id") sorts the output by project_id in ascending order so the row order is deterministic.
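The difference between the two join types can be modeled with dictionaries keyed by project_id. This sketch uses the aggregated values from the example; the dictionary layout and the `left_join`/`inner_join` helpers are illustrative assumptions, with `None` standing in for Spark's null.

```python
# Aggregated values from the example, keyed by project_id.
projects = {1: "Skyscraper", 2: "Bridge", 3: "Tunnel"}
emp_agg = {1: 3, 2: 2}                       # total_employees; Tunnel has no row
equip_agg = {1: 40000, 2: 30000, 3: 25000}   # total_equipment_cost

def left_join():
    """Keep every project; a missing right-hand key becomes None (null)."""
    return [(pid, name, emp_agg.get(pid), equip_agg.get(pid))
            for pid, name in sorted(projects.items())]

def inner_join():
    """Keep only projects that appear on both sides."""
    return [(pid, name, emp_agg[pid], equip_agg[pid])
            for pid, name in sorted(projects.items())
            if pid in emp_agg and pid in equip_agg]

print(left_join())   # Tunnel survives with None for total_employees
print(inner_join())  # Tunnel is dropped
```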