48 lines
1.9 KiB
Python
48 lines
1.9 KiB
Python
# HW3 - Task 3
# Nicholas Pease
# This takes the output from Task 2 (PageRank results),
# loads it into a Spark DataFrame, performs SQL queries to
# extract specific information, joins with another input file with labels,
# and saves the final results to a CSV file.
|
|
|
|
import csv

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
|
|
|
|
def main():
    """Query the Task 2 PageRank output with Spark SQL and export a labeled join.

    Steps (lettering follows the assignment):
      a. Parse ``t2-out.txt`` into a DataFrame with columns ``(id, page_rank)``.
         Each non-blank line is split on whitespace; the first field is read
         as an int and the second as a float.
      b. Print the page rank of vertex 2.
      c. Print the vertex with the highest page rank.
      d. Parse ``t3-in.txt`` into a DataFrame with columns ``(id, name)``.
         The first whitespace-separated field is the int id, the second the
         name.
      e. Inner-join the two DataFrames on ``id``, print the result, and write
         it to ``t3-out.csv`` as comma-separated rows ``id,name,page_rank``
         (no header row).

    The Spark session (and its underlying context) is always stopped on exit,
    including when one of the steps raises.
    """
    conf = SparkConf().setAppName("Task3").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    # getOrCreate() attaches to the SparkContext created just above.
    spark = SparkSession.builder.getOrCreate()

    try:
        # Read the output file from Task 2
        t2_out = sc.textFile("t2-out.txt")

        # a. Create dataframe from RDD (id, page_rank)
        # Blank lines (e.g. a trailing newline) are dropped so that indexing
        # the split fields cannot fail with IndexError.
        page_rank_rdd = (
            t2_out.map(lambda line: line.strip().split())
            .filter(lambda fields: fields)
            .map(lambda fields: (int(fields[0]), float(fields[1])))
        )
        page_rank_df = spark.createDataFrame(page_rank_rdd, ["id", "page_rank"])

        # b. Spark SQL query to find page rank of vertex 2
        page_rank_df.createOrReplaceTempView("page_rank")
        result = spark.sql("SELECT page_rank FROM page_rank WHERE id = 2")
        print("Page Rank of vertex 2:")
        result.show()

        # c. Find vertex with highest page rank
        result = spark.sql(
            "SELECT id, page_rank FROM page_rank ORDER BY page_rank DESC LIMIT 1"
        )
        print("Vertex with highest Page Rank:")
        result.show()

        # d. Create dataframe from t3-in (id, name)
        t3_in = sc.textFile("t3-in.txt")
        names_rdd = (
            t3_in.map(lambda line: line.strip().split())
            .filter(lambda fields: fields)
            .map(lambda fields: (int(fields[0]), fields[1]))
        )
        names_df = spark.createDataFrame(names_rdd, ["id", "name"])
        names_df.createOrReplaceTempView("names")

        # e. Join and print dataframes on id, save as CSV (t3-out.csv)
        result = spark.sql(
            "SELECT n.id, n.name, p.page_rank "
            "FROM names n JOIN page_rank p ON n.id = p.id"
        )
        print("Joined DataFrame (id, name, page_rank):")
        result.show()

        # Use the csv module so the output is real comma-separated data with
        # proper quoting; newline="" is what the csv module requires to avoid
        # doubled line endings on some platforms.
        with open("t3-out.csv", "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerows(
                (row["id"], row["name"], row["page_rank"])
                for row in result.collect()
            )
    finally:
        # Stopping the session also stops the underlying SparkContext.
        spark.stop()
|
|
|
|
# Run the job only when executed as a script, not when imported.
if __name__ == "__main__":
    main()