Implementing Joins in Hadoop Map-Reduce using MapFiles - CodeProject

Introduction

In this article, I will explain how to implement joins during the Map-phase of Hadoop Map-Reduce applications using MapFiles.

Background

In my previous article

http://www.codeproject.com/Articles/869383/Implementing-Join-in-Hadoop-Map-Reduce

I presented the concept of Joining data from different sources in Hadoop, and presented the technique to perform joins during both the Map-phase and Reduce-phase.

This time round, I will discuss an alternate technique of joining during the Map-phase: Joining using MapFiles. Joining during Map-phase is faster, and using MapFiles overcomes the limitations of joining using Cache files during Map-phase of a Map-Reduce application.

Sequence Files and MapFiles

MapFiles are a type of Sequence Files in Hadoop that support random access to data stored in the Sequence File. Before moving further, I will explain Sequence Files.

Sequence Files in Hadoop store records as serialized key-value objects, so they are read and written directly as objects (instead of reading line by line into Text and then populating class instances in the Mapper class). In Map-Reduce jobs, Sequence Files are read using the SequenceFileInputFormat and written using the SequenceFileOutputFormat. Sequence Files are stored in a binary format with sync markers every few records, which makes them splittable. In addition, Sequence Files support record-level compression and block-level compression. With the latter, a sync marker is placed between compressed blocks. Please take a look at the following link for more details on Sequence Files:

https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html

Now coming back to MapFiles.

There are two parts of a MapFile:

1. Data

Data file is the actual sequence file containing the data as key value pairs, and is sorted by key. The fact that the Data file is sorted by key is ensured by the MapFile.Writer which throws IOException when MapFile.Writer.append method is called with an out of order key.

2. Index

Index file is the smaller sequence file that contains some (or all) of the keys of the Data file, in a sorted order, with byte offset of the position of data against each key within the Data file. This Index file is used to look-up the data.

When a lookup is performed, the Index file is searched for the key. If the key is found, the data is read from the Data file at the recorded byte offset. If the exact key is not found, the offset of the closest preceding key (keys are sorted) is used as the starting point in the Data file, which is then traversed sequentially until the key is found and the value stored against it is returned.

The number of keys in the Index file is configurable through the setIndexInterval() method, which sets how many keys of the Data file correspond to one entry in the Index file (the default is 128). Care should be taken, since a very small value can result in a large Index file, which is not ideal for Map-phase joins.
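The lookup and index interval described above can be sketched in plain Java. This is only a toy in-memory model and does not use the Hadoop API: the class name ToyMapFile and its methods are hypothetical, list positions stand in for byte offsets, and integer keys stand in for WritableComparable keys.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of a MapFile; hypothetical, not the Hadoop implementation. */
final class ToyMapFile {
    private final int indexInterval;
    private final List<Integer> keys = new ArrayList<>();           // stands in for the sorted Data file
    private final List<String> values = new ArrayList<>();
    private final List<Integer> indexKeys = new ArrayList<>();      // every indexInterval-th key
    private final List<Integer> indexPositions = new ArrayList<>(); // position of that key in the data

    ToyMapFile(int indexInterval) {
        if (indexInterval < 1) {
            throw new IllegalArgumentException("indexInterval must be >= 1");
        }
        this.indexInterval = indexInterval;
    }

    /** Appends a record and rejects a key that is smaller than the previous key. */
    void append(int key, String value) throws IOException {
        if (!keys.isEmpty() && key < keys.get(keys.size() - 1)) {
            throw new IOException("key out of order: " + key);
        }
        if (keys.size() % indexInterval == 0) {
            indexKeys.add(key);
            indexPositions.add(keys.size());
        }
        keys.add(key);
        values.add(value);
    }

    /** Number of entries in the index; grows as indexInterval shrinks. */
    int indexSize() {
        return indexKeys.size();
    }

    /** Returns the value stored for key, or null if the key is absent. */
    String get(int key) {
        // Binary search the index for the last entry whose key is <= the requested key.
        int lo = 0, hi = indexKeys.size() - 1, slot = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (indexKeys.get(mid) <= key) {
                slot = mid;
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        if (slot < 0) {
            return null; // the key is smaller than every stored key
        }
        // Scan the data sequentially from the indexed position.
        for (int i = indexPositions.get(slot); i < keys.size() && keys.get(i) <= key; i++) {
            if (keys.get(i) == key) {
                return values.get(i);
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        ToyMapFile file = new ToyMapFile(3);
        int[] sortedKeys = {1, 3, 5, 7, 9, 11};
        for (int k : sortedKeys) {
            file.append(k, "value-" + k);
        }
        System.out.println("index entries: " + file.indexSize());
        System.out.println("get(9) = " + file.get(9));
        System.out.println("get(4) = " + file.get(4));
    }
}
```

With an interval of 3, only two of the six keys are indexed, and a lookup for 9 starts its scan at the indexed key 7. Setting the interval to 1 would index every key, trading a larger index for shorter scans.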

To use MapFiles, the key class must implement WritableComparable so that the keys can be sorted.

MapFiles can also be used as input to Map-Reduce programs, using the SequenceFileInputFormat, which ignores the Index file and sequentially reads the Data file.

Please visit the following page to learn more about MapFiles:

https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/MapFile.html

Advantages of Joining during Map-phase

At this stage, it is worth weighing the advantages and disadvantages of joining during the Map-phase instead of the Reduce-phase of Map-Reduce applications. The usual disadvantage of a Map-phase join is that the files to be joined should be small enough to fit in memory. MapFiles relax this limitation: a MapFile.Reader loads only the Index file into memory and reads the Data file on demand, so the memory footprint depends on the IndexInterval value rather than on the size of the data. MapFiles are also splittable and can be compressed.

Joining during the Map-phase is faster since it skips the costly sort and shuffle phases as well as the reduce phase. A job that only joins the data supplied to it therefore runs faster as a Map-only job.

In addition, MapFiles, being a type of Sequence File, are compressible as previously mentioned, which further reduces their size and makes them a suitable alternative to Cache files when the data to be joined is comparatively large. This makes the advantages of Map-phase joins available even with larger data files. Finally, since MapFiles are sorted and indexed, key lookups are very fast, which makes MapFiles well suited to joining data quickly.

Scenario implemented in this example

I shall use the same data files that were used in the previous article i.e. Microsoft's Adventure Works data. The .csv files for creating the Adventure Works Database along with the script file can be downloaded from:

http://msftdbprodsamples.codeplex.com/releases/view/55330

Download the Adventure Works 2012 OLTP Script. In this example, the .csv files SalesOrderDetail.csv and Products.csv are used.

The join is between SalesOrderDetail data and Products data, and with each record in SalesOrderDetail.csv file, the Product name is output. It can be seen as the following SQL code:

SELECT SalesOrderId, OrderQty, Products.Name, LineTotal
FROM SalesOrderDetail JOIN Products ON SalesOrderDetail.ProductId = Products.ProductId

Since the query only looks up a Product name for each SalesOrderDetail record, with no grouping or aggregation, the job in this example can be a Map-only job.
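To illustrate why no reduce step is needed, here is a minimal plain-Java sketch of the same join, with an in-memory Map standing in for the Products data. The class name, the method name and the four-element row layout are assumptions made for this illustration only; they are not part of the project built below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: a row-by-row join against a lookup table. */
final class RowWiseJoinSketch {

    /**
     * Each order row is assumed to be {salesOrderId, orderQty, productId, lineTotal}.
     * Every row is handled independently of all other rows, which is why a
     * Map-only job is enough for this kind of join.
     */
    static List<String> join(List<String[]> orderRows, Map<String, String> productNamesById) {
        List<String> out = new ArrayList<>();
        for (String[] row : orderRows) {
            String name = productNamesById.get(row[2]);
            if (name != null) { // inner join: rows without a matching product are dropped
                out.add(row[0] + "," + row[1] + "," + name + "," + row[3]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> products = new HashMap<>();
        products.put("7", "Widget"); // made-up sample data

        List<String[]> orders = Arrays.asList(
                new String[] {"1", "2", "7", "20.00"},
                new String[] {"2", "1", "9", "5.00"}); // product 9 has no match

        for (String joined : join(orders, products)) {
            System.out.println(joined);
        }
    }
}
```

In the actual job, the Map is replaced by a MapFile lookup, so the Products data does not have to be held in memory in full.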

Initial Project Setup

I will be using Eclipse with Maven to create the Hadoop Map-Reduce project. The process of setting up the Maven plug-in in Eclipse is outside the scope of this article.

  • Open Eclipse, select new Project, and select Maven Project. Click next.
  • Select the check box Create a Simple Project.
  • Name the project. I am using the following:
Group Id: com.example.mapfileexample
Artifact Id: MapFileExample
Name: MapFileExample
  • Add the Hadoop dependencies. In Eclipse, in the project structure (usually on the left), open the pom.xml file, and in the Dependencies tab, click the Add button and enter the following values:
Group Id: org.apache.hadoop
Artifact id: hadoop-client
Version: 2.5.1
  • Create the class named Driver, which is the main class of this application. Paste the following code in the class:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile.Writer.Option;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {

    public int run(String[] allArgs) throws Exception {
        return 0;
    }
    
    public static void main(String[] args) throws Exception
    {
        
        Configuration conf = new Configuration();
        int output = ToolRunner.run(new Driver(), args);
    }
}

Code to create and populate a MapFile

MapFile can be created by using the MapFile.Writer. In this example, I will read the Products.csv file line by line using FileInputStream, and output it to a MapFile with ProductId (first column) as key and Name (second column) as value. Note that this part of creating the MapFile is outside map-reduce, but MapFile can be the output of a MapReduce job.

The function that does so is:

private static void CreateMapFile(Configuration conf, String inputFilePath, String outputFilePath, int keyIndex, int valueIndex) throws IOException {
    Path outputLocation = new Path(outputFilePath);
    
    Option keyClass = (Option)MapFile.Writer.keyClass(IntWritable.class);
    SequenceFile.Writer.Option valueClass = MapFile.Writer.valueClass(Text.class);
    // Index every key; acceptable here because the file is small.
    MapFile.Writer.setIndexInterval(conf, 1);

    // try-with-resources closes the reader and the writer even if a line fails to parse.
    try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(inputFilePath))));
         MapFile.Writer writer = new MapFile.Writer(conf, outputLocation, keyClass, valueClass)) {
        String line;
        br.readLine(); // skip the first line of the input file
        while ((line = br.readLine()) != null) {
            String[] lineItems = line.split("\\t");
            IntWritable key = new IntWritable(Integer.parseInt(lineItems[keyIndex]));
            Text value = new Text(lineItems[valueIndex]);
            // append throws IOException if the keys do not arrive in sorted order
            writer.append(key, value);
        }
    }
}

As can be seen from the code, the function takes the input file path and reads the Products.csv file, in which values are tab separated. We set the IndexInterval to 1 because it is a small file, so we can afford a large number of keys in the Index file.

The MapFile.Writer constructor takes the Configuration instance, location where the file is to be created, and key class and value class wrapped in instances of org.apache.hadoop.io.SequenceFile.Writer.Option. Note that in the statement

Option keyClass = (Option)MapFile.Writer.keyClass(IntWritable.class);

The Option class is org.apache.hadoop.io.MapFile.Writer.Option, which derives from the org.apache.hadoop.io.SequenceFile.Writer.Option class.

Here, keys of the MapFile are instances of IntWritable, and values are instances of Text. Next, BufferedReader is created and the file is read line by line. Each line is split on the Tab (\t) character, and first column is picked as key, and second (name column) is picked as value. Last step is to call MapFile.Writer.append method to append keys and values to the MapFile.
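Because MapFile.Writer.append rejects out-of-order keys, the function above relies on the input file already being sorted by key. If that cannot be assumed, the rows can be sorted before they are appended. The following plain-Java sketch shows one way to do so; the class and method names are hypothetical, and it assumes the whole input fits in memory and that the key column holds integers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: parses tab-separated lines into key order. */
final class SortedRowsSketch {

    /**
     * Returns key -> value pairs ordered by key. A TreeMap keeps its entries
     * sorted, and a later line with the same key replaces an earlier one.
     * Lines that are too short to contain both columns are skipped.
     */
    static TreeMap<Integer, String> parseSorted(List<String> lines, int keyIndex, int valueIndex) {
        TreeMap<Integer, String> rows = new TreeMap<>();
        for (String line : lines) {
            String[] items = line.split("\\t");
            if (items.length <= Math.max(keyIndex, valueIndex)) {
                continue;
            }
            rows.put(Integer.parseInt(items[keyIndex].trim()), items[valueIndex]);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample lines, deliberately out of order.
        List<String> lines = Arrays.asList("3\tChain", "1\tPedal", "2\tSaddle");
        for (Map.Entry<Integer, String> row : parseSorted(lines, 0, 1).entrySet()) {
            // In CreateMapFile, this is where writer.append(key, value) would be called.
            System.out.println(row.getKey() + " -> " + row.getValue());
        }
    }
}
```

Iterating over the returned map visits the keys in ascending order, which is the order the MapFile writer expects.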

The CreateMapFile function can be called from main as:

CreateMapFile(conf, "/home/hduser/Desktop/sales_data/Product.csv",
"/home/hduser/Desktop/sales_data/ProductMapFile/", 0, 1);

Note that this function is not part of the Map phase and it is not used during the execution of the job. This method should be called separately just to create the MapFile, and although I have kept it in the Driver class, keeping it here is not necessary.

The last step before extending the Mapper class is to run the program with the call to the ToolRunner.run method commented out and the above-mentioned call placed in the main method. Remember to replace the source and destination file paths with the ones on your computer.

After running the program, open the destination folder to see the index and data files (the two parts of the MapFile) that were created.

Mapper

In the Mapper, the MapFile created above is opened and used to look up Product names by Product Id. These Product names are then written to the output of the map function. The code of the Mapper class, called MapFileExampleMapper, is:

public static class MapFileExampleMapper extends Mapper<LongWritable, Text, NullWritable, Text>
{
    private MapFile.Reader reader = null;
        
    public void setup(Context context) throws IOException{
        Configuration conf = context.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        Path dir = new Path(context.getCacheFiles()[0]);
        this.reader = new MapFile.Reader(fs, dir.toString(), conf);
    }
        
    private Text findKey(IntWritable key) throws IOException{
        Text value = new Text();
        this.reader.get(key, value);
        return value;
    }
        
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        String[] values = value.toString().split("\\t");
        IntWritable productId = new IntWritable(Integer.parseInt(values[4]));
        Text productName = findKey(productId);
            
        context.write(NullWritable.get(), new Text(values[0] + "," + values[3] +  ", " + productName.toString() + " , " + values[8]));
            
    }
        
    public void cleanup(Context context) throws IOException{
        reader.close();
    }
}

The location of the MapFile, registered as a cache file by the run method, is retrieved in the setup method of the Mapper class. The setup method is called once for each Map task. Here, the MapFile is opened using a MapFile.Reader instance, which provides the get method to retrieve the value for a key.

Next, during each call of the map method of the Mapper class, the Product Id (fifth column of the SalesOrderDetail.csv file, at index 4) is passed to the findKey method, which uses the MapFile.Reader.get method to look up the value for that key. The retrieved Product names become part of the output of the Mapper class.
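The map method above assumes that every line has at least nine columns and that every Product Id is present in the MapFile. A more defensive version of the per-line logic might look like the following plain-Java sketch, in which a Map stands in for the MapFile.Reader. The class name, the method name and the "UNKNOWN" placeholder are assumptions made for illustration, and the output uses plain commas rather than the exact spacing of the Mapper above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the per-line join logic with basic input checks. */
final class JoinLineSketch {

    /**
     * Builds one output line from one tab-separated SalesOrderDetail line.
     * Returns null for lines that cannot be parsed, so the caller can skip them.
     */
    static String joinLine(String line, Map<Integer, String> productNames) {
        String[] values = line.split("\\t");
        if (values.length < 9) {
            return null; // not enough columns to reach index 8
        }
        int productId;
        try {
            productId = Integer.parseInt(values[4].trim());
        } catch (NumberFormatException e) {
            return null; // the Product Id column is not a number
        }
        // Fall back to a placeholder when the Product Id has no match.
        String name = productNames.getOrDefault(productId, "UNKNOWN");
        return values[0] + "," + values[3] + "," + name + "," + values[8];
    }

    public static void main(String[] args) {
        Map<Integer, String> names = new HashMap<>();
        names.put(7, "Widget"); // made-up sample data

        String line = "1001\t1\tTRACK\t2\t7\t1\t10.00\t0.00\t20.00";
        System.out.println(joinLine(line, names));
    }
}
```

In the real Mapper, a null result would simply mean that context.write is not called for that line.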

run Method

Next step is the run method, which sets the number of reducers equal to zero since it is a map-only job. Also don't forget to remove the call to CreateMapFile method in main if it was previously run from there.

Code for run method is:

public int run(String[] allArgs) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJarByClass(Driver.class);
        
    job.setMapperClass(MapFileExampleMapper.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Text.class);
    
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
        
    String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    URI[] cacheFileURI = { new URI(args[1]) };
    job.setCacheFiles(cacheFileURI);
        
    job.setNumReduceTasks(0);
        
    boolean result = job.waitForCompletion(true);
        
    // Exit code convention: 0 for success, non-zero for failure.
    return result ? 0 : 1;
}

It registers the path of the MapFile, passed as the second argument, as a cache file, which is then used in the Mapper class. Finally, it's time to compile and run the job.

Running the job in Local Mode

To run the job in local mode from within Eclipse:

  • Build the project, then right click the project in Package Explorer, select Run as → Run Configurations.
  • Right click Java Run Configurations, and select New.
  • Name the new configuration. I call it MapFileExample Java Run config.
  • In the Arguments tab, enter in the Program arguments field the path to the SalesOrderDetail.csv file, the path to the MapFile folder containing the index and data files, and the output folder. On my computer, it looks like:
/home/hduser/Desktop/sales_data/SalesOrderDetail.csv /home/hduser/Desktop/sales_data/ProductMapFile /home/hduser/Desktop/sales_data/mapfileOutput1

  • Click Apply and then Run.
  • After execution is complete, inspect the output.

Running the job on the Cluster

First, create a folder in the Hadoop file system for items related to this example. I will call the folder mapfileexample:

hadoop fs -mkdir /hduser/mapfileexample/

Next, copy the SalesOrderDetail.csv to it using copyFromLocal command:

hadoop fs -copyFromLocal /home/hduser/Desktop/sales_data/SalesOrderDetail.csv /hduser/mapfileexample/

Next, copy the MapFile:

hadoop fs -mkdir /hduser/mapfileexample/ProductMapFile
hadoop fs -copyFromLocal /home/hduser/Desktop/sales_data/ProductMapFile/* /hduser/mapfileexample/ProductMapFile/

And then, run the job:

hadoop jar /home/hduser/Desktop/sales_data/MapFileExample-0.0.1-SNAPSHOT.jar Driver /hduser/mapfileexample/SalesOrderDetail.csv /hduser/mapfileexample/ProductMapFile /hduser/mapfileexample/output1

Finally, copy the output to local file system, and examine the output. Run the following:

hadoop fs -copyToLocal /hduser/mapfileexample/output1/* /home/hduser/Desktop/sales_data/output/fromCluster/

Open the part-m-00000 file, and see that the name of each Product is present in each row of the resultant output.

Final Code

This is what the code (Driver.java file) looks like in the end:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile.Writer.Option;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {
    
    public static class MapFileExampleMapper extends Mapper<LongWritable, Text, NullWritable, Text>
    {
        private MapFile.Reader reader = null;
        
        public void setup(Context context) throws IOException{
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            Path dir = new Path(context.getCacheFiles()[0]);
            this.reader = new MapFile.Reader(fs, dir.toString(), conf);
        }
        
        private Text findKey(IntWritable key) throws IOException{
            Text value = new Text();
            this.reader.get(key, value);
            return value;
        }
        
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String[] values = value.toString().split("\\t");
            IntWritable productId = new IntWritable(Integer.parseInt(values[4]));
            Text productName = findKey(productId);
            
            context.write(NullWritable.get(), new Text(values[0] + "," + values[3] +  ", " + productName.toString() + " , " + values[8]));
            
        }
        
        public void cleanup(Context context) throws IOException{
            reader.close();
        }
    }

    public int run(String[] allArgs) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(Driver.class);
        
        job.setMapperClass(MapFileExampleMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        URI[] cacheFileURI = { new URI(args[1]) };
        job.setCacheFiles(cacheFileURI);
        
        job.setNumReduceTasks(0);
        
        boolean result = job.waitForCompletion(true);
        
        // Exit code convention: 0 for success, non-zero for failure.
        return result ? 0 : 1;
    }
    
    public static void main(String[] args) throws Exception
    {
        
        Configuration conf = new Configuration();
        int output = ToolRunner.run(new Driver(), args);
        
        /*CreateMapFile(conf, "/home/hduser/Desktop/sales_data/Product.csv",
                "/home/hduser/Desktop/sales_data/ProductMapFile/", 0, 1);*/
    }
    
    private static void CreateMapFile(Configuration conf, String inputFilePath, String outputFilePath,
            int keyIndex, int valueIndex) throws IOException
    {
        Path outputLocation = new Path(outputFilePath);
        
        Option keyClass = (Option)MapFile.Writer.keyClass(IntWritable.class);
        SequenceFile.Writer.Option valueClass = MapFile.Writer.valueClass(Text.class);
        // Index every key; acceptable here because the file is small.
        MapFile.Writer.setIndexInterval(conf, 1);

        // try-with-resources closes the reader and the writer even if a line fails to parse.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(inputFilePath))));
             MapFile.Writer writer = new MapFile.Writer(conf, outputLocation, keyClass, valueClass)) {
            String line;
            br.readLine(); // skip the first line of the input file
            while ((line = br.readLine()) != null) {
                String[] lineItems = line.split("\\t");
                IntWritable key = new IntWritable(Integer.parseInt(lineItems[keyIndex]));
                Text value = new Text(lineItems[valueIndex]);
                // append throws IOException if the keys do not arrive in sorted order
                writer.append(key, value);
            }
        }
    }
}

Conclusion

This article explained how to perform Map-phase joins using MapFiles. It is often useful to perform joins during the Map-phase instead of the Reduce-phase to make jobs faster. Joining using MapFiles has the advantage that MapFiles are splittable, can be compressed, and have a small Index file. All these factors make MapFiles suitable for joining larger amounts of data during the Map-phase.

This article is the continuation of my previous article on performing joins in Hadoop Map-Reduce jobs, and as an exercise, this technique can be applied to the example of the previous article.

Feedback, suggestions, corrections are welcome.