Objective
You work as a data analyst for a multinational flooring company, "Floors 'R' Us". Your task is to process and analyze data from three different sources - the customers, orders, and products tables.
Task
The full_name column in the customers DataFrame is a combination of the customer's first and last name, separated by a space. The product_info column in the products DataFrame contains the type and color of the product, separated by a comma.
Write a function that:
- Joins the three tables so each order has its associated customer and product information.
- Splits the full_name column into first_name and last_name columns.
- Splits the product_info column into product_type and product_color columns.
Save your resulting DataFrame as result_df. Ensure the output exactly matches the requested Output Schema, containing all the orders.
File Path
- Customers Dataset: /home/interview/customers.csv
- Orders Dataset: /home/interview/orders.csv
- Products Dataset: /home/interview/products.csv
- Starter script: /home/interview/floors_r_us.py
Schema
customers.csv
| column_name | data_type |
| --- | --- |
| customer_id | int |
| full_name | string |
| location | string |
orders.csv
| column_name | data_type |
| --- | --- |
| order_id | int |
| customer_id | int |
| product_id | int |
| quantity | int |
products.csv
| column_name | data_type |
| --- | --- |
| product_id | int |
| product_info | string |
Expected Output Schema
| column_name | data_type |
| --- | --- |
| order_id | int |
| customer_id | int |
| first_name | string |
| last_name | string |
| location | string |
| product_id | int |
| product_type | string |
| product_color | string |
| quantity | int |
Example
Given this sample input:
customers
| customer_id | full_name | location |
| --- | --- | --- |
| 1 | John Doe | Texas |
| 2 | Jane Smith | California |
orders
| order_id | customer_id | product_id | quantity |
| --- | --- | --- | --- |
| 1001 | 1 | 101 | 5 |
| 1002 | 2 | 102 | 2 |
products
| product_id | product_info |
| --- | --- |
| 101 | Carpet,Red |
| 102 | Tile,Blue |
The expected output would be:
| order_id | customer_id | first_name | last_name | location | product_id | product_type | product_color | quantity |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1001 | 1 | John | Doe | Texas | 101 | Carpet | Red | 5 |
| 1002 | 2 | Jane | Smith | California | 102 | Tile | Blue | 2 |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
customers = spark.read.csv("/home/interview/customers.csv", header=True, inferSchema=True)
orders = spark.read.csv("/home/interview/orders.csv", header=True, inferSchema=True)
products = spark.read.csv("/home/interview/products.csv", header=True, inferSchema=True)
# Step 1: Combine all three DataFrames starting from the central orders table
joined_df = orders.join(customers, on="customer_id", how="inner") \
    .join(products, on="product_id", how="inner")
# Step 2: Split the full_name (by space) and product_info (by comma) and extract the elements
split_df = joined_df.withColumn("first_name", F.split(F.col("full_name"), " ").getItem(0)) \
    .withColumn("last_name", F.split(F.col("full_name"), " ").getItem(1)) \
    .withColumn("product_type", F.split(F.col("product_info"), ",").getItem(0)) \
    .withColumn("product_color", F.split(F.col("product_info"), ",").getItem(1))
# Step 3: Select and order the columns to match the Expected Output Schema exactly
result_df = split_df.select(
    "order_id", "customer_id", "first_name", "last_name",
    "location", "product_id", "product_type", "product_color", "quantity"
)
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Multi-Table Joins
joined_df = orders.join(customers, on="customer_id", how="inner") \
    .join(products, on="product_id", how="inner")
In a normalized database, the orders table acts as the central hub connecting transactions to reference tables. We can chain multiple .join() methods together. First, we join orders to customers using the shared customer_id. Then, taking that combined result, we immediately join it to the products table using the shared product_id.
Step 2: String Splitting and Array Extraction
.withColumn("first_name", F.split(F.col("full_name"), " ").getItem(0))
To break apart the combined strings, we use F.split().
- For the name, we split on a space: F.split(..., " "). This creates an array column (e.g., ["John", "Doe"]). The product_info column is split the same way, on a comma.
- To extract individual elements from that array into their own columns, we chain .getItem(index). Arrays accessed with .getItem() are zero-indexed, meaning .getItem(0) grabs the first item (first name or product type) and .getItem(1) grabs the second item (last name or product color).
Step 3: Output Formatting
result_df = split_df.select(
    "order_id", "customer_id", "first_name", "last_name",
    ...
)
Because we joined multiple tables and extracted new columns, our DataFrame is cluttered with old data (like the original full_name and product_info columns). We use a clean .select() statement to pluck out only the columns we need, in the exact order requested by the Output Schema.