Objective
You are a Data Scientist working for the federal government. Your task is to analyze budget and spending data across various departments to identify financial volatility. You have been given two DataFrames, one holding budget records and one holding spending records.
Task
Write a PySpark function that calculates the sample variance in the budget and spending for each department over the years.
Combine the two DataFrames, group the data by department, and calculate the sample variance of both the budget and the spending. Cast the final variance columns to Integer.
Save the resulting DataFrame as result_df. Its columns must follow the order given in the Expected Output Schema, and its rows must be sorted alphabetically by Department.
File Paths
- Budget dataset: /home/interview/budget.csv
- Spending dataset: /home/interview/spending.csv
- Starter script: /home/interview/gov_budget.py
Schema
budget.csv
| Column Name | Data Type |
|---|---|
| Department | String |
| Year | Integer |
| Budget | Double |
spending.csv
| Column Name | Data Type |
|---|---|
| Department | String |
| Year | Integer |
| Spending | Double |
Expected Output Schema
| Column Name | Data Type |
|---|---|
| Department | String |
| Budget_Variance | Integer |
| Spending_Variance | Integer |
Example
Given this sample input:
budget_df
| Department | Year | Budget |
|---|---|---|
| Health | 2019 | 750.0 |
| Education | 2019 | 500.0 |
| Health | 2020 | 800.0 |
| Education | 2020 | 550.0 |
spending_df
| Department | Year | Spending |
|---|---|---|
| Health | 2019 | 700.0 |
| Education | 2019 | 450.0 |
| Health | 2020 | 780.0 |
| Education | 2020 | 540.0 |
The expected output would be:
| Department | Budget_Variance | Spending_Variance |
|---|---|---|
| Education | 1250 | 4050 |
| Health | 1250 | 3200 |
For example, for the Health department's spending, the mean of 700 and 780 is 740, so the sample variance is ((700 - 740)^2 + (780 - 740)^2) / (2 - 1) = (1600 + 1600) / 1 = 3200.
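The expected output can be checked by hand without a Spark cluster. The plain-Python sketch below mirrors the join-then-aggregate logic on the sample rows: it inner-joins on (Department, Year), uses statistics.variance (which, like the sample variance above, divides by n - 1), and truncates with int(). The function name department_variances is hypothetical, and the sketch assumes each (Department, Year) pair appears at most once in the spending data and that every department has at least two matched years, since statistics.variance raises StatisticsError on fewer than two values.

```python
from collections import defaultdict
from statistics import variance  # sample variance: divides by n - 1

# Sample rows copied from the example tables: (Department, Year, value)
budget_rows = [
    ("Health", 2019, 750.0), ("Education", 2019, 500.0),
    ("Health", 2020, 800.0), ("Education", 2020, 550.0),
]
spending_rows = [
    ("Health", 2019, 700.0), ("Education", 2019, 450.0),
    ("Health", 2020, 780.0), ("Education", 2020, 540.0),
]


def department_variances(budget_rows, spending_rows):
    """Hypothetical helper: inner-join on (Department, Year), then return
    (Department, Budget_Variance, Spending_Variance) tuples with the
    variances truncated to int, sorted alphabetically by Department."""
    # Assumes (Department, Year) is unique in the spending data
    spending = {(dept, year): value for dept, year, value in spending_rows}
    budgets = defaultdict(list)
    spends = defaultdict(list)
    for dept, year, budget in budget_rows:
        key = (dept, year)
        if key in spending:  # inner join: keep only keys present in both
            budgets[dept].append(budget)
            spends[dept].append(spending[key])
    return [
        (dept, int(variance(budgets[dept])), int(variance(spends[dept])))
        for dept in sorted(budgets)
    ]


for row in department_variances(budget_rows, spending_rows):
    print(row)
# ('Education', 1250, 4050)
# ('Health', 1250, 3200)
```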
Solution
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

budget_df = spark.read.csv("/home/interview/budget.csv", header=True, inferSchema=True)
spending_df = spark.read.csv("/home/interview/spending.csv", header=True, inferSchema=True)

# Step 1: Join the DataFrames on both Department and Year to align the financial records
joined_df = budget_df.join(spending_df, on=["Department", "Year"], how="inner")

# Step 2: Group by department and calculate the sample variance for both metrics.
# F.variance is an alias of F.var_samp (divides by N - 1); the cast truncates to an integer.
result_df = joined_df.groupBy("Department").agg(
    F.variance("Budget").cast("integer").alias("Budget_Variance"),
    F.variance("Spending").cast("integer").alias("Spending_Variance")
)

# Step 3: Select columns in the exact requested schema order and sort alphabetically
result_df = result_df.select(
    "Department", "Budget_Variance", "Spending_Variance"
).orderBy("Department")

# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
```
Explanation
Step 1: Multi-Key Joins
```python
joined_df = budget_df.join(spending_df, on=["Department", "Year"], how="inner")
```
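To calculate the variance correctly, we first combine the budget and spending datasets. Each department appears once per year in both tables, so joining on Department alone would pair every budget row with every spending row of the same department (a per-department cross product): with two years of data, each department would produce four rows instead of two, and the duplicated values would distort the variance. Passing the key list ["Department", "Year"] to the on parameter aligns the 2019 Health budget with the 2019 Health spending, row for row. Because the join is inner, only department-year pairs present in both files are kept.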
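To see the effect concretely, the sketch below takes the Health rows from the example and compares the two join strategies in plain Python (the variable names are illustrative, not part of the starter script). Joining on Department alone doubles the row count and changes the Health budget variance from 1250 to 2500 / 3, about 833.33.

```python
from statistics import variance  # sample variance: divides by n - 1

# Health rows from the example: Year -> value
health_budget = {2019: 750.0, 2020: 800.0}
health_spending = {2019: 700.0, 2020: 780.0}

# Join on (Department, Year): each budget row matches the spending row for the same year
keyed = [
    (health_budget[year], health_spending[year])
    for year in health_budget
    if year in health_spending
]

# Join on Department only: every budget row matches every spending row of that department
fanned_out = [
    (budget, spending)
    for budget in health_budget.values()
    for spending in health_spending.values()
]

print(len(keyed), len(fanned_out))           # 2 4
print(variance([b for b, _ in keyed]))       # 1250.0
print(variance([b for b, _ in fanned_out]))  # 833.33... (2500 / 3)
```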
Step 2: Calculating Statistical Variance
```python
result_df = joined_df.groupBy("Department").agg(
    F.variance("Budget").cast("integer").alias("Budget_Variance"), ...
)
```
Variance measures how far the values in a dataset spread out from their mean. In PySpark, F.variance() is an alias for F.var_samp(), the sample variance, which divides the sum of squared deviations by N - 1; F.var_pop() computes the population variance, which divides by N. We group the data by Department and apply this aggregation to both the Budget and Spending columns.
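The difference between the two denominators is easiest to see side by side. This plain-Python sketch (the helper names are hypothetical) implements both formulas for the Health spending values. Returning None for fewer than two values is an assumption chosen to mirror Spark 3.1 and later, where var_samp yields NULL for a single-row group (earlier versions returned NaN).

```python
def sample_variance(values):
    """Sum of squared deviations / (n - 1), like F.variance / F.var_samp.
    Returns None for fewer than two values (mirrors Spark 3.1+ returning NULL)."""
    n = len(values)
    if n < 2:
        return None
    mean = sum(values) / n
    return sum((x - mean) ** 2 for x in values) / (n - 1)


def population_variance(values):
    """Sum of squared deviations / n, like F.var_pop. Returns None for no values."""
    n = len(values)
    if n == 0:
        return None
    mean = sum(values) / n
    return sum((x - mean) ** 2 for x in values) / n


health_spending = [700.0, 780.0]
print(sample_variance(health_spending))      # 3200.0
print(population_variance(health_spending))  # 1600.0
print(sample_variance([700.0]))              # None
```

With only two years per department, the choice of denominator halves or doubles the result, which is why the task's insistence on sample variance matters.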
Step 3: Casting, Selecting, and Sorting
Because variance is computed from a mean and a division, the aggregate returns a floating-point (Double) value. The expected output schema requires Integer columns, so we chain .cast("integer"), which truncates toward zero by dropping the fractional part, before assigning the final column names with .alias(). Finally, .select() puts the columns in the required order, and .orderBy("Department") sorts the rows alphabetically.
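Truncation and rounding differ only when the fractional part matters. The sketch below uses Python's int(), which, like Spark's cast from double to integer, drops the fractional part. The value 1249.9999999 is a hypothetical floating-point artifact, not something the sample data produces. If rounding were wanted instead, the PySpark counterpart would be F.round(F.variance("Budget")).cast("integer"), but the task as stated asks only for a cast.

```python
# Hypothetical variance values: an exact result, a repeating fraction,
# and a value just below an integer (as float error might produce)
variances = [3200.0, 2500 / 3, 1249.9999999]

# Truncation toward zero, matching .cast("integer")
truncated = [int(v) for v in variances]

# Rounding first; note Python's round() uses round-half-to-even while Spark's
# F.round uses HALF_UP, which only differs on exact .5 ties (none here)
rounded = [int(round(v)) for v in variances]

print(truncated)  # [3200, 833, 1249]
print(rounded)    # [3200, 833, 1250]
```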