Mapred vs MapReduce – The API question of Hadoop and impact on the Ecosystem

In this blog post I describe the difference between the mapred.* and mapreduce.* APIs in Hadoop with respect to custom InputFormats and OutputFormats. I then discuss the impact that having both APIs has on the Hadoop ecosystem and related Big Data platforms, such as Apache Flink, Apache Hive and Apache Spark. Finally, I conclude with an outlook on future needs for innovation.

These APIs are of key importance for nearly all Big Data applications built on popular Big Data platforms, which all use them to connect to file-based and non-file-based data sources and sinks. They offer transparent access to any custom or standard file format on any storage, on-premise (e.g. HDFS) or in the cloud (e.g. Amazon S3 or Microsoft Azure Blob Storage). Furthermore, they can transparently decompress files using any given codec without the file format itself having to support that codec. This allows new compression formats to be introduced without changing the underlying file format. (This is not useful in all situations, e.g. in the case of unsplittable compression formats, which is why ORC and Parquet use a different approach for parallel processing of compressed data.)
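The separation of codec and file format can be illustrated with a minimal sketch that uses only the JDK, not Hadoop itself. The reader below wraps the raw stream in a decompressor chosen from the file suffix, so the line-parsing code never knows whether the data was compressed. Hadoop performs a comparable suffix-based lookup through its CompressionCodecFactory; the names CodecSketch, openPossiblyCompressed, readLines and gzip are hypothetical and exist only for this illustration.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class CodecSketch {

    // Hypothetical helper: choose a decompressor from the file suffix only.
    static InputStream openPossiblyCompressed(String fileName, InputStream raw) throws IOException {
        if (fileName.endsWith(".gz")) {
            return new GZIPInputStream(raw);
        }
        return raw; // unknown suffix: treat the data as uncompressed
    }

    // The "file format" part: reads text lines and is unaware of any compression.
    static List<String> readLines(String fileName, byte[] content) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                openPossiblyCompressed(fileName, new ByteArrayInputStream(content)),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Demo helper: gzip a string in memory so the example needs no files.
    static byte[] gzip(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buffer)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        String text = "a,1\nb,2\n";
        // Both calls go through the same line reader; only the suffix differs.
        System.out.println(readLines("data.csv", text.getBytes(StandardCharsets.UTF_8)));
        System.out.println(readLines("data.csv.gz", gzip(text)));
    }
}
```

Adding another codec in this sketch would only touch openPossiblyCompressed, which mirrors why a new compression format does not require a change to the file format.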

Some History

From the perspective of a custom input/output format, both APIs provide essentially the same functionality and performance, but their interfaces differ. The two APIs were introduced mainly to provide different ways of expressing MapReduce jobs, but this is not the focus here. Historically, the idea was to move from the older mapred.* API to the newer mapreduce.* API. In fact, mapred.* had already been marked as deprecated with the aim of removing it. However, something very rare in the software world then happened: the mapred.* API was un-deprecated. Officially, the reason was that the new mapreduce.* API was not yet finished, which led to confusion and deprecation warnings. Interestingly, there was already a plan to deprecate the mapred.* API again, but it has never been carried out (status: 10/2017).

Effect on Big Data platforms

As mentioned above, several open source and commercial Big Data platforms rely on these APIs, because they want out-of-the-box support for various filesystems and file formats without reinventing the wheel. This is how the coexistence of the two APIs affects them.

Hadoop MapReduce

Naturally, since both the mapred.* and the mapreduce.* API stem from Hadoop MapReduce, both are supported. However, the documentation supplied with Hadoop contains only examples using the mapreduce.* API. Popular books, such as “Hadoop – The Definitive Guide” by Tom White, also use only the mapreduce.* API.

Hive

Hive is a very mature product and was open sourced by Facebook soon after Hadoop appeared, so it uses the mapred.* API. This means that any custom input/output format to be used in Hive (more about this later) must implement the mapred.* API. If the input/output format supports only the new mapreduce.* API, Hive throws an error.

Flink

Apache Flink supports custom Hadoop Input and Output formats for the mapred.* and mapreduce.* API.

Additionally, you can implement custom Flink input and output formats. I found the API very concise, and if you have already developed a reusable input/output format that supports the two Hadoop APIs, it is rather trivial to create a custom Flink FileInputFormat and FileOutputFormat on top of it.

Spark

Apache Spark supports custom Hadoop input and output formats for both the mapred.* and the mapreduce.* API. For a FileInputFormat you can use the hadoopRDD (mapred.*) or newAPIHadoopRDD (mapreduce.*) method of the SparkContext class. For a FileOutputFormat you can use the saveAsHadoopFile (mapred.*) or saveAsNewAPIHadoopFile (mapreduce.*) method of the pair RDD classes (PairRDDFunctions in Scala, JavaPairRDD in Java).

These methods load and save data only from and to RDDs in Spark. To use the modern DataFrame (aka Dataset<Row>) API with all its features, such as its compact and highly efficient in-memory representation, you need to convert the RDDs explicitly, which takes time. This is, for example, not the case for Flink, which always uses its highly efficient in-memory representation.

Apache Spark has a DataSource API, which can be seen as the Spark counterpart of a custom InputFormat. However, it is not as well designed as the one in Apache Flink. For instance, most data loading/storing operations happen in the driver (the node running the program that coordinates the execution of distributed tasks), which makes the driver a bottleneck. Certain data sources avoid this by calling the hadoopRDD/newAPIHadoopRDD methods of Spark within the data source, which distributes loading to the executors instead of the driver. This is, for instance, how the HadoopCryptoLedger library for reading blockchains, such as Bitcoin, does it.

Hence, at some point the Apache Spark project decided to add certain data sources to the Apache Spark source code itself. These do not use the public data source API but a dedicated internal data source API that is bound to each Spark release. As a result, these formats do not need the hadoopRDD/newAPIHadoopRDD methods and can be more native. Nevertheless, as mentioned before, the Hadoop APIs are still used to read from and write to the various filesystems and compressed files.

Other external data sources, such as the one for the Avro format or the HadoopOffice library for reading/writing office documents, such as Excel, then also used these internal APIs. This had the disadvantage that they might not work immediately with a new Spark version, because the internal APIs could change arbitrarily.

In the end the Spark developers recognized that this is not an optimal situation and started thinking about a data source V2 API.

Impala

Impala supports only a fixed set of input/output formats and no custom input or output formats. These fixed input/output formats are based on the Hadoop APIs for the aforementioned reasons. Nevertheless, you can use any custom input/output format by defining the tables in Hive and letting Impala read/write via Hive.

Pig

Pig was an early adopter of the new mapreduce.* API and no longer supports the mapred.* API for custom input/output formats. You can run Pig on different engines, such as Tez or Spark.

Beam

Apache Beam is a programming framework in which you express a computation once and can then execute it on different Big Data engines, such as Apache Apex, Apache Flink, Apache Spark, Google Cloud Dataflow and Apache Gearpump.

This makes it easier to switch between platforms depending on your needs, at the cost that you may not always be able to use the latest features of a specific engine.

Apache Beam supports reading of Hadoop InputFormats using its HadoopInputFormatIO interface. Apache Beam supports writing of Hadoop OutputFormats using its HDFSFileSink interface. Both support only the new mapreduce.* API.

Effect on file formats

Provided as part of Hadoop

Hadoop provides file formats as part of its distribution. However, the formats provided for the mapreduce.* API and for the mapred.* API (input/output) differ, as shown in the table.

mapreduce.* API                             | mapred.* API
KeyValueTextInputFormat                     | KeyValueTextInputFormat
MapFileOutputFormat                         | MapFileOutputFormat
MultipleInputs (covers more than mapred.*)  | MultiFileInputFormat
SequenceFileAsBinaryInputFormat             | SequenceFileAsBinaryInputFormat
SequenceFileAsBinaryOutputFormat            | SequenceFileAsBinaryOutputFormat
SequenceFileAsTextInputFormat               | SequenceFileAsTextInputFormat
SequenceFileInputFormat                     | SequenceFileInputFormat
SequenceFileOutputFormat                    | SequenceFileOutputFormat
TextInputFormat                             | TextInputFormat
TextOutputFormat                            | TextOutputFormat
CombineFileInputFormat                      | CombineFileInputFormat
CombineSequenceFileInputFormat              | CombineSequenceFileInputFormat
CombineTextInputFormat                      | CombineTextInputFormat
DelegatingInputFormat                       | DelegatingInputFormat
FilterOutputFormat                          | FilterOutputFormat
LazyOutputFormat                            | LazyOutputFormat
MultipleOutputs (covers more than mapred.*) | MultipleOutputFormat
MultipleOutputs (covers more than mapred.*) | MultipleSequenceFileOutputFormat
MultipleOutputs (covers more than mapred.*) | MultipleTextOutputFormat
NLineInputFormat                            | NLineInputFormat
NullOutputFormat                            | NullOutputFormat
DBInputFormat                               | DBInputFormat
DBOutputFormat                              | DBOutputFormat
ComposableInputFormat                       | ComposableInputFormat
CompositeInputFormat                        | CompositeInputFormat
FixedLengthInputFormat                      | FixedLengthInputFormat

Both APIs are comparable, but some differences are already visible. For example, MultipleInputs and MultipleOutputs of the mapreduce.* API are much more flexible than their mapred.* counterparts: you can use different input or output formats at the same time, you can use MultipleOutputs in both map and reduce functions, and you can have different key and value types.

Reading and writing JSON files is already implicitly covered by these formats. Usually one uses the TextInputFormat and parses each line as a JSON object, or one uses the NLineInputFormat, which hands each mapper a fixed number of lines, and combines several lines into one JSON object. Similarly, you can write JSON as output.

Hint: Many developers are not aware of all the input and output formats that Hadoop offers out of the box, but switching from one of the better-known formats to a lesser-known one can drastically improve performance.

Avro

Apache Avro is a compact exchange format between Big Data platforms. It is much more efficient than XML (in terms of space and computational needs) and should be used for exchanging large data volumes of structured data.

Avro supports both APIs and is supported by virtually all Big Data platforms.

ORC

Apache ORC is an extremely efficient data query format. It supports predicate pushdown, so that data that is not relevant for a query can be skipped automatically without reading it. This reduces query time significantly. More details can be found here.

It is supported by virtually all Big Data platforms.

ORC supports both APIs.
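The predicate pushdown described for ORC can be illustrated with a small sketch. It is not ORC's actual implementation or API; it only shows the principle under the assumption that each block of rows carries min/max statistics for a column, so that a reader can rule out whole blocks before touching their rows. All names (PushdownSketch, Block, ScanResult, scanGreaterThan) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PushdownSketch {

    // Hypothetical block of rows that stores min/max statistics for one column.
    static final class Block {
        final long min;
        final long max;
        final long[] values;

        Block(long... values) {
            this.values = values;
            this.min = Arrays.stream(values).min().orElse(Long.MAX_VALUE);
            this.max = Arrays.stream(values).max().orElse(Long.MIN_VALUE);
        }
    }

    // Matching values plus the number of blocks whose rows were actually read.
    static final class ScanResult {
        final List<Long> matches = new ArrayList<>();
        int blocksRead = 0;
    }

    // Evaluates "value > threshold", consulting the statistics before reading rows.
    static ScanResult scanGreaterThan(List<Block> blocks, long threshold) {
        ScanResult result = new ScanResult();
        for (Block block : blocks) {
            if (block.max <= threshold) {
                continue; // statistics prove that no row can match: skip the block
            }
            result.blocksRead++;
            for (long v : block.values) {
                if (v > threshold) {
                    result.matches.add(v);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Block> blocks = Arrays.asList(
                new Block(1, 5, 3), new Block(10, 20, 15), new Block(7, 8, 30));
        ScanResult result = scanGreaterThan(blocks, 9);
        System.out.println(result.matches + " found by reading "
                + result.blocksRead + " of " + blocks.size() + " blocks");
    }
}
```

In this example the first block is never read, because its maximum of 5 cannot satisfy the predicate. The saving grows with the share of blocks that the statistics can exclude.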

Parquet

Apache Parquet is another efficient data query format. It also supports predicate pushdown.

It is supported by virtually all Big Data platforms.

Apache Parquet supports both APIs, but the classes for the mapred.* API carry the prefix Deprecated… in their names. However, they are not actually marked as deprecated, and thus both APIs are supported.

HBase

Apache HBase is a columnar key/value store suited to frequently reading small data volumes out of a large dataset or frequently writing small data volumes into a large dataset.

HBase can be read using the TableInputFormat and written using the TableOutputFormat.

HBase supports both APIs.

HadoopCryptoLedger

HadoopCryptoLedger supports reading cryptoledgers, such as the Bitcoin blockchain, using Big Data platforms. It supports many Big Data platforms.

HadoopCryptoLedger supports both APIs due to the need to cater for many Big Data platforms, but also commercial internal developments using different APIs.

HadoopOffice

HadoopOffice supports reading/writing of office formats, such as MS Excel, on Big Data platforms. It also supports many Big Data platforms.

HadoopOffice supports both APIs due to the need to cater for many Big Data platforms, but also commercial internal developments using different APIs.

Kudu

Apache Kudu is a format that supports both querying large data volumes and writing small data volumes into large datasets. Its write performance is lower than that of HBase and its query performance is lower than that of ORC or Parquet. However, its advantage is that one format covers both use cases. On the other hand, it adds considerable operational complexity, because several additional non-standard daemons must run on the nodes of a Big Data cluster to manage it.

Apache Kudu only supports the mapreduce.* API.

XMLInput

Apache Mahout (a machine learning library for Hadoop) supports reading XML files using its input format.
It only supports the mapreduce.* API.

Conclusion

For historical reasons, Hadoop has two APIs for describing input and output formats (as well as MapReduce jobs). These formats are leveraged by all Big Data platforms for the aforementioned benefits. However, there are in fact not that many input/output formats. Hence, developers will need to provide their own to process data even more efficiently or simply to process legacy formats.

Basically the advice here is as follows.

  • Developers that leverage only existing input/output formats should use the mapreduce.* API wherever possible.
  • Developers that create their own custom input/output format should write generic classes that are, as far as possible, independent of the API. They should then first provide an implementation for the mapreduce.* API and afterwards one for the mapred.* API. These are the reasons:
    • Generic classes are highly beneficial for writing input/output formats specific to a Big Data platform. For example, Flink or Spark can use Hadoop formats, but it can be beneficial to provide them with a “native” format. This has proven highly beneficial for me when writing the HadoopCryptoLedger and HadoopOffice libraries.
    • You will support all important platforms. Although Hive seems to be the only one requiring the mapred.* API, it is a very important one. It is the de facto standard for connecting analytical tools via a SQL API. It has been available for a long time and is used in many organizations today. Additionally, there is a lot of internal (non-Hive-related) code in organizations that may still use the mapred.* API (however, there are no statistics about this).
    • If you have written unit tests for one of the APIs (which is mandatory), it is easy to derive the same unit test cases for the other API.
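The advice to keep the format logic in generic classes and to add one thin layer per API can be sketched as follows. To stay runnable without Hadoop on the classpath, the two interfaces are simplified stand-ins that only mirror the record-reading methods of the RecordReader types in mapred.* (next with caller-supplied key and value) and mapreduce.* (nextKeyValue, getCurrentKey, getCurrentValue). MutableLong and MutableText play the role of Hadoop's LongWritable and Text. All class names in this sketch are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class TwoApiSketch {

    // Simplified stand-in for the shape of the mapred.* RecordReader.
    interface OldApiReader<K, V> {
        K createKey();
        V createValue();
        boolean next(K key, V value); // fills objects supplied by the caller
    }

    // Simplified stand-in for the shape of the mapreduce.* RecordReader.
    interface NewApiReader<K, V> {
        boolean nextKeyValue();       // advances; the reader owns key and value
        K getCurrentKey();
        V getCurrentValue();
    }

    static final class MutableLong { long value; }
    static final class MutableText { String value = ""; }

    // API-independent core: all format-specific logic lives here.
    static final class RecordParser {
        private final Iterator<String> lines;
        private long recordNumber = -1;
        private String current;

        RecordParser(List<String> lines) { this.lines = lines.iterator(); }

        boolean advance() {
            while (lines.hasNext()) {
                String line = lines.next().trim();
                if (!line.isEmpty()) { // format rule: blank lines are not records
                    recordNumber++;
                    current = line;
                    return true;
                }
            }
            return false;
        }

        long recordNumber() { return recordNumber; }
        String record() { return current; }
    }

    // Thin adapter exposing the core through the mapred.*-style interface.
    static final class OldApiAdapter implements OldApiReader<MutableLong, MutableText> {
        private final RecordParser parser;
        OldApiAdapter(RecordParser parser) { this.parser = parser; }
        public MutableLong createKey() { return new MutableLong(); }
        public MutableText createValue() { return new MutableText(); }
        public boolean next(MutableLong key, MutableText value) {
            if (!parser.advance()) return false;
            key.value = parser.recordNumber();
            value.value = parser.record();
            return true;
        }
    }

    // Thin adapter exposing the same core through the mapreduce.*-style interface.
    static final class NewApiAdapter implements NewApiReader<MutableLong, MutableText> {
        private final RecordParser parser;
        private final MutableLong key = new MutableLong();
        private final MutableText value = new MutableText();
        NewApiAdapter(RecordParser parser) { this.parser = parser; }
        public boolean nextKeyValue() {
            if (!parser.advance()) return false;
            key.value = parser.recordNumber();
            value.value = parser.record();
            return true;
        }
        public MutableLong getCurrentKey() { return key; }
        public MutableText getCurrentValue() { return value; }
    }

    static List<String> readWithOldApi(List<String> input) {
        OldApiAdapter reader = new OldApiAdapter(new RecordParser(input));
        MutableLong key = reader.createKey();
        MutableText value = reader.createValue();
        List<String> out = new ArrayList<>();
        while (reader.next(key, value)) out.add(key.value + "=" + value.value);
        return out;
    }

    static List<String> readWithNewApi(List<String> input) {
        NewApiAdapter reader = new NewApiAdapter(new RecordParser(input));
        List<String> out = new ArrayList<>();
        while (reader.nextKeyValue()) {
            out.add(reader.getCurrentKey().value + "=" + reader.getCurrentValue().value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("alpha", "", "  beta ", "gamma");
        System.out.println(readWithOldApi(input));
        System.out.println(readWithNewApi(input));
    }
}
```

Because both adapters delegate to the same RecordParser, a unit test written against one of them can be repeated almost unchanged against the other, which is the effect described in the last bullet above.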

Nevertheless, there is still a lot of potential for innovation in the area of input/output formats:

  • For high performance processing it would be beneficial to support complex predicate pushdown operations including statistics.

  • Another improvement for performance would be that the format on disk is the same as in-memory to avoid costly serializations/deserializations. This is currently the case for none of the Big Data platforms.

  • Input/output formats for legacy formats are rare and many are missing. This means the data needs to be converted locally without the support of a Big Data platform, which introduces additional costs and complexity. Here, artificial intelligence may be used to extract meaningful data out of unknown or poorly specified formats.
  • Proof of validation. A lot of data requires more or less complex validations (e.g. validating that a date field is valid or that references to other data are correct). Usually these validations are done in the source system and then again in the destination system. A proof of validation would be a cryptographically secured proof that the source system has done the validations, so that the destination system does not have to redo them.

  • The complex problem of serving OLTP and OLAP workloads from the same format has not been solved.

  • Formats supporting data structures other than tables, such as graphs, hash maps etc., are rare, but could drastically improve performance in certain analytic scenarios.

  • Input/output formats supporting encryption at rest and while processing do not exist. For instance, MIT CryptDB allows queries over encrypted data without decrypting it. A similar approach for input/output formats could significantly improve certain security aspects of data.

  • Leveraging of new CPU security features, such as Intel SGX, to isolate processing of formats in-memory from other processes in-memory.
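The proof-of-validation idea from the list above could look roughly like the following sketch. It rests on assumptions that the text does not make: the source and destination systems share a secret key, and a keyed hash (HMAC-SHA256 from the JDK) stands in for the cryptographic proof. A digital signature would be the natural choice if the two systems should not share a secret. The class ValidationProofSketch and its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class ValidationProofSketch {

    // The validation the source system performs (here: the record is a valid ISO date).
    static boolean isValid(String record) {
        try {
            LocalDate.parse(record);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // Keyed hash over the record, bound to the statement "validated".
    static byte[] hmac(byte[] key, String record) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(("validated:" + record).getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Source side: issues a proof only for records that passed validation.
    static byte[] prove(byte[] key, String record) {
        if (!isValid(record)) {
            throw new IllegalArgumentException("record failed validation: " + record);
        }
        return hmac(key, record);
    }

    // Destination side: checks the proof instead of re-running the validation.
    static boolean verify(byte[] key, String record, byte[] proof) {
        return MessageDigest.isEqual(hmac(key, record), proof);
    }

    public static void main(String[] args) {
        byte[] key = "shared-secret".getBytes(StandardCharsets.UTF_8);
        byte[] proof = prove(key, "2017-10-31");
        System.out.println(verify(key, "2017-10-31", proof)); // same record
        System.out.println(verify(key, "2017-11-01", proof)); // modified record
    }
}
```

The destination only recomputes a hash per record, which pays off when the original validation is expensive, for example when it has to resolve references to other data.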

Templates, low footprint mode, improved integration with Spark for the HadoopOffice library for reading/writing Excel files on Big data platforms

Although it may look like only a small improvement, version 1.0.4 of the HadoopOffice library has a lot of new features for reading/writing Excel files:

  • Templates, so you can define complex documents with diagrams or other features in MS Excel and fill them with data or formulas from your Big Data platform in Hadoop, Spark & Co
  • Low footprint mode – this mode leverages the Apache POI event and streaming APIs. It significantly reduces CPU and memory consumption at the expense of certain features (e.g. evaluation of formulas, which is only supported in standard mode). This mode supports reading old MS Excel (.xls) and new MS Excel (.xlsx) documents and writing new MS Excel (.xlsx) documents
  • New features in the Spark 2 datasource:
    • Inferring the DataFrame schema, consisting of simple Spark SQL DataTypes (Boolean, Date, Byte, Short, Integer, Long, Decimal, String), from the data in the Excel file
    • Improved writing of a DataFrame based on a schema with simple Spark SQL DataTypes
    • Interpreting the first row of an Excel file as column names for the DataFrame for reading (“header”)
    • Writing column names of a DataFrame as the first row of an Excel file (“header”)
    • Support for Spark 2.0.1, 2.1, 2.2

Of course, the other features remain available, such as metadata reading/writing, encryption/decryption, linked workbooks, support for Hadoop MapReduce, support for Spark 2 datasources and support for Spark 1.

What is next?

  • Support for Apache Flink for reading/writing Excel files
  • Support for Apache Hive (Hive SerDe) for reading/writing Excel files
  • Support for digitally signing/verifying signature(s) of Excel files
  • Support for reading MS Access files
  • … many more

Reading/Writing Excel documents with the HadoopOffice library on Hadoop and Spark – First release

Reading/writing office documents, such as Excel, has always been challenging on Big Data platforms. Although many libraries exist for reading/writing office documents, they have never been properly integrated into Hadoop or Spark, which leads to a lot of development effort.

There are several use cases for using office documents jointly with Big data technologies:

  • Enabling the full customer-centric data science lifecycle: Within your Big Data platform you crunch numbers for complex models. However, you have to make them accessible to your customers. Let us assume you work in the insurance industry. Your Big Data platform calculates various models focused on your customer for insurance products. Your sales staff receives the models in Excel format. They can now explore different parameters together with the customers, e.g. retirement age, individual risks etc. They may also come up with a different proposal that is more suitable for your customer, and you want to feed it back into your Big Data platform to see if it is feasible.
  • You still have a lot of data in Excel files related to your computation, be it code lists, manually collected data, or data from existing systems that simply only support this format.

Hence, the HadoopOffice library was created and the first version has just been released!

It features:

Of course, further releases are planned:

  • Support for signing and verification of signature of Excel documents
  • Going beyond Excel with further office formats, such as ODF Calc
  • A Hive Serde for querying and writing Excel documents directly in Hive
  • Further examples including one for Apache Flink

Sneak Preview – HadoopOffice: Processing Office documents using the Hadoop Ecosystem – The example of Excel files

In this blog post I present a sneak preview of the HadoopOffice library, which will enable you to process Office files, such as MS Excel, using the Hadoop ecosystem, including Hive and Spark.
It currently contains only an ExcelInputFormat, which is based on Apache POI.

Additionally, it contains an example that demonstrates how an Excel input file on HDFS can be converted into a simple CSV file on HDFS.

Finally, you may want to look at this wiki page that explains how you can improve the performance for processing a lot of small files, such as Office documents, on Hadoop.

Of course this is only the beginning. The following things are planned for the near future:

  • Support of other office formats as input: ODF Spreadsheets, ODF Database, MS Access, dBase, MS Word…
  • Support of other office formats as output
  • A HiveSerde to query office documents in Hive using SQL
  • An official release on Maven Central
  • An example for Apache Spark