001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.HashSet; 026import java.util.IdentityHashMap; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.apache.hadoop.classification.InterfaceAudience; 036import org.apache.hadoop.classification.InterfaceStability; 037import org.apache.hadoop.fs.BlockLocation; 038import org.apache.hadoop.fs.FileStatus; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.LocatedFileStatus; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.fs.PathFilter; 043import org.apache.hadoop.fs.RemoteIterator; 044import org.apache.hadoop.mapreduce.security.TokenCache; 045import org.apache.hadoop.net.NetworkTopology; 046import org.apache.hadoop.net.Node; 047import org.apache.hadoop.net.NodeBase; 048import org.apache.hadoop.util.ReflectionUtils; 049import org.apache.hadoop.util.StopWatch; 050import org.apache.hadoop.util.StringUtils; 051 052import com.google.common.collect.Iterables; 053 054/** 055 * A base class for file-based {@link InputFormat}. 056 * 057 * <p><code>FileInputFormat</code> is the base class for all file-based 058 * <code>InputFormat</code>s. This provides a generic implementation of 059 * {@link #getSplits(JobConf, int)}. 060 * 061 * Implementations of <code>FileInputFormat</code> can also override the 062 * {@link #isSplitable(FileSystem, Path)} method to prevent input files 063 * from being split-up in certain situations. Implementations that may 064 * deal with non-splittable files <i>must</i> override this method, since 065 * the default implementation assumes splitting is always possible. 066 */ 067@InterfaceAudience.Public 068@InterfaceStability.Stable 069public abstract class FileInputFormat<K, V> implements InputFormat<K, V> { 070 071 public static final Log LOG = 072 LogFactory.getLog(FileInputFormat.class); 073 074 @Deprecated 075 public static enum Counter { 076 BYTES_READ 077 } 078 079 public static final String NUM_INPUT_FILES = 080 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES; 081 082 public static final String INPUT_DIR_RECURSIVE = 083 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE; 084 085 086 private static final double SPLIT_SLOP = 1.1; // 10% slop 087 088 private long minSplitSize = 1; 089 private static final PathFilter hiddenFileFilter = new PathFilter(){ 090 public boolean accept(Path p){ 091 String name = p.getName(); 092 return !name.startsWith("_") && !name.startsWith("."); 093 } 094 }; 095 protected void setMinSplitSize(long minSplitSize) { 096 this.minSplitSize = minSplitSize; 097 } 098 099 /** 100 * Proxy PathFilter that accepts a path only if all filters given in the 101 * constructor do. Used by the listPaths() to apply the built-in 102 * hiddenFileFilter together with a user provided one (if any). 103 */ 104 private static class MultiPathFilter implements PathFilter { 105 private List<PathFilter> filters; 106 107 public MultiPathFilter(List<PathFilter> filters) { 108 this.filters = filters; 109 } 110 111 public boolean accept(Path path) { 112 for (PathFilter filter : filters) { 113 if (!filter.accept(path)) { 114 return false; 115 } 116 } 117 return true; 118 } 119 } 120 121 /** 122 * Is the given filename splittable? Usually, true, but if the file is 123 * stream compressed, it will not be. 124 * 125 * The default implementation in <code>FileInputFormat</code> always returns 126 * true. Implementations that may deal with non-splittable files <i>must</i> 127 * override this method. 128 * 129 * <code>FileInputFormat</code> implementations can override this and return 130 * <code>false</code> to ensure that individual input files are never split-up 131 * so that {@link Mapper}s process entire files. 132 * 133 * @param fs the file system that the file is on 134 * @param filename the file name to check 135 * @return is this file splitable? 136 */ 137 protected boolean isSplitable(FileSystem fs, Path filename) { 138 return true; 139 } 140 141 public abstract RecordReader<K, V> getRecordReader(InputSplit split, 142 JobConf job, 143 Reporter reporter) 144 throws IOException; 145 146 /** 147 * Set a PathFilter to be applied to the input paths for the map-reduce job. 148 * 149 * @param filter the PathFilter class use for filtering the input paths. 150 */ 151 public static void setInputPathFilter(JobConf conf, 152 Class<? extends PathFilter> filter) { 153 conf.setClass(org.apache.hadoop.mapreduce.lib.input. 154 FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class); 155 } 156 157 /** 158 * Get a PathFilter instance of the filter set for the input paths. 159 * 160 * @return the PathFilter instance set for the job, NULL if none has been set. 161 */ 162 public static PathFilter getInputPathFilter(JobConf conf) { 163 Class<? extends PathFilter> filterClass = conf.getClass( 164 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS, 165 null, PathFilter.class); 166 return (filterClass != null) ? 167 ReflectionUtils.newInstance(filterClass, conf) : null; 168 } 169 170 /** 171 * Add files in the input path recursively into the results. 172 * @param result 173 * The List to store all files. 174 * @param fs 175 * The FileSystem. 176 * @param path 177 * The input path. 178 * @param inputFilter 179 * The input filter that can be used to filter files/dirs. 180 * @throws IOException 181 */ 182 protected void addInputPathRecursively(List<FileStatus> result, 183 FileSystem fs, Path path, PathFilter inputFilter) 184 throws IOException { 185 RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); 186 while (iter.hasNext()) { 187 LocatedFileStatus stat = iter.next(); 188 if (inputFilter.accept(stat.getPath())) { 189 if (stat.isDirectory()) { 190 addInputPathRecursively(result, fs, stat.getPath(), inputFilter); 191 } else { 192 result.add(stat); 193 } 194 } 195 } 196 } 197 198 /** List input directories. 199 * Subclasses may override to, e.g., select only files matching a regular 200 * expression. 201 * 202 * @param job the job to list input paths for 203 * @return array of FileStatus objects 204 * @throws IOException if zero items. 205 */ 206 protected FileStatus[] listStatus(JobConf job) throws IOException { 207 Path[] dirs = getInputPaths(job); 208 if (dirs.length == 0) { 209 throw new IOException("No input paths specified in job"); 210 } 211 212 // get tokens for all the required FileSystems.. 213 TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job); 214 215 // Whether we need to recursive look into the directory structure 216 boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false); 217 218 // creates a MultiPathFilter with the hiddenFileFilter and the 219 // user provided one (if any). 220 List<PathFilter> filters = new ArrayList<PathFilter>(); 221 filters.add(hiddenFileFilter); 222 PathFilter jobFilter = getInputPathFilter(job); 223 if (jobFilter != null) { 224 filters.add(jobFilter); 225 } 226 PathFilter inputFilter = new MultiPathFilter(filters); 227 228 FileStatus[] result; 229 int numThreads = job 230 .getInt( 231 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS, 232 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS); 233 234 StopWatch sw = new StopWatch().start(); 235 if (numThreads == 1) { 236 List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive); 237 result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]); 238 } else { 239 Iterable<FileStatus> locatedFiles = null; 240 try { 241 242 LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher( 243 job, dirs, recursive, inputFilter, false); 244 locatedFiles = locatedFileStatusFetcher.getFileStatuses(); 245 } catch (InterruptedException e) { 246 throw new IOException("Interrupted while getting file statuses"); 247 } 248 result = Iterables.toArray(locatedFiles, FileStatus.class); 249 } 250 251 sw.stop(); 252 if (LOG.isDebugEnabled()) { 253 LOG.debug("Time taken to get FileStatuses: " 254 + sw.now(TimeUnit.MILLISECONDS)); 255 } 256 LOG.info("Total input files to process : " + result.length); 257 return result; 258 } 259 260 private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs, 261 PathFilter inputFilter, boolean recursive) throws IOException { 262 List<FileStatus> result = new ArrayList<FileStatus>(); 263 List<IOException> errors = new ArrayList<IOException>(); 264 for (Path p: dirs) { 265 FileSystem fs = p.getFileSystem(job); 266 FileStatus[] matches = fs.globStatus(p, inputFilter); 267 if (matches == null) { 268 errors.add(new IOException("Input path does not exist: " + p)); 269 } else if (matches.length == 0) { 270 errors.add(new IOException("Input Pattern " + p + " matches 0 files")); 271 } else { 272 for (FileStatus globStat: matches) { 273 if (globStat.isDirectory()) { 274 RemoteIterator<LocatedFileStatus> iter = 275 fs.listLocatedStatus(globStat.getPath()); 276 while (iter.hasNext()) { 277 LocatedFileStatus stat = iter.next(); 278 if (inputFilter.accept(stat.getPath())) { 279 if (recursive && stat.isDirectory()) { 280 addInputPathRecursively(result, fs, stat.getPath(), 281 inputFilter); 282 } else { 283 result.add(stat); 284 } 285 } 286 } 287 } else { 288 result.add(globStat); 289 } 290 } 291 } 292 } 293 if (!errors.isEmpty()) { 294 throw new InvalidInputException(errors); 295 } 296 return result; 297 } 298 299 /** 300 * A factory that makes the split for this class. It can be overridden 301 * by sub-classes to make sub-types 302 */ 303 protected FileSplit makeSplit(Path file, long start, long length, 304 String[] hosts) { 305 return new FileSplit(file, start, length, hosts); 306 } 307 308 /** 309 * A factory that makes the split for this class. It can be overridden 310 * by sub-classes to make sub-types 311 */ 312 protected FileSplit makeSplit(Path file, long start, long length, 313 String[] hosts, String[] inMemoryHosts) { 314 return new FileSplit(file, start, length, hosts, inMemoryHosts); 315 } 316 317 /** Splits files returned by {@link #listStatus(JobConf)} when 318 * they're too big.*/ 319 public InputSplit[] getSplits(JobConf job, int numSplits) 320 throws IOException { 321 StopWatch sw = new StopWatch().start(); 322 FileStatus[] files = listStatus(job); 323 324 // Save the number of input files for metrics/loadgen 325 job.setLong(NUM_INPUT_FILES, files.length); 326 long totalSize = 0; // compute total size 327 for (FileStatus file: files) { // check we have valid files 328 if (file.isDirectory()) { 329 throw new IOException("Not a file: "+ file.getPath()); 330 } 331 totalSize += file.getLen(); 332 } 333 334 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); 335 long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. 336 FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); 337 338 // generate splits 339 ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); 340 NetworkTopology clusterMap = new NetworkTopology(); 341 for (FileStatus file: files) { 342 Path path = file.getPath(); 343 long length = file.getLen(); 344 if (length != 0) { 345 FileSystem fs = path.getFileSystem(job); 346 BlockLocation[] blkLocations; 347 if (file instanceof LocatedFileStatus) { 348 blkLocations = ((LocatedFileStatus) file).getBlockLocations(); 349 } else { 350 blkLocations = fs.getFileBlockLocations(file, 0, length); 351 } 352 if (isSplitable(fs, path)) { 353 long blockSize = file.getBlockSize(); 354 long splitSize = computeSplitSize(goalSize, minSize, blockSize); 355 356 long bytesRemaining = length; 357 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 358 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 359 length-bytesRemaining, splitSize, clusterMap); 360 splits.add(makeSplit(path, length-bytesRemaining, splitSize, 361 splitHosts[0], splitHosts[1])); 362 bytesRemaining -= splitSize; 363 } 364 365 if (bytesRemaining != 0) { 366 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length 367 - bytesRemaining, bytesRemaining, clusterMap); 368 splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, 369 splitHosts[0], splitHosts[1])); 370 } 371 } else { 372 if (LOG.isDebugEnabled()) { 373 // Log only if the file is big enough to be splitted 374 if (length > Math.min(file.getBlockSize(), minSize)) { 375 LOG.debug("File is not splittable so no parallelization " 376 + "is possible: " + file.getPath()); 377 } 378 } 379 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); 380 splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); 381 } 382 } else { 383 //Create empty hosts array for zero length files 384 splits.add(makeSplit(path, 0, length, new String[0])); 385 } 386 } 387 sw.stop(); 388 if (LOG.isDebugEnabled()) { 389 LOG.debug("Total # of splits generated by getSplits: " + splits.size() 390 + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); 391 } 392 return splits.toArray(new FileSplit[splits.size()]); 393 } 394 395 protected long computeSplitSize(long goalSize, long minSize, 396 long blockSize) { 397 return Math.max(minSize, Math.min(goalSize, blockSize)); 398 } 399 400 protected int getBlockIndex(BlockLocation[] blkLocations, 401 long offset) { 402 for (int i = 0 ; i < blkLocations.length; i++) { 403 // is the offset inside this block? 404 if ((blkLocations[i].getOffset() <= offset) && 405 (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ 406 return i; 407 } 408 } 409 BlockLocation last = blkLocations[blkLocations.length -1]; 410 long fileLength = last.getOffset() + last.getLength() -1; 411 throw new IllegalArgumentException("Offset " + offset + 412 " is outside of file (0.." + 413 fileLength + ")"); 414 } 415 416 /** 417 * Sets the given comma separated paths as the list of inputs 418 * for the map-reduce job. 419 * 420 * @param conf Configuration of the job 421 * @param commaSeparatedPaths Comma separated paths to be set as 422 * the list of inputs for the map-reduce job. 423 */ 424 public static void setInputPaths(JobConf conf, String commaSeparatedPaths) { 425 setInputPaths(conf, StringUtils.stringToPath( 426 getPathStrings(commaSeparatedPaths))); 427 } 428 429 /** 430 * Add the given comma separated paths to the list of inputs for 431 * the map-reduce job. 432 * 433 * @param conf The configuration of the job 434 * @param commaSeparatedPaths Comma separated paths to be added to 435 * the list of inputs for the map-reduce job. 436 */ 437 public static void addInputPaths(JobConf conf, String commaSeparatedPaths) { 438 for (String str : getPathStrings(commaSeparatedPaths)) { 439 addInputPath(conf, new Path(str)); 440 } 441 } 442 443 /** 444 * Set the array of {@link Path}s as the list of inputs 445 * for the map-reduce job. 446 * 447 * @param conf Configuration of the job. 448 * @param inputPaths the {@link Path}s of the input directories/files 449 * for the map-reduce job. 450 */ 451 public static void setInputPaths(JobConf conf, Path... inputPaths) { 452 Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]); 453 StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString())); 454 for(int i = 1; i < inputPaths.length;i++) { 455 str.append(StringUtils.COMMA_STR); 456 path = new Path(conf.getWorkingDirectory(), inputPaths[i]); 457 str.append(StringUtils.escapeString(path.toString())); 458 } 459 conf.set(org.apache.hadoop.mapreduce.lib.input. 460 FileInputFormat.INPUT_DIR, str.toString()); 461 } 462 463 /** 464 * Add a {@link Path} to the list of inputs for the map-reduce job. 465 * 466 * @param conf The configuration of the job 467 * @param path {@link Path} to be added to the list of inputs for 468 * the map-reduce job. 469 */ 470 public static void addInputPath(JobConf conf, Path path ) { 471 path = new Path(conf.getWorkingDirectory(), path); 472 String dirStr = StringUtils.escapeString(path.toString()); 473 String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input. 474 FileInputFormat.INPUT_DIR); 475 conf.set(org.apache.hadoop.mapreduce.lib.input. 476 FileInputFormat.INPUT_DIR, dirs == null ? dirStr : 477 dirs + StringUtils.COMMA_STR + dirStr); 478 } 479 480 // This method escapes commas in the glob pattern of the given paths. 481 private static String[] getPathStrings(String commaSeparatedPaths) { 482 int length = commaSeparatedPaths.length(); 483 int curlyOpen = 0; 484 int pathStart = 0; 485 boolean globPattern = false; 486 List<String> pathStrings = new ArrayList<String>(); 487 488 for (int i=0; i<length; i++) { 489 char ch = commaSeparatedPaths.charAt(i); 490 switch(ch) { 491 case '{' : { 492 curlyOpen++; 493 if (!globPattern) { 494 globPattern = true; 495 } 496 break; 497 } 498 case '}' : { 499 curlyOpen--; 500 if (curlyOpen == 0 && globPattern) { 501 globPattern = false; 502 } 503 break; 504 } 505 case ',' : { 506 if (!globPattern) { 507 pathStrings.add(commaSeparatedPaths.substring(pathStart, i)); 508 pathStart = i + 1 ; 509 } 510 break; 511 } 512 default: 513 continue; // nothing special to do for this character 514 } 515 } 516 pathStrings.add(commaSeparatedPaths.substring(pathStart, length)); 517 518 return pathStrings.toArray(new String[0]); 519 } 520 521 /** 522 * Get the list of input {@link Path}s for the map-reduce job. 523 * 524 * @param conf The configuration of the job 525 * @return the list of input {@link Path}s for the map-reduce job. 526 */ 527 public static Path[] getInputPaths(JobConf conf) { 528 String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input. 529 FileInputFormat.INPUT_DIR, ""); 530 String [] list = StringUtils.split(dirs); 531 Path[] result = new Path[list.length]; 532 for (int i = 0; i < list.length; i++) { 533 result[i] = new Path(StringUtils.unEscapeString(list[i])); 534 } 535 return result; 536 } 537 538 539 private void sortInDescendingOrder(List<NodeInfo> mylist) { 540 Collections.sort(mylist, new Comparator<NodeInfo> () { 541 public int compare(NodeInfo obj1, NodeInfo obj2) { 542 543 if (obj1 == null || obj2 == null) 544 return -1; 545 546 if (obj1.getValue() == obj2.getValue()) { 547 return 0; 548 } 549 else { 550 return ((obj1.getValue() < obj2.getValue()) ? 1 : -1); 551 } 552 } 553 } 554 ); 555 } 556 557 /** 558 * This function identifies and returns the hosts that contribute 559 * most for a given split. For calculating the contribution, rack 560 * locality is treated on par with host locality, so hosts from racks 561 * that contribute the most are preferred over hosts on racks that 562 * contribute less 563 * @param blkLocations The list of block locations 564 * @param offset 565 * @param splitSize 566 * @return an array of hosts that contribute most to this split 567 * @throws IOException 568 */ 569 protected String[] getSplitHosts(BlockLocation[] blkLocations, 570 long offset, long splitSize, NetworkTopology clusterMap) throws IOException { 571 return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize, 572 clusterMap)[0]; 573 } 574 575 /** 576 * This function identifies and returns the hosts that contribute 577 * most for a given split. For calculating the contribution, rack 578 * locality is treated on par with host locality, so hosts from racks 579 * that contribute the most are preferred over hosts on racks that 580 * contribute less 581 * @param blkLocations The list of block locations 582 * @param offset 583 * @param splitSize 584 * @return two arrays - one of hosts that contribute most to this split, and 585 * one of hosts that contribute most to this split that have the data 586 * cached on them 587 * @throws IOException 588 */ 589 private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, 590 long offset, long splitSize, NetworkTopology clusterMap) 591 throws IOException { 592 593 int startIndex = getBlockIndex(blkLocations, offset); 594 595 long bytesInThisBlock = blkLocations[startIndex].getOffset() + 596 blkLocations[startIndex].getLength() - offset; 597 598 //If this is the only block, just return 599 if (bytesInThisBlock >= splitSize) { 600 return new String[][] { blkLocations[startIndex].getHosts(), 601 blkLocations[startIndex].getCachedHosts() }; 602 } 603 604 long bytesInFirstBlock = bytesInThisBlock; 605 int index = startIndex + 1; 606 splitSize -= bytesInThisBlock; 607 608 while (splitSize > 0) { 609 bytesInThisBlock = 610 Math.min(splitSize, blkLocations[index++].getLength()); 611 splitSize -= bytesInThisBlock; 612 } 613 614 long bytesInLastBlock = bytesInThisBlock; 615 int endIndex = index - 1; 616 617 Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>(); 618 Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>(); 619 String [] allTopos = new String[0]; 620 621 // Build the hierarchy and aggregate the contribution of 622 // bytes at each level. See TestGetSplitHosts.java 623 624 for (index = startIndex; index <= endIndex; index++) { 625 626 // Establish the bytes in this block 627 if (index == startIndex) { 628 bytesInThisBlock = bytesInFirstBlock; 629 } 630 else if (index == endIndex) { 631 bytesInThisBlock = bytesInLastBlock; 632 } 633 else { 634 bytesInThisBlock = blkLocations[index].getLength(); 635 } 636 637 allTopos = blkLocations[index].getTopologyPaths(); 638 639 // If no topology information is available, just 640 // prefix a fakeRack 641 if (allTopos.length == 0) { 642 allTopos = fakeRacks(blkLocations, index); 643 } 644 645 // NOTE: This code currently works only for one level of 646 // hierarchy (rack/host). However, it is relatively easy 647 // to extend this to support aggregation at different 648 // levels 649 650 for (String topo: allTopos) { 651 652 Node node, parentNode; 653 NodeInfo nodeInfo, parentNodeInfo; 654 655 node = clusterMap.getNode(topo); 656 657 if (node == null) { 658 node = new NodeBase(topo); 659 clusterMap.add(node); 660 } 661 662 nodeInfo = hostsMap.get(node); 663 664 if (nodeInfo == null) { 665 nodeInfo = new NodeInfo(node); 666 hostsMap.put(node,nodeInfo); 667 parentNode = node.getParent(); 668 parentNodeInfo = racksMap.get(parentNode); 669 if (parentNodeInfo == null) { 670 parentNodeInfo = new NodeInfo(parentNode); 671 racksMap.put(parentNode,parentNodeInfo); 672 } 673 parentNodeInfo.addLeaf(nodeInfo); 674 } 675 else { 676 nodeInfo = hostsMap.get(node); 677 parentNode = node.getParent(); 678 parentNodeInfo = racksMap.get(parentNode); 679 } 680 681 nodeInfo.addValue(index, bytesInThisBlock); 682 parentNodeInfo.addValue(index, bytesInThisBlock); 683 684 } // for all topos 685 686 } // for all indices 687 688 // We don't yet support cached hosts when bytesInThisBlock > splitSize 689 return new String[][] { identifyHosts(allTopos.length, racksMap), 690 new String[0]}; 691 } 692 693 private String[] identifyHosts(int replicationFactor, 694 Map<Node,NodeInfo> racksMap) { 695 696 String [] retVal = new String[replicationFactor]; 697 698 List <NodeInfo> rackList = new LinkedList<NodeInfo>(); 699 700 rackList.addAll(racksMap.values()); 701 702 // Sort the racks based on their contribution to this split 703 sortInDescendingOrder(rackList); 704 705 boolean done = false; 706 int index = 0; 707 708 // Get the host list for all our aggregated items, sort 709 // them and return the top entries 710 for (NodeInfo ni: rackList) { 711 712 Set<NodeInfo> hostSet = ni.getLeaves(); 713 714 List<NodeInfo>hostList = new LinkedList<NodeInfo>(); 715 hostList.addAll(hostSet); 716 717 // Sort the hosts in this rack based on their contribution 718 sortInDescendingOrder(hostList); 719 720 for (NodeInfo host: hostList) { 721 // Strip out the port number from the host name 722 retVal[index++] = host.node.getName().split(":")[0]; 723 if (index == replicationFactor) { 724 done = true; 725 break; 726 } 727 } 728 729 if (done == true) { 730 break; 731 } 732 } 733 return retVal; 734 } 735 736 private String[] fakeRacks(BlockLocation[] blkLocations, int index) 737 throws IOException { 738 String[] allHosts = blkLocations[index].getHosts(); 739 String[] allTopos = new String[allHosts.length]; 740 for (int i = 0; i < allHosts.length; i++) { 741 allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i]; 742 } 743 return allTopos; 744 } 745 746 747 private static class NodeInfo { 748 final Node node; 749 final Set<Integer> blockIds; 750 final Set<NodeInfo> leaves; 751 752 private long value; 753 754 NodeInfo(Node node) { 755 this.node = node; 756 blockIds = new HashSet<Integer>(); 757 leaves = new HashSet<NodeInfo>(); 758 } 759 760 long getValue() {return value;} 761 762 void addValue(int blockIndex, long value) { 763 if (blockIds.add(blockIndex) == true) { 764 this.value += value; 765 } 766 } 767 768 Set<NodeInfo> getLeaves() { return leaves;} 769 770 void addLeaf(NodeInfo nodeInfo) { 771 leaves.add(nodeInfo); 772 } 773 } 774}