# HW3 - Task 2
# Nicholas Pease
#
# Loads a directed graph from an input file, computes the PageRank of each
# vertex using Spark, and saves the results to an output file.
# The implementation follows the structure dictated by the assignment description.

from pyspark import SparkConf, SparkContext
|
||
|
||
def _parse_edge(line):
    """Parse an input line ``"src dst"`` into an ``(src, dst)`` pair of ints.

    Any whitespace-separated tokens after the second one are ignored.
    """
    fields = line.split()
    return int(fields[0]), int(fields[1])


def _contributions(neighbors, rank):
    """Split *rank* evenly across *neighbors* as ``(neighbor, share)`` pairs.

    A vertex with no neighbors contributes nothing.
    """
    if not neighbors:
        return []
    share = rank / len(neighbors)
    return [(neighbor, share) for neighbor in neighbors]


def _build_adjacency(edges):
    """Collect the edge RDD to the driver as ``{vertex: [out-neighbors]}``.

    Every vertex that appears in any edge gets an entry. Step 0: a vertex
    with no outgoing edges is given an edge to every other vertex, so its
    rank is redistributed instead of being dropped.
    """
    all_vertices = edges.flatMap(lambda edge: edge).distinct().collect()
    adjacency = edges.groupByKey().mapValues(list).collectAsMap()
    for vertex in all_vertices:
        if vertex not in adjacency:
            adjacency[vertex] = [v for v in all_vertices if v != vertex]
    return adjacency


def main(input_path="t2-in.txt", output_path="t2-out.txt", iterations=10):
    """Run PageRank on the edge list in *input_path* and save the results.

    Each non-blank input line is one directed edge ``src dst`` with integer
    vertex ids. The final ranks are divided by the vertex count, then printed
    and written to *output_path* as ``vertex rank`` lines sorted by vertex id.

    Args:
        input_path: Edge-list file, read with ``SparkContext.textFile``.
        output_path: Local file that the ``vertex rank`` lines are written to.
        iterations: Number of rank-update rounds to run.
    """
    conf = SparkConf().setAppName("Task2").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    try:
        # Parse edges. Blank lines are skipped because they would otherwise
        # raise IndexError in the parser.
        edges = (
            sc.textFile(input_path)
            .filter(lambda line: line.strip())
            .map(_parse_edge)
            .cache()
        )

        # Step 0: adjacency list, with vertices that have no outgoing edges
        # linked to all other vertices.
        adjacency = _build_adjacency(edges)
        num_vertices = len(adjacency)

        # The link structure never changes. Distribute and cache it once,
        # rather than chaining a new map onto a growing lineage every
        # iteration (which recomputes all earlier iterations on each job).
        links = sc.parallelize(list(adjacency.items())).cache()

        # Step 1: Initialize page rank of every node as 1
        ranks = {vertex: 1.0 for vertex in adjacency}

        for _ in range(iterations):
            # Step 2: Each vertex contributes rank(v)/|neighbors(v)| to its
            # neighbors. `current=ranks` binds this iteration's dict when the
            # lambda is created. A plain closure would late-bind to whatever
            # `ranks` holds when Spark serializes the function.
            contributions = links.flatMap(
                lambda item, current=ranks: _contributions(item[1], current[item[0]])
            )
            summed = contributions.reduceByKey(lambda a, b: a + b).collectAsMap()

            # Step 3: Update ranks: 0.15 + 0.85 * (contributions).
            # A vertex that received no contributions gets 0.15.
            ranks = {
                vertex: 0.15 + 0.85 * summed.get(vertex, 0.0)
                for vertex in adjacency
            }

        # Step 5: Divide by total number of vertices
        final_ranks = sorted(
            ((vertex, rank / num_vertices) for vertex, rank in ranks.items()),
            key=lambda pair: pair[0],
        )

        # Collect final results + save
        print("\nFinal Page Rank results:")
        for vertex, rank in final_ranks:
            print(f"{vertex} {rank}")

        with open(output_path, "w", encoding="utf-8") as f:
            f.writelines(f"{vertex} {rank}\n" for vertex, rank in final_ranks)
    finally:
        # Release the SparkContext even if parsing or the job fails.
        sc.stop()
|
||
|
||
# Run the job only when executed as a script (e.g. `python task2.py` or
# `spark-submit task2.py`), not when this module is imported.
if __name__ == "__main__":
    main()