Objective
You are working with a dataset of research paper co-authorships where each row links an author to the paper they contributed to.
Task
Assign a sequential row number to each author within their paper group, ordered by author_id. The row number should reset to 1 for each new paper_id. Use a window function partitioned by paper_id and ordered by author_id to compute the numbering. The output should contain paper_id, author_id, name, and row_number. Save your result as result_df.
File Path
- Authors: /home/interview/authors.csv
- Starter script: /home/interview/author_numbering.py
Schema
authors.csv
| Column | Type |
| --- | --- |
| paper_id | string |
| author_id | string |
| name | string |
Expected output schema
| Column | Type |
| --- | --- |
| paper_id | string |
| author_id | string |
| name | string |
| row_number | integer |
Example
Given this sample input:
authors
| paper_id | author_id | name |
| --- | --- | --- |
| P1 | A3 | Carol Davis |
| P1 | A1 | Alice Chen |
| P1 | A2 | Bob Martinez |
| P2 | A5 | Eva Patel |
| P2 | A4 | David Kim |
The output would be:
| paper_id | author_id | name | row_number |
| --- | --- | --- | --- |
| P1 | A1 | Alice Chen | 1 |
| P1 | A2 | Bob Martinez | 2 |
| P1 | A3 | Carol Davis | 3 |
| P2 | A4 | David Kim | 1 |
| P2 | A5 | Eva Patel | 2 |
Within paper P1, authors are ordered by author_id (A1, A2, A3) and assigned row numbers 1 through 3. The numbering restarts at 1 for paper P2.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window as W

spark = SparkSession.builder.appName("PrepareshSpark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Load the authors CSV; the first row holds the column names.
authors = spark.read.csv("/home/interview/authors.csv", header=True, inferSchema=True)

# One partition per paper, with authors sorted by author_id inside each partition.
window_spec = W.partitionBy("paper_id").orderBy("author_id")

# Number the rows 1, 2, 3, ... within each partition.
result_df = authors.withColumn("row_number", F.row_number().over(window_spec))
# --- Do not edit below this line ---
result_df.coalesce(1).write.csv("/home/interview/output", header=True, mode="overwrite")
spark.stop()
Explanation
Step 1: Loading the Data
authors = spark.read.csv("/home/interview/authors.csv", header=True, inferSchema=True)
This reads the CSV file into a DataFrame. The header=True flag tells Spark to use the first row as column names, and inferSchema=True lets Spark detect data types automatically. Since paper_id and author_id are alphanumeric (like P1, A3), they will be inferred as strings.
Step 2: Defining the Window Specification
window_spec = W.partitionBy("paper_id").orderBy("author_id")
A window specification controls how rows are grouped and ordered for window functions. partitionBy("paper_id") splits the data into independent groups, one per paper. orderBy("author_id") sorts authors within each partition by their ID. This means the row numbering will be computed separately for each paper, with authors ordered by author_id.
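One consequence of ordering by a string column is worth keeping in mind: strings compare character by character, so an ID such as A10 would sort before A2. The plain-Python sketch below illustrates this with hypothetical IDs (A10 does not appear in the sample data). It does not use Spark, but Spark's string ordering gives the same result for simple ASCII IDs like these.

```python
# Hypothetical author IDs, chosen to show the difference between
# string (lexicographic) order and numeric order.
author_ids = ["A2", "A10", "A1"]

# String comparison goes character by character, so "A10" < "A2".
lexicographic = sorted(author_ids)

# Sorting on the numeric part gives the order a human reader might expect.
# Assumes every ID is the letter "A" followed by digits.
numeric = sorted(author_ids, key=lambda a: int(a[1:]))

print(lexicographic)  # ['A1', 'A10', 'A2']
print(numeric)        # ['A1', 'A2', 'A10']
```

For the sample data (A1 through A5) both orders agree, so the solution is unaffected. The distinction only matters if IDs have differing digit counts.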
Step 3: Applying row_number()
result_df = authors.withColumn("row_number", F.row_number().over(window_spec))
row_number() assigns a unique sequential integer starting from 1 within each partition. The .over(window_spec) ties it to the window definition from Step 2. Using withColumn adds the new row_number column to the existing DataFrame, preserving all original columns (paper_id, author_id, name).
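To see what the window computation does without running Spark, the following plain-Python sketch reproduces the numbering on the sample rows from the example. The helper name add_row_numbers is hypothetical, and the code is an illustration of the semantics rather than how Spark executes the query.

```python
from itertools import groupby

# Sample rows from the example: (paper_id, author_id, name)
rows = [
    ("P1", "A3", "Carol Davis"),
    ("P1", "A1", "Alice Chen"),
    ("P1", "A2", "Bob Martinez"),
    ("P2", "A5", "Eva Patel"),
    ("P2", "A4", "David Kim"),
]

def add_row_numbers(rows):
    """Number rows 1..n within each paper_id, ordered by author_id."""
    # Sorting by (paper_id, author_id) puts each paper's rows together
    # and orders the authors inside each paper.
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    result = []
    # groupby yields one group per paper_id (the "partition").
    for _, group in groupby(ordered, key=lambda r: r[0]):
        # enumerate restarts at 1 for every new group.
        for n, (paper_id, author_id, name) in enumerate(group, start=1):
            result.append((paper_id, author_id, name, n))
    return result

result = add_row_numbers(rows)
for row in result:
    print(row)
```

The printed tuples match the expected output table: P1 gets numbers 1 through 3 and P2 restarts at 1.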
Step 4: Understanding row_number vs rank vs dense_rank
All three are window functions that assign numbers based on ordering, but they handle ties differently. row_number() always produces unique sequential integers: if two rows share the same order key, one of them still gets the higher number, and which one is not deterministic. rank() gives tied rows the same number but leaves a gap after them (1, 1, 3). dense_rank() gives tied rows the same number with no gaps (1, 1, 2). In this problem, author_id values are unique within each paper, so there are no ties and all three functions would produce identical results. row_number() is still the most natural choice when the goal is simple sequential numbering.
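The difference between the three functions only shows up when the order key repeats. The plain-Python sketch below mimics their numbering on a hypothetical partition in which A1 appears twice (this does not occur in the exercise data). The helper number_rows is an illustrative name, not a Spark API.

```python
def number_rows(keys):
    """Return (row_number, rank, dense_rank) lists for keys sorted ascending."""
    row_numbers, ranks, dense_ranks = [], [], []
    for i, key in enumerate(keys, start=1):
        # row_number: always the 1-based position, ties or not.
        row_numbers.append(i)
        if i > 1 and key == keys[i - 2]:
            # Tie with the previous row: rank and dense_rank repeat.
            ranks.append(ranks[-1])
            dense_ranks.append(dense_ranks[-1])
        else:
            # New key: rank jumps to the position (leaving a gap after ties),
            # dense_rank moves up by exactly one.
            ranks.append(i)
            dense_ranks.append(dense_ranks[-1] + 1 if dense_ranks else 1)
    return row_numbers, ranks, dense_ranks

# Hypothetical partition with a duplicated order key.
row_numbers, ranks, dense_ranks = number_rows(["A1", "A1", "A2"])
print(row_numbers)  # [1, 2, 3]
print(ranks)        # [1, 1, 3]
print(dense_ranks)  # [1, 1, 2]
```

With unique keys, as in this exercise, all three lists would be identical.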