This commit is contained in:
2025-12-18 03:31:49 +00:00
parent bf1d2e22f5
commit 68df013917
9 changed files with 419 additions and 468 deletions
-14
View File
@@ -1,14 +0,0 @@
# [Choice] Python version: 3, 3.8, 3.7, 3.6
ARG PYTHON_VARIANT=3.9
FROM python:${PYTHON_VARIANT}
# Install python dependencies
COPY requirements.txt /tmp/pip-tmp/
RUN pip3 --disable-pip-version-check --no-cache-dir \
install -r /tmp/pip-tmp/requirements.txt && rm -rf /tmp/pip-tmp
# Install OpenJDK
ARG OPENJDK_VARIANT=11
RUN apt-get update && \
apt-get install -y openjdk-21-jdk && \
apt-get clean
-32
View File
@@ -1,32 +0,0 @@
{
"name": "Python 3 & Apache Spark",
"dockerComposeFile": "docker-compose.yml",
"service": "app",
"workspaceFolder": "/workspace",
"settings": {
"terminal.integrated.profiles.linux": {
"bash": {
"path": "/bin/bash",
"icon": "terminal-bash"
}
},
"python.pythonPath": "/usr/local/bin/python",
"python.languageServer": "Pylance",
"python.linting.enabled": true,
"python.linting.pylintEnabled": true,
"python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
"python.formatting.blackPath": "/usr/local/py-utils/bin/black",
"python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
"python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
"python.linting.flake8Path": "/usr/local/py-utils/bin/flake8",
"python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
"python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
"python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
"python.linting.pylintPath": "/usr/local/py-utils/bin/pylint",
"python.testing.pytestPath": "/usr/local/py-utils/bin/pytest"
},
"extensions": [
"ms-python.python",
"ms-python.vscode-pylance"
]
}
-20
View File
@@ -1,20 +0,0 @@
version: '3'
services:
app:
build:
context: ..
dockerfile: .devcontainer/Dockerfile
args:
PYTHON_VARIANT: 3.9
JAVA_VARIANT: 11-bullseye
volumes:
- ..:/workspace:cached
command: sleep infinity
pyspark:
image: jupyter/pyspark-notebook:spark-3.1.2
environment:
- JUPYTER_ENABLE_LAB=yes
ports:
- 8889:8888
View File
+4
View File
@@ -0,0 +1,4 @@
0 0.19017201099717568
1 0.21116445874870277
2 0.27099244775152703
3 0.3276710825025942
+4
View File
@@ -0,0 +1,4 @@
0 Adam
1 Lisa
2 Bert
3 Ralph
+299 -392
View File
@@ -10,436 +10,344 @@ import os
from typing import Dict, List, Optional
from collections import defaultdict
class IMDBMongoDBLoader:
"""
IMDB Data Loader for MongoDB
MongoDB Schema Design for Fast Access by Movie Name:
Database: imdb_database
Collection: movies (single collection containing all data)
Shard Key: movie_name (for O(1) lookup by movie name)
Document Structure:
{
"_id": ObjectId,
"movie_name": "The Godfather", # SHARD KEY - optimized for fast access
"movie_id": 123,
"year": 1972,
"rank": 9.2,
"directors": [
{
"director_id": 456,
"name": "Francis Ford Coppola"
}
],
"cast": [
{
"person_id": 789,
"name": "Marlon Brando",
"gender": "M",
"role": "[Don Vito Corleone]"
}
]
}
"""
def __init__(self, connection_string: str = "mongodb://localhost:27017/",
database_name: str = "imdb_database"):
"""
Initialize MongoDB connection
Args:
connection_string: MongoDB connection string
database_name: Name of the database to use
"""
self.connection_string = connection_string
self.database_name = database_name
self.client = None
self.db = None
self.movies_collection = None
# In-memory data structures for processing
self.movies = {}
self.persons = {}
self.directors = {}
def connect(self) -> bool:
"""Connect to MongoDB and set up the database and collection"""
try:
self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000)
# Test connection
self.client.admin.command('ping')
self.db = self.client[self.database_name]
self.movies_collection = self.db.movies
print(f"Connected to MongoDB database: {self.database_name}")
return True
except ConnectionFailure as e:
print(f"Failed to connect to MongoDB: {e}")
return False
except Exception as e:
print(f"MongoDB connection error: {e}")
return False
def disconnect(self):
"""Close MongoDB connection"""
if self.client:
self.client.close()
def load_movies_data(self, file_path: str, sample_size: Optional[int] = None) -> bool:
"""
Load movie data from IMDBMovie.txt
Handles CSV parsing issues with embedded commas in movie names
"""
try:
movies_loaded = 0
with open(file_path, 'r', encoding='latin-1') as file:
# Skip header
next(file)
for line in file:
if sample_size and movies_loaded >= sample_size:
break
line = line.strip()
if not line:
continue
try:
# Parse CSV line manually - handle embedded commas in movie names
parts = line.split(',')
# First part is always movie ID
movie_id = int(parts[0])
# Last two parts are year and rank (rank might be empty)
year_str = parts[-2] if len(parts) >= 3 else ''
rank_str = parts[-1] if len(parts) >= 4 else ''
# Everything in between is the movie name
movie_name = ','.join(parts[1:-2]) if len(parts) > 3 else parts[1] if len(parts) > 1 else ''
# Parse year and rank
year = int(year_str) if year_str and year_str.strip() else None
rank = float(rank_str) if rank_str and rank_str.strip() else None
self.movies[movie_id] = {
'movie_id': movie_id,
'movie_name': movie_name,
'year': year,
'rank': rank,
'directors': [],
'cast': []
}
movies_loaded += 1
except (ValueError, IndexError):
# Skip malformed lines
continue
print(f"Loaded {len(self.movies)} movies")
return True
except Exception as e:
print(f"Error loading movies: {e}")
return False
def load_persons_data(self, file_path: str) -> bool:
"""Load person data from IMDBPerson.txt"""
try:
# Try different encodings to handle international characters
persons_df = pd.read_csv(
file_path,
encoding='latin-1',
dtype={
'id': 'int32',
'fname': 'string',
'lname': 'string',
'gender': 'string'
}
)
for _, row in persons_df.iterrows():
person_id = int(row['id'])
full_name = f"{row['fname']} {row['lname']}".strip()
self.persons[person_id] = {
'person_id': person_id,
'name': full_name,
'gender': str(row['gender'])
}
print(f"Loaded {len(self.persons)} persons")
return True
except Exception as e:
print(f"Error loading persons: {e}")
return False
def load_directors_data(self, file_path: str) -> bool:
"""Load director data from IMDBDirectors.txt"""
try:
# Try different encodings to handle international characters
directors_df = pd.read_csv(
file_path,
encoding="latin-1",
dtype={
'id': 'int32',
'fname': 'string',
'lname': 'string'
}
)
for _, row in directors_df.iterrows():
director_id = int(row['id'])
full_name = f"{row['fname']} {row['lname']}".strip()
self.directors[director_id] = {
'director_id': director_id,
'name': full_name
}
print(f"Loaded {len(self.directors)} directors")
return True
except Exception as e:
print(f"Error loading directors: {e}")
return False
def load_movie_directors_data(self, file_path: str) -> bool:
"""Load movie-director relationships from IMDBMovie_Directors.txt"""
try:
movie_directors_df = pd.read_csv(
file_path,
encoding='latin-1',
dtype={'did': 'int32', 'mid': 'int32'}
)
linked_count = 0
for _, row in movie_directors_df.iterrows():
director_id = int(row['did'])
movie_id = int(row['mid'])
if movie_id in self.movies and director_id in self.directors:
director_info = {
'director_id': director_id,
'name': self.directors[director_id]['name']
}
self.movies[movie_id]['directors'].append(director_info)
linked_count += 1
return True
except Exception as e:
print(f"Error loading movie-director relationships: {e}")
return False
def load_cast_data(self, file_path: str, max_cast_per_movie: int = 25) -> bool:
"""Load cast data from IMDBCast.txt (large file - process efficiently)"""
try:
if not os.path.exists(file_path):
return True
movie_cast_count = defaultdict(int)
linked_count = 0
# Read line by line to handle CSV parsing issues
with open(file_path, 'r', encoding='latin-1') as file:
# Skip header
next(file)
for line_num, line in enumerate(file):
if line_num > 100000: # Limit for efficiency
break
line = line.strip()
if not line:
continue
try:
# Parse CSV line manually - handle embedded commas in roles
parts = line.split(',')
if len(parts) < 3:
continue
person_id = int(parts[0])
movie_id = int(parts[1])
# Role is everything after the second comma
role = ','.join(parts[2:]) if len(parts) > 2 else "[Unknown]"
# Limit cast per movie to prevent overly large documents
if (movie_id in self.movies and person_id in self.persons and
movie_cast_count[movie_id] < max_cast_per_movie):
cast_info = {
'person_id': person_id,
'name': self.persons[person_id]['name'],
'gender': self.persons[person_id]['gender'],
'role': role
}
self.movies[movie_id]['cast'].append(cast_info)
movie_cast_count[movie_id] += 1
linked_count += 1
except (ValueError, KeyError):
continue
if linked_count > 0:
print(f"Loaded {linked_count} cast entries")
return True
except Exception as e:
return True
def create_indexes(self):
"""Create indexes for optimal query performance"""
try:
# Primary index on movie_name (shard key)
self.movies_collection.create_index("movie_name", unique=True, name="idx_movie_name")
# Secondary indexes for common query patterns
self.movies_collection.create_index("year", name="idx_year")
self.movies_collection.create_index("rank", name="idx_rank")
self.movies_collection.create_index("directors.name", name="idx_director_name")
except Exception as e:
pass
def insert_movies_to_mongodb(self, batch_size: int = 2000) -> bool:
"""Insert all loaded movie data into MongoDB"""
try:
# Clear existing collection
self.movies_collection.drop()
# Prepare documents for batch insertion
documents = []
for movie_id, movie_data in self.movies.items():
document = {
'movie_id': movie_data['movie_id'],
'movie_name': movie_data['movie_name'], # SHARD KEY
'year': movie_data['year'],
'rank': movie_data['rank'],
'directors': movie_data['directors'],
'cast': movie_data['cast']
}
documents.append(document)
# Insert in batches for efficiency
if len(documents) >= batch_size:
self.movies_collection.insert_many(documents, ordered=False)
documents = []
# Insert remaining documents
if documents:
self.movies_collection.insert_many(documents, ordered=False)
# Create indexes for optimal querying
self.create_indexes()
total_count = self.movies_collection.count_documents({})
print(f"Inserted {total_count} movies into MongoDB")
return True
except Exception as e:
print(f"Error inserting data into MongoDB: {e}")
return False
#TODO: DELETE
def query_movie_by_name(self, movie_name: str) -> Optional[Dict]:
"""Query movie by name - OPTIMIZED with shard key (O(1) lookup)"""
try:
result = self.movies_collection.find_one({"movie_name": movie_name})
return result
except Exception as e:
print(f"Error querying movie by name: {e}")
return None
#TODO: DELETE
def query_movies_by_year_range(self, start_year: int, end_year: int) -> List[Dict]:
"""Query movies by year range"""
try:
query = {"year": {"$gte": start_year, "$lte": end_year}}
results = self.movies_collection.find(query).sort("year", 1)
return list(results)
except Exception as e:
print(f"Error querying movies by year range: {e}")
return []
# Global data structures for processing
movies = {}
persons = {}
directors = {}
client = None
db = None
movies_collection = None
def connect(connection_string: str = "mongodb://localhost:27017/",
database_name: str = "imdb_database") -> bool:
global client, db, movies_collection
try:
client = MongoClient(connection_string, serverSelectionTimeoutMS=5000)
client.admin.command('ping')
db = client[database_name]
movies_collection = db.movies
print(f"Connected to MongoDB database: {database_name}")
return True
except ConnectionFailure as e:
print(f"Failed to connect to MongoDB: {e}")
return False
except Exception as e:
print(f"MongoDB connection error: {e}")
return False
def disconnect():
global client
if client:
client.close()
def load_movies_data(file_path: str, sample_size: Optional[int] = None) -> bool:
global movies
try:
movies_loaded = 0
with open(file_path, 'r', encoding='latin-1') as file:
next(file)
for line in file:
if sample_size and movies_loaded >= sample_size:
break
line = line.strip()
if not line:
continue
try:
parts = line.split(',')
movie_id = int(parts[0])
year_str = parts[-2] if len(parts) >= 3 else ''
rank_str = parts[-1] if len(parts) >= 4 else ''
movie_name = ','.join(parts[1:-2]) if len(parts) > 3 else parts[1] if len(parts) > 1 else ''
year = int(year_str) if year_str and year_str.strip() else None
rank = float(rank_str) if rank_str and rank_str.strip() else None
movies[movie_id] = {
'movie_id': movie_id,
'movie_name': movie_name,
'year': year,
'rank': rank,
'directors': [],
'cast': []
}
movies_loaded += 1
except (ValueError, IndexError):
continue
print(f"Loaded {len(movies)} movies")
return True
except Exception as e:
print(f"Error loading movies: {e}")
return False
def load_persons_data(file_path: str) -> bool:
global persons
try:
persons_df = pd.read_csv(
file_path,
encoding='latin-1',
dtype={
'id': 'int32',
'fname': 'string',
'lname': 'string',
'gender': 'string'
}
)
for _, row in persons_df.iterrows():
person_id = int(row['id'])
full_name = f"{row['fname']} {row['lname']}".strip()
persons[person_id] = {
'person_id': person_id,
'name': full_name,
'gender': str(row['gender'])
}
print(f"Loaded {len(persons)} persons")
return True
except Exception as e:
print(f"Error loading persons: {e}")
return False
def load_directors_data(file_path: str) -> bool:
global directors
try:
directors_df = pd.read_csv(
file_path,
encoding="latin-1",
dtype={
'id': 'int32',
'fname': 'string',
'lname': 'string'
}
)
for _, row in directors_df.iterrows():
director_id = int(row['id'])
full_name = f"{row['fname']} {row['lname']}".strip()
directors[director_id] = {
'director_id': director_id,
'name': full_name
}
print(f"Loaded {len(directors)} directors")
return True
except Exception as e:
print(f"Error loading directors: {e}")
return False
def load_movie_directors_data(file_path: str) -> bool:
global movies, directors
try:
movie_directors_df = pd.read_csv(
file_path,
encoding='latin-1',
dtype={'did': 'int32', 'mid': 'int32'}
)
linked_count = 0
for _, row in movie_directors_df.iterrows():
director_id = int(row['did'])
movie_id = int(row['mid'])
if movie_id in movies and director_id in directors:
director_info = {
'director_id': director_id,
'name': directors[director_id]['name']
}
movies[movie_id]['directors'].append(director_info)
linked_count += 1
return True
except Exception as e:
print(f"Error loading movie-director relationships: {e}")
return False
def load_cast_data(file_path: str, max_cast_per_movie: int = 25) -> bool:
global movies, persons
try:
if not os.path.exists(file_path):
return True
movie_cast_count = defaultdict(int)
linked_count = 0
with open(file_path, 'r', encoding='latin-1') as file:
next(file)
for line_num, line in enumerate(file):
if line_num > 100000:
break
line = line.strip()
if not line:
continue
try:
parts = line.split(',')
if len(parts) < 3:
continue
person_id = int(parts[0])
movie_id = int(parts[1])
role = ','.join(parts[2:]) if len(parts) > 2 else "[Unknown]"
if (movie_id in movies and person_id in persons and
movie_cast_count[movie_id] < max_cast_per_movie):
cast_info = {
'person_id': person_id,
'name': persons[person_id]['name'],
'gender': persons[person_id]['gender'],
'role': role
}
movies[movie_id]['cast'].append(cast_info)
movie_cast_count[movie_id] += 1
linked_count += 1
except (ValueError, KeyError):
continue
if linked_count > 0:
print(f"Loaded {linked_count} cast entries")
return True
except Exception as e:
return True
def create_indexes():
global movies_collection
try:
movies_collection.create_index("movie_name", unique=True, name="idx_movie_name")
movies_collection.create_index("year", name="idx_year")
movies_collection.create_index("rank", name="idx_rank")
movies_collection.create_index("directors.name", name="idx_director_name")
except Exception as e:
pass
def insert_movies_to_mongodb(batch_size: int = 2000) -> bool:
global movies, movies_collection
try:
movies_collection.drop()
documents = []
for movie_id, movie_data in movies.items():
document = {
'movie_id': movie_data['movie_id'],
'movie_name': movie_data['movie_name'],
'year': movie_data['year'],
'rank': movie_data['rank'],
'directors': movie_data['directors'],
'cast': movie_data['cast']
}
documents.append(document)
if len(documents) >= batch_size:
movies_collection.insert_many(documents, ordered=False)
documents = []
if documents:
movies_collection.insert_many(documents, ordered=False)
create_indexes()
total_count = movies_collection.count_documents({})
print(f"Inserted {total_count} movies into MongoDB")
return True
except Exception as e:
print(f"Error inserting data into MongoDB: {e}")
return False
def query_movie_by_name(movie_name: str) -> Optional[Dict]:
global movies_collection
try:
result = movies_collection.find_one({"movie_name": movie_name})
return result
except Exception as e:
print(f"Error querying movie by name: {e}")
return None
def query_movies_by_year_range(start_year: int, end_year: int) -> List[Dict]:
global movies_collection
try:
query = {"year": {"$gte": start_year, "$lte": end_year}}
results = movies_collection.find(query).sort("year", 1)
return list(results)
except Exception as e:
print(f"Error querying movies by year range: {e}")
return []
def main():
"""
Main function to demonstrate IMDB data loading into MongoDB
Showcases the schema design optimized for movie name access
"""
# Initialize the MongoDB loader
loader = IMDBMongoDBLoader(
connection_string="mongodb://localhost:27017/",
database_name="imdb_database"
)
try:
# Step 1: Load data from IMDB files
print("IMDB DATA LOADING FOR MONGODB")
print("=" * 50)
data_directory = "IMDB"
sample_size = None # Load subset for demonstration (set to None for full dataset)
sample_size = None
# Define file paths
movie_file = os.path.join(data_directory, "IMDBMovie.txt")
person_file = os.path.join(data_directory, "IMDBPerson.txt")
director_file = os.path.join(data_directory, "IMDBDirectors.txt")
movie_director_file = os.path.join(data_directory, "IMDBMovie_Directors.txt")
cast_file = os.path.join(data_directory, "IMDBCast.txt")
# Load data sequentially
if not loader.load_movies_data(movie_file, sample_size=sample_size):
if not load_movies_data(movie_file, sample_size=sample_size):
print("Failed to load movies")
return
if not loader.load_persons_data(person_file):
if not load_persons_data(person_file):
print("Failed to load persons")
return
if not loader.load_directors_data(director_file):
if not load_directors_data(director_file):
print("Failed to load directors")
return
if not loader.load_movie_directors_data(movie_director_file):
if not load_movie_directors_data(movie_director_file):
print("Failed to load movie-director relationships")
return
if not loader.load_cast_data(cast_file):
if not load_cast_data(cast_file):
print("Failed to load cast data")
return
# Show data loading statistics
total_movies = len(loader.movies)
movies_with_directors = sum(1 for movie in loader.movies.values() if movie['directors'])
movies_with_cast = sum(1 for movie in loader.movies.values() if movie['cast'])
total_movies = len(movies)
movies_with_directors = sum(1 for movie in movies.values() if movie['directors'])
movies_with_cast = sum(1 for movie in movies.values() if movie['cast'])
print(f"\nTotal movies loaded: {total_movies}")
print(f"Movies with directors: {movies_with_directors}")
print(f"Movies with cast: {movies_with_cast}")
# Step 2: Try to connect to MongoDB and insert data
if loader.connect():
if loader.insert_movies_to_mongodb(batch_size=1000):
if connect():
if insert_movies_to_mongodb(batch_size=1000):
print("Data inserted into MongoDB successfully")
# Test query by movie name (shard key)
test_movie = loader.query_movie_by_name("$ (1971)")
test_movie = query_movie_by_name("$ (1971)")
if test_movie:
print(f"\nFound movie: {test_movie['movie_name']} ({test_movie.get('year', 'N/A')})")
# Database statistics
total_in_db = loader.movies_collection.count_documents({})
movies_with_directors_db = loader.movies_collection.count_documents({"directors": {"$ne": []}})
total_in_db = movies_collection.count_documents({})
movies_with_directors_db = movies_collection.count_documents({"directors": {"$ne": []}})
print(f"Total movies in database: {total_in_db}")
print(f"Movies with directors: {movies_with_directors_db}")
@@ -453,8 +361,7 @@ def main():
print(f"Error in main execution: {e}")
finally:
# Clean up
loader.disconnect()
disconnect()
if __name__ == "__main__":
+74 -10
View File
@@ -1,16 +1,80 @@
# import findspark
# findspark.init()
from pyspark import SparkConf, SparkContext
def main():
conf = SparkConf().setAppName("local123").setMaster("local[*]")
conf = SparkConf().setAppName("Task2").setMaster("local[*]")
sc = SparkContext(conf=conf)
data = sc.textFile("pt2-in.txt")
# Read and display the first few lines of the data
first_lines = data.take(10)
for line in first_lines:
print(line)
# Read the input file
data = sc.textFile("t2-in.txt")
# Parse edges and create initial graph structure
edges = data.map(lambda line: line.strip().split()).map(lambda x: (int(x[0]), int(x[1])))
all_vertices = edges.flatMap(lambda x: [x[0], x[1]]).distinct().collect()
all_vertices_set = set(all_vertices)
num_vertices = len(all_vertices_set)
# Create adjacency list: (vertex, [list of neighbors])
adjacency_list = edges.groupByKey().mapValues(list).collectAsMap()
# Step 0: Handle vertices with no outgoing edges
# Add edges from nodes with no outgoing edges to all other nodes
vertices_with_outgoing = set(adjacency_list.keys())
vertices_without_outgoing = all_vertices_set - vertices_with_outgoing
# Add all other vertices as neighbors for vertices without outgoing edges
for vertex in vertices_without_outgoing:
adjacency_list[vertex] = [v for v in all_vertices if v != vertex]
# Ensure all vertices have an entry in adjacency list
for vertex in all_vertices:
if vertex not in adjacency_list:
adjacency_list[vertex] = []
# Convert to RDD format: (vertex, (neighbors, current_rank))
# Step 1: Initialize page rank of every node as 1
graph_rdd = sc.parallelize([(vertex, (neighbors, 1.0)) for vertex, neighbors in adjacency_list.items()])
for iteration in range(10):
# Step 2: Each vertex contributes rank(v)/|neighbors(v)| to its neighbors
contributions = graph_rdd.flatMap(lambda vertex_data:
[(neighbor, vertex_data[1][1] / len(vertex_data[1][0]))
for neighbor in vertex_data[1][0]] if len(vertex_data[1][0]) > 0 else []
)
# Sum contributions for each vertex
summed_contributions = contributions.reduceByKey(lambda a, b: a + b)
# Convert to dictionary
contributions_dict = summed_contributions.collectAsMap()
# Step 3: Update ranks: 0.15 + 0.85 × (contributions)
# Join with original graph to maintain adjacency lists and update ranks
def update_rank(vertex_data):
vertex, (neighbors, old_rank) = vertex_data
contribution = contributions_dict.get(vertex, 0.0)
new_rank = 0.15 + 0.85 * contribution
return (vertex, (neighbors, new_rank))
graph_rdd = graph_rdd.map(update_rank)
# Step 5: Divide by total number of vertices
final_graph_rdd = graph_rdd.map(lambda vertex_data:
(vertex_data[0], vertex_data[1][1] / num_vertices)
)
# Collect final results + save
final_ranks = final_graph_rdd.collect()
final_ranks.sort(key=lambda x: x[0])
print(f"\nFinal Page Rank results:")
for vertex, rank in final_ranks:
print(f"{vertex} {rank}")
output_rdd = sc.parallelize(final_ranks).map(lambda x: f"{x[0]} {x[1]}")
with open("t2-out.txt", "w") as f:
for vertex, rank in final_ranks:
f.write(f"{vertex} {rank}\n")
sc.stop()
main()
+38
View File
@@ -0,0 +1,38 @@
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
def main():
conf = SparkConf().setAppName("Task3").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
# Read the output file from Task 2
t2_out = sc.textFile("t2-out.txt")
# a. Create dataframe from RDD (id, page_rank)
page_rank_rdd = t2_out.map(lambda line: line.strip().split()).map(lambda x: (int(x[0]), float(x[1])))
page_rank_df = spark.createDataFrame(page_rank_rdd, ["id", "page_rank"])
# b. Spark SQL query to find page rank of vertex 2
page_rank_df.createOrReplaceTempView("page_rank")
result = spark.sql("SELECT page_rank FROM page_rank WHERE id = 2")
print("Page Rank of vertex 2:")
result.show()
# c. Find vertex with highest page rank
result = spark.sql("SELECT id, page_rank FROM page_rank ORDER BY page_rank DESC LIMIT 1")
print("Vertex with highest Page Rank:")
result.show()
# d. Create dataframe from t3-in (id, name)
t3_in = sc.textFile("t3-in.txt")
names_rdd = t3_in.map(lambda line: line.strip().split()).map(lambda x: (int(x[0]), x[1]))
names_df = spark.createDataFrame(names_rdd, ["id", "name"])
names_df.createOrReplaceTempView("names")
# e. Join and print dataframes on id
result = spark.sql("SELECT n.id, n.name, p.page_rank FROM names n JOIN page_rank p ON n.id = p.id")
print("Joined DataFrame (id, name, page_rank):")
result.show()
main()