Market basket analysis with Spark Core

In this section, we will look at how to develop a large-scale machine learning pipeline for market basket analysis. Rather than using Spark ML or MLlib, we will demonstrate how to use Spark Core to develop such an application.

Background

In an earlier paper, An Efficient Market Basket Analysis Technique with Improved MapReduce Framework on Hadoop: An E-commerce Perspective (available at http://onlinepresent.org/proceedings/vol6_2012/8.pdf), the authors argue that the market basket analysis (MBA) technique is of substantial importance to everyday business decisions, since customers' purchase rules can be extracted from association rules by discovering which items they buy frequently and together. Consequently, purchase rules can be revealed for frequent shoppers based on these association rules.

You might still be wondering why we need market basket analysis, why it is important, and why it is computationally expensive. If you want to identify highly specific association rules, for example, that a customer prefers mango or orange jam along with their milk or butter, you need to analyze and process large-scale transactional data. Moreover, some massive chain retailers or supermarkets, for example, E-mart (UK), HomePlus (Korea), Aldi (Germany), or Dunnes Stores (Ireland), use databases of many millions, or even billions, of transactions to find the associations among particular items with regard to brand, color, origin, or even flavor, in order to increase the probability of sales and profit.

In this section, we will look at an efficient approach to large-scale market basket analysis with Spark libraries. After reading and practising this section, you will be able to show how the Spark framework lifts an existing single-node pipeline to a pipeline usable on a multi-node data-mining cluster, so that the proposed association-rule mining algorithm can be reused in parallel with the same benefits.

We use the acronym SAMBA for Spark-based Market Basket Analysis, min_sup for minimum support, and min_conf for minimum confidence. We also use the terms frequent patterns and frequent itemsets interchangeably.

Motivations

Traditional main-memory or disk-based computing and RDBMSs are not capable of handling ever-increasing volumes of transactional data. Furthermore, as discussed in Chapter 1, Introduction to Data Analytics with Spark, MapReduce suffers from heavy I/O, algorithmic complexity, high latency, and fully disk-based operation. Finding the null transactions and eliminating them before the mining step is therefore the initial part of this approach.

It is quite possible to find all the null transactions by identifying those transactions that do not contain at least one frequent 1-itemset. As already mentioned, Spark caches intermediate data in memory and provides the Resilient Distributed Dataset (RDD) abstraction, which can be used to overcome these issues and which has achieved tremendous success over the last three years in handling large-scale data in distributed computing systems. These successes are promising and motivating examples for applying Spark to market basket analysis.

Exploring the dataset

Please download the groceries dataset for market basket analysis from https://github.com/stedy/Machine-Learning-with-R-datasets/blob/master/groceries.csv. The first few rows of the raw groceries.csv data are shown in Figure 11. Each line represents a separate grocery-store transaction. The first transaction includes four items: citrus fruit, semi-finished bread, margarine, and ready soups. In comparison, the third transaction includes only one item, whole milk:


Figure 11: A snapshot of the groceries dataset

Problem statements

We believe we have enough motivation and reasons for analyzing the market basket using transactional or retail datasets. Now, let us discuss some background concepts that are needed to apply our Spark-based market basket analysis technique.

Suppose you have a set of distinct items I = {i1, i2, ..., in}, where n is the number of distinct items. A transactional database T = {t1, t2, ..., tN} is a set of N transactions, where |T| = N is the total number of transactions. A set X ⊆ I is called a pattern or itemset. We assume that the input is given as a sequence of transactions, where items are separated by commas, as shown in Table 1.

For the sake of simplicity in describing the background, transactions of the same kind are represented with single-character items in Table 2:

Transaction 1    crackers, ice-cream, coke, orange
Transaction 2    beef, pizza, coke, bread
Transaction 3    baguette, soda, shampoo, crackers, pepsi
Transaction 4    burger, cream cheese, diapers, milk
...

Table 1. Sample transactions made by a customer

TID    Itemset (sequence of items)
10     A, B, C, F
20     C, D, E
30     A, C, E, D
40     A
50     D, E, G
60     B, D
70     B
80     A, E, C
90     A, C, D
100    B, E, D

Table 2. A transactional database

If X ⊆ t, we say that X occurs in t, or that t contains X. The support count, Sup(X), is the number of transactions in which the itemset X occurs, which can be described as follows:

Sup(X) = |{t ∈ T : X ⊆ t}|

In other words, if Sup(X) ≥ min_sup, we say that X is a frequent itemset. For example, in Table 2, the support counts of the itemsets CD, DE, and CDE are 3, 4, and 2, respectively, and if the min_sup is 2, all of these are frequent itemsets.

On the other hand, association rules are statements of the form X ⇒ Y, or more formally:

X ⇒ Y, where X ⊆ I, Y ⊆ I, and X ∩ Y = ∅     (1)

Therefore, we can say that an association rule is a pattern stating that when X occurs, Y occurs with a certain probability. The confidence of the association rule defined in equation 1 expresses how often the items in Y appear in transactions that also contain X, as follows:

Confidence(X ⇒ Y) = Sup(X ∪ Y) / Sup(X)     (2)

Now we need to introduce a new parameter, called lift, which measures how much more likely one itemset is to be purchased, given that another itemset is known to have been purchased, relative to its typical purchase rate. It is defined by the following equation:

Lift(X ⇒ Y) = Confidence(X ⇒ Y) / (Sup(Y) / N) = (N × Sup(X ∪ Y)) / (Sup(X) × Sup(Y))     (3)
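
For example, using the transactions in Table 2, we have Sup(C) = 5, Sup(D) = 6, Sup(CD) = 3, and N = 10, so:

Confidence(C ⇒ D) = Sup(CD) / Sup(C) = 3 / 5 = 0.6
Lift(C ⇒ D) = (N × Sup(CD)) / (Sup(C) × Sup(D)) = (10 × 3) / (5 × 6) = 1.0

That is, the rule C ⇒ D holds 60 percent of the time, and a lift of exactly 1 tells us that C and D co-occur no more often than we would expect by chance.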

In a nutshell, given a transactional database, the problem of market basket analysis is to find the complete set of customers' purchase rules, in the form of association rules mined from the frequent itemsets whose support and confidence are no less than the min_sup and min_conf thresholds, respectively.

Large-scale market basket analysis using Spark

As shown in Figure 12, we assume that the transactional databases are stored in a distributed way across a cluster of DB servers. A DB server is a computing node with large storage and main memory; it can therefore store large datasets and compute any task assigned to it. The Driver PC is also a computing node, which mainly works as a client and controls the overall process.

Obviously, it needs enough memory to process and hold the Spark code that is sent across the computing nodes. The code consists of a DB server ID, the minimum support, the minimum confidence, and the mining algorithm:


Figure 12: Workflow of the SAMBA algorithm using Spark

From the patterns, frequent patterns are generated using reduce phase 1, keeping those that satisfy the min_sup constraint. A map phase is then applied to the computed frequent patterns to generate the sub-patterns that eventually help to generate the association rules. From the sub-patterns, reduce phase 2 generates the association rules that satisfy the min_conf constraint.

Chaining two Map and Reduce phases in this way is possible because of Spark Core and its associated APIs. The final result is the complete set of association rules with their respective support counts and confidence.

Store keepers then have the freedom to place their items based on the associations between items, to increase sales to both frequent and non-frequent shoppers. Due to space constraints, we cannot show a step-by-step trace for the sample transactional database presented in Table 2.

However, we believe that the workflow and the pseudocode will suffice to understand the overall scenario. A DB server takes the code sent from the Driver PC as input and starts the computation. From the Spark session, we create some initial data references, or RDD objects. The initial RDD objects are then transformed to create new RDD objects on the DB server. First, it reads the dataset as plain text (or another supported format) and identifies the null transactions using narrow/wide transformations (that is, flatMap, mapToPair, and reduceByKey).

The filter RDD operation then provides a data segment without null transactions. The RDD objects are materialized to dump the filtered dataset into the DB server's storage. Spark's inter-RDD join operation allows the contents of multiple RDDs to be combined within a single data node. In summary, we follow the steps given here before getting the filtered dataset (a minimal code sketch follows the list):

  1. Set the system property for the distributed processing model and cluster manager (for example, Mesos). This can be set in your application as standard Spark code.
  2. Set SparkConf, AppName, Master URL, Spark local IP, Spark driver host IP, Spark executor memory, and Spark driver memory.
  3. Create JavaSparkContext using the SparkConf.
  4. Create a JavaRDD, read the dataset as plain-text transactions, and perform the necessary partitioning.
  5. Perform a flatMap operation over the RDD to split the transactions into items.
  6. Perform the mapToPair operation to create key/value pairs of the items.
  7. Perform the filter operation to remove all the null transactions.
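
The following minimal sketch shows how these steps might look in practice. It is only an illustration: the local master, the two-partition hint, the Input/groceries.data path, and the output/item_frequencies path are assumptions, and it uses a plain JavaSparkContext rather than the SparkSession used later in this section:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class FilteredDatasetSketch {
    public static void main(String[] args) {
        // Steps 2 and 3: configure Spark and create the JavaSparkContext
        SparkConf conf = new SparkConf()
            .setAppName("MarketBasketAnalysis")
            .setMaster("local[*]"); // assumption: local mode; use your cluster manager's URL instead
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Step 4: read the dataset as plain text, one transaction per line
        JavaRDD<String> transactions = sc.textFile("Input/groceries.data", 2);

        // Step 7 (applied early here): drop null or empty transactions
        JavaRDD<String> filtered = transactions.filter(t -> t != null && !t.trim().isEmpty());

        // Steps 5 and 6: split each transaction into items, emit (item, 1) pairs,
        // and count how often each 1-itemset occurs
        JavaPairRDD<String, Integer> itemFrequencies = filtered
            .flatMap(t -> Arrays.asList(t.trim().split(",")).iterator())
            .mapToPair(item -> new Tuple2<>(item.trim(), 1))
            .reduceByKey((a, b) -> a + b);

        itemFrequencies.saveAsTextFile("output/item_frequencies"); // optional, for inspection
        sc.stop();
    }
}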

When we have the filtered database, we materialize it with an action after the inter-RDD join operation, saving the dataset on a DB server, partitioning it if a single machine does not have enough storage, or caching it if there is not enough memory.

Figure 12 shows the complete workflow for obtaining the association rules as the final result using Spark's APIs. Figure 13, on the other hand, shows the pseudocode of the algorithm, namely Spark-Based Market Basket Analysis (SAMBA). There are two Map and Reduce phases involved, as outlined here:

  • Map/Reduce phase 1: Mappers read the transactions from the HDFS servers and convert them into patterns. Reducers then find the frequent patterns.
  • Map/Reduce phase 2: Mappers convert the frequent patterns into sub-patterns. A reducer then generates the association rules based on the given constraints (min_conf and lift):

    Figure 13: The SAMBA algorithm

After that, the SAMBA algorithm reads the filtered transactional database (FTDB) and applies map phase 1 to generate all the possible combinations of patterns. The mapToPair() method then emits these combinations as patterns with their respective support counts.

The algorithm solution using Spark Core

Here we will look at how to do market basket analysis using Spark Core. Please note that we will not use Spark ML or MLlib: although MLlib provides a technique for computing association rules, it does not show how to calculate other parameters, such as confidence, support, and lift, which are needed for a complete analysis of the groceries dataset. Therefore, we will show a complete example, step by step, from data exploration to association rule generation.

Step 1: Import the necessary packages and APIs

Here is the code to import packages and APIs:

import java.util.ArrayList; 
import java.util.Iterator; 
import java.util.List; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFlatMapFunction; 
import org.apache.spark.rdd.RDD; 
import org.apache.spark.sql.SparkSession; 
import scala.Tuple2;  
import scala.Tuple4; 

Step 2: Create the entry point by specifying the Spark session

The entry point can be created with the help of the following code:

SparkSession spark = SparkSession 
.builder() 
.appName("MarketBasketAnalysis") 
.master("local[*]") 
.config("spark.sql.warehouse.dir", "E:/Exp/") 
.getOrCreate(); 

Step 3: Create the Java RDD for the transactions

Java RDD for the transactions can be created with the help of the following code:

String transactionsFileName = "Input/groceries.data"; 
RDD<String> transactions = spark.sparkContext().textFile(transactionsFileName, 1); 
transactions.saveAsTextFile("output/transactions"); 

Step 4: Create a method for creating the list

Create a method named toList that splits a transaction string from the transactions RDD into its individual items and returns them as a list:

static List<String> toList(String transaction) {
    // Split a comma-separated transaction line into its individual items
    String[] items = transaction.trim().split(",");
    List<String> list = new ArrayList<String>();
    for (String item : items) {
        list.add(item);
    }
    return list;
}

Step 5: Remove infrequent items and null transactions

Create a method named removeOneItemAndNullTransactions, which returns the list unchanged if it is null or empty (a null transaction) and otherwise returns a copy of the list with the item at position i removed:

static List<String> removeOneItemAndNullTransactions(List<String> list, int i) {
    // Null or empty transactions are returned as they are
    if ((list == null) || (list.isEmpty())) {
        return list;
    }
    // Ignore out-of-range positions
    if ((i < 0) || (i > (list.size() - 1))) {
        return list;
    }
    // Return a copy of the list with the i-th item removed
    List<String> cloned = new ArrayList<String>(list);
    cloned.remove(i);
    return cloned;
}
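
For instance, assuming java.util.Arrays is also imported, a call like the following (with hypothetical item values) returns [milk, butter], which is exactly how the sub-patterns are produced later in map phase 2:

List<String> items = Arrays.asList("milk", "bread", "butter");
List<String> subPattern = removeOneItemAndNullTransactions(items, 1); // [milk, butter]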

Step 6: Flat mapping and 1-itemsets creation (map phase 1)

Perform the flatMap operation and create the 1-itemsets. Finally, save the patterns:

JavaPairRDD<List<String>, Integer> patterns = transactions.toJavaRDD()
    .flatMapToPair(new PairFlatMapFunction<String, List<String>, Integer>() {
        @Override
        public Iterator<Tuple2<List<String>, Integer>> call(String transaction) {
            List<String> list = toList(transaction);
            List<List<String>> combinations = Combination.findSortedCombinations(list);
            List<Tuple2<List<String>, Integer>> result = new ArrayList<Tuple2<List<String>, Integer>>();
            for (List<String> combList : combinations) {
                if (combList.size() > 0) {
                    result.add(new Tuple2<List<String>, Integer>(combList, 1));
                }
            }
            return result.iterator();
        }
    });
patterns.saveAsTextFile("output/1itemsets");

Note

Note that saving the patterns RDD at the end is optional; it is done only so that you can inspect the contents of the RDD.

The following is a screenshot of the 1-itemsets:


Figure 14: 1-itemsets
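
The Combination.findSortedCombinations() helper used in the preceding listing is not shown in this section. A minimal sketch of one possible implementation (an assumption, not the book's original class) that returns every sorted combination of a transaction's items is as follows; the empty combination it includes is filtered out by the size check above:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the Combination helper used in map phase 1
public class Combination {
    // Returns every combination (sub-itemset) of the given items, each kept in
    // sorted order so that identical itemsets coming from different transactions
    // reduce to the same key. The empty combination is included; callers filter it out.
    public static <T extends Comparable<? super T>> List<List<T>> findSortedCombinations(List<T> items) {
        List<List<T>> result = new ArrayList<List<T>>();
        result.add(new ArrayList<T>()); // start from the empty combination
        for (T item : items) {
            List<List<T>> extended = new ArrayList<List<T>>();
            for (List<T> existing : result) {
                List<T> next = new ArrayList<T>(existing);
                next.add(item);
                Collections.sort(next); // keep each itemset in a canonical sorted order
                extended.add(next);
            }
            result.addAll(extended); // add all combinations that include the current item
        }
        return result;
    }
}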

Step 7: Combine and reduce frequent patterns (reduce phase 1)

Combine and reduce all the frequent patterns, and save them:

JavaPairRDD<List<String>, Integer> combined = patterns.reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
            int support = 0;
            if (i1 + i2 >= 2) {
                support = i1 + i2;
            }
            // if (support >= 2)
            return support;
        }
    });
combined.saveAsTextFile("output/frequent_patterns");

The following is a snapshot of the frequent patterns with their respective support (frequency), as shown in Figure 15:


Figure 15: Frequent patterns with their respective support (frequency)

Step 8: Generate all the candidate frequent patterns (map phase 2)

Generate all the candidate frequent patterns, or sub-patterns, by removing one item at a time from the frequent patterns, and finally, save the candidate patterns:

JavaPairRDD<List<String>, Tuple2<List<String>, Integer>> candidatePatterns = combined.flatMapToPair(
    new PairFlatMapFunction<Tuple2<List<String>, Integer>, List<String>, Tuple2<List<String>, Integer>>() {
        @Override
        public Iterator<Tuple2<List<String>, Tuple2<List<String>, Integer>>> call(
                Tuple2<List<String>, Integer> pattern) {
            List<Tuple2<List<String>, Tuple2<List<String>, Integer>>> result =
                new ArrayList<Tuple2<List<String>, Tuple2<List<String>, Integer>>>();
            List<String> list = pattern._1;
            Integer frequency = pattern._2;
            // Emit the pattern itself with a null "from" list so that its own
            // support count can be recovered in the next phase
            result.add(new Tuple2(list, new Tuple2(null, frequency)));
            if (list.size() == 1) {
                return result.iterator();
            }

            // The pattern has more than one item: emit every sub-pattern obtained by
            // removing one item, together with the original pattern and its support
            for (int i = 0; i < list.size(); i++) {
                List<String> sublist = removeOneItemAndNullTransactions(list, i);
                result.add(new Tuple2<List<String>, Tuple2<List<String>, Integer>>(sublist,
                    new Tuple2(list, frequency)));
            }
            return result.iterator();
        }
    });
candidatePatterns.saveAsTextFile("output/sub_patterns");

The following is a snapshot of the sub-patterns:


Figure 16: Sub-patterns for the items

Step 9: Combine all the sub-patterns

Combine all the sub-patterns and save them to disk, or persist them in memory:

JavaPairRDD<List<String>, Iterable<Tuple2<List<String>, Integer>>> rules = candidatePatterns.groupByKey();
rules.saveAsTextFile("output/combined_subpatterns");

The following is a screenshot of the candidate patterns (sub-patterns) in combined form:


Figure 17: Candidate patterns (sub-patterns) in combined form

Step 10: Generate association rules

Generate all the association rules from the sub-patterns (reduce phase 2), computing the confidence and lift for each rule:

JavaRDD<List<Tuple4<List<String>, List<String>, Double, Double>>> assocRules = rules.map(
    new Function<Tuple2<List<String>, Iterable<Tuple2<List<String>, Integer>>>,
                 List<Tuple4<List<String>, List<String>, Double, Double>>>() {
        @Override
        public List<Tuple4<List<String>, List<String>, Double, Double>> call(
                Tuple2<List<String>, Iterable<Tuple2<List<String>, Integer>>> in) throws Exception {

            List<Tuple4<List<String>, List<String>, Double, Double>> result =
                new ArrayList<Tuple4<List<String>, List<String>, Double, Double>>();
            List<String> fromList = in._1;
            Iterable<Tuple2<List<String>, Integer>> to = in._2;
            List<Tuple2<List<String>, Integer>> toList = new ArrayList<Tuple2<List<String>, Integer>>();
            Tuple2<List<String>, Integer> fromCount = null;
            for (Tuple2<List<String>, Integer> t2 : to) {
                // The entry with a null list carries the support count of fromList itself
                if (t2._1 == null) {
                    fromCount = t2;
                } else {
                    toList.add(t2);
                }
            }
            if (toList.isEmpty()) {
                return result;
            }
            for (Tuple2<List<String>, Integer> t2 : toList) {
                // t2._2 is the support count of the larger pattern (antecedent plus consequent)
                double confidence = (double) t2._2 / (double) fromCount._2;
                // Simplified lift used here; the definition in equation (3) would also
                // require the consequent's support
                double lift = confidence / (double) t2._2;
                double support = (double) fromCount._2;
                // The consequent is the larger pattern minus the antecedent items
                List<String> t2List = new ArrayList<String>(t2._1);
                t2List.removeAll(fromList);
                if (support >= 2.0 && fromList != null && t2List != null) {
                    result.add(new Tuple4(fromList, t2List, support, confidence));
                    System.out.println(fromList + "=>" + t2List + "," + support + "," + confidence + "," + lift);
                }
            }
            return result;
        }
    });
assocRules.saveAsTextFile("output/association_rules_with_conf_lift");

The following is the output of the association rules including their confidence and lift. For more details on support, confidence, and lift, refer to the problem statement section.

[Antecedent=>Consequent], Support, Confidence, Lift:


Figure 18: Association rules including their confidence and lift

Tuning and setting the correct parameters in SAMBA

Note that if you attempt to use the default parameter settings of support = 0.1 and confidence = 0.6, you might end up with null rules, or technically, no rules being generated. You might be wondering why. Actually, a default support of 0.1 means that, in order to generate an association rule, an item must have appeared in at least 0.1 * 9,835 = 983.5 transactions (for the dataset we are using, |N| = 9,835).

However, in this regard, in the book Machine Learning with R, Packt Publishing, 2015, Brett Lantz argues that there is one way to tackle this issue when setting the support: think about the minimum number of transactions needed before you would consider a pattern interesting. For example, you could argue that if an item is purchased twice a day (roughly 60 times a month), it may be non-trivial enough to consider.

From this perspective, it is possible to estimate the support value needed to find only rules matching at least that many transactions. Consequently, you can set the minimum support to 0.006, because 60 out of 9,835 is roughly 0.006; we'll try setting the support there first.

On the other hand, setting the minimum confidence also requires a tricky balance, and in this regard, again, we refer you to the book by Brett Lantz, Machine Learning with R, Packt Publishing, 2015. If the confidence is too low, we may end up overwhelmed with a large number of unreliable rules, that is, false-positive results.

As a result, the optimum value of the minimum confidence threshold depends heavily on the goals of your analysis. If you start with conservative values, you can always reduce them to broaden the search if you aren't finding actionable intelligence. If you set the minimum confidence threshold at 0.25, a rule has to be correct at least 25 percent of the time to be included in the results. This eliminates the most unreliable rules while allowing some room to modify behavior with targeted product promotions.

Now, let's talk about the third parameter, the lift. Before suggesting how to set its value, let's see a practical example of how it might affect the generation of the association rules in the first place. For the third time, we refer to the book by Brett Lantz, Machine Learning with R, Packt Publishing, 2015.

For example, suppose that at a supermarket many people often purchase milk and bread together. Naturally, you would therefore expect to find many transactions that contain both milk and bread. If lift(milk ⇒ bread) is greater than 1, it implies that the two items are found together more often than one would expect by chance. A large lift value is therefore a strong indicator that a rule is important and reflects a true connection between the items in the transactions.

In summary, we need to set the values of these parameters carefully, considering the preceding examples. However, when running as a standalone application, the algorithm might take hours to finish, so allow it enough time to run. Alternatively, shorten very long transactions to reduce the time overhead.
