Task Execution & Environment in MapReduce

The MRAppMaster executes the Mapper/Reducer task as a child process in a separate JVM.

The child task inherits the environment of the parent MRAppMaster. The user can pass additional options to the child JVM through the mapreduce.{map|reduce}.java.opts configuration parameters in the Job, for example non-standard paths for the run-time linker to search for shared libraries via -Djava.library.path=<>. If a mapreduce.{map|reduce}.java.opts value contains the symbol @taskid@, it is interpolated with the task ID of the MapReduce task.
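A minimal Python sketch of the @taskid@ substitution described above. The helper interpolate_java_opts is hypothetical: it only mimics the documented behaviour (plain text replacement of every @taskid@ occurrence) and is not Hadoop's implementation. The task ID string used is made up.

```python
# Hypothetical helper (not a Hadoop API): mimics the documented @taskid@ substitution.
def interpolate_java_opts(java_opts: str, task_id: str) -> str:
    """Replace every @taskid@ placeholder in a java.opts value with task_id."""
    return java_opts.replace("@taskid@", task_id)


opts = "-Xmx512M -verbose:gc -Xloggc:/tmp/@taskid@.gc"
print(interpolate_java_opts(opts, "attempt_example_m_000000_0"))
# prints: -Xmx512M -verbose:gc -Xloggc:/tmp/attempt_example_m_000000_0.gc
```

Because each task gets its own ID, a per-task path such as the GC log file above does not collide between tasks running on the same node.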

Here is an example with multiple arguments and substitutions. It shows JVM GC logging and the start of a passwordless JMX agent, so that tools such as jconsole can connect to watch child memory and threads and to take thread dumps. It also sets the maximum heap size of the map and reduce child JVMs to 512 MB and 1024 MB respectively, and adds an extra path to the java.library.path of the child JVM.

<property>
  <name>mapreduce.map.java.opts</name>
  <value>
  -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>
  -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

Memory Management

Users and admins can also specify the maximum virtual memory of the launched child task, and of any sub-process it launches recursively, using mapreduce.{map|reduce}.memory.mb. Note that this is a per-process limit. The value of mapreduce.{map|reduce}.memory.mb is specified in megabytes (MB), and it must be greater than or equal to the -Xmx passed to the JVM; otherwise the VM might not start.
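A small sanity check for the rule above, sketched in Python. The helpers xmx_mb and memory_mb_covers_heap are hypothetical. The parser assumes the usual k/m/g/t size suffixes, treats a bare number as bytes, and assumes the last -Xmx on the command line wins (HotSpot's behaviour).

```python
import re

# Multipliers that convert a -Xmx size suffix to megabytes.
_TO_MB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def xmx_mb(java_opts):
    """Return the -Xmx heap size in MB from a java.opts string, or None if unset."""
    matches = re.findall(r"-Xmx(\d+)([kKmMgGtT]?)", java_opts)
    if not matches:
        return None
    number, unit = matches[-1]  # assumption: the last -Xmx on the command line wins
    if not unit:
        return int(number) / (1024 * 1024)  # no suffix means bytes
    return int(number) * _TO_MB[unit.lower()]


def memory_mb_covers_heap(memory_mb, java_opts):
    """True if mapreduce.{map|reduce}.memory.mb is >= the -Xmx heap size."""
    heap = xmx_mb(java_opts)
    return heap is None or memory_mb >= heap


print(memory_mb_covers_heap(1024, "-Xmx512M -verbose:gc"))  # True
print(memory_mb_covers_heap(512, "-Xmx1024M"))              # False
```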

Note: mapreduce.{map|reduce}.java.opts configures only the child tasks launched by the MRAppMaster. Configuring memory options for the daemons is documented in Configuring the Environment of the Hadoop Daemons.

The memory available to some parts of the framework is also configurable. In map and reduce tasks, performance may be influenced by adjusting parameters that affect the concurrency of operations and how often data hits disk. Monitoring a job's filesystem counters, particularly the byte counts out of the map and into the reduce, is invaluable when tuning these parameters.

Map Parameters

A record emitted from a map is serialized into a buffer, and its metadata is stored in accounting buffers. As described in the following options, when either the serialization buffer or the metadata exceeds a threshold, the contents of the buffers are sorted and written to disk in the background while the map continues to output records. If either buffer fills completely while the spill is in progress, the map thread blocks. When the map is finished, any remaining records are written to disk and all on-disk segments are merged into a single file. Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper.

mapreduce.task.io.sort.mb (int): The cumulative size of the serialization and accounting buffers storing records emitted from the map, in megabytes.

mapreduce.map.sort.spill.percent (float): The soft limit in the serialization buffer. Once reached, a thread will begin to spill the contents to disk in the background.
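As a rough worked example of how the two map-side parameters combine, the soft limit is approximately the buffer size times the spill percentage. The function name is hypothetical, and 100 MB with 0.80 are sample values only (believed to be the stock defaults; check mapred-default.xml for your release).

```python
def spill_soft_limit_bytes(io_sort_mb, spill_percent):
    """Approximate buffer fill level (bytes) at which a background spill starts."""
    buffer_bytes = io_sort_mb * 1024 * 1024
    return round(buffer_bytes * spill_percent)


# A 100 MB buffer with a 0.80 soft limit starts spilling at about 80 MB.
print(spill_soft_limit_bytes(100, 0.80))  # 83886080
```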

Other notes

If either spill threshold is exceeded while a spill is in progress, collection continues until the spill is finished. For example, if mapreduce.map.sort.spill.percent is set to 0.33 and the remainder of the buffer fills while the spill runs, the next spill will include all the collected records, or 0.66 of the buffer, and no additional spills will be generated. In other words, the thresholds define triggers; they do not block.
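The trigger behaviour in the note above can be sketched as a toy simulation. Everything here is an assumption for illustration: the function simulate_spills, one time step per collected record, a fixed spill duration, and raising an error where the real map thread would block. It is not how Hadoop's map output buffer is written. With spill_percent at 0.25 and a spill slow enough for the rest of the buffer to fill meanwhile, it yields one spill of 0.25 of the buffer followed by one of 0.75, the same pattern as the 0.33/0.66 example.

```python
def simulate_spills(record_sizes, capacity, spill_percent, spill_time):
    """Toy model of map-side collection and background spills.

    Each record takes one time step to collect, and a spill finishes spill_time
    steps after it starts. Returns the size of every spill, including the final
    flush when the map finishes.
    """
    threshold = capacity * spill_percent
    spills = []
    pending = 0         # collected, not yet handed to a spill
    spilling = 0        # size of the spill currently running
    spill_done_at = None
    for step, size in enumerate(record_sizes, start=1):
        if spill_done_at is not None and step >= spill_done_at:
            spilling, spill_done_at = 0, None  # background spill finished
        if spilling + pending + size > capacity:
            # The real map thread would block here; the toy model just gives up.
            raise RuntimeError("buffer full")
        pending += size
        # The threshold is a trigger, checked only when no spill is running.
        if spill_done_at is None and pending >= threshold:
            spills.append(pending)
            spilling, pending = pending, 0
            spill_done_at = step + spill_time
    if pending:
        spills.append(pending)  # map finished: write out remaining records
    return spills


print(simulate_spills([1] * 100, capacity=100, spill_percent=0.25, spill_time=75))
# [25, 75]
```

With a fast spill (spill_time=1) the same input produces four spills of 25, which shows that the number of spills depends on how quickly each spill completes, not only on the threshold.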

A record larger than the serialization buffer will first trigger a spill, then be spilled to a separate file. It is undefined whether or not this record will first pass through the combiner.

Shuffle/Reduce Parameters

As described previously, each reduce fetches the output assigned to it by the Partitioner via HTTP into memory and periodically merges these outputs to disk. If intermediate compression of map outputs is turned on, each output is decompressed into memory. The following options affect the frequency of these merges to disk prior to the reduce and the memory allocated to map output during the reduce.

mapreduce.task.io.sort.factor (int): Specifies the number of segments on disk to be merged at the same time. It limits the number of open files and compression codecs during a merge. If the number of files exceeds this limit, the merge proceeds in several passes. Though this limit also applies to the map, most jobs should be configured so that hitting it there is unlikely.

mapreduce.reduce.merge.inmem.threshold (int): The number of sorted map outputs fetched into memory before being merged to disk. Like the spill thresholds in the preceding note, this does not define a unit of partition but a trigger. In practice, it is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk (see the notes following this table). This threshold influences only the frequency of in-memory merges during the shuffle.

mapreduce.reduce.shuffle.merge.percent (float): The memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of the memory allocated to storing map outputs in memory. Since map outputs that can’t fit in memory can be stalled, setting this high may decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle.

mapreduce.reduce.shuffle.input.buffer.percent (float): The percentage of memory, relative to the maximum heap size as typically specified in mapreduce.reduce.java.opts, that can be allocated to storing map outputs during the shuffle. Though some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs.

mapreduce.reduce.input.buffer.percent (float): The percentage of memory, relative to the maximum heap size, in which map outputs may be retained during the reduce. When the reduce begins, map outputs are merged to disk until those that remain are under the resource limit this defines. By default, all map outputs are merged to disk before the reduce begins, to maximize the memory available to the reduce. For less memory-intensive reduces, this should be increased to avoid trips to disk.
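For the mapreduce.task.io.sort.factor entry, a simplified model shows why exceeding the limit leads to several passes. If every round merges groups of at most sort_factor segments into one, the number of rounds grows with the logarithm of the segment count. The function below is a hypothetical sketch of that model only. Hadoop's own merger may size its first pass differently, so real pass counts can differ.

```python
def merge_passes(num_segments, sort_factor):
    """Count merge rounds when at most sort_factor segments are merged at once.

    Simplified model: every round merges groups of up to sort_factor segments
    into one, until a single segment remains.
    """
    if sort_factor < 2:
        raise ValueError("sort_factor must be at least 2")
    passes = 0
    while num_segments > 1:
        num_segments = -(-num_segments // sort_factor)  # ceiling division
        passes += 1
    return passes


print(merge_passes(100, 10))  # 2
print(merge_passes(101, 10))  # 3
```

In this model, one segment past a power of the factor costs a whole extra round, which is why keeping the segment count under the factor matters more than its exact value.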

Other notes

If a map output is larger than 25 percent of the memory allocated to copying map outputs, it will be written directly to disk without first staging through memory.
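The shuffle parameters above combine multiplicatively. The sketch below estimates the resulting thresholds from the reduce heap. The function name and dictionary keys are hypothetical. It treats -Xmx as the maximum heap (an approximation of what the JVM actually reports), and it takes the 25 percent single-output limit from the note above.

```python
def shuffle_memory_plan(max_heap_mb, input_buffer_percent, merge_percent,
                        single_output_limit=0.25):
    """Approximate reduce-side shuffle thresholds, in MB."""
    shuffle_mb = max_heap_mb * input_buffer_percent  # memory for fetched map outputs
    return {
        "shuffle_memory_mb": shuffle_mb,
        # an in-memory merge starts once fetched outputs reach this much
        "inmem_merge_trigger_mb": shuffle_mb * merge_percent,
        # a single map output above this goes straight to disk
        "max_inmem_output_mb": shuffle_mb * single_output_limit,
    }


print(shuffle_memory_plan(1000, 0.5, 0.5))
# {'shuffle_memory_mb': 500.0, 'inmem_merge_trigger_mb': 250.0, 'max_inmem_output_mb': 125.0}
```

For the 1024 MB reduce heap from the earlier example with 0.70 and 0.66 (commonly cited as the defaults; verify for your release), this gives about 716.8 MB for map outputs, an in-memory merge starting near 473 MB, and direct-to-disk handling for single outputs above about 179 MB.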

When running with a combiner, the reasoning about high merge thresholds and large buffers may not hold. For merges started before all map outputs have been fetched, the combiner is run while spilling to disk. In some cases, one can obtain better reduce times by spending resources combining map outputs (making disk spills small and parallelizing spilling and fetching) rather than by aggressively increasing buffer sizes.

When merging in-memory map outputs to disk to begin the reduce, if an intermediate merge is necessary because there are segments to spill and at least mapreduce.task.io.sort.factor segments already on disk, the in-memory map outputs will be part of the intermediate merge.

Configured Parameters

The following properties are localized in the job configuration for each task’s execution:

mapreduce.job.id (String): The job id

mapreduce.job.jar (String): job.jar location in job directory

mapreduce.job.local.dir (String): The job-specific shared scratch space

mapreduce.task.id (String): The task id

mapreduce.task.attempt.id (String): The task attempt id

mapreduce.task.is.map (boolean): Is this a map task

mapreduce.task.partition (int): The id of the task within the job

mapreduce.map.input.file (String): The filename that the map is reading from

mapreduce.map.input.start (long): The offset of the start of the map input split

mapreduce.map.input.length (long): The number of bytes in the map input split

mapreduce.task.output.dir (String): The task’s temporary output directory

Note: During the execution of a streaming job, the names of the “mapreduce” parameters are transformed. The dots ( . ) become underscores ( _ ). For example, mapreduce.job.id becomes mapreduce_job_id and mapreduce.job.jar becomes mapreduce_job_jar. To get the values in a streaming job’s mapper/reducer use the parameter names with the underscores.
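In a streaming mapper written in Python, the renamed properties arrive as environment variables. A minimal sketch, assuming the variables are named exactly as the note describes. conf_value and tag_with_attempt are hypothetical helpers, and tagging each record with the attempt ID is purely illustrative. The optional environ argument makes them testable without a cluster.

```python
import os


def conf_value(name, default=None, environ=None):
    """Read a job configuration property inside a Hadoop Streaming task.

    Streaming exposes properties as environment variables with the dots
    replaced by underscores, e.g. mapreduce.job.id -> mapreduce_job_id.
    """
    env = os.environ if environ is None else environ
    return env.get(name.replace(".", "_"), default)


def tag_with_attempt(lines, environ=None):
    """Yield key<TAB>value records whose value is the current task attempt id."""
    attempt = conf_value("mapreduce.task.attempt.id", "unknown", environ)
    for line in lines:
        yield f"{line.rstrip()}\t{attempt}"
```

In an actual mapper script you would iterate over sys.stdin, e.g. `for record in tag_with_attempt(sys.stdin): print(record)`.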

Task Logs

The standard output (stdout) and error (stderr) streams and the syslog of the task are read by the NodeManager and logged to ${HADOOP_LOG_DIR}/userlogs.

Distributing Libraries

The DistributedCache can also be used to distribute both jars and native libraries for use in the map and/or reduce tasks. The child JVM always has its current working directory added to the java.library.path and LD_LIBRARY_PATH, so the cached libraries can be loaded via System.loadLibrary or System.load. More details on how to load shared libraries through the distributed cache are documented at Native Libraries.

...