Objective
You are given a DataFrame that represents user interactions on a popular social media platform. Each row represents a single interaction between two users.
Task
Write a PySpark function that finds users who have interacted with themselves. This happens when a user likes, comments on, or shares their own post.
Filter the DataFrame to find rows where user1_id matches user2_id. Calculate the total number of self-interactions for each of these users.
Save your resulting DataFrame as result_df. Ensure the output strictly matches the requested Output Schema (rename user1_id to user_id). Sort the final output by user_id in ascending order. Explicitly cast self_interaction_count to integer type using .cast("int").
File Path
- Interactions Dataset: /home/interview/interactions.csv
- Starter script: /home/interview/self_interactions.py
Schema
interactions.csv
| Column Name | Data Type |
| interaction_id | integer |
| user1_id | integer |
| user2_id | integer |
| interaction_type | string |
| timestamp | timestamp |
Expected Output Schema
| Column Name | Data Type |
| user_id | integer |
| self_interaction_count | integer |
Example
Given this sample input:
input_df
| interaction_id | user1_id | user2_id | interaction_type | timestamp |
| 1 | 1001 | 2002 | like | 2023-01-01 10:00:00 |
| 2 | 1002 | 1002 | comment | 2023-01-01 11:00:00 |
| 3 | 1003 | 2003 | share | 2023-01-02 10:00:00 |
| 4 | 1004 | 1004 | like | 2023-01-02 11:00:00 |
| 5 | 1005 | 2005 | comment | 2023-01-03 10:00:00 |
The expected output would be:
| user_id | self_interaction_count |
| 1002 | 1 |
| 1004 | 1 |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
input_df = spark.read.csv("/home/interview/interactions.csv", header=True, inferSchema=True)
# Step 1: Filter the DataFrame where user1_id is exactly equal to user2_id
self_interactions = input_df.filter(F.col("user1_id") == F.col("user2_id"))
# Step 2: Group by the user ID (renaming it to match the schema) and count the occurrences
result_df = self_interactions.groupBy(F.col("user1_id").alias("user_id")) \
    .agg(F.count("*").cast("integer").alias("self_interaction_count"))
# Step 3: Select and order the final output
result_df = result_df.select("user_id", "self_interaction_count").orderBy("user_id")
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Filtering by Column Comparison
self_interactions = input_df.filter(F.col("user1_id") == F.col("user2_id"))
To find self-interactions, we don't need to filter by a specific static number (like user1_id == 1002). Instead, we compare two columns against each other row by row. The expression F.col("user1_id") == F.col("user2_id") builds a boolean column, and .filter() keeps only the rows where it evaluates to true, that is, rows where the user performing the action is also the user receiving it. Rows where either ID is null are dropped, because a comparison with null evaluates to null rather than true.
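The row-wise comparison can be mimicked in plain Python to see which sample rows survive the filter. This is only an illustrative sketch that runs without Spark; the `rows` list and the `keep_self_interactions` helper are hypothetical names, and the data is copied from the example input (timestamps omitted).

```python
# Sample rows from the example input:
# (interaction_id, user1_id, user2_id, interaction_type)
rows = [
    (1, 1001, 2002, "like"),
    (2, 1002, 1002, "comment"),
    (3, 1003, 2003, "share"),
    (4, 1004, 1004, "like"),
    (5, 1005, 2005, "comment"),
]


def keep_self_interactions(rows):
    """Keep rows whose two user IDs are equal (hypothetical helper).

    Rows with a missing (None) ID are skipped, which mirrors how a Spark
    filter drops rows where the comparison evaluates to null.
    """
    return [r for r in rows if r[1] is not None and r[1] == r[2]]


self_rows = keep_self_interactions(rows)
print([r[0] for r in self_rows])  # prints [2, 4]
```

Only interactions 2 and 4 remain, which are the rows for users 1002 and 1004 in the expected output.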
Step 2: Grouping and Counting
result_df = self_interactions.groupBy(F.col("user1_id").alias("user_id")) \
    .agg(F.count("*").cast("integer").alias("self_interaction_count"))
Next, we need to count how many times each user performed a self-interaction. We use .groupBy() on the user ID. Notice that .alias("user_id") can be applied directly inside groupBy, which renames user1_id to match the requested Output Schema without a separate rename step.
After grouping, we use .agg(F.count("*")) to count the number of rows (interactions) in each user's group. F.count returns a long (bigint) column, so we explicitly cast the count to an integer and alias it as self_interaction_count. The task asks for .cast("int"); "integer" is an accepted name for the same type.
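The group-and-count step has a close plain-Python analogue in `collections.Counter`. The sketch below is an illustration, not the Spark implementation; `self_user_ids` is a hypothetical list, and the second 1002 entry is invented here only to show a count greater than one.

```python
from collections import Counter

# user1_id values of rows that already passed the self-interaction filter.
# Hypothetical data: the repeated 1002 is not in the example input.
self_user_ids = [1002, 1004, 1002]

# One bucket per user ID, holding the number of rows in that bucket.
self_interaction_count = Counter(self_user_ids)

print(dict(self_interaction_count))  # prints {1002: 2, 1004: 1}
```

Each key plays the role of user_id and each value the role of self_interaction_count. Python integers have no fixed width, so the cast in the Spark version has no counterpart here.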
Step 3: Formatting and Sorting
result_df = result_df.select("user_id", "self_interaction_count").orderBy("user_id")
Finally, we chain a .select() call to put the columns in the exact order requested, and we use .orderBy("user_id") to sort the rows by user_id in ascending order, which is the default direction.
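The final shaping step can be sketched the same way in plain Python: fix the column order, then sort ascending by the first column. The `counts` dictionary and the `to_sorted_rows` helper are hypothetical; the values match the example's expected output.

```python
# Per-user counts for the example input, deliberately listed out of order.
counts = {1004: 1, 1002: 1}


def to_sorted_rows(counts):
    """Return (user_id, self_interaction_count) tuples sorted by user_id.

    Hypothetical helper: the tuple layout stands in for .select(), and
    sorted() stands in for .orderBy("user_id") with its ascending default.
    """
    return sorted((user_id, count) for user_id, count in counts.items())


print(to_sorted_rows(counts))  # prints [(1002, 1), (1004, 1)]
```

The printed rows line up with the expected output table: user 1002 first, then user 1004, each with a count of 1.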