Books about Hadoop usually focus on solving artificial problems. Learning from them creates a false impression that commercial coding with MapReduce is simple. When the time comes to solve real-world problems, it turns out that the straightforward solution either doesn’t fit within resource limits (typically RAM) or runs for an unacceptably long time. It takes months of trial and error before programmers accumulate enough experience to create satisfactory solutions for real-world problems. In this article, I’ll describe an example of a recurring problem of exactly this kind: joining a one-to-many relation. A series of issues will be identified and resolved, eventually leading to a commercial-grade solution.

Our problem deals with joining two tables, one of which is thousands of times larger than the other. Such a problem is typical for MapReduce. For example, consider the following practical problem. We are building an advertising engine and are interested in analyzing user activity. Our primary table consists of an immense number of sessions. A session is a sequence of events related to a single user during a short period of time, such as a URL being loaded, a page being scrolled down, or an advertisement being displayed or clicked. Such sessions are the basis for data analysis and user segmentation. The total size of the sessions table may be huge: dozens of terabytes stored in sequence files. Sessions differ in size: some users open the website and leave soon, others do a moderate amount of clicking, and finally there are bad bots that automatically generate hundreds of clicks per session.

In order to classify and analyze sessions, we would like to join them with information about users, such as gender, age group and set of interests. This information comes from various external data providers and can be joined with sessions by means of the uid identifier, which is typically some sort of tracking cookie. The users table is much smaller than the sessions table: the relation between them is one-to-many, so a single user may generate anywhere from zero to thousands of sessions. In the end, we would like to get a copy of the sessions table where each session is enriched with information about the corresponding user.

[Figure: join problem]

If you have completed any random tutorial on Hadoop, the naive solution should come to mind immediately. Let us create an M/R job that accepts both data sources as input simultaneously. The mapper doesn’t need to do anything special except emit <uid, Session> or <uid, User> records depending on the type of the input value. It is the reducer that does the actual join. It buffers all data about a single user: the user object goes into the user variable, and all sessions go into the sessions list. When the input ends, the reducer emits all buffered sessions, enriching each of them with the same user.

[Figure: join dataflow v1]

In the code below, I create and use a domain-specific container class Any which can hold a single user, a single session or both. Instances of Any are used to store all types of data: input files with sessions contain Any{session}, input files with users contain Any{user}, and output files with enriched sessions contain both fields: Any{session, user}. This is a common trick that makes it easy to move objects of different classes from mappers to reducers and from one job to another. Otherwise, you’d have to create a small separate container class for every job and set up each Job accordingly. The code below uses a handmade Any class that implements the Writable interface. A multi-job task would require significantly more fields in the Any class than just user and session. As such, for real-world problems I recommend generating Any with protobuf or a similar tool to avoid the tedious and error-prone work of implementing Writable methods by hand.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class Any implements Writable {
        public Session session;
        public User user;
        //...
    }

    // Mapper: keys every record by uid.
    protected void map(NullWritable key, Any value,
            Mapper<NullWritable, Any, LongWritable, Any>.Context context)
            throws IOException, InterruptedException {
        if (value.session != null) { // comes from sessions file(s)
            context.write(new LongWritable(value.session.uid), new Any(value.session));
        }
        if (value.user != null) { // comes from users file(s)
            context.write(new LongWritable(value.user.uid), new Any(value.user));
        }
    }

    // Reducer: buffers everything known about one uid, then emits the join.
    protected void reduce(LongWritable key, Iterable<Any> values,
            Reducer<LongWritable, Any, NullWritable, Any>.Context context)
            throws IOException, InterruptedException {
        List<Session> sessions = new ArrayList<Session>();
        User user = null;
        for (Any value : values) {
            // Hadoop reuses the value object between iterations, so copy what we keep.
            if (value.session != null) {
                sessions.add(new Session(value.session));
            }
            if (value.user != null && user == null) { // ignore duplicate user objects
                user = new User(value.user);
            }
        }
        for (Session session : sessions) {
            context.write(NullWritable.get(), new Any(session, user));
        }
    }
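The `//...` in Any hides the serialization methods. To give a feeling for what writing them by hand involves, here is a minimal standalone sketch. It is not the class from the full source: the nested User and Session classes and their fields (gender, data) are hypothetical stand-ins, and the class does not reference Hadoop at all. Only the signatures of write() and readFields() mirror those of the Writable interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class AnySketch {
    // Hypothetical, simplified stand-ins for the real domain classes.
    static final class User { long uid; String gender; }
    static final class Session { long uid; byte[] data; }

    Session session; // null if absent
    User user;       // null if absent

    // Same signature as Writable.write(): a presence flag precedes each optional field.
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(session != null);
        if (session != null) {
            out.writeLong(session.uid);
            out.writeInt(session.data.length);
            out.write(session.data);
        }
        out.writeBoolean(user != null);
        if (user != null) {
            out.writeLong(user.uid);
            out.writeUTF(user.gender);
        }
    }

    // Same signature as Writable.readFields(). Instances get reused,
    // so fields left over from the previous record must be cleared first.
    public void readFields(DataInput in) throws IOException {
        session = null;
        user = null;
        if (in.readBoolean()) {
            session = new Session();
            session.uid = in.readLong();
            session.data = new byte[in.readInt()];
            in.readFully(session.data);
        }
        if (in.readBoolean()) {
            user = new User();
            user.uid = in.readLong();
            user.gender = in.readUTF();
        }
    }

    // Serializes and deserializes a container through a byte array.
    static AnySketch roundTrip(AnySketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            AnySketch copy = new AnySketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        AnySketch any = new AnySketch();
        any.session = new Session();
        any.session.uid = 42L;
        any.session.data = new byte[] {1, 2, 3};
        AnySketch copy = roundTrip(any);
        System.out.println(copy.session.uid + " " + copy.session.data.length + " " + (copy.user == null));
    }
}
```

Even with two fields, the reading and writing code must be kept in sync by hand, which is the reason for preferring generated code once the container grows.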

Alas, the naive reducer above is doomed to fail. It assumes that all sessions of a single user fit into the RAM of one reducer process. While this is true for the vast majority of users, there will definitely be users with thousands of sessions, either real people or bots. The result is an out-of-memory error in some of the reducers:

2017-02-12 21:48:58,395 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
        at mrhints.Session.readFields(Session.java:31)
        at mrhints.Any.readField(Any.java:52)
        at mrhints.Any.readFields(Any.java:37)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
        ...

2017-02-12 21:48:58,842 INFO  [main] mapreduce.Job (Job.java:printTaskEvents(1453)) - Task Id : attempt_1486921044666_0003_r_000000_0, Status : FAILED
Error: Java heap space

What can we do? A quick and dirty solution is to limit the number of buffered sessions at the cost of losing some data. Once reduce() accumulates the maximum allowed number of sessions for a single uid, all further sessions are simply skipped. Of course, we would like to avoid that: data is too precious a resource for analysis to simply throw it away. We need a better solution.

A brief look at the reducer reveals the source of the problem: the user object may appear at any position in the values stream. This is the only reason why we have to buffer all sessions in RAM, even though we do not perform any pairwise operations on them. It would be great if we could somehow force the user object to appear as the very first object in the values stream. In that case, it would not be necessary to buffer sessions at all. Instead, we would be able to enrich them and stream them directly from the input to the output one by one. Luckily, M/R provides a low-level API designed specifically for this sort of problem. But to use it, we need to understand how data is processed between mappers and reducers.

When a key/value pair is emitted from a mapper, it immediately goes to the partitioner, which decides which reducer will get it. The default partitioner implementation scatters key/value pairs between reducers based on (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, where numReduceTasks is the value reported by job.getNumReduceTasks() and the mask clears the sign bit of negative hash codes. This ensures that pairs with identical keys are delivered to the same reducer and that keys (but not key/value pairs) are distributed roughly uniformly between reducers.
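The arithmetic of the default partitioner is easy to try out in isolation. The following standalone sketch (the class and method names are mine, and it does not use Hadoop) applies the same formula to a few hash codes and shows why the sign bit has to be cleared: in Java, the remainder of a negative number is negative and could not be used as a reducer index.

```java
final class PartitionSketch {
    // Same arithmetic as the default hash partitioner:
    // clear the sign bit, then take the remainder.
    static int partition(int keyHash, int numReduceTasks) {
        return (keyHash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        int[] hashes = {1, 2, 42, -7, 42};
        for (int hash : hashes) {
            System.out.println("hash=" + hash
                    + " -> reducer " + partition(hash, reducers)
                    + " (plain % would give " + (hash % reducers) + ")");
        }
    }
}
```

Identical hash codes always map to the same reducer, which is the property the join relies on.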

[Figure: join mr framework]

The reducer’s job is more complicated and consists of three distinct phases. During the first phase, the reducer copies the key/value pairs emitted by map() functions from all mappers. In the web interface, this phase is called reduce > copy and it occupies the progress range between 0% and 33%. The next phase is dedicated entirely to sorting the copied data into one large sequential stream (reduce > sort, 33%..66%). Key/value pairs are ordered in this stream according to the sort comparator. Like the partitioner, its default implementation compares whole keys: key/value pairs with identical keys are located adjacently in this stream, but the order of values within one key is unspecified. For example, it may produce the following sequence of key/value pairs: (k1, 15), (k1, 9), (k1, 10), (k1, 4). Finally, the reducer transitions to the reduce > reduce phase (66%..100%), where the actual reducing is done. It scans the sorted stream from the beginning and uses the grouping comparator to extract subsequences of key/value pairs that should be processed in a single reduce() call. If the grouping comparator says that the next key is identical to the previous one, then its value is passed to the current reduce() call. If the grouping comparator says that the next key is different, then the reducer completes the current reduce() call (values.hasNext() returns false) and starts a new one. Hadoop’s ability to use different sort and grouping comparators makes it possible to sort not only keys but also, implicitly, values, which is exactly what we need.

| Name | API method | Responsibility |
|---|---|---|
| Partitioner | Job.setPartitionerClass() | Chooses which reducer gets a key/value pair |
| Sort comparator | Job.setSortComparatorClass() | Orders key/value pairs in the input stream of a single reducer |
| Grouping comparator | Job.setGroupingComparatorClass() | Chops the input stream of a single reducer into sequences of key/value pairs; each sequence triggers a separate reduce() call |

The actual implementation of the above logic involves a number of optimizations. It is noteworthy that the sort comparator works on both mappers and reducers. Mappers sort their output on the fly. Reducers benefit from this because all they need to do is merge a number of already sorted streams from mappers, which is significantly cheaper than sorting everything from scratch. As such, do not be surprised if an erroneous implementation of your sort comparator triggers a mapper failure.
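To see why merging pre-sorted streams is cheap, consider a minimal k-way merge. This is only an illustration of the idea in plain Java, not Hadoop’s actual merger, which is considerably more elaborate (it works on serialized records and may spill to disk). The class name and the use of plain long keys are my assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class MergeSketch {
    // One cursor per sorted run; the heap orders cursors by their current head.
    private static final class Cursor {
        final Iterator<Long> rest;
        long head;

        Cursor(Iterator<Long> it) {
            this.rest = it;
            this.head = it.next();
        }
    }

    // Merges already sorted runs into one sorted list. Each element passes
    // through a heap whose size equals the number of runs, not the number
    // of elements.
    static List<Long> merge(List<List<Long>> sortedRuns) {
        PriorityQueue<Cursor> heap =
                new PriorityQueue<>((a, b) -> Long.compare(a.head, b.head));
        for (List<Long> run : sortedRuns) {
            if (!run.isEmpty()) {
                heap.add(new Cursor(run.iterator()));
            }
        }
        List<Long> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor smallest = heap.poll();
            merged.add(smallest.head);
            if (smallest.rest.hasNext()) {
                smallest.head = smallest.rest.next();
                heap.add(smallest); // re-insert with its new head
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Three "mapper outputs", each already sorted by key.
        List<List<Long>> runs = Arrays.asList(
                Arrays.asList(1L, 4L, 9L),
                Arrays.asList(2L, 4L),
                Arrays.asList(3L, 10L));
        System.out.println(merge(runs)); // prints [1, 2, 3, 4, 4, 9, 10]
    }
}
```

The merge only ever compares the heads of the runs, so it never needs to hold the whole input in memory at once.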

Let’s return to our problem. What we need is a composite key, JoinKey, consisting of two fields: the first is the already mentioned uid, and the second is a flag identifying what is stored in the value, user (0) or session (1). Such a composite key makes it possible to order key/value pairs in a reducer stream so that, for each uid, its user object appears first (if it exists), followed by a sequence of sessions. The partitioner and the grouping comparator must be implemented manually. We want them to work as before, comparing only the uid field and ignoring the value type field. This ensures that all objects with the same uid are routed to the same reducer and to the same reduce() call. The sort comparator, on the other hand, compares both fields. First it compares the uid fields, and if they turn out to be equal, it further compares the value type fields, with the user object considered to be “less” than the session object.
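The interplay of the two comparators can be checked without a cluster. The sketch below is a plain-Java simulation under my own simplified names (Key, sortCompare, groupCompare, reduceCalls), not Hadoop code: it sorts a handful of composite keys with the sort comparator and then cuts the sorted stream into groups wherever the grouping comparator reports a change, which is how a reducer splits its input into reduce() calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SecondarySortSketch {
    enum ValueType { USER, SESSION } // USER has the smaller ordinal, so it sorts first

    static final class Key {
        final long uid;
        final ValueType vtype;

        Key(long uid, ValueType vtype) {
            this.uid = uid;
            this.vtype = vtype;
        }

        @Override
        public String toString() {
            return uid + ":" + vtype;
        }
    }

    // Sort comparator: uid first, then value type.
    static int sortCompare(Key a, Key b) {
        int byUid = Long.compare(a.uid, b.uid);
        return byUid != 0 ? byUid : Integer.compare(a.vtype.ordinal(), b.vtype.ordinal());
    }

    // Grouping comparator: uid only.
    static int groupCompare(Key a, Key b) {
        return Long.compare(a.uid, b.uid);
    }

    // Sorts the keys, then starts a new group (one simulated reduce() call)
    // every time the grouping comparator sees a different key.
    static List<List<Key>> reduceCalls(List<Key> keys) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(SecondarySortSketch::sortCompare);
        List<List<Key>> calls = new ArrayList<>();
        Key previous = null;
        for (Key key : sorted) {
            if (previous == null || groupCompare(previous, key) != 0) {
                calls.add(new ArrayList<>());
            }
            calls.get(calls.size() - 1).add(key);
            previous = key;
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Key> shuffled = Arrays.asList(
                new Key(2, ValueType.SESSION),
                new Key(1, ValueType.SESSION),
                new Key(1, ValueType.USER),
                new Key(2, ValueType.USER),
                new Key(1, ValueType.SESSION));
        for (List<Key> call : reduceCalls(shuffled)) {
            System.out.println(call);
        }
    }
}
```

Each printed line corresponds to one reduce() call, and within every call the USER key comes before all SESSION keys.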

[Figure: join dataflow v2]

Now the reducer knows that the user object (if present) is always the very first object in the values stream. This allows us to reimplement the reducer so that it uses only O(1) memory no matter how large its input is.

    import java.io.IOException;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;

    public static class JoinKey implements Writable {
        public long uid;
        public ValueType vtype;
        //...
    }

    protected void map(NullWritable key, Any value,
            Mapper<NullWritable, Any, JoinKey, Any>.Context context)
            throws IOException, InterruptedException {
        if (value.session != null) {
            context.write(new JoinKey(value.session.uid, ValueType.SESSION), new Any(value.session));
        }
        if (value.user != null) {
            context.write(new JoinKey(value.user.uid, ValueType.USER), new Any(value.user));
        }
    }

    // Routes by uid only, so a user and all of its sessions reach the same reducer.
    public static class PartitionerImpl extends Partitioner<JoinKey, Any> {
        public int getPartition(JoinKey key, Any any, int numPartitions) {
            return ((int) (key.uid & 0x7fff_ffff)) % numPartitions;
        }
    }

    // Orders by uid, then by value type: USER sorts before SESSION.
    public static class SortComparatorImpl implements RawComparator<JoinKey> {
        public int compare(byte[] buf1, int off1, int len1, byte[] buf2, int off2, int len2) {
            return compare(Utils.readObject(new JoinKey(), buf1, off1, len1),
                           Utils.readObject(new JoinKey(), buf2, off2, len2));
        }

        // RawComparator extends Comparator, so the object form is required too.
        public int compare(JoinKey key1, JoinKey key2) {
            return Utils.compareMultilevel(
                Utils.compareLongs(key1.uid, key2.uid),
                Utils.compareInts(key1.vtype.ordinal(), key2.vtype.ordinal())
            );
        }
    }

    // Groups by uid only, so user and sessions share one reduce() call.
    public static class GroupingComparatorImpl implements RawComparator<JoinKey> {
        public int compare(byte[] buf1, int off1, int len1, byte[] buf2, int off2, int len2) {
            return compare(Utils.readObject(new JoinKey(), buf1, off1, len1),
                           Utils.readObject(new JoinKey(), buf2, off2, len2));
        }

        public int compare(JoinKey key1, JoinKey key2) {
            return Utils.compareLongs(key1.uid, key2.uid);
        }
    }

    protected void reduce(JoinKey key, Iterable<Any> values,
            Reducer<JoinKey, Any, NullWritable, Any>.Context context)
            throws IOException, InterruptedException {
        User user = null;
        for (Any value : values) {
            if (value.user != null) { // the very first record should be *user*
                if (user == null) {
                    user = new User(value.user);
                }
            }
            if (value.session != null) { // all further records should be *sessions*
                if (user != null) { // sessions without a matching user are skipped
                    context.write(NullWritable.get(), new Any(value.session, user));
                }
            }
        }
    }

Although this works as expected and no longer fails with an out-of-memory error, one severe problem remains. The number of sessions for some uid may be so huge that the reducer that joins its objects runs tens or hundreds of times longer than other reducers. When this happens, you can see that all reducers but one complete in a matter of minutes, while this poor guy is projected to run for one more full day. This is unacceptable considering that the join job is only a single step in a multistage dataflow. On top of that, if the MapReduce cluster goes down, the job has to be restarted from the very beginning. The most pessimistic scenario is a never-ending loop: start the job, wait until the cluster fails, curse Hadoop, repeat. In general, long-running jobs are the primary coding problem with M/R. They make any ETA forecasting absolutely unreliable. Deprived of any sense of confidence, programmers prefer to avoid batch processing in general and Hadoop in particular.

[Figure: join slow reducer]

To solve the problem of non-uniform reducers, we will extend the join key again. The idea is to split the sessions of a single uid into multiple reduce() calls, as if they belonged to different users. This is fine because we only need to join the user object with every single session; no operation requires two sessions to be present in one place simultaneously. So we are going to add a third field to our JoinKey: the shard number. We select the number of shards, SHARDS, from the range [5 .. 100] so that the running time of the heaviest reducer divided by SHARDS becomes acceptable. The larger the SHARDS value, the more uniform the reducer load becomes. But too large a value generates a large amount of excess data to sort and transfer between mappers and reducers, because every user object has to be emitted once per shard.

[Figure: join dataflow v3]

Now we think of the pair {uid, shard} as the true join key, while the full triplet {{uid, shard}, vtype} is used only for sorting, as in the previous version. When the mapper encounters a session object, its shard is computed as a non-negative hash of the session contents modulo SHARDS, thus ensuring that the sessions of a single uid are scattered almost uniformly between reducers rather than delivered to a single one. When the mapper encounters a user object, it emits SHARDS identical values with different keys: their shard value runs from 0 to SHARDS-1. The implementation of the reducer is unchanged from the previous version.
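Before looking at the job code, the effect of sharding can be estimated with a small standalone simulation. It is a sketch under assumptions of mine: session contents are faked as the bytes of the strings "session-0", "session-1" and so on, and the class and method names are hypothetical. The shard formula itself is the masked hash modulo SHARDS described above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class ShardingSketch {
    // Non-negative shard index in [0, shards): the mask clears the sign bit
    // so that negative hash codes cannot produce a negative remainder.
    static int shardOf(byte[] sessionData, int shards) {
        return (Arrays.hashCode(sessionData) & 0x7fff_ffff) % shards;
    }

    // Counts how many sessions of one heavy user land in each shard.
    // Session payloads are synthetic and exist only for this demonstration.
    static int[] spread(int sessions, int shards) {
        int[] perShard = new int[shards];
        for (int i = 0; i < sessions; i++) {
            byte[] data = ("session-" + i).getBytes(StandardCharsets.UTF_8);
            perShard[shardOf(data, shards)]++;
        }
        return perShard;
    }

    public static void main(String[] args) {
        int shards = 8;
        int[] perShard = spread(100_000, shards);
        System.out.println("sessions per shard: " + Arrays.toString(perShard));
        // The price of sharding: each user record is emitted once per shard.
        System.out.println("copies of each user record: " + shards);
    }
}
```

The heaviest reduce() call now processes only one shard’s worth of sessions instead of all of them, at the cost of shipping every user record SHARDS times.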

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;

    public static class JoinKey implements Writable {
        public long uid;
        public ValueType vtype;
        public int shard;
        //...
    }

    protected void map(NullWritable key, Any value,
            Mapper<NullWritable, Any, JoinKey, Any>.Context context)
            throws IOException, InterruptedException {
        if (value.session != null) {
            // Mask off the sign bit so that the shard is in [0..SHARDS-1].
            int shard = (Arrays.hashCode(value.session.data) & 0x7fff_ffff) % SHARDS;
            context.write(new JoinKey(value.session.uid, ValueType.SESSION, shard), new Any(value.session));
        }
        if (value.user != null) {
            // Replicate the user into every shard so that each shard can join on its own.
            for (int shard = 0; shard < SHARDS; shard++) {
                context.write(new JoinKey(value.user.uid, ValueType.USER, shard), new Any(value.user));
            }
        }
    }

    // Routes by {uid, shard}, so the shards of one uid spread across reducers.
    public static class PartitionerImpl extends Partitioner<JoinKey, Any> {
        public int getPartition(JoinKey key, Any any, int numPartitions) {
            int hash = 1;
            hash = hash * 13 + (int) (key.uid & 0xffff_ffffL); // low 32 bits of uid
            hash = hash * 13 + key.shard;
            return (hash & 0x7fff_ffff) % numPartitions;
        }
    }

    // Orders by uid, then shard, then value type: USER sorts before SESSION.
    public static class SortComparatorImpl implements RawComparator<JoinKey> {
        public int compare(byte[] buf1, int off1, int len1, byte[] buf2, int off2, int len2) {
            return compare(Utils.readObject(new JoinKey(), buf1, off1, len1),
                           Utils.readObject(new JoinKey(), buf2, off2, len2));
        }

        // RawComparator extends Comparator, so the object form is required too.
        public int compare(JoinKey key1, JoinKey key2) {
            return Utils.compareMultilevel(
                Utils.compareLongs(key1.uid, key2.uid),
                Utils.compareInts(key1.shard, key2.shard),
                Utils.compareInts(key1.vtype.ordinal(), key2.vtype.ordinal())
            );
        }
    }

    // Groups by {uid, shard}: one reduce() call per shard of a user.
    public static class GroupingComparatorImpl implements RawComparator<JoinKey> {
        public int compare(byte[] buf1, int off1, int len1, byte[] buf2, int off2, int len2) {
            return compare(Utils.readObject(new JoinKey(), buf1, off1, len1),
                           Utils.readObject(new JoinKey(), buf2, off2, len2));
        }

        public int compare(JoinKey key1, JoinKey key2) {
            return Utils.compareMultilevel(
                Utils.compareLongs(key1.uid, key2.uid),
                Utils.compareInts(key1.shard, key2.shard)
            );
        }
    }

The sharded version completes the solution of the join problem. It works for any input data volume and takes an acceptable amount of time to complete. An experienced programmer would further add a number of improvements which I omitted for the sake of clarity: avoiding deserialization in comparators, increasing object reuse, adding counters and asserts. In any case, this solution should give you the feeling that coding with MapReduce is not as easy as tutorials make it look. In general, the simplicity of the MapReduce concept is a benefit and a drawback at the same time. On the one hand, its basics can be learned very fast. On the other hand, even basic problems require constructing a complicated dataflow because the MapReduce model is so simplistic.
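As an illustration of the first of these improvements, here is a sketch of comparators that read key fields directly from the serialized bytes instead of building two JoinKey objects per comparison. It rests on an assumption that may not hold for the real JoinKey: that a key is serialized as an 8-byte big-endian uid, a 4-byte big-endian shard and a 1-byte vtype ordinal, which is what DataOutput’s writeLong, writeInt and writeByte would produce in that order. The class is standalone and all names in it are mine. In a real job, the static readLong and readInt helpers of Hadoop’s WritableComparator could replace the hand-written ones.

```java
final class RawKeyCompareSketch {
    // Assumed layout (hypothetical): uid at offset 0, shard at 8, vtype at 12.
    static final int KEY_LENGTH = 13;

    // Builds a key in the assumed layout, for demonstration only.
    static byte[] serialize(long uid, int shard, int vtypeOrdinal) {
        byte[] b = new byte[KEY_LENGTH];
        for (int i = 0; i < 8; i++) {
            b[i] = (byte) (uid >>> (56 - 8 * i));
        }
        for (int i = 0; i < 4; i++) {
            b[8 + i] = (byte) (shard >>> (24 - 8 * i));
        }
        b[12] = (byte) vtypeOrdinal;
        return b;
    }

    static long readLong(byte[] b, int off) {
        long v = 0;
        for (int i = 0; i < 8; i++) {
            v = (v << 8) | (b[off + i] & 0xff);
        }
        return v;
    }

    static int readInt(byte[] b, int off) {
        return ((b[off] & 0xff) << 24) | ((b[off + 1] & 0xff) << 16)
                | ((b[off + 2] & 0xff) << 8) | (b[off + 3] & 0xff);
    }

    // Same parameter list as RawComparator.compare(); compares {uid, shard}.
    static int compareForGrouping(byte[] b1, int off1, int len1, byte[] b2, int off2, int len2) {
        int byUid = Long.compare(readLong(b1, off1), readLong(b2, off2));
        if (byUid != 0) {
            return byUid;
        }
        return Integer.compare(readInt(b1, off1 + 8), readInt(b2, off2 + 8));
    }

    // Compares {uid, shard} first and the value type last.
    static int compareForSort(byte[] b1, int off1, int len1, byte[] b2, int off2, int len2) {
        int byGroup = compareForGrouping(b1, off1, len1, b2, off2, len2);
        if (byGroup != 0) {
            return byGroup;
        }
        return Byte.compare(b1[off1 + 12], b2[off2 + 12]);
    }

    public static void main(String[] args) {
        byte[] user = serialize(42L, 3, 0);
        byte[] session = serialize(42L, 3, 1);
        // The user key sorts before the session key of the same shard...
        System.out.println(compareForSort(user, 0, user.length, session, 0, session.length) < 0);
        // ...but both belong to the same reduce() call.
        System.out.println(compareForGrouping(user, 0, user.length, session, 0, session.length) == 0);
    }
}
```

The drawback is that the comparators become coupled to the serialization format, so any change to the key’s write() method has to be mirrored here.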

Sources

Full source code can be found on GitHub.