Is it OK to set mapreduce.{map|reduce}.failures.maxpercent > 0?

Above pair of options (which were previously named mapred.{min|max}.{map|reduce}.failures.percent) allow some tasks to fail without aborting the whole job. Normally, if at least one task fails (because all of its attempts failed) then the whole job is terminated immediately. But when the above options are set to values higher than zero, the job is allowed to succeed even when some percentage of the tasks fails, meaning that some portion of input data may be skipped. Most of the time this is unacceptable, but there are also cases when it is helpful, typically in dirty data mining.

For example, consider the task of parsing and analyzing billions of binary files crawled from the web by using random walk, like images or Excel spreadsheets. Web is really dirty. There are high chances that input data contains some malicious files which will cause parser to fail with segfault or with out of memory error. What are we going to do in this case? Isolating malicious file, debugging and fixing the parser would be complete waste of time. And if we manage to do so, there is no guarantee that the parser wouldn’t fail again in another place when parsing another malicious file. We need more robust solution. This is when failures.maxpercent becomes handy. We can solve our problem much easier just by adding single line to our job configuration: job.setInt("mapreduce.map.failures.maxpercent", 2);. Now the job will succeed even if up to 2% of tasks fail (which can be translated to 2% of input data).

One non-obvious downside of failures.maxpercent is that it may break further steps in data pipeline due to idempotency violation. You would normally expect that if you run the same job twice with the same input data, then it will produce identical outputs. However, in case when failures.maxpercent is higher than zero, this is not true. Multiple runs with the same input data may produce different results. This may create hard-to-debug issues. How about a job that deletes records in a database except the "best ones"? Consider the following workflow. First we make a dump of the database into HDFS. Then we run a job that reads this dump and decides for every group of records, which one record must be retained and which ones must be deleted from the database. Now, for some reason, we run the job twice, and we do so with failures.maxpercent enabled. If some tasks fail, it will have the effect as if two job instances observed slightly different input. One possible scenario is that first job observes and processes only records (A, B, C, D) and decides to retain B. Second job processes only records (A, B, C, E) and decides to retain E. Combined, these two jobs delete all five records from the database, hence breaking the very purpose of database cleanup.

Hadoop kills my job because of timeout in custom output format

It is common problem that you want to save data in some special format, and this format is not of "streaming" type, i.e. it accumulates all key/values of a single reducer in memory, then performs some complex aggregate analysis on it, and only then dumps the data. It is also quite common that it is performed by some third party library, possibly written in C or C++, maybe even proprietary, which has absolutely no knowledge of and no reference to Hadoop. For example, consider some library that takes long list of string values, then builds optimal compression dictionary of it (at least O(n^2) time), and only then dumps string values in compressed format. Building compression dictionary is the troublesome step because it will block the execution of the task attempt for quite long time. Provided that the amount of input data is huge, it will take more time than mapreduce.task.timeout, after which Hadoop framework will kill the attempt because no progress() was made in that amount of time.

The straightforward solution would be just to increase mapreduce.task.timeout to a higher value or to disable timeout altogether by setting option value to 0. However, such approach is not very robust:

  1. You may not know in advance the upper boundary of time it takes to perform non-progressable operation. And if you do, it would be only a rough guess because actual timing depends on the resource usage by adjacent tasks, which is unpredictable.

  2. It affects other steps of MR job, including IO, map() and reduce() functions and so on. Thus, Hadoop framework won’t be able to kill an attempt if it truly hangs.

The better solution is to spawn heartbeat thread that would call progress() every couple of seconds while non-progressable operation is running. Heartbeat thread should be started immediately before entering non-progressable section and stopped immediately after exiting it.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
public class AutoProgress { private Thread t; private Object o = new Object(); private boolean isStopping = false; private TaskAttemptContext ctx; public AutoProgress(TaskAttemptContext ctx) { this.ctx = ctx; } public void start() { t = new Thread() { @Override public void run() { while (true) { synchronized (o) { if (isStopping) { return; } ctx.progress(); try { o.wait(1000); } catch (InterruptedException exc) { return; } } } } }; t.setName("AutoProgress"); t.setDaemon(true); t.start(); } public void stop() { synchronized (o) { isStopping = true; o.notify(); } while (t.isAlive()); } } // custom format public static class DictionaryCompressedOutputFormat extends FileOutputFormat<NullWritable, Text> { public DictionaryCompressedOutputFormat() { super(); } @Override public RecordWriter<NullWritable, Text> getRecordWriter(TaskAttemptContext ctx) throws IOException, InterruptedException { Configuration conf = ctx.getConfiguration(); Path path = getDefaultWorkFile(ctx, ".bin"); FileSystem fs = path.getFileSystem(conf); final FSDataOutputStream stream = fs.create(path, false); return new RecordWriter<NullWritable, Text>() { private List<String> values = new ArrayList<String>(); @Override public void write(NullWritable key, Text value) throws IOException, InterruptedException { values.add(value.toString()); } @Override public void close(TaskAttemptContext ctx) throws IOException, InterruptedException { AutoProgress p = new AutoProgress(ctx); p.start(); Dictionary dict = null; try { dict = Dictionary.buildOptimalDictionary(values); } finally { p.stop(); } stream.writeInt(values.size()); for (String value : values) { byte[] data = dict.encode(value); stream.writeInt(data.length); stream.write(data); } stream.close(); } }; } }

One of task attempts is hung

Debugging bigdata applications is tedious job. No matter how hard you test your application, there is always a chance that it will work incorrectly on production data. This is simply because tests are performed on some very limited dataset compared to production volumes.

If some task attempt is hung, usually the fastest way you can learn more about it is to find out its stack trace. In all versions of Hadoop you can do this the hard way:

  1. Locate task attempt in the web UI. You will need to traverse along a long list of links, something like this: YARN resource manager UI → application_xxxxxx → appattempt_xxxxxx → Tracking URL: Appliation Master → job_xxxxxx → Reduce running tasks (1) → task_xxxxxx

  2. Log in to the server where task attempt is running: ssh <Node>

  3. Find out PID of the process with ps aux | grep <Attempt>

  4. Send SIGQUIT to it: kill -3 <pid>

JVM installs signal handler for SIGQUIT that dumps stacktraces of all threads to stdout. You can capture the output from web UI (→ logs → stdout) or directly from task attempt local log file in the server where you logged in. Stack trace should give you enough information about where you should focus on finding the bug.

I am tired of implementing Writable interface

Manual implementation of Writable interface makes sense only for very small classes and typically only for keys (to speed up comparison). For larger classes it is cheaper to use codegenerating with serialization, like protobuf or thrift along with Twitter’s elephant-bird wrapper.

More than that, I advise to encapsulate all classes used in the whole data pipeline into single outer class (I call it Any in the example below). Rationale is to reduce number of entities and, in turn, number of errors related to incorrect specification of setOutputKeyClass(), setMapOutputKeyClass() and so on. If single outer class is used, then you can always store data in sequence files in format (NullWritable, ProtobufWritable<Any>). If you add more jobs to your dataflow, you just need to add additional fields into Any class. Without Any, you probably would need to create at least two separate message classes for every job: one is to act as map output value class, and another one is for job output value class.

Below is the skeleton for a job that takes an HTML page and tries to parse it as an article into title, keywords and so on. First of all we create a protobuf message:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
message Keyword { required string keyword = 1; required double weight = 2; } message Point { required double lat = 1; required double lon = 2; } message Location { required string name = 1; repeated Point polygon = 2; } message Any { optional string url = 1; optional string html = 2; optional string title = 3; repeated Keyword keywords = 4; optional Location location = 5; optional bytes image = 6; // next ID to use: 7 // this message can be really huge }

Input collection is stored as sequence file of NullWritable as key and Any{url, html} as a value, meaning that only two fields are set. Output of the job is stored as sequence file of NullWritable and Any{url, title, keywords, location, image}.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
public static class MapperImpl extends Mapper<NullWritable, ProtobufWritable<Any>, NullWritable, ProtobufWritable<Any>> { @Override protected void map(NullWritable key, ProtobufWritable<Any> value, Mapper<NullWritable, ProtobufWritable<Any>, NullWritable, ProtobufWritable<Any>>.Context context) throws IOException, InterruptedException { ParseContext pc = new ParseContext(value.get().getHtml()); if (pc.parses()) { Any any = Any.newBuilder() .setUrl(value.get().getUrl()) .addAllKeywords(...) .setLocation(...) .build(); context.write(NullWritable.get(), new ProtobufWritable<Any>(any, new TypeRef<Any>(){})); } } } ... job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(ProtobufWritable.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setOutputPath(job, new Path("hdfs://...")); SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

My MR job uses only single mapper for every input file

Your input files are not in splittable format. The most probable cause is that they are in ".txt.gz" format. You would be fine if you used raw text files or if you used SequenceFile (with any compression type and codec), but not gzipped text files.

When job is started, Hadoop generates a number of splits for every input file, one split is for one mapper. Single split is a tuple of (path, offset, length) except for some exotic input formats. You would expect that a split is created by key/value boundary, but it is not true. The reason is that in order for split to be created by key/value boundary, the file must be read entirely in advance, which is unacceptable. Instead, every file is divided into splits of exactly block size bytes, ignoring the fact that single key/value pair may easily span two adjacent splits. Now, when mapper starts, it opens the file, seeks offset, and then synchronizes to the beginning of the next record.

KeyValueInputStream stream = inputFormat.open(...);
stream.seek(split.offset());
stream.synchronizeUntilNextRecord();
while (stream.position() < split.offset() + split.length()) {
  handle(stream.readRecord());
}

Such algorithm guarantees that every record will be processed once and only once.

splits

Synchronization implementation depends on file format. In plain text files, for example, every single line is a single key/value. To synchronize to the next record from arbitrary binary position, you just need to read bytes until '\n' is encountered. SequenceFile implements synchronization by incorporating special unique binary markers between key/values. But there is no way to synchronize to the next record in gzipped text files without reading the file from the very beginning. Thus, for every *.txt.gz., Hadoop creates single split (path, offset=0, length=path.size()) no matter how large the file is.

Where can I find all configuration options?

All options, their default values and descriptions reside inside *-default.xml files: core-default.xml, hdfs-default.xml, mapred-default.xml, yarn-default.xml. Hadoop web site hosts most recent version of every default configuration file rendered into easily readable HTML table. So, the easiest way to find full list of options is just to search for one of the above files on the web.

Another way is to extract default configuration file directly from respective JAR file. This will get you exactly the version used in your environment. For example, mapred-default.xml file is packaged inside hadoop-mapreduce-client-core JAR and can be extracted with the following commands:

$ cd $HADOOP_HOME
$ find . -name "hadoop-mapreduce-client-core*"
./share/hadoop/mapreduce/sources/hadoop-mapreduce-client-core-2.9.2-sources.jar
./share/hadoop/mapreduce/sources/hadoop-mapreduce-client-core-2.9.2-test-sources.jar
./share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.9.2.jar
$ cp hadoop-mapreduce-client-core-2.9.2.jar /tmp/
$ cd /tmp
$ jar xvf hadoop-mapreduce-client-core-2.9.2.jar
<all files are extracted into current directory>
$ cat mapred-default.xml

What compression settings to use?

You most likely want to compress output files, but not the intermediate files between mappers and reducers (mapreduce.map.output.compress).

When using compression, do not use heavy codecs like bz2. If you’re working with Hadoop, then capacity and throughput are not the issues. The bottleneck is CPU time it takes to compress and decompress the data. So, stick with faster codecs like lz4, lzo or snappy.

Use block compression instead of per-record. If records are small, then compressing every single record independently won’t provide any reduction in size. There is not way codec is able to build sufficiently large dictionary over such small piece of data. But it will anyway consume CPU cycles. Record compression makes sense only when records are large enough and SequenceFile is used as random-access key-value storage, like in MapFile. In other cases block compression is better because it compresses every decently-sized group of records, thus giving codec more opportunity to build high-quality dictionary.

Finally, watch out for warnings about inability to load native libraries. You definitely want to use fast native libraries for compression and not slow pure java implementations.

Assuming that output format is SequenceFile, the following piece of code demonstrates sane compression settings:

1 2 3 4 5
job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setOutputPath(job, new Path("hdfs://.../output_dir")); SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); SequenceFileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class);

Idling reducers prevent other jobs from running

The most probable cause is that you use default value for option mapreduce.job.reduce.slowstart.completedmaps, which is 0.05. While the number of completed map tasks is less than 5%, no reducers are started. But as soon as 5% mappers are finished, reducers are launched. There is not limit on the number of started reducers. If cluster resources are ample at this moment, then they all will start.

The term "to start" is an overstatement. Because only 5% of the mappers are finished, reducers won’t be able to do anything useful apart from occasionally fetching data from recently completed mappers. All other time they will be simply idling. But they still occupy cluster resources, leading to resource underutilization and preventing other jobs from being executed.

In order to prevent reducers from idling, increase the aforementioned option to much higher value, 0.8 or even 0.95. Reducers will start only after 80% (95%) of mappers are finished, and they will have immediately a lot of work to do. Alternatively, you can put a limit on a number of simultaneously running reducers per job with the option mapreduce.job.running.reduce.limit, or to tune the scheduler.

How do I pass data from job driver to the tasks?

It is quite common that you need to pass some data, such as runtime parameters, from job driver to mappers and reducers. There are a lot of ways to achieve this depending on data size. If the data is just a couple of primitive values, you can put them directly into job’s Configuration one by one by unique names and retrieve later in Mapper.setup() and Reducer.setup() methods.

For more complex data you can create separate class, serialize it into base64 and pass to Configuration as a single option, as demonstrated below. Obviously, this approach should be used only if data size is relatively small, up to 100 kilobytes.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
public static class Coeffs implements Serializable { private static final long serialVersionUID = 1L; private static final String CONF_NAME = "job.coeffs"; public double[][] transform_mat; public double threshold_x; public double threshold_y; public double threshold_z; public static void store(Configuration conf, Coeffs data) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(data); oos.close(); String x = Base64.getEncoder().encodeToString(baos.toByteArray()); conf.set(CONF_NAME, x); } public static Coeffs load(Configuration conf) throws IOException, ClassNotFoundException { String x = conf.get(CONF_NAME); ByteArrayInputStream bais = new ByteArrayInputStream(Base64.getDecoder().decode(x)); ObjectInputStream ois = new ObjectInputStream(bais); return (Coeffs)ois.readObject(); } }

For even larger data sizes, MapReduce framework provides DistributedCache. See its API reference for detailed instructions. Data that you pass via distributed cache is copied onto local filesystem of every node. The benefit of using distributed cache is that only single copy of data per node is made, no matter how many job tasks are running on the node.

Finally, you can always use HDFS directly. Put your data into HDFS and set some option in your job configuration to let mappers and reducers know where to look for the data. Next you can use this data in any way you wish. But bear in mind that order in which mappers and reducers are launched is unpredictable, and that tasks can fail and can also be executed speculatively (multiple identical tasks running at the same time, as soon as one of the tasks completes successfully, other tasks become unnecessary and are killed). To be independent from the task execution order, you should keep data in HDFS read-only.

How do I correctly configure DNS?

Arguably, DNS is the most troublesome piece of setup for any distributed system, not only Hadoop. To ensure smooth operation, you should do all of the following:

  1. Select unique hostname for every cluster node, e.g. "node1", "node2", and so on.

  2. Ensure that every node "knows" its own hostname correctly. Command hostname must returned the hostname you selected and not something else. To change hostname, you typically (depends on Linux distribution) need to edit /etc/hostname and reboot the machine.

  3. Ensure that your DNS server is configured properly for forward lookup. This means that no matter which node your are currently logged in to, the command nslookup <nodename> returns the primary IP address of the respective node for every node in the cluster.

  4. Ensure that DNS server is configured properly for reverse lookup. No matter which node you are currently logged in to, the command nslookup <primary_ip_address> must return selected hostname.

  5. Use selected hostnames in all Hadoop configuration files (/etc/hadoop/slaves, hdfs-site.xml, etc) and in paths (hdfs://node1:9000/a/b/c).

You should ideally avoid aliases. This doesn’t mean that cluster won’t work with aliases, but avoiding aliases will make setup easier.