Build recommender systems for product cross-selling opportunity
Iowa Liquor Sales Dataset(link)
The dataset contains alcoholic beverage purchases by liquor stores registered with the Iowa Department of Commerce that were logged in the Commerce Department system from January 1, 2012 to the present.
Method : Collaborative Filtering
PySpark MLlib: Alternating Least Squares (ALS) algorithm
spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, month, year, date_format, date_sub, to_date, date_add, lag, dayofweek, dayofmonth, dayofyear, dayofweek
import pandas as pd
import numpy as np
from pyspark.sql.functions import approxCountDistinct
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.types import DateType, IntegerType,NumericType
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.feature import Bucketizer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder,CrossValidatorModel
import matplotlib.pyplot as plt
%matplotlib inline
# Start the Spark session for this analysis, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('LiquorSales')
    .getOrCreate()
)

# Load the raw sales extract: the first row supplies the column names and
# column types are inferred from the data.
df = spark.read.csv(
    'gs://bigdata2_data/liquor_sales',
    header=True,
    inferSchema=True
)

# Quick look at the size, the column types and a few sample rows.
df.count()
df.dtypes
df.printSchema()
df.show(3)
Let's look at how many liquor stores there are in Iowa.
# Sample of distinct (store number, store name) pairs.
df.select("Store Number", "Store Name").distinct().show(5)
# Number of distinct store names, then number of distinct store numbers.
df.select("Store Name").dropDuplicates().count()
df.select("Store Number").dropDuplicates().count()
The numbers of unique store numbers and unique store names are not equal, indicating that there might be typos in some store names or that some stores changed their names. In the following analysis, we use the store number as the unique identifier, and we use a lookup table, Iowa_Liquor_Stores (sourced from the State of Iowa website), to look up store names.
# Parse the MM/dd/yyyy text in "Date" into a timestamp.
date_as_timestamp = unix_timestamp(col('Date'), "MM/dd/yyyy").cast("timestamp")
df = df.withColumn('Date', date_as_timestamp)

# Derive calendar features from the parsed date, then add a constant column
# of 1's ("Number of Sale").
sale_date = col('Date')
df = (
    df.withColumn('month', month(sale_date))
    .withColumn('year', year(sale_date))
    .withColumn("week_day", dayofweek(sale_date))
    .withColumn("DayofMonth", dayofmonth(sale_date))
    .withColumn("DayofYear", dayofyear(sale_date))
    .withColumn("Number of Sale", lit(1))
)
This step normalizes the text data for liquor categories and store names.
# Read in the updated categories lookup file (all columns are read as strings).
liquor_categories = spark.read.csv('gs://bigdata2_data/liquor_categories.csv', header=True)
liquor_categories.show()

# Left-join the categories onto the sales data so every sale is kept even
# when its category has no lookup row. Joining on the shared column *name*
# (rather than an equality expression) keeps a single "Category Name" column
# in the result, so later references to that column are not ambiguous.
df = df.join(liquor_categories, on='Category Name', how='left')
The names of liquor stores are not consistent within the dataset. We cleaned up the liquor store names so that each one matches its liquor store ID.
# Load the liquor store lookup table and inspect it.
liquor_store_clean = spark.read.csv('gs://bigdata2_data/Iowa_Liquor_Stores.csv', header=True)
liquor_store_clean.show(5)
liquor_store_clean.printSchema()

# The lookup is read without schema inference, so cast the store id to an
# integer; it is needed as an integer id for modeling later.
store_id_as_int = liquor_store_clean["Store"].cast(IntegerType())
liquor_store_clean = liquor_store_clean.withColumn("Store", store_id_as_int)

# Attach the store lookup to the sales data, matching on the store id.
# A left join keeps sales whose store has no lookup row.
store_match = df['Store Number'] == liquor_store_clean['Store']
df = df.join(liquor_store_clean, store_match, how='left')
df.columns
df.show(2)
As we need a rating feature for the ALS model, we use the sales (in dollars) of a liquor product as a percentage of the store's total sales to infer how the store rates that product. The more of a specific product a liquor store orders, the more likely it is that the store likes that product. We add a calculated column, Perc_of_store_total_sale: the percentage of the store's total sales that comes from that liquor item.
# Total sales in dollars for every (store, item) pair.
sale_by_store_item = df.groupBy("Store", "Item Number").agg({'Sale (Dollars)': "sum"})

# Total sales in dollars for every store, with both columns renamed so they
# can sit next to the per-item columns after the join.
sales_by_store = (
    df.groupBy("Store")
    .agg({'Sale (Dollars)': "sum"})
    .withColumnRenamed("sum(Sale (Dollars))", "store_total_sale_dollars")
    .withColumnRenamed("Store", "store_num")
)

# Put each store's total next to every one of its per-item rows.
same_store = sale_by_store_item['Store'] == sales_by_store['store_num']
sale_by_store_by_item = sale_by_store_item.alias("a").join(
    sales_by_store.alias("b"), same_store, how='left'
)
Here we created a table that groups sales in dollars by store and then by item. We also calculated the total sales in dollars over all transactions of each store. Then we left-joined the first table with the second.
*Sales in dollars means the cost to the liquor store of ordering that liquor product.
# Preview the joined per-store, per-item sales table.
sale_by_store_by_item.show(5)
Perc_of_store_total_sale : percentage of sales generated from each liquor item (Item Number) for that specific store(store_num). We will use this column as the "rating" by a store towards a liquor item.
# Share of the store's total dollar sales contributed by each item.
item_share = (
    sale_by_store_by_item["sum(Sale (Dollars))"]
    / sale_by_store_by_item["store_total_sale_dollars"]
)
sale_by_store_by_item = sale_by_store_by_item.withColumn("perc_of_store_total_sale", item_share)
sale_by_store_by_item.show(2)

# For example, here are the top 3 liquor items in sales for store_num 2659
store_2659_sales = sale_by_store_by_item.where(sale_by_store_by_item.store_num == 2659)
store_2659_sales.orderBy('perc_of_store_total_sale', ascending=False).show(3)
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Within each store, rank liquor items from the largest to the smallest share
# of sales in dollars (dense rank, so tied items share a rank).
by_store_desc_share = Window.partitionBy("store_num").orderBy(desc("perc_of_store_total_sale"))
ranked = sale_by_store_by_item.withColumn("rank", dense_rank().over(by_store_desc_share))
ranked.show(5)

# Bin the sales share into 5 quantile buckets, fitted over all store/item
# rows, and store the bucket in the "rating" column.
rating_discretizer = QuantileDiscretizer(
    numBuckets=5,
    inputCol="perc_of_store_total_sale",
    outputCol="rating"
)
rating_buckets = rating_discretizer.fit(sale_by_store_by_item)
store_item_rated = rating_buckets.transform(sale_by_store_by_item)

# For example for store 2659, the most highly rated items are the following:
store_2659_rated = store_item_rated.where(store_item_rated.Store == 2659)
store_2659_rated.orderBy('rating', ascending=False).show(2)

# Keep the store, the item, and both candidate rating columns.
ratings = store_item_rated.select("Store", "Item Number", "rating", "perc_of_store_total_sale")
ratings.show(5)
Convert the item number to a format that is accepted by the ALS recommender.
# Keep only the first run of digits of each item number, in both the sales
# data and the ratings table.
first_digits = regexp_extract("Item Number", "\\d+", 0)
df = df.withColumn("Item Number", first_digits)

# In the ratings table the extracted digits are additionally cast to an
# integer and the column is renamed to item_num.
ratings = (
    ratings.withColumn("Item Number", first_digits)
    .withColumn("Item Number", col("Item Number").cast(IntegerType()))
    .withColumnRenamed("Item Number", "item_num")
)
ratings.show(5)

# Row count without rows containing missing values, then the full row count.
ratings.na.drop().count()
ratings.count()

# Drop the rows that contain missing values.
ratings = ratings.na.drop()

# Rename the lookup's store id so it has a different name from the "Store"
# column of the frames it is joined to later.
liquor_store_clean = liquor_store_clean.withColumnRenamed("Store", "store_num_b")

# 80/20 train/test split with a fixed seed.
training, test = ratings.randomSplit([0.8, 0.2], seed=42)
# ALS on the bucketed "rating" column. The cold start strategy is set to
# 'drop' to ensure we don't get NaN evaluation metrics.
als = ALS(
    userCol="Store",
    itemCol="item_num",
    ratingCol="rating",
    maxIter=10,
    regParam=0.01,
    nonnegative=True,
    coldStartStrategy="drop"
)

# Fit on the training split and predict ratings for the test split.
model = als.fit(training)
predictions = model.transform(test)

# Explain the parameters of the model and look at some item factors.
model.explainParams()
model.itemFactors.show(10, truncate=False)

# Attach the store lookup columns to every prediction (inner join on store id).
predictions = predictions.join(
    liquor_store_clean, predictions.Store == liquor_store_clean.store_num_b
)
predictions.show(5)

# Drop rows that contain a null in any column before evaluating.
predictions = predictions.dropna()
predictions.show(10, truncate=False)

# RMSE between the "rating" label and the model's prediction.
evaluator = RegressionEvaluator(labelCol='rating', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
The ALS model recommends these liquor items for each liquor store to purchase, based on our custom ratings.
# Top 10 recommended items for every store the model knows about.
userRecs = model.recommendForAllUsers(10)
userRecs.show(10, truncate=False)
The ALS model recommends these liquor stores for each liquor item to be sold to, based on our custom ratings.
# Top 10 recommended stores for every item the model knows about.
liquorRecs = model.recommendForAllItems(10)
liquorRecs.show(10, truncate=False)

# Find the top 10 stores by total sales in dollars
top_stores = (
    df.groupBy(df['Store'])
    .agg({'Sale (Dollars)': "sum"})
    .sort("sum(Sale (Dollars))", ascending=False)
    .dropna()
    .limit(10)
)

# Get the store number of top 10 stores by total sales in dollars
top10_store_list = [row.Store for row in top_stores.select("Store").collect()]
top10_store_list

# Collect the recommendations to top 10 stores by total sales in dollars,
# as {store number: [recommended item numbers]}.
rec_liquor_top10_store = {}
for store in top10_store_list:
    rec_liquor = userRecs.where(userRecs.Store == store).select("recommendations").collect()
    # A store without a recommendation row gets an empty list instead of
    # raising IndexError on rec_liquor[0].
    rec_liquor_top10_store[store] = (
        [i.item_num for i in rec_liquor[0]["recommendations"]] if rec_liquor else []
    )
rec_liquor_top10_store
# Distinct (item number, description, cleaned category) rows, used to put
# readable labels on recommended item numbers.
liquor_product_list = df.select("Item Number", 'Item Description', 'Category Name Clean').dropDuplicates()

# Store 2190: its name, then the liquor products recommended to it.
# .get(..., []) shows an empty table instead of raising KeyError when the
# hard-coded store is not among the collected top stores.
liquor_store_clean.filter(liquor_store_clean.store_num_b == 2190)\
    .select('store_num_b', "Name")\
    .show(truncate=False)
liquor_product_list.filter(liquor_product_list["Item Number"].isin(rec_liquor_top10_store.get(2190, [])))\
    .show(truncate=False)

# Rename columns
sale_by_store_by_item = sale_by_store_by_item.withColumnRenamed("Item Number", "item_num")

# The 5 items store 2190 actually spends the most on, for comparison with
# the recommendations. Duplicates are removed BEFORE sorting: dropDuplicates
# does not preserve row order, so sorting first would not guarantee that
# show(5) returns the top sellers.
sale_by_store_by_item.join(
        liquor_product_list,
        sale_by_store_by_item["item_num"] == liquor_product_list["Item Number"],
        how='inner')\
    .filter(sale_by_store_by_item.store_num == 2190)\
    .dropDuplicates(["item_num"])\
    .sort("sum(Sale (Dollars))", ascending=False)\
    .select("store_num", "item_num", "Item Description", "Category Name Clean")\
    .show(5, truncate=False)

# Same name + recommended-products display for the other stores of interest.
for store_id in (2512, 2633, 3354, 3385, 3420, 3814, 3773):
    liquor_store_clean.filter(liquor_store_clean.store_num_b == store_id)\
        .select('store_num_b', "Name")\
        .show(truncate=False)
    liquor_product_list.filter(
            liquor_product_list["Item Number"].isin(rec_liquor_top10_store.get(store_id, [])))\
        .show(truncate=False)
Identify the best-selling liquor items
# Find the top 10 liquor items by total sales in dollars
df = df.withColumnRenamed("Item Number", "item_num")
top_items = (
    df.groupBy(df['item_num'])
    .agg({'Sale (Dollars)': "sum"})
    .sort("sum(Sale (Dollars))", ascending=False)
    .dropna()
    .limit(10)
)
# Find the top 10 stores by total sales in dollars
top_stores = (
    df.groupBy(df['Store'])
    .agg({'Sale (Dollars)': "sum"})
    .sort("sum(Sale (Dollars))", ascending=False)
    .dropna()
    .limit(10)
)
# Item numbers of the top 10 liquor items by total sales in dollars
top10_item_list = [row.item_num for row in top_items.select("item_num").collect()]
top10_item_list

# Collect the stores recommended for each top item,
# as {item number: [recommended store numbers]}.
rec_store_top10_item = {}
for item in top10_item_list:
    rec_store = liquorRecs.where(liquorRecs.item_num == item).select("recommendations").collect()
    # An item without a recommendation row gets an empty list instead of
    # raising IndexError on rec_store[0].
    rec_store_top10_item[item] = (
        [i.Store for i in rec_store[0]["recommendations"]] if rec_store else []
    )
rec_store_top10_item

df.filter(df.item_num == '11296').select("item_num", 'Item Description').dropDuplicates().show()

# For each item of interest: its description and category, then the stores
# recommended for it. .get(..., []) shows an empty table instead of raising
# KeyError when a hard-coded item is not among the collected top items.
for item_id in ('11296', '11297', '11788', '1799', '26826', '26827', '36308'):
    df.filter(df.item_num == item_id)\
        .select("item_num", 'Item Description', 'Category Name Clean')\
        .dropDuplicates()\
        .show(truncate=False)
    liquor_store_clean.filter(
            liquor_store_clean["store_num_b"].isin(rec_store_top10_item.get(item_id, [])))\
        .select("store_num_b", "Name", "Address", "City")\
        .show(truncate=False)
As the rating matrix is derived from another source of information (i.e. it is inferred from product sales as a percentage of the store's total sales), we set implicitPrefs to true to see how the model performs.
For this model, instead of using the rating column (a column produced by applying QuantileDiscretizer to the percentage of total store sales), we use the percentage of total store sales column itself.
# Implicit-feedback ALS trained directly on the sales share.
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als_imp = ALS(maxIter=10, regParam=0.01, implicitPrefs=True, userCol="Store", itemCol="item_num",
              ratingCol="perc_of_store_total_sale",
              coldStartStrategy="drop", nonnegative=True)
# fit model
model_imp = als_imp.fit(training)
# Apply model on the test set to predict
predictions_imp = model_imp.transform(test)
# explain parameters of the model
model_imp.explainParams()
# item factors
model_imp.itemFactors.show(10, truncate=False)

# Attach the store lookup columns. The join condition refers to this
# model's own prediction frame (predictions_imp), not to the explicit
# model's `predictions` frame.
predictions_imp = predictions_imp.join(
    liquor_store_clean, liquor_store_clean.store_num_b == predictions_imp.Store
)
predictions_imp.show(5)
# Drop rows that contain a null in any column before evaluating.
predictions_imp = predictions_imp.na.drop()
predictions_imp.show(3, truncate=False)

# NOTE(review): with implicitPrefs=True the predictions are preference
# scores, so this RMSE against the raw sales share may not be directly
# comparable with the explicit model's RMSE - confirm before comparing.
evaluator = RegressionEvaluator(metricName='rmse', labelCol='perc_of_store_total_sale')
rmse_imp = evaluator.evaluate(predictions_imp)
print("Root-mean-square error = " + str(rmse_imp))

# Top 10 recommended items for every store.
userRecs_imp = model_imp.recommendForAllUsers(10)
userRecs_imp.show(2, truncate=False)
The ALS model recommends these liquor stores for each liquor item to be sold to, based on our custom ratings.
# Top 10 recommended stores for every item (implicit-feedback model).
liquorRecs_imp = model_imp.recommendForAllItems(10)
liquorRecs_imp.show(2, truncate=False)

# Get the store number of top 10 stores by total sales in dollars
top10_store_list

# Collect the recommendations to top 10 stores by total sales in dollars,
# as {store number: [recommended item numbers]}.
rec_liquor_top10_store_imp = {}
for store in top10_store_list:
    rec_liquor_imp = userRecs_imp.where(userRecs_imp.Store == store).select("recommendations").collect()
    # A store without a recommendation row gets an empty list instead of
    # raising IndexError on rec_liquor_imp[0].
    rec_liquor_top10_store_imp[store] = (
        [i.item_num for i in rec_liquor_imp[0]["recommendations"]] if rec_liquor_imp else []
    )
rec_liquor_top10_store_imp

# Store name, then the products recommended to it. .get(..., []) shows an
# empty table instead of raising KeyError for a store that was not collected.
for store_id in (2190, 2512):
    liquor_store_clean.filter(liquor_store_clean.store_num_b == store_id)\
        .select('store_num_b', "Name")\
        .show(truncate=False)
    liquor_product_list.filter(
            liquor_product_list["Item Number"].isin(rec_liquor_top10_store_imp.get(store_id, [])))\
        .show(truncate=False)

# Collect the stores recommended for each top item,
# as {item number: [recommended store numbers]}.
rec_store_top10_item_imp = {}
for item in top10_item_list:
    rec_store_imp = liquorRecs_imp.where(liquorRecs_imp.item_num == item).select("recommendations").collect()
    rec_store_top10_item_imp[item] = (
        [i.Store for i in rec_store_imp[0]["recommendations"]] if rec_store_imp else []
    )
rec_store_top10_item_imp

# Item description and category, then the stores recommended for the item.
for item_id in ('11296', '11297'):
    df.filter(df.item_num == item_id)\
        .select("item_num", 'Item Description', 'Category Name Clean')\
        .dropDuplicates()\
        .show(truncate=False)
    liquor_store_clean.filter(
            liquor_store_clean["store_num_b"].isin(rec_store_top10_item_imp.get(item_id, [])))\
        .select("store_num_b", "Name", "Address", "City")\
        .show(truncate=False)
# Grid search with 5-fold cross-validation for the explicit-rating ALS model.
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als_cv = ALS(userCol="Store", itemCol="item_num", ratingCol="rating",
             coldStartStrategy="drop", nonnegative=True)
evaluator_cv = RegressionEvaluator(metricName='rmse', labelCol='rating')

# build parameter grid (3 x 3 x 2 = 18 candidate settings)
paramGrid = ParamGridBuilder() \
    .addGrid(als_cv.maxIter, [10, 15, 20]) \
    .addGrid(als_cv.rank, [10, 15, 20]) \
    .addGrid(als_cv.regParam, [0.01, 0.001]) \
    .build()
crossval = CrossValidator(estimator=als_cv,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator_cv,
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Apply model on the test set to predict
prediction_cv = cvModel.transform(test)
# Attach the store lookup columns. The join condition refers to this
# model's own prediction frame (prediction_cv), not to the first model's
# `predictions` frame.
prediction_cv = prediction_cv.join(
    liquor_store_clean, liquor_store_clean.store_num_b == prediction_cv.Store
)
prediction_cv.show(10, truncate=False)

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating')
rmse_cv = evaluator.evaluate(prediction_cv)
print("Root-mean-square error = " + str(rmse_cv))

# Parameter map with the lowest average RMSE. Sorting on the metric only
# avoids falling back to comparing the parameter dicts when metrics tie.
sorted(zip(cvModel.avgMetrics, paramGrid), key=lambda pair: pair[0])[0][1]

params = [{p.name: v for p, v in m.items()} for m in cvModel.getEstimatorParamMaps()]
# All grid search results
cv_rmse_df = pd.DataFrame.from_dict(
    {"params": params, cvModel.getEvaluator().getMetricName(): cvModel.avgMetrics}
).sort_values(by="rmse")
cv_rmse_df

# parameters for best model
for k, v in cv_rmse_df.params[cv_rmse_df.rmse == cv_rmse_df.rmse.min()].values[0].items():
    # A single formatted argument prints "name value" under both
    # Python 2 and Python 3.
    print("{} {}".format(k, v))

# RMSE for best model
cv_rmse_df.rmse.min()
# Refit the explicit-rating ALS model with one fixed configuration.
# NOTE(review): maxIter=20, rank=10, regParam=0.01 are presumably the best
# grid-search setting - confirm against the cross-validation results table.
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als_cv = ALS(maxIter=20, regParam=0.01, rank=10, userCol="Store", itemCol="item_num", ratingCol="rating",
             coldStartStrategy="drop", nonnegative=True)
# fit model
als_cv_model = als_cv.fit(training)
# Apply model on the test set to predict
als_cv_predictions = als_cv_model.transform(test)

# Top 10 items for every store, and top 10 stores for every item.
userRecs_cv = als_cv_model.recommendForAllUsers(10)
userRecs_cv.show(3, truncate=False)
liquorRecs_cv = als_cv_model.recommendForAllItems(10)
liquorRecs_cv.show(3, truncate=False)

# Collect the recommendations to top 10 stores by total sales in dollars,
# as {store number: [recommended item numbers]}.
rec_liquor_top10_store_cv = {}
for store in top10_store_list:
    rec_liquor = userRecs_cv.where(userRecs_cv.Store == store).select("recommendations").collect()
    # A store without a recommendation row gets an empty list instead of
    # raising IndexError on rec_liquor[0].
    rec_liquor_top10_store_cv[store] = (
        [i.item_num for i in rec_liquor[0]["recommendations"]] if rec_liquor else []
    )
rec_liquor_top10_store_cv

# Store name, then the products recommended to it. .get(..., []) shows an
# empty table instead of raising KeyError for a store that was not collected.
for store_id in (2190, 2512):
    liquor_store_clean.filter(liquor_store_clean.store_num_b == store_id)\
        .select('store_num_b', "Name")\
        .show(truncate=False)
    liquor_product_list.filter(
            liquor_product_list["Item Number"].isin(rec_liquor_top10_store_cv.get(store_id, [])))\
        .show(truncate=False)

# Collect the stores recommended for each top item,
# as {item number: [recommended store numbers]}.
rec_store_top10_item_cv = {}
for item in top10_item_list:
    rec_store = liquorRecs_cv.where(liquorRecs_cv.item_num == item).select("recommendations").collect()
    rec_store_top10_item_cv[item] = (
        [i.Store for i in rec_store[0]["recommendations"]] if rec_store else []
    )
rec_store_top10_item_cv

# Item description and category, then the stores recommended for the item.
for item_id in ('11296', '11297'):
    df.filter(df.item_num == item_id)\
        .select("item_num", 'Item Description', 'Category Name Clean')\
        .dropDuplicates()\
        .show(truncate=False)
    liquor_store_clean.filter(
            liquor_store_clean["store_num_b"].isin(rec_store_top10_item_cv.get(item_id, [])))\
        .select("store_num_b", "Name", "Address", "City")\
        .show(truncate=False)
# Grid search with 5-fold cross-validation for the implicit-feedback ALS
# model trained on the sales share.
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als_imp_cv = ALS(implicitPrefs=True, userCol="Store", itemCol="item_num",
                 ratingCol="perc_of_store_total_sale",
                 coldStartStrategy="drop", nonnegative=True)
evaluator_cv = RegressionEvaluator(metricName='rmse', labelCol='perc_of_store_total_sale')

# build parameter grid (3 x 3 x 2 = 18 candidate settings)
paramGrid = ParamGridBuilder() \
    .addGrid(als_imp_cv.maxIter, [10, 15, 20]) \
    .addGrid(als_imp_cv.rank, [10, 15, 20]) \
    .addGrid(als_imp_cv.regParam, [0.01, 0.001]) \
    .build()
crossval_imp = CrossValidator(estimator=als_imp_cv,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator_cv,
                              numFolds=5)

# Run cross-validation, and choose the best set of parameters.
cvModel_imp = crossval_imp.fit(training)

# Apply model on the test set to predict
prediction_cv_imp = cvModel_imp.transform(test)
# Attach the store lookup columns to every prediction.
prediction_cv_imp = prediction_cv_imp.join(
    liquor_store_clean, liquor_store_clean.store_num_b == prediction_cv_imp.Store
)
prediction_cv_imp.show(10, truncate=False)

evaluator = RegressionEvaluator(metricName='rmse', labelCol='perc_of_store_total_sale')
rmse_cv_imp = evaluator.evaluate(prediction_cv_imp)
print("Root-mean-square error = " + str(rmse_cv_imp))

# Parameter map with the lowest average RMSE. Sorting on the metric only
# avoids falling back to comparing the parameter dicts when metrics tie.
sorted(zip(cvModel_imp.avgMetrics, paramGrid), key=lambda pair: pair[0])[0][1]

params = [{p.name: v for p, v in m.items()} for m in cvModel_imp.getEstimatorParamMaps()]
# All grid search results
cv_rmse_df_imp = pd.DataFrame.from_dict(
    {"params": params, cvModel_imp.getEvaluator().getMetricName(): cvModel_imp.avgMetrics}
).sort_values(by="rmse")
cv_rmse_df_imp

# parameters for best model
for k, v in cv_rmse_df_imp.params[cv_rmse_df_imp.rmse == cv_rmse_df_imp.rmse.min()].values[0].items():
    # A single formatted argument prints "name value" under both
    # Python 2 and Python 3.
    print("{} {}".format(k, v))

# RMSE for best model
cv_rmse_df_imp.rmse.min()
# Refit the implicit-feedback ALS model with one fixed configuration.
# NOTE(review): maxIter=10, rank=10, regParam=0.01 are presumably the best
# grid-search setting - confirm against the cross-validation results table.
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als_cv_imp = ALS(maxIter=10, regParam=0.01, rank=10, implicitPrefs=True,
                 userCol="Store", itemCol="item_num", ratingCol="perc_of_store_total_sale",
                 coldStartStrategy="drop", nonnegative=True)
# fit model
als_cv_model_imp = als_cv_imp.fit(training)
# Apply model on the test set to predict
als_cv_predictions_imp = als_cv_model_imp.transform(test)

# Top 10 items for every store, and top 10 stores for every item.
userRecs_cv_imp = als_cv_model_imp.recommendForAllUsers(10)
userRecs_cv_imp.show(3, truncate=False)
liquorRecs_cv_imp = als_cv_model_imp.recommendForAllItems(10)
liquorRecs_cv_imp.show(3, truncate=False)

# Collect the recommendations to top 10 stores by total sales in dollars,
# as {store number: [recommended item numbers]}.
rec_liquor_top10_store_cv_imp = {}
for store in top10_store_list:
    rec_liquor = userRecs_cv_imp.where(userRecs_cv_imp.Store == store).select("recommendations").collect()
    # A store without a recommendation row gets an empty list instead of
    # raising IndexError on rec_liquor[0].
    rec_liquor_top10_store_cv_imp[store] = (
        [i.item_num for i in rec_liquor[0]["recommendations"]] if rec_liquor else []
    )
rec_liquor_top10_store_cv_imp

# Store name, then the products recommended to it. .get(..., []) shows an
# empty table instead of raising KeyError for a store that was not collected.
for store_id in (2190, 2512):
    liquor_store_clean.filter(liquor_store_clean.store_num_b == store_id)\
        .select('store_num_b', "Name")\
        .show(truncate=False)
    liquor_product_list.filter(
            liquor_product_list["Item Number"].isin(rec_liquor_top10_store_cv_imp.get(store_id, [])))\
        .show(truncate=False)

# Collect the stores recommended for each top item,
# as {item number: [recommended store numbers]}.
rec_store_top10_item_cv_imp = {}
for item in top10_item_list:
    rec_store = liquorRecs_cv_imp.where(liquorRecs_cv_imp.item_num == item).select("recommendations").collect()
    rec_store_top10_item_cv_imp[item] = (
        [i.Store for i in rec_store[0]["recommendations"]] if rec_store else []
    )
rec_store_top10_item_cv_imp

# Item description and category, then the stores recommended for the item.
for item_id in ('11296', '1799'):
    df.filter(df.item_num == item_id)\
        .select("item_num", 'Item Description', 'Category Name Clean')\
        .dropDuplicates()\
        .show(truncate=False)
    liquor_store_clean.filter(
            liquor_store_clean["store_num_b"].isin(rec_store_top10_item_cv_imp.get(item_id, [])))\
        .select("store_num_b", "Name", "Address", "City")\
        .show(truncate=False)