Objective
As an AI engineer at an innovative technology company, you are tracking the performance and utilization of various AI models developed over the years. You are given two DataFrames: df_models, containing model metadata, and df_usage, containing daily usage logs.
Task
Write a PySpark function that merges the information from both DataFrames based on Model_ID. It should also compute each model's total number of uses across all dates (0 for a model with no usage logs) and the average accuracy of each Model_Type, attached to every model of that type.
Save your resulting DataFrame as result_df. Ensure the output matches the exact schema order requested.
File Paths
- Models Dataset: /home/interview/models.csv
- Usage Dataset: /home/interview/usage.csv
- Starter script: /home/interview/ml_metrics.py
Schema
models.csv
| Column Name | Data Type |
|---|---|
| Model_ID | String |
| Model_Name | String |
| Model_Type | String |
| Accuracy | Float |
usage.csv
| Column Name | Data Type |
|---|---|
| Model_ID | String |
| Date | Date |
| Uses | Integer |
Expected Output Schema
| Column Name | Data Type |
|---|---|
| Model_ID | String |
| Model_Name | String |
| Model_Type | String |
| Accuracy | Float |
| Total_Uses | Integer |
| Average_Accuracy | Float |
Example
Given this sample input:
df_models
| Model_ID | Model_Name | Model_Type | Accuracy |
|---|---|---|---|
| M1 | ModelA | Type1 | 0.85 |
| M2 | ModelB | Type2 | 0.78 |
| M3 | ModelC | Type1 | 0.88 |
| M4 | ModelD | Type3 | 0.92 |
| M5 | ModelE | Type2 | 0.82 |
df_usage
| Model_ID | Date | Uses |
|---|---|---|
| M1 | 2023-01-01 | 100 |
| M1 | 2023-01-02 | 120 |
| M2 | 2023-01-01 | 200 |
| M3 | 2023-01-01 | 150 |
| M4 | 2023-01-02 | 130 |
The output would be:
| Model_ID | Model_Name | Model_Type | Accuracy | Total_Uses | Average_Accuracy |
|---|---|---|---|---|---|
| M1 | ModelA | Type1 | 0.85 | 220 | 0.865 |
| M2 | ModelB | Type2 | 0.78 | 200 | 0.8 |
| M3 | ModelC | Type1 | 0.88 | 150 | 0.865 |
| M4 | ModelD | Type3 | 0.92 | 130 | 0.92 |
| M5 | ModelE | Type2 | 0.82 | 0 | 0.8 |
Notice how M5 has no usage logs in df_usage, but it still appears in the output with Total_Uses = 0 because of the left join. Its Average_Accuracy (0.8) is the average of all Type2 models (M2 at 0.78 and M5 at 0.82).
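The expected table can be cross-checked by hand. The following is a minimal plain-Python sketch (not PySpark) that recomputes Total_Uses and Average_Accuracy from the sample rows; the variable names are illustrative and not part of the exercise.

```python
from collections import defaultdict

# Sample rows copied from the df_models and df_usage tables above.
models = [
    ("M1", "ModelA", "Type1", 0.85),
    ("M2", "ModelB", "Type2", 0.78),
    ("M3", "ModelC", "Type1", 0.88),
    ("M4", "ModelD", "Type3", 0.92),
    ("M5", "ModelE", "Type2", 0.82),
]
usage = [
    ("M1", "2023-01-01", 100),
    ("M1", "2023-01-02", 120),
    ("M2", "2023-01-01", 200),
    ("M3", "2023-01-01", 150),
    ("M4", "2023-01-02", 130),
]

# Total uses per model; a model without logs never gets a key here.
total_uses = defaultdict(int)
for model_id, _date, uses in usage:
    total_uses[model_id] += uses

# Accuracies grouped by model type, for the per-type average.
by_type = defaultdict(list)
for _id, _name, model_type, accuracy in models:
    by_type[model_type].append(accuracy)
avg_by_type = {t: sum(accs) / len(accs) for t, accs in by_type.items()}

# One output row per model, in the expected column order.
rows = [
    (mid, name, mtype, acc, total_uses.get(mid, 0), round(avg_by_type[mtype], 3))
    for mid, name, mtype, acc in models
]
for row in rows:
    print(row)
```

The rounding to three decimals is only there to keep the printed values readable; the exercise itself does not ask for rounding.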
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

df_models = spark.read.csv("/home/interview/models.csv", header=True, inferSchema=True)
df_usage = spark.read.csv("/home/interview/usage.csv", header=True, inferSchema=True)

# Step 1: Pre-aggregate the usage data per model
usage_agg = df_usage.groupBy("Model_ID").agg(
    F.sum("Uses").alias("Total_Uses")
)

# Step 2: Join the aggregated usage back to the model metadata
joined_df = df_models.join(usage_agg, on="Model_ID", how="left")

# Step 3: Define a window to calculate average accuracy by Model_Type
type_window = Window.partitionBy("Model_Type")

# Step 4: Apply the window function and format columns
result_df = joined_df.withColumn(
    "Average_Accuracy",
    F.avg("Accuracy").over(type_window).cast("float")
).withColumn(
    "Total_Uses",
    F.coalesce(F.col("Total_Uses").cast("integer"), F.lit(0))
)

# Step 5: Select the final schema columns in the requested order
result_df = result_df.select(
    "Model_ID", "Model_Name", "Model_Type", "Accuracy", "Total_Uses", "Average_Accuracy"
)

# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Pre-Aggregating Usage
usage_agg = df_usage.groupBy("Model_ID").agg(
    F.sum("Uses").alias("Total_Uses")
)
Because df_usage contains multiple daily logs for a single model, joining it directly to df_models first would duplicate the model rows. To avoid this, we pre-aggregate the usage data. We group by Model_ID and sum the Uses, giving us exactly one row per model with its total lifetime usage.
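To see why the order matters, here is a small plain-Python sketch (not PySpark) using only the Type1 rows from the example. It assumes a naive join that repeats a model row once per matching log, and shows that the duplication would also skew a per-type average computed afterwards. All names are illustrative.

```python
# Type1 sample rows from the example: (Model_ID, Accuracy) and (Model_ID, Uses).
models = [("M1", 0.85), ("M3", 0.88)]
usage = [("M1", 100), ("M1", 120), ("M3", 150)]

# Join first: every model row repeats once per matching usage log.
joined_first = [
    (mid, acc, uses)
    for mid, acc in models
    for uid, uses in usage
    if uid == mid
]

# A per-type average over the duplicated rows counts M1 twice.
skewed_avg = sum(acc for _mid, acc, _uses in joined_first) / len(joined_first)

# Aggregate first: one total per Model_ID, so each model keeps a single row.
totals = {}
for uid, uses in usage:
    totals[uid] = totals.get(uid, 0) + uses
aggregated_first = [(mid, acc, totals[mid]) for mid, acc in models]
correct_avg = sum(acc for _mid, acc, _uses in aggregated_first) / len(aggregated_first)

print(len(joined_first), round(skewed_avg, 3))       # 3 rows, average 0.86
print(len(aggregated_first), round(correct_avg, 3))  # 2 rows, average 0.865
```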
Step 2: Joining the DataFrames
joined_df = df_models.join(usage_agg, on="Model_ID", how="left")
We bring the aggregated totals into our main metadata table using a left join. This ensures that even if a model was recently deployed and has zero usage logs, it is not dropped from our final report.
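The difference between the two join types can be sketched in plain Python (not PySpark) with the totals from the example. The dictionary lookup stands in for the join, and None plays the role of a Spark null; the names are illustrative.

```python
models = ["M1", "M2", "M3", "M4", "M5"]
usage_totals = {"M1": 220, "M2": 200, "M3": 150, "M4": 130}  # M5 has no logs

# Inner join: keep only models that have a matching usage total.
inner = [(mid, usage_totals[mid]) for mid in models if mid in usage_totals]

# Left join: keep every model; a missing match becomes None (null in Spark).
left = [(mid, usage_totals.get(mid)) for mid in models]

print(inner)  # M5 is missing
print(left)   # M5 is present with None
```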
Step 3: Creating a Category-Level Window
type_window = Window.partitionBy("Model_Type")
We are asked to find the average accuracy of each Model_Type (e.g., the average of all "Type1" models) and attach that average to every individual row. A second .groupBy() DataFrame joined back on Model_Type would also work, but a Window partitioned by Model_Type computes the same value while keeping every row, without an extra join.
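The contrast between the two shapes can be illustrated in plain Python (not PySpark). The helpers group_by_avg and window_avg are hypothetical names that only mimic the row counts a group-by aggregation and a partitioned window would produce.

```python
from statistics import mean

rows = [
    ("M1", "Type1", 0.85),
    ("M2", "Type2", 0.78),
    ("M3", "Type1", 0.88),
    ("M4", "Type3", 0.92),
    ("M5", "Type2", 0.82),
]

def group_by_avg(rows):
    """Collapse to one average per Model_Type, like groupBy().agg()."""
    groups = {}
    for _mid, mtype, acc in rows:
        groups.setdefault(mtype, []).append(acc)
    return {mtype: mean(accs) for mtype, accs in groups.items()}

def window_avg(rows):
    """Keep every row and append its type's average, like avg().over(window)."""
    averages = group_by_avg(rows)
    return [(mid, mtype, acc, averages[mtype]) for mid, mtype, acc in rows]

print(len(group_by_avg(rows)))  # 3 entries, one per type
print(len(window_avg(rows)))    # 5 rows, one per model
```

With the group-by form, the three averages still have to be joined back to the five model rows; the window form yields the five rows directly.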
Step 4: Applying the Window Function and Handling Nulls
result_df = joined_df.withColumn(
    "Average_Accuracy",
    F.avg("Accuracy").over(type_window).cast("float")
).withColumn(
    "Total_Uses",
    F.coalesce(F.col("Total_Uses").cast("integer"), F.lit(0))
)
We compute the average over the window using F.avg("Accuracy").over(type_window). Because we used a left join earlier, any model with no usage logs has a null Total_Uses, so we wrap it in F.coalesce() to turn those nulls into 0. The casts align both columns with the output schema: F.avg returns a double, which is cast to Float, and F.sum over an integer column returns a long, which is cast to Integer.
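The null handling follows the usual coalesce rule: return the first argument that is not null. A minimal plain-Python sketch of that rule (not the PySpark function itself; the coalesce helper below is a stand-in written for illustration):

```python
def coalesce(*values):
    """Return the first argument that is not None, or None if all are None."""
    for value in values:
        if value is not None:
            return value
    return None

# Totals as they might look right after the left join: M5 had no usage logs.
totals_after_left_join = {"M4": 130, "M5": None}
cleaned = {mid: coalesce(total, 0) for mid, total in totals_after_left_join.items()}

print(cleaned)  # {'M4': 130, 'M5': 0}
```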
Step 5: Enforcing the Schema Order
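result_df = result_df.select(
    "Model_ID", "Model_Name", "Model_Type", "Accuracy", "Total_Uses", "Average_Accuracy"
)
The column order after a join and a series of withColumn calls depends on how the DataFrame was built: joining with on="Model_ID" puts the key first, followed by the remaining df_models columns and then the usage_agg columns, while withColumn appends a new column at the end or replaces an existing one in place. The order here happens to match already, but a final .select() states the requested Output Schema order explicitly and keeps it stable if earlier steps change.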
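One way to confirm the order after the script has run is to read the header of the CSV that Spark wrote. The sketch below is an optional check, not part of the exercise: read_output_header and header_matches are hypothetical helpers, and it assumes the output directory contains a part-*.csv file, which is how Spark names the files it writes.

```python
import csv
import glob
import os

EXPECTED_COLUMNS = [
    "Model_ID", "Model_Name", "Model_Type",
    "Accuracy", "Total_Uses", "Average_Accuracy",
]

def read_output_header(output_dir):
    """Return the header row of the first part-*.csv file in output_dir."""
    part_files = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    if not part_files:
        raise FileNotFoundError(f"no part-*.csv file found in {output_dir}")
    with open(part_files[0], newline="") as handle:
        return next(csv.reader(handle))

def header_matches(output_dir, expected=EXPECTED_COLUMNS):
    """True when the written header equals the expected column order."""
    return read_output_header(output_dir) == list(expected)
```

Calling header_matches("/home/interview/output") after the write step should then return True if the columns were saved in the requested order.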