Contributors (Alphabetical): Vandana Ayyalasomayajula, Francis Liu, Olga Natkovich, Andreas Neumann

Problem:

A business models its data as a dimension store. In this data model, there are two types of tables: fact and dimension. The former contains atomic information about an event, while the latter further describes the event for a given context. In the context of this business, the fact table is a clickstream and one of the dimensions is Ad Campaign information.

Naive schema as an example:

Clickstream: clickId, campaignId, timestamp

Campaign: campaignId, pricePerClick, effectiveTime

effectiveTime: a campaign's attributes can change over time. This field defines the time from which a given set of attribute values for the campaign is in effect.

A common use case for these tables is to determine the revenue generated in a particular month. This can be achieved by filtering the clickstream to the clicks within the month of interest, then joining each click with the campaign row that has the same "campaignId" and the latest "effectiveTime" that is not after the click's timestamp.

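Clickstream

| clickId | campaignId | timestamp |
| --- | --- | --- |
| 1 | a | 12/1/11 9:33 |
| 2 | a | 12/1/11 10:30 |
| 3 | a | 12/1/11 12:00 |

Campaign

| campaignId | pricePerClick | effectiveTime |
| --- | --- | --- |
| a | 1 | 12/1/11 0:00 |
| a | 2 | 12/1/11 11:00 |
| a | 2 | 12/1/11 14:00 |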

An inner join should yield:

| clickId | timestamp | campaignId | pricePerClick | effectiveTime |
| --- | --- | --- | --- | --- |
| 1 | 12/1/11 9:33 | a | 1 | 12/1/11 0:00 |
| 2 | 12/1/11 10:30 | a | 1 | 12/1/11 0:00 |
| 3 | 12/1/11 12:00 | a | 2 | 12/1/11 11:00 |

Note that the 12:00 click is joined with the updated campaign row, the one that took effect at 11:00.

Revenue is: 4
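The join above can be sketched in plain Java as a "latest revision not after the click" lookup. This is only an illustration under stated assumptions: the class and method names are hypothetical, timestamps are encoded as minutes since 12/1/11 0:00 for brevity, and an in-memory TreeMap stands in for the campaign table.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class EffectiveTimeJoin {
    /** One clickstream row; timestamp is minutes since 12/1/11 0:00 (an encoding chosen for this sketch). */
    static final class Click {
        final int clickId;
        final String campaignId;
        final long timestamp;

        Click(int clickId, String campaignId, long timestamp) {
            this.clickId = clickId;
            this.campaignId = campaignId;
            this.timestamp = timestamp;
        }
    }

    /** The three sample clicks: 9:33, 10:30 and 12:00. */
    static List<Click> sampleClicks() {
        return Arrays.asList(
            new Click(1, "a", 9 * 60 + 33),
            new Click(2, "a", 10 * 60 + 30),
            new Click(3, "a", 12 * 60));
    }

    /** campaignId -> (effectiveTime -> pricePerClick), revisions sorted by effectiveTime. */
    static Map<String, TreeMap<Long, Integer>> sampleCampaigns() {
        TreeMap<Long, Integer> a = new TreeMap<>();
        a.put(0L, 1);        // 12/1/11 0:00
        a.put(11 * 60L, 2);  // 12/1/11 11:00
        a.put(14 * 60L, 2);  // 12/1/11 14:00
        Map<String, TreeMap<Long, Integer>> campaigns = new HashMap<>();
        campaigns.put("a", a);
        return campaigns;
    }

    /** Inner join on campaignId, picking the latest revision with effectiveTime <= click time. */
    static int revenue(List<Click> clicks, Map<String, TreeMap<Long, Integer>> campaigns) {
        int total = 0;
        for (Click click : clicks) {
            TreeMap<Long, Integer> revisions = campaigns.get(click.campaignId);
            if (revisions == null) {
                continue; // no such campaign: the inner join drops the click
            }
            Map.Entry<Long, Integer> inEffect = revisions.floorEntry(click.timestamp);
            if (inEffect != null) { // null means the click predates every revision
                total += inEffect.getValue();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(revenue(sampleClicks(), sampleCampaigns())); // prints 4
    }
}
```

Rewriting the 0:00 revision to a pricePerClick of 2 and calling revenue again gives 6 with the same code.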

Another thing to bear in mind is that the system supports retroactive updates to campaign information, so previously processed campaigns may be updated with new pricePerClick values.

For example:

Campaign

| campaignId | pricePerClick | effectiveTime |
| --- | --- | --- |
| a | 2 | 12/1/11 0:00 |
| a | 2 | 12/1/11 11:00 |
| a | 2 | 12/1/11 14:00 |

Previously calculated months which are affected by the change will have to be reprocessed. In this case the new inner join should yield:

| clickId | timestamp | campaignId | pricePerClick | effectiveTime |
| --- | --- | --- | --- | --- |
| 1 | 12/1/11 9:33 | a | 2 | 12/1/11 0:00 |
| 2 | 12/1/11 10:30 | a | 2 | 12/1/11 0:00 |
| 3 | 12/1/11 12:00 | a | 2 | 12/1/11 11:00 |

Revenue is: 6
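One way to decide which previously calculated periods need reprocessing is to compute the window of click timestamps that a revised row can affect. The sketch below assumes, following the join rule, that a revision applies to clicks from its effectiveTime up to (but excluding) the next revision's effectiveTime. The names are hypothetical and times are again minutes since 12/1/11 0:00.

```java
import java.util.TreeMap;

final class AffectedWindow {
    /**
     * Returns {start, endExclusive}: the click timestamps whose joined row changes when the
     * revision at effectiveTime is rewritten. Long.MAX_VALUE marks an open-ended window.
     */
    static long[] affectedWindow(TreeMap<Long, Integer> revisions, long effectiveTime) {
        Long next = revisions.higherKey(effectiveTime); // first later revision, if any
        return new long[] {effectiveTime, next == null ? Long.MAX_VALUE : next};
    }

    /** Revisions of campaign "a" from the example: effectiveTime -> pricePerClick. */
    static TreeMap<Long, Integer> sampleRevisions() {
        TreeMap<Long, Integer> revisions = new TreeMap<>();
        revisions.put(0L, 2);        // 12/1/11 0:00 (retroactively revised)
        revisions.put(11 * 60L, 2);  // 12/1/11 11:00
        revisions.put(14 * 60L, 2);  // 12/1/11 14:00
        return revisions;
    }

    public static void main(String[] args) {
        long[] window = affectedWindow(sampleRevisions(), 0L);
        System.out.println(window[0] + " .. " + window[1]); // prints 0 .. 660
    }
}
```

In the example, the window for the revised 0:00 row covers the 9:33 and 10:30 clicks but not the 12:00 click, which matches the rows that changed in the reprocessed join. Any month overlapping the window would be a candidate for reprocessing.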

All changes applied to a campaign table row are included in reprocessing. Because all the columns in a given row are related, any change is a correction, and it invalidates every revision of that row apart from the latest.

Consider the expanded Campaign table:

Campaign

| campaignId | pricePerClick | effectiveTime | owner |
| --- | --- | --- | --- |
| a | 1 | 12/1/11 0:00 | foo |
| a | 2 | 12/1/11 11:00 | foo |
| a | 2 | 12/1/11 14:00 | foo |

Price revised:

Campaign

| campaignId | pricePerClick | effectiveTime | owner |
| --- | --- | --- | --- |
| a | 2 | 12/1/11 0:00 | foo |
| a | 2 | 12/1/11 11:00 | foo |
| a | 2 | 12/1/11 14:00 | foo |

Reprocessing the inner join would yield:

| clickId | timestamp | campaignId | pricePerClick | effectiveTime | owner |
| --- | --- | --- | --- | --- | --- |
| 1 | 12/1/11 9:33 | a | 2 | 12/1/11 0:00 | foo |
| 2 | 12/1/11 10:30 | a | 2 | 12/1/11 0:00 | foo |
| 3 | 12/1/11 12:00 | a | 2 | 12/1/11 11:00 | foo |

Owner revised:

Campaign

| campaignId | pricePerClick | effectiveTime | owner |
| --- | --- | --- | --- |
| a | 2 | 12/1/11 0:00 | bar |
| a | 2 | 12/1/11 11:00 | bar |
| a | 2 | 12/1/11 14:00 | bar |

Reprocessing a second time would yield:

| clickId | timestamp | campaignId | pricePerClick | effectiveTime | owner |
| --- | --- | --- | --- | --- | --- |
| 1 | 12/1/11 9:33 | a | 2 | 12/1/11 0:00 | bar |
| 2 | 12/1/11 10:30 | a | 2 | 12/1/11 0:00 | bar |
| 3 | 12/1/11 12:00 | a | 2 | 12/1/11 11:00 | bar |

As stated earlier, it does not matter if a single reprocessing run picks up both revisions to the table, because the latest revision invalidates all previous revisions.

Solution

HCatalog's HBaseStorageHandler has two features that make it a compelling solution to this problem: random reads and snapshots.

Random reads

HBase supports random reads, which let us perform efficient skewed joins between the fact (clickstream) and dimension (campaign) tables. Since HBase only supports random lookups by row key, for this use case we can assign the composite key <campaignId>:<effectiveTime> as our row key. In the example below, effectiveTime is expressed in seconds since the Unix epoch.

For example:

Campaign

| rowKey | pricePerClick |
| --- | --- |
| a:1322697600 | 1 |
| a:1322737200 | 2 |
| a:1322748000 | 2 |
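Because HBase orders row keys as raw bytes, the composite key only sorts chronologically when every timestamp has the same number of digits. The sketch below builds the key as shown in the table and, as an assumption that is not part of the original design, also shows a zero-padded variant that keeps the ordering correct for shorter timestamps. The class and method names are hypothetical, and String.compareTo stands in for HBase's byte-wise comparison (equivalent for ASCII keys).

```java
import java.util.Locale;

final class CampaignRowKey {
    /** Key exactly as in the table above: campaignId, a colon, then epoch seconds. */
    static String rowKey(String campaignId, long effectiveTimeSeconds) {
        return campaignId + ":" + effectiveTimeSeconds;
    }

    /** Assumed variant: left-pad the timestamp to 10 digits so byte order equals time order. */
    static String paddedRowKey(String campaignId, long effectiveTimeSeconds) {
        return String.format(Locale.ROOT, "%s:%010d", campaignId, effectiveTimeSeconds);
    }

    public static void main(String[] args) {
        // Same digit count: the later revision sorts after the earlier one.
        System.out.println(rowKey("a", 1322737200L).compareTo(rowKey("a", 1322697600L)) > 0);
        // A 9-digit (earlier) timestamp sorts AFTER a 10-digit one without padding...
        System.out.println(rowKey("a", 999999999L).compareTo(rowKey("a", 1322697600L)) > 0);
        // ...and before it once padded.
        System.out.println(paddedRowKey("a", 999999999L).compareTo(paddedRowKey("a", 1322697600L)) < 0);
    }
}
```

For the timestamps used in this page all keys have ten digits, so the unpadded form shown in the table already sorts correctly.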

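Filtering itself is trivial, so we will focus on what the random read functionality looks like:

...
Click click = ... // click entry
HTable table = ...
// Scan every revision of this campaign whose effectiveTime <= the click's timestamp.
// Keys compare as bytes, so this relies on timestamps having a fixed number of digits.
byte[] lbKey = Bytes.toBytes(click.getCampaignID() + ":");
byte[] ubKey = Bytes.toBytes(click.getCampaignID() + ":" + (click.getTimestamp() + 1)); // ubKey is exclusive
Scan scan = new Scan(lbKey, ubKey);
...
ResultScanner scanner = table.getScanner(scan);
Result latest = null; // last row in key order = the revision in effect at click time
try {
    for (Result result : scanner) {
        latest = result;
    }
} finally {
    scanner.close();
}
...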
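The bounded scan can be tried out without an HBase cluster by letting a sorted in-memory map stand in for the table. This is a sketch of the lookup logic only, not of the HBase client API; the class and method names are hypothetical, and the click timestamps are the example clicks converted to epoch seconds.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class BoundedScanSketch {
    /** In-memory stand-in for the Campaign table: rowKey -> pricePerClick. */
    static NavigableMap<String, Integer> campaignTable() {
        NavigableMap<String, Integer> table = new TreeMap<>();
        table.put("a:1322697600", 1); // 12/1/11 0:00
        table.put("a:1322737200", 2); // 12/1/11 11:00
        table.put("a:1322748000", 2); // 12/1/11 14:00
        return table;
    }

    /** Returns the pricePerClick in effect at clickTimestamp, or null if no revision matches. */
    static Integer lookup(NavigableMap<String, Integer> table, String campaignId, long clickTimestamp) {
        String lbKey = campaignId + ":";                        // inclusive lower bound
        String ubKey = campaignId + ":" + (clickTimestamp + 1); // exclusive upper bound
        // The last key inside [lbKey, ubKey) is the latest revision not after the click.
        Map.Entry<String, Integer> last = table.subMap(lbKey, true, ubKey, false).lastEntry();
        return last == null ? null : last.getValue();
    }

    public static void main(String[] args) {
        NavigableMap<String, Integer> table = campaignTable();
        System.out.println(lookup(table, "a", 1322731980L)); // 9:33 click, prints 1
        System.out.println(lookup(table, "a", 1322740800L)); // 12:00 click, prints 2
    }
}
```

A click that predates every revision of its campaign yields an empty range, which corresponds to the rows the inner join drops.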

In Pig we can implement the same functionality as a UDF:

org.apache.hcatalog.hbase.pig.BoundedCeilLookup(lbKey:chararray, ubKey:chararray, tableName:chararray, selected_columns....)

"lbKey" and "ubKey" work the same way as in the previous Java example, except that in this case ubKey is inclusive for convenience. "selected_columns" defines the HCatalog columns that will be projected.

Sample usage:
A = LOAD 'click_data' AS (clickId: chararray, campaignId: chararray, timestamp: long);
-- Filter
B = FILTER A BY (timestamp > LB) AND (timestamp < UB);
-- Skewed join
C = FOREACH B GENERATE clickId, timestamp,
        org.apache.hcatalog.hbase.pig.BoundedCeilLookup(CONCAT(campaignId,':'), CONCAT(campaignId, CONCAT(':', (chararray)timestamp)),
            tableName:chararray, campaignId:chararray, pricePerClick:double, effectiveTime:long);
D = FILTER C BY NOT IsEmpty(campaignId);

Snapshots

Unlike files on HDFS, HBase tables are mutable, so random reads using precisely the same row key may return different results (e.g. when one job is updating a table while another is reading from it). HCatalog's integration with HBase introduces the notion of snapshots, which guarantee consistent reads over an HBase table during the lifetime of an MR job. Snapshots can also be shared, guaranteeing consistency over a DAG of MR jobs. In the context of this problem, snapshots guarantee that retroactive updates do not affect jobs that are running concurrently.
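The guarantee can be illustrated with a small versioned store in which every write gets an increasing revision number and a snapshot is simply the latest revision at the time it was taken. This is a conceptual sketch, not HCatalog's actual snapshot mechanism; the class and its methods are hypothetical, and the code is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class SnapshotSketch {
    // rowKey -> (revision -> value); every write is kept, never overwritten in place
    private final Map<String, TreeMap<Long, Integer>> cells = new HashMap<>();
    private long latestRevision = 0;

    /** Writes a new version of the row and returns the revision assigned to it. */
    long put(String rowKey, int value) {
        long revision = ++latestRevision;
        cells.computeIfAbsent(rowKey, key -> new TreeMap<>()).put(revision, value);
        return revision;
    }

    /** A snapshot is the latest revision visible at the moment it is taken. */
    long snapshot() {
        return latestRevision;
    }

    /** Reads the newest version of the row that the snapshot can see, or null if none. */
    Integer get(String rowKey, long snapshot) {
        TreeMap<Long, Integer> versions = cells.get(rowKey);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Integer> visible = versions.floorEntry(snapshot);
        return visible == null ? null : visible.getValue();
    }

    public static void main(String[] args) {
        SnapshotSketch table = new SnapshotSketch();
        table.put("a:1322697600", 1);
        long jobSnapshot = table.snapshot();                        // job starts here
        table.put("a:1322697600", 2);                               // retroactive update
        System.out.println(table.get("a:1322697600", jobSnapshot)); // prints 1
        System.out.println(table.get("a:1322697600", table.snapshot())); // prints 2
    }
}
```

A job that keeps reading through the snapshot it started with sees the old price throughout, while a job started after the update sees the new one.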

More complicated Pig jobs may need to reuse the same snapshot a number of times. The HBase StorageHandler exposes Java APIs for creating and reusing snapshots. We can mimic similar functionality in Pig.

We can extend the previous lookup udf to support specifying a snapshotName:

org.apache.hcatalog.hbase.pig.BoundedCeilLookup(snapshotName:chararray, lbKey:chararray, ubKey:chararray, tableName:chararray, selected_columns....)

Once invoked, the UDF searches for the named snapshot. If none is found, a snapshot is created and stored in the job's temporary work directory, where other UDF invocations can reuse it by specifying the same snapshotName.
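The find-or-create behaviour described above amounts to a get-or-create lookup keyed by snapshot name. In the sketch below an in-memory map stands in for the job's temporary work directory and a snapshot is represented by a plain long; both are simplifying assumptions, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

final class NamedSnapshots {
    // snapshotName -> snapshot; stands in for snapshots stored in the job's work directory
    private final Map<String, Long> byName = new ConcurrentHashMap<>();

    /** Returns the snapshot registered under name, taking a new one only if none exists yet. */
    long getOrCreate(String name, LongSupplier takeSnapshot) {
        return byName.computeIfAbsent(name, key -> takeSnapshot.getAsLong());
    }

    public static void main(String[] args) {
        NamedSnapshots snapshots = new NamedSnapshots();
        long first = snapshots.getOrCreate("my_snapshot", () -> 41L);  // created
        long second = snapshots.getOrCreate("my_snapshot", () -> 99L); // reused, supplier not called
        System.out.println(first == second); // prints true
    }
}
```

Every lookup that passes the same name therefore reads through the same snapshot, regardless of which invocation happened to create it.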

Sample usage:
A = LOAD 'click_data1' AS (clickId: chararray, campaignId: chararray, timestamp: long);
B = LOAD 'click_data1' AS (clickId: chararray, campaignId: chararray, timestamp: long);
-- Skewed join, snapshot will be taken and stored as 'my_snapshot'
C = FOREACH A GENERATE clickId, timestamp,
        org.apache.hcatalog.hbase.pig.BoundedCeilLookup('my_snapshot', CONCAT(campaignId,':'), CONCAT(campaignId, CONCAT(':', (chararray)timestamp)),
            tableName:chararray, campaignId:chararray, pricePerClick:double, effectiveTime:long);
D = FILTER C BY NOT IsEmpty(campaignId);
-- Skewed join, previous snapshot 'my_snapshot' will be reused
E = FOREACH B GENERATE clickId, timestamp,
        org.apache.hcatalog.hbase.pig.BoundedCeilLookup('my_snapshot', CONCAT(campaignId,':'), CONCAT(campaignId, CONCAT(':', (chararray)timestamp)),
            tableName:chararray, campaignId:chararray, pricePerClick:double, effectiveTime:long);
F = FILTER E BY NOT IsEmpty(campaignId);