Implementing a custom partitioner

In this section, we'll implement a custom partitioner that takes a list of pairs describing ranges. If a key falls into a specific range, we assign it the partition number equal to that range's index in the list.

We will cover the following topics:

  • Implementing a custom partitioner
  • Implementing a range partitioner
  • Testing our partitioner

We will implement range partitioning logic based on our own ranges and then test our partitioner. Let's start with the black box test, without looking at the implementation.

The first part of the code is similar to what we have used already, but this time we key the data by amount, as shown in the following example:

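val keysWithValuesList =
  Array(
    UserTransaction("A", 100),
    UserTransaction("B", 4),
    UserTransaction("A", 100001),
    UserTransaction("B", 10),
    UserTransaction("C", 10)
  )
// `spark` is a SparkContext here: parallelize is defined on SparkContext
val data = spark.parallelize(keysWithValuesList)
// Use each transaction's amount as its key: RDD[(Int, UserTransaction)]
val keyed = data.keyBy(_.amount)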

We are keying by the amount, so we have the following keys: 100, 4, 100001, 10, and 10.
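RDD.keyBy(f) pairs every element x with the key f(x), producing (f(x), x). To make the resulting keys concrete without a Spark cluster, here is a plain-Scala sketch of the same transformation on a local array. It assumes UserTransaction is a case class whose second field is amount; the first field name, userId, is a guess for illustration only.

```scala
// Assumed shape of UserTransaction: only the `amount` field is confirmed by the text.
case class UserTransaction(userId: String, amount: Int)

object KeyByDemo {
  val keysWithValuesList: Array[UserTransaction] = Array(
    UserTransaction("A", 100),
    UserTransaction("B", 4),
    UserTransaction("A", 100001),
    UserTransaction("B", 10),
    UserTransaction("C", 10)
  )

  // Local equivalent of data.keyBy(_.amount): pair each record with its amount.
  val keyed: Array[(Int, UserTransaction)] =
    keysWithValuesList.map(t => (t.amount, t))

  def main(args: Array[String]): Unit =
    keyed.foreach { case (k, v) => println(s"$k -> $v") }
}
```

The keys, in order, are 100, 4, 100001, 10, and 10; the duplicate key 10 is kept because keyBy does not deduplicate.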

We will then create a partitioner called CustomRangePartitioner, which takes a list of tuples, as shown in the following example:

 val partitioned = keyed.partitionBy(new CustomRangePartitioner(List((0,100), (100, 10000), (10000, 1000000))))

The first range is from 0 to 100, which means that if a key is between 0 and 100, it should go to partition 0. The next range is from 100 to 10000, so every record within that range should fall into partition 1. Both ends of each range are inclusive. Because adjacent ranges share an endpoint, a key such as 100 matches both of the first two ranges; as the implementation shows, the partitioner picks the last matching range, so 100 goes to partition 1 and three keys (4, 10, and 10) fall into partition 0. The last range is from 10000 to 1000000, so any key in that range falls into partition 2. If a key falls outside all the ranges, the partitioner should fail with an IllegalArgumentException.
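The routing rule can be checked without Spark. The following plain-Scala sketch applies the same predicate to the five sample keys and groups them by the partition they would be sent to. partitionFor is a hypothetical helper that returns None, instead of a partition number, when no range matches.

```scala
object RangeAssignmentDemo {
  val ranges: List[(Int, Int)] = List((0, 100), (100, 10000), (10000, 1000000))

  // Same predicate as the partitioner: both ends inclusive, last matching range wins.
  // Returns None for keys outside every range instead of a partition index.
  def partitionFor(key: Int): Option[Int] = {
    val index = ranges.lastIndexWhere { case (start, end) => key >= start && key <= end }
    if (index >= 0) Some(index) else None
  }

  def main(args: Array[String]): Unit = {
    val keys = List(100, 4, 100001, 10, 10)
    // Group keys by the partition they would be routed to, sorted by partition.
    val byPartition = keys.groupBy(partitionFor)
    byPartition.toList.sortBy(_._1).foreach { case (p, ks) => println(s"$p: $ks") }
  }
}
```

Running main groups 4, 10, and 10 under partition 0, 100 under partition 1, and 100001 under partition 2.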

Let's look at the following example, which shows the implementation of our custom range partitioner:

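import org.apache.spark.Partitioner

class CustomRangePartitioner(ranges: List[(Int, Int)]) extends Partitioner {
  // One partition per range
  override def numPartitions: Int = ranges.size

  override def getPartition(key: Any): Int = {
    if (!key.isInstanceOf[Int]) {
      throw new IllegalArgumentException("partitioner works only for Int type")
    }
    val keyInt = key.asInstanceOf[Int]
    // Index of the last range containing the key (both ends inclusive)
    val index = ranges.lastIndexWhere(v => keyInt >= v._1 && keyInt <= v._2)
    // lastIndexWhere returns -1 when no range matches, so fail explicitly
    if (index == -1) {
      throw new IllegalArgumentException(s"key $keyInt is outside all ranges")
    }
    println(s"for key: $key return $index")
    index
  }
}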

It takes the ranges as a constructor argument, a list of tuples, as shown in the following example:

(ranges: List[(Int,Int)])

numPartitions returns ranges.size, so there is one partition per range.

Next, we have the getPartition method. Our partitioner works only for integer keys, so it starts with the following check:

if(!key.isInstanceOf[Int])

The ranges are defined on integers, so the partitioner cannot handle keys of other types. For that reason, we first check whether the key is an instance of Int, and if it is not, we throw an IllegalArgumentException stating that the partitioner works only for the Int type.
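The isInstanceOf check followed by asInstanceOf can also be written as a single pattern match, which is the more idiomatic Scala form and avoids a separate cast. A minimal sketch of that alternative; toIntKey is a hypothetical helper name, not part of the original code.

```scala
object KeyTypeCheck {
  // Hypothetical helper: returns the key as an Int, or fails the same way the partitioner does.
  // A boxed java.lang.Integer passed as Any also matches `case k: Int`.
  def toIntKey(key: Any): Int = key match {
    case k: Int => k
    case other  =>
      throw new IllegalArgumentException(s"partitioner works only for Int type, got: $other")
  }
}
```

Inside getPartition, val keyInt = KeyTypeCheck.toIntKey(key) would then replace both the check and the cast.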

Once the check passes, we cast the key to an Int with asInstanceOf and store it in keyInt. We then iterate over the ranges with lastIndexWhere, which returns the index of the last range that satisfies our predicate. The predicate receives each range as a tuple v and looks as follows:

 val index = ranges.lastIndexWhere(v => keyInt >= v._1 && keyInt <= v._2)

keyInt must be greater than or equal to v._1, the first element of the tuple, and less than or equal to the second element, v._2.

In other words, v._1 is the start of the range and v._2 is its end, so the predicate checks that the key lies within the range, inclusive of both ends.
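Because both ends are inclusive and adjacent ranges share an endpoint, keys such as 100 and 10000 satisfy the predicate for two ranges, so the choice between indexWhere and lastIndexWhere decides where boundary keys go. This plain-Scala sketch compares the two; firstMatch and lastMatch are hypothetical names used only for illustration.

```scala
object BoundaryDemo {
  val ranges: List[(Int, Int)] = List((0, 100), (100, 10000), (10000, 1000000))

  // Inclusive containment check, as in the partitioner's predicate.
  def contains(range: (Int, Int), key: Int): Boolean =
    key >= range._1 && key <= range._2

  // First matching range: a boundary key goes to the lower range.
  def firstMatch(key: Int): Int = ranges.indexWhere(r => contains(r, key))

  // Last matching range (what the partitioner uses): a boundary key goes to the higher range.
  def lastMatch(key: Int): Int = ranges.lastIndexWhere(r => contains(r, key))

  def main(args: Array[String]): Unit =
    for (key <- List(100, 10000))
      println(s"key $key: first=${firstMatch(key)}, last=${lastMatch(key)}")
}
```

With lastIndexWhere, boundary keys move to the higher range, which is why key 100 lands in partition 1. If each key should match exactly one range, half-open ranges (start inclusive, end exclusive) are a common alternative.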

Finally, we print the key and the index we found, for debugging purposes, and return the index, which is the partition number. This is shown in the following example:

println(s"for key: $key return $index")

Let's run the test and look at the debug output.

We can see that for key 100001, the code returns partition 2, as expected. Key 100 is routed to partition 1, because it lies on the shared boundary of the first two ranges and lastIndexWhere picks the last match, and keys 10, 4, and 10 are routed to partition 0. This means our code works as expected.
