Objective
You are a web developer working with various teams on your company's website. You have access to three separate DataFrames, each representing different types of user interactions with your website: visits, likes, and comments.
Task
All three DataFrames represent distinct user interactions and share the user_id and page_id columns. However, their timestamp columns have different names.
Write a PySpark function that combines these three DataFrames into one unified table.
- The timestamp column should be standardized to interaction_time.
- You must add a new column called interaction_type that indicates the type of interaction ('visit', 'like', or 'comment').
Save your resulting DataFrame as result_df. Ensure the output matches the exact schema order requested. Sort the final DataFrame chronologically by interaction_time (ascending). If multiple interactions happen at the exact same time, break ties by sorting interaction_type (ascending) and then user_id (ascending).
File Path
- Visits Dataset: /home/interview/page_visits.csv
- Likes Dataset: /home/interview/page_likes.csv
- Comments Dataset: /home/interview/page_comments.csv
- Starter script: /home/interview/user_interactions.py
Schema
page_visits.csv
| Column Name | Data Type |
| --- | --- |
| user_id | string |
| page_id | string |
| visit_time | timestamp |
page_likes.csv
| Column Name | Data Type |
| --- | --- |
| user_id | string |
| page_id | string |
| like_time | timestamp |
page_comments.csv
| Column Name | Data Type |
| --- | --- |
| user_id | string |
| page_id | string |
| comment_time | timestamp |
Expected Output Schema
| Column Name | Data Type |
| --- | --- |
| user_id | string |
| page_id | string |
| interaction_time | timestamp |
| interaction_type | string |
Example
Given this sample input:
page_visits
| user_id | page_id | visit_time |
| --- | --- | --- |
| U1 | P1 | 2023-01-01 12:00:00 |
| U2 | P3 | 2023-01-02 15:30:00 |
| U3 | P2 | 2023-01-03 10:45:00 |
page_likes
| user_id | page_id | like_time |
| --- | --- | --- |
| U1 | P2 | 2023-01-02 14:20:00 |
| U2 | P1 | 2023-01-03 16:40:00 |
| U3 | P3 | 2023-01-04 18:55:00 |
page_comments
| user_id | page_id | comment_time |
| --- | --- | --- |
| U1 | P3 | 2023-01-03 13:00:00 |
| U2 | P2 | 2023-01-04 17:10:00 |
| U3 | P1 | 2023-01-05 19:25:00 |
The expected output would be:
| user_id | page_id | interaction_time | interaction_type |
| --- | --- | --- | --- |
| U1 | P1 | 2023-01-01 12:00:00 | visit |
| U1 | P2 | 2023-01-02 14:20:00 | like |
| U2 | P3 | 2023-01-02 15:30:00 | visit |
| U3 | P2 | 2023-01-03 10:45:00 | visit |
| U1 | P3 | 2023-01-03 13:00:00 | comment |
| U2 | P1 | 2023-01-03 16:40:00 | like |
| U2 | P2 | 2023-01-04 17:10:00 | comment |
| U3 | P3 | 2023-01-04 18:55:00 | like |
| U3 | P1 | 2023-01-05 19:25:00 | comment |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
page_visits = spark.read.csv("/home/interview/page_visits.csv", header=True, inferSchema=True)
page_likes = spark.read.csv("/home/interview/page_likes.csv", header=True, inferSchema=True)
page_comments = spark.read.csv("/home/interview/page_comments.csv", header=True, inferSchema=True)
# Step 1: Standardize the visits DataFrame
visits_clean = page_visits.withColumnRenamed("visit_time", "interaction_time") \
    .withColumn("interaction_type", F.lit("visit"))
# Step 2: Standardize the likes DataFrame
likes_clean = page_likes.withColumnRenamed("like_time", "interaction_time") \
    .withColumn("interaction_type", F.lit("like"))
# Step 3: Standardize the comments DataFrame
comments_clean = page_comments.withColumnRenamed("comment_time", "interaction_time") \
    .withColumn("interaction_type", F.lit("comment"))
# Step 4: Union the DataFrames vertically
result_df = visits_clean.unionByName(likes_clean).unionByName(comments_clean)
# Step 5: Format the final schema and sort chronologically with tie-breakers
result_df = result_df.select("user_id", "page_id", "interaction_time", "interaction_type") \
    .orderBy("interaction_time", "interaction_type", "user_id")
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Standardizing the Schemas
visits_clean = page_visits.withColumnRenamed("visit_time", "interaction_time") \
    .withColumn("interaction_type", F.lit("visit"))
Before we can stack DataFrames on top of each other, they must share the same column names. We use .withColumnRenamed() on each table to rename its source-specific timestamp column (visit_time, like_time, or comment_time) to the common interaction_time.
Then, so we don't lose track of where each row came from after the merge, we use .withColumn() with F.lit() to add a constant string value (such as "visit") to every row of that DataFrame.
Step 2: Combining the DataFrames Vertically
result_df = visits_clean.unionByName(likes_clean).unionByName(comments_clean)
Unlike .join(), which merges DataFrames horizontally on a matching key, we want to stack these logs vertically into one long list. We use .unionByName(), which appends the rows of one DataFrame to another and resolves columns by name rather than by position, so the result stays correct even if the column order differs between the inputs.
Step 3: Formatting and Sorting
result_df = result_df.select("user_id", "page_id", "interaction_time", "interaction_type") \
    .orderBy("interaction_time", "interaction_type", "user_id")
Finally, we use .select() to arrange the columns exactly as listed in the Expected Output Schema. We then chain .orderBy() on interaction_time to produce a chronological timeline, with interaction_type and user_id as secondary sort keys so that rows with identical timestamps are ordered as the task specifies. All three keys sort in ascending order, which is the default for .orderBy().
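The sample data contains no tied timestamps, so the tie-breaking rule is easy to overlook. The plain-Python sketch below models the same three-key ascending sort on a handful of hypothetical rows (it does not use Spark); the tuple key plays the role of the column list passed to .orderBy().

```python
from datetime import datetime

# Hypothetical rows: (user_id, page_id, interaction_time, interaction_type).
# Three of them share the exact same timestamp to exercise the tie-breakers.
rows = [
    ("U2", "P1", datetime(2023, 1, 1, 12, 0, 0), "visit"),
    ("U1", "P1", datetime(2023, 1, 1, 12, 0, 0), "visit"),
    ("U3", "P2", datetime(2023, 1, 1, 12, 0, 0), "comment"),
    ("U1", "P3", datetime(2023, 1, 1, 11, 59, 0), "like"),
]

# Sort by time first, then interaction_type, then user_id, all ascending.
ordered = sorted(rows, key=lambda r: (r[2], r[3], r[0]))

for user_id, page_id, ts, kind in ordered:
    print(user_id, page_id, ts, kind)
```

The earlier "like" comes first; among the three rows at 12:00:00, "comment" precedes "visit", and the two visits are ordered by user_id.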