Objective
An accounting firm handles transactional data from various clients. The firm receives the data in two separate DataFrames: df_transactions contains the financial events, and df_clients contains the client metadata.
Task
Both DataFrames contain duplicated and invalid primary keys, and some dates are improperly formatted. Write a PySpark function that combines the DataFrames while strictly enforcing the following data quality rules:
A valid TransactionID in df_transactions is an integer greater than 0, and each TransactionID should be unique. Drop any row with an invalid TransactionID; when a TransactionID appears more than once, keep only its first occurrence and drop the rest.
A valid ClientID in either DataFrame is an integer greater than 0, and each ClientID in df_clients should be unique. Drop rows with an invalid ClientID from both DataFrames; when a ClientID appears more than once in df_clients, keep only its first occurrence.
df_transactions should only include rows with ClientIDs that exist in the cleaned df_clients DataFrame.
The Date column should be strictly in the format yyyy-mm-dd. Rows with an invalid string format (like 2023-7-1 or 05/12/2023) must be dropped.
Save the resulting DataFrame as result_df. Make sure its columns follow the exact order given in the Expected Output Schema, cast the Amount column to an integer, and sort the final output by TransactionID in ascending order.
File Path
- Transactions Dataset: /home/interview/df_transactions.csv
- Clients Dataset: /home/interview/df_clients.csv
- Starter script: /home/interview/clean_transactions.py
Schema
df_transactions.csv
| Column Name | Data Type |
|---|---|
| TransactionID | Integer |
| ClientID | Integer |
| Date | String |
| Amount | Float |
df_clients.csv
| Column Name | Data Type |
|---|---|
| ClientID | Integer |
| ClientName | String |
| Industry | String |
Expected Output Schema
| Column Name | Data Type |
|---|---|
| TransactionID | Integer |
| ClientID | Integer |
| Date | String |
| Amount | Integer |
| ClientName | String |
| Industry | String |
Example
Given this sample input:
df_transactions
| TransactionID | ClientID | Date | Amount |
|---|---|---|---|
| 1 | 1 | 2023-07-01 | 100.0 |
| 2 | 1 | 2023-07-02 | 150.0 |
| 2 | 2 | 2023-07-01 | 200.0 |
| 3 | 3 | 2023-07-03 | 250.0 |
| -4 | 4 | 2023-07-04 | 300.0 |
| 5 | 2 | 2023-25-01 | 350.0 |
| 6 | 1 | 2023-07-02 | 400.0 |
| 7 | 6 | 2023-07-01 | 450.0 |
| 8 | 7 | 2023-07-03 | 500.0 |
| 9 | -8 | 2023-07-04 | 550.0 |
df_clients
| ClientID | ClientName | Industry |
|---|---|---|
| 1 | Client1 | Tech |
| 2 | Client2 | Finance |
| 3 | Client3 | Real Estate |
| 4 | Client4 | Healthcare |
| 5 | Client5 | Tech |
| 1 | Client6 | Finance |
| 6 | Client7 | Real Estate |
| -7 | Client8 | Healthcare |
| 8 | Client9 | Tech |
| 2 | Client10 | Finance |
The expected output would be:
| TransactionID | ClientID | Date | Amount | ClientName | Industry |
|---|---|---|---|---|---|
| 1 | 1 | 2023-07-01 | 100 | Client1 | Tech |
| 2 | 1 | 2023-07-02 | 150 | Client1 | Tech |
| 3 | 3 | 2023-07-03 | 250 | Client3 | Real Estate |
| 5 | 2 | 2023-25-01 | 350 | Client2 | Finance |
| 6 | 1 | 2023-07-02 | 400 | Client1 | Tech |
| 7 | 6 | 2023-07-01 | 450 | Client7 | Real Estate |
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
df_transactions = spark.read.csv("/home/interview/df_transactions.csv", header=True, inferSchema=True)
df_clients = spark.read.csv("/home/interview/df_clients.csv", header=True, inferSchema=True)
# Steps 1-3: Clean the transactions DataFrame (filter invalid IDs, validate the date format via regex, drop duplicate TransactionIDs)
clean_transactions = df_transactions.filter(
    (F.col("TransactionID") > 0) &
    (F.col("ClientID") > 0) &
    (F.col("Date").rlike(r"^\d{4}-\d{2}-\d{2}$"))
).dropDuplicates(["TransactionID"])  # keeps one row per TransactionID; Spark does not guarantee which duplicate survives
# Steps 1 and 3: Clean the clients DataFrame (filter invalid ClientIDs, drop duplicate ClientIDs)
clean_clients = df_clients.filter(
    F.col("ClientID") > 0
).dropDuplicates(["ClientID"])
# Step 4: Inner-join the cleaned datasets on ClientID (drops transactions with no matching client)
joined_df = clean_transactions.join(clean_clients, on="ClientID", how="inner")
# Step 5: Cast Amount to integer, put the columns in the required order, and sort by TransactionID
result_df = joined_df.withColumn("Amount", F.col("Amount").cast("integer")) \
    .select("TransactionID", "ClientID", "Date", "Amount", "ClientName", "Industry") \
    .orderBy("TransactionID")
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Filtering Invalid Primary Keys
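Before looking for duplicates, we eliminate rows with invalid keys. The prompt specifies that valid TransactionID and ClientID values must be integers greater than 0, so we chain the conditions inside a single .filter() call using the & (AND) operator: (F.col("TransactionID") > 0) & (F.col("ClientID") > 0). Rows with a missing (null) ID are dropped as well, because null > 0 evaluates to null and .filter() keeps only rows where the condition is true. The clients DataFrame gets the same treatment with F.col("ClientID") > 0.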
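Each comparison in that filter sits in its own parentheses because Python's & operator binds more tightly than >. The short plain-Python sketch below (ordinary integers, no Spark needed) shows what happens otherwise: the unparenthesized expression is parsed as the chained comparison transaction_id > (0 & client_id) > 0. With PySpark Column objects the same mistake raises a "Cannot convert column into bool" error instead of quietly returning False. The variable names are illustrative only.

```python
# Plain-Python illustration of operator precedence (no Spark required).
transaction_id, client_id = 3, 5

# Parsed as: transaction_id > (0 & client_id) > 0, i.e. 3 > 0 > 0, a chained comparison
unparenthesized = transaction_id > 0 & client_id > 0

# Each comparison is evaluated first, then the two booleans are combined with &
parenthesized = (transaction_id > 0) & (client_id > 0)

print(unparenthesized)  # False
print(parenthesized)    # True
```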
Step 2: Validating Dates with Regex
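The Date column must be explicitly in the yyyy-mm-dd format. PySpark's date conversions are more lenient than that: casting a string to a date (or calling F.to_date without a format) accepts single-digit months and days, so 2023-7-1 would be treated as a valid date. Instead, F.col("Date").rlike(r"^\d{4}-\d{2}-\d{2}$") uses a regular expression to enforce the shape of the string: exactly 4 digits, a hyphen, 2 digits, a hyphen, and 2 digits, anchored at both ends by ^ and $. Note that this checks the format only, not calendar validity: 2023-25-01 matches the pattern even though 25 is not a valid month, which is why TransactionID 5 appears in the expected output.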
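To see exactly which strings the pattern accepts, here is a minimal plain-Python sketch using the standard re module. It approximates Spark's rlike, which uses Java regular expressions with find semantics (hence re.search); has_date_shape is a hypothetical helper name, and None stands in for a null Date. The 2023-25-01 case passes because only the shape is checked.

```python
import re

# Same pattern as the PySpark filter. re.ASCII restricts \d to 0-9, matching the
# default behaviour of Java's regex engine, which Spark's rlike uses.
DATE_SHAPE = re.compile(r"^\d{4}-\d{2}-\d{2}$", re.ASCII)


def has_date_shape(value):
    """Hypothetical helper: True if value looks like yyyy-mm-dd (shape only)."""
    # rlike uses find semantics, like re.search; None models a null Date, which never matches
    return value is not None and DATE_SHAPE.search(value) is not None


samples = ["2023-07-01", "2023-7-1", "05/12/2023", "2023-25-01", None]
for s in samples:
    print(repr(s), has_date_shape(s))
```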
Step 3: Removing Duplicates
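Once the invalid rows are filtered out, we remove duplicates with .dropDuplicates(). Passing the primary key as a subset (["TransactionID"] for transactions, ["ClientID"] for clients) makes Spark compare only those columns and keep a single row per key value. Be aware that Spark does not guarantee which of the duplicate rows survives: on a small, single-partition input like the example it is usually the first one, but that is not a documented contract. If the first occurrence must win, as the expected output implies (TransactionID 2 keeps ClientID 1, and ClientID 1 keeps Client1), tag each row with F.monotonically_increasing_id() right after reading (for a single input file read without a shuffle, this follows file order), then keep only the rows where F.row_number() over Window.partitionBy(key).orderBy(that id) equals 1.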
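The first-occurrence-wins rule implied by the expected output can be modelled in plain Python as below. keep_first is a hypothetical helper, not a PySpark API; in Spark the equivalent is keeping row_number() == 1 within each key, ordered by original row position. The sample rows are the example's df_clients rows that have a valid ClientID, in file order.

```python
def keep_first(rows, key):
    """Hypothetical helper: keep the first row seen for each key value, in input order."""
    seen = set()
    kept = []
    for row in rows:
        k = key(row)
        if k not in seen:  # later rows with an already-seen key are dropped
            seen.add(k)
            kept.append(row)
    return kept


# Example df_clients rows with ClientID > 0, in file order: (ClientID, ClientName, Industry)
clients = [
    (1, "Client1", "Tech"), (2, "Client2", "Finance"), (3, "Client3", "Real Estate"),
    (4, "Client4", "Healthcare"), (5, "Client5", "Tech"), (1, "Client6", "Finance"),
    (6, "Client7", "Real Estate"), (8, "Client9", "Tech"), (2, "Client10", "Finance"),
]

clean_clients = keep_first(clients, key=lambda row: row[0])
print([name for _, name, _ in clean_clients])
# ['Client1', 'Client2', 'Client3', 'Client4', 'Client5', 'Client7', 'Client9']
```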
Step 4: Merging the Cleaned Data
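The instruction states: "df_transactions should only include rows with ClientIDs that exist in the cleaned df_clients DataFrame." An inner join does exactly that: by joining clean_transactions with clean_clients on ClientID, any transaction pointing to an unknown client is dropped (in the example, TransactionID 8, whose ClientID 7 has no valid client row), and the same join attaches ClientName and Industry. Because ClientID is unique in clean_clients, each transaction matches at most one client, so the join cannot multiply transaction rows.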
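The join semantics can be checked by hand with a plain-Python sketch (not Spark code). The inputs below are the rows that Steps 1-3 leave from the example data; they are listed in file order for readability, although Spark does not promise any row order before the final sort. A dictionary keyed by ClientID plays the role of the deduplicated clients table.

```python
# Rows left from the example after Steps 1-3: (TransactionID, ClientID, Date, Amount)
clean_transactions = [
    (1, 1, "2023-07-01", 100.0), (2, 1, "2023-07-02", 150.0), (3, 3, "2023-07-03", 250.0),
    (5, 2, "2023-25-01", 350.0), (6, 1, "2023-07-02", 400.0), (7, 6, "2023-07-01", 450.0),
    (8, 7, "2023-07-03", 500.0),
]
# Deduplicated clients keyed by ClientID: ClientID -> (ClientName, Industry)
clean_clients = {
    1: ("Client1", "Tech"), 2: ("Client2", "Finance"), 3: ("Client3", "Real Estate"),
    4: ("Client4", "Healthcare"), 5: ("Client5", "Tech"), 6: ("Client7", "Real Estate"),
    8: ("Client9", "Tech"),
}

# Inner join on ClientID: keep a transaction only if its ClientID has a client row.
# Unique dictionary keys mean each transaction matches at most one client.
joined = [
    (tid, cid, date, amount, *clean_clients[cid])
    for tid, cid, date, amount in clean_transactions
    if cid in clean_clients
]
print([row[0] for row in joined])  # [1, 2, 3, 5, 6, 7]
```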
Step 5: Formatting, Casting, and Sorting
result_df = joined_df.withColumn("Amount", F.col("Amount").cast("integer")) \
.select("TransactionID", "ClientID", "Date", "Amount", "ClientName", "Industry") \
.orderBy("TransactionID")
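Finally, the Amount column arrives as a Float, but the Expected Output Schema requires an Integer. .withColumn("Amount", F.col("Amount").cast("integer")) performs the conversion; note that the cast truncates any fractional part toward zero instead of rounding (150.7 would become 150), so apply F.round() before the cast if rounding is preferred. The example amounts are all whole numbers, so nothing is lost there. The chained .select() then arranges the columns in the required order, which is necessary because a join on a column name places the ClientID key column first. Finally, .orderBy("TransactionID") sorts the rows; since TransactionID is unique after deduplication, the order is fully deterministic.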
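The final reshaping can likewise be modelled in plain Python (this is not Spark code). The sketch takes three joined rows from the example in the layout a join on ClientID produces, with ClientID first, reorders them to the expected schema, truncates Amount with int(), and sorts by TransactionID. The values 150.7, 99.99 and -2.5 in the last line are hypothetical, chosen to show truncation toward zero, which is also how Spark's cast from a floating-point value to integer behaves for in-range values.

```python
# Plain-Python model of Step 5 (not Spark code).
OUTPUT_COLUMNS = ("TransactionID", "ClientID", "Date", "Amount", "ClientName", "Industry")

# Joined rows from the example, laid out as a join on ClientID produces them: ClientID first
joined = [
    (1, 6, "2023-07-02", 400.0, "Client1", "Tech"),
    (2, 5, "2023-25-01", 350.0, "Client2", "Finance"),
    (1, 1, "2023-07-01", 100.0, "Client1", "Tech"),
]


def to_output(row):
    """Reorder a joined row to the expected schema and truncate Amount to an int."""
    client_id, transaction_id, date, amount, name, industry = row
    return (transaction_id, client_id, date, int(amount), name, industry)


result = sorted((to_output(r) for r in joined), key=lambda r: r[0])  # sort by TransactionID
for r in result:
    print(dict(zip(OUTPUT_COLUMNS, r)))

# int() truncates toward zero rather than rounding (hypothetical amounts)
print([int(a) for a in (150.7, 99.99, -2.5)])  # [150, 99, -2]
```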