This commit is contained in:
2025-12-18 23:06:51 +00:00
parent 68df013917
commit 47a7fed915
4 changed files with 176 additions and 325 deletions
-2
View File
@@ -1,2 +0,0 @@
ipykernel==6.4.1
pyspark==3.1.2
+157 -321
View File
@@ -1,6 +1,7 @@
# HW3 - Task 1 # HW3 - Task 1
# Nicholas Pease # Nicholas Pease
# IMDB Data Loading into MongoDB # I decided to try MongoD because structure of documents, which aligns closely with JSON format, where I store a majority of the data I work with in other projects.
# I stored the data in this assignment in a collection of movie documents, where each document contains information about a movie, its directors, and its cast.
import csv import csv
import pandas as pd import pandas as pd
@@ -10,7 +11,6 @@ import os
from typing import Dict, List, Optional from typing import Dict, List, Optional
from collections import defaultdict from collections import defaultdict
# Global data structures for processing
movies = {} movies = {}
persons = {} persons = {}
directors = {} directors = {}
@@ -18,317 +18,168 @@ client = None
db = None db = None
movies_collection = None movies_collection = None
def connect(connection_string: str = "mongodb://localhost:27017/", def connect():
database_name: str = "imdb_database") -> bool:
global client, db, movies_collection global client, db, movies_collection
try:
client = MongoClient(connection_string, serverSelectionTimeoutMS=5000) client = MongoClient("mongodb://localhost:27017/", serverSelectionTimeoutMS=5000)
client.admin.command('ping') client.admin.command('ping')
db = client[database_name] db = client["imdb_database"]
movies_collection = db.movies movies_collection = db.movies
print(f"Connected to MongoDB database: {database_name}") print(f"Connected to MongoDB database: imdb_database")
return True
except ConnectionFailure as e:
print(f"Failed to connect to MongoDB: {e}")
return False
except Exception as e:
print(f"MongoDB connection error: {e}")
return False
def disconnect(): def load_movies_data(file_path):
global client
if client:
client.close()
def load_movies_data(file_path: str, sample_size: Optional[int] = None) -> bool:
global movies global movies
try: movies_loaded = 0
movies_loaded = 0 with open(file_path, 'r', encoding='latin-1') as file:
with open(file_path, 'r', encoding='latin-1') as file: next(file) # skip header
next(file)
for line in file:
line = line.strip()
for line in file: parts = line.split(',')
if sample_size and movies_loaded >= sample_size:
break movie_id = int(parts[0])
line = line.strip() year_str = parts[-2] if len(parts) >= 3 else ''
if not line: rank_str = parts[-1] if len(parts) >= 4 else ''
continue
movie_name = ','.join(parts[1:-2]) if len(parts) > 3 else parts[1] if len(parts) > 1 else ''
try:
parts = line.split(',') year = int(year_str) if year_str and year_str.strip() else None
rank = float(rank_str) if rank_str and rank_str.strip() else None
movie_id = int(parts[0])
movies[movie_id] = {
year_str = parts[-2] if len(parts) >= 3 else '' 'movie_id': movie_id,
rank_str = parts[-1] if len(parts) >= 4 else '' 'movie_name': movie_name,
'year': year,
movie_name = ','.join(parts[1:-2]) if len(parts) > 3 else parts[1] if len(parts) > 1 else '' 'rank': rank,
'directors': [],
year = int(year_str) if year_str and year_str.strip() else None 'cast': []
rank = float(rank_str) if rank_str and rank_str.strip() else None }
movies[movie_id] = { movies_loaded += 1
'movie_id': movie_id,
'movie_name': movie_name, print(f"Loaded {len(movies)} movies")
'year': year,
'rank': rank,
'directors': [],
'cast': []
}
movies_loaded += 1
except (ValueError, IndexError):
continue
print(f"Loaded {len(movies)} movies")
return True
except Exception as e:
print(f"Error loading movies: {e}")
return False
def load_persons_data(file_path: str) -> bool: def load_persons_data(file_path):
global persons global persons
try: persons_df = pd.read_csv(file_path,encoding='latin-1',dtype={'id': 'int32','fname': 'string','lname': 'string', 'gender': 'string'})
persons_df = pd.read_csv(
file_path, for _, row in persons_df.iterrows():
encoding='latin-1', person_id = int(row['id'])
dtype={ full_name = f"{row['fname']} {row['lname']}".strip()
'id': 'int32', persons[person_id] = {
'fname': 'string', 'person_id': person_id,
'lname': 'string', 'name': full_name,
'gender': 'string' 'gender': str(row['gender'])
} }
)
print(f"Loaded {len(persons)} persons")
for _, row in persons_df.iterrows():
person_id = int(row['id'])
full_name = f"{row['fname']} {row['lname']}".strip()
persons[person_id] = {
'person_id': person_id,
'name': full_name,
'gender': str(row['gender'])
}
print(f"Loaded {len(persons)} persons")
return True
except Exception as e:
print(f"Error loading persons: {e}")
return False
def load_directors_data(file_path: str) -> bool: def load_directors_data(file_path):
global directors global directors
try: directors_df = pd.read_csv(file_path,encoding="latin-1",dtype={'id': 'int32','fname': 'string','lname': 'string'})
directors_df = pd.read_csv(
file_path, for _, row in directors_df.iterrows():
encoding="latin-1", director_id = int(row['id'])
dtype={ full_name = f"{row['fname']} {row['lname']}".strip()
'id': 'int32', directors[director_id] = {
'fname': 'string', 'director_id': director_id,
'lname': 'string' 'name': full_name
} }
)
print(f"Loaded {len(directors)} directors")
for _, row in directors_df.iterrows():
director_id = int(row['id'])
full_name = f"{row['fname']} {row['lname']}".strip()
directors[director_id] = {
'director_id': director_id,
'name': full_name
}
print(f"Loaded {len(directors)} directors")
return True
except Exception as e:
print(f"Error loading directors: {e}")
return False
def load_movie_directors_data(file_path: str) -> bool: def load_movie_directors_data(file_path):
global movies, directors global movies, directors
try: movie_directors_df = pd.read_csv(file_path,encoding='latin-1',dtype={'did': 'int32', 'mid': 'int32'})
movie_directors_df = pd.read_csv(
file_path, linked_count = 0
encoding='latin-1', for _, row in movie_directors_df.iterrows():
dtype={'did': 'int32', 'mid': 'int32'} director_id = int(row['did'])
) movie_id = int(row['mid'])
linked_count = 0 if movie_id in movies and director_id in directors:
for _, row in movie_directors_df.iterrows(): director_info = {
director_id = int(row['did']) 'director_id': director_id,
movie_id = int(row['mid']) 'name': directors[director_id]['name']
if movie_id in movies and director_id in directors:
director_info = {
'director_id': director_id,
'name': directors[director_id]['name']
}
movies[movie_id]['directors'].append(director_info)
linked_count += 1
return True
except Exception as e:
print(f"Error loading movie-director relationships: {e}")
return False
def load_cast_data(file_path: str, max_cast_per_movie: int = 25) -> bool:
global movies, persons
try:
if not os.path.exists(file_path):
return True
movie_cast_count = defaultdict(int)
linked_count = 0
with open(file_path, 'r', encoding='latin-1') as file:
next(file)
for line_num, line in enumerate(file):
if line_num > 100000:
break
line = line.strip()
if not line:
continue
try:
parts = line.split(',')
if len(parts) < 3:
continue
person_id = int(parts[0])
movie_id = int(parts[1])
role = ','.join(parts[2:]) if len(parts) > 2 else "[Unknown]"
if (movie_id in movies and person_id in persons and
movie_cast_count[movie_id] < max_cast_per_movie):
cast_info = {
'person_id': person_id,
'name': persons[person_id]['name'],
'gender': persons[person_id]['gender'],
'role': role
}
movies[movie_id]['cast'].append(cast_info)
movie_cast_count[movie_id] += 1
linked_count += 1
except (ValueError, KeyError):
continue
if linked_count > 0:
print(f"Loaded {linked_count} cast entries")
return True
except Exception as e:
return True
def create_indexes():
global movies_collection
try:
movies_collection.create_index("movie_name", unique=True, name="idx_movie_name")
movies_collection.create_index("year", name="idx_year")
movies_collection.create_index("rank", name="idx_rank")
movies_collection.create_index("directors.name", name="idx_director_name")
except Exception as e:
pass
def insert_movies_to_mongodb(batch_size: int = 2000) -> bool:
global movies, movies_collection
try:
movies_collection.drop()
documents = []
for movie_id, movie_data in movies.items():
document = {
'movie_id': movie_data['movie_id'],
'movie_name': movie_data['movie_name'],
'year': movie_data['year'],
'rank': movie_data['rank'],
'directors': movie_data['directors'],
'cast': movie_data['cast']
} }
documents.append(document) movies[movie_id]['directors'].append(director_info)
linked_count += 1
def load_cast_data(file_path):
global movies, persons
linked_count = 0
with open(file_path, 'r', encoding='latin-1') as file:
next(file) # skip header
for line in file:
line = line.strip()
if not line:
continue
parts = line.split(',')
if len(parts) < 3:
continue
if len(documents) >= batch_size: person_id = int(parts[0])
movies_collection.insert_many(documents, ordered=False) movie_id = int(parts[1])
documents = [] role = parts[2]
if documents: if (movie_id in movies and person_id in persons):
movies_collection.insert_many(documents, ordered=False) cast_info = {
'person_id': person_id,
create_indexes() 'name': persons[person_id]['name'],
'gender': persons[person_id]['gender'],
total_count = movies_collection.count_documents({}) 'role': role
print(f"Inserted {total_count} movies into MongoDB") }
movies[movie_id]['cast'].append(cast_info)
return True linked_count += 1
except Exception as e: print(f"Loaded {linked_count} cast entries")
print(f"Error inserting data into MongoDB: {e}")
return False
def query_movie_by_name(movie_name: str) -> Optional[Dict]: def insert_movies_to_mongodb():
global movies_collection global movies, movies_collection
try: movies_collection.drop()
result = movies_collection.find_one({"movie_name": movie_name})
return result inserted_count = 0
except Exception as e:
print(f"Error querying movie by name: {e}") for movie_id, movie_data in movies.items():
return None document = {
'movie_id': movie_data['movie_id'],
def query_movies_by_year_range(start_year: int, end_year: int) -> List[Dict]: 'movie_name': movie_data['movie_name'],
global movies_collection 'year': movie_data['year'],
try: 'rank': movie_data['rank'],
query = {"year": {"$gte": start_year, "$lte": end_year}} 'directors': movie_data['directors'],
results = movies_collection.find(query).sort("year", 1) 'cast': movie_data['cast']
return list(results) }
except Exception as e:
print(f"Error querying movies by year range: {e}") movies_collection.insert_one(document)
return [] inserted_count += 1
# Create indexes
movies_collection.create_index("movie_name", name="idx_movie_name")
total_count = movies_collection.count_documents({})
print(f"Inserted {inserted_count} movies into MongoDB")
def main(): def main():
try: # Load in the files
print("IMDB DATA LOADING FOR MONGODB") movie_file = os.path.join("IMDB", "IMDBMovie.txt")
print("=" * 50) person_file = os.path.join("IMDB", "IMDBPerson.txt")
director_file = os.path.join("IMDB", "IMDBDirectors.txt")
movie_director_file = os.path.join("IMDB", "IMDBMovie_Directors.txt")
cast_file = os.path.join("IMDB", "IMDBCast.txt")
data_directory = "IMDB" # Load the data into memory
sample_size = None load_movies_data(movie_file)
load_persons_data(person_file)
movie_file = os.path.join(data_directory, "IMDBMovie.txt") load_directors_data(director_file)
person_file = os.path.join(data_directory, "IMDBPerson.txt") load_movie_directors_data(movie_director_file)
director_file = os.path.join(data_directory, "IMDBDirectors.txt") load_cast_data(cast_file)
movie_director_file = os.path.join(data_directory, "IMDBMovie_Directors.txt")
cast_file = os.path.join(data_directory, "IMDBCast.txt")
if not load_movies_data(movie_file, sample_size=sample_size):
print("Failed to load movies")
return
if not load_persons_data(person_file):
print("Failed to load persons")
return
if not load_directors_data(director_file):
print("Failed to load directors")
return
if not load_movie_directors_data(movie_director_file):
print("Failed to load movie-director relationships")
return
if not load_cast_data(cast_file):
print("Failed to load cast data")
return
total_movies = len(movies) total_movies = len(movies)
movies_with_directors = sum(1 for movie in movies.values() if movie['directors']) movies_with_directors = sum(1 for movie in movies.values() if movie['directors'])
@@ -338,31 +189,16 @@ def main():
print(f"Movies with directors: {movies_with_directors}") print(f"Movies with directors: {movies_with_directors}")
print(f"Movies with cast: {movies_with_cast}") print(f"Movies with cast: {movies_with_cast}")
if connect(): # Insert the data into MongoDB
if insert_movies_to_mongodb(batch_size=1000): connect()
print("Data inserted into MongoDB successfully") insert_movies_to_mongodb()
test_movie = query_movie_by_name("$ (1971)")
if test_movie:
print(f"\nFound movie: {test_movie['movie_name']} ({test_movie.get('year', 'N/A')})")
total_in_db = movies_collection.count_documents({})
movies_with_directors_db = movies_collection.count_documents({"directors": {"$ne": []}})
print(f"Total movies in database: {total_in_db}")
print(f"Movies with directors: {movies_with_directors_db}")
else:
print("Failed to insert data into MongoDB")
else:
print("MongoDB not available")
except Exception as e: # b Query DB for "Shrek (2001)"
print(f"Error in main execution: {e}") result = movies_collection.find_one({'movie_name': "Shrek (2001)"})
print('\nQuery Result for "Shrek (2001)":')
finally: print(result)
disconnect()
global client
client.close()
if __name__ == "__main__": main()
main()
+7
View File
@@ -1,3 +1,10 @@
# HW3 - Task 2
# Nicholas Pease
# This loads a directed graph from an input file,
# computes the Page Rank for each vertex using Spark,
# and saves the results to an output file.
# This follows the same structure as the assignment description dictates.
from pyspark import SparkConf, SparkContext from pyspark import SparkConf, SparkContext
def main(): def main():
+12 -2
View File
@@ -1,3 +1,10 @@
# HW3 - Task 3
# Nicholas Pease
# This takes the output from Task 2 (Page Rank results),
# loads it into a Spark DataFrame, performs SQL queries to
# extract specific information, joins with another input file with labels,
# and saves the final results to a CSV file.
from pyspark import SparkConf, SparkContext from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
@@ -30,9 +37,12 @@ def main():
names_df = spark.createDataFrame(names_rdd, ["id", "name"]) names_df = spark.createDataFrame(names_rdd, ["id", "name"])
names_df.createOrReplaceTempView("names") names_df.createOrReplaceTempView("names")
# e. Join and print dataframes on id # e. Join and print dataframes on id, save as CSV (t3-out.csv)
result = spark.sql("SELECT n.id, n.name, p.page_rank FROM names n JOIN page_rank p ON n.id = p.id") result = spark.sql("SELECT n.id, n.name, p.page_rank FROM names n JOIN page_rank p ON n.id = p.id")
print("Joined DataFrame (id, name, page_rank):") print("Joined DataFrame (id, name, page_rank):")
result.show() result.show()
with open("t3-out.csv", "w") as f:
for row in result.collect():
f.write(f"{row['id']} {row['name']} {row['page_rank']}\n")
main() main()