Ethereum & Analytics: Explore the blockchain using Hadoop, Hive, Flink and Spark

HadoopCryptoLedger release 1.1.0 added support for another well-known cryptocurrency: Ethereum and its Altcoins. As with its Bitcoin & Altcoin support, you can use the library with many different frameworks in the Hadoop ecosystem.

Furthermore, you can use it with various other frameworks due to its implementation as a FileInputFormat for Big Data platforms.

Analysing blockchains, such as Ethereum or Bitcoin, using Big Data platforms brings you several advantages. You can analyse their liquidity, how many people are using them, or whether a given chain is valid at all. Furthermore, you can cross-integrate any other data source, such as currency rates, news or events from Internet of Things platforms (e.g. payments made with hardware wallets). Governments can combine it with various other sources to investigate criminal activity.

All in all, it enables transparency for everyone.


Mapred vs MapReduce – The API question of Hadoop and impact on the Ecosystem

In this blog post I describe the difference between the mapred.* and the mapreduce.* API in Hadoop with respect to custom InputFormats and OutputFormats. Additionally, I discuss the impact of having both APIs on the Hadoop ecosystem and related Big Data platforms, such as Apache Flink, Apache Hive and Apache Spark. Finally, I conclude with an outlook on future needs for innovation.

These APIs are of key importance for nearly all Big Data applications built on popular Big Data platforms, which all leverage them to connect to various file-based and non-file-based data sources and sinks. The APIs offer transparent access to any custom or standard file format on any storage, on-premise (e.g. HDFS) or in the cloud (e.g. Amazon S3 or Microsoft Azure Blob Storage). Furthermore, they can be used to transparently decompress files using any given codec, without the file format itself having to support that codec. This allows introducing new compression formats without changing the underlying file format (this is not useful in all situations, e.g. for unsplittable compression formats; hence ORC and Parquet use a different approach for parallel processing of compressed data).
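As a small illustration of this transparency, here is a minimal sketch (paths and the gzip-compressed file name are illustrative assumptions) of a plain mapreduce.* map-only job; the same code reads uncompressed or compressed input from HDFS, S3 or a local filesystem, because codec selection and filesystem access happen inside the FileInputFormat machinery:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TransparentReadJob {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "transparent-read");
    job.setJarByClass(TransparentReadJob.class);
    // TextInputFormat (mapreduce.*) picks a decompression codec from the file extension,
    // e.g. .gz, without any change to the job code
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setMapperClass(Mapper.class); // identity mapper, map-only job
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    // hdfs://, s3a:// or file:// paths all work through the same API (illustrative paths)
    FileInputFormat.addInputPath(job, new Path("hdfs:///data/logs/input.gz"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs:///data/logs/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}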

Some History

From a custom input/output format perspective both APIs provide essentially the same functionality and performance, but the interfaces differ. The APIs were mainly introduced to provide different ways of expressing MapReduce jobs, but that is not the focus here. Historically the idea was to move from the older mapred.* API to the new mapreduce.* API. In fact, mapred.* had already been marked as deprecated with the aim of removing it. However, something very rare in the software world happened: the mapred.* API was un-deprecated. Officially, the reason was that the new mapreduce.* API was not yet finished, leading to confusion and deprecation warnings. Interestingly, there was already a plan to deprecate the mapred.* API again, but it has never been executed (status: 10/2017).

Effect on Big Data platforms

I mentioned before that several open source and commercial Big Data platforms essentially rely on these APIs, because they want out-of-the-box support for various filesystems and file formats without reinventing the wheel. This is how the coexistence of the two APIs has affected them.

Hadoop MapReduce

Naturally, since the mapred.* and mapreduce.* APIs stem from Hadoop MapReduce, both are supported. However, the documentation supplied with Hadoop only contains examples using the mapreduce.* API. Popular books, such as “Hadoop – The Definitive Guide” by Tom White, also only use the mapreduce.* API.

Hive

Since Hive is a very mature product that was open sourced by Facebook rather shortly after Hadoop appeared, it uses the mapred.* API. This means that any custom input/output format you want to leverage in Hive (more about this later) needs to implement the mapred.* API. If the input/output format only supports the new mapreduce.* API, Hive will throw an error.
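To illustrate what Hive expects, here is a minimal sketch of a custom format implementing the old API; the class name is an illustrative assumption, and the sketch simply delegates to Hadoop's line-based reader where a real format would plug in its own record parsing:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// A custom format for Hive has to implement the old mapred.* interfaces, which you then
// register in the CREATE TABLE statement via STORED AS INPUTFORMAT/OUTPUTFORMAT.
public class MyLedgerInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    // a real format would return its own RecordReader here; we delegate to the
    // built-in line reader just to keep the sketch compilable
    return new LineRecordReader(job, (FileSplit) split);
  }
}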

Flink

Apache Flink supports custom Hadoop Input and Output formats for the mapred.* and mapreduce.* API.

Additionally, you can implement custom Flink input and output formats. I found the API very concise, and if you have already developed a reusable input/output format supporting the two Hadoop APIs anyway, it is rather trivial to create a custom Flink FileInputFormat and FileOutputFormat.
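As a rough sketch of the Hadoop compatibility route (the input path is an illustrative assumption, and the module providing the wrapper classes varies between Flink versions), a Hadoop mapreduce.* format can be wrapped and used as a Flink data source like this:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class FlinkHadoopCompatibilityExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Job job = Job.getInstance();
    FileInputFormat.addInputPath(job, new Path("hdfs:///data/input")); // illustrative path
    // wrap the Hadoop mapreduce.* TextInputFormat so Flink can use it as a data source
    HadoopInputFormat<LongWritable, Text> wrappedFormat =
        new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
    DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(wrappedFormat);
    System.out.println("number of lines: " + lines.count());
  }
}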

Spark

Apache Spark supports custom Hadoop input and output formats for both the mapred.* and the mapreduce.* API. For a FileInputFormat you can use the hadoopRDD (mapred.*) / newAPIHadoopRDD (mapreduce.*) methods of the SparkContext class. For a FileOutputFormat you can use the saveAsHadoopFile (mapred.*) / saveAsNewAPIHadoopFile (mapreduce.*) methods available on pair RDDs.

This loads/saves data only from/to RDDs in Spark. However, in order to use the modern DataFrame (aka Dataset<Row>) API with all its features, such as a compact and highly efficient in-memory representation, you need to convert them explicitly, which takes time. This is not the case for Flink, for example, which always uses its highly efficient in-memory representation.
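A minimal Java sketch of both steps, loading through the mapreduce.* TextInputFormat and then converting the resulting RDD explicitly into a Dataset<Row> (path, column name and schema are illustrative assumptions):

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class SparkHadoopFormatExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("hadoop-format-example").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    // load through the mapreduce.* API into a pair RDD (the path is illustrative)
    JavaPairRDD<LongWritable, Text> raw = jsc.newAPIHadoopFile(
        "hdfs:///data/input", TextInputFormat.class,
        LongWritable.class, Text.class, new Configuration());
    // the explicit (and costly) conversion from an RDD into a Dataset<Row>
    JavaRDD<Row> rows = raw.map(t -> RowFactory.create(t._2().toString()));
    StructType schema = DataTypes.createStructType(Collections.singletonList(
        DataTypes.createStructField("line", DataTypes.StringType, false)));
    Dataset<Row> df = spark.createDataFrame(rows, schema);
    df.show();
  }
}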

Apache Spark has a DataSource API, which can be compared to a custom InputFormat in Spark terms. However, it is not as well designed as the one of Apache Flink. For instance, most data loading/storing operations happen in the driver (the node running the program that coordinates the execution of the distributed tasks), which makes the driver a bottleneck. Certain data sources avoided this by calling the hadoopRDD/newAPIHadoopRDD methods of Spark within the data source, which distributes loading to the executors instead of the driver. For instance, this is how the HadoopCryptoLedger library for reading blockchains, such as Bitcoin, did it.

Hence, at some point the Apache Spark project decided to add certain data sources to the Apache Spark source code itself, bypassing the public data source API and creating a dedicated internal data source API bound to each Spark release. These formats did not need to use the hadoopRDD/newAPIHadoopRDD methods, but could be more native. Nevertheless, as mentioned before, the Hadoop APIs are still used to read/write from/to various filesystems and compressed files.

Other external data sources, such as the one for the Avro format or the HadoopOffice library for reading/writing office documents, such as Excel, then also leveraged these internal APIs. This had the disadvantage that they may not work immediately with a new Spark version, because the internal APIs change arbitrarily.

In the end the Spark developers recognized that this is not an optimal situation and started thinking about a data source V2 API.

Impala

Impala supports only a fixed set of input/output formats and no custom input or output formats. These fixed input/output formats are based on the Hadoop APIs for the aforementioned reasons. Nevertheless, you can use any custom input/output format by defining the tables in Hive and letting Impala read/write via Hive.

Pig

Pig was an early adopter of the new mapreduce.* API and no longer supports the mapred.* API for custom input/output formats. You can run Pig on different engines, such as TEZ or Spark.

Beam

Apache Beam is a programming framework in which you express a computation once and it can be executed on different Big Data engines, such as Apache Apex, Apache Flink, Apache Spark, Google Cloud Dataflow and Apache Gearpump.

This makes it easier to switch between platforms depending on your needs, at the cost that you may not always be able to use the latest features of a specific engine.

Apache Beam supports reading from Hadoop InputFormats using its HadoopInputFormatIO and writing to Hadoop OutputFormats using its HDFSFileSink. Both support only the new mapreduce.* API.
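As a hedged sketch of the reading side (the input path is an illustrative assumption, and depending on the Beam version you may additionally need to register coders for the Writable types), HadoopInputFormatIO is driven entirely by a Hadoop Configuration that names the mapreduce.* format and its key/value classes:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BeamHadoopInputExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    Configuration hadoopConf = new Configuration(false);
    // tell HadoopInputFormatIO which mapreduce.* format to instantiate and its key/value types
    hadoopConf.setClass("mapreduce.job.inputformat.class", TextInputFormat.class, InputFormat.class);
    hadoopConf.setClass("key.class", LongWritable.class, Object.class);
    hadoopConf.setClass("value.class", Text.class, Object.class);
    hadoopConf.set("mapreduce.input.fileinputformat.inputdir", "hdfs:///data/input"); // illustrative path
    PCollection<KV<LongWritable, Text>> lines = pipeline.apply(
        HadoopInputFormatIO.<LongWritable, Text>read().withConfiguration(hadoopConf));
    pipeline.run().waitUntilFinish();
  }
}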

Effect on file formats

Provided as part of Hadoop

Hadoop provides file formats as part of its distribution. However, there are differences between the file formats provided for the mapreduce.* API and the mapred.* API, as shown in the following table.

mapreduce.* API | mapred.* API
KeyValueTextInputFormat | KeyValueTextInputFormat
MapFileOutputFormat | MapFileOutputFormat
MultipleInputs (covers more than mapred.*) | MultiFileInputFormat
SequenceFileAsBinaryInputFormat | SequenceFileAsBinaryInputFormat
SequenceFileAsBinaryOutputFormat | SequenceFileAsBinaryOutputFormat
SequenceFileAsTextInputFormat | SequenceFileAsTextInputFormat
SequenceFileInputFormat | SequenceFileInputFormat
SequenceFileOutputFormat | SequenceFileOutputFormat
TextInputFormat | TextInputFormat
TextOutputFormat | TextOutputFormat
CombineFileInputFormat | CombineFileInputFormat
CombineSequenceFileInputFormat | CombineSequenceFileInputFormat
CombineTextInputFormat | CombineTextInputFormat
DelegatingInputFormat | DelegatingInputFormat
FilterOutputFormat | FilterOutputFormat
LazyOutputFormat | LazyOutputFormat
MultipleOutputs (covers more than mapred.*) | MultipleOutputFormat
MultipleOutputs (covers more than mapred.*) | MultipleSequenceFileOutputFormat
MultipleOutputs (covers more than mapred.*) | MultipleTextOutputFormat
NLineInputFormat | NLineInputFormat
NullOutputFormat | NullOutputFormat
DBInputFormat | DBInputFormat
DBOutputFormat | DBOutputFormat
ComposableInputFormat | ComposableInputFormat
CompositeInputFormat | CompositeInputFormat
FixedLengthInputFormat | FixedLengthInputFormat

It can be noted that both APIs are comparable, but there are already differences. For example, the MultipleInputs and MultipleOutputs formats of mapreduce.* are much more flexible than their mapred.* counterparts: you can combine different input or output formats at the same time, you can use MultipleOutputs in map and reduce functions, and you can use different key/value types.

Reading/writing of JSON files is already implicitly covered by these APIs. Usually one uses the TextInputFormat and parses each line as a JSON object, or one uses the NLineInputFormat and parses several lines as one JSON object. Similarly, you can write JSON as output.
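A minimal sketch of the first variant, parsing one JSON document per line as delivered by TextInputFormat; the Jackson library on the classpath and the field name "address" are illustrative assumptions:

import java.io.IOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Parses each input line as a JSON object and emits one of its fields.
public class JsonLineMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  private final ObjectMapper jsonParser = new ObjectMapper();

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    JsonNode node = jsonParser.readTree(line.toString());
    if (node != null && node.has("address")) {
      context.write(new Text(node.get("address").asText()), NullWritable.get());
    }
  }
}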

Hint: many developers are not aware of all the input and output formats available out of the box in Hadoop, but switching from one of the better-known formats to one of the lesser-known ones can drastically improve performance.

Avro

Apache Avro is a compact exchange format between Big Data platforms. It is much more efficient than XML (in terms of space and computational needs) and should be used for exchanging large data volumes of structured data.

Avro supports both APIs and is supported by virtually all Big Data platforms.

ORC

Apache ORC is an extremely efficient data query format. It supports predicate pushdown, so that data that is not relevant for a query can automatically be skipped without reading it. This reduces query time significantly. More details can be found here.

It is supported by virtually all Big Data platforms.

ORC supports both APIs.

Parquet

Apache Parquet is another efficient data query format. It also supports predicate pushdown.

It is supported by virtually all Big Data platforms.

Apache Parquet supports both APIs, but the classes for the mapred.* API carry "Deprecated" in their names. However, they are not actually marked as deprecated, and thus both APIs are supported.

Hbase

Apache Hbase is a columnar key/value store suitable for frequently reading small data volumes out of a large dataset or frequently writing small data volumes into a large dataset.

Hbase can be read using the TableInputFormat and written using the TableOutputFormat.
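A hedged sketch of the reading side via the mapreduce.* API (the table name and the trivial mapper are illustrative assumptions; TableMapReduceUtil configures the TableInputFormat under the hood):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class HBaseReadJob {

  // emits each row key once; a real job would extract columns from the Result
  public static class RowKeyMapper extends TableMapper<Text, IntWritable> {
    @Override
    protected void map(ImmutableBytesWritable rowKey, Result columns, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(rowKey.copyBytes()), new IntWritable(1));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "hbase-read");
    job.setJarByClass(HBaseReadJob.class);
    // "customer" is an illustrative table name
    TableMapReduceUtil.initTableMapperJob("customer", new Scan(),
        RowKeyMapper.class, Text.class, IntWritable.class, job);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}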

Hbase supports both APIs.

HadoopCryptoLedger

HadoopCryptoLedger supports reading crypto ledgers, such as the Bitcoin blockchain, using Big Data platforms. It supports many Big Data platforms.

HadoopCryptoLedger supports both APIs due to the need to cater for many Big Data platforms as well as internal commercial developments using different APIs.

HadoopOffice

HadoopOffice supports reading/writing of office formats, such as MS Excel, on Big Data platforms. It also supports many Big Data platforms.

HadoopOffice supports both APIs due to the need to cater for many Big Data platforms as well as internal commercial developments using different APIs.

Kudu

Apache Kudu is a format that supports querying large data volumes and writing small data volumes into large datasets. Performance-wise it is below Hbase for writing and below ORC or Parquet for querying. However, the advantage is that it is a single format for both. It also has much higher operational complexity, because several additional non-standard daemons are added to the nodes of a Big Data cluster to manage it.

Apache Kudu only supports the mapreduce.* API.

XMLInput

Apache Mahout (a machine learning library for Hadoop) supports reading XML files using its input format.
It only supports the mapreduce.* API.

Conclusion

Hadoop has, for historical reasons, two APIs to describe input and output formats (as well as MapReduce jobs). These formats are leveraged by all Big Data platforms for the aforementioned benefits. However, there are in fact not that many input/output formats available. Hence, developers will need to provide their own to process data even more efficiently or simply to process legacy formats.

Basically the advice here is as follows.

  • Developers that leverage only existing input/output formats should use the mapreduce.* API wherever possible.
  • Developers that create their own custom input/output format should write classes that are as generic as possible and independent of the API. Then they should first provide a mapreduce.* API and afterwards a mapred.* API, for the following reasons:
    • Generic classes are highly beneficial for writing Big Data platform specific input/output formats. For example, Flink or Spark can use Hadoop formats, but it can be beneficial to provide them with a "native" format. This approach proved highly beneficial for me when writing the HadoopCryptoLedger and HadoopOffice libraries (a sketch of the idea follows after this list).
    • You will support all important platforms. Although Hive seems to be the only one requiring the mapred.* API, it is a very important one. It is the de facto standard for connecting analytical tools via a SQL API. It has been around for a long time and is nowadays used in many organizations. Additionally, there is a lot of internal (non-Hive-related) code in organizations that may still use the mapred.* API (although there are no statistics about this).
    • If you have written unit tests for one of the APIs (which is mandatory), it is easy to derive the same unit test case for the other API.
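A minimal sketch of this idea (all class names are illustrative assumptions): the parsing logic lives in one generic, API-independent class, and the RecordReaders of the two Hadoop APIs merely delegate to it. Only the mapreduce.* wrapper is shown; the mapred.* wrapper would delegate to the same class in exactly the same way.

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// API-independent parsing logic, written once and reused by both API wrappers.
class GenericLedgerReader {
  private final InputStream in;
  GenericLedgerReader(InputStream in) { this.in = in; }
  // format-specific parsing goes here; returns null at the end of the stream (placeholder)
  byte[] readNextRecord() throws IOException { return null; }
  void close() throws IOException { in.close(); }
}

// Thin mapreduce.* wrapper around the generic reader.
public class NewApiLedgerRecordReader extends RecordReader<LongWritable, BytesWritable> {
  private GenericLedgerReader reader;
  private final LongWritable key = new LongWritable(0);
  private final BytesWritable value = new BytesWritable();

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
    FileSplit fileSplit = (FileSplit) split;
    Path file = fileSplit.getPath();
    FileSystem fs = file.getFileSystem(context.getConfiguration());
    FSDataInputStream in = fs.open(file);
    reader = new GenericLedgerReader(in);
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    byte[] record = reader.readNextRecord();
    if (record == null) {
      return false;
    }
    key.set(key.get() + 1);
    value.set(record, 0, record.length);
    return true;
  }

  @Override
  public LongWritable getCurrentKey() { return key; }

  @Override
  public BytesWritable getCurrentValue() { return value; }

  @Override
  public float getProgress() { return 0.0f; }

  @Override
  public void close() throws IOException { reader.close(); }
}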

Nevertheless, there is still a lot of potential for innovation in the area of input/output formats:

  • For high performance processing it would be beneficial to support complex predicate pushdown operations including statistics.

  • Another performance improvement would be for the on-disk format to match the in-memory format, avoiding costly serializations/deserializations. Currently none of the Big Data platforms does this.

  • Formats supporting legacy data formats are rare and many are missing. This means such data needs to be converted locally, without the support of a Big Data platform, which introduces additional cost and complexity. Here, artificial intelligence may be used to extract meaningful data out of unknown or poorly specified formats.
  • Proof of validation. A lot of data needs to have more or less complex validations (e.g. validating that a date field is valid or that references to other data are correct). Usually these validations are done in the source system and in the destination system again. A proof of validation would be a cryptographically secured proof that the source system has done the validations, so that the destination system does not have to redo them again.

  • The complex quest of serving OLTP and OLAP workloads with the same format has not been solved.

  • Formats supporting other data structures besides tables, such as graphs, hash maps etc., are rare, but could lead to drastically improved performance for certain analytic scenarios.

  • Input/output formats supporting encryption at rest and while processing do not exist. For instance, MIT CryptDB allows queries over encrypted data without decrypting it. A similar approach for input/output formats could significantly improve certain security aspects of data.

  • Leveraging of new CPU security features, such as Intel SGX, to isolate processing of formats in-memory from other processes in-memory.

Spark+Scala+Graphx: Analyzing the Bitcoin Transaction Graph

The hadoopcryptoledger library now provides an example of how you can generate a Bitcoin transaction graph using the Big Data graph analysis technologies Spark+Scala+Graphx. Basically, it demonstrates how to read the Bitcoin blockchain from HDFS and transform it into a graph with Bitcoin addresses as vertices and transactions between them as edges. The example returns the top 5 Bitcoin addresses with the most input transactions. This could indicate that they belong to mixing services that try to obfuscate transactions between two addresses. The graph is exemplified in the following figure, showing four vertices with transactions between them:

[Figure: Bitcoin transaction graph with four vertices and the transactions between them]

Of course, this is just one example. You can think of numerous other analyses related to this graph, using algorithms such as strongly connected components or PageRank, particularly if you connect it with other data that you collect related to the blockchain. You can also use this graph for visual analytics.

In the coming weeks, further extensions are planned to be published:

  • Some common analytics pattern to analyze the Bitcoin economy

  • Some technical patterns, such as Bitcoin block validation

  • A Flume source for receiving new Bitcoin blocks, including economic and technical consensus (storing and accessing them in the Hadoop ecosystem, e.g. in Hbase)

  • Adding support for more crypto ledgers, such as Ethereum

Analyzing the Bitcoin Blockchain using the Hadoop Ecosystem – A first Approach

Bitcoin and other cryptocurrencies have drawn a lot of attention from companies, public organizations and individuals. While many use cases exist, there is still a long road ahead to make them part of everybody's life.

The recently released first version of the open source hadoopcryptoledger library is a first attempt to make this happen. It currently allows analyzing the Bitcoin blockchain together with any other data using Hadoop ecosystem tools. The Bitcoin blockchain is a distributed ledger containing all transactions executed over the Bitcoin network.

Hence, virtually all use cases related to analysis of the Bitcoin blockchain are possible. Some examples:

  • Predict Bitcoin exchange prices by analysing the Blockchain together with pricing information from Bitcoin exchanges
  • Explore relationships between counterparties in the blockchain
  • Explore impact of Bitcoin miners on the Bitcoin ecosystem
  • Trace Bitcoin money flows around the network
  • Link news events with Bitcoin blockchain data
  • Link economic data with Blockchain transactions

Currently the library provides a Hadoop file format to analyze the blockchain with any Hadoop application. For example, one can develop Hadoop MapReduce, Spark or TEZ jobs.

There are several enhancements planned for the library over the coming weeks, such as

  • Provide an example how to use Spark with the hadoopcryptoledger library
  • Integration of Blockchain data into Hive to enable end users to use SQL queries to analyze the blockchain
  • A flume source for receiving new Bitcoin blocks
  • Adding support for more crypto ledgers, such as Ethereum

Hive Optimizations with Indexes, Bloom-Filters and Statistics

This blog post describes how Storage Indexes, Bitmap Indexes, Compact Indexes, Aggregate Indexes, Covering Indexes/Materialized Views, Bloom filters and statistics can increase performance with Apache Hive to enable a real-time data warehouse. Furthermore, I will address how index paradigms change due to big data volumes. Generally, it is recommended to rely less on traditional indexes and to focus instead on storage indexes and bloom filters. Especially bloom filters are recommended for any Big Data warehouse. Additionally, I will explain how you can verify that your query benefits from the indexes described here. I assume that you have some experience with standard warehousing technologies and know what Hadoop and Hive are. Most of the optimizations are only available in newer versions of Hive (at least 1.2.0), and it is strongly recommended to upgrade to this version, because it enables interactive queries and a performance experience similar to specialized data warehouse technologies.

The Use Case

We assume a simple use case here: a single customer table with the following fields:

Name Type
customerId int
gender tinyint
age tinyint
revenue decimal(10,2)
name varchar(100)
customerCategory int

What Changes with Big Data with respect to Indexes and Statistics

Firstly, you need to know that even if you have a Big Data technology, such as Hadoop/Hive, you still need to process your data intelligently to execute efficient queries. While Hadoop/Hive certainly scale to nearly any amount of data, you need to apply optimizations in order to process it quickly. Hence, compared to traditional warehousing technologies, similar approaches exist, but also new ones. The main difference to a traditional warehouse is that block sizes are usually significantly larger. For example, the current default block size in Hive when using the ORC format, which is highly recommended as a Big Data enterprise warehousing format, is 256 MB, compared to around 4 MB in traditional databases. This means you should not even bother optimizing tables that fit into one block, because most of the optimization techniques aim at avoiding reading all data blocks of a table. If a table consists of just one block, you need to read it anyway whenever you access the table, and the optimization techniques would make no sense.

Storage-Index

The storage index is only available if your table is stored in the ORC format in Hive. The Parquet format also has a storage index (a min/max index), but many applications, including Hive, currently do not leverage it for Parquet. The storage index lets you skip blocks in which a value is not contained, without reading the block itself. It is most suitable for numeric values (double, float, int, decimal etc.). It kicks in if your query uses WHERE together with the <, >, = operators or joins. For example:

SELECT * FROM table WHERE customerId=1000

or

SELECT * FROM table WHERE age>20 AND age < 40.

Keep in mind that for an effective storage index you need to insert the data into the table sorted on the columns for which you want to leverage the storage index. It is much less effective on unsorted tables, because the index contains the min/max values of the column per block. If the table is badly sorted, the min/max ranges of all blocks will overlap or even be identical, and the storage index is of no use. This is how you can create a table with a storage index in Hive (setting the table property 'orc.create.index'='true'):

CREATE TABLE CUSTOMER (
  customerId int,
  gender tinyint,
  age tinyint,
  revenue decimal(10,2),
  name varchar(100),
  customerCategory int)
STORED AS ORC
TBLPROPERTIES (
  'orc.compress'='SNAPPY',
  'orc.create.index'='true',
  'orc.bloom.filter.columns'='',
  'orc.bloom.filter.fpp'='0.05',
  'orc.stripe.size'='268435456',
  'orc.row.index.stride'='10000');

You may want to insert the data sorted on the columns for which you want to use the storage index. It is recommended to do this for only 1-2 columns; depending on your data you may want to include more. It is recommended to set 'hive.enforce.sorting' to 'true' and to describe in the create table statement which columns should be sorted (e.g. using clustered by (userid) sorted by (age)). You can also insert the data sorted by using order by or sort by in the insert/select statement. Alternatively, you may want to insert the data clustered by certain columns that are correlated. This clustering has to be done before inserting the data into Hive.

Keep in mind that depending on your data you may also tune the index by changing orc.row.index.stride.

Bloom-Filter

Bloom filters are a relatively new feature in Hive (1.2.0) and should be leveraged for any high-performance application. Bloom filters are suitable for queries using WHERE together with the = operator:

SELECT AVG(revenue) WHERE gender=0

A bloom filter can be applied to numeric as well as non-numeric (categorical) data, which is an advantage over the storage index. Internally, a bloom filter is a compact, hash-based summary of the values of a column in a given block. This means you can "ask" a bloom filter whether it contains a certain value, such as gender=male, without reading the block at all. This obviously increases performance significantly, because most of the time a database is busy reading blocks that are not relevant for a query.

You can tune a bloom filter by configuring the false positive rate ('orc.bloom.filter.fpp'), but you should only deviate from the default if you have understood how bloom filters work in general.

You can specify a bloom filter when using the create or alter statement of the table by setting the table property 'orc.bloom.filter.columns' to the columns for which you want to create the bloom filter. It is only available if you use the ORC format:

CREATE TABLE CUSTOMER (
  customerId int,
  gender tinyint,
  age tinyint,
  revenue decimal(10,2),
  name varchar(100),
  customerCategory int)
STORED AS ORC
TBLPROPERTIES (
  'orc.compress'='SNAPPY',
  'orc.create.index'='true',
  'orc.bloom.filter.columns'='gender',
  'orc.bloom.filter.fpp'='0.05',
  'orc.stripe.size'='268435456',
  'orc.row.index.stride'='10000');

By default you should use a bloom filter instead of a Bitmap Index or a Compact Index if you have select queries using WHERE together with the = operator or when (equi-)joining large tables. You should increase the effectiveness of the bloom filter by inserting the data sorted on the columns for which you define the bloom filter, to avoid that all blocks of a table contain all distinct values of the column. It is recommended to set 'hive.enforce.sorting' to 'true' and to describe in the create table statement which columns should be sorted (e.g. using clustered by (userid) sorted by (gender)). You can also insert the data sorted by using order by or sort by in the insert/select statement. Alternatively, you may want to insert the data clustered by certain columns that are correlated. This clustering has to be done before inserting the data into Hive.

Statistics

Another important aspect is to generate statistics on tables and columns by using the following commands:

ANALYZE TABLE CUSTOMER COMPUTE STATISTICS

ANALYZE TABLE CUSTOMER COMPUTE STATISTICS FOR COLUMNS

If you have a lot of columns you may calculate statistics only for selected columns. Additionally, Hive currently cannot generate statistics for all column types, e.g. timestamp. Basically, statistics enable Hive to optimize your queries using the cost-based optimizer, which is available in newer versions of Hive. This usually means significant performance improvements. However, you need to recompute statistics every time you change the content of a table. You can also decide to recompute statistics only for selected partitions, which makes sense if you only changed one partition.

The corresponding commands for tables including partitions are the following:

ANALYZE TABLE CUSTOMER PARTITION(ds, hr) COMPUTE STATISTICS

ANALYZE TABLE CUSTOMER PARTITION(ds, hr) COMPUTE STATISTICS FOR COLUMNS

Newer Hive versions with Hbase as a metadata store allow caching of statistics:

ANALYZE TABLE CUSTOMER CACHE METADATA

Aggregate Index

The Aggregate Index has no counterpart in the traditional database world. Even within Hive it is one of the least documented and probably one of the least used ones. Most of the documentation can be derived from analyzing the source code.

An Aggregate Index is basically a table with predefined aggregations (e.g. count, sum, average) of a certain column, e.g. gender, grouped by the same column. For example, the following query leverages an Aggregate Index:

SELECT gender, count(gender) FROM CUSTOMER GROUP BY gender;

The following query does NOT leverage an Aggregate Index:

SELECT customerCategory, count(gender) FROM CUSTOMER GROUP BY customerCategory;

Hence, the Aggregate Index is quite limited and only suitable in niche scenarios. However, you can create one as follows in Hive:

CREATE INDEX idx_GENDER
ON TABLE CUSTOMER(GENDER)
AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler'
WITH DEFERRED REBUILD
IDXPROPERTIES('AGGREGATES'='count(gender)')
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY', 'orc.create.index'='true');

After the index is created, you need to update it whenever you change the data in the source table, as follows:

ALTER INDEX idx_GENDER ON CUSTOMER REBUILD;

You can choose to auto-update the index; however, in a data warehouse environment it usually makes more sense to do it manually. Keep in mind that you should always have as few indexes as possible, because too many indexes on a table seriously affect performance.

The Aggregate Index is certainly an interesting one, but the community needs to put some more thought into it to make it more effective, e.g. learning from past aggregate queries so that all required aggregates are stored appropriately. It should not be a dumb caching index, but a smart one. For instance, if users often calculate aggregates for customer numbers on a certain continent, then the individual countries should be added automatically to the index. Finally, the Aggregate Index could be augmented with a counting bloom filter, but this is not implemented in Hive.

Covering Index/Materialized Views

Hive currently has no command to create and automatically maintain a covering index or materialized view. Nevertheless, these are simply subsets of the table they refer to. Hence, you can imitate the behavior by using the following commands:

CREATE TABLE CUSTOMER2040
STORED AS ORC
TBLPROPERTIES (
  'orc.compress'='SNAPPY',
  'orc.create.index'='true',
  'orc.bloom.filter.columns'='customerid',
  'orc.bloom.filter.fpp'='0.05',
  'orc.stripe.size'='268435456',
  'orc.row.index.stride'='10000')
AS SELECT * FROM CUSTOMER WHERE (age>=20) AND (age<=40);

This creates a "materialized view" of the customers aged between 20 and 40. Of course, you have to update it manually, which can be a little bit tricky. You may have a flag in the original table to indicate whether a customer has been updated since the last time you created a "materialized view" from it. You may partition the alternative table differently to gain performance improvements.

Bitmap Index

A Bitmap Index in Hive can be compared to the Bitmap Index in traditional databases. However, one should evaluate whether it can be replaced by a bloom filter, which performs significantly better. The Bitmap Index is suitable for queries using WHERE together with the = operator.

It is suitable if the column on which you put the Bitmap Index has only a few distinct values, e.g. a gender column or a customer category.

Similar to the storage index, it is most effective if you sort the data on this column, because otherwise every block contains values present in the bitmap index and hence every block is read anyway. It is recommended to set 'hive.enforce.sorting' to 'true' and describe in the create table statement which columns should be sorted. Alternatively, you can choose reasonably smaller block sizes (by setting orc.stripe.size) for the table to which the index applies, to reduce the likelihood that one block contains all kinds of values.

Internally, a bitmap index in Hive is simply another table, which contains the unique values of the column together with the blocks where they are stored. Of course, you can also define column combinations. For example, you can put an index covering gender and customer category. This is most effective if the WHERE part of your query often filters on both at the same time.

Since the bitmap index is "just" another table, you also need to select the right format for it, as for normal tables. It is recommended to use the same format as for normal tables, such as ORC or Parquet. You may also decide to compress the index, but only if it is really large. If your table is already partitioned the index is also partitioned. You may change this partition structure for the index to increase performance.

You can create a bitmap index as follows in Hive (ORC format, compressed with Snappy):

CREATE INDEX idx_GENDER
ON TABLE CUSTOMER (GENDER)
AS 'org.apache.hadoop.hive.ql.index.bitmap.BitmapIndexHandler'
WITH DEFERRED REBUILD
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY', 'orc.create.index'='true');

After the index is created, you need to update it whenever you change the data in the source table, as follows:

ALTER INDEX idx_GENDER ON CUSTOMER REBUILD;

You can choose to auto-update the index; however, in a data warehouse environment it usually makes more sense to do it manually.

Compact Index

The Compact Index can be compared to a normal index in a traditional database. Similarly to a Bitmap Index one should evaluate if it makes sense to use a bloom filter and storage indexes instead. In most of the cases the Compact Index is not needed anymore.

A Compact Index is suitable for columns containing a lot of distinct values, e.g. customer identifiers. It is recommended to use a Compact Index for numeric values. It is also suitable for queries using WHERE with the <, >, = operators or joins. Internally, a Compact Index is represented in Hive as a sorted table containing all the values of the column to be indexed and the blocks where they occur. Of course you can define an index on several columns if you use them very often together in the WHERE part of a query or in the join part.

Since the index is a table, you should store it in ORC format and optionally compress it. If your table is already partitioned the index is also partitioned. You may change this partition structure for the index to increase performance. You can enable compact binary search for the index by setting the IDXPROPERTIES 'hive.index.compact.binary.search'='true'. This optimizes search within the index. You do not need to sort the data, because this index is sorted.

You can create a Compact Index as follows in Hive (ORC format, compressed with Snappy, binary search enabled):

CREATE INDEX idx_CUSTOMERID
ON TABLE CUSTOMER (CUSTOMERID)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD
IDXPROPERTIES ('hive.index.compact.binary.search'='true')
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY', 'orc.create.index'='true');

After the index is created, you need to update it whenever you change the data in the source table, as follows:

ALTER INDEX idx_CUSTOMERID ON CUSTOMER REBUILD;

You can choose to auto-update the index; however, in a data warehouse environment it usually makes more sense to do it manually.

Verifying Index-and Bloom-Filter Usage in Queries

You can verify that your bloom filters and storage indexes work by setting the following property before executing your query in beeline (assuming you use TEZ as an execution engine):

set hive.tez.exec.print.summary=true;

Afterwards you execute the query in beeline and at the end you get summary statistics about how many bytes have been read and written and how many rows have been processed. This will help you to identify further optimizations of your storage index or bloom filters, such as inserting data sorted on a selected relevant subset of columns. Alternatively, you can use the TEZ UI to see a summary of these statistics.

Similarly you can verify the usage of traditional indexes, such as the Bitmap-Index or the Compact-Index. Additionally, you can get information using EXPLAIN or EXPLAIN DEPENDENCY on your query.

Conclusions

We presented here different approaches to optimize your Hive Big Data warehouse. Especially the new version of Hive (1.2.0) offers performance comparable to traditional data warehouses or appliances, but suitable for Big Data. It also became clear that traditional warehouse approaches to optimization are supported, but are deprecated in the Big Data analytics world:

  • Bitmap Indexes can be replaced by a bloom filter
  • Compact Indexes can be replaced by a bloom filter and storage indexes

In some rare cases Compact Indexes can still make sense in addition to bloom filters and storage indexes. Examples of such cases are OLTP scenarios, where one looks up one specific row by a primary key or similar. Alternatively, you may duplicate tables with different sort orders to improve performance. Niche methods, such as Aggregate Indexes, are promising, but need to mature more. However, if you have a bad data model consisting only of strings or varchars then you will face decreased performance anyway. Always use the right data type for your data, particularly for numeric data. You may also want to replace dates with ints. I have not addressed here several other optimization methods that are available in Hive:

  • Think about pre-joining tables into a flat table. Storage indexes and bloom filters work efficiently on flat tables, instead of joining tables every time they are needed. This is a paradigm shift from traditional databases.
  • Understand how the Yarn Scheduler (Fair or Capacity) works for optimal concurrency and high performance queries
  • hive.execution.engine: You should set it to TEZ for most of your scenarios. Alternatively you may set it to Spark, but then some optimizations do not work.
  • Prewarm TEZ containers.
  • Use TEZ > 0.8 for interactive subsecond queries (TEZ Service) together with LLAP
  • Do not use single insert, updates, deletes for analytics tables, but bulk operations, such as CREATE TABLE AS SELECT … (CTAS).
  • Use in-database analytics and machine learning algorithms provided by HiveMall
  • Use in-memory techniques for often used tables/partitions, e.g. with Apache Ignite
  • Store lookup tables in Hbase: You can create external Tables in Hive pointing to tables in Hbase. These show a high performance for lookup operations of reference data, e.g. in case you want to validate incoming data on a row by row base in staging tables avoiding costly joins.
  • Use the Mapside join hint to load (small) dimension tables in-memory
  • Increase the replication factor of selected tables, partitions, buckets or indexes: The more copies you have on different nodes, the more performance you gain, at the cost of wasting more space. You can use hadoop dfs -setrep -R -w X /path/to/hive/tableinhdfs (where X is the replication count; 3 is the default).
  • Use partitions: Partitions increase your performance significantly, because only a selected subset of the data needs to be read. Be aware that partitions are only used when you reference the partition column in the where clause and, in case of joins, additionally in the on-clause (otherwise older Hive versions will do a full table scan anyway). Example for a transaction table partitioned by transaction date:
    • SELECT * FROM CUSTOMER T1 LEFT JOIN TRANSACTIONS T2 ON (T1.CUSTOMERID=T2.CUSTOMERID AND T2.DATE=20140101) WHERE T2.DATE=20140101
  • Use compression: We used here in the example Snappy compression, because it is fast. However, it does not compress as much as zlib. Hence, for older data in different partitions you may use zlib instead of Snappy, because they are accessed less often.
  • Use bucketing: Bucketing is suitable for optimizing map-side joins or if you want to sample data. For instance, in an extremely large table you may select only a sample of the data and compute the average for this sample, because the result will be similar to the one obtained if you calculate the average over the full table:
    • SELECT AVG (AGE) FROM CUSTOMER TABLESAMPLE(BUCKET 3 OUT OF 100)

Update: Next Generation Big Data Lab V2 in the Cloud

Recently, I presented the first version of the Big Data Lab in the cloud. Now I have extended this version, keeping most of the features of the previous one while providing upgrades for important software components. It still runs on Amazon EMR, but with the newest Amazon AMI (including Amazon Linux). It now features Hadoop 2.4, Spark 1.1.1, R 3 and, for the first time, SparkR, so you can do in-memory analytics in R by leveraging your whole Big Data cluster.

You can find the new version here.

Attention: It may not yet work in all availability zones, but has been tested successfully in Ireland.

In future blog posts, I will show how to write R scripts that distribute machine learning computation in R libraries to different nodes in your Big Data cluster by leveraging Apache Spark in-memory analytics.

Creating a Big Data lab in the Cloud using Amazon EMR

This first blog post is about creating your own Big Data lab in the Cloud using Amazon EMR. Follow my instructions here.

Within 15 minutes, these instructions allow you to do the following:

  • You can use the analytics language R in a browser to access the full functionality of Hadoop/Spark, Hive/Shark (data warehouse), Rhipe (MapReduce for R), RMR (Map Reduce for R)
  • Leverage the unlimited data and computing power of the Amazon Elastic Map Reduce cloud
  • Create reports about your analytics results that you can distribute in any format
  • Data Scientists simply use their browser to work with the data
  • They can come up with new models based on your data in the organization to enhance your business processes and applications
    • Improved personalized advertisement
    • Improved sales targeting
    • Predictive Maintenance for your assets
    • User preference learning
    • Gamification
    • Resilience: Detect disasters in your software systems before they happen