Due to popular request, I’ve updated my simple framework for processing ZIP files in Hadoop Map/Reduce jobs. Previously the only easy solution was to unzip files locally and then upload them to the Hadoop Distributed File System (HDFS) for processing. This adds a lot of unnecessary complexity when you are dealing with thousands of ZIP files; Java already has a ZipInputStream – it should be a lot easier.
When you create a Map/Reduce job in Java, you set the InputFormat & OutputFormat you wish to use. Most people have heard of and used the basic ones like TextInputFormat and SequenceFileInputFormat – but it’s fairly trivial to extend FileInputFormat to create new ones.
There are two parts to the puzzle:
- InputFormat which opens the data source and splits the data into chunks,
- RecordReader which actually parses the chunks into Key/Value pairs.
Once you’ve implemented these two pieces, the rest of the Map/Reduce framework can use your new-fangled InputFormat as normal. This is also true for OutputFormat‘s exception you need to implement a RecordWriter instead of a RecordReader.
Introducing ZipFileInputFormat & ZipFileRecordReader
As you might have guessed, ZipFileInputFormat & ZipFileRecordReader do exactly what their names suggest. ZipFileInputFormat is actually just a very thin wrapper around FileInputFormat to prevent it from splitting files – ZIP files aren’t a split-able file format. The real meat to the code is in ZipFileRecordReader where it uses the Java built in ZipInputStream to parse out each ZipEntry.
Each file inside a ZIP archive is a separate ZipEntry, among other attributes you can extract the original file name and decompress the contents. Despite most ZIP software presenting hierarchical directory structures, the namespace is effectively ‘flat’ with ZipEntry‘s i.e. you get “subdir2/subdir2subdir/Ulysses-22.txt” as a filename instead of a complex directory structure.
Since filename and content neatly fit the Key/Value format Map/Reduce is accustomed to, it is a no-brainer to use these as the for the ZipFileInputFormat & ZipFileRecordReader. Passing the filename to the mapper allows you to easily filter out the files you desire from ZIP archives while ignoring the others.
Zen and the Art of Tolerating Failure
Anyone that’s worked ZIP files before has probably encountered corrupt files at some point. Having worked on a Map/Reduce use-case that trawled through tens of thousands of ZIP files (of varying quality) – I have already encountered Jobs that fail because of tiny file corruptions.
ZipFileInputFormat has a handy method “setLenient( boolean lenient )”, this defaults to false meaning any errors processing a ZIP file will be fatal to the overall Job. However if you are dealing with ZIP files of varying quality you can “setLenient( true )” which means ZIP parsing problems will be quietly ignored.
Note: with the “setLenient( true )” ZIP files may be partially processed. Take the example of a truncated ZIP file, the contents of the ZIP archive up to the point of the file truncation will be passed to Mappers to be processed. Upon encountering the file corruption the Mapper will be informed that the file is “finished / completed”, and move on to the Reducer phase.
A word of caution…
One of the consequences of working with ZIP files is that they are not split-able. If your use-case is a single ZIP file with GB’s of content inside it – you might be better off extracting the files into HDFS and processing them as normal. This ZipFileInputFormat solution is ideally suited to use-cases which need to process the contents of hundreds/thousands of ZIP files. Because each ZIP file is processed entirely by one Mapper slot it scales nicely.
That’s great, so how do I actually use it?
I’m glad you asked, in my initial post last year I kinda neglected to demonstrate exactly how to use it. To make amends I updated the source code and added 7 test cases to provide examples of its usage.
ZipFileTest showcases ZipFileInputFormat for common use-cases, because it is based on FileInputFormat you can pass it individual files, directories (which are recursed) or simple wildcard patterns to match multiple files.
This example demonstrates the simplest of use-cases:
|// Standard stuff|
|Job job = new Job(conf);|
|// Hello there ZipFileInputFormat!|
|// The output files will contain "Word [TAB] Count"|
|// We want to be fault-tolerant|
|ZipFileInputFormat.setLenient( true );|
|ZipFileInputFormat.setInputPaths(job, new Path("/data/archives/*.zip"));|
|TextOutputFormat.setOutputPath(job, new Path("/tmp/zip_wordcount"));|
Once you’ve prepared the Job, you need to make sense of the data in your Mapper, the example below is based on the infamous Wordcount Map/Reduce example:
|* This Mapper class checks the filename ends with the .txt extension, cleans|
|* the text and then applies the simple WordCount algorithm.|
|public static class MyMapper|
|extends Mapper<Text, BytesWritable, Text, IntWritable>|
|private final static IntWritable one = new IntWritable( 1 );|
|private Text word = new Text();|
|public void map( Text key, BytesWritable value, Context context )|
|throws IOException, InterruptedException|
|// NOTE: the filename is the *full* path within the ZIP file|
|// e.g. "subdir1/subsubdir2/Ulysses-18.txt"|
|String filename = key.toString();|
|LOG.info( "map: " + filename );|
|// We only want to process .txt files|
|if ( filename.endsWith(".txt") == false )|
|// Prepare the content|
|String content = new String( value.getBytes(), "UTF-8" );|
|content = content.replaceAll( "[^A-Za-z \n]", "" ).toLowerCase();|
|// Tokenize the content|
|StringTokenizer tokenizer = new StringTokenizer( content );|
|while ( tokenizer.hasMoreTokens() )|
|word.set( tokenizer.nextToken() );|
|context.write( word, one );|
The most important thing to note is the Key/Value arguments to the Mapper.
- Text key – contains the filename
- BytesWritable value – contains the uncompressed file contents
If you have any more questions, feel free to comment / contact me. When I get around to it I might post this patch for inclusion in Hadoop, but until then please grab the source and play. Always check the GitHub project page for the updated source.