From 68df013917bfc7b8df6c419fb98aae7462eabe0f Mon Sep 17 00:00:00 2001 From: Nicholas Pease Date: Thu, 18 Dec 2025 03:31:49 +0000 Subject: [PATCH] Final --- .devcontainer/Dockerfile | 14 - .devcontainer/devcontainer.json | 32 -- .devcontainer/docker-compose.yml | 20 - pt2-in.txt => t2-in.txt | 0 t2-out.txt | 4 + t3-in.txt | 4 + task1.py | 691 +++++++++++++------------------ task2.py | 84 +++- task3.py | 38 ++ 9 files changed, 419 insertions(+), 468 deletions(-) delete mode 100644 .devcontainer/Dockerfile delete mode 100644 .devcontainer/devcontainer.json delete mode 100644 .devcontainer/docker-compose.yml rename pt2-in.txt => t2-in.txt (100%) create mode 100644 t2-out.txt create mode 100644 t3-in.txt create mode 100644 task3.py diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile deleted file mode 100644 index ded4ad1..0000000 --- a/.devcontainer/Dockerfile +++ /dev/null @@ -1,14 +0,0 @@ -# [Choice] Python version: 3, 3.8, 3.7, 3.6 -ARG PYTHON_VARIANT=3.9 -FROM python:${PYTHON_VARIANT} - -# Install python dependencies -COPY requirements.txt /tmp/pip-tmp/ -RUN pip3 --disable-pip-version-check --no-cache-dir \ - install -r /tmp/pip-tmp/requirements.txt && rm -rf /tmp/pip-tmp - -# Install OpenJDK -ARG OPENJDK_VARIANT=11 -RUN apt-get update && \ - apt-get install -y openjdk-21-jdk && \ - apt-get clean \ No newline at end of file diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json deleted file mode 100644 index 33df37f..0000000 --- a/.devcontainer/devcontainer.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "name": "Python 3 & Apache Spark", - "dockerComposeFile": "docker-compose.yml", - "service": "app", - "workspaceFolder": "/workspace", - "settings": { - "terminal.integrated.profiles.linux": { - "bash": { - "path": "/bin/bash", - "icon": "terminal-bash" - } - }, - "python.pythonPath": "/usr/local/bin/python", - "python.languageServer": "Pylance", - "python.linting.enabled": true, - "python.linting.pylintEnabled": true, - "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8", - "python.formatting.blackPath": "/usr/local/py-utils/bin/black", - "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf", - "python.linting.banditPath": "/usr/local/py-utils/bin/bandit", - "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8", - "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy", - "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle", - "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle", - "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint", - "python.testing.pytestPath": "/usr/local/py-utils/bin/pytest" - }, - "extensions": [ - "ms-python.python", - "ms-python.vscode-pylance" - ] -} \ No newline at end of file diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml deleted file mode 100644 index 18e5815..0000000 --- a/.devcontainer/docker-compose.yml +++ /dev/null @@ -1,20 +0,0 @@ -version: '3' - -services: - app: - build: - context: .. - dockerfile: .devcontainer/Dockerfile - args: - PYTHON_VARIANT: 3.9 - JAVA_VARIANT: 11-bullseye - volumes: - - ..:/workspace:cached - command: sleep infinity - - pyspark: - image: jupyter/pyspark-notebook:spark-3.1.2 - environment: - - JUPYTER_ENABLE_LAB=yes - ports: - - 8889:8888 \ No newline at end of file diff --git a/pt2-in.txt b/t2-in.txt similarity index 100% rename from pt2-in.txt rename to t2-in.txt diff --git a/t2-out.txt b/t2-out.txt new file mode 100644 index 0000000..e9d12b5 --- /dev/null +++ b/t2-out.txt @@ -0,0 +1,4 @@ +0 0.19017201099717568 +1 0.21116445874870277 +2 0.27099244775152703 +3 0.3276710825025942 diff --git a/t3-in.txt b/t3-in.txt new file mode 100644 index 0000000..96038ac --- /dev/null +++ b/t3-in.txt @@ -0,0 +1,4 @@ +0 Adam +1 Lisa +2 Bert +3 Ralph \ No newline at end of file diff --git a/task1.py b/task1.py index bd5466a..60f8d02 100644 --- a/task1.py +++ b/task1.py @@ -10,436 +10,344 @@ import os from typing import Dict, List, Optional from collections import defaultdict -class IMDBMongoDBLoader: - """ - IMDB Data Loader for MongoDB - - MongoDB Schema Design for Fast Access by Movie Name: - - Database: imdb_database - Collection: movies (single collection containing all data) - - Shard Key: movie_name (for O(1) lookup by movie name) - - Document Structure: - { - "_id": ObjectId, - "movie_name": "The Godfather", # SHARD KEY - optimized for fast access - "movie_id": 123, - "year": 1972, - "rank": 9.2, - "directors": [ - { - "director_id": 456, - "name": "Francis Ford Coppola" - } - ], - "cast": [ - { - "person_id": 789, - "name": "Marlon Brando", - "gender": "M", - "role": "[Don Vito Corleone]" - } - ] - } - """ - - def __init__(self, connection_string: str = "mongodb://localhost:27017/", - database_name: str = "imdb_database"): - """ - Initialize MongoDB connection - - Args: - connection_string: MongoDB connection string - database_name: Name of the database to use - """ - self.connection_string = connection_string - self.database_name = database_name - self.client = None - self.db = None - self.movies_collection = None - - # In-memory data structures for processing - self.movies = {} - self.persons = {} - self.directors = {} - - def connect(self) -> bool: - """Connect to MongoDB and set up the database and collection""" - try: - self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000) - # Test connection - self.client.admin.command('ping') - self.db = self.client[self.database_name] - self.movies_collection = self.db.movies - - print(f"Connected to MongoDB database: {self.database_name}") - return True - - except ConnectionFailure as e: - print(f"Failed to connect to MongoDB: {e}") - return False - except Exception as e: - print(f"MongoDB connection error: {e}") - return False - - def disconnect(self): - """Close MongoDB connection""" - if self.client: - self.client.close() - - def load_movies_data(self, file_path: str, sample_size: Optional[int] = None) -> bool: - """ - Load movie data from IMDBMovie.txt - Handles CSV parsing issues with embedded commas in movie names - """ - try: - movies_loaded = 0 - with open(file_path, 'r', encoding='latin-1') as file: - # Skip header - next(file) - - for line in file: - if sample_size and movies_loaded >= sample_size: - break - - line = line.strip() - if not line: - continue - - try: - # Parse CSV line manually - handle embedded commas in movie names - parts = line.split(',') - - # First part is always movie ID - movie_id = int(parts[0]) - - # Last two parts are year and rank (rank might be empty) - year_str = parts[-2] if len(parts) >= 3 else '' - rank_str = parts[-1] if len(parts) >= 4 else '' - - # Everything in between is the movie name - movie_name = ','.join(parts[1:-2]) if len(parts) > 3 else parts[1] if len(parts) > 1 else '' - - # Parse year and rank - year = int(year_str) if year_str and year_str.strip() else None - rank = float(rank_str) if rank_str and rank_str.strip() else None - - self.movies[movie_id] = { - 'movie_id': movie_id, - 'movie_name': movie_name, - 'year': year, - 'rank': rank, - 'directors': [], - 'cast': [] - } - - movies_loaded += 1 - - except (ValueError, IndexError): - # Skip malformed lines - continue - - print(f"Loaded {len(self.movies)} movies") - return True - - except Exception as e: - print(f"Error loading movies: {e}") - return False - - def load_persons_data(self, file_path: str) -> bool: - """Load person data from IMDBPerson.txt""" - try: - # Try different encodings to handle international characters - persons_df = pd.read_csv( - file_path, - encoding='latin-1', - dtype={ - 'id': 'int32', - 'fname': 'string', - 'lname': 'string', - 'gender': 'string' - } - ) - - for _, row in persons_df.iterrows(): - person_id = int(row['id']) - full_name = f"{row['fname']} {row['lname']}".strip() - self.persons[person_id] = { - 'person_id': person_id, - 'name': full_name, - 'gender': str(row['gender']) - } - - print(f"Loaded {len(self.persons)} persons") - return True - - except Exception as e: - print(f"Error loading persons: {e}") - return False - - def load_directors_data(self, file_path: str) -> bool: - """Load director data from IMDBDirectors.txt""" - try: - # Try different encodings to handle international characters - directors_df = pd.read_csv( - file_path, - encoding="latin-1", - dtype={ - 'id': 'int32', - 'fname': 'string', - 'lname': 'string' - } - ) - - for _, row in directors_df.iterrows(): - director_id = int(row['id']) - full_name = f"{row['fname']} {row['lname']}".strip() - self.directors[director_id] = { - 'director_id': director_id, - 'name': full_name - } - - print(f"Loaded {len(self.directors)} directors") - return True - - except Exception as e: - print(f"Error loading directors: {e}") - return False - - def load_movie_directors_data(self, file_path: str) -> bool: - """Load movie-director relationships from IMDBMovie_Directors.txt""" - try: - movie_directors_df = pd.read_csv( - file_path, - encoding='latin-1', - dtype={'did': 'int32', 'mid': 'int32'} - ) - - linked_count = 0 - for _, row in movie_directors_df.iterrows(): - director_id = int(row['did']) - movie_id = int(row['mid']) - - if movie_id in self.movies and director_id in self.directors: - director_info = { - 'director_id': director_id, - 'name': self.directors[director_id]['name'] - } - self.movies[movie_id]['directors'].append(director_info) - linked_count += 1 - - return True - - except Exception as e: - print(f"Error loading movie-director relationships: {e}") - return False - - def load_cast_data(self, file_path: str, max_cast_per_movie: int = 25) -> bool: - """Load cast data from IMDBCast.txt (large file - process efficiently)""" - try: - if not os.path.exists(file_path): - return True - - movie_cast_count = defaultdict(int) - linked_count = 0 - - # Read line by line to handle CSV parsing issues - with open(file_path, 'r', encoding='latin-1') as file: - # Skip header - next(file) - - for line_num, line in enumerate(file): - if line_num > 100000: # Limit for efficiency - break - - line = line.strip() - if not line: - continue - - try: - # Parse CSV line manually - handle embedded commas in roles - parts = line.split(',') - - if len(parts) < 3: - continue - - person_id = int(parts[0]) - movie_id = int(parts[1]) - # Role is everything after the second comma - role = ','.join(parts[2:]) if len(parts) > 2 else "[Unknown]" - - # Limit cast per movie to prevent overly large documents - if (movie_id in self.movies and person_id in self.persons and - movie_cast_count[movie_id] < max_cast_per_movie): - - cast_info = { - 'person_id': person_id, - 'name': self.persons[person_id]['name'], - 'gender': self.persons[person_id]['gender'], - 'role': role - } - self.movies[movie_id]['cast'].append(cast_info) - movie_cast_count[movie_id] += 1 - linked_count += 1 - - except (ValueError, KeyError): - continue - - if linked_count > 0: - print(f"Loaded {linked_count} cast entries") - - return True - - except Exception as e: - return True - - def create_indexes(self): - """Create indexes for optimal query performance""" - try: - # Primary index on movie_name (shard key) - self.movies_collection.create_index("movie_name", unique=True, name="idx_movie_name") - - # Secondary indexes for common query patterns - self.movies_collection.create_index("year", name="idx_year") - self.movies_collection.create_index("rank", name="idx_rank") - self.movies_collection.create_index("directors.name", name="idx_director_name") - - except Exception as e: - pass - - def insert_movies_to_mongodb(self, batch_size: int = 2000) -> bool: - """Insert all loaded movie data into MongoDB""" - try: - # Clear existing collection - self.movies_collection.drop() - - # Prepare documents for batch insertion - documents = [] - for movie_id, movie_data in self.movies.items(): - document = { - 'movie_id': movie_data['movie_id'], - 'movie_name': movie_data['movie_name'], # SHARD KEY - 'year': movie_data['year'], - 'rank': movie_data['rank'], - 'directors': movie_data['directors'], - 'cast': movie_data['cast'] - } - documents.append(document) - - # Insert in batches for efficiency - if len(documents) >= batch_size: - self.movies_collection.insert_many(documents, ordered=False) - documents = [] - - # Insert remaining documents - if documents: - self.movies_collection.insert_many(documents, ordered=False) - - # Create indexes for optimal querying - self.create_indexes() - - total_count = self.movies_collection.count_documents({}) - print(f"Inserted {total_count} movies into MongoDB") - - return True - - except Exception as e: - print(f"Error inserting data into MongoDB: {e}") - return False - - #TODO: DELETE - def query_movie_by_name(self, movie_name: str) -> Optional[Dict]: - """Query movie by name - OPTIMIZED with shard key (O(1) lookup)""" - try: - result = self.movies_collection.find_one({"movie_name": movie_name}) - return result - except Exception as e: - print(f"Error querying movie by name: {e}") - return None - - #TODO: DELETE - def query_movies_by_year_range(self, start_year: int, end_year: int) -> List[Dict]: - """Query movies by year range""" - try: - query = {"year": {"$gte": start_year, "$lte": end_year}} - results = self.movies_collection.find(query).sort("year", 1) - return list(results) - except Exception as e: - print(f"Error querying movies by year range: {e}") - return [] +# Global data structures for processing +movies = {} +persons = {} +directors = {} +client = None +db = None +movies_collection = None +def connect(connection_string: str = "mongodb://localhost:27017/", + database_name: str = "imdb_database") -> bool: + global client, db, movies_collection + try: + client = MongoClient(connection_string, serverSelectionTimeoutMS=5000) + client.admin.command('ping') + db = client[database_name] + movies_collection = db.movies + + print(f"Connected to MongoDB database: {database_name}") + return True + + except ConnectionFailure as e: + print(f"Failed to connect to MongoDB: {e}") + return False + except Exception as e: + print(f"MongoDB connection error: {e}") + return False + +def disconnect(): + global client + if client: + client.close() + +def load_movies_data(file_path: str, sample_size: Optional[int] = None) -> bool: + global movies + try: + movies_loaded = 0 + with open(file_path, 'r', encoding='latin-1') as file: + next(file) + + for line in file: + if sample_size and movies_loaded >= sample_size: + break + + line = line.strip() + if not line: + continue + + try: + parts = line.split(',') + + movie_id = int(parts[0]) + + year_str = parts[-2] if len(parts) >= 3 else '' + rank_str = parts[-1] if len(parts) >= 4 else '' + + movie_name = ','.join(parts[1:-2]) if len(parts) > 3 else parts[1] if len(parts) > 1 else '' + + year = int(year_str) if year_str and year_str.strip() else None + rank = float(rank_str) if rank_str and rank_str.strip() else None + + movies[movie_id] = { + 'movie_id': movie_id, + 'movie_name': movie_name, + 'year': year, + 'rank': rank, + 'directors': [], + 'cast': [] + } + + movies_loaded += 1 + + except (ValueError, IndexError): + continue + + print(f"Loaded {len(movies)} movies") + return True + + except Exception as e: + print(f"Error loading movies: {e}") + return False + +def load_persons_data(file_path: str) -> bool: + global persons + try: + persons_df = pd.read_csv( + file_path, + encoding='latin-1', + dtype={ + 'id': 'int32', + 'fname': 'string', + 'lname': 'string', + 'gender': 'string' + } + ) + + for _, row in persons_df.iterrows(): + person_id = int(row['id']) + full_name = f"{row['fname']} {row['lname']}".strip() + persons[person_id] = { + 'person_id': person_id, + 'name': full_name, + 'gender': str(row['gender']) + } + + print(f"Loaded {len(persons)} persons") + return True + + except Exception as e: + print(f"Error loading persons: {e}") + return False + +def load_directors_data(file_path: str) -> bool: + global directors + try: + directors_df = pd.read_csv( + file_path, + encoding="latin-1", + dtype={ + 'id': 'int32', + 'fname': 'string', + 'lname': 'string' + } + ) + + for _, row in directors_df.iterrows(): + director_id = int(row['id']) + full_name = f"{row['fname']} {row['lname']}".strip() + directors[director_id] = { + 'director_id': director_id, + 'name': full_name + } + + print(f"Loaded {len(directors)} directors") + return True + + except Exception as e: + print(f"Error loading directors: {e}") + return False + +def load_movie_directors_data(file_path: str) -> bool: + global movies, directors + try: + movie_directors_df = pd.read_csv( + file_path, + encoding='latin-1', + dtype={'did': 'int32', 'mid': 'int32'} + ) + + linked_count = 0 + for _, row in movie_directors_df.iterrows(): + director_id = int(row['did']) + movie_id = int(row['mid']) + + if movie_id in movies and director_id in directors: + director_info = { + 'director_id': director_id, + 'name': directors[director_id]['name'] + } + movies[movie_id]['directors'].append(director_info) + linked_count += 1 + + return True + + except Exception as e: + print(f"Error loading movie-director relationships: {e}") + return False + +def load_cast_data(file_path: str, max_cast_per_movie: int = 25) -> bool: + global movies, persons + try: + if not os.path.exists(file_path): + return True + + movie_cast_count = defaultdict(int) + linked_count = 0 + + with open(file_path, 'r', encoding='latin-1') as file: + next(file) + + for line_num, line in enumerate(file): + if line_num > 100000: + break + + line = line.strip() + if not line: + continue + + try: + parts = line.split(',') + + if len(parts) < 3: + continue + + person_id = int(parts[0]) + movie_id = int(parts[1]) + role = ','.join(parts[2:]) if len(parts) > 2 else "[Unknown]" + + if (movie_id in movies and person_id in persons and + movie_cast_count[movie_id] < max_cast_per_movie): + + cast_info = { + 'person_id': person_id, + 'name': persons[person_id]['name'], + 'gender': persons[person_id]['gender'], + 'role': role + } + movies[movie_id]['cast'].append(cast_info) + movie_cast_count[movie_id] += 1 + linked_count += 1 + + except (ValueError, KeyError): + continue + + if linked_count > 0: + print(f"Loaded {linked_count} cast entries") + + return True + + except Exception as e: + return True + +def create_indexes(): + global movies_collection + try: + movies_collection.create_index("movie_name", unique=True, name="idx_movie_name") + movies_collection.create_index("year", name="idx_year") + movies_collection.create_index("rank", name="idx_rank") + movies_collection.create_index("directors.name", name="idx_director_name") + + except Exception as e: + pass + +def insert_movies_to_mongodb(batch_size: int = 2000) -> bool: + global movies, movies_collection + try: + movies_collection.drop() + + documents = [] + for movie_id, movie_data in movies.items(): + document = { + 'movie_id': movie_data['movie_id'], + 'movie_name': movie_data['movie_name'], + 'year': movie_data['year'], + 'rank': movie_data['rank'], + 'directors': movie_data['directors'], + 'cast': movie_data['cast'] + } + documents.append(document) + + if len(documents) >= batch_size: + movies_collection.insert_many(documents, ordered=False) + documents = [] + + if documents: + movies_collection.insert_many(documents, ordered=False) + + create_indexes() + + total_count = movies_collection.count_documents({}) + print(f"Inserted {total_count} movies into MongoDB") + + return True + + except Exception as e: + print(f"Error inserting data into MongoDB: {e}") + return False + +def query_movie_by_name(movie_name: str) -> Optional[Dict]: + global movies_collection + try: + result = movies_collection.find_one({"movie_name": movie_name}) + return result + except Exception as e: + print(f"Error querying movie by name: {e}") + return None + +def query_movies_by_year_range(start_year: int, end_year: int) -> List[Dict]: + global movies_collection + try: + query = {"year": {"$gte": start_year, "$lte": end_year}} + results = movies_collection.find(query).sort("year", 1) + return list(results) + except Exception as e: + print(f"Error querying movies by year range: {e}") + return [] def main(): - """ - Main function to demonstrate IMDB data loading into MongoDB - Showcases the schema design optimized for movie name access - """ - # Initialize the MongoDB loader - loader = IMDBMongoDBLoader( - connection_string="mongodb://localhost:27017/", - database_name="imdb_database" - ) - try: - # Step 1: Load data from IMDB files print("IMDB DATA LOADING FOR MONGODB") print("=" * 50) data_directory = "IMDB" - sample_size = None # Load subset for demonstration (set to None for full dataset) + sample_size = None - # Define file paths movie_file = os.path.join(data_directory, "IMDBMovie.txt") person_file = os.path.join(data_directory, "IMDBPerson.txt") director_file = os.path.join(data_directory, "IMDBDirectors.txt") movie_director_file = os.path.join(data_directory, "IMDBMovie_Directors.txt") cast_file = os.path.join(data_directory, "IMDBCast.txt") - # Load data sequentially - if not loader.load_movies_data(movie_file, sample_size=sample_size): + if not load_movies_data(movie_file, sample_size=sample_size): print("Failed to load movies") return - if not loader.load_persons_data(person_file): + if not load_persons_data(person_file): print("Failed to load persons") return - if not loader.load_directors_data(director_file): + if not load_directors_data(director_file): print("Failed to load directors") return - if not loader.load_movie_directors_data(movie_director_file): + if not load_movie_directors_data(movie_director_file): print("Failed to load movie-director relationships") return - if not loader.load_cast_data(cast_file): + if not load_cast_data(cast_file): print("Failed to load cast data") return - # Show data loading statistics - total_movies = len(loader.movies) - movies_with_directors = sum(1 for movie in loader.movies.values() if movie['directors']) - movies_with_cast = sum(1 for movie in loader.movies.values() if movie['cast']) + total_movies = len(movies) + movies_with_directors = sum(1 for movie in movies.values() if movie['directors']) + movies_with_cast = sum(1 for movie in movies.values() if movie['cast']) print(f"\nTotal movies loaded: {total_movies}") print(f"Movies with directors: {movies_with_directors}") print(f"Movies with cast: {movies_with_cast}") - # Step 2: Try to connect to MongoDB and insert data - if loader.connect(): - if loader.insert_movies_to_mongodb(batch_size=1000): + if connect(): + if insert_movies_to_mongodb(batch_size=1000): print("Data inserted into MongoDB successfully") - # Test query by movie name (shard key) - test_movie = loader.query_movie_by_name("$ (1971)") + test_movie = query_movie_by_name("$ (1971)") if test_movie: print(f"\nFound movie: {test_movie['movie_name']} ({test_movie.get('year', 'N/A')})") - # Database statistics - total_in_db = loader.movies_collection.count_documents({}) - movies_with_directors_db = loader.movies_collection.count_documents({"directors": {"$ne": []}}) + total_in_db = movies_collection.count_documents({}) + movies_with_directors_db = movies_collection.count_documents({"directors": {"$ne": []}}) print(f"Total movies in database: {total_in_db}") print(f"Movies with directors: {movies_with_directors_db}") @@ -453,8 +361,7 @@ def main(): print(f"Error in main execution: {e}") finally: - # Clean up - loader.disconnect() + disconnect() if __name__ == "__main__": diff --git a/task2.py b/task2.py index f12f945..9c62e64 100644 --- a/task2.py +++ b/task2.py @@ -1,16 +1,80 @@ -# import findspark -# findspark.init() - from pyspark import SparkConf, SparkContext + def main(): - conf = SparkConf().setAppName("local123").setMaster("local[*]") + conf = SparkConf().setAppName("Task2").setMaster("local[*]") sc = SparkContext(conf=conf) - data = sc.textFile("pt2-in.txt") - - # Read and display the first few lines of the data - first_lines = data.take(10) - for line in first_lines: - print(line) + # Read the input file + data = sc.textFile("t2-in.txt") + # Parse edges and create initial graph structure + edges = data.map(lambda line: line.strip().split()).map(lambda x: (int(x[0]), int(x[1]))) + all_vertices = edges.flatMap(lambda x: [x[0], x[1]]).distinct().collect() + all_vertices_set = set(all_vertices) + num_vertices = len(all_vertices_set) + + # Create adjacency list: (vertex, [list of neighbors]) + adjacency_list = edges.groupByKey().mapValues(list).collectAsMap() + + # Step 0: Handle vertices with no outgoing edges + # Add edges from nodes with no outgoing edges to all other nodes + vertices_with_outgoing = set(adjacency_list.keys()) + vertices_without_outgoing = all_vertices_set - vertices_with_outgoing + + # Add all other vertices as neighbors for vertices without outgoing edges + for vertex in vertices_without_outgoing: + adjacency_list[vertex] = [v for v in all_vertices if v != vertex] + + # Ensure all vertices have an entry in adjacency list + for vertex in all_vertices: + if vertex not in adjacency_list: + adjacency_list[vertex] = [] + + # Convert to RDD format: (vertex, (neighbors, current_rank)) + # Step 1: Initialize page rank of every node as 1 + graph_rdd = sc.parallelize([(vertex, (neighbors, 1.0)) for vertex, neighbors in adjacency_list.items()]) + + for iteration in range(10): + # Step 2: Each vertex contributes rank(v)/|neighbors(v)| to its neighbors + contributions = graph_rdd.flatMap(lambda vertex_data: + [(neighbor, vertex_data[1][1] / len(vertex_data[1][0])) + for neighbor in vertex_data[1][0]] if len(vertex_data[1][0]) > 0 else [] + ) + + # Sum contributions for each vertex + summed_contributions = contributions.reduceByKey(lambda a, b: a + b) + + # Convert to dictionary + contributions_dict = summed_contributions.collectAsMap() + + # Step 3: Update ranks: 0.15 + 0.85 × (contributions) + # Join with original graph to maintain adjacency lists and update ranks + def update_rank(vertex_data): + vertex, (neighbors, old_rank) = vertex_data + contribution = contributions_dict.get(vertex, 0.0) + new_rank = 0.15 + 0.85 * contribution + return (vertex, (neighbors, new_rank)) + + graph_rdd = graph_rdd.map(update_rank) + + # Step 5: Divide by total number of vertices + final_graph_rdd = graph_rdd.map(lambda vertex_data: + (vertex_data[0], vertex_data[1][1] / num_vertices) + ) + + # Collect final results + save + final_ranks = final_graph_rdd.collect() + final_ranks.sort(key=lambda x: x[0]) + + print(f"\nFinal Page Rank results:") + for vertex, rank in final_ranks: + print(f"{vertex} {rank}") + + output_rdd = sc.parallelize(final_ranks).map(lambda x: f"{x[0]} {x[1]}") + with open("t2-out.txt", "w") as f: + for vertex, rank in final_ranks: + f.write(f"{vertex} {rank}\n") + + sc.stop() + main() \ No newline at end of file diff --git a/task3.py b/task3.py new file mode 100644 index 0000000..c2d4268 --- /dev/null +++ b/task3.py @@ -0,0 +1,38 @@ +from pyspark import SparkConf, SparkContext +from pyspark.sql import SparkSession + +def main(): + conf = SparkConf().setAppName("Task3").setMaster("local[*]") + sc = SparkContext(conf=conf) + spark = SparkSession.builder.getOrCreate() + + # Read the output file from Task 2 + t2_out = sc.textFile("t2-out.txt") + + # a. Create dataframe from RDD (id, page_rank) + page_rank_rdd = t2_out.map(lambda line: line.strip().split()).map(lambda x: (int(x[0]), float(x[1]))) + page_rank_df = spark.createDataFrame(page_rank_rdd, ["id", "page_rank"]) + + # b. Spark SQL query to find page rank of vertex 2 + page_rank_df.createOrReplaceTempView("page_rank") + result = spark.sql("SELECT page_rank FROM page_rank WHERE id = 2") + print("Page Rank of vertex 2:") + result.show() + + # c. Find vertex with highest page rank + result = spark.sql("SELECT id, page_rank FROM page_rank ORDER BY page_rank DESC LIMIT 1") + print("Vertex with highest Page Rank:") + result.show() + + # d. Create dataframe from t3-in (id, name) + t3_in = sc.textFile("t3-in.txt") + names_rdd = t3_in.map(lambda line: line.strip().split()).map(lambda x: (int(x[0]), x[1])) + names_df = spark.createDataFrame(names_rdd, ["id", "name"]) + names_df.createOrReplaceTempView("names") + + # e. Join and print dataframes on id + result = spark.sql("SELECT n.id, n.name, p.page_rank FROM names n JOIN page_rank p ON n.id = p.id") + print("Joined DataFrame (id, name, page_rank):") + result.show() + +main() \ No newline at end of file