Objective
A top tech company is facing a challenge with user churn. They maintain three different sources of data: user accounts, user activities, and user exit surveys. All these data sources are represented as three separate DataFrames.
Task
Unfortunately, due to some system glitches, there are duplicates within the df_activities DataFrame. Duplicate rows are defined as rows that have the exact same user_id, activity_date, and activity_type.
Write a PySpark function that removes the duplicates from df_activities and combines all three DataFrames. Ensure that if there is no corresponding row for a user in any of the input DataFrames, the respective columns contain null values in the output DataFrame.
Save your result as result_df. The final output must be sorted by user_id in ascending order, then by activity_date in descending order, and finally by activity_type in ascending order (to break ties deterministically).
File Path
- Accounts Dataset: /home/interview/accounts.csv
- Activities Dataset: /home/interview/activities.csv
- Exit Surveys Dataset: /home/interview/exit_surveys.csv
- Starter script: /home/interview/churn_analysis.py
Schema
accounts.csv
| Column Name | Type |
|---|---|
| user_id | String |
| account_created_date | Date |
| location | String |
activities.csv
| Column Name | Type |
|---|---|
| user_id | String |
| activity_date | Date |
| activity_type | String |
exit_surveys.csv
| Column Name | Type |
|---|---|
| user_id | String |
| exit_date | Date |
| exit_reason | String |
Expected Output Schema
| Column Name | Type |
|---|---|
| user_id | String |
| account_created_date | Date |
| location | String |
| activity_date | Date |
| activity_type | String |
| exit_date | Date |
| exit_reason | String |
Example
Given this sample input:
df_accounts
| user_id | account_created_date | location |
|---|---|---|
| U001 | 2023-01-01 | New York |
| U002 | 2023-01-05 | Chicago |
| U003 | 2023-01-10 | San Francisco |
df_activities
| user_id | activity_date | activity_type |
|---|---|---|
| U001 | 2023-02-01 | Login |
| U001 | 2023-02-01 | Login |
| U002 | 2023-02-05 | File Upload |
| U002 | 2023-02-05 | File Upload |
| U003 | 2023-02-10 | Logout |
df_exit_surveys
| user_id | exit_date | exit_reason |
|---|---|---|
| U001 | 2023-03-01 | Moved to a competitor |
| U002 | 2023-03-05 | Not user-friendly |
| U003 | 2023-03-10 | High pricing |
The expected output would be:
| user_id | account_created_date | location | activity_date | activity_type | exit_date | exit_reason |
|---|---|---|---|---|---|---|
| U001 | 2023-01-01 | New York | 2023-02-01 | Login | 2023-03-01 | Moved to a competitor |
| U002 | 2023-01-05 | Chicago | 2023-02-05 | File Upload | 2023-03-05 | Not user-friendly |
| U003 | 2023-01-10 | San Francisco | 2023-02-10 | Logout | 2023-03-10 | High pricing |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Load the three input datasets
df_accounts = spark.read.csv("/home/interview/accounts.csv", header=True, inferSchema=True)
df_activities = spark.read.csv("/home/interview/activities.csv", header=True, inferSchema=True)
df_exit_surveys = spark.read.csv("/home/interview/exit_surveys.csv", header=True, inferSchema=True)

# Keep one row per (user_id, activity_date, activity_type) combination
df_activities_clean = df_activities.dropDuplicates(["user_id", "activity_date", "activity_type"])

# Full outer joins keep users that are missing from any of the inputs
result_df = df_accounts.join(df_activities_clean, on="user_id", how="outer") \
    .join(df_exit_surveys, on="user_id", how="outer")

# Put the columns in the expected output order
result_df = result_df.select(
    "user_id", "account_created_date", "location",
    "activity_date", "activity_type", "exit_date", "exit_reason"
)

# Sort by user_id ascending, activity_date descending, activity_type ascending
result_df = result_df.orderBy(
    F.col("user_id").asc(),
    F.col("activity_date").desc(),
    F.col("activity_type").asc()
)

# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Isolating Duplicate Criteria
df_activities_clean = df_activities.dropDuplicates(["user_id", "activity_date", "activity_type"])
Before joining anything, we resolve the glitch in the activities table. dropDuplicates() accepts an optional list of column names. Passing ["user_id", "activity_date", "activity_type"] makes PySpark compare rows on those columns only and keep a single row for each unique combination. Because df_activities contains only these three columns, the call is equivalent to dropDuplicates() with no arguments, but naming the columns keeps the duplicate definition explicit.
Step 2: Merging the Three DataFrames
result_df = df_accounts.join(df_activities_clean, on="user_id", how="outer") \
    .join(df_exit_surveys, on="user_id", how="outer")
The task requires that a user missing from any of the DataFrames still appears in the output, with nulls in the columns that have no data. An inner or left join would drop some unmatched rows, so we use a full outer join (how="outer" in PySpark) and chain a second .join() to bring in df_exit_surveys. Passing on="user_id" as a column name, rather than as an equality expression, makes each join emit a single user_id column instead of one per side. user_id is the shared key for both joins. It is not unique in df_activities, so a user with several distinct activities appears once per activity in the result.
Step 3: Arranging the Schema
result_df = result_df.select(
    "user_id", "account_created_date", "location",
    "activity_date", "activity_type", "exit_date", "exit_reason"
)
The column order after a join is deterministic (the join key first, then the remaining columns of the left DataFrame, then those of the right), but it depends on the order in which the DataFrames are joined. An explicit .select() pins the output to the Expected Output Schema regardless of join order and documents the intended column layout.
Step 4: Multi-Level Sorting
result_df = result_df.orderBy(
    F.col("user_id").asc(),
    F.col("activity_date").desc(),
    F.col("activity_type").asc()
)
The task asks for user_id ascending, then activity_date descending, then activity_type ascending. The third key matters because a single user can have several distinct activities on the same date (for example, a Login and a File Upload); without it, the relative order of those rows would be undefined. Users with no activity have a null activity_date. By default, Spark places nulls last in a descending sort and first in an ascending one.