MapReduce testing with PigUnit and JUnit - codecentric AG Blog

Unit testing. What is unit testing? How do we do it?
It is well known what a unit test of a (simple) Java class looks like:

  • there is a preparation part, something that runs before all tests and/or before each test (instantiating the things we need, mocking behavior, …)
  • there is the actual test part, a set of positive and negative tests of the class
  • finally, there is a part where we reset and dispose of everything we used during testing

This does not sound complicated. But we started from the premise that we have a simple Java class. What would happen to our unit test if the thing it tests is not a well-designed Java class? Let me change our premise and make things interesting. Let me show you what a test of an Apache Pig script would look like.

For those who are not familiar with Apache Pig, Pig scripts are programs written in Pig Latin. This is a procedural language with a compiler that compiles scripts into Java Map/Reduce jobs. A Pig script is easier to read and understand than an equivalent Java M/R implementation would be. It is also easier to maintain. These are the two main benefits of the approach.

I have prepared a short and simple Pig script, an awesomeness calculator, that is going to be my starting point. In the following paragraphs, I will describe what it does and how it does it, and I will show what a test of it could look like.

The awesomeness calculator, as the name suggests, calculates which users have an awesomeness rating above average. The test data is highly reliable and taken from real life, so if by some chance it turns out that I have the highest rating, it's not a setup :).

The Pig script itself is pretty straightforward. For starters, we need to load the input data that will be processed.

-- Load users from Hadoop File System
users = LOAD '/some/path/users.txt' USING PigStorage(',') AS (id:long, firstName:chararray, lastName:chararray, country:chararray, city:chararray, company:chararray);
 
-- Load ratings from Hadoop File System
awesomenessRating = LOAD '/some/path/rating.txt' USING PigStorage(',') AS (userId:long, rating:long);

As you can see from the code above, we have one file that contains all the users and another one with their awesomeness ratings. Since we are reading from comma-separated files (note the ',' passed to PigStorage), we give Pig a schema that names and types the columns in each file.
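
To make the role of the schema more concrete, here is a minimal plain-Java sketch of what it expresses for one line of users.txt: split on the delimiter, then give each column a name and a type. The class UserSchemaSketch, the nested User class and the method parseUser are hypothetical names invented for this illustration. The sketch rejects malformed rows outright, which is a simplification and not a description of how Pig treats them.

```java
final class UserSchemaSketch {

    /** Hypothetical value class mirroring the fields declared in the AS (...) clause. */
    static final class User {
        final long id;
        final String firstName;
        final String lastName;
        final String country;
        final String city;
        final String company;

        User(long id, String firstName, String lastName,
             String country, String city, String company) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.country = country;
            this.city = city;
            this.company = company;
        }
    }

    /** Splits one comma-delimited line and converts each column to its declared type. */
    static User parseUser(String line) {
        String[] f = line.split(",", -1); // -1 keeps trailing empty columns
        if (f.length != 6) {
            throw new IllegalArgumentException("expected 6 columns but got " + f.length);
        }
        // id is declared as long, every other column as chararray (a String here).
        return new User(Long.parseLong(f[0]), f[1], f[2], f[3], f[4], f[5]);
    }

    public static void main(String[] args) {
        User u = parseUser("1,Ozren,Gulan,Serbia,Novi Sad,codecentric");
        System.out.println(u.id + " " + u.firstName + " from " + u.city);
    }
}
```

The ratings file follows the same idea with two long columns, userId and rating.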

In order to actually get the information about a user's rating, we need to join these two aliases on the user id (id in users, userId in awesomenessRating).

-- Join records by userId
joinedRecords = JOIN users BY id, awesomenessRating BY userId;

Now we have the required information and we can proceed with our business. Let's keep only the users with a rating above average; the script uses a fixed threshold of 150 for this.

-- Filter users with awesomenessRating > 150
filteredRecords = FILTER joinedRecords BY awesomenessRating::rating > 150;

For the final part, we need to pick only the fields that we are interested in and save the results.

-- Generate fields that we are interested in
generatedRecords = FOREACH filteredRecords GENERATE
						users::id AS id,
						users::firstName AS firstName,
						users::country AS country,
						awesomenessRating::rating AS rating;
 
-- Store results
STORE generatedRecords INTO '/results/awesomeness' USING PigStorage();

After compilation, the Pig compiler ends up with one Map and one Reduce phase. A graphical representation follows:
[Figure: MR phase]

For this purpose I used Graphviz, a tool that can render a graphical representation of your Pig script in terms of Map/Reduce jobs. This can be very useful when you are working with more complex Pig scripts. It shows you what is really happening behind the scenes and helps you improve performance.

Now that we are familiar with our Pig script, we can start with unit tests. For unit testing of Pig scripts, there is the PigUnit library provided by the Apache Pig project. It enables running a Pig script using JUnit. PigUnit can run in Local and MapReduce mode. Local mode is the default and does not require a cluster. It lets you use your local file system in place of a Hadoop cluster, creating a new local one on each run. MapReduce mode, on the other hand, does require a Hadoop cluster and an HDFS installation. In this example, we will run PigUnit in Local mode, because we want to be able to run this test on every machine.
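
How the mode gets selected can be sketched in plain Java. As far as I know, PigUnit 0.12 switches to MapReduce mode when the Java system property pigunit.exectype.cluster is present and stays in Local mode otherwise. Treat the property name as an assumption and check it against the documentation of your Pig version, since other releases may use a different one. The class PigUnitModeSketch and its methods are hypothetical helpers, not part of PigUnit.

```java
final class PigUnitModeSketch {

    // Assumed property name (Pig 0.12 era); verify it for the version you use.
    static final String CLUSTER_PROPERTY = "pigunit.exectype.cluster";

    /**
     * Requests MapReduce mode. Assumption: the property is read when the first
     * PigTest is created, so this has to be called before that point.
     */
    static void requestMapReduceMode() {
        System.setProperty(CLUSTER_PROPERTY, "true");
    }

    /** Removes the property again, which leaves the default Local mode. */
    static void requestLocalMode() {
        System.clearProperty(CLUSTER_PROPERTY);
    }

    /** Reports which mode the current system properties would select. */
    static String requestedMode() {
        return System.getProperties().containsKey(CLUSTER_PROPERTY) ? "mapreduce" : "local";
    }

    public static void main(String[] args) {
        requestLocalMode();
        System.out.println("PigUnit would run in " + requestedMode() + " mode");
    }
}
```

The same switch can be passed on the command line as -Dpigunit.exectype.cluster=true, which keeps the test code itself free of environment decisions.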

For this occasion, I have used Maven as the dependency management tool, so I will start with a freshly created Maven project (if you are not familiar with Maven, you have enough info here to get you started). First, let's add all the libraries that we will need for our little project to the pom.xml.

The minimal required setup consists of four libraries:
1. org.apache.pig.pigunit – core component for running tests

<dependency>
  <groupId>org.apache.pig</groupId>
  <artifactId>pigunit</artifactId>
  <version>0.12.0</version>
  <scope>test</scope>
</dependency>

2. org.apache.hadoop.hadoop-core – needed for working with Hadoop File System

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>test</scope>
</dependency>

3. jline – needed for reading input

<dependency>
  <groupId>jline</groupId>
  <artifactId>jline</artifactId>
  <version>0.9.94</version>
  <scope>test</scope>
</dependency>

4. joda-time – needed for time operations used by PigUnit

<dependency>
  <groupId>joda-time</groupId>
  <artifactId>joda-time</artifactId>
  <version>1.6</version>
  <scope>test</scope>
</dependency>

Let's get to the main part. What we need now is a simple Java class that we will use to run our tests. The class PigTest represents our actual test; we will use it to load the Pig script shown above.

private static PigTest test;

As with all unit tests, we have a setUp() method in which we override (mock) aliases with our input data. In this example those are the aliases “users” and “awesomenessRating”; TEST_PATH is a constant holding the base directory of the test files. Note that by default PigUnit will override STORE and DUMP statements, so you don't have to worry about commenting those out in your Pig script.

// Runs once before all tests: loads the script and redirects its inputs to local test files.
@BeforeClass
public static void setUp() throws IOException, ParseException {
    test = new PigTest("src/main/resources/example.pig");
    // Replace the "users" LOAD statement with one that reads the prepared test input.
    test.override("users", "users = LOAD '" + TEST_PATH + "input/users.txt' USING PigStorage(',') AS (id:long, firstName:chararray, lastName:chararray, country:chararray, city:chararray, company:chararray);");
    // Do the same for the ratings alias.
    test.override("awesomenessRating", "awesomenessRating = LOAD '" + TEST_PATH + "input/awesomeness-rating.txt' USING PigStorage(',') AS (userId:long, rating:long);");
}

As you can see, we are loading our mocked data from the prepared files “users.txt” and “awesomeness-rating.txt”. Note that the overriding LOAD statements pass a comma (,) to PigStorage, so the input files must be comma-delimited. Our input data looks like this: users.txt with columns id, firstName, lastName, country, city and company.

1,Ozren,Gulan,Serbia,Novi Sad,codecentric
2,Petar,Petrovic,Serbia,Belgrade,some.company
3,John,Smith,England,London,brits.co
4,Linda,Jefferson,USA,New York,ae.com
5,Oscar,Hugo,Sweden,Stockholm,swe.co
123,Random,Random,Random,Random,Random

awesomeness-rating.txt with columns: userId and rating.

1,1000
2,15
3,200
4,11
5,5

Now that we have prepared our PigUnit test and input data, we can assert on the desired aliases and see if the actual results match the expected ones. Since our Pig script is rather simple, we don't have many aliases to choose from. I will show just one example; the others are done analogously.

We need to create a method with the @Test annotation, which will be executed by JUnit. As you can see in the code below, all that needs to be done is to define which alias we want to test and the expected data for it.

@Test
public void testGeneratedRecords() throws IOException, ParseException {
    test.assertOutput("generatedRecords", new File(TEST_PATH + "results/generatedRecords.txt"));
}

generatedRecords.txt contains the expected result for our mocked input data:

(1,Ozren,Serbia,1000)
(3,John,England,200)
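
As a sanity check on the expected file, the join, filter and projection from the script can be reproduced in plain Java over the same mocked input. This is only a sketch of the logic, not how Pig executes it. The class AwesomenessSketch and its members are hypothetical names, and the sketch assumes at most one rating per user, which holds for the test data above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AwesomenessSketch {

    // Same fixed threshold as the FILTER statement in the script.
    static final long THRESHOLD = 150;

    // The mocked input from users.txt and awesomeness-rating.txt.
    static final List<String> SAMPLE_USERS = Arrays.asList(
            "1,Ozren,Gulan,Serbia,Novi Sad,codecentric",
            "2,Petar,Petrovic,Serbia,Belgrade,some.company",
            "3,John,Smith,England,London,brits.co",
            "4,Linda,Jefferson,USA,New York,ae.com",
            "5,Oscar,Hugo,Sweden,Stockholm,swe.co",
            "123,Random,Random,Random,Random,Random");
    static final List<String> SAMPLE_RATINGS = Arrays.asList(
            "1,1000", "2,15", "3,200", "4,11", "5,5");

    /** Returns the expected tuples, formatted like the lines of generatedRecords.txt. */
    static List<String> expectedTuples(List<String> userLines, List<String> ratingLines) {
        // Index ratings by userId (assumes one rating per user).
        Map<Long, Long> ratingByUser = new HashMap<>();
        for (String line : ratingLines) {
            String[] f = line.split(",");
            ratingByUser.put(Long.parseLong(f[0]), Long.parseLong(f[1]));
        }
        List<String> tuples = new ArrayList<>();
        for (String line : userLines) {
            String[] f = line.split(",");
            long id = Long.parseLong(f[0]);
            Long rating = ratingByUser.get(id);
            // Inner join: users without a rating are dropped. Filter: rating > 150.
            if (rating != null && rating > THRESHOLD) {
                // Projection: id, firstName, country, rating.
                tuples.add("(" + id + "," + f[1] + "," + f[3] + "," + rating + ")");
            }
        }
        return tuples;
    }

    public static void main(String[] args) {
        for (String tuple : expectedTuples(SAMPLE_USERS, SAMPLE_RATINGS)) {
            System.out.println(tuple);
        }
    }
}
```

User 123 has no rating and is dropped by the join, while users 2, 4 and 5 fall below the threshold, which leaves exactly the two tuples listed in generatedRecords.txt.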

Now we can execute the test using JUnit and confirm that the data was asserted successfully and that our tests passed!

The PigUnit library is still in development and for the time being it offers fairly few possibilities. That being said, if we consider that Pig Latin is a scripting language that is still growing and is used mainly for batch tasks, having unit tests will greatly contribute to continuous integration and agile development.

Source code for this example is attached here, so you can check it out and have some fun on your local box.

Thanks for reading. If you have any questions, feel free to ask in the comments or contact me at ozren.gulan@codecentric.de.