Objective
You are working on a CRM platform that manages customers, orders, and products in separate tables.
Task
Join all three DataFrames together and produce a single output containing the order details. Create a customer_name column by combining the customer's first and last name with a space in between. Rename email to customer_email and category to product_category. Save your result as result_df.
File Path
- Customers: /home/interview/customers.csv
- Orders: /home/interview/orders.csv
- Products: /home/interview/products.csv
- Starter script: /home/interview/crm_orders.py
Schema
customers.csv
| Column | Type |
| customer_id | integer |
| first_name | string |
| last_name | string |
| email | string |
orders.csv
| Column | Type |
| order_id | integer |
| customer_id | integer |
| product_id | integer |
| order_date | string |
products.csv
| Column | Type |
| product_id | integer |
| product_name | string |
| category | string |
Expected output schema
| Column | Type |
| order_id | integer |
| customer_name | string |
| customer_email | string |
| product_name | string |
| product_category | string |
| order_date | string |
Example
Given this sample input:
customers
orders
| order_id | customer_id | product_id | order_date |
| 1001 | 1 | 101 | 2023-01-10 |
| 1002 | 2 | 102 | 2023-01-11 |
products
| product_id | product_name | category |
| 101 | Product A | Electronics |
| 102 | Product B | Clothing |
The output would be:
| order_id | customer_name | customer_email | product_name | product_category | order_date |
| 1001 | John Doe | [email protected] | Product A | Electronics | 2023-01-10 |
| 1002 | Jane Smith | [email protected] | Product B | Clothing | 2023-01-11 |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Load each CSV with a header row and inferred column types.
customers = spark.read.csv("/home/interview/customers.csv", header=True, inferSchema=True)
orders = spark.read.csv("/home/interview/orders.csv", header=True, inferSchema=True)
products = spark.read.csv("/home/interview/products.csv", header=True, inferSchema=True)

# Inner-join orders to customers, then to products.
joined_df = orders.join(customers, on="customer_id").join(products, on="product_id")

# Keep only the output columns, building customer_name and renaming email and category.
result_df = joined_df.select(
    "order_id",
    F.concat_ws(" ", "first_name", "last_name").alias("customer_name"),
    F.col("email").alias("customer_email"),
    "product_name",
    F.col("category").alias("product_category"),
    "order_date"
)
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Loading Three DataFrames
customers = spark.read.csv("/home/interview/customers.csv", header=True, inferSchema=True)
orders = spark.read.csv("/home/interview/orders.csv", header=True, inferSchema=True)
products = spark.read.csv("/home/interview/products.csv", header=True, inferSchema=True)
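Each CSV is loaded as its own DataFrame. header=True treats the first line of the file as column names, and inferSchema=True tells Spark to detect column types automatically rather than reading every column as a string. Inference costs one extra pass over the data.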
Step 2: Joining the DataFrames
joined_df = orders.join(customers, on="customer_id").join(products, on="product_id")
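The join() method works like a SQL join. Passing on="customer_id" matches rows whose customer_id values are equal in both DataFrames, and because the key is given as a column name, the result keeps a single customer_id column rather than one from each side. The default join type is inner, which keeps only rows that have a match in both tables, so an order whose customer or product is missing is dropped. Chaining a second join() brings in the third table.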
Step 3: Concatenating Columns
F.concat_ws(" ", "first_name", "last_name").alias("customer_name")
concat_ws (concatenate with separator) joins multiple columns with a delimiter. The first argument is the separator, followed by the columns to join. .alias("customer_name") names the resulting column. This is equivalent to SQL's CONCAT_WS(' ', first_name, last_name) AS customer_name. An alternative is F.concat(), which takes no separator, so you'd need to add the space manually with F.lit(" "). The two also treat nulls differently: concat_ws skips null inputs, while concat returns null if any input is null.
Step 4: Renaming Columns
F.col("email").alias("customer_email")
F.col("category").alias("product_category")
Inside select(), F.col("email").alias("customer_email") picks the email column and renames it to customer_email. You could also use withColumnRenamed("email", "customer_email") as a separate step, but doing it inside select() is cleaner when you're already choosing which columns to keep.
Step 5: select() as a Projection
result_df = joined_df.select(
    "order_id",
    F.concat_ws(...).alias(...),
    F.col("email").alias("customer_email"),
    "product_name",
    F.col("category").alias("product_category"),
    "order_date"
)
select() serves two purposes: it picks which columns to keep (like SQL's SELECT) and lets you transform or rename columns in the same step. Columns you don't list are dropped. This is the standard way to control the output schema in PySpark.