Files
2025-12-18 17:51:40 -05:00

48 lines
1.9 KiB
Python

# HW3 - Task 3
# Nicholas Pease
# This takes the output from Task 2 (Page Rank results),
# loads it into a Spark DataFrame, performs SQL queries to
# extract specific information, joins with another input file with labels,
# and saves the final results to a CSV file.
import csv

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
def _read_id_value_pairs(sc, path, convert):
    """Load whitespace-separated ``<id> <value>`` lines from *path* as an RDD.

    Args:
        sc: Active SparkContext used to read the text file.
        path: Path of the input text file.
        convert: Callable applied to the second field of every line.

    Returns:
        An RDD of ``(int(id), convert(value))`` tuples. Blank lines are
        skipped; any other malformed line still raises so bad input is
        not silently hidden.
    """
    return (
        sc.textFile(path)
        .map(lambda line: line.split())
        # A blank line splits to [] and would raise IndexError below.
        .filter(lambda fields: fields)
        .map(lambda fields: (int(fields[0]), convert(fields[1])))
    )


def main():
    """Query the Task 2 PageRank output and export it joined with names.

    Steps:
        a. Build an (id, page_rank) DataFrame from ``t2-out.txt``.
        b. Print the page rank of vertex 2.
        c. Print the vertex with the highest page rank.
        d. Build an (id, name) DataFrame from ``t3-in.txt``.
        e. Inner-join both on ``id``, print the result and write it to
           ``t3-out.csv`` as comma-separated rows (id, name, page_rank).
    """
    conf = SparkConf().setAppName("Task3").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    try:
        spark = SparkSession.builder.getOrCreate()

        # a. Create dataframe from RDD (id, page_rank)
        page_rank_rdd = _read_id_value_pairs(sc, "t2-out.txt", float)
        page_rank_df = spark.createDataFrame(page_rank_rdd, ["id", "page_rank"])
        page_rank_df.createOrReplaceTempView("page_rank")

        # b. Spark SQL query to find page rank of vertex 2
        vertex_two = spark.sql("SELECT page_rank FROM page_rank WHERE id = 2")
        print("Page Rank of vertex 2:")
        vertex_two.show()

        # c. Find vertex with highest page rank
        top_vertex = spark.sql(
            "SELECT id, page_rank FROM page_rank ORDER BY page_rank DESC LIMIT 1"
        )
        print("Vertex with highest Page Rank:")
        top_vertex.show()

        # d. Create dataframe from t3-in (id, name)
        names_rdd = _read_id_value_pairs(sc, "t3-in.txt", str)
        names_df = spark.createDataFrame(names_rdd, ["id", "name"])
        names_df.createOrReplaceTempView("names")

        # e. Join and print dataframes on id, save as CSV (t3-out.csv)
        joined = spark.sql(
            "SELECT n.id, n.name, p.page_rank FROM names n JOIN page_rank p ON n.id = p.id"
        )
        print("Joined DataFrame (id, name, page_rank):")
        joined.show()

        # Write real CSV: the csv module emits comma delimiters and quotes
        # any field that needs it. newline="" lets the writer control line
        # endings, which are kept as "\n".
        with open("t3-out.csv", "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f, lineterminator="\n")
            writer.writerows(
                (row["id"], row["name"], row["page_rank"])
                for row in joined.collect()
            )
    finally:
        # Always release the local Spark context, even if a step above fails.
        sc.stop()
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()