# Setup - Run only once per Kernel App
# %conda install openjdk -y
# # install PySpark
# %pip install pyspark==3.2.0
# restart kernel
from IPython.core.display import HTML
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import when, col, desc, regexp_replace, lower, udf, to_date
from pyspark.sql import DataFrame
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler, StringIndexer
from pyspark.sql.types import BooleanType, StringType, DateType, IntegerType, DoubleType
from pyspark.ml import Pipeline
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session used by the rest of the notebook.
# NOTE(review): the builder entry point, the terminating getOrCreate() call and
# the closing parenthesis were absent from this statement and are restored
# here; confirm whether further options (e.g. an app name or an S3A
# credentials provider) are also required in your environment.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "10g")
    .config("spark.driver.memory", "10g")
    # hadoop-aws package, pulled in for the s3a:// paths read later on
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .getOrCreate()
)
import time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
# Set up the PySpark processor that runs the job. The instance type and
# instance count parameters tell SageMaker how many instances of which type to
# create for the Spark job.
# NOTE(review): the argument list and closing parenthesis of the
# PySparkProcessor(...) call below are not present in this file; restore them
# (role, instance type/count, ...) before running this cell.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
# s3 paths
# Resolve the session's default bucket and build the S3 locations used below.
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_logs = "spark_logs"
df_path = "reddit_soccer_data/"
s3_path = f"s3a://{bucket}/{df_path}"
print(f"reading submissions from {s3_path}")
# Parquet files are self-describing, so the CSV-only `header` option is not
# passed to the reader.
df = spark.read.parquet(s3_path)
# count() is an action: it scans the dataset to report the number of rows.
print(f"shape of the dataframe is {df.count():,}x{len(df.columns)}")
reading submissions from s3a://sagemaker-us-east-1-634977863454/reddit_soccer_data/
shape of the dataframe is 5,617,852x9 CPU times: user 39.7 ms, sys: 4.61 ms, total: 44.3 ms Wall time: 1min 5s
# Select only the required columns
selected_df = df.select('author_flair_text', 'body', 'created_utc', 'score', 'subreddit')


def select_team_comments(comments, team_subreddit, soccer_flair):
    """Return the rows of *comments* attributed to one club's supporters.

    A row is kept when it was posted in the club's own subreddit, or when it
    was posted in the 'soccer' subreddit by an author whose flair text equals
    *soccer_flair*.

    Args:
        comments: Spark DataFrame with 'subreddit' and 'author_flair_text' columns.
        team_subreddit: name of the club's dedicated subreddit.
        soccer_flair: exact flair text identifying the club's fans in 'soccer'.

    Returns:
        The filtered Spark DataFrame.
    """
    in_team_subreddit = col('subreddit') == team_subreddit
    flaired_in_soccer = (col('subreddit') == 'soccer') & (col('author_flair_text') == soccer_flair)
    return comments.filter(in_team_subreddit | flaired_in_soccer)


# DataFrame for Arsenal
df_arsenal = select_team_comments(selected_df, 'Gunners', ':Arsenal:')
# DataFrame for Manchester United
df_man_united = select_team_comments(selected_df, 'reddevils', ':Manchester_United:')
# DataFrame for Chelsea
df_chelsea = select_team_comments(selected_df, 'chelseafc', ':Chelsea:')
# Removing non-alphanumeric characters.
# A raw string is used because "\s" inside a plain literal is an invalid escape
# sequence that Python warns about; the regex handed to Spark is unchanged.
NON_ALNUM_PATTERN = r'[^A-Za-z0-9\s]+'
df_arsenal = df_arsenal.withColumn('clean_body', regexp_replace('body', NON_ALNUM_PATTERN, ''))
df_man_united = df_man_united.withColumn('clean_body', regexp_replace('body', NON_ALNUM_PATTERN, ''))
df_chelsea = df_chelsea.withColumn('clean_body', regexp_replace('body', NON_ALNUM_PATTERN, ''))
# Converting to lowercase
df_arsenal = df_arsenal.withColumn('clean_body', lower(col('clean_body')))
df_man_united = df_man_united.withColumn('clean_body', lower(col('clean_body')))
df_chelsea = df_chelsea.withColumn('clean_body', lower(col('clean_body')))
# Tokenize text. The transformer is configured identically for every team, so a
# single instance is shared instead of being rebuilt before each call.
tokenizer = Tokenizer(inputCol='clean_body', outputCol='words')
df_arsenal = tokenizer.transform(df_arsenal)
df_man_united = tokenizer.transform(df_man_united)
df_chelsea = tokenizer.transform(df_chelsea)
# Remove stop words (again one shared, identically configured instance)
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
df_arsenal = remover.transform(df_arsenal)
df_man_united = remover.transform(df_man_united)
df_chelsea = remover.transform(df_chelsea)
def filter_words(words_list):
    """Return True when a tokenised comment carries no usable text.

    A comment is considered unusable when its token list is missing or empty,
    or when its only token is 'deleted' (presumably what is left of a
    '[deleted]' body once punctuation has been stripped).

    Args:
        words_list: list of string tokens, or None.

    Returns:
        bool: True if the row should be dropped, False if it should be kept.
    """
    # `not words_list` covers both [] and None, so a null token array is
    # dropped here instead of failing later when the tokens are joined.
    return not words_list or words_list == ['deleted']
# Define the UDF for Filter Words
filter_words_udf = udf(filter_words, BooleanType())
# Keep only the rows the UDF does not flag. Filtering on the negated UDF
# directly replaces the previous add-column / compare-to-False / drop-column
# sequence and leaves the same rows and columns behind.
df_arsenal = df_arsenal.filter(~filter_words_udf(col('filtered_words')))
df_man_united = df_man_united.filter(~filter_words_udf(col('filtered_words')))
df_chelsea = df_chelsea.filter(~filter_words_udf(col('filtered_words')))
# UDF that joins a token list with single spaces. It is no longer used by the
# three transformations below, but stays defined so that any other code
# referring to concat_udf keeps working.
concat_udf = udf(lambda words: ' '.join(words), StringType())

# Re-assemble the filtered tokens into one space-separated string with the
# built-in array_join, which avoids sending every row through a Python UDF and
# yields null (rather than raising) for a null token array. The intermediate
# text columns are dropped afterwards.
_TEXT_WORK_COLUMNS = ('body', 'clean_body', 'words', 'filtered_words')
df_arsenal = df_arsenal.withColumn("concatenated_text", f.array_join(col("filtered_words"), " "))
df_arsenal = df_arsenal.drop(*_TEXT_WORK_COLUMNS)
df_man_united = df_man_united.withColumn("concatenated_text", f.array_join(col("filtered_words"), " "))
df_man_united = df_man_united.drop(*_TEXT_WORK_COLUMNS)
df_chelsea = df_chelsea.withColumn("concatenated_text", f.array_join(col("filtered_words"), " "))
df_chelsea = df_chelsea.drop(*_TEXT_WORK_COLUMNS)
# Derive a 'date' column from 'created_utc' with to_date, then discard the
# original 'created_utc' column; the same two steps are applied to each team.
df_arsenal = df_arsenal.withColumn('date', to_date(col('created_utc'))).drop('created_utc')
df_man_united = df_man_united.withColumn('date', to_date(col('created_utc'))).drop('created_utc')
df_chelsea = df_chelsea.withColumn('date', to_date(col('created_utc'))).drop('created_utc')
# Load the match results and parse the month/day/year strings in 'date'.
matches = pd.read_csv("./top_5_teams_scores.csv")
matches = matches.assign(date=pd.to_datetime(matches['date'], format='%m/%d/%Y'))
# Move the table into Spark and make sure 'date' is a DateType column, which
# is the key the comment frames are later joined on.
df_matches = spark.createDataFrame(matches)
df_matches = df_matches.withColumn("date", col("date").cast(DateType()))
+-------+----------+----------+------+--------+-------+--------------+ | team| opponent| date|scored|conceded|outcome| tournament| +-------+----------+----------+------+--------+-------+--------------+ |Arsenal| Newcastle|2023-01-03| 0| 0| draw|Premier League| |Arsenal|Oxford Utd|2023-01-09| 3| 0| win| FA Cup| |Arsenal| Tottenham|2023-01-15| 2| 0| win|Premier League| |Arsenal|Man United|2023-01-22| 3| 2| win|Premier League| |Arsenal| Man City|2023-01-27| 0| 1| loss| FA Cup| +-------+----------+----------+------+--------+-------+--------------+ only showing top 5 rows
# Columns dropped once a comment has been paired with a match result; what
# remains is the join key plus the columns not listed here.
_DROP_AFTER_MERGE = ('author_flair_text', 'subreddit', 'team', 'opponent', 'scored', 'conceded', 'tournament')

# Arsenal: inner-join comments to the team's matches on the calendar date, so
# only comments made on a day with a recorded match survive.
df_arsenal_matches = df_matches.filter(col('team') == 'Arsenal')
df_arsenal_merged = (
    df_arsenal
    .join(df_arsenal_matches, on='date', how='inner')
    .drop(*_DROP_AFTER_MERGE)
)

# Manchester United: same join and clean-up.
df_man_united_matches = df_matches.filter(col('team') == 'Man United')
df_man_united_merged = (
    df_man_united
    .join(df_man_united_matches, on='date', how='inner')
    .drop(*_DROP_AFTER_MERGE)
)

# Chelsea: same join and clean-up.
df_chelsea_matches = df_matches.filter(col('team') == 'Chelsea')
df_chelsea_merged = (
    df_chelsea
    .join(df_chelsea_matches, on='date', how='inner')
    .drop(*_DROP_AFTER_MERGE)
)
# Stack the three per-team frames (union matches columns by position), drop
# rows containing nulls, filter out rows where 'outcome' is 'draw' so that only
# two outcome classes remain, and sort by date.
df_ml = (
    df_arsenal_merged
    .union(df_man_united_merged)
    .union(df_chelsea_merged)
    .dropna()
    .filter(col('outcome') != 'draw')
    .orderBy('date')
)
DataFrame[date: date, score: bigint, concatenated_text: string, outcome: string]
+----------+-----+--------------------+-------+ | date|score| concatenated_text|outcome| +----------+-----+--------------------+-------+ |2023-01-03| 7|bournemouth win ...| win| |2023-01-03| 3|problem city like...| win| |2023-01-03| 20|big beat city ars...| win| |2023-01-03| 4| season| win| |2023-01-03| 9|glad sign alex sa...| win| |2023-01-03| 40|penalty pulling s...| win| |2023-01-03| 7|bournemouth great...| win| |2023-01-03| 8|cas bit early mak...| win| |2023-01-03| 40|mikel disappointi...| win| |2023-01-03| 54|happy new year wa...| win| +----------+-----+--------------------+-------+ only showing top 10 rows
root |-- date: date (nullable = true) |-- score: long (nullable = true) |-- concatenated_text: string (nullable = true) |-- outcome: string (nullable = true)
+-------+------+ |outcome| count| +-------+------+ | win|431555| | loss|199280| +-------+------+
This machine learning task has two possible labels (win or loss), so logistic regression is a good starting point.
# Stages of the text-vectorisation chain: split the text into words, hash the
# words into term-frequency vectors, then rescale them by inverse document
# frequency.
tokenizer = Tokenizer(inputCol="concatenated_text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Run the stages in order; IDF is the only one that has to be fitted first.
df_words = tokenizer.transform(df_ml)
df_tf = hashingTF.transform(df_words)
idfModel = idf.fit(df_tf)
# Keep only the columns carried forward from here.
df_tfidf = idfModel.transform(df_tf).select("date", "score", "features", "outcome")
root |-- date: date (nullable = true) |-- score: long (nullable = true) |-- features: vector (nullable = true) |-- outcome: string (nullable = true)
+----------+-----+--------------------+-------+ | date|score| features|outcome| +----------+-----+--------------------+-------+ |2023-01-03| 7|(262144,[14385,18...| win| |2023-01-03| 3|(262144,[16293,71...| win| |2023-01-03| 20|(262144,[54961,11...| win| |2023-01-03| 4|(262144,[55270],[...| win| |2023-01-03| 9|(262144,[89721,20...| win| +----------+-----+--------------------+-------+ only showing top 5 rows
# Index the string outcome. The indexer is fitted once and the fitted model is
# reused both to transform the data and to look up the assigned indices
# (fitting it twice would repeat a full pass over the data for the same result).
indexer = StringIndexer(inputCol="outcome", outputCol="indexedOutcome")
indexer_model = indexer.fit(df_tfidf)
df_indexed = indexer_model.transform(df_tfidf)
# Check the assigned indices: a label's position in `labels` is its index.
labels = indexer_model.labels
win_index = labels.index('win')
loss_index = labels.index('loss')
def map_outcome(outcome_index, positive_index=None):
    """Map an indexed outcome to a binary label: 1 for a win, 0 otherwise.

    Args:
        outcome_index: index assigned to a row's outcome.
        positive_index: index that counts as a win. When omitted, the
            module-level ``win_index`` is used, which matches the previous
            single-argument behaviour.

    Returns:
        int: 1 if *outcome_index* equals the win index, else 0.
    """
    if positive_index is None:
        # Resolved at call time so the function can be defined before
        # win_index exists.
        positive_index = win_index
    return 1 if outcome_index == positive_index else 0
# Expose map_outcome to Spark as a UDF that returns an integer.
map_outcome_udf = udf(map_outcome, IntegerType())
# Add the binary 'label' column, then remove the string and indexed outcome
# columns it replaces.
df_final = (
    df_indexed
    .withColumn("label", map_outcome_udf("indexedOutcome"))
    .drop('outcome', 'indexedOutcome')
)
root |-- date: date (nullable = true) |-- score: long (nullable = true) |-- features: vector (nullable = true) |-- label: integer (nullable = true)
+----------+-----+--------------------+-----+ | date|score| features|label| +----------+-----+--------------------+-----+ |2023-01-03| 7|(262144,[14385,18...| 1| |2023-01-03| 3|(262144,[16293,71...| 1| |2023-01-03| 20|(262144,[54961,11...| 1| |2023-01-03| 4|(262144,[55270],[...| 1| |2023-01-03| 9|(262144,[89721,20...| 1| +----------+-----+--------------------+-----+ only showing top 5 rows
# Append the comment score to the TF-IDF vector so the model sees both inputs
# in a single feature column.
assembler = VectorAssembler(inputCols=["score", "features"], outputCol="combined_features")
df_combined = assembler.transform(df_final)
# 75/25 train/test split; the fixed seed is passed so the split can be repeated.
splits = df_combined.randomSplit([0.75, 0.25], seed=140)
train_data = splits[0]
test_data = splits[1]
from pyspark.ml.classification import LogisticRegression
# Fit a logistic regression on the combined features against the binary label.
lr = LogisticRegression(featuresCol="combined_features", labelCol="label")
lrModel = lr.fit(train_data)
# NOTE(review): model_path is assigned but nothing here writes the model to it.
model_path = './ml_score_model'
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# Make predictions
predictions = lrModel.transform(test_data)
# Evaluate the model.
# The binary evaluator is pointed at the model's "rawPrediction" column; the
# hard 0/1 "prediction" column it was given before carries no ranking
# information beyond a single threshold.
# NOTE(review): this evaluator is configured but not evaluated in this cell.
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")
# The multiclass metrics below are computed from the hard predictions.
# "Precision" and "Recall" are the weightedPrecision / weightedRecall metrics.
accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
print("Accuracy: ", accuracy.evaluate(predictions))
print("Precision: ", precision.evaluate(predictions))
print("Recall: ", recall.evaluate(predictions))
print("F1 Score: ", f1.evaluate(predictions))
Accuracy: 0.7115235836493462
Precision: 0.6889781799748806
Recall: 0.7115235836493462
F1 Score: 0.6842595014191711
from pyspark.mllib.evaluation import MulticlassMetrics
# Cast both columns to DoubleType (presumably because MulticlassMetrics expects
# double-valued (prediction, label) pairs; confirm against the mllib docs).
predictions = (
    predictions
    .withColumn("label", col("label").cast(DoubleType()))
    .withColumn("prediction", col("prediction").cast(DoubleType()))
)
# Hand the two columns to the RDD-based metrics API.
predictionAndLabels = predictions.select("prediction", "label").rdd
metrics = MulticlassMetrics(predictionAndLabels)
# Pull the confusion matrix out as an array and print it.
confusionMatrix = metrics.confusionMatrix().toArray()
print("Confusion Matrix:\n", confusionMatrix)
Confusion Matrix: [[16162. 33682.] [11809. 96041.]]
# Captions for the matrix rows and columns; label 0 is a loss and label 1 is a
# win, hence this order.
class_labels = ['Loss', 'Win']
# Wrap the raw matrix in a labelled pandas DataFrame for plotting.
conf_matrix_df = pd.DataFrame(data=confusionMatrix, index=class_labels, columns=class_labels)
# Draw the annotated heatmap with seaborn.
plt.figure(figsize=(10, 7))
sns.heatmap(data=conf_matrix_df, annot=True, fmt='g', cmap='Blues')
plt.title('Confusion Matrix on Match Score prediction', fontsize=24)
plt.xlabel('Predicted', fontsize=18)
plt.ylabel('Actual', fontsize=18)
The three most popular teams were chosen, and their fans' comments on each matchday were retrieved together with the team's match result on that day. The figure here shows the cleaned comment text and each comment's score, which serve as the input features; the training and prediction labels are the outcome of the match played on the day the comment was made. Before the actual machine learning step, the dataset is prepared in three ways: the text bodies are vectorized into a new feature using a tokenizer and hashing, the outcomes are converted into 1s (win) and 0s (loss), and the new feature is combined with each comment's score into the final training feature.
Since only two outcomes (win or loss) are used in this machine learning study, logistic regression is the model that is trained. After the model is trained on the comment scores and the vectorized comment texts, it is used to predict on the testing features. According to the measured results, the accuracy, precision and recall of the predictions against the testing labels are all around 0.7, which is somewhat disappointing for a binary classification task. According to the confusion matrix, the model is good at identifying actual wins, but its performance on losses is poor. We can therefore conclude that the model can only moderately predict the outcome of a match from the comments and their scores.