Objective
As a Data Analyst at a major airline, you are presented with three DataFrames. The first, flights, contains the flight data including the flight_id, origin_airport, and destination_airport. The second, airports, consists of details of each airport including airport_id and airport_name. The third, planes, contains details about planes, including plane_id and plane_model.
Task
Write a PySpark function that combines these DataFrames to calculate the length of the string names for the origin airport, destination airport, and plane model.
In the output:
- The flight_id column corresponds to the id of each flight in the flights DataFrame.
- The origin_airport_name_length and destination_airport_name_length columns represent the lengths of the names of the origin and destination airports, respectively.
- The plane_model_length column is the length of the plane model's name.
Please note that the origin_airport and destination_airport columns in the flights DataFrame correspond to the airport_id in the airports DataFrame. The flight_id in the flights DataFrame corresponds to the plane_id in the planes DataFrame.
Remember that string lengths must be calculated after removing leading and trailing whitespace. Save your resulting DataFrame as result_df and order the output by flight_id in ascending order.
File Path
- Flights Dataset: /home/interview/flights.csv
- Airports Dataset: /home/interview/airports.csv
- Planes Dataset: /home/interview/planes.csv
- Starter script: /home/interview/busy_airline.py
Schema
flights.csv
| Column | Type |
| flight_id | int |
| origin_airport | string |
| destination_airport | string |
airports.csv
| Column | Type |
| airport_id | string |
| airport_name | string |
planes.csv
| Column | Type |
| plane_id | int |
| plane_model | string |
Expected Output Schema
| Column | Type |
| flight_id | int |
| origin_airport_name_length | int |
| destination_airport_name_length | int |
| plane_model_length | int |
Example
Given this sample input:
flights
| flight_id | origin_airport | destination_airport |
| 1 | A1 | B1 |
| 2 | A2 | B2 |
| 3 | A3 | B3 |
airports
| airport_id | airport_name |
| A1 | San Francisco |
| B1 | Los Angeles |
| A2 | New York |
| B2 | Boston |
| A3 | Miami |
| B3 | Orlando |
planes
| plane_id | plane_model |
| 1 | Airbus A320 |
| 2 | Boeing 737 |
| 3 | Airbus A380 |
The expected output would be:
| flight_id | origin_airport_name_length | destination_airport_name_length | plane_model_length |
| 1 | 13 | 11 | 11 |
| 2 | 8 | 6 | 10 |
| 3 | 5 | 7 | 11 |
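The expected lengths can be cross-checked by hand before touching Spark. The following is a plain-Python sketch, not the PySpark solution: it hard-codes the sample rows from the tables above, and the variable names (`expected_rows` and so on) are illustrative assumptions.

```python
# Plain-Python cross-check of the sample data (illustrative, not PySpark).
flights = [(1, "A1", "B1"), (2, "A2", "B2"), (3, "A3", "B3")]
airports = {
    "A1": "San Francisco", "B1": "Los Angeles",
    "A2": "New York", "B2": "Boston",
    "A3": "Miami", "B3": "Orlando",
}
# Per the task statement, flight_id is matched against plane_id.
planes = {1: "Airbus A320", 2: "Boeing 737", 3: "Airbus A380"}

# For each flight: look up both airport names and the plane model,
# strip surrounding spaces, measure, then sort by flight_id.
expected_rows = sorted(
    (
        flight_id,
        len(airports[origin].strip(" ")),
        len(airports[destination].strip(" ")),
        len(planes[flight_id].strip(" ")),
    )
    for flight_id, origin, destination in flights
)

for row in expected_rows:
    print(row)
```

The printed tuples should line up with the rows of the expected output table, which makes this a quick way to sanity-check any additional test data you invent.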
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
flights = spark.read.csv("/home/interview/flights.csv", header=True, inferSchema=True)
airports = spark.read.csv("/home/interview/airports.csv", header=True, inferSchema=True)
planes = spark.read.csv("/home/interview/planes.csv", header=True, inferSchema=True)
# Alias the airports DataFrame so we can join it twice without column conflicts
orig_airports = airports.alias("orig")
dest_airports = airports.alias("dest")
# Chain the joins: planes, then origin airports, then destination airports
joined_df = flights.join(planes, flights.flight_id == planes.plane_id, how="inner") \
    .join(orig_airports, F.col("origin_airport") == F.col("orig.airport_id"), how="inner") \
    .join(dest_airports, F.col("destination_airport") == F.col("dest.airport_id"), how="inner")
# Calculate lengths with trim, format the schema, and order by flight_id
result_df = joined_df.select(
    "flight_id",
    F.length(F.trim(F.col("orig.airport_name"))).alias("origin_airport_name_length"),
    F.length(F.trim(F.col("dest.airport_name"))).alias("destination_airport_name_length"),
    F.length(F.trim(F.col("plane_model"))).alias("plane_model_length")
).orderBy("flight_id")
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Joining the Same Table Twice
Python
orig_airports = airports.alias("orig")
dest_airports = airports.alias("dest")
joined_df = flights.join(planes, flights.flight_id == planes.plane_id, how="inner") \
    .join(orig_airports, F.col("origin_airport") == F.col("orig.airport_id"), how="inner") \
    .join(dest_airports, F.col("destination_airport") == F.col("dest.airport_id"), how="inner")
A common scenario in relational data is needing to look up two different reference values from the exact same table. Because a single flight has two different airports (origin and destination), we must join the airports table to the flights table twice. To prevent "ambiguous column" errors, we create two distinct aliases ("orig" and "dest") of the airports DataFrame before performing the joins.
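Why the aliases matter can be seen in a small plain-Python analogy. This is only a sketch of the naming problem, not of Spark's behaviour: rows are modelled as dicts, a "join" as a dict merge, and the `qualify` helper is a hypothetical stand-in for what an alias does to column names. Spark reports an ambiguity error instead of silently overwriting, but the root cause is the same.

```python
# Plain-Python analogy (not PySpark): rows are dicts, a "join" merges them.
flight = {"flight_id": 1, "origin_airport": "A1", "destination_airport": "B1"}
airports = {
    "A1": {"airport_id": "A1", "airport_name": "San Francisco"},
    "B1": {"airport_id": "B1", "airport_name": "Los Angeles"},
}


def qualify(row, prefix):
    """Prefix every key, mimicking how an alias qualifies column names."""
    return {f"{prefix}.{key}": value for key, value in row.items()}


origin_row = airports[flight["origin_airport"]]
destination_row = airports[flight["destination_airport"]]

# Without qualification both lookups share the same key names and collide:
# the destination's airport_name overwrites the origin's.
unqualified = {**flight, **origin_row, **destination_row}

# With "orig." and "dest." prefixes, both names survive side by side.
qualified = {
    **flight,
    **qualify(origin_row, "orig"),
    **qualify(destination_row, "dest"),
}

print(unqualified["airport_name"])
print(qualified["orig.airport_name"], "|", qualified["dest.airport_name"])
```

In the merged dict without prefixes only one airport_name remains, which is the situation the "orig" and "dest" aliases are there to avoid.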
Step 2: Trimming and Calculating String Lengths
Python
F.length(F.trim(F.col("orig.airport_name"))).alias("origin_airport_name_length")
The prompt requires string lengths to be calculated after removing leading and trailing whitespace. We achieve this by nesting PySpark functions: F.trim() strips the surrounding spaces, F.length() counts the remaining characters, and .alias() gives the column the name required by the Expected Output Schema.
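The effect of trimming before measuring can be illustrated without Spark. The sketch below is plain Python, and it assumes that F.trim removes only the space character from both ends, as its documentation describes; `trimmed_length` is a hypothetical helper, and the padded strings are made-up inputs.

```python
def trimmed_length(text):
    """Length after stripping leading and trailing spaces (assumed F.trim analog)."""
    return len(text.strip(" "))


# Made-up inputs with stray padding, as might appear in a messy CSV.
samples = ["  San Francisco ", "Los Angeles", " Boeing 737"]

for sample in samples:
    # Raw length counts the padding; trimmed length does not.
    # Interior spaces (e.g. between "San" and "Francisco") are always kept.
    print(repr(sample), len(sample), trimmed_length(sample))
```

Only the padding at the ends is discarded; the space inside "San Francisco" still counts toward its length of 13.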
Step 3: Final Formatting
Python
result_df = joined_df.select(
    "flight_id",
    F.length(F.trim(F.col("orig.airport_name"))).alias("origin_airport_name_length"),
    F.length(F.trim(F.col("dest.airport_name"))).alias("destination_airport_name_length"),
    F.length(F.trim(F.col("plane_model"))).alias("plane_model_length")
).orderBy("flight_id")
By performing all of the F.length() transformations directly inside a single .select() call, we keep only the four required columns and drop the rest, such as plane_id and the raw airport_name strings. We then chain .orderBy("flight_id"), which sorts in ascending order by default, to satisfy the ordering requirement.
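Once the job has written its CSV, the two formatting requirements (column names and ascending flight_id) can be checked with a few lines of standard-library Python. This is a hedged sketch: `check_output` is a hypothetical helper, and it takes the CSV contents as a string rather than assuming any particular file name inside the output directory.

```python
import csv
import io

EXPECTED_COLUMNS = [
    "flight_id",
    "origin_airport_name_length",
    "destination_airport_name_length",
    "plane_model_length",
]


def check_output(csv_text):
    """Return a list of problems found in the CSV text; empty means none found."""
    rows = [row for row in csv.reader(io.StringIO(csv_text)) if row]
    if not rows or rows[0] != EXPECTED_COLUMNS:
        return ["unexpected header"]
    problems = []
    flight_ids = [int(row[0]) for row in rows[1:]]
    if flight_ids != sorted(flight_ids):
        problems.append("rows are not sorted by flight_id ascending")
    return problems


# CSV text matching the expected output of the sample data.
sample = (
    "flight_id,origin_airport_name_length,"
    "destination_airport_name_length,plane_model_length\n"
    "1,13,11,11\n"
    "2,8,6,10\n"
    "3,5,7,11\n"
)
print(check_output(sample))
```

An empty list means the header and ordering look right; the check deliberately says nothing about whether the length values themselves are correct.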