001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred; 020 021 022import java.io.IOException; 023import java.util.regex.Matcher; 024import java.util.regex.Pattern; 025 026import com.google.common.annotations.VisibleForTesting; 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.apache.hadoop.classification.InterfaceAudience; 030import org.apache.hadoop.classification.InterfaceAudience.Private; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.io.LongWritable; 037import org.apache.hadoop.io.RawComparator; 038import org.apache.hadoop.io.Text; 039import org.apache.hadoop.io.WritableComparable; 040import org.apache.hadoop.io.WritableComparator; 041import org.apache.hadoop.io.compress.CompressionCodec; 042import org.apache.hadoop.mapred.lib.HashPartitioner; 043import org.apache.hadoop.mapred.lib.IdentityMapper; 044import org.apache.hadoop.mapred.lib.IdentityReducer; 045import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator; 046import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner; 047import org.apache.hadoop.mapreduce.MRConfig; 048import org.apache.hadoop.mapreduce.MRJobConfig; 049import org.apache.hadoop.mapreduce.TaskType; 050import org.apache.hadoop.mapreduce.filecache.DistributedCache; 051import org.apache.hadoop.mapreduce.util.ConfigUtil; 052import org.apache.hadoop.security.Credentials; 053import org.apache.hadoop.util.ClassUtil; 054import org.apache.hadoop.util.ReflectionUtils; 055import org.apache.hadoop.util.Tool; 056import org.apache.log4j.Level; 057 058/** 059 * A map/reduce job configuration. 060 * 061 * <p><code>JobConf</code> is the primary interface for a user to describe a 062 * map-reduce job to the Hadoop framework for execution. The framework tries to 063 * faithfully execute the job as-is described by <code>JobConf</code>, however: 064 * <ol> 065 * <li> 066 * Some configuration parameters might have been marked as 067 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams"> 068 * final</a> by administrators and hence cannot be altered. 069 * </li> 070 * <li> 071 * While some job parameters are straight-forward to set 072 * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 073 * with the rest of the framework and/or job-configuration and is relatively 074 * more complex for the user to control finely 075 * (e.g. {@link #setNumMapTasks(int)}). 076 * </li> 077 * </ol> 078 * 079 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 080 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 081 * {@link OutputFormat} implementations to be used etc. 082 * 083 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 084 * of the job such as <code>Comparator</code>s to be used, files to be put in 085 * the {@link DistributedCache}, whether or not intermediate and/or job outputs 086 * are to be compressed (and how), debugability via user-provided scripts 087 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}), 088 * for doing post-processing on task logs, task's stdout, stderr, syslog. 089 * and etc.</p> 090 * 091 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p> 092 * <p><blockquote><pre> 093 * // Create a new JobConf 094 * JobConf job = new JobConf(new Configuration(), MyJob.class); 095 * 096 * // Specify various job-specific parameters 097 * job.setJobName("myjob"); 098 * 099 * FileInputFormat.setInputPaths(job, new Path("in")); 100 * FileOutputFormat.setOutputPath(job, new Path("out")); 101 * 102 * job.setMapperClass(MyJob.MyMapper.class); 103 * job.setCombinerClass(MyJob.MyReducer.class); 104 * job.setReducerClass(MyJob.MyReducer.class); 105 * 106 * job.setInputFormat(SequenceFileInputFormat.class); 107 * job.setOutputFormat(SequenceFileOutputFormat.class); 108 * </pre></blockquote> 109 * 110 * @see JobClient 111 * @see ClusterStatus 112 * @see Tool 113 * @see DistributedCache 114 */ 115@InterfaceAudience.Public 116@InterfaceStability.Stable 117public class JobConf extends Configuration { 118 119 private static final Log LOG = LogFactory.getLog(JobConf.class); 120 private static final Pattern JAVA_OPTS_XMX_PATTERN = 121 Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*"); 122 123 static{ 124 ConfigUtil.loadResources(); 125 } 126 127 /** 128 * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and 129 * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY} 130 */ 131 @Deprecated 132 public static final String MAPRED_TASK_MAXVMEM_PROPERTY = 133 "mapred.task.maxvmem"; 134 135 /** 136 * @deprecated 137 */ 138 @Deprecated 139 public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY = 140 "mapred.task.limit.maxvmem"; 141 142 /** 143 * @deprecated 144 */ 145 @Deprecated 146 public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY = 147 "mapred.task.default.maxvmem"; 148 149 /** 150 * @deprecated 151 */ 152 @Deprecated 153 public static final String MAPRED_TASK_MAXPMEM_PROPERTY = 154 "mapred.task.maxpmem"; 155 156 /** 157 * A value which if set for memory related configuration options, 158 * indicates that the options are turned off. 159 * Deprecated because it makes no sense in the context of MR2. 160 */ 161 @Deprecated 162 public static final long DISABLED_MEMORY_LIMIT = -1L; 163 164 /** 165 * Property name for the configuration property mapreduce.cluster.local.dir 166 */ 167 public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR; 168 169 /** 170 * Name of the queue to which jobs will be submitted, if no queue 171 * name is mentioned. 172 */ 173 public static final String DEFAULT_QUEUE_NAME = "default"; 174 175 static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY = 176 JobContext.MAP_MEMORY_MB; 177 178 static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY = 179 JobContext.REDUCE_MEMORY_MB; 180 181 /** 182 * The variable is kept for M/R 1.x applications, while M/R 2.x applications 183 * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} 184 */ 185 @Deprecated 186 public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY = 187 "mapred.job.map.memory.mb"; 188 189 /** 190 * The variable is kept for M/R 1.x applications, while M/R 2.x applications 191 * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY} 192 */ 193 @Deprecated 194 public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY = 195 "mapred.job.reduce.memory.mb"; 196 197 /** Pattern for the default unpacking behavior for job jars */ 198 public static final Pattern UNPACK_JAR_PATTERN_DEFAULT = 199 Pattern.compile("(?:classes/|lib/).*"); 200 201 /** 202 * Configuration key to set the java command line options for the child 203 * map and reduce tasks. 204 * 205 * Java opts for the task tracker child processes. 206 * The following symbol, if present, will be interpolated: @taskid@. 207 * It is replaced by current TaskID. Any other occurrences of '@' will go 208 * unchanged. 209 * For example, to enable verbose gc logging to a file named for the taskid in 210 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 211 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 212 * 213 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 214 * other environment variables to the child processes. 215 * 216 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 217 * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS} 218 */ 219 @Deprecated 220 public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts"; 221 222 /** 223 * Configuration key to set the java command line options for the map tasks. 224 * 225 * Java opts for the task tracker child map processes. 226 * The following symbol, if present, will be interpolated: @taskid@. 227 * It is replaced by current TaskID. Any other occurrences of '@' will go 228 * unchanged. 229 * For example, to enable verbose gc logging to a file named for the taskid in 230 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 231 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 232 * 233 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 234 * other environment variables to the map processes. 235 */ 236 public static final String MAPRED_MAP_TASK_JAVA_OPTS = 237 JobContext.MAP_JAVA_OPTS; 238 239 /** 240 * Configuration key to set the java command line options for the reduce tasks. 241 * 242 * Java opts for the task tracker child reduce processes. 243 * The following symbol, if present, will be interpolated: @taskid@. 244 * It is replaced by current TaskID. Any other occurrences of '@' will go 245 * unchanged. 246 * For example, to enable verbose gc logging to a file named for the taskid in 247 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 248 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 249 * 250 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 251 * pass process environment variables to the reduce processes. 252 */ 253 public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 254 JobContext.REDUCE_JAVA_OPTS; 255 256 public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = ""; 257 258 /** 259 * @deprecated 260 * Configuration key to set the maximum virtual memory available to the child 261 * map and reduce tasks (in kilo-bytes). This has been deprecated and will no 262 * longer have any effect. 263 */ 264 @Deprecated 265 public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit"; 266 267 /** 268 * @deprecated 269 * Configuration key to set the maximum virtual memory available to the 270 * map tasks (in kilo-bytes). This has been deprecated and will no 271 * longer have any effect. 272 */ 273 @Deprecated 274 public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit"; 275 276 /** 277 * @deprecated 278 * Configuration key to set the maximum virtual memory available to the 279 * reduce tasks (in kilo-bytes). This has been deprecated and will no 280 * longer have any effect. 281 */ 282 @Deprecated 283 public static final String MAPRED_REDUCE_TASK_ULIMIT = 284 "mapreduce.reduce.ulimit"; 285 286 287 /** 288 * Configuration key to set the environment of the child map/reduce tasks. 289 * 290 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 291 * reference existing environment variables via <code>$key</code> on 292 * Linux or <code>%key%</code> on Windows. 293 * 294 * Example: 295 * <ul> 296 * <li> A=foo - This will set the env variable A to foo. </li> 297 * </ul> 298 * 299 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 300 * {@link #MAPRED_REDUCE_TASK_ENV} 301 */ 302 @Deprecated 303 public static final String MAPRED_TASK_ENV = "mapred.child.env"; 304 305 /** 306 * Configuration key to set the environment of the child map tasks. 307 * 308 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 309 * reference existing environment variables via <code>$key</code> on 310 * Linux or <code>%key%</code> on Windows. 311 * 312 * Example: 313 * <ul> 314 * <li> A=foo - This will set the env variable A to foo. </li> 315 * </ul> 316 */ 317 public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV; 318 319 /** 320 * Configuration key to set the environment of the child reduce tasks. 321 * 322 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 323 * reference existing environment variables via <code>$key</code> on 324 * Linux or <code>%key%</code> on Windows. 325 * 326 * Example: 327 * <ul> 328 * <li> A=foo - This will set the env variable A to foo. </li> 329 * </ul> 330 */ 331 public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV; 332 333 private Credentials credentials = new Credentials(); 334 335 /** 336 * Configuration key to set the logging {@link Level} for the map task. 337 * 338 * The allowed logging levels are: 339 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 340 */ 341 public static final String MAPRED_MAP_TASK_LOG_LEVEL = 342 JobContext.MAP_LOG_LEVEL; 343 344 /** 345 * Configuration key to set the logging {@link Level} for the reduce task. 346 * 347 * The allowed logging levels are: 348 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 349 */ 350 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 351 JobContext.REDUCE_LOG_LEVEL; 352 353 /** 354 * Default logging level for map/reduce tasks. 355 */ 356 public static final Level DEFAULT_LOG_LEVEL = Level.INFO; 357 358 /** 359 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 360 * use {@link MRJobConfig#WORKFLOW_ID} instead 361 */ 362 @Deprecated 363 public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID; 364 365 /** 366 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 367 * use {@link MRJobConfig#WORKFLOW_NAME} instead 368 */ 369 @Deprecated 370 public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME; 371 372 /** 373 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 374 * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead 375 */ 376 @Deprecated 377 public static final String WORKFLOW_NODE_NAME = 378 MRJobConfig.WORKFLOW_NODE_NAME; 379 380 /** 381 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 382 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead 383 */ 384 @Deprecated 385 public static final String WORKFLOW_ADJACENCY_PREFIX_STRING = 386 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING; 387 388 /** 389 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 390 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead 391 */ 392 @Deprecated 393 public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN = 394 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN; 395 396 /** 397 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 398 * use {@link MRJobConfig#WORKFLOW_TAGS} instead 399 */ 400 @Deprecated 401 public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS; 402 403 /** 404 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 405 * not use it 406 */ 407 @Deprecated 408 public static final String MAPREDUCE_RECOVER_JOB = 409 "mapreduce.job.restart.recover"; 410 411 /** 412 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 413 * not use it 414 */ 415 @Deprecated 416 public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true; 417 418 /** 419 * Construct a map/reduce job configuration. 420 */ 421 public JobConf() { 422 checkAndWarnDeprecation(); 423 } 424 425 /** 426 * Construct a map/reduce job configuration. 427 * 428 * @param exampleClass a class whose containing jar is used as the job's jar. 429 */ 430 public JobConf(Class exampleClass) { 431 setJarByClass(exampleClass); 432 checkAndWarnDeprecation(); 433 } 434 435 /** 436 * Construct a map/reduce job configuration. 437 * 438 * @param conf a Configuration whose settings will be inherited. 439 */ 440 public JobConf(Configuration conf) { 441 super(conf); 442 443 if (conf instanceof JobConf) { 444 JobConf that = (JobConf)conf; 445 credentials = that.credentials; 446 } 447 448 checkAndWarnDeprecation(); 449 } 450 451 452 /** Construct a map/reduce job configuration. 453 * 454 * @param conf a Configuration whose settings will be inherited. 455 * @param exampleClass a class whose containing jar is used as the job's jar. 456 */ 457 public JobConf(Configuration conf, Class exampleClass) { 458 this(conf); 459 setJarByClass(exampleClass); 460 } 461 462 463 /** Construct a map/reduce configuration. 464 * 465 * @param config a Configuration-format XML job description file. 466 */ 467 public JobConf(String config) { 468 this(new Path(config)); 469 } 470 471 /** Construct a map/reduce configuration. 472 * 473 * @param config a Configuration-format XML job description file. 474 */ 475 public JobConf(Path config) { 476 super(); 477 addResource(config); 478 checkAndWarnDeprecation(); 479 } 480 481 /** A new map/reduce configuration where the behavior of reading from the 482 * default resources can be turned off. 483 * <p> 484 * If the parameter {@code loadDefaults} is false, the new instance 485 * will not load resources from the default files. 486 * 487 * @param loadDefaults specifies whether to load from the default files 488 */ 489 public JobConf(boolean loadDefaults) { 490 super(loadDefaults); 491 checkAndWarnDeprecation(); 492 } 493 494 /** 495 * Get credentials for the job. 496 * @return credentials for the job 497 */ 498 public Credentials getCredentials() { 499 return credentials; 500 } 501 502 @Private 503 public void setCredentials(Credentials credentials) { 504 this.credentials = credentials; 505 } 506 507 /** 508 * Get the user jar for the map-reduce job. 509 * 510 * @return the user jar for the map-reduce job. 511 */ 512 public String getJar() { return get(JobContext.JAR); } 513 514 /** 515 * Set the user jar for the map-reduce job. 516 * 517 * @param jar the user jar for the map-reduce job. 518 */ 519 public void setJar(String jar) { set(JobContext.JAR, jar); } 520 521 /** 522 * Get the pattern for jar contents to unpack on the tasktracker 523 */ 524 public Pattern getJarUnpackPattern() { 525 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT); 526 } 527 528 529 /** 530 * Set the job's jar file by finding an example class location. 531 * 532 * @param cls the example class. 533 */ 534 public void setJarByClass(Class cls) { 535 String jar = ClassUtil.findContainingJar(cls); 536 if (jar != null) { 537 setJar(jar); 538 } 539 } 540 541 public String[] getLocalDirs() throws IOException { 542 return getTrimmedStrings(MRConfig.LOCAL_DIR); 543 } 544 545 /** 546 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. 547 */ 548 @Deprecated 549 public void deleteLocalFiles() throws IOException { 550 String[] localDirs = getLocalDirs(); 551 for (int i = 0; i < localDirs.length; i++) { 552 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true); 553 } 554 } 555 556 public void deleteLocalFiles(String subdir) throws IOException { 557 String[] localDirs = getLocalDirs(); 558 for (int i = 0; i < localDirs.length; i++) { 559 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true); 560 } 561 } 562 563 /** 564 * Constructs a local file name. Files are distributed among configured 565 * local directories. 566 */ 567 public Path getLocalPath(String pathString) throws IOException { 568 return getLocalPath(MRConfig.LOCAL_DIR, pathString); 569 } 570 571 /** 572 * Get the reported username for this job. 573 * 574 * @return the username 575 */ 576 public String getUser() { 577 return get(JobContext.USER_NAME); 578 } 579 580 /** 581 * Set the reported username for this job. 582 * 583 * @param user the username for this job. 584 */ 585 public void setUser(String user) { 586 set(JobContext.USER_NAME, user); 587 } 588 589 590 591 /** 592 * Set whether the framework should keep the intermediate files for 593 * failed tasks. 594 * 595 * @param keep <code>true</code> if framework should keep the intermediate files 596 * for failed tasks, <code>false</code> otherwise. 597 * 598 */ 599 public void setKeepFailedTaskFiles(boolean keep) { 600 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep); 601 } 602 603 /** 604 * Should the temporary files for failed tasks be kept? 605 * 606 * @return should the files be kept? 607 */ 608 public boolean getKeepFailedTaskFiles() { 609 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false); 610 } 611 612 /** 613 * Set a regular expression for task names that should be kept. 614 * The regular expression ".*_m_000123_0" would keep the files 615 * for the first instance of map 123 that ran. 616 * 617 * @param pattern the java.util.regex.Pattern to match against the 618 * task names. 619 */ 620 public void setKeepTaskFilesPattern(String pattern) { 621 set(JobContext.PRESERVE_FILES_PATTERN, pattern); 622 } 623 624 /** 625 * Get the regular expression that is matched against the task names 626 * to see if we need to keep the files. 627 * 628 * @return the pattern as a string, if it was set, othewise null. 629 */ 630 public String getKeepTaskFilesPattern() { 631 return get(JobContext.PRESERVE_FILES_PATTERN); 632 } 633 634 /** 635 * Set the current working directory for the default file system. 636 * 637 * @param dir the new current working directory. 638 */ 639 public void setWorkingDirectory(Path dir) { 640 dir = new Path(getWorkingDirectory(), dir); 641 set(JobContext.WORKING_DIR, dir.toString()); 642 } 643 644 /** 645 * Get the current working directory for the default file system. 646 * 647 * @return the directory name. 648 */ 649 public Path getWorkingDirectory() { 650 String name = get(JobContext.WORKING_DIR); 651 if (name != null) { 652 return new Path(name); 653 } else { 654 try { 655 Path dir = FileSystem.get(this).getWorkingDirectory(); 656 set(JobContext.WORKING_DIR, dir.toString()); 657 return dir; 658 } catch (IOException e) { 659 throw new RuntimeException(e); 660 } 661 } 662 } 663 664 /** 665 * Sets the number of tasks that a spawned task JVM should run 666 * before it exits 667 * @param numTasks the number of tasks to execute; defaults to 1; 668 * -1 signifies no limit 669 */ 670 public void setNumTasksToExecutePerJvm(int numTasks) { 671 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks); 672 } 673 674 /** 675 * Get the number of tasks that a spawned JVM should execute 676 */ 677 public int getNumTasksToExecutePerJvm() { 678 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1); 679 } 680 681 /** 682 * Get the {@link InputFormat} implementation for the map-reduce job, 683 * defaults to {@link TextInputFormat} if not specified explicity. 684 * 685 * @return the {@link InputFormat} implementation for the map-reduce job. 686 */ 687 public InputFormat getInputFormat() { 688 return ReflectionUtils.newInstance(getClass("mapred.input.format.class", 689 TextInputFormat.class, 690 InputFormat.class), 691 this); 692 } 693 694 /** 695 * Set the {@link InputFormat} implementation for the map-reduce job. 696 * 697 * @param theClass the {@link InputFormat} implementation for the map-reduce 698 * job. 699 */ 700 public void setInputFormat(Class<? extends InputFormat> theClass) { 701 setClass("mapred.input.format.class", theClass, InputFormat.class); 702 } 703 704 /** 705 * Get the {@link OutputFormat} implementation for the map-reduce job, 706 * defaults to {@link TextOutputFormat} if not specified explicity. 707 * 708 * @return the {@link OutputFormat} implementation for the map-reduce job. 709 */ 710 public OutputFormat getOutputFormat() { 711 return ReflectionUtils.newInstance(getClass("mapred.output.format.class", 712 TextOutputFormat.class, 713 OutputFormat.class), 714 this); 715 } 716 717 /** 718 * Get the {@link OutputCommitter} implementation for the map-reduce job, 719 * defaults to {@link FileOutputCommitter} if not specified explicitly. 720 * 721 * @return the {@link OutputCommitter} implementation for the map-reduce job. 722 */ 723 public OutputCommitter getOutputCommitter() { 724 return (OutputCommitter)ReflectionUtils.newInstance( 725 getClass("mapred.output.committer.class", FileOutputCommitter.class, 726 OutputCommitter.class), this); 727 } 728 729 /** 730 * Set the {@link OutputCommitter} implementation for the map-reduce job. 731 * 732 * @param theClass the {@link OutputCommitter} implementation for the map-reduce 733 * job. 734 */ 735 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) { 736 setClass("mapred.output.committer.class", theClass, OutputCommitter.class); 737 } 738 739 /** 740 * Set the {@link OutputFormat} implementation for the map-reduce job. 741 * 742 * @param theClass the {@link OutputFormat} implementation for the map-reduce 743 * job. 744 */ 745 public void setOutputFormat(Class<? extends OutputFormat> theClass) { 746 setClass("mapred.output.format.class", theClass, OutputFormat.class); 747 } 748 749 /** 750 * Should the map outputs be compressed before transfer? 751 * 752 * @param compress should the map outputs be compressed? 753 */ 754 public void setCompressMapOutput(boolean compress) { 755 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress); 756 } 757 758 /** 759 * Are the outputs of the maps be compressed? 760 * 761 * @return <code>true</code> if the outputs of the maps are to be compressed, 762 * <code>false</code> otherwise. 763 */ 764 public boolean getCompressMapOutput() { 765 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false); 766 } 767 768 /** 769 * Set the given class as the {@link CompressionCodec} for the map outputs. 770 * 771 * @param codecClass the {@link CompressionCodec} class that will compress 772 * the map outputs. 773 */ 774 public void 775 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) { 776 setCompressMapOutput(true); 777 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 778 CompressionCodec.class); 779 } 780 781 /** 782 * Get the {@link CompressionCodec} for compressing the map outputs. 783 * 784 * @param defaultValue the {@link CompressionCodec} to return if not set 785 * @return the {@link CompressionCodec} class that should be used to compress the 786 * map outputs. 787 * @throws IllegalArgumentException if the class was specified, but not found 788 */ 789 public Class<? extends CompressionCodec> 790 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) { 791 Class<? extends CompressionCodec> codecClass = defaultValue; 792 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC); 793 if (name != null) { 794 try { 795 codecClass = getClassByName(name).asSubclass(CompressionCodec.class); 796 } catch (ClassNotFoundException e) { 797 throw new IllegalArgumentException("Compression codec " + name + 798 " was not found.", e); 799 } 800 } 801 return codecClass; 802 } 803 804 /** 805 * Get the key class for the map output data. If it is not set, use the 806 * (final) output key class. This allows the map output key class to be 807 * different than the final output key class. 808 * 809 * @return the map output key class. 810 */ 811 public Class<?> getMapOutputKeyClass() { 812 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class); 813 if (retv == null) { 814 retv = getOutputKeyClass(); 815 } 816 return retv; 817 } 818 819 /** 820 * Set the key class for the map output data. This allows the user to 821 * specify the map output key class to be different than the final output 822 * value class. 823 * 824 * @param theClass the map output key class. 825 */ 826 public void setMapOutputKeyClass(Class<?> theClass) { 827 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class); 828 } 829 830 /** 831 * Get the value class for the map output data. If it is not set, use the 832 * (final) output value class This allows the map output value class to be 833 * different than the final output value class. 834 * 835 * @return the map output value class. 836 */ 837 public Class<?> getMapOutputValueClass() { 838 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null, 839 Object.class); 840 if (retv == null) { 841 retv = getOutputValueClass(); 842 } 843 return retv; 844 } 845 846 /** 847 * Set the value class for the map output data. This allows the user to 848 * specify the map output value class to be different than the final output 849 * value class. 850 * 851 * @param theClass the map output value class. 852 */ 853 public void setMapOutputValueClass(Class<?> theClass) { 854 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class); 855 } 856 857 /** 858 * Get the key class for the job output data. 859 * 860 * @return the key class for the job output data. 861 */ 862 public Class<?> getOutputKeyClass() { 863 return getClass(JobContext.OUTPUT_KEY_CLASS, 864 LongWritable.class, Object.class); 865 } 866 867 /** 868 * Set the key class for the job output data. 869 * 870 * @param theClass the key class for the job output data. 871 */ 872 public void setOutputKeyClass(Class<?> theClass) { 873 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class); 874 } 875 876 /** 877 * Get the {@link RawComparator} comparator used to compare keys. 878 * 879 * @return the {@link RawComparator} comparator used to compare keys. 880 */ 881 public RawComparator getOutputKeyComparator() { 882 Class<? extends RawComparator> theClass = getClass( 883 JobContext.KEY_COMPARATOR, null, RawComparator.class); 884 if (theClass != null) 885 return ReflectionUtils.newInstance(theClass, this); 886 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); 887 } 888 889 /** 890 * Set the {@link RawComparator} comparator used to compare keys. 891 * 892 * @param theClass the {@link RawComparator} comparator used to 893 * compare keys. 894 * @see #setOutputValueGroupingComparator(Class) 895 */ 896 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) { 897 setClass(JobContext.KEY_COMPARATOR, 898 theClass, RawComparator.class); 899 } 900 901 /** 902 * Set the {@link KeyFieldBasedComparator} options used to compare keys. 903 * 904 * @param keySpec the key specification of the form -k pos1[,pos2], where, 905 * pos is of the form f[.c][opts], where f is the number 906 * of the key field to use, and c is the number of the first character from 907 * the beginning of the field. Fields and character posns are numbered 908 * starting with 1; a character position of zero in pos2 indicates the 909 * field's last character. If '.c' is omitted from pos1, it defaults to 1 910 * (the beginning of the field); if omitted from pos2, it defaults to 0 911 * (the end of the field). opts are ordering options. The supported options 912 * are: 913 * -n, (Sort numerically) 914 * -r, (Reverse the result of comparison) 915 */ 916 public void setKeyFieldComparatorOptions(String keySpec) { 917 setOutputKeyComparatorClass(KeyFieldBasedComparator.class); 918 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec); 919 } 920 921 /** 922 * Get the {@link KeyFieldBasedComparator} options 923 */ 924 public String getKeyFieldComparatorOption() { 925 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS); 926 } 927 928 /** 929 * Set the {@link KeyFieldBasedPartitioner} options used for 930 * {@link Partitioner} 931 * 932 * @param keySpec the key specification of the form -k pos1[,pos2], where, 933 * pos is of the form f[.c][opts], where f is the number 934 * of the key field to use, and c is the number of the first character from 935 * the beginning of the field. Fields and character posns are numbered 936 * starting with 1; a character position of zero in pos2 indicates the 937 * field's last character. If '.c' is omitted from pos1, it defaults to 1 938 * (the beginning of the field); if omitted from pos2, it defaults to 0 939 * (the end of the field). 940 */ 941 public void setKeyFieldPartitionerOptions(String keySpec) { 942 setPartitionerClass(KeyFieldBasedPartitioner.class); 943 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec); 944 } 945 946 /** 947 * Get the {@link KeyFieldBasedPartitioner} options 948 */ 949 public String getKeyFieldPartitionerOption() { 950 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS); 951 } 952 953 /** 954 * Get the user defined {@link WritableComparable} comparator for 955 * grouping keys of inputs to the combiner. 956 * 957 * @return comparator set by the user for grouping values. 958 * @see #setCombinerKeyGroupingComparator(Class) for details. 959 */ 960 public RawComparator getCombinerKeyGroupingComparator() { 961 Class<? extends RawComparator> theClass = getClass( 962 JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class); 963 if (theClass == null) { 964 return getOutputKeyComparator(); 965 } 966 967 return ReflectionUtils.newInstance(theClass, this); 968 } 969 970 /** 971 * Get the user defined {@link WritableComparable} comparator for 972 * grouping keys of inputs to the reduce. 973 * 974 * @return comparator set by the user for grouping values. 975 * @see #setOutputValueGroupingComparator(Class) for details. 976 */ 977 public RawComparator getOutputValueGroupingComparator() { 978 Class<? extends RawComparator> theClass = getClass( 979 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class); 980 if (theClass == null) { 981 return getOutputKeyComparator(); 982 } 983 984 return ReflectionUtils.newInstance(theClass, this); 985 } 986 987 /** 988 * Set the user defined {@link RawComparator} comparator for 989 * grouping keys in the input to the combiner. 990 * 991 * <p>This comparator should be provided if the equivalence rules for keys 992 * for sorting the intermediates are different from those for grouping keys 993 * before each call to 994 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 995 * 996 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 997 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 998 * 999 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 1000 * how keys are sorted, this can be used in conjunction to simulate 1001 * <i>secondary sort on values</i>.</p> 1002 * 1003 * <p><i>Note</i>: This is not a guarantee of the combiner sort being 1004 * <i>stable</i> in any sense. (In any case, with the order of available 1005 * map-outputs to the combiner being non-deterministic, it wouldn't make 1006 * that much sense.)</p> 1007 * 1008 * @param theClass the comparator class to be used for grouping keys for the 1009 * combiner. It should implement <code>RawComparator</code>. 1010 * @see #setOutputKeyComparatorClass(Class) 1011 */ 1012 public void setCombinerKeyGroupingComparator( 1013 Class<? extends RawComparator> theClass) { 1014 setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS, 1015 theClass, RawComparator.class); 1016 } 1017 1018 /** 1019 * Set the user defined {@link RawComparator} comparator for 1020 * grouping keys in the input to the reduce. 1021 * 1022 * <p>This comparator should be provided if the equivalence rules for keys 1023 * for sorting the intermediates are different from those for grouping keys 1024 * before each call to 1025 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 1026 * 1027 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 1028 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 1029 * 1030 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 1031 * how keys are sorted, this can be used in conjunction to simulate 1032 * <i>secondary sort on values</i>.</p> 1033 * 1034 * <p><i>Note</i>: This is not a guarantee of the reduce sort being 1035 * <i>stable</i> in any sense. (In any case, with the order of available 1036 * map-outputs to the reduce being non-deterministic, it wouldn't make 1037 * that much sense.)</p> 1038 * 1039 * @param theClass the comparator class to be used for grouping keys. 1040 * It should implement <code>RawComparator</code>. 1041 * @see #setOutputKeyComparatorClass(Class) 1042 * @see #setCombinerKeyGroupingComparator(Class) 1043 */ 1044 public void setOutputValueGroupingComparator( 1045 Class<? extends RawComparator> theClass) { 1046 setClass(JobContext.GROUP_COMPARATOR_CLASS, 1047 theClass, RawComparator.class); 1048 } 1049 1050 /** 1051 * Should the framework use the new context-object code for running 1052 * the mapper? 1053 * @return true, if the new api should be used 1054 */ 1055 public boolean getUseNewMapper() { 1056 return getBoolean("mapred.mapper.new-api", false); 1057 } 1058 /** 1059 * Set whether the framework should use the new api for the mapper. 1060 * This is the default for jobs submitted with the new Job api. 1061 * @param flag true, if the new api should be used 1062 */ 1063 public void setUseNewMapper(boolean flag) { 1064 setBoolean("mapred.mapper.new-api", flag); 1065 } 1066 1067 /** 1068 * Should the framework use the new context-object code for running 1069 * the reducer? 1070 * @return true, if the new api should be used 1071 */ 1072 public boolean getUseNewReducer() { 1073 return getBoolean("mapred.reducer.new-api", false); 1074 } 1075 /** 1076 * Set whether the framework should use the new api for the reducer. 1077 * This is the default for jobs submitted with the new Job api. 1078 * @param flag true, if the new api should be used 1079 */ 1080 public void setUseNewReducer(boolean flag) { 1081 setBoolean("mapred.reducer.new-api", flag); 1082 } 1083 1084 /** 1085 * Get the value class for job outputs. 1086 * 1087 * @return the value class for job outputs. 1088 */ 1089 public Class<?> getOutputValueClass() { 1090 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class); 1091 } 1092 1093 /** 1094 * Set the value class for job outputs. 1095 * 1096 * @param theClass the value class for job outputs. 1097 */ 1098 public void setOutputValueClass(Class<?> theClass) { 1099 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class); 1100 } 1101 1102 /** 1103 * Get the {@link Mapper} class for the job. 1104 * 1105 * @return the {@link Mapper} class for the job. 1106 */ 1107 public Class<? extends Mapper> getMapperClass() { 1108 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class); 1109 } 1110 1111 /** 1112 * Set the {@link Mapper} class for the job. 1113 * 1114 * @param theClass the {@link Mapper} class for the job. 1115 */ 1116 public void setMapperClass(Class<? extends Mapper> theClass) { 1117 setClass("mapred.mapper.class", theClass, Mapper.class); 1118 } 1119 1120 /** 1121 * Get the {@link MapRunnable} class for the job. 1122 * 1123 * @return the {@link MapRunnable} class for the job. 1124 */ 1125 public Class<? extends MapRunnable> getMapRunnerClass() { 1126 return getClass("mapred.map.runner.class", 1127 MapRunner.class, MapRunnable.class); 1128 } 1129 1130 /** 1131 * Expert: Set the {@link MapRunnable} class for the job. 1132 * 1133 * Typically used to exert greater control on {@link Mapper}s. 1134 * 1135 * @param theClass the {@link MapRunnable} class for the job. 1136 */ 1137 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) { 1138 setClass("mapred.map.runner.class", theClass, MapRunnable.class); 1139 } 1140 1141 /** 1142 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 1143 * to be sent to the {@link Reducer}s. 1144 * 1145 * @return the {@link Partitioner} used to partition map-outputs. 1146 */ 1147 public Class<? extends Partitioner> getPartitionerClass() { 1148 return getClass("mapred.partitioner.class", 1149 HashPartitioner.class, Partitioner.class); 1150 } 1151 1152 /** 1153 * Set the {@link Partitioner} class used to partition 1154 * {@link Mapper}-outputs to be sent to the {@link Reducer}s. 1155 * 1156 * @param theClass the {@link Partitioner} used to partition map-outputs. 1157 */ 1158 public void setPartitionerClass(Class<? extends Partitioner> theClass) { 1159 setClass("mapred.partitioner.class", theClass, Partitioner.class); 1160 } 1161 1162 /** 1163 * Get the {@link Reducer} class for the job. 1164 * 1165 * @return the {@link Reducer} class for the job. 1166 */ 1167 public Class<? extends Reducer> getReducerClass() { 1168 return getClass("mapred.reducer.class", 1169 IdentityReducer.class, Reducer.class); 1170 } 1171 1172 /** 1173 * Set the {@link Reducer} class for the job. 1174 * 1175 * @param theClass the {@link Reducer} class for the job. 1176 */ 1177 public void setReducerClass(Class<? extends Reducer> theClass) { 1178 setClass("mapred.reducer.class", theClass, Reducer.class); 1179 } 1180 1181 /** 1182 * Get the user-defined <i>combiner</i> class used to combine map-outputs 1183 * before being sent to the reducers. Typically the combiner is same as the 1184 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}. 1185 * 1186 * @return the user-defined combiner class used to combine map-outputs. 1187 */ 1188 public Class<? extends Reducer> getCombinerClass() { 1189 return getClass("mapred.combiner.class", null, Reducer.class); 1190 } 1191 1192 /** 1193 * Set the user-defined <i>combiner</i> class used to combine map-outputs 1194 * before being sent to the reducers. 1195 * 1196 * <p>The combiner is an application-specified aggregation operation, which 1197 * can help cut down the amount of data transferred between the 1198 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p> 1199 * 1200 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both 1201 * the mapper and reducer tasks. In general, the combiner is called as the 1202 * sort/merge result is written to disk. The combiner must: 1203 * <ul> 1204 * <li> be side-effect free</li> 1205 * <li> have the same input and output key types and the same input and 1206 * output value types</li> 1207 * </ul> 1208 * 1209 * <p>Typically the combiner is same as the <code>Reducer</code> for the 1210 * job i.e. {@link #setReducerClass(Class)}.</p> 1211 * 1212 * @param theClass the user-defined combiner class used to combine 1213 * map-outputs. 1214 */ 1215 public void setCombinerClass(Class<? extends Reducer> theClass) { 1216 setClass("mapred.combiner.class", theClass, Reducer.class); 1217 } 1218 1219 /** 1220 * Should speculative execution be used for this job? 1221 * Defaults to <code>true</code>. 1222 * 1223 * @return <code>true</code> if speculative execution be used for this job, 1224 * <code>false</code> otherwise. 1225 */ 1226 public boolean getSpeculativeExecution() { 1227 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution()); 1228 } 1229 1230 /** 1231 * Turn speculative execution on or off for this job. 1232 * 1233 * @param speculativeExecution <code>true</code> if speculative execution 1234 * should be turned on, else <code>false</code>. 1235 */ 1236 public void setSpeculativeExecution(boolean speculativeExecution) { 1237 setMapSpeculativeExecution(speculativeExecution); 1238 setReduceSpeculativeExecution(speculativeExecution); 1239 } 1240 1241 /** 1242 * Should speculative execution be used for this job for map tasks? 1243 * Defaults to <code>true</code>. 1244 * 1245 * @return <code>true</code> if speculative execution be 1246 * used for this job for map tasks, 1247 * <code>false</code> otherwise. 1248 */ 1249 public boolean getMapSpeculativeExecution() { 1250 return getBoolean(JobContext.MAP_SPECULATIVE, true); 1251 } 1252 1253 /** 1254 * Turn speculative execution on or off for this job for map tasks. 1255 * 1256 * @param speculativeExecution <code>true</code> if speculative execution 1257 * should be turned on for map tasks, 1258 * else <code>false</code>. 1259 */ 1260 public void setMapSpeculativeExecution(boolean speculativeExecution) { 1261 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution); 1262 } 1263 1264 /** 1265 * Should speculative execution be used for this job for reduce tasks? 1266 * Defaults to <code>true</code>. 1267 * 1268 * @return <code>true</code> if speculative execution be used 1269 * for reduce tasks for this job, 1270 * <code>false</code> otherwise. 1271 */ 1272 public boolean getReduceSpeculativeExecution() { 1273 return getBoolean(JobContext.REDUCE_SPECULATIVE, true); 1274 } 1275 1276 /** 1277 * Turn speculative execution on or off for this job for reduce tasks. 1278 * 1279 * @param speculativeExecution <code>true</code> if speculative execution 1280 * should be turned on for reduce tasks, 1281 * else <code>false</code>. 1282 */ 1283 public void setReduceSpeculativeExecution(boolean speculativeExecution) { 1284 setBoolean(JobContext.REDUCE_SPECULATIVE, 1285 speculativeExecution); 1286 } 1287 1288 /** 1289 * Get configured the number of reduce tasks for this job. 1290 * Defaults to <code>1</code>. 1291 * 1292 * @return the number of reduce tasks for this job. 1293 */ 1294 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); } 1295 1296 /** 1297 * Set the number of map tasks for this job. 1298 * 1299 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 1300 * number of spawned map tasks depends on the number of {@link InputSplit}s 1301 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}. 1302 * 1303 * A custom {@link InputFormat} is typically used to accurately control 1304 * the number of map tasks for the job.</p> 1305 * 1306 * <b id="NoOfMaps">How many maps?</b> 1307 * 1308 * <p>The number of maps is usually driven by the total size of the inputs 1309 * i.e. total number of blocks of the input files.</p> 1310 * 1311 * <p>The right level of parallelism for maps seems to be around 10-100 maps 1312 * per-node, although it has been set up to 300 or so for very cpu-light map 1313 * tasks. Task setup takes awhile, so it is best if the maps take at least a 1314 * minute to execute.</p> 1315 * 1316 * <p>The default behavior of file-based {@link InputFormat}s is to split the 1317 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 1318 * bytes, of input files. However, the {@link FileSystem} blocksize of the 1319 * input files is treated as an upper bound for input splits. A lower bound 1320 * on the split size can be set via 1321 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize"> 1322 * mapreduce.input.fileinputformat.split.minsize</a>.</p> 1323 * 1324 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 1325 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 1326 * used to set it even higher.</p> 1327 * 1328 * @param n the number of map tasks for this job. 1329 * @see InputFormat#getSplits(JobConf, int) 1330 * @see FileInputFormat 1331 * @see FileSystem#getDefaultBlockSize() 1332 * @see FileStatus#getBlockSize() 1333 */ 1334 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); } 1335 1336 /** 1337 * Get configured the number of reduce tasks for this job. Defaults to 1338 * <code>1</code>. 1339 * 1340 * @return the number of reduce tasks for this job. 1341 */ 1342 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); } 1343 1344 /** 1345 * Set the requisite number of reduce tasks for this job. 1346 * 1347 * <b id="NoOfReduces">How many reduces?</b> 1348 * 1349 * <p>The right number of reduces seems to be <code>0.95</code> or 1350 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> * 1351 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum"> 1352 * mapreduce.tasktracker.reduce.tasks.maximum</a>). 1353 * </p> 1354 * 1355 * <p>With <code>0.95</code> all of the reduces can launch immediately and 1356 * start transfering map outputs as the maps finish. With <code>1.75</code> 1357 * the faster nodes will finish their first round of reduces and launch a 1358 * second wave of reduces doing a much better job of load balancing.</p> 1359 * 1360 * <p>Increasing the number of reduces increases the framework overhead, but 1361 * increases load balancing and lowers the cost of failures.</p> 1362 * 1363 * <p>The scaling factors above are slightly less than whole numbers to 1364 * reserve a few reduce slots in the framework for speculative-tasks, failures 1365 * etc.</p> 1366 * 1367 * <b id="ReducerNone">Reducer NONE</b> 1368 * 1369 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p> 1370 * 1371 * <p>In this case the output of the map-tasks directly go to distributed 1372 * file-system, to the path set by 1373 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 1374 * framework doesn't sort the map-outputs before writing it out to HDFS.</p> 1375 * 1376 * @param n the number of reduce tasks for this job. 1377 */ 1378 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); } 1379 1380 /** 1381 * Get the configured number of maximum attempts that will be made to run a 1382 * map task, as specified by the <code>mapreduce.map.maxattempts</code> 1383 * property. If this property is not already set, the default is 4 attempts. 1384 * 1385 * @return the max number of attempts per map task. 1386 */ 1387 public int getMaxMapAttempts() { 1388 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4); 1389 } 1390 1391 /** 1392 * Expert: Set the number of maximum attempts that will be made to run a 1393 * map task. 1394 * 1395 * @param n the number of attempts per map task. 1396 */ 1397 public void setMaxMapAttempts(int n) { 1398 setInt(JobContext.MAP_MAX_ATTEMPTS, n); 1399 } 1400 1401 /** 1402 * Get the configured number of maximum attempts that will be made to run a 1403 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code> 1404 * property. If this property is not already set, the default is 4 attempts. 1405 * 1406 * @return the max number of attempts per reduce task. 1407 */ 1408 public int getMaxReduceAttempts() { 1409 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4); 1410 } 1411 /** 1412 * Expert: Set the number of maximum attempts that will be made to run a 1413 * reduce task. 1414 * 1415 * @param n the number of attempts per reduce task. 1416 */ 1417 public void setMaxReduceAttempts(int n) { 1418 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n); 1419 } 1420 1421 /** 1422 * Get the user-specified job name. This is only used to identify the 1423 * job to the user. 1424 * 1425 * @return the job's name, defaulting to "". 1426 */ 1427 public String getJobName() { 1428 return get(JobContext.JOB_NAME, ""); 1429 } 1430 1431 /** 1432 * Set the user-specified job name. 1433 * 1434 * @param name the job's new name. 1435 */ 1436 public void setJobName(String name) { 1437 set(JobContext.JOB_NAME, name); 1438 } 1439 1440 /** 1441 * Get the user-specified session identifier. The default is the empty string. 1442 * 1443 * The session identifier is used to tag metric data that is reported to some 1444 * performance metrics system via the org.apache.hadoop.metrics API. The 1445 * session identifier is intended, in particular, for use by Hadoop-On-Demand 1446 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 1447 * HOD will set the session identifier by modifying the mapred-site.xml file 1448 * before starting the cluster. 1449 * 1450 * When not running under HOD, this identifer is expected to remain set to 1451 * the empty string. 1452 * 1453 * @return the session identifier, defaulting to "". 1454 */ 1455 @Deprecated 1456 public String getSessionId() { 1457 return get("session.id", ""); 1458 } 1459 1460 /** 1461 * Set the user-specified session identifier. 1462 * 1463 * @param sessionId the new session id. 1464 */ 1465 @Deprecated 1466 public void setSessionId(String sessionId) { 1467 set("session.id", sessionId); 1468 } 1469 1470 /** 1471 * Set the maximum no. of failures of a given job per tasktracker. 1472 * If the no. of task failures exceeds <code>noFailures</code>, the 1473 * tasktracker is <i>blacklisted</i> for this job. 1474 * 1475 * @param noFailures maximum no. of failures of a given job per tasktracker. 1476 */ 1477 public void setMaxTaskFailuresPerTracker(int noFailures) { 1478 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures); 1479 } 1480 1481 /** 1482 * Expert: Get the maximum no. of failures of a given job per tasktracker. 1483 * If the no. of task failures exceeds this, the tasktracker is 1484 * <i>blacklisted</i> for this job. 1485 * 1486 * @return the maximum no. of failures of a given job per tasktracker. 1487 */ 1488 public int getMaxTaskFailuresPerTracker() { 1489 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3); 1490 } 1491 1492 /** 1493 * Get the maximum percentage of map tasks that can fail without 1494 * the job being aborted. 1495 * 1496 * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 1497 * attempts before being declared as <i>failed</i>. 1498 * 1499 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in 1500 * the job being declared as {@link JobStatus#FAILED}. 1501 * 1502 * @return the maximum percentage of map tasks that can fail without 1503 * the job being aborted. 1504 */ 1505 public int getMaxMapTaskFailuresPercent() { 1506 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0); 1507 } 1508 1509 /** 1510 * Expert: Set the maximum percentage of map tasks that can fail without the 1511 * job being aborted. 1512 * 1513 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 1514 * before being declared as <i>failed</i>. 1515 * 1516 * @param percent the maximum percentage of map tasks that can fail without 1517 * the job being aborted. 1518 */ 1519 public void setMaxMapTaskFailuresPercent(int percent) { 1520 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent); 1521 } 1522 1523 /** 1524 * Get the maximum percentage of reduce tasks that can fail without 1525 * the job being aborted. 1526 * 1527 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1528 * attempts before being declared as <i>failed</i>. 1529 * 1530 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 1531 * in the job being declared as {@link JobStatus#FAILED}. 1532 * 1533 * @return the maximum percentage of reduce tasks that can fail without 1534 * the job being aborted. 1535 */ 1536 public int getMaxReduceTaskFailuresPercent() { 1537 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0); 1538 } 1539 1540 /** 1541 * Set the maximum percentage of reduce tasks that can fail without the job 1542 * being aborted. 1543 * 1544 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1545 * attempts before being declared as <i>failed</i>. 1546 * 1547 * @param percent the maximum percentage of reduce tasks that can fail without 1548 * the job being aborted. 1549 */ 1550 public void setMaxReduceTaskFailuresPercent(int percent) { 1551 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent); 1552 } 1553 1554 /** 1555 * Set {@link JobPriority} for this job. 1556 * 1557 * @param prio the {@link JobPriority} for this job. 1558 */ 1559 public void setJobPriority(JobPriority prio) { 1560 set(JobContext.PRIORITY, prio.toString()); 1561 } 1562 1563 /** 1564 * Set {@link JobPriority} for this job. 1565 * 1566 * @param prio the {@link JobPriority} for this job. 1567 */ 1568 public void setJobPriorityAsInteger(int prio) { 1569 set(JobContext.PRIORITY, Integer.toString(prio)); 1570 } 1571 1572 /** 1573 * Get the {@link JobPriority} for this job. 1574 * 1575 * @return the {@link JobPriority} for this job. 1576 */ 1577 public JobPriority getJobPriority() { 1578 String prio = get(JobContext.PRIORITY); 1579 if (prio == null) { 1580 return JobPriority.DEFAULT; 1581 } 1582 1583 JobPriority priority = JobPriority.DEFAULT; 1584 try { 1585 priority = JobPriority.valueOf(prio); 1586 } catch (IllegalArgumentException e) { 1587 return convertToJobPriority(Integer.parseInt(prio)); 1588 } 1589 return priority; 1590 } 1591 1592 /** 1593 * Get the priority for this job. 1594 * 1595 * @return the priority for this job. 1596 */ 1597 public int getJobPriorityAsInteger() { 1598 String priority = get(JobContext.PRIORITY); 1599 if (priority == null) { 1600 return 0; 1601 } 1602 1603 int jobPriority = 0; 1604 try { 1605 jobPriority = convertPriorityToInteger(priority); 1606 } catch (IllegalArgumentException e) { 1607 return Integer.parseInt(priority); 1608 } 1609 return jobPriority; 1610 } 1611 1612 private int convertPriorityToInteger(String priority) { 1613 JobPriority jobPriority = JobPriority.valueOf(priority); 1614 switch (jobPriority) { 1615 case VERY_HIGH : 1616 return 5; 1617 case HIGH : 1618 return 4; 1619 case NORMAL : 1620 return 3; 1621 case LOW : 1622 return 2; 1623 case VERY_LOW : 1624 return 1; 1625 case DEFAULT : 1626 return 0; 1627 default: 1628 break; 1629 } 1630 1631 // If a user sets the priority as "UNDEFINED_PRIORITY", we can return 1632 // 0 which is also default value. 1633 return 0; 1634 } 1635 1636 private JobPriority convertToJobPriority(int priority) { 1637 switch (priority) { 1638 case 5 : 1639 return JobPriority.VERY_HIGH; 1640 case 4 : 1641 return JobPriority.HIGH; 1642 case 3 : 1643 return JobPriority.NORMAL; 1644 case 2 : 1645 return JobPriority.LOW; 1646 case 1 : 1647 return JobPriority.VERY_LOW; 1648 case 0 : 1649 return JobPriority.DEFAULT; 1650 default: 1651 break; 1652 } 1653 1654 return JobPriority.UNDEFINED_PRIORITY; 1655 } 1656 1657 /** 1658 * Set JobSubmitHostName for this job. 1659 * 1660 * @param hostname the JobSubmitHostName for this job. 1661 */ 1662 void setJobSubmitHostName(String hostname) { 1663 set(MRJobConfig.JOB_SUBMITHOST, hostname); 1664 } 1665 1666 /** 1667 * Get the JobSubmitHostName for this job. 1668 * 1669 * @return the JobSubmitHostName for this job. 1670 */ 1671 String getJobSubmitHostName() { 1672 String hostname = get(MRJobConfig.JOB_SUBMITHOST); 1673 1674 return hostname; 1675 } 1676 1677 /** 1678 * Set JobSubmitHostAddress for this job. 1679 * 1680 * @param hostadd the JobSubmitHostAddress for this job. 1681 */ 1682 void setJobSubmitHostAddress(String hostadd) { 1683 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd); 1684 } 1685 1686 /** 1687 * Get JobSubmitHostAddress for this job. 1688 * 1689 * @return JobSubmitHostAddress for this job. 1690 */ 1691 String getJobSubmitHostAddress() { 1692 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR); 1693 1694 return hostadd; 1695 } 1696 1697 /** 1698 * Get whether the task profiling is enabled. 1699 * @return true if some tasks will be profiled 1700 */ 1701 public boolean getProfileEnabled() { 1702 return getBoolean(JobContext.TASK_PROFILE, false); 1703 } 1704 1705 /** 1706 * Set whether the system should collect profiler information for some of 1707 * the tasks in this job? The information is stored in the user log 1708 * directory. 1709 * @param newValue true means it should be gathered 1710 */ 1711 public void setProfileEnabled(boolean newValue) { 1712 setBoolean(JobContext.TASK_PROFILE, newValue); 1713 } 1714 1715 /** 1716 * Get the profiler configuration arguments. 1717 * 1718 * The default value for this property is 1719 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s" 1720 * 1721 * @return the parameters to pass to the task child to configure profiling 1722 */ 1723 public String getProfileParams() { 1724 return get(JobContext.TASK_PROFILE_PARAMS, 1725 MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS); 1726 } 1727 1728 /** 1729 * Set the profiler configuration arguments. If the string contains a '%s' it 1730 * will be replaced with the name of the profiling output file when the task 1731 * runs. 1732 * 1733 * This value is passed to the task child JVM on the command line. 1734 * 1735 * @param value the configuration string 1736 */ 1737 public void setProfileParams(String value) { 1738 set(JobContext.TASK_PROFILE_PARAMS, value); 1739 } 1740 1741 /** 1742 * Get the range of maps or reduces to profile. 1743 * @param isMap is the task a map? 1744 * @return the task ranges 1745 */ 1746 public IntegerRanges getProfileTaskRange(boolean isMap) { 1747 return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 1748 JobContext.NUM_REDUCE_PROFILES), "0-2"); 1749 } 1750 1751 /** 1752 * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 1753 * must also be called. 1754 * @param newValue a set of integer ranges of the map ids 1755 */ 1756 public void setProfileTaskRange(boolean isMap, String newValue) { 1757 // parse the value to make sure it is legal 1758 new Configuration.IntegerRanges(newValue); 1759 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 1760 newValue); 1761 } 1762 1763 /** 1764 * Set the debug script to run when the map tasks fail. 1765 * 1766 * <p>The debug script can aid debugging of failed map tasks. The script is 1767 * given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1768 * 1769 * <p>The debug command, run on the node where the map failed, is:</p> 1770 * <p><blockquote><pre> 1771 * $script $stdout $stderr $syslog $jobconf. 1772 * </pre></blockquote> 1773 * 1774 * <p> The script file is distributed through {@link DistributedCache} 1775 * APIs. The script needs to be symlinked. </p> 1776 * 1777 * <p>Here is an example on how to submit a script 1778 * <p><blockquote><pre> 1779 * job.setMapDebugScript("./myscript"); 1780 * DistributedCache.createSymlink(job); 1781 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1782 * </pre></blockquote> 1783 * 1784 * @param mDbgScript the script name 1785 */ 1786 public void setMapDebugScript(String mDbgScript) { 1787 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript); 1788 } 1789 1790 /** 1791 * Get the map task's debug script. 1792 * 1793 * @return the debug Script for the mapred job for failed map tasks. 1794 * @see #setMapDebugScript(String) 1795 */ 1796 public String getMapDebugScript() { 1797 return get(JobContext.MAP_DEBUG_SCRIPT); 1798 } 1799 1800 /** 1801 * Set the debug script to run when the reduce tasks fail. 1802 * 1803 * <p>The debug script can aid debugging of failed reduce tasks. The script 1804 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1805 * 1806 * <p>The debug command, run on the node where the map failed, is:</p> 1807 * <p><blockquote><pre> 1808 * $script $stdout $stderr $syslog $jobconf. 1809 * </pre></blockquote> 1810 * 1811 * <p> The script file is distributed through {@link DistributedCache} 1812 * APIs. The script file needs to be symlinked </p> 1813 * 1814 * <p>Here is an example on how to submit a script 1815 * <p><blockquote><pre> 1816 * job.setReduceDebugScript("./myscript"); 1817 * DistributedCache.createSymlink(job); 1818 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1819 * </pre></blockquote> 1820 * 1821 * @param rDbgScript the script name 1822 */ 1823 public void setReduceDebugScript(String rDbgScript) { 1824 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript); 1825 } 1826 1827 /** 1828 * Get the reduce task's debug Script 1829 * 1830 * @return the debug script for the mapred job for failed reduce tasks. 1831 * @see #setReduceDebugScript(String) 1832 */ 1833 public String getReduceDebugScript() { 1834 return get(JobContext.REDUCE_DEBUG_SCRIPT); 1835 } 1836 1837 /** 1838 * Get the uri to be invoked in-order to send a notification after the job 1839 * has completed (success/failure). 1840 * 1841 * @return the job end notification uri, <code>null</code> if it hasn't 1842 * been set. 1843 * @see #setJobEndNotificationURI(String) 1844 */ 1845 public String getJobEndNotificationURI() { 1846 return get(JobContext.MR_JOB_END_NOTIFICATION_URL); 1847 } 1848 1849 /** 1850 * Set the uri to be invoked in-order to send a notification after the job 1851 * has completed (success/failure). 1852 * 1853 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 1854 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 1855 * identifier and completion-status respectively.</p> 1856 * 1857 * <p>This is typically used by application-writers to implement chaining of 1858 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p> 1859 * 1860 * @param uri the job end notification uri 1861 * @see JobStatus 1862 */ 1863 public void setJobEndNotificationURI(String uri) { 1864 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri); 1865 } 1866 1867 /** 1868 * Get job-specific shared directory for use as scratch space 1869 * 1870 * <p> 1871 * When a job starts, a shared directory is created at location 1872 * <code> 1873 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>. 1874 * This directory is exposed to the users through 1875 * <code>mapreduce.job.local.dir </code>. 1876 * So, the tasks can use this space 1877 * as scratch space and share files among them. </p> 1878 * This value is available as System property also. 1879 * 1880 * @return The localized job specific shared directory 1881 */ 1882 public String getJobLocalDir() { 1883 return get(JobContext.JOB_LOCAL_DIR); 1884 } 1885 1886 /** 1887 * Get memory required to run a map task of the job, in MB. 1888 * 1889 * If a value is specified in the configuration, it is returned. 1890 * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}. 1891 * <p> 1892 * For backward compatibility, if the job configuration sets the 1893 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1894 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1895 * after converting it from bytes to MB. 1896 * @return memory required to run a map task of the job, in MB, 1897 */ 1898 public long getMemoryForMapTask() { 1899 long value = getDeprecatedMemoryValue(); 1900 if (value < 0) { 1901 return getMemoryRequired(TaskType.MAP); 1902 } 1903 return value; 1904 } 1905 1906 public void setMemoryForMapTask(long mem) { 1907 setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1908 // In case that M/R 1.x applications use the old property name 1909 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1910 } 1911 1912 /** 1913 * Get memory required to run a reduce task of the job, in MB. 1914 * 1915 * If a value is specified in the configuration, it is returned. 1916 * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}. 1917 * <p> 1918 * For backward compatibility, if the job configuration sets the 1919 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1920 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1921 * after converting it from bytes to MB. 1922 * @return memory required to run a reduce task of the job, in MB. 1923 */ 1924 public long getMemoryForReduceTask() { 1925 long value = getDeprecatedMemoryValue(); 1926 if (value < 0) { 1927 return getMemoryRequired(TaskType.REDUCE); 1928 } 1929 return value; 1930 } 1931 1932 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY, 1933 // converted into MBs. 1934 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative 1935 // value. 1936 private long getDeprecatedMemoryValue() { 1937 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 1938 DISABLED_MEMORY_LIMIT); 1939 if (oldValue > 0) { 1940 oldValue /= (1024*1024); 1941 } 1942 return oldValue; 1943 } 1944 1945 public void setMemoryForReduceTask(long mem) { 1946 setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1947 // In case that M/R 1.x applications use the old property name 1948 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1949 } 1950 1951 /** 1952 * Return the name of the queue to which this job is submitted. 1953 * Defaults to 'default'. 1954 * 1955 * @return name of the queue 1956 */ 1957 public String getQueueName() { 1958 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME); 1959 } 1960 1961 /** 1962 * Set the name of the queue to which this job should be submitted. 1963 * 1964 * @param queueName Name of the queue 1965 */ 1966 public void setQueueName(String queueName) { 1967 set(JobContext.QUEUE_NAME, queueName); 1968 } 1969 1970 /** 1971 * Normalize the negative values in configuration 1972 * 1973 * @param val 1974 * @return normalized value 1975 */ 1976 public static long normalizeMemoryConfigValue(long val) { 1977 if (val < 0) { 1978 val = DISABLED_MEMORY_LIMIT; 1979 } 1980 return val; 1981 } 1982 1983 /** 1984 * Find a jar that contains a class of the same name, if any. 1985 * It will return a jar file, even if that is not the first thing 1986 * on the class path that has a class with the same name. 1987 * 1988 * @param my_class the class to find. 1989 * @return a jar file that contains the class, or null. 1990 */ 1991 public static String findContainingJar(Class my_class) { 1992 return ClassUtil.findContainingJar(my_class); 1993 } 1994 1995 /** 1996 * Get the memory required to run a task of this job, in bytes. See 1997 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1998 * <p> 1999 * This method is deprecated. Now, different memory limits can be 2000 * set for map and reduce tasks of a job, in MB. 2001 * <p> 2002 * For backward compatibility, if the job configuration sets the 2003 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. 2004 * Otherwise, this method will return the larger of the values returned by 2005 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()} 2006 * after converting them into bytes. 2007 * 2008 * @return Memory required to run a task of this job, in bytes. 2009 * @see #setMaxVirtualMemoryForTask(long) 2010 * @deprecated Use {@link #getMemoryForMapTask()} and 2011 * {@link #getMemoryForReduceTask()} 2012 */ 2013 @Deprecated 2014 public long getMaxVirtualMemoryForTask() { 2015 LOG.warn( 2016 "getMaxVirtualMemoryForTask() is deprecated. " + 2017 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()"); 2018 2019 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 2020 Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024); 2021 return value; 2022 } 2023 2024 /** 2025 * Set the maximum amount of memory any task of this job can use. See 2026 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 2027 * <p> 2028 * mapred.task.maxvmem is split into 2029 * mapreduce.map.memory.mb 2030 * and mapreduce.map.memory.mb,mapred 2031 * each of the new key are set 2032 * as mapred.task.maxvmem / 1024 2033 * as new values are in MB 2034 * 2035 * @param vmem Maximum amount of virtual memory in bytes any task of this job 2036 * can use. 2037 * @see #getMaxVirtualMemoryForTask() 2038 * @deprecated 2039 * Use {@link #setMemoryForMapTask(long mem)} and 2040 * Use {@link #setMemoryForReduceTask(long mem)} 2041 */ 2042 @Deprecated 2043 public void setMaxVirtualMemoryForTask(long vmem) { 2044 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+ 2045 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()"); 2046 if (vmem < 0) { 2047 throw new IllegalArgumentException("Task memory allocation may not be < 0"); 2048 } 2049 2050 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) { 2051 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb 2052 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb 2053 }else{ 2054 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem); 2055 } 2056 } 2057 2058 /** 2059 * @deprecated this variable is deprecated and nolonger in use. 2060 */ 2061 @Deprecated 2062 public long getMaxPhysicalMemoryForTask() { 2063 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated." 2064 + " Refer to the APIs getMemoryForMapTask() and" 2065 + " getMemoryForReduceTask() for details."); 2066 return -1; 2067 } 2068 2069 /* 2070 * @deprecated this 2071 */ 2072 @Deprecated 2073 public void setMaxPhysicalMemoryForTask(long mem) { 2074 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated." 2075 + " The value set is ignored. Refer to " 2076 + " setMemoryForMapTask() and setMemoryForReduceTask() for details."); 2077 } 2078 2079 static String deprecatedString(String key) { 2080 return "The variable " + key + " is no longer used."; 2081 } 2082 2083 private void checkAndWarnDeprecation() { 2084 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) { 2085 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) 2086 + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY 2087 + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY); 2088 } 2089 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) { 2090 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT)); 2091 } 2092 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) { 2093 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT)); 2094 } 2095 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) { 2096 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT)); 2097 } 2098 } 2099 2100 private String getConfiguredTaskJavaOpts(TaskType taskType) { 2101 String userClasspath = ""; 2102 String adminClasspath = ""; 2103 if (taskType == TaskType.MAP) { 2104 userClasspath = get(MAPRED_MAP_TASK_JAVA_OPTS, 2105 get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS)); 2106 adminClasspath = get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, 2107 MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS); 2108 } else { 2109 userClasspath = get(MAPRED_REDUCE_TASK_JAVA_OPTS, 2110 get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS)); 2111 adminClasspath = get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, 2112 MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS); 2113 } 2114 2115 return adminClasspath + " " + userClasspath; 2116 } 2117 2118 @Private 2119 public String getTaskJavaOpts(TaskType taskType) { 2120 String javaOpts = getConfiguredTaskJavaOpts(taskType); 2121 2122 if (!javaOpts.contains("-Xmx")) { 2123 float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO, 2124 MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO); 2125 2126 if (heapRatio > 1.0f || heapRatio < 0) { 2127 LOG.warn("Invalid value for " + MRJobConfig.HEAP_MEMORY_MB_RATIO 2128 + ", using the default."); 2129 heapRatio = MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO; 2130 } 2131 2132 int taskContainerMb = getMemoryRequired(taskType); 2133 int taskHeapSize = (int)Math.ceil(taskContainerMb * heapRatio); 2134 2135 String xmxArg = String.format("-Xmx%dm", taskHeapSize); 2136 LOG.info("Task java-opts do not specify heap size. Setting task attempt" + 2137 " jvm max heap size to " + xmxArg); 2138 2139 javaOpts += " " + xmxArg; 2140 } 2141 2142 return javaOpts; 2143 } 2144 2145 /** 2146 * Parse the Maximum heap size from the java opts as specified by the -Xmx option 2147 * Format: -Xmx<size>[g|G|m|M|k|K] 2148 * @param javaOpts String to parse to read maximum heap size 2149 * @return Maximum heap size in MB or -1 if not specified 2150 */ 2151 @Private 2152 @VisibleForTesting 2153 public static int parseMaximumHeapSizeMB(String javaOpts) { 2154 // Find the last matching -Xmx following word boundaries 2155 Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts); 2156 if (m.matches()) { 2157 long size = Long.parseLong(m.group(1)); 2158 if (size <= 0) { 2159 return -1; 2160 } 2161 if (m.group(2).isEmpty()) { 2162 // -Xmx specified in bytes 2163 return (int) (size / (1024 * 1024)); 2164 } 2165 char unit = m.group(2).charAt(0); 2166 switch (unit) { 2167 case 'g': 2168 case 'G': 2169 // -Xmx specified in GB 2170 return (int) (size * 1024); 2171 case 'm': 2172 case 'M': 2173 // -Xmx specified in MB 2174 return (int) size; 2175 case 'k': 2176 case 'K': 2177 // -Xmx specified in KB 2178 return (int) (size / 1024); 2179 } 2180 } 2181 // -Xmx not specified 2182 return -1; 2183 } 2184 2185 private int getMemoryRequiredHelper( 2186 String configName, int defaultValue, int heapSize, float heapRatio) { 2187 int memory = getInt(configName, -1); 2188 if (memory <= 0) { 2189 if (heapSize > 0) { 2190 memory = (int) Math.ceil(heapSize / heapRatio); 2191 LOG.info("Figured value for " + configName + " from javaOpts"); 2192 } else { 2193 memory = defaultValue; 2194 } 2195 } 2196 2197 return memory; 2198 } 2199 2200 @Private 2201 public int getMemoryRequired(TaskType taskType) { 2202 int memory = 1024; 2203 int heapSize = parseMaximumHeapSizeMB(getConfiguredTaskJavaOpts(taskType)); 2204 float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO, 2205 MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO); 2206 if (taskType == TaskType.MAP) { 2207 return getMemoryRequiredHelper(MRJobConfig.MAP_MEMORY_MB, 2208 MRJobConfig.DEFAULT_MAP_MEMORY_MB, heapSize, heapRatio); 2209 } else if (taskType == TaskType.REDUCE) { 2210 return getMemoryRequiredHelper(MRJobConfig.REDUCE_MEMORY_MB, 2211 MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, heapSize, heapRatio); 2212 } else { 2213 return memory; 2214 } 2215 } 2216 2217 /* For debugging. Dump configurations to system output as XML format. */ 2218 public static void main(String[] args) throws Exception { 2219 new JobConf(new Configuration()).writeXml(System.out); 2220 } 2221 2222} 2223