From 291cad2b4c922b20bc359cd5c4702be3592d0234 Mon Sep 17 00:00:00 2001 From: Nicholas Pease Date: Thu, 18 Dec 2025 17:51:40 -0500 Subject: [PATCH] Final --- requirements.txt | 2 - task1.py | 478 ++++++++++++++++------------------------------- task2.py | 7 + task3.py | 14 +- 4 files changed, 175 insertions(+), 326 deletions(-) delete mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index fdee051..0000000 --- a/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -ipykernel==6.4.1 -pyspark==3.1.2 \ No newline at end of file diff --git a/task1.py b/task1.py index 60f8d02..c0edf5d 100644 --- a/task1.py +++ b/task1.py @@ -1,6 +1,7 @@ # HW3 - Task 1 # Nicholas Pease -# IMDB Data Loading into MongoDB +# I decided to try MongoD because structure of documents, which aligns closely with JSON format, where I store a majority of the data I work with in other projects. +# I stored the data in this assignment in a collection of movie documents, where each document contains information about a movie, its directors, and its cast. import csv import pandas as pd @@ -10,7 +11,6 @@ import os from typing import Dict, List, Optional from collections import defaultdict -# Global data structures for processing movies = {} persons = {} directors = {} @@ -18,317 +18,171 @@ client = None db = None movies_collection = None -def connect(connection_string: str = "mongodb://localhost:27017/", - database_name: str = "imdb_database") -> bool: +def connect(): global client, db, movies_collection - try: - client = MongoClient(connection_string, serverSelectionTimeoutMS=5000) - client.admin.command('ping') - db = client[database_name] - movies_collection = db.movies - - print(f"Connected to MongoDB database: {database_name}") - return True - - except ConnectionFailure as e: - print(f"Failed to connect to MongoDB: {e}") - return False - except Exception as e: - print(f"MongoDB connection error: {e}") - return False + + client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=5000) + client.admin.command('ping') + db = client["imdb_database"] + movies_collection = db.movies + + print(f"Connected to MongoDB database: imdb_database") -def disconnect(): - global client - if client: - client.close() - -def load_movies_data(file_path: str, sample_size: Optional[int] = None) -> bool: +def load_movies_data(file_path): global movies - try: - movies_loaded = 0 - with open(file_path, 'r', encoding='latin-1') as file: - next(file) + movies_loaded = 0 + with open(file_path, 'r', encoding='latin-1') as file: + next(file) # skip header + + for line in file: + line = line.strip() - for line in file: - if sample_size and movies_loaded >= sample_size: - break - - line = line.strip() - if not line: - continue - - try: - parts = line.split(',') - - movie_id = int(parts[0]) - - year_str = parts[-2] if len(parts) >= 3 else '' - rank_str = parts[-1] if len(parts) >= 4 else '' - - movie_name = ','.join(parts[1:-2]) if len(parts) > 3 else parts[1] if len(parts) > 1 else '' - - year = int(year_str) if year_str and year_str.strip() else None - rank = float(rank_str) if rank_str and rank_str.strip() else None - - movies[movie_id] = { - 'movie_id': movie_id, - 'movie_name': movie_name, - 'year': year, - 'rank': rank, - 'directors': [], - 'cast': [] - } - - movies_loaded += 1 - - except (ValueError, IndexError): - continue - - print(f"Loaded {len(movies)} movies") - return True - - except Exception as e: - print(f"Error loading movies: {e}") - return False + parts = line.split(',') + + movie_id = int(parts[0]) + + year_str = parts[-2] if len(parts) >= 3 else '' + rank_str = parts[-1] if len(parts) >= 4 else '' + + movie_name = ','.join(parts[1:-2]) if len(parts) > 3 else parts[1] if len(parts) > 1 else '' + + year = int(year_str) if year_str and year_str.strip() else None + rank = float(rank_str) if rank_str and rank_str.strip() else None + + movies[movie_id] = { + 'movie_id': movie_id, + 'movie_name': movie_name, + 'year': year, + 'rank': rank, + 'directors': [], + 'cast': [] + } + + movies_loaded += 1 + + print(f"Loaded {len(movies)} movies") -def load_persons_data(file_path: str) -> bool: +def load_persons_data(file_path): global persons - try: - persons_df = pd.read_csv( - file_path, - encoding='latin-1', - dtype={ - 'id': 'int32', - 'fname': 'string', - 'lname': 'string', - 'gender': 'string' - } - ) - - for _, row in persons_df.iterrows(): - person_id = int(row['id']) - full_name = f"{row['fname']} {row['lname']}".strip() - persons[person_id] = { - 'person_id': person_id, - 'name': full_name, - 'gender': str(row['gender']) - } - - print(f"Loaded {len(persons)} persons") - return True - - except Exception as e: - print(f"Error loading persons: {e}") - return False + persons_df = pd.read_csv(file_path,encoding='latin-1',dtype={'id': 'int32','fname': 'string','lname': 'string', 'gender': 'string'}) + + for _, row in persons_df.iterrows(): + person_id = int(row['id']) + full_name = f"{row['fname']} {row['lname']}".strip() + persons[person_id] = { + 'person_id': person_id, + 'name': full_name, + 'gender': str(row['gender']) + } + + print(f"Loaded {len(persons)} persons") -def load_directors_data(file_path: str) -> bool: +def load_directors_data(file_path): global directors - try: - directors_df = pd.read_csv( - file_path, - encoding="latin-1", - dtype={ - 'id': 'int32', - 'fname': 'string', - 'lname': 'string' - } - ) - - for _, row in directors_df.iterrows(): - director_id = int(row['id']) - full_name = f"{row['fname']} {row['lname']}".strip() - directors[director_id] = { - 'director_id': director_id, - 'name': full_name - } - - print(f"Loaded {len(directors)} directors") - return True - - except Exception as e: - print(f"Error loading directors: {e}") - return False + directors_df = pd.read_csv(file_path,encoding="latin-1",dtype={'id': 'int32','fname': 'string','lname': 'string'}) + + for _, row in directors_df.iterrows(): + director_id = int(row['id']) + full_name = f"{row['fname']} {row['lname']}".strip() + directors[director_id] = { + 'director_id': director_id, + 'name': full_name + } + + print(f"Loaded {len(directors)} directors") -def load_movie_directors_data(file_path: str) -> bool: +def load_movie_directors_data(file_path): global movies, directors - try: - movie_directors_df = pd.read_csv( - file_path, - encoding='latin-1', - dtype={'did': 'int32', 'mid': 'int32'} - ) + movie_directors_df = pd.read_csv(file_path,encoding='latin-1',dtype={'did': 'int32', 'mid': 'int32'}) + + linked_count = 0 + for _, row in movie_directors_df.iterrows(): + director_id = int(row['did']) + movie_id = int(row['mid']) - linked_count = 0 - for _, row in movie_directors_df.iterrows(): - director_id = int(row['did']) - movie_id = int(row['mid']) - - if movie_id in movies and director_id in directors: - director_info = { - 'director_id': director_id, - 'name': directors[director_id]['name'] - } - movies[movie_id]['directors'].append(director_info) - linked_count += 1 - - return True - - except Exception as e: - print(f"Error loading movie-director relationships: {e}") - return False - -def load_cast_data(file_path: str, max_cast_per_movie: int = 25) -> bool: - global movies, persons - try: - if not os.path.exists(file_path): - return True - - movie_cast_count = defaultdict(int) - linked_count = 0 - - with open(file_path, 'r', encoding='latin-1') as file: - next(file) - - for line_num, line in enumerate(file): - if line_num > 100000: - break - - line = line.strip() - if not line: - continue - - try: - parts = line.split(',') - - if len(parts) < 3: - continue - - person_id = int(parts[0]) - movie_id = int(parts[1]) - role = ','.join(parts[2:]) if len(parts) > 2 else "[Unknown]" - - if (movie_id in movies and person_id in persons and - movie_cast_count[movie_id] < max_cast_per_movie): - - cast_info = { - 'person_id': person_id, - 'name': persons[person_id]['name'], - 'gender': persons[person_id]['gender'], - 'role': role - } - movies[movie_id]['cast'].append(cast_info) - movie_cast_count[movie_id] += 1 - linked_count += 1 - - except (ValueError, KeyError): - continue - - if linked_count > 0: - print(f"Loaded {linked_count} cast entries") - - return True - - except Exception as e: - return True - -def create_indexes(): - global movies_collection - try: - movies_collection.create_index("movie_name", unique=True, name="idx_movie_name") - movies_collection.create_index("year", name="idx_year") - movies_collection.create_index("rank", name="idx_rank") - movies_collection.create_index("directors.name", name="idx_director_name") - - except Exception as e: - pass - -def insert_movies_to_mongodb(batch_size: int = 2000) -> bool: - global movies, movies_collection - try: - movies_collection.drop() - - documents = [] - for movie_id, movie_data in movies.items(): - document = { - 'movie_id': movie_data['movie_id'], - 'movie_name': movie_data['movie_name'], - 'year': movie_data['year'], - 'rank': movie_data['rank'], - 'directors': movie_data['directors'], - 'cast': movie_data['cast'] + if movie_id in movies and director_id in directors: + director_info = { + 'director_id': director_id, + 'name': directors[director_id]['name'] } - documents.append(document) + movies[movie_id]['directors'].append(director_info) + linked_count += 1 + +def load_cast_data(file_path): + global movies, persons + + linked_count = 0 + + with open(file_path, 'r', encoding='latin-1') as file: + next(file) # skip header + + for line in file: + line = line.strip() + if not line: + continue + + parts = line.split(',') + if len(parts) < 3: + continue - if len(documents) >= batch_size: - movies_collection.insert_many(documents, ordered=False) - documents = [] - - if documents: - movies_collection.insert_many(documents, ordered=False) - - create_indexes() - - total_count = movies_collection.count_documents({}) - print(f"Inserted {total_count} movies into MongoDB") - - return True - - except Exception as e: - print(f"Error inserting data into MongoDB: {e}") - return False + person_id = int(parts[0]) + movie_id = int(parts[1]) + role = parts[2] + + if (movie_id in movies and person_id in persons): + cast_info = { + 'person_id': person_id, + 'name': persons[person_id]['name'], + 'gender': persons[person_id]['gender'], + 'role': role + } + movies[movie_id]['cast'].append(cast_info) + linked_count += 1 + + print(f"Loaded {linked_count} cast entries") -def query_movie_by_name(movie_name: str) -> Optional[Dict]: - global movies_collection - try: - result = movies_collection.find_one({"movie_name": movie_name}) - return result - except Exception as e: - print(f"Error querying movie by name: {e}") - return None - -def query_movies_by_year_range(start_year: int, end_year: int) -> List[Dict]: - global movies_collection - try: - query = {"year": {"$gte": start_year, "$lte": end_year}} - results = movies_collection.find(query).sort("year", 1) - return list(results) - except Exception as e: - print(f"Error querying movies by year range: {e}") - return [] +def insert_movies_to_mongodb(): + global movies, movies_collection + movies_collection.drop() + + inserted_count = 0 + + for movie_id, movie_data in movies.items(): + document = { + 'movie_id': movie_data['movie_id'], + 'movie_name': movie_data['movie_name'], + 'year': movie_data['year'], + 'rank': movie_data['rank'], + 'directors': movie_data['directors'], + 'cast': movie_data['cast'] + } + + movies_collection.insert_one(document) + inserted_count += 1 + + # Create indexes + movies_collection.create_index("movie_name", name="idx_movie_name") + movies_collection.create_index("year", name="idx_year") + movies_collection.create_index("rank", name="idx_rank") + movies_collection.create_index("directors.name", name="idx_director_name") + + total_count = movies_collection.count_documents({}) + print(f"Inserted {inserted_count} movies into MongoDB") def main(): - try: - print("IMDB DATA LOADING FOR MONGODB") - print("=" * 50) + # Load in the files + movie_file = os.path.join("IMDB", "IMDBMovie.txt") + person_file = os.path.join("IMDB", "IMDBPerson.txt") + director_file = os.path.join("IMDB", "IMDBDirectors.txt") + movie_director_file = os.path.join("IMDB", "IMDBMovie_Directors.txt") + cast_file = os.path.join("IMDB", "IMDBCast.txt") - data_directory = "IMDB" - sample_size = None - - movie_file = os.path.join(data_directory, "IMDBMovie.txt") - person_file = os.path.join(data_directory, "IMDBPerson.txt") - director_file = os.path.join(data_directory, "IMDBDirectors.txt") - movie_director_file = os.path.join(data_directory, "IMDBMovie_Directors.txt") - cast_file = os.path.join(data_directory, "IMDBCast.txt") - - if not load_movies_data(movie_file, sample_size=sample_size): - print("Failed to load movies") - return - - if not load_persons_data(person_file): - print("Failed to load persons") - return - - if not load_directors_data(director_file): - print("Failed to load directors") - return - - if not load_movie_directors_data(movie_director_file): - print("Failed to load movie-director relationships") - return - - if not load_cast_data(cast_file): - print("Failed to load cast data") - return + # Load the data into memory + load_movies_data(movie_file) + load_persons_data(person_file) + load_directors_data(director_file) + load_movie_directors_data(movie_director_file) + load_cast_data(cast_file) total_movies = len(movies) movies_with_directors = sum(1 for movie in movies.values() if movie['directors']) @@ -338,31 +192,11 @@ def main(): print(f"Movies with directors: {movies_with_directors}") print(f"Movies with cast: {movies_with_cast}") - if connect(): - if insert_movies_to_mongodb(batch_size=1000): - print("Data inserted into MongoDB successfully") - - test_movie = query_movie_by_name("$ (1971)") - if test_movie: - print(f"\nFound movie: {test_movie['movie_name']} ({test_movie.get('year', 'N/A')})") - - total_in_db = movies_collection.count_documents({}) - movies_with_directors_db = movies_collection.count_documents({"directors": {"$ne": []}}) - - print(f"Total movies in database: {total_in_db}") - print(f"Movies with directors: {movies_with_directors_db}") - - else: - print("Failed to insert data into MongoDB") - else: - print("MongoDB not available") - - except Exception as e: - print(f"Error in main execution: {e}") - - finally: - disconnect() + # Insert the data into MongoDB + connect() + insert_movies_to_mongodb() + global client + client.close() -if __name__ == "__main__": - main() \ No newline at end of file +main() \ No newline at end of file diff --git a/task2.py b/task2.py index 9c62e64..719072b 100644 --- a/task2.py +++ b/task2.py @@ -1,3 +1,10 @@ +# HW3 - Task 2 +# Nicholas Pease +# This loads a directed graph from an input file, +# computes the Page Rank for each vertex using Spark, +# and saves the results to an output file. +# This follows the same structure as the assignment description dictates. + from pyspark import SparkConf, SparkContext def main(): diff --git a/task3.py b/task3.py index c2d4268..de37d39 100644 --- a/task3.py +++ b/task3.py @@ -1,3 +1,10 @@ +# HW3 - Task 3 +# Nicholas Pease +# This takes the output from Task 2 (Page Rank results), +# loads it into a Spark DataFrame, performs SQL queries to +# extract specific information, joins with another input file with labels, +# and saves the final results to a CSV file. + from pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession @@ -30,9 +37,12 @@ def main(): names_df = spark.createDataFrame(names_rdd, ["id", "name"]) names_df.createOrReplaceTempView("names") - # e. Join and print dataframes on id + # e. Join and print dataframes on id, save as CSV (t3-out.csv) result = spark.sql("SELECT n.id, n.name, p.page_rank FROM names n JOIN page_rank p ON n.id = p.id") print("Joined DataFrame (id, name, page_rank):") - result.show() + result.show() + with open("t3-out.csv", "w") as f: + for row in result.collect(): + f.write(f"{row['id']} {row['name']} {row['page_rank']}\n") main() \ No newline at end of file