# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.2.0
# restart the kernel so the newly installed JDK and PySpark are picked up
# (the JavaScript restart below works in the classic Jupyter Notebook UI)
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("PySparkApp")
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
# the "spark.hadoop." prefix is required; without it Spark ignores the
# property ("Ignoring non-Spark config property") and S3A never sees it
.config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider",
)
.getOrCreate()
)
print(spark.version)
3.2.0
%%time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
# Set up the PySpark processor that runs the job. Note the instance type and
# instance count parameters: SageMaker will launch that many instances of this
# type for the Spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
base_job_name="sm-spark-project",
framework_version="3.3",
role=role,
instance_count=8,
instance_type="ml.m5.xlarge",
max_runtime_in_seconds=3600,
)
# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_logs = "spark_logs"
# modify this comma separated list to choose the subreddits of interest
subreddits = "socialism, Economics"
configuration = [
{
"Classification": "spark-defaults",
"Properties": {"spark.executor.memory": "12g", "spark.executor.cores": "4"},
}
]
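The processor above is configured but never launched in this notebook. For reference, a job would be submitted with the processor's run method; a minimal sketch, where the script path and argument names are hypothetical placeholders:

# hypothetical job launch; process.py and the argument names are placeholders,
# not files or flags defined in this notebook
spark_processor.run(
    submit_app="./code/process.py",
    arguments=["--s3_bucket", bucket, "--subreddits", subreddits],
    configuration=configuration,
    spark_event_logs_s3_uri=f"s3://{bucket}/{output_prefix_logs}/spark_event_logs",
    logs=False,
)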
CPU times: user 2.43 s, sys: 402 ms, total: 2.83 s
Wall time: 2.78 s
%%time
output_prefix_data_submissions = "project/submissions/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_submissions}"
print(f"reading submissions from {s3_path}")
# read all yearly partitions; Parquet files are self-describing, so the
# header option (a CSV reader option) is not needed here
submissions = spark.read.parquet(s3_path)
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}")
reading submissions from s3a://sagemaker-us-east-1-019290627849/project/submissions/yyyy=*
shape of the submissions dataframe is 34,345x68
CPU times: user 22.2 ms, sys: 4.46 ms, total: 26.6 ms
Wall time: 40 s
# check row counts per subreddit to confirm the expected data was read
submissions.groupBy('subreddit').count().show()
+---------+-----+
|subreddit|count|
+---------+-----+
|   soccer|34345|
+---------+-----+
# display a subset of columns
df = submissions.select(
    "title", "created_utc", "score", "num_comments", "num_crossposts",
    "is_video", "over_18", "distinguished", "preview", "media",
).cache()
df.show()
+--------------------+-------------------+-----+------------+--------------+--------+-------+-------------+--------------------+--------------------+
|               title|        created_utc|score|num_comments|num_crossposts|is_video|over_18|distinguished|             preview|               media|
+--------------------+-------------------+-----+------------+--------------+--------+-------+-------------+--------------------+--------------------+
|[Sky Sports Footb...|2023-01-19 15:13:46|    1|           1|             0|   false|  false|         null|                null|{null, {Sky Sport...|
|[Portsmouth News]...|2023-01-19 15:16:17|  555|          85|             0|   false|  false|         null|                null|                null|
|Jonas Omlin signi...|2023-01-19 15:23:57|   96|          12|             0|   false|  false|         null|{false, [{yVBwrxA...|                null|
|EFL: Championship...|2023-01-19 15:28:04|   33|          14|             0|   false|  false|         null|{false, [{s1lRhtU...|                null|
|Al Ittihad [1] - ...|2023-01-19 15:28:50|   10|           3|             0|    true|  false|         null|{false, [{g5WLpDe...|{null, null, {htt...|
|Fabrizio on Twitt...|2023-01-19 15:29:39|    1|           1|             0|   false|  false|         null|                null|{null, {Fabrizio ...|
|FC Barcelona Wome...|2023-01-19 15:30:45|   46|          11|             0|   false|  false|         null|{false, [{6-_jKLY...|                null|
|[Fabrizio Romano]...|2023-01-19 15:32:16|    9|          12|             0|   false|  false|         null|                null|{null, {Fabrizio ...|
|[Lyall Thomas] Sw...|2023-01-19 15:32:23|   42|          12|             0|   false|  false|         null|                null|{null, {Lyall Tho...|
|Atalanta [4]-2 Sp...|2023-01-19 15:33:42|  168|          35|             1|   false|  false|         null|                null|                null|
|Mexico forfeits m...|2023-01-19 15:43:16|   23|           4|             0|   false|  false|         null|{false, [{Rd1d4dj...|                null|
|[Hugo Delom - l'E...|2023-01-19 15:43:35|   17|           7|             0|   false|  false|         null|{false, [{tc9GvYb...|                null|
|Columbus Crew acq...|2023-01-19 15:47:40|   28|           3|             0|   false|  false|         null|{false, [{MhWL9gx...|                null|
|Drug testing afte...|2023-01-19 15:49:38|    1|           1|             0|   false|  false|         null|                null|                null|
|What’s the reason...|2023-02-16 12:23:41|    1|           1|             0|   false|  false|         null|                null|                null|
|[The Athletic] So...|2023-02-16 12:28:40| 2844|         285|             0|   false|  false|         null|{false, [{fz-zv9Q...|                null|
|[Footballogue] Ne...|2023-02-16 12:33:48|    1|           1|             0|   false|  false|         null|                null|{null, {Footballo...|
|[Gerard Romero] G...|2023-02-16 12:34:34|  574|         133|             1|   false|  false|         null|{false, [{J5_mIVV...|{null, {Gerard Ro...|
|The Vietnam Footb...|2023-02-16 12:34:34|   40|          11|             0|   false|  false|         null|{false, [{6OJssU9...|                null|
|Legendary Iranian...|2023-02-16 12:35:36|   40|           5|             0|   false|  false|         null|{false, [{RiiEhJ6...|                null|
+--------------------+-------------------+-----+------------+--------------+--------+-------+-------------+--------------------+--------------------+
only showing top 20 rows
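The preview and media columns are nested structs, and the cleaning step below pulls out preview.enabled and media.reddit_video.duration; it is worth confirming those sub-fields exist in the schema first. A minimal check:

# inspect the nested structure of the two struct columns before extracting
# preview.enabled and media.reddit_video.duration below
df.select("preview", "media").printSchema()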
from pyspark.sql.functions import col, size, split, hour, when, coalesce, lit
# pull the nested fields out of the struct columns, then drop the structs
df_cleaned = df.withColumn("preview_enabled", col("preview.enabled")).withColumn("duration", col("media.reddit_video.duration"))\
.drop("preview", "media").cache()
# title length, measured in words
df_cleaned = df_cleaned.withColumn("title_length", size(split(col("title"), " "))).drop("title")
# flag posts published at night (outside 03:00-18:00 UTC) and recode
# "distinguished" as a boolean (True when the post is distinguished)
df_cleaned = df_cleaned.withColumn("night", when((hour(col("created_utc")) >= 3) & (hour(col("created_utc")) < 18), False).otherwise(True))\
.drop("created_utc").withColumn("distinguished", when(col("distinguished").isNull(), False).otherwise(True))
# define the target column
df_cleaned = df_cleaned.withColumn("popular", when(col("num_comments") > 20, 1).otherwise(0)).cache()
# replace the null values
df_cleaned = df_cleaned.withColumn("duration", coalesce(df_cleaned["duration"], lit(0)))\
.withColumn("preview_enabled", coalesce(df_cleaned["preview_enabled"], lit(False))).cache()
df_cleaned.show()
+-----+------------+--------------+--------+-------+-------------+---------------+--------+------------+-----+-------+
|score|num_comments|num_crossposts|is_video|over_18|distinguished|preview_enabled|duration|title_length|night|popular|
+-----+------------+--------------+--------+-------+-------------+---------------+--------+------------+-----+-------+
|    1|           1|             0|   false|  false|        false|          false|       0|          14|false|      0|
|  555|          85|             0|   false|  false|        false|          false|       0|          12|false|      1|
|   96|          12|             0|   false|  false|        false|          false|       0|           8|false|      0|
|   33|          14|             0|   false|  false|        false|          false|       0|          15|false|      0|
|   10|           3|             0|    true|  false|        false|          false|      39|          21|false|      0|
|    1|           1|             0|   false|  false|        false|          false|       0|          48|false|      0|
|   46|          11|             0|   false|  false|        false|          false|       0|          22|false|      0|
|    9|          12|             0|   false|  false|        false|          false|       0|          48|false|      0|
|   42|          12|             0|   false|  false|        false|          false|       0|          26|false|      0|
|  168|          35|             1|   false|  false|        false|          false|       0|           7|false|      1|
|   23|           4|             0|   false|  false|        false|          false|       0|           8|false|      0|
|   17|           7|             0|   false|  false|        false|          false|       0|          52|false|      0|
|   28|           3|             0|   false|  false|        false|          false|       0|          12|false|      0|
|    1|           1|             0|   false|  false|        false|          false|       0|           4|false|      0|
|    1|           1|             0|   false|  false|        false|          false|       0|          12|false|      0|
| 2844|         285|             0|   false|  false|        false|          false|       0|          25|false|      1|
|    1|           1|             0|   false|  false|        false|          false|       0|          37|false|      0|
|  574|         133|             1|   false|  false|        false|          false|       0|          44|false|      1|
|   40|          11|             0|   false|  false|        false|          false|       0|          22|false|      0|
|   40|           5|             0|   false|  false|        false|          false|       0|          13|false|      0|
+-----+------------+--------------+--------+-------+-------------+---------------+--------+------------+-----+-------+
only showing top 20 rows
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
# drop columns not used as predictors: num_comments defines the target
# (keeping it would leak the label), and duration is excluded as well
df_num = df_cleaned.drop("duration", "num_comments")
# assemble all predictor columns (everything except the label) into a single
# feature vector; note the loop variable must not be named "col", which would
# shadow pyspark.sql.functions.col imported above
feature_list = [c for c in df_num.columns if c != "popular"]
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")
# create a random forest model
rf = RandomForestClassifier(labelCol="popular", featuresCol="features")
# create a pipeline
pipeline = Pipeline(stages=[assembler, rf])
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np
# parameter grid for the search: numTrees in {10, 30, 50} and
# maxDepth in {5, 15, 25}, i.e. 9 combinations
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start=10, stop=50, num=3)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start=5, stop=25, num=3)]) \
    .build()
# 5-fold cross-validation over the grid (9 combinations x 5 folds = 45 fits);
# with no metricName set, MulticlassClassificationEvaluator scores with the
# weighted F1 measure by default
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="popular"),
                          numFolds=5)
# split the dataset into training and test sets
# (no seed is fixed, so this split differs on every run)
(trainingData, testData) = df_num.randomSplit([0.8, 0.2])
# train the data with training set and get predictions of test set
cvModel = crossval.fit(trainingData)
predictions = cvModel.transform(testData)
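Before scoring the held-out test set, the mean cross-validation metric of each grid point can be read off the fitted CrossValidatorModel; a minimal sketch:

# mean cross-validation metric (weighted F1 by default) for each grid point
for params, metric in zip(crossval.getEstimatorParamMaps(), cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, f"{metric:.4f}")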
# evaluate the predictions on the test set; with no metricName set,
# MulticlassClassificationEvaluator reports the weighted F1 score
evaluator = MulticlassClassificationEvaluator()
evaluator.setPredictionCol("prediction")
evaluator.setLabelCol("popular")
evaluator.evaluate(predictions)
0.889344592828071
# check parameters of the model selected by grid search
model = cvModel.bestModel.stages[-1]
model
RandomForestClassificationModel: uid=RandomForestClassifier_c2d02e8d046f, numTrees=50, numClasses=2, numFeatures=8
model.getMaxDepth()
15
In this study, we aim to develop a machine learning model that predicts the popularity of Reddit posts, with popularity defined as receiving more than 20 comments. The model is intended to identify the factors that drive comment activity and thereby serve as a guide for Reddit users who want to increase the reach of their posts. Our methodology uses a random forest classifier implemented with the pyspark.ml package. We began by collecting submission data from the soccer subreddit; engineered features included the title length (in words) and a binary indicator for nighttime posting. A pipeline was built to assemble the predictors into a feature vector and fit the random forest, and grid search with 5-fold cross-validation selected a model with 50 trees and a maximum depth of 15. The selected model performed well on the held-out test set, scoring 0.8893 on the evaluator's default metric, the weighted F1 score.
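To put the 0.8893 figure in context, it helps to compare it against the base rate of the target classes, since a trivial majority-class predictor already scores at that rate; a minimal check:

# class balance of the target; compare the model's score against the
# share of the majority class
df_num.groupBy("popular").count().show()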
# get the feature importance of the model
importances = model.featureImportances
import matplotlib.pyplot as plt
import seaborn as sns
importance_values = importances.toArray()
sorted_importances, sorted_features = zip(*sorted(zip(importance_values, feature_list), reverse=True))
# Creating the bar plot
plt.figure(figsize=(20, 12))
sns.barplot(x=list(sorted_features), y=list(sorted_importances), palette='Set2')
plt.xticks(rotation=45)
plt.tick_params(axis='both', labelsize=24)
plt.ylabel('Importance', fontsize=28)
plt.xlabel('Feature', fontsize=28)
plt.title('Feature Importances', fontsize=40)
plt.show()
In terms of feature importance, which here reflects each feature's mean reduction in impurity across the trees, the 'score' factor (upvotes minus downvotes) emerged as the most influential. Secondary factors, such as the number of crossposts and the title length, had a significant but lesser impact. Whether a post was distinguished (authored by a moderator or administrator), contained a video, or had a preview enabled contributed moderately, while the time of publishing and the presence of adult content contributed little. Note that impurity-based importance measures how strongly a feature helps the trees discriminate, not the direction of its effect, so a separate check is needed to see whether a feature raises or lowers popularity.
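Since impurity-based importance says nothing about direction, a quick follow-up for the numeric features is to compare their class-conditional means; a minimal sketch:

# compare feature means between popular (1) and unpopular (0) posts to get
# a rough sense of the direction of each numeric feature's association
df_num.groupBy("popular").avg("score", "num_crossposts", "title_length").show()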
# save the best pipeline model (assembler + random forest) locally
cvModel.bestModel.save("../../model/random forest")
# create a new train/test split and save the test portion as parquet;
# note that this is a fresh random split, not the testData evaluated above
(trainingData_new, testData_new) = df_num.randomSplit([0.8, 0.2])
testData_new.write.parquet("../../data/parquet/")
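For completeness, both saved artifacts can be loaded back in a later session; a minimal sketch, assuming the same paths as above:

# reload the saved pipeline model and score the saved test set
from pyspark.ml import PipelineModel
reloaded = PipelineModel.load("../../model/random forest")
test_df = spark.read.parquet("../../data/parquet/")
reloaded.transform(test_df).select("popular", "prediction").show(5)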