Files
2025-12-18 17:51:40 -05:00

87 lines
3.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# HW3 - Task 2
# Nicholas Pease
# This script loads a directed graph from an input file,
# computes the PageRank of each vertex using Spark,
# and saves the results to an output file.
# It follows the structure dictated by the assignment description.
from pyspark import SparkConf, SparkContext
def main():
    """Compute PageRank for the directed graph in ``t2-in.txt`` using Spark.

    Every non-blank input line is read as two whitespace-separated integers,
    ``source target``, describing one directed edge.  The procedure is:

    0. Vertices with no outgoing edges get an edge to every other vertex.
    1. Every vertex starts with rank 1.
    2. Each vertex contributes ``rank / len(neighbors)`` to each neighbor.
    3. Each rank becomes ``0.15 + 0.85 * (sum of received contributions)``.
    4. Steps 2-3 are repeated 10 times.
    5. Every rank is divided by the number of vertices.

    The result is printed and written to ``t2-out.txt`` as ``vertex rank``
    lines sorted by vertex id.  The SparkContext is stopped even when the
    job fails.
    """
    conf = SparkConf().setAppName("Task2").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    try:
        # Read the input file; blank lines are skipped so they cannot
        # trigger an IndexError while parsing.
        data = sc.textFile("t2-in.txt").filter(lambda line: line.strip())

        # Parse edges as (source, target) integer pairs.
        edges = data.map(lambda line: line.strip().split()).map(
            lambda x: (int(x[0]), int(x[1]))
        )
        all_vertices = edges.flatMap(lambda x: [x[0], x[1]]).distinct().collect()
        all_vertices_set = set(all_vertices)
        num_vertices = len(all_vertices_set)

        # Adjacency list on the driver: {vertex: [neighbors]}.
        adjacency_list = edges.groupByKey().mapValues(list).collectAsMap()

        # Step 0: a vertex with no outgoing edges links to all other vertices.
        # After this loop every vertex has an adjacency entry.
        vertices_without_outgoing = all_vertices_set - set(adjacency_list)
        for vertex in vertices_without_outgoing:
            adjacency_list[vertex] = [v for v in all_vertices if v != vertex]

        # Step 1: RDD of (vertex, (neighbors, current_rank)), all ranks 1.
        graph_rdd = sc.parallelize(
            [(vertex, (neighbors, 1.0)) for vertex, neighbors in adjacency_list.items()]
        )

        def contributions_of(vertex_data):
            """Return the (neighbor, share) pairs emitted by one vertex."""
            _, (neighbors, rank) = vertex_data
            if not neighbors:
                return []
            share = rank / len(neighbors)
            return [(neighbor, share) for neighbor in neighbors]

        for _ in range(10):
            # Step 2: each vertex contributes rank(v)/|neighbors(v)|.
            contributions = graph_rdd.flatMap(contributions_of)
            # Sum the contributions per vertex and bring them to the driver.
            contributions_dict = contributions.reduceByKey(
                lambda a, b: a + b
            ).collectAsMap()

            # Step 3: new rank = 0.15 + 0.85 * (received contributions).
            # The dict is bound as a default argument so this stage keeps
            # *this* iteration's values: map() is lazy, and a plain closure
            # would see whatever the variable holds when an action finally
            # runs the chained stages.
            def update_rank(vertex_data, contributions_dict=contributions_dict):
                vertex, (neighbors, _old_rank) = vertex_data
                contribution = contributions_dict.get(vertex, 0.0)
                return (vertex, (neighbors, 0.15 + 0.85 * contribution))

            graph_rdd = graph_rdd.map(update_rank)

        # Step 5: divide by the total number of vertices.
        final_graph_rdd = graph_rdd.map(
            lambda vertex_data: (vertex_data[0], vertex_data[1][1] / num_vertices)
        )

        # Collect the final results, print them and save them.
        final_ranks = sorted(final_graph_rdd.collect(), key=lambda x: x[0])
        print("\nFinal Page Rank results:")
        for vertex, rank in final_ranks:
            print(f"{vertex} {rank}")
        with open("t2-out.txt", "w") as f:
            for vertex, rank in final_ranks:
                f.write(f"{vertex} {rank}\n")
    finally:
        # Always release the Spark resources, even if the job raised.
        sc.stop()
# Script entry point: run the PageRank job.
# NOTE(review): this call is not guarded by `if __name__ == "__main__":`,
# so it also executes if this file is imported as a module.
main()