001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.WeakReference;
024import java.lang.ref.ReferenceQueue;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.IdentityHashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.NoSuchElementException;
038import java.util.ServiceConfigurationError;
039import java.util.ServiceLoader;
040import java.util.Set;
041import java.util.Stack;
042import java.util.TreeSet;
043import java.util.concurrent.atomic.AtomicLong;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.apache.hadoop.classification.InterfaceAudience;
048import org.apache.hadoop.classification.InterfaceStability;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
052import org.apache.hadoop.fs.Options.ChecksumOpt;
053import org.apache.hadoop.fs.Options.Rename;
054import org.apache.hadoop.fs.permission.AclEntry;
055import org.apache.hadoop.fs.permission.AclStatus;
056import org.apache.hadoop.fs.permission.FsAction;
057import org.apache.hadoop.fs.permission.FsCreateModes;
058import org.apache.hadoop.fs.permission.FsPermission;
059import org.apache.hadoop.io.MultipleIOException;
060import org.apache.hadoop.io.Text;
061import org.apache.hadoop.net.NetUtils;
062import org.apache.hadoop.security.AccessControlException;
063import org.apache.hadoop.security.Credentials;
064import org.apache.hadoop.security.SecurityUtil;
065import org.apache.hadoop.security.UserGroupInformation;
066import org.apache.hadoop.security.token.Token;
067import org.apache.hadoop.util.ClassUtil;
068import org.apache.hadoop.util.DataChecksum;
069import org.apache.hadoop.util.Progressable;
070import org.apache.hadoop.util.ReflectionUtils;
071import org.apache.hadoop.util.ShutdownHookManager;
072import org.apache.hadoop.util.StringUtils;
073import org.apache.htrace.core.Tracer;
074import org.apache.htrace.core.TraceScope;
075
076import com.google.common.annotations.VisibleForTesting;
077
078import static com.google.common.base.Preconditions.checkArgument;
079import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
080
081/****************************************************************
082 * An abstract base class for a fairly generic filesystem.  It
083 * may be implemented as a distributed filesystem, or as a "local"
084 * one that reflects the locally-connected disk.  The local version
085 * exists for small Hadoop instances and for testing.
086 *
087 * <p>
088 *
089 * All user code that may potentially use the Hadoop Distributed
090 * File System should be written to use a FileSystem object.  The
091 * Hadoop DFS is a multi-machine system that appears as a single
092 * disk.  It's useful because of its fault tolerance and potentially
093 * very large capacity.
094 * 
095 * <p>
 * The local implementation is {@link LocalFileSystem} and the distributed
 * implementation is DistributedFileSystem.
098 *****************************************************************/
099@InterfaceAudience.Public
100@InterfaceStability.Stable
101public abstract class FileSystem extends Configured implements Closeable {
  /**
   * Configuration key naming the default file system; alias of
   * {@link CommonConfigurationKeys#FS_DEFAULT_NAME_KEY}.
   */
  public static final String FS_DEFAULT_NAME_KEY = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  /**
   * Value used for the default file system when
   * {@link #FS_DEFAULT_NAME_KEY} is not set in the configuration.
   */
  public static final String DEFAULT_FS = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;

  /** Logger for this class. */
  public static final Log LOG = LogFactory.getLog(FileSystem.class);

  /**
   * Priority of the FileSystem shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 10;

  /** Presumably the name prefix of trash directories; usage is outside this view. */
  public static final String TRASH_PREFIX = ".Trash";

  /** FileSystem cache */
  static final Cache CACHE = new Cache();

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  /** Statistics recorded per FileSystem class. */
  private static final Map<Class<? extends FileSystem>, Statistics> 
    statisticsTable =
      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
  
  /**
   * The statistics for this file system.
   */
  protected Statistics statistics;

  /**
   * A cache of files that should be deleted when filesystem is closed
   * or the JVM is exited.
   */
  private Set<Path> deleteOnExit = new TreeSet<Path>();
  
  /**
   * Assigned in {@link #initialize(URI, Configuration)} from the
   * FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY configuration setting.
   */
  boolean resolveSymlinks;
139
140  /**
141   * This method adds a file system for testing so that we can find it later. It
142   * is only for testing.
143   * @param uri the uri to store it under
144   * @param conf the configuration to store it under
145   * @param fs the file system to store
146   * @throws IOException
147   */
148  static void addFileSystemForTesting(URI uri, Configuration conf,
149      FileSystem fs) throws IOException {
150    CACHE.map.put(new Cache.Key(uri, conf), fs);
151  }
152
153  /**
154   * Get a filesystem instance based on the uri, the passed
155   * configuration and the user
156   * @param uri of the filesystem
157   * @param conf the configuration to use
158   * @param user to perform the get as
159   * @return the filesystem instance
160   * @throws IOException
161   * @throws InterruptedException
162   */
163  public static FileSystem get(final URI uri, final Configuration conf,
164        final String user) throws IOException, InterruptedException {
165    String ticketCachePath =
166      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
167    UserGroupInformation ugi =
168        UserGroupInformation.getBestUGI(ticketCachePath, user);
169    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
170      @Override
171      public FileSystem run() throws IOException {
172        return get(uri, conf);
173      }
174    });
175  }
176
177  /**
178   * Returns the configured filesystem implementation.
179   * @param conf the configuration to use
180   */
181  public static FileSystem get(Configuration conf) throws IOException {
182    return get(getDefaultUri(conf), conf);
183  }
184  
185  /** Get the default filesystem URI from a configuration.
186   * @param conf the configuration to use
187   * @return the uri of the default filesystem
188   */
189  public static URI getDefaultUri(Configuration conf) {
190    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
191  }
192
193  /** Set the default filesystem URI in a configuration.
194   * @param conf the configuration to alter
195   * @param uri the new default filesystem uri
196   */
197  public static void setDefaultUri(Configuration conf, URI uri) {
198    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
199  }
200
201  /** Set the default filesystem URI in a configuration.
202   * @param conf the configuration to alter
203   * @param uri the new default filesystem uri
204   */
205  public static void setDefaultUri(Configuration conf, String uri) {
206    setDefaultUri(conf, URI.create(fixName(uri)));
207  }
208
209  /** Called after a new FileSystem instance is constructed.
210   * @param name a uri whose authority section names the host, port, etc.
211   *   for this FileSystem
212   * @param conf the configuration
213   */
214  public void initialize(URI name, Configuration conf) throws IOException {
215    final String scheme;
216    if (name.getScheme() == null || name.getScheme().isEmpty()) {
217      scheme = getDefaultUri(conf).getScheme();
218    } else {
219      scheme = name.getScheme();
220    }
221    statistics = getStatistics(scheme, getClass());
222    resolveSymlinks = conf.getBoolean(
223        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
224        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
225  }
226
227  /**
228   * Return the protocol scheme for the FileSystem.
229   * <p/>
230   * This implementation throws an <code>UnsupportedOperationException</code>.
231   *
232   * @return the protocol scheme for the FileSystem.
233   */
234  public String getScheme() {
235    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
236  }
237
  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   *
   * @return the URI identifying this FileSystem
   */
  public abstract URI getUri();
240  
241  /**
242   * Return a canonicalized form of this FileSystem's URI.
243   * 
244   * The default implementation simply calls {@link #canonicalizeUri(URI)}
245   * on the filesystem's own URI, so subclasses typically only need to
246   * implement that method.
247   *
248   * @see #canonicalizeUri(URI)
249   */
250  protected URI getCanonicalUri() {
251    return canonicalizeUri(getUri());
252  }
253  
254  /**
255   * Canonicalize the given URI.
256   * 
257   * This is filesystem-dependent, but may for example consist of
258   * canonicalizing the hostname using DNS and adding the default
259   * port if not specified.
260   * 
261   * The default implementation simply fills in the default port if
262   * not specified and if the filesystem has a default port.
263   *
264   * @return URI
265   * @see NetUtils#getCanonicalUri(URI, int)
266   */
267  protected URI canonicalizeUri(URI uri) {
268    if (uri.getPort() == -1 && getDefaultPort() > 0) {
269      // reconstruct the uri with the default port set
270      try {
271        uri = new URI(uri.getScheme(), uri.getUserInfo(),
272            uri.getHost(), getDefaultPort(),
273            uri.getPath(), uri.getQuery(), uri.getFragment());
274      } catch (URISyntaxException e) {
275        // Should never happen!
276        throw new AssertionError("Valid URI became unparseable: " +
277            uri);
278      }
279    }
280    
281    return uri;
282  }
283  
  /**
   * Get the default port for this file system.
   * This base implementation reports no default port.
   *
   * @return the default port or 0 if there isn't one
   */
  protected int getDefaultPort() {
    return 0;
  }
291
292  protected static FileSystem getFSofPath(final Path absOrFqPath,
293      final Configuration conf)
294      throws UnsupportedFileSystemException, IOException {
295    absOrFqPath.checkNotSchemeWithRelative();
296    absOrFqPath.checkNotRelative();
297
298    // Uses the default file system if not fully qualified
299    return get(absOrFqPath.toUri(), conf);
300  }
301
302  /**
303   * Get a canonical service name for this file system.  The token cache is
304   * the only user of the canonical service name, and uses it to lookup this
305   * filesystem's service tokens.
306   * If file system provides a token of its own then it must have a canonical
307   * name, otherwise canonical name can be null.
308   * 
309   * Default Impl: If the file system has child file systems 
310   * (such as an embedded file system) then it is assumed that the fs has no
311   * tokens of its own and hence returns a null name; otherwise a service
312   * name is built using Uri and port.
313   * 
314   * @return a service string that uniquely identifies this file system, null
315   *         if the filesystem does not implement tokens
316   * @see SecurityUtil#buildDTServiceName(URI, int) 
317   */
318  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
319  public String getCanonicalServiceName() {
320    return (getChildFileSystems() == null)
321      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
322      : null;
323  }
324
325  /** @deprecated call #getUri() instead.*/
326  @Deprecated
327  public String getName() { return getUri().toString(); }
328
329  /** @deprecated call #get(URI,Configuration) instead. */
330  @Deprecated
331  public static FileSystem getNamed(String name, Configuration conf)
332    throws IOException {
333    return get(URI.create(fixName(name)), conf);
334  }
335  
336  /** Update old-format filesystem names, for back-compatibility.  This should
337   * eventually be replaced with a checkName() method that throws an exception
338   * for old-format names. */ 
339  private static String fixName(String name) {
340    // convert old-format name to new-format name
341    if (name.equals("local")) {         // "local" is now "file:///".
342      LOG.warn("\"local\" is a deprecated filesystem name."
343               +" Use \"file:///\" instead.");
344      name = "file:///";
345    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
346      LOG.warn("\""+name+"\" is a deprecated filesystem name."
347               +" Use \"hdfs://"+name+"/\" instead.");
348      name = "hdfs://"+name;
349    }
350    return name;
351  }
352
353  /**
354   * Get the local file system.
355   * @param conf the configuration to configure the file system with
356   * @return a LocalFileSystem
357   */
358  public static LocalFileSystem getLocal(Configuration conf)
359    throws IOException {
360    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
361  }
362
363  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
364   * of the URI determines a configuration property name,
365   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
366   * The entire URI is passed to the FileSystem instance's initialize method.
367   */
368  public static FileSystem get(URI uri, Configuration conf) throws IOException {
369    String scheme = uri.getScheme();
370    String authority = uri.getAuthority();
371
372    if (scheme == null && authority == null) {     // use default FS
373      return get(conf);
374    }
375
376    if (scheme != null && authority == null) {     // no authority
377      URI defaultUri = getDefaultUri(conf);
378      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
379          && defaultUri.getAuthority() != null) {  // & default has authority
380        return get(defaultUri, conf);              // return default
381      }
382    }
383    
384    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
385    if (conf.getBoolean(disableCacheName, false)) {
386      return createFileSystem(uri, conf);
387    }
388
389    return CACHE.get(uri, conf);
390  }
391
392  /**
393   * Returns the FileSystem for this URI's scheme and authority and the 
394   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
395   * @param uri of the filesystem
396   * @param conf the configuration to use
397   * @param user to perform the get as
398   * @return filesystem instance
399   * @throws IOException
400   * @throws InterruptedException
401   */
402  public static FileSystem newInstance(final URI uri, final Configuration conf,
403      final String user) throws IOException, InterruptedException {
404    String ticketCachePath =
405      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
406    UserGroupInformation ugi =
407        UserGroupInformation.getBestUGI(ticketCachePath, user);
408    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
409      @Override
410      public FileSystem run() throws IOException {
411        return newInstance(uri,conf); 
412      }
413    });
414  }
415  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
416   * of the URI determines a configuration property name,
417   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
418   * The entire URI is passed to the FileSystem instance's initialize method.
419   * This always returns a new FileSystem object.
420   */
421  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
422    String scheme = uri.getScheme();
423    String authority = uri.getAuthority();
424
425    if (scheme == null) {                       // no scheme: use default FS
426      return newInstance(conf);
427    }
428
429    if (authority == null) {                       // no authority
430      URI defaultUri = getDefaultUri(conf);
431      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
432          && defaultUri.getAuthority() != null) {  // & default has authority
433        return newInstance(defaultUri, conf);              // return default
434      }
435    }
436    return CACHE.getUnique(uri, conf);
437  }
438
439  /** Returns a unique configured filesystem implementation.
440   * This always returns a new FileSystem object.
441   * @param conf the configuration to use
442   */
443  public static FileSystem newInstance(Configuration conf) throws IOException {
444    return newInstance(getDefaultUri(conf), conf);
445  }
446
447  /**
448   * Get a unique local file system object
449   * @param conf the configuration to configure the file system with
450   * @return a LocalFileSystem
451   * This always returns a new FileSystem object.
452   */
453  public static LocalFileSystem newInstanceLocal(Configuration conf)
454    throws IOException {
455    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
456  }
457
  /**
   * Close all cached filesystems. Be sure those filesystems are not
   * used anymore. Delegates to the shared cache's <code>closeAll()</code>.
   *
   * @throws IOException
   */
  public static void closeAll() throws IOException {
    CACHE.closeAll();
  }
467
  /**
   * Close all cached filesystems for a given UGI. Be sure those filesystems
   * are not used anymore. Delegates to the shared cache's
   * <code>closeAll(ugi)</code>.
   *
   * @param ugi user group info to close
   * @throws IOException
   */
  public static void closeAllForUGI(UserGroupInformation ugi) 
  throws IOException {
    CACHE.closeAll(ugi);
  }
478
479  /** 
480   * Make sure that a path specifies a FileSystem.
481   * @param path to use
482   */
483  public Path makeQualified(Path path) {
484    checkPath(path);
485    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
486  }
487    
  /**
   * Get a new delegation token for this file system.
   * This is an internal method that should have been declared protected
   * but wasn't historically.
   * Callers should use {@link #addDelegationTokens(String, Credentials)}
   *
   * This base implementation issues no token and returns null.
   *
   * @param renewer the account name that is allowed to renew the token.
   * @return a new delegation token, or null when none is issued
   * @throws IOException
   */
  @InterfaceAudience.Private()
  public Token<?> getDelegationToken(String renewer) throws IOException {
    return null;
  }
502  
503  /**
504   * Obtain all delegation tokens used by this FileSystem that are not
505   * already present in the given Credentials.  Existing tokens will neither
506   * be verified as valid nor having the given renewer.  Missing tokens will
507   * be acquired and added to the given Credentials.
508   * 
509   * Default Impl: works for simple fs with its own token
510   * and also for an embedded fs whose tokens are those of its
511   * children file system (i.e. the embedded fs has not tokens of its
512   * own).
513   * 
514   * @param renewer the user allowed to renew the delegation tokens
515   * @param credentials cache in which to add new delegation tokens
516   * @return list of new delegation tokens
517   * @throws IOException
518   */
519  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
520  public Token<?>[] addDelegationTokens(
521      final String renewer, Credentials credentials) throws IOException {
522    if (credentials == null) {
523      credentials = new Credentials();
524    }
525    final List<Token<?>> tokens = new ArrayList<Token<?>>();
526    collectDelegationTokens(renewer, credentials, tokens);
527    return tokens.toArray(new Token<?>[tokens.size()]);
528  }
529  
530  /**
531   * Recursively obtain the tokens for this FileSystem and all descended
532   * FileSystems as determined by getChildFileSystems().
533   * @param renewer the user allowed to renew the delegation tokens
534   * @param credentials cache in which to add the new delegation tokens
535   * @param tokens list in which to add acquired tokens
536   * @throws IOException
537   */
538  private void collectDelegationTokens(final String renewer,
539                                       final Credentials credentials,
540                                       final List<Token<?>> tokens)
541                                           throws IOException {
542    final String serviceName = getCanonicalServiceName();
543    // Collect token of the this filesystem and then of its embedded children
544    if (serviceName != null) { // fs has token, grab it
545      final Text service = new Text(serviceName);
546      Token<?> token = credentials.getToken(service);
547      if (token == null) {
548        token = getDelegationToken(renewer);
549        if (token != null) {
550          tokens.add(token);
551          credentials.addToken(service, token);
552        }
553      }
554    }
555    // Now collect the tokens from the children
556    final FileSystem[] children = getChildFileSystems();
557    if (children != null) {
558      for (final FileSystem fs : children) {
559        fs.collectDelegationTokens(renewer, credentials, tokens);
560      }
561    }
562  }
563
  /**
   * Get all the immediate child FileSystems embedded in this FileSystem.
   * It does not recurse and get grand children.  If a FileSystem
   * has multiple child FileSystems, then it should return a unique list
   * of those FileSystems.  Default is to return null to signify no children.
   *
   * @return FileSystems used by this FileSystem, or null when there are none
   */
  @InterfaceAudience.LimitedPrivate({ "HDFS" })
  @VisibleForTesting
  public FileSystem[] getChildFileSystems() {
    return null;
  }
577  
578  /** create a file with the provided permission
579   * The permission of the file is set to be the provided permission as in
580   * setPermission, not permission&~umask
581   * 
582   * It is implemented using two RPCs. It is understood that it is inefficient,
583   * but the implementation is thread-safe. The other option is to change the
584   * value of umask in configuration to be 0, but it is not thread-safe.
585   * 
586   * @param fs file system handle
587   * @param file the name of the file to be created
588   * @param permission the permission of the file
589   * @return an output stream
590   * @throws IOException
591   */
592  public static FSDataOutputStream create(FileSystem fs,
593      Path file, FsPermission permission) throws IOException {
594    // create the file with default permission
595    FSDataOutputStream out = fs.create(file);
596    // set its permission to the supplied one
597    fs.setPermission(file, permission);
598    return out;
599  }
600
601  /** create a directory with the provided permission
602   * The permission of the directory is set to be the provided permission as in
603   * setPermission, not permission&~umask
604   * 
605   * @see #create(FileSystem, Path, FsPermission)
606   * 
607   * @param fs file system handle
608   * @param dir the name of the directory to be created
609   * @param permission the permission of the directory
610   * @return true if the directory creation succeeds; false otherwise
611   * @throws IOException
612   */
613  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
614  throws IOException {
615    // create the directory using the default permission
616    boolean result = fs.mkdirs(dir);
617    // set its permission to be the supplied one
618    fs.setPermission(dir, permission);
619    return result;
620  }
621
622  ///////////////////////////////////////////////////////////////
623  // FileSystem
624  ///////////////////////////////////////////////////////////////
625
  /**
   * Creates an instance without a configuration: null is passed to the
   * {@link Configured} superclass constructor.
   */
  protected FileSystem() {
    super(null);
  }
629
630  /** 
631   * Check that a Path belongs to this FileSystem.
632   * @param path to check
633   */
634  protected void checkPath(Path path) {
635    URI uri = path.toUri();
636    String thatScheme = uri.getScheme();
637    if (thatScheme == null)                // fs is relative
638      return;
639    URI thisUri = getCanonicalUri();
640    String thisScheme = thisUri.getScheme();
641    //authority and scheme are not case sensitive
642    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
643      String thisAuthority = thisUri.getAuthority();
644      String thatAuthority = uri.getAuthority();
645      if (thatAuthority == null &&                // path's authority is null
646          thisAuthority != null) {                // fs has an authority
647        URI defaultUri = getDefaultUri(getConf());
648        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
649          uri = defaultUri; // schemes match, so use this uri instead
650        } else {
651          uri = null; // can't determine auth of the path
652        }
653      }
654      if (uri != null) {
655        // canonicalize uri before comparing with this fs
656        uri = canonicalizeUri(uri);
657        thatAuthority = uri.getAuthority();
658        if (thisAuthority == thatAuthority ||       // authorities match
659            (thisAuthority != null &&
660             thisAuthority.equalsIgnoreCase(thatAuthority)))
661          return;
662      }
663    }
664    throw new IllegalArgumentException("Wrong FS: "+path+
665                                       ", expected: "+this.getUri());
666  }
667
668  /**
669   * Return an array containing hostnames, offset and size of 
670   * portions of the given file.  For a nonexistent 
671   * file or regions, null will be returned.
672   *
673   * This call is most helpful with DFS, where it returns 
674   * hostnames of machines that contain the given file.
675   *
676   * The FileSystem will simply return an elt containing 'localhost'.
677   *
678   * @param file FilesStatus to get data from
679   * @param start offset into the given file
680   * @param len length for which to get locations for
681   */
682  public BlockLocation[] getFileBlockLocations(FileStatus file, 
683      long start, long len) throws IOException {
684    if (file == null) {
685      return null;
686    }
687
688    if (start < 0 || len < 0) {
689      throw new IllegalArgumentException("Invalid start or len parameter");
690    }
691
692    if (file.getLen() <= start) {
693      return new BlockLocation[0];
694
695    }
696    String[] name = {"localhost:9866"};
697    String[] host = {"localhost"};
698    return new BlockLocation[] {
699      new BlockLocation(name, host, 0, file.getLen()) };
700  }
701 
702
703  /**
704   * Return an array containing hostnames, offset and size of 
705   * portions of the given file.  For a nonexistent 
706   * file or regions, null will be returned.
707   *
708   * This call is most helpful with DFS, where it returns 
709   * hostnames of machines that contain the given file.
710   *
711   * The FileSystem will simply return an elt containing 'localhost'.
712   *
713   * @param p path is used to identify an FS since an FS could have
714   *          another FS that it could be delegating the call to
715   * @param start offset into the given file
716   * @param len length for which to get locations for
717   */
718  public BlockLocation[] getFileBlockLocations(Path p, 
719      long start, long len) throws IOException {
720    if (p == null) {
721      throw new NullPointerException();
722    }
723    FileStatus file = getFileStatus(p);
724    return getFileBlockLocations(file, start, len);
725  }
726  
727  /**
728   * Return a set of server default configuration values
729   * @return server default configuration values
730   * @throws IOException
731   * @deprecated use {@link #getServerDefaults(Path)} instead
732   */
733  @Deprecated
734  public FsServerDefaults getServerDefaults() throws IOException {
735    Configuration conf = getConf();
736    // CRC32 is chosen as default as it is available in all 
737    // releases that support checksum.
738    // The client trash configuration is ignored.
739    return new FsServerDefaults(getDefaultBlockSize(), 
740        conf.getInt("io.bytes.per.checksum", 512), 
741        64 * 1024, 
742        getDefaultReplication(),
743        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
744        false,
745        FS_TRASH_INTERVAL_DEFAULT,
746        DataChecksum.Type.CRC32);
747  }
748
  /**
   * Return a set of server default configuration values.
   * This base implementation ignores <code>p</code> and returns the result
   * of the no-argument {@link #getServerDefaults()}.
   *
   * @param p path is used to identify an FS since an FS could have
   *          another FS that it could be delegating the call to
   * @return server default configuration values
   * @throws IOException
   */
  public FsServerDefaults getServerDefaults(Path p) throws IOException {
    return getServerDefaults();
  }
759
760  /**
761   * Return the fully-qualified path of path f resolving the path
762   * through any symlinks or mount point
763   * @param p path to be resolved
764   * @return fully qualified path 
765   * @throws FileNotFoundException
766   */
767   public Path resolvePath(final Path p) throws IOException {
768     checkPath(p);
769     return getFileStatus(p).getPath();
770   }
771
  /**
   * Opens an FSDataInputStream at the indicated Path.
   *
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @return an input stream for the file
   * @throws IOException
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
779    
780  /**
781   * Opens an FSDataInputStream at the indicated Path.
782   * @param f the file to open
783   */
784  public FSDataInputStream open(Path f) throws IOException {
785    return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
786        IO_FILE_BUFFER_SIZE_DEFAULT));
787  }
788
789  /**
790   * Create an FSDataOutputStream at the indicated Path.
791   * Files are overwritten by default.
792   * @param f the file to create
793   */
794  public FSDataOutputStream create(Path f) throws IOException {
795    return create(f, true);
796  }
797
798  /**
799   * Create an FSDataOutputStream at the indicated Path.
800   * @param f the file to create
801   * @param overwrite if a file with this name already exists, then if true,
802   *   the file will be overwritten, and if false an exception will be thrown.
803   */
804  public FSDataOutputStream create(Path f, boolean overwrite)
805      throws IOException {
806    return create(f, overwrite, 
807                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
808                      IO_FILE_BUFFER_SIZE_DEFAULT),
809                  getDefaultReplication(f),
810                  getDefaultBlockSize(f));
811  }
812
813  /**
814   * Create an FSDataOutputStream at the indicated Path with write-progress
815   * reporting.
816   * Files are overwritten by default.
817   * @param f the file to create
818   * @param progress to report progress
819   */
820  public FSDataOutputStream create(Path f, Progressable progress) 
821      throws IOException {
822    return create(f, true, 
823                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
824                      IO_FILE_BUFFER_SIZE_DEFAULT),
825                  getDefaultReplication(f),
826                  getDefaultBlockSize(f), progress);
827  }
828
829  /**
830   * Create an FSDataOutputStream at the indicated Path.
831   * Files are overwritten by default.
832   * @param f the file to create
833   * @param replication the replication factor
834   */
835  public FSDataOutputStream create(Path f, short replication)
836      throws IOException {
837    return create(f, true, 
838                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
839                      IO_FILE_BUFFER_SIZE_DEFAULT),
840                  replication,
841                  getDefaultBlockSize(f));
842  }
843
844  /**
845   * Create an FSDataOutputStream at the indicated Path with write-progress
846   * reporting.
847   * Files are overwritten by default.
848   * @param f the file to create
849   * @param replication the replication factor
850   * @param progress to report progress
851   */
852  public FSDataOutputStream create(Path f, short replication, 
853      Progressable progress) throws IOException {
854    return create(f, true, 
855                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
856                      IO_FILE_BUFFER_SIZE_DEFAULT),
857                  replication, getDefaultBlockSize(f), progress);
858  }
859
860    
861  /**
862   * Create an FSDataOutputStream at the indicated Path.
863   * @param f the file name to create
864   * @param overwrite if a file with this name already exists, then if true,
865   *   the file will be overwritten, and if false an error will be thrown.
866   * @param bufferSize the size of the buffer to be used.
867   */
868  public FSDataOutputStream create(Path f, 
869                                   boolean overwrite,
870                                   int bufferSize
871                                   ) throws IOException {
872    return create(f, overwrite, bufferSize, 
873                  getDefaultReplication(f),
874                  getDefaultBlockSize(f));
875  }
876    
877  /**
878   * Create an FSDataOutputStream at the indicated Path with write-progress
879   * reporting.
880   * @param f the path of the file to open
881   * @param overwrite if a file with this name already exists, then if true,
882   *   the file will be overwritten, and if false an error will be thrown.
883   * @param bufferSize the size of the buffer to be used.
884   */
885  public FSDataOutputStream create(Path f, 
886                                   boolean overwrite,
887                                   int bufferSize,
888                                   Progressable progress
889                                   ) throws IOException {
890    return create(f, overwrite, bufferSize, 
891                  getDefaultReplication(f),
892                  getDefaultBlockSize(f), progress);
893  }
894    
895    
896  /**
897   * Create an FSDataOutputStream at the indicated Path.
898   * @param f the file name to open
899   * @param overwrite if a file with this name already exists, then if true,
900   *   the file will be overwritten, and if false an error will be thrown.
901   * @param bufferSize the size of the buffer to be used.
902   * @param replication required block replication for the file. 
903   */
904  public FSDataOutputStream create(Path f, 
905                                   boolean overwrite,
906                                   int bufferSize,
907                                   short replication,
908                                   long blockSize
909                                   ) throws IOException {
910    return create(f, overwrite, bufferSize, replication, blockSize, null);
911  }
912
913  /**
914   * Create an FSDataOutputStream at the indicated Path with write-progress
915   * reporting.
916   * @param f the file name to open
917   * @param overwrite if a file with this name already exists, then if true,
918   *   the file will be overwritten, and if false an error will be thrown.
919   * @param bufferSize the size of the buffer to be used.
920   * @param replication required block replication for the file. 
921   */
922  public FSDataOutputStream create(Path f,
923                                            boolean overwrite,
924                                            int bufferSize,
925                                            short replication,
926                                            long blockSize,
927                                            Progressable progress
928                                            ) throws IOException {
929    return this.create(f, FsCreateModes.applyUMask(
930        FsPermission.getFileDefault(), FsPermission.getUMask(getConf())),
931        overwrite, bufferSize, replication, blockSize, progress);
932  }
933
934  /**
935   * Create an FSDataOutputStream at the indicated Path with write-progress
936   * reporting.
937   * @param f the file name to open
938   * @param permission file permission
939   * @param overwrite if a file with this name already exists, then if true,
940   *   the file will be overwritten, and if false an error will be thrown.
941   * @param bufferSize the size of the buffer to be used.
942   * @param replication required block replication for the file.
943   * @param blockSize block size
944   * @param progress the progress reporter
945   * @throws IOException
946   * @see #setPermission(Path, FsPermission)
947   */
948  public abstract FSDataOutputStream create(Path f,
949      FsPermission permission,
950      boolean overwrite,
951      int bufferSize,
952      short replication,
953      long blockSize,
954      Progressable progress) throws IOException;
955  
956  /**
957   * Create an FSDataOutputStream at the indicated Path with write-progress
958   * reporting.
959   * @param f the file name to open
960   * @param permission file permission
961   * @param flags {@link CreateFlag}s to use for this stream.
962   * @param bufferSize the size of the buffer to be used.
963   * @param replication required block replication for the file.
964   * @param blockSize block size
965   * @param progress the progress reporter
966   * @throws IOException
967   * @see #setPermission(Path, FsPermission)
968   */
969  public FSDataOutputStream create(Path f,
970      FsPermission permission,
971      EnumSet<CreateFlag> flags,
972      int bufferSize,
973      short replication,
974      long blockSize,
975      Progressable progress) throws IOException {
976    return create(f, permission, flags, bufferSize, replication,
977        blockSize, progress, null);
978  }
979  
980  /**
981   * Create an FSDataOutputStream at the indicated Path with a custom
982   * checksum option
983   * @param f the file name to open
984   * @param permission file permission
985   * @param flags {@link CreateFlag}s to use for this stream.
986   * @param bufferSize the size of the buffer to be used.
987   * @param replication required block replication for the file.
988   * @param blockSize block size
989   * @param progress the progress reporter
990   * @param checksumOpt checksum parameter. If null, the values
991   *        found in conf will be used.
992   * @throws IOException
993   * @see #setPermission(Path, FsPermission)
994   */
995  public FSDataOutputStream create(Path f,
996      FsPermission permission,
997      EnumSet<CreateFlag> flags,
998      int bufferSize,
999      short replication,
1000      long blockSize,
1001      Progressable progress,
1002      ChecksumOpt checksumOpt) throws IOException {
1003    // Checksum options are ignored by default. The file systems that
1004    // implement checksum need to override this method. The full
1005    // support is currently only available in DFS.
1006    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
1007        bufferSize, replication, blockSize, progress);
1008  }
1009
1010  /*.
1011   * This create has been added to support the FileContext that processes
1012   * the permission
1013   * with umask before calling this method.
1014   * This a temporary method added to support the transition from FileSystem
1015   * to FileContext for user applications.
1016   */
1017  @Deprecated
1018  protected FSDataOutputStream primitiveCreate(Path f,
1019     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
1020     short replication, long blockSize, Progressable progress,
1021     ChecksumOpt checksumOpt) throws IOException {
1022
1023    boolean pathExists = exists(f);
1024    CreateFlag.validate(f, pathExists, flag);
1025    
1026    // Default impl  assumes that permissions do not matter and 
1027    // nor does the bytesPerChecksum  hence
1028    // calling the regular create is good enough.
1029    // FSs that implement permissions should override this.
1030
1031    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1032      return append(f, bufferSize, progress);
1033    }
1034    
1035    return this.create(f, absolutePermission,
1036        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1037        blockSize, progress);
1038  }
1039  
1040  /**
1041   * This version of the mkdirs method assumes that the permission is absolute.
1042   * It has been added to support the FileContext that processes the permission
1043   * with umask before calling this method.
1044   * This a temporary method added to support the transition from FileSystem
1045   * to FileContext for user applications.
1046   */
1047  @Deprecated
1048  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1049    throws IOException {
1050    // Default impl is to assume that permissions do not matter and hence
1051    // calling the regular mkdirs is good enough.
1052    // FSs that implement permissions should override this.
1053   return this.mkdirs(f, absolutePermission);
1054  }
1055
1056
1057  /**
1058   * This version of the mkdirs method assumes that the permission is absolute.
1059   * It has been added to support the FileContext that processes the permission
1060   * with umask before calling this method.
1061   * This a temporary method added to support the transition from FileSystem
1062   * to FileContext for user applications.
1063   */
1064  @Deprecated
1065  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1066                    boolean createParent)
1067    throws IOException {
1068    
1069    if (!createParent) { // parent must exist.
1070      // since the this.mkdirs makes parent dirs automatically
1071      // we must throw exception if parent does not exist.
1072      final FileStatus stat = getFileStatus(f.getParent());
1073      if (stat == null) {
1074        throw new FileNotFoundException("Missing parent:" + f);
1075      }
1076      if (!stat.isDirectory()) {
1077        throw new ParentNotDirectoryException("parent is not a dir");
1078      }
1079      // parent does exist - go ahead with mkdir of leaf
1080    }
1081    // Default impl is to assume that permissions do not matter and hence
1082    // calling the regular mkdirs is good enough.
1083    // FSs that implement permissions should override this.
1084    if (!this.mkdirs(f, absolutePermission)) {
1085      throw new IOException("mkdir of "+ f + " failed");
1086    }
1087  }
1088
1089  /**
1090   * Opens an FSDataOutputStream at the indicated Path with write-progress
1091   * reporting. Same as create(), except fails if parent directory doesn't
1092   * already exist.
1093   * @param f the file name to open
1094   * @param overwrite if a file with this name already exists, then if true,
1095   * the file will be overwritten, and if false an error will be thrown.
1096   * @param bufferSize the size of the buffer to be used.
1097   * @param replication required block replication for the file.
1098   * @param blockSize block size
1099   * @param progress the progress reporter
1100   * @throws IOException
1101   * @see #setPermission(Path, FsPermission)
1102   */
1103  public FSDataOutputStream createNonRecursive(Path f,
1104      boolean overwrite,
1105      int bufferSize, short replication, long blockSize,
1106      Progressable progress) throws IOException {
1107    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1108        overwrite, bufferSize, replication, blockSize, progress);
1109  }
1110
1111  /**
1112   * Opens an FSDataOutputStream at the indicated Path with write-progress
1113   * reporting. Same as create(), except fails if parent directory doesn't
1114   * already exist.
1115   * @param f the file name to open
1116   * @param permission file permission
1117   * @param overwrite if a file with this name already exists, then if true,
1118   * the file will be overwritten, and if false an error will be thrown.
1119   * @param bufferSize the size of the buffer to be used.
1120   * @param replication required block replication for the file.
1121   * @param blockSize block size
1122   * @param progress the progress reporter
1123   * @throws IOException
1124   * @see #setPermission(Path, FsPermission)
1125   */
1126   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1127       boolean overwrite, int bufferSize, short replication, long blockSize,
1128       Progressable progress) throws IOException {
1129     return createNonRecursive(f, permission,
1130         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1131             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1132             replication, blockSize, progress);
1133   }
1134
1135   /**
1136    * Opens an FSDataOutputStream at the indicated Path with write-progress
1137    * reporting. Same as create(), except fails if parent directory doesn't
1138    * already exist.
1139    * @param f the file name to open
1140    * @param permission file permission
1141    * @param flags {@link CreateFlag}s to use for this stream.
1142    * @param bufferSize the size of the buffer to be used.
1143    * @param replication required block replication for the file.
1144    * @param blockSize block size
1145    * @param progress the progress reporter
1146    * @throws IOException
1147    * @see #setPermission(Path, FsPermission)
1148    */
1149    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1150        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1151        Progressable progress) throws IOException {
1152      throw new IOException("createNonRecursive unsupported for this filesystem "
1153          + this.getClass());
1154    }
1155
1156  /**
1157   * Creates the given Path as a brand-new zero-length file.  If
1158   * create fails, or if it already existed, return false.
1159   *
1160   * @param f path to use for create
1161   */
1162  public boolean createNewFile(Path f) throws IOException {
1163    if (exists(f)) {
1164      return false;
1165    } else {
1166      create(f, false, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
1167          IO_FILE_BUFFER_SIZE_DEFAULT)).close();
1168      return true;
1169    }
1170  }
1171
1172  /**
1173   * Append to an existing file (optional operation).
1174   * Same as append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
1175   *     IO_FILE_BUFFER_SIZE_DEFAULT), null)
1176   * @param f the existing file to be appended.
1177   * @throws IOException
1178   */
1179  public FSDataOutputStream append(Path f) throws IOException {
1180    return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
1181        IO_FILE_BUFFER_SIZE_DEFAULT), null);
1182  }
1183  /**
1184   * Append to an existing file (optional operation).
1185   * Same as append(f, bufferSize, null).
1186   * @param f the existing file to be appended.
1187   * @param bufferSize the size of the buffer to be used.
1188   * @throws IOException
1189   */
1190  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1191    return append(f, bufferSize, null);
1192  }
1193
1194  /**
1195   * Append to an existing file (optional operation).
1196   * @param f the existing file to be appended.
1197   * @param bufferSize the size of the buffer to be used.
1198   * @param progress for reporting progress if it is not null.
1199   * @throws IOException
1200   */
1201  public abstract FSDataOutputStream append(Path f, int bufferSize,
1202      Progressable progress) throws IOException;
1203
1204  /**
1205   * Concat existing files together.
1206   * @param trg the path to the target destination.
1207   * @param psrcs the paths to the sources to use for the concatenation.
1208   * @throws IOException
1209   */
1210  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1211    throw new UnsupportedOperationException("Not implemented by the " + 
1212        getClass().getSimpleName() + " FileSystem implementation");
1213  }
1214
1215 /**
1216   * Get replication.
1217   * 
1218   * @deprecated Use getFileStatus() instead
1219   * @param src file name
1220   * @return file replication
1221   * @throws IOException
1222   */ 
1223  @Deprecated
1224  public short getReplication(Path src) throws IOException {
1225    return getFileStatus(src).getReplication();
1226  }
1227
1228  /**
1229   * Set replication for an existing file.
1230   * 
1231   * @param src file name
1232   * @param replication new replication
1233   * @throws IOException
1234   * @return true if successful;
1235   *         false if file does not exist or is a directory
1236   */
1237  public boolean setReplication(Path src, short replication)
1238    throws IOException {
1239    return true;
1240  }
1241
1242  /**
1243   * Renames Path src to Path dst.  Can take place on local fs
1244   * or remote DFS.
1245   * @param src path to be renamed
1246   * @param dst new path after rename
1247   * @throws IOException on failure
1248   * @return true if rename is successful
1249   */
1250  public abstract boolean rename(Path src, Path dst) throws IOException;
1251
1252  /**
1253   * Renames Path src to Path dst
1254   * <ul>
1255   * <li>Fails if src is a file and dst is a directory.
1256   * <li>Fails if src is a directory and dst is a file.
1257   * <li>Fails if the parent of dst does not exist or is a file.
1258   * </ul>
1259   * <p>
1260   * If OVERWRITE option is not passed as an argument, rename fails
1261   * if the dst already exists.
1262   * <p>
1263   * If OVERWRITE option is passed as an argument, rename overwrites
1264   * the dst if it is a file or an empty directory. Rename fails if dst is
1265   * a non-empty directory.
1266   * <p>
1267   * Note that atomicity of rename is dependent on the file system
1268   * implementation. Please refer to the file system documentation for
1269   * details. This default implementation is non atomic.
1270   * <p>
1271   * This method is deprecated since it is a temporary method added to 
1272   * support the transition from FileSystem to FileContext for user 
1273   * applications.
1274   * 
1275   * @param src path to be renamed
1276   * @param dst new path after rename
1277   * @throws IOException on failure
1278   */
1279  @Deprecated
1280  protected void rename(final Path src, final Path dst,
1281      final Rename... options) throws IOException {
1282    // Default implementation
1283    final FileStatus srcStatus = getFileLinkStatus(src);
1284    if (srcStatus == null) {
1285      throw new FileNotFoundException("rename source " + src + " not found.");
1286    }
1287
1288    boolean overwrite = false;
1289    if (null != options) {
1290      for (Rename option : options) {
1291        if (option == Rename.OVERWRITE) {
1292          overwrite = true;
1293        }
1294      }
1295    }
1296
1297    FileStatus dstStatus;
1298    try {
1299      dstStatus = getFileLinkStatus(dst);
1300    } catch (IOException e) {
1301      dstStatus = null;
1302    }
1303    if (dstStatus != null) {
1304      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1305        throw new IOException("Source " + src + " Destination " + dst
1306            + " both should be either file or directory");
1307      }
1308      if (!overwrite) {
1309        throw new FileAlreadyExistsException("rename destination " + dst
1310            + " already exists.");
1311      }
1312      // Delete the destination that is a file or an empty directory
1313      if (dstStatus.isDirectory()) {
1314        FileStatus[] list = listStatus(dst);
1315        if (list != null && list.length != 0) {
1316          throw new IOException(
1317              "rename cannot overwrite non empty destination directory " + dst);
1318        }
1319      }
1320      delete(dst, false);
1321    } else {
1322      final Path parent = dst.getParent();
1323      final FileStatus parentStatus = getFileStatus(parent);
1324      if (parentStatus == null) {
1325        throw new FileNotFoundException("rename destination parent " + parent
1326            + " not found.");
1327      }
1328      if (!parentStatus.isDirectory()) {
1329        throw new ParentNotDirectoryException("rename destination parent " + parent
1330            + " is a file.");
1331      }
1332    }
1333    if (!rename(src, dst)) {
1334      throw new IOException("rename from " + src + " to " + dst + " failed.");
1335    }
1336  }
1337
1338  /**
1339   * Truncate the file in the indicated path to the indicated size.
1340   * <ul>
1341   * <li>Fails if path is a directory.
1342   * <li>Fails if path does not exist.
1343   * <li>Fails if path is not closed.
1344   * <li>Fails if new size is greater than current size.
1345   * </ul>
1346   * @param f The path to the file to be truncated
1347   * @param newLength The size the file is to be truncated to
1348   *
1349   * @return <code>true</code> if the file has been truncated to the desired
1350   * <code>newLength</code> and is immediately available to be reused for
1351   * write operations such as <code>append</code>, or
1352   * <code>false</code> if a background process of adjusting the length of
1353   * the last block has been started, and clients should wait for it to
1354   * complete before proceeding with further file updates.
1355   */
1356  public boolean truncate(Path f, long newLength) throws IOException {
1357    throw new UnsupportedOperationException("Not implemented by the " +
1358        getClass().getSimpleName() + " FileSystem implementation");
1359  }
1360  
1361  /**
1362   * Delete a file 
1363   * @deprecated Use {@link #delete(Path, boolean)} instead.
1364   */
1365  @Deprecated
1366  public boolean delete(Path f) throws IOException {
1367    return delete(f, true);
1368  }
1369  
1370  /** Delete a file.
1371   *
1372   * @param f the path to delete.
1373   * @param recursive if path is a directory and set to 
1374   * true, the directory is deleted else throws an exception. In
1375   * case of a file the recursive can be set to either true or false. 
1376   * @return  true if delete is successful else false. 
1377   * @throws IOException
1378   */
1379  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1380
1381  /**
1382   * Mark a path to be deleted when FileSystem is closed.
1383   * When the JVM shuts down,
1384   * all FileSystem objects will be closed automatically.
1385   * Then,
1386   * the marked path will be deleted as a result of closing the FileSystem.
1387   *
1388   * The path has to exist in the file system.
1389   * 
1390   * @param f the path to delete.
1391   * @return  true if deleteOnExit is successful, otherwise false.
1392   * @throws IOException
1393   */
1394  public boolean deleteOnExit(Path f) throws IOException {
1395    if (!exists(f)) {
1396      return false;
1397    }
1398    synchronized (deleteOnExit) {
1399      deleteOnExit.add(f);
1400    }
1401    return true;
1402  }
1403  
1404  /**
1405   * Cancel the deletion of the path when the FileSystem is closed
1406   * @param f the path to cancel deletion
1407   */
1408  public boolean cancelDeleteOnExit(Path f) {
1409    synchronized (deleteOnExit) {
1410      return deleteOnExit.remove(f);
1411    }
1412  }
1413
1414  /**
1415   * Delete all files that were marked as delete-on-exit. This recursively
1416   * deletes all files in the specified paths.
1417   */
1418  protected void processDeleteOnExit() {
1419    synchronized (deleteOnExit) {
1420      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1421        Path path = iter.next();
1422        try {
1423          if (exists(path)) {
1424            delete(path, true);
1425          }
1426        }
1427        catch (IOException e) {
1428          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1429        }
1430        iter.remove();
1431      }
1432    }
1433  }
1434  
1435  /** Check if exists.
1436   * @param f source file
1437   */
1438  public boolean exists(Path f) throws IOException {
1439    try {
1440      return getFileStatus(f) != null;
1441    } catch (FileNotFoundException e) {
1442      return false;
1443    }
1444  }
1445
1446  /** True iff the named path is a directory.
1447   * Note: Avoid using this method. Instead reuse the FileStatus 
1448   * returned by getFileStatus() or listStatus() methods.
1449   * @param f path to check
1450   */
1451  public boolean isDirectory(Path f) throws IOException {
1452    try {
1453      return getFileStatus(f).isDirectory();
1454    } catch (FileNotFoundException e) {
1455      return false;               // f does not exist
1456    }
1457  }
1458
1459  /** True iff the named path is a regular file.
1460   * Note: Avoid using this method. Instead reuse the FileStatus 
1461   * returned by getFileStatus() or listStatus() methods.
1462   * @param f path to check
1463   */
1464  public boolean isFile(Path f) throws IOException {
1465    try {
1466      return getFileStatus(f).isFile();
1467    } catch (FileNotFoundException e) {
1468      return false;               // f does not exist
1469    }
1470  }
1471  
1472  /** The number of bytes in a file. */
1473  /** @deprecated Use getFileStatus() instead */
1474  @Deprecated
1475  public long getLength(Path f) throws IOException {
1476    return getFileStatus(f).getLen();
1477  }
1478    
1479  /** Return the {@link ContentSummary} of a given {@link Path}.
1480  * @param f path to use
1481  */
1482  public ContentSummary getContentSummary(Path f) throws IOException {
1483    FileStatus status = getFileStatus(f);
1484    if (status.isFile()) {
1485      // f is a file
1486      long length = status.getLen();
1487      return new ContentSummary.Builder().length(length).
1488          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1489    }
1490    // f is a directory
1491    long[] summary = {0, 0, 1};
1492    for(FileStatus s : listStatus(f)) {
1493      long length = s.getLen();
1494      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1495          new ContentSummary.Builder().length(length).
1496          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1497      summary[0] += c.getLength();
1498      summary[1] += c.getFileCount();
1499      summary[2] += c.getDirectoryCount();
1500    }
1501    return new ContentSummary.Builder().length(summary[0]).
1502        fileCount(summary[1]).directoryCount(summary[2]).
1503        spaceConsumed(summary[0]).build();
1504  }
1505
1506  /** Return the {@link QuotaUsage} of a given {@link Path}.
1507   * @param f path to use
1508   */
1509  public QuotaUsage getQuotaUsage(Path f) throws IOException {
1510    return getContentSummary(f);
1511  }
1512
1513  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1514    @Override
1515    public boolean accept(Path file) {
1516      return true;
1517    }
1518  };
1519    
1520  /**
1521   * List the statuses of the files/directories in the given path if the path is
1522   * a directory.
1523   * <p>
1524   * Does not guarantee to return the List of files/directories status in a
1525   * sorted order.
1526   * @param f given path
1527   * @return the statuses of the files/directories in the given patch
1528   * @throws FileNotFoundException when the path does not exist;
1529   *         IOException see specific implementation
1530   */
1531  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1532                                                         IOException;
1533    
1534  /*
1535   * Filter files/directories in the given path using the user-supplied path
1536   * filter. Results are added to the given array <code>results</code>.
1537   */
1538  private void listStatus(ArrayList<FileStatus> results, Path f,
1539      PathFilter filter) throws FileNotFoundException, IOException {
1540    FileStatus listing[] = listStatus(f);
1541    if (listing == null) {
1542      throw new IOException("Error accessing " + f);
1543    }
1544
1545    for (int i = 0; i < listing.length; i++) {
1546      if (filter.accept(listing[i].getPath())) {
1547        results.add(listing[i]);
1548      }
1549    }
1550  }
1551
1552  /**
1553   * @return an iterator over the corrupt files under the given path
1554   * (may contain duplicates if a file has more than one corrupt block)
1555   * @throws IOException
1556   */
1557  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1558    throws IOException {
1559    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1560                                            " does not support" +
1561                                            " listCorruptFileBlocks");
1562  }
1563
1564  /**
1565   * Filter files/directories in the given path using the user-supplied path
1566   * filter.
1567   * <p>
1568   * Does not guarantee to return the List of files/directories status in a
1569   * sorted order.
1570   * 
1571   * @param f
1572   *          a path name
1573   * @param filter
1574   *          the user-supplied path filter
1575   * @return an array of FileStatus objects for the files under the given path
1576   *         after applying the filter
1577   * @throws FileNotFoundException when the path does not exist;
1578   *         IOException see specific implementation   
1579   */
1580  public FileStatus[] listStatus(Path f, PathFilter filter) 
1581                                   throws FileNotFoundException, IOException {
1582    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1583    listStatus(results, f, filter);
1584    return results.toArray(new FileStatus[results.size()]);
1585  }
1586
1587  /**
1588   * Filter files/directories in the given list of paths using default
1589   * path filter.
1590   * <p>
1591   * Does not guarantee to return the List of files/directories status in a
1592   * sorted order.
1593   * 
1594   * @param files
1595   *          a list of paths
1596   * @return a list of statuses for the files under the given paths after
1597   *         applying the filter default Path filter
1598   * @throws FileNotFoundException when the path does not exist;
1599   *         IOException see specific implementation
1600   */
1601  public FileStatus[] listStatus(Path[] files)
1602      throws FileNotFoundException, IOException {
1603    return listStatus(files, DEFAULT_FILTER);
1604  }
1605
1606  /**
1607   * Filter files/directories in the given list of paths using user-supplied
1608   * path filter.
1609   * <p>
1610   * Does not guarantee to return the List of files/directories status in a
1611   * sorted order.
1612   * 
1613   * @param files
1614   *          a list of paths
1615   * @param filter
1616   *          the user-supplied path filter
1617   * @return a list of statuses for the files under the given paths after
1618   *         applying the filter
1619   * @throws FileNotFoundException when the path does not exist;
1620   *         IOException see specific implementation
1621   */
1622  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1623      throws FileNotFoundException, IOException {
1624    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1625    for (int i = 0; i < files.length; i++) {
1626      listStatus(results, files[i], filter);
1627    }
1628    return results.toArray(new FileStatus[results.size()]);
1629  }
1630
1631  /**
1632   * <p>Return all the files that match filePattern and are not checksum
1633   * files. Results are sorted by their names.
1634   * 
1635   * <p>
1636   * A filename pattern is composed of <i>regular</i> characters and
1637   * <i>special pattern matching</i> characters, which are:
1638   *
1639   * <dl>
1640   *  <dd>
1641   *   <dl>
1642   *    <p>
1643   *    <dt> <tt> ? </tt>
1644   *    <dd> Matches any single character.
1645   *
1646   *    <p>
1647   *    <dt> <tt> * </tt>
1648   *    <dd> Matches zero or more characters.
1649   *
1650   *    <p>
1651   *    <dt> <tt> [<i>abc</i>] </tt>
1652   *    <dd> Matches a single character from character set
1653   *     <tt>{<i>a,b,c</i>}</tt>.
1654   *
1655   *    <p>
1656   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1657   *    <dd> Matches a single character from the character range
1658   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1659   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1660   *
1661   *    <p>
1662   *    <dt> <tt> [^<i>a</i>] </tt>
1663   *    <dd> Matches a single character that is not from character set or range
1664   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1665   *     immediately to the right of the opening bracket.
1666   *
1667   *    <p>
1668   *    <dt> <tt> \<i>c</i> </tt>
1669   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1670   *
1671   *    <p>
1672   *    <dt> <tt> {ab,cd} </tt>
1673   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1674   *    
1675   *    <p>
1676   *    <dt> <tt> {ab,c{de,fh}} </tt>
1677   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1678   *
1679   *   </dl>
1680   *  </dd>
1681   * </dl>
1682   *
1683   * @param pathPattern a regular expression specifying a pth pattern
1684
1685   * @return an array of paths that match the path pattern
1686   * @throws IOException
1687   */
1688  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1689    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1690  }
1691  
1692  /**
1693   * Return an array of FileStatus objects whose path names match
1694   * {@code pathPattern} and is accepted by the user-supplied path filter.
1695   * Results are sorted by their path names.
1696   * 
1697   * @param pathPattern a regular expression specifying the path pattern
1698   * @param filter a user-supplied path filter
1699   * @return null if {@code pathPattern} has no glob and the path does not exist
1700   *         an empty array if {@code pathPattern} has a glob and no path
1701   *         matches it else an array of {@link FileStatus} objects matching the
1702   *         pattern
1703   * @throws IOException if any I/O error occurs when fetching file status
1704   */
1705  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1706      throws IOException {
1707    return new Globber(this, pathPattern, filter).glob();
1708  }
1709  
1710  /**
1711   * List the statuses of the files/directories in the given path if the path is
1712   * a directory. 
1713   * Return the file's status and block locations If the path is a file.
1714   * 
1715   * If a returned status is a file, it contains the file's block locations.
1716   * 
1717   * @param f is the path
1718   *
1719   * @return an iterator that traverses statuses of the files/directories 
1720   *         in the given path
1721   *
1722   * @throws FileNotFoundException If <code>f</code> does not exist
1723   * @throws IOException If an I/O error occurred
1724   */
1725  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1726  throws FileNotFoundException, IOException {
1727    return listLocatedStatus(f, DEFAULT_FILTER);
1728  }
1729
1730  /**
1731   * Listing a directory
1732   * The returned results include its block location if it is a file
1733   * The results are filtered by the given path filter
1734   * @param f a path
1735   * @param filter a path filter
1736   * @return an iterator that traverses statuses of the files/directories 
1737   *         in the given path
1738   * @throws FileNotFoundException if <code>f</code> does not exist
1739   * @throws IOException if any I/O error occurred
1740   */
1741  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1742      final PathFilter filter)
1743  throws FileNotFoundException, IOException {
1744    return new RemoteIterator<LocatedFileStatus>() {
1745      private final FileStatus[] stats = listStatus(f, filter);
1746      private int i = 0;
1747
1748      @Override
1749      public boolean hasNext() {
1750        return i<stats.length;
1751      }
1752
1753      @Override
1754      public LocatedFileStatus next() throws IOException {
1755        if (!hasNext()) {
1756          throw new NoSuchElementException("No more entries in " + f);
1757        }
1758        FileStatus result = stats[i++];
1759        // for files, use getBlockLocations(FileStatus, int, int) to avoid
1760        // calling getFileStatus(Path) to load the FileStatus again
1761        BlockLocation[] locs = result.isFile() ?
1762            getFileBlockLocations(result, 0, result.getLen()) :
1763            null;
1764        return new LocatedFileStatus(result, locs);
1765      }
1766    };
1767  }
1768
1769  /**
1770   * Returns a remote iterator so that followup calls are made on demand
1771   * while consuming the entries. Each file system implementation should
1772   * override this method and provide a more efficient implementation, if
1773   * possible. 
1774   * Does not guarantee to return the iterator that traverses statuses
1775   * of the files in a sorted order.
1776   *
1777   * @param p target path
1778   * @return remote iterator
1779   */
1780  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
1781  throws FileNotFoundException, IOException {
1782    return new RemoteIterator<FileStatus>() {
1783      private final FileStatus[] stats = listStatus(p);
1784      private int i = 0;
1785
1786      @Override
1787      public boolean hasNext() {
1788        return i<stats.length;
1789      }
1790
1791      @Override
1792      public FileStatus next() throws IOException {
1793        if (!hasNext()) {
1794          throw new NoSuchElementException("No more entry in " + p);
1795        }
1796        return stats[i++];
1797      }
1798    };
1799  }
1800
1801  /**
1802   * List the statuses and block locations of the files in the given path.
1803   * Does not guarantee to return the iterator that traverses statuses
1804   * of the files in a sorted order.
1805   * 
1806   * If the path is a directory, 
1807   *   if recursive is false, returns files in the directory;
1808   *   if recursive is true, return files in the subtree rooted at the path.
1809   * If the path is a file, return the file's status and block locations.
1810   * 
1811   * @param f is the path
1812   * @param recursive if the subdirectories need to be traversed recursively
1813   *
1814   * @return an iterator that traverses statuses of the files
1815   *
1816   * @throws FileNotFoundException when the path does not exist;
1817   *         IOException see specific implementation
1818   */
1819  public RemoteIterator<LocatedFileStatus> listFiles(
1820      final Path f, final boolean recursive)
1821  throws FileNotFoundException, IOException {
1822    return new RemoteIterator<LocatedFileStatus>() {
1823      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1824        new Stack<RemoteIterator<LocatedFileStatus>>();
1825      private RemoteIterator<LocatedFileStatus> curItor =
1826        listLocatedStatus(f);
1827      private LocatedFileStatus curFile;
1828     
1829      @Override
1830      public boolean hasNext() throws IOException {
1831        while (curFile == null) {
1832          if (curItor.hasNext()) {
1833            handleFileStat(curItor.next());
1834          } else if (!itors.empty()) {
1835            curItor = itors.pop();
1836          } else {
1837            return false;
1838          }
1839        }
1840        return true;
1841      }
1842
1843      /**
1844       * Process the input stat.
1845       * If it is a file, return the file stat.
1846       * If it is a directory, traverse the directory if recursive is true;
1847       * ignore it if recursive is false.
1848       * @param stat input status
1849       * @throws IOException if any IO error occurs
1850       */
1851      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1852        if (stat.isFile()) { // file
1853          curFile = stat;
1854        } else if (recursive) { // directory
1855          itors.push(curItor);
1856          curItor = listLocatedStatus(stat.getPath());
1857        }
1858      }
1859
1860      @Override
1861      public LocatedFileStatus next() throws IOException {
1862        if (hasNext()) {
1863          LocatedFileStatus result = curFile;
1864          curFile = null;
1865          return result;
1866        } 
1867        throw new java.util.NoSuchElementException("No more entry in " + f);
1868      }
1869    };
1870  }
1871  
1872  /** Return the current user's home directory in this filesystem.
1873   * The default implementation returns "/user/$USER/".
1874   */
1875  public Path getHomeDirectory() {
1876    return this.makeQualified(
1877        new Path("/user/"+System.getProperty("user.name")));
1878  }
1879
1880
  /**
   * Set the current working directory for the given file system. All
   * relative paths will be resolved relative to it.
   * <p>
   * Abstract: each concrete file system supplies its own implementation.
   *
   * @param new_dir path of the new working directory
   */
  public abstract void setWorkingDirectory(Path new_dir);
1888    
  /**
   * Get the current working directory for the given file system.
   * <p>
   * Abstract: each concrete file system supplies its own implementation.
   *
   * @return the directory pathname
   */
  public abstract Path getWorkingDirectory();
1894  
1895  
1896  /**
1897   * Note: with the new FilesContext class, getWorkingDirectory()
1898   * will be removed. 
1899   * The working directory is implemented in FilesContext.
1900   * 
1901   * Some file systems like LocalFileSystem have an initial workingDir
1902   * that we use as the starting workingDir. For other file systems
1903   * like HDFS there is no built in notion of an initial workingDir.
1904   * 
1905   * @return if there is built in notion of workingDir then it
1906   * is returned; else a null is returned.
1907   */
1908  protected Path getInitialWorkingDirectory() {
1909    return null;
1910  }
1911
1912  /**
1913   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1914   */
1915  public boolean mkdirs(Path f) throws IOException {
1916    return mkdirs(f, FsPermission.getDirDefault());
1917  }
1918
  /**
   * Make the given file and all non-existent parents into
   * directories. Has the semantics of Unix 'mkdir -p'.
   * Existence of the directory hierarchy is not an error.
   * <p>
   * Abstract: each concrete file system supplies its own implementation.
   *
   * @param f path to create
   * @param permission to apply to f
   * @return implementation-defined success flag
   *         (NOTE(review): exact meaning is not visible here; confirm
   *         against the concrete implementations)
   * @throws IOException see specific implementation
   */
  public abstract boolean mkdirs(Path f, FsPermission permission
      ) throws IOException;
1928
1929  /**
1930   * The src file is on the local disk.  Add it to FS at
1931   * the given dst name and the source is kept intact afterwards
1932   * @param src path
1933   * @param dst path
1934   */
1935  public void copyFromLocalFile(Path src, Path dst)
1936    throws IOException {
1937    copyFromLocalFile(false, src, dst);
1938  }
1939
1940  /**
1941   * The src files is on the local disk.  Add it to FS at
1942   * the given dst name, removing the source afterwards.
1943   * @param srcs path
1944   * @param dst path
1945   */
1946  public void moveFromLocalFile(Path[] srcs, Path dst)
1947    throws IOException {
1948    copyFromLocalFile(true, true, srcs, dst);
1949  }
1950
1951  /**
1952   * The src file is on the local disk.  Add it to FS at
1953   * the given dst name, removing the source afterwards.
1954   * @param src path
1955   * @param dst path
1956   */
1957  public void moveFromLocalFile(Path src, Path dst)
1958    throws IOException {
1959    copyFromLocalFile(true, src, dst);
1960  }
1961
1962  /**
1963   * The src file is on the local disk.  Add it to FS at
1964   * the given dst name.
1965   * delSrc indicates if the source should be removed
1966   * @param delSrc whether to delete the src
1967   * @param src path
1968   * @param dst path
1969   */
1970  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1971    throws IOException {
1972    copyFromLocalFile(delSrc, true, src, dst);
1973  }
1974  
1975  /**
1976   * The src files are on the local disk.  Add it to FS at
1977   * the given dst name.
1978   * delSrc indicates if the source should be removed
1979   * @param delSrc whether to delete the src
1980   * @param overwrite whether to overwrite an existing file
1981   * @param srcs array of paths which are source
1982   * @param dst path
1983   */
1984  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1985                                Path[] srcs, Path dst)
1986    throws IOException {
1987    Configuration conf = getConf();
1988    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1989  }
1990  
1991  /**
1992   * The src file is on the local disk.  Add it to FS at
1993   * the given dst name.
1994   * delSrc indicates if the source should be removed
1995   * @param delSrc whether to delete the src
1996   * @param overwrite whether to overwrite an existing file
1997   * @param src path
1998   * @param dst path
1999   */
2000  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
2001                                Path src, Path dst)
2002    throws IOException {
2003    Configuration conf = getConf();
2004    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
2005  }
2006    
2007  /**
2008   * The src file is under FS, and the dst is on the local disk.
2009   * Copy it from FS control to the local dst name.
2010   * @param src path
2011   * @param dst path
2012   */
2013  public void copyToLocalFile(Path src, Path dst) throws IOException {
2014    copyToLocalFile(false, src, dst);
2015  }
2016    
2017  /**
2018   * The src file is under FS, and the dst is on the local disk.
2019   * Copy it from FS control to the local dst name.
2020   * Remove the source afterwards
2021   * @param src path
2022   * @param dst path
2023   */
2024  public void moveToLocalFile(Path src, Path dst) throws IOException {
2025    copyToLocalFile(true, src, dst);
2026  }
2027
2028  /**
2029   * The src file is under FS, and the dst is on the local disk.
2030   * Copy it from FS control to the local dst name.
2031   * delSrc indicates if the src will be removed or not.
2032   * @param delSrc whether to delete the src
2033   * @param src path
2034   * @param dst path
2035   */   
2036  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
2037    throws IOException {
2038    copyToLocalFile(delSrc, src, dst, false);
2039  }
2040  
2041    /**
2042   * The src file is under FS, and the dst is on the local disk. Copy it from FS
2043   * control to the local dst name. delSrc indicates if the src will be removed
2044   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
2045   * as local file system or not. RawLocalFileSystem is non crc file system.So,
2046   * It will not create any crc files at local.
2047   * 
2048   * @param delSrc
2049   *          whether to delete the src
2050   * @param src
2051   *          path
2052   * @param dst
2053   *          path
2054   * @param useRawLocalFileSystem
2055   *          whether to use RawLocalFileSystem as local file system or not.
2056   * 
2057   * @throws IOException
2058   *           - if any IO error
2059   */
2060  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
2061      boolean useRawLocalFileSystem) throws IOException {
2062    Configuration conf = getConf();
2063    FileSystem local = null;
2064    if (useRawLocalFileSystem) {
2065      local = getLocal(conf).getRawFileSystem();
2066    } else {
2067      local = getLocal(conf);
2068    }
2069    FileUtil.copy(this, src, local, dst, delSrc, conf);
2070  }
2071
2072  /**
2073   * Returns a local File that the user can write output to.  The caller
2074   * provides both the eventual FS target name and the local working
2075   * file.  If the FS is local, we write directly into the target.  If
2076   * the FS is remote, we write into the tmp local area.
2077   * @param fsOutputFile path of output file
2078   * @param tmpLocalFile path of local tmp file
2079   */
2080  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2081    throws IOException {
2082    return tmpLocalFile;
2083  }
2084
2085  /**
2086   * Called when we're all done writing to the target.  A local FS will
2087   * do nothing, because we've written to exactly the right place.  A remote
2088   * FS will copy the contents of tmpLocalFile to the correct target at
2089   * fsOutputFile.
2090   * @param fsOutputFile path of output file
2091   * @param tmpLocalFile path to local tmp file
2092   */
2093  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2094    throws IOException {
2095    moveFromLocalFile(tmpLocalFile, fsOutputFile);
2096  }
2097
2098  /**
2099   * No more filesystem operations are needed.  Will
2100   * release any held locks.
2101   */
2102  @Override
2103  public void close() throws IOException {
2104    // delete all files that were marked as delete-on-exit.
2105    processDeleteOnExit();
2106    CACHE.remove(this.key, this);
2107  }
2108
2109  /** Return the total size of all files in the filesystem. */
2110  public long getUsed() throws IOException {
2111    Path path = new Path("/");
2112    return getUsed(path);
2113  }
2114
2115  /** Return the total size of all files from a specified path. */
2116  public long getUsed(Path path) throws IOException {
2117    return getContentSummary(path).getLength();
2118  }
2119
2120  /**
2121   * Get the block size for a particular file.
2122   * @param f the filename
2123   * @return the number of bytes in a block
2124   */
2125  /** @deprecated Use getFileStatus() instead */
2126  @Deprecated
2127  public long getBlockSize(Path f) throws IOException {
2128    return getFileStatus(f).getBlockSize();
2129  }
2130
2131  /**
2132   * Return the number of bytes that large input files should be optimally
2133   * be split into to minimize i/o time.
2134   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2135   */
2136  @Deprecated
2137  public long getDefaultBlockSize() {
2138    // default to 32MB: large enough to minimize the impact of seeks
2139    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2140  }
2141    
2142  /** Return the number of bytes that large input files should be optimally
2143   * be split into to minimize i/o time.  The given path will be used to
2144   * locate the actual filesystem.  The full path does not have to exist.
2145   * @param f path of file
2146   * @return the default block size for the path's filesystem
2147   */
2148  public long getDefaultBlockSize(Path f) {
2149    return getDefaultBlockSize();
2150  }
2151
2152  /**
2153   * Get the default replication.
2154   * @deprecated use {@link #getDefaultReplication(Path)} instead
2155   */
2156  @Deprecated
2157  public short getDefaultReplication() { return 1; }
2158
2159  /**
2160   * Get the default replication for a path.   The given path will be used to
2161   * locate the actual filesystem.  The full path does not have to exist.
2162   * @param path of the file
2163   * @return default replication for the path's filesystem 
2164   */
2165  public short getDefaultReplication(Path path) {
2166    return getDefaultReplication();
2167  }
2168  
  /**
   * Return a file status object that represents the path.
   * <p>
   * Abstract: each concrete file system supplies its own implementation.
   *
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  public abstract FileStatus getFileStatus(Path f) throws IOException;
2177
2178  /**
2179   * Checks if the user can access a path.  The mode specifies which access
2180   * checks to perform.  If the requested permissions are granted, then the
2181   * method returns normally.  If access is denied, then the method throws an
2182   * {@link AccessControlException}.
2183   * <p/>
2184   * The default implementation of this method calls {@link #getFileStatus(Path)}
2185   * and checks the returned permissions against the requested permissions.
2186   * Note that the getFileStatus call will be subject to authorization checks.
2187   * Typically, this requires search (execute) permissions on each directory in
2188   * the path's prefix, but this is implementation-defined.  Any file system
2189   * that provides a richer authorization model (such as ACLs) may override the
2190   * default implementation so that it checks against that model instead.
2191   * <p>
2192   * In general, applications should avoid using this method, due to the risk of
2193   * time-of-check/time-of-use race conditions.  The permissions on a file may
2194   * change immediately after the access call returns.  Most applications should
2195   * prefer running specific file system actions as the desired user represented
2196   * by a {@link UserGroupInformation}.
2197   *
2198   * @param path Path to check
2199   * @param mode type of access to check
2200   * @throws AccessControlException if access is denied
2201   * @throws FileNotFoundException if the path does not exist
2202   * @throws IOException see specific implementation
2203   */
2204  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2205  public void access(Path path, FsAction mode) throws AccessControlException,
2206      FileNotFoundException, IOException {
2207    checkAccessPermissions(this.getFileStatus(path), mode);
2208  }
2209
2210  /**
2211   * This method provides the default implementation of
2212   * {@link #access(Path, FsAction)}.
2213   *
2214   * @param stat FileStatus to check
2215   * @param mode type of access to check
2216   * @throws IOException for any error
2217   */
2218  @InterfaceAudience.Private
2219  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2220      throws IOException {
2221    FsPermission perm = stat.getPermission();
2222    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2223    String user = ugi.getShortUserName();
2224    if (user.equals(stat.getOwner())) {
2225      if (perm.getUserAction().implies(mode)) {
2226        return;
2227      }
2228    } else if (ugi.getGroups().contains(stat.getGroup())) {
2229      if (perm.getGroupAction().implies(mode)) {
2230        return;
2231      }
2232    } else {
2233      if (perm.getOtherAction().implies(mode)) {
2234        return;
2235      }
2236    }
2237    throw new AccessControlException(String.format(
2238      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2239      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2240  }
2241
2242  /**
2243   * See {@link FileContext#fixRelativePart}
2244   */
2245  protected Path fixRelativePart(Path p) {
2246    if (p.isUriPathAbsolute()) {
2247      return p;
2248    } else {
2249      return new Path(getWorkingDirectory(), p);
2250    }
2251  }
2252
2253  /**
2254   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2255   */
2256  public void createSymlink(final Path target, final Path link,
2257      final boolean createParent) throws AccessControlException,
2258      FileAlreadyExistsException, FileNotFoundException,
2259      ParentNotDirectoryException, UnsupportedFileSystemException, 
2260      IOException {
2261    // Supporting filesystems should override this method
2262    throw new UnsupportedOperationException(
2263        "Filesystem does not support symlinks!");
2264  }
2265
2266  /**
2267   * See {@link FileContext#getFileLinkStatus(Path)}
2268   */
2269  public FileStatus getFileLinkStatus(final Path f)
2270      throws AccessControlException, FileNotFoundException,
2271      UnsupportedFileSystemException, IOException {
2272    // Supporting filesystems should override this method
2273    return getFileStatus(f);
2274  }
2275
2276  /**
2277   * See {@link AbstractFileSystem#supportsSymlinks()}
2278   */
2279  public boolean supportsSymlinks() {
2280    return false;
2281  }
2282
2283  /**
2284   * See {@link FileContext#getLinkTarget(Path)}
2285   */
2286  public Path getLinkTarget(Path f) throws IOException {
2287    // Supporting filesystems should override this method
2288    throw new UnsupportedOperationException(
2289        "Filesystem does not support symlinks!");
2290  }
2291
2292  /**
2293   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2294   */
2295  protected Path resolveLink(Path f) throws IOException {
2296    // Supporting filesystems should override this method
2297    throw new UnsupportedOperationException(
2298        "Filesystem does not support symlinks!");
2299  }
2300
2301  /**
2302   * Get the checksum of a file.
2303   *
2304   * @param f The file path
2305   * @return The file checksum.  The default return value is null,
2306   *  which indicates that no checksum algorithm is implemented
2307   *  in the corresponding FileSystem.
2308   */
2309  public FileChecksum getFileChecksum(Path f) throws IOException {
2310    return getFileChecksum(f, Long.MAX_VALUE);
2311  }
2312
2313  /**
2314   * Get the checksum of a file, from the beginning of the file till the
2315   * specific length.
2316   * @param f The file path
2317   * @param length The length of the file range for checksum calculation
2318   * @return The file checksum.
2319   */
2320  public FileChecksum getFileChecksum(Path f, final long length)
2321      throws IOException {
2322    return null;
2323  }
2324
2325  /**
2326   * Set the verify checksum flag. This is only applicable if the 
2327   * corresponding FileSystem supports checksum. By default doesn't do anything.
2328   * @param verifyChecksum Verify checksum flag
2329   */
2330  public void setVerifyChecksum(boolean verifyChecksum) {
2331    //doesn't do anything
2332  }
2333
2334  /**
2335   * Set the write checksum flag. This is only applicable if the 
2336   * corresponding FileSystem supports checksum. By default doesn't do anything.
2337   * @param writeChecksum Write checsum flag
2338   */
2339  public void setWriteChecksum(boolean writeChecksum) {
2340    //doesn't do anything
2341  }
2342
2343  /**
2344   * Returns a status object describing the use and capacity of the
2345   * file system. If the file system has multiple partitions, the
2346   * use and capacity of the root partition is reflected.
2347   * 
2348   * @return a FsStatus object
2349   * @throws IOException
2350   *           see specific implementation
2351   */
2352  public FsStatus getStatus() throws IOException {
2353    return getStatus(null);
2354  }
2355
2356  /**
2357   * Returns a status object describing the use and capacity of the
2358   * file system. If the file system has multiple partitions, the
2359   * use and capacity of the partition pointed to by the specified
2360   * path is reflected.
2361   * @param p Path for which status should be obtained. null means
2362   * the default partition. 
2363   * @return a FsStatus object
2364   * @throws IOException
2365   *           see specific implementation
2366   */
2367  public FsStatus getStatus(Path p) throws IOException {
2368    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2369  }
2370
2371  /**
2372   * Set permission of a path.
2373   * @param p The path
2374   * @param permission permission
2375   */
2376  public void setPermission(Path p, FsPermission permission
2377      ) throws IOException {
2378  }
2379
2380  /**
2381   * Set owner of a path (i.e. a file or a directory).
2382   * The parameters username and groupname cannot both be null.
2383   * @param p The path
2384   * @param username If it is null, the original username remains unchanged.
2385   * @param groupname If it is null, the original groupname remains unchanged.
2386   */
2387  public void setOwner(Path p, String username, String groupname
2388      ) throws IOException {
2389  }
2390
2391  /**
2392   * Set access time of a file
2393   * @param p The path
2394   * @param mtime Set the modification time of this file.
2395   *              The number of milliseconds since Jan 1, 1970. 
2396   *              A value of -1 means that this call should not set modification time.
2397   * @param atime Set the access time of this file.
2398   *              The number of milliseconds since Jan 1, 1970. 
2399   *              A value of -1 means that this call should not set access time.
2400   */
2401  public void setTimes(Path p, long mtime, long atime
2402      ) throws IOException {
2403  }
2404
2405  /**
2406   * Create a snapshot with a default name.
2407   * @param path The directory where snapshots will be taken.
2408   * @return the snapshot path.
2409   */
2410  public final Path createSnapshot(Path path) throws IOException {
2411    return createSnapshot(path, null);
2412  }
2413
2414  /**
2415   * Create a snapshot
2416   * @param path The directory where snapshots will be taken.
2417   * @param snapshotName The name of the snapshot
2418   * @return the snapshot path.
2419   */
2420  public Path createSnapshot(Path path, String snapshotName)
2421      throws IOException {
2422    throw new UnsupportedOperationException(getClass().getSimpleName()
2423        + " doesn't support createSnapshot");
2424  }
2425  
2426  /**
2427   * Rename a snapshot
2428   * @param path The directory path where the snapshot was taken
2429   * @param snapshotOldName Old name of the snapshot
2430   * @param snapshotNewName New name of the snapshot
2431   * @throws IOException
2432   */
2433  public void renameSnapshot(Path path, String snapshotOldName,
2434      String snapshotNewName) throws IOException {
2435    throw new UnsupportedOperationException(getClass().getSimpleName()
2436        + " doesn't support renameSnapshot");
2437  }
2438  
2439  /**
2440   * Delete a snapshot of a directory
2441   * @param path  The directory that the to-be-deleted snapshot belongs to
2442   * @param snapshotName The name of the snapshot
2443   */
2444  public void deleteSnapshot(Path path, String snapshotName)
2445      throws IOException {
2446    throw new UnsupportedOperationException(getClass().getSimpleName()
2447        + " doesn't support deleteSnapshot");
2448  }
2449  
2450  /**
2451   * Modifies ACL entries of files and directories.  This method can add new ACL
2452   * entries or modify the permissions on existing ACL entries.  All existing
2453   * ACL entries that are not specified in this call are retained without
2454   * changes.  (Modifications are merged into the current ACL.)
2455   *
2456   * @param path Path to modify
2457   * @param aclSpec List<AclEntry> describing modifications
2458   * @throws IOException if an ACL could not be modified
2459   */
2460  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2461      throws IOException {
2462    throw new UnsupportedOperationException(getClass().getSimpleName()
2463        + " doesn't support modifyAclEntries");
2464  }
2465
2466  /**
2467   * Removes ACL entries from files and directories.  Other ACL entries are
2468   * retained.
2469   *
2470   * @param path Path to modify
2471   * @param aclSpec List<AclEntry> describing entries to remove
2472   * @throws IOException if an ACL could not be modified
2473   */
2474  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2475      throws IOException {
2476    throw new UnsupportedOperationException(getClass().getSimpleName()
2477        + " doesn't support removeAclEntries");
2478  }
2479
2480  /**
2481   * Removes all default ACL entries from files and directories.
2482   *
2483   * @param path Path to modify
2484   * @throws IOException if an ACL could not be modified
2485   */
2486  public void removeDefaultAcl(Path path)
2487      throws IOException {
2488    throw new UnsupportedOperationException(getClass().getSimpleName()
2489        + " doesn't support removeDefaultAcl");
2490  }
2491
2492  /**
2493   * Removes all but the base ACL entries of files and directories.  The entries
2494   * for user, group, and others are retained for compatibility with permission
2495   * bits.
2496   *
2497   * @param path Path to modify
2498   * @throws IOException if an ACL could not be removed
2499   */
2500  public void removeAcl(Path path)
2501      throws IOException {
2502    throw new UnsupportedOperationException(getClass().getSimpleName()
2503        + " doesn't support removeAcl");
2504  }
2505
2506  /**
2507   * Fully replaces ACL of files and directories, discarding all existing
2508   * entries.
2509   *
2510   * @param path Path to modify
2511   * @param aclSpec List<AclEntry> describing modifications, must include entries
2512   *   for user, group, and others for compatibility with permission bits.
2513   * @throws IOException if an ACL could not be modified
2514   */
2515  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2516    throw new UnsupportedOperationException(getClass().getSimpleName()
2517        + " doesn't support setAcl");
2518  }
2519
2520  /**
2521   * Gets the ACL of a file or directory.
2522   *
2523   * @param path Path to get
2524   * @return AclStatus describing the ACL of the file or directory
2525   * @throws IOException if an ACL could not be read
2526   */
2527  public AclStatus getAclStatus(Path path) throws IOException {
2528    throw new UnsupportedOperationException(getClass().getSimpleName()
2529        + " doesn't support getAclStatus");
2530  }
2531
2532  /**
2533   * Set an xattr of a file or directory.
2534   * The name must be prefixed with the namespace followed by ".". For example,
2535   * "user.attr".
2536   * <p/>
2537   * Refer to the HDFS extended attributes user documentation for details.
2538   *
2539   * @param path Path to modify
2540   * @param name xattr name.
2541   * @param value xattr value.
2542   * @throws IOException
2543   */
2544  public void setXAttr(Path path, String name, byte[] value)
2545      throws IOException {
2546    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2547        XAttrSetFlag.REPLACE));
2548  }
2549
2550  /**
2551   * Set an xattr of a file or directory.
2552   * The name must be prefixed with the namespace followed by ".". For example,
2553   * "user.attr".
2554   * <p/>
2555   * Refer to the HDFS extended attributes user documentation for details.
2556   *
2557   * @param path Path to modify
2558   * @param name xattr name.
2559   * @param value xattr value.
2560   * @param flag xattr set flag
2561   * @throws IOException
2562   */
2563  public void setXAttr(Path path, String name, byte[] value,
2564      EnumSet<XAttrSetFlag> flag) throws IOException {
2565    throw new UnsupportedOperationException(getClass().getSimpleName()
2566        + " doesn't support setXAttr");
2567  }
2568
2569  /**
2570   * Get an xattr name and value for a file or directory.
2571   * The name must be prefixed with the namespace followed by ".". For example,
2572   * "user.attr".
2573   * <p/>
2574   * Refer to the HDFS extended attributes user documentation for details.
2575   *
2576   * @param path Path to get extended attribute
2577   * @param name xattr name.
2578   * @return byte[] xattr value.
2579   * @throws IOException
2580   */
2581  public byte[] getXAttr(Path path, String name) throws IOException {
2582    throw new UnsupportedOperationException(getClass().getSimpleName()
2583        + " doesn't support getXAttr");
2584  }
2585
2586  /**
2587   * Get all of the xattr name/value pairs for a file or directory.
2588   * Only those xattrs which the logged-in user has permissions to view
2589   * are returned.
2590   * <p/>
2591   * Refer to the HDFS extended attributes user documentation for details.
2592   *
2593   * @param path Path to get extended attributes
2594   * @return Map describing the XAttrs of the file or directory
2595   * @throws IOException
2596   */
2597  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2598    throw new UnsupportedOperationException(getClass().getSimpleName()
2599        + " doesn't support getXAttrs");
2600  }
2601
2602  /**
2603   * Get all of the xattrs name/value pairs for a file or directory.
2604   * Only those xattrs which the logged-in user has permissions to view
2605   * are returned.
2606   * <p/>
2607   * Refer to the HDFS extended attributes user documentation for details.
2608   *
2609   * @param path Path to get extended attributes
2610   * @param names XAttr names.
2611   * @return Map describing the XAttrs of the file or directory
2612   * @throws IOException
2613   */
2614  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2615      throws IOException {
2616    throw new UnsupportedOperationException(getClass().getSimpleName()
2617        + " doesn't support getXAttrs");
2618  }
2619
2620  /**
2621   * Get all of the xattr names for a file or directory.
2622   * Only those xattr names which the logged-in user has permissions to view
2623   * are returned.
2624   * <p/>
2625   * Refer to the HDFS extended attributes user documentation for details.
2626   *
2627   * @param path Path to get extended attributes
2628   * @return List<String> of the XAttr names of the file or directory
2629   * @throws IOException
2630   */
2631  public List<String> listXAttrs(Path path) throws IOException {
2632    throw new UnsupportedOperationException(getClass().getSimpleName()
2633            + " doesn't support listXAttrs");
2634  }
2635
2636  /**
2637   * Remove an xattr of a file or directory.
2638   * The name must be prefixed with the namespace followed by ".". For example,
2639   * "user.attr".
2640   * <p/>
2641   * Refer to the HDFS extended attributes user documentation for details.
2642   *
2643   * @param path Path to remove extended attribute
2644   * @param name xattr name
2645   * @throws IOException
2646   */
2647  public void removeXAttr(Path path, String name) throws IOException {
2648    throw new UnsupportedOperationException(getClass().getSimpleName()
2649        + " doesn't support removeXAttr");
2650  }
2651
2652  /**
2653   * Set the storage policy for a given file or directory.
2654   *
2655   * @param src file or directory path.
2656   * @param policyName the name of the target storage policy. The list
2657   *                   of supported Storage policies can be retrieved
2658   *                   via {@link #getAllStoragePolicies}.
2659   * @throws IOException
2660   */
2661  public void setStoragePolicy(final Path src, final String policyName)
2662      throws IOException {
2663    throw new UnsupportedOperationException(getClass().getSimpleName()
2664        + " doesn't support setStoragePolicy");
2665  }
2666
2667  /**
2668   * Unset the storage policy set for a given file or directory.
2669   * @param src file or directory path.
2670   * @throws IOException
2671   */
2672  public void unsetStoragePolicy(final Path src) throws IOException {
2673    throw new UnsupportedOperationException(getClass().getSimpleName()
2674        + " doesn't support unsetStoragePolicy");
2675  }
2676
2677  /**
2678   * Query the effective storage policy ID for the given file or directory.
2679   *
2680   * @param src file or directory path.
2681   * @return storage policy for give file.
2682   * @throws IOException
2683   */
2684  public BlockStoragePolicySpi getStoragePolicy(final Path src)
2685      throws IOException {
2686    throw new UnsupportedOperationException(getClass().getSimpleName()
2687        + " doesn't support getStoragePolicy");
2688  }
2689
2690  /**
2691   * Retrieve all the storage policies supported by this file system.
2692   *
2693   * @return all storage policies supported by this filesystem.
2694   * @throws IOException
2695   */
2696  public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
2697      throws IOException {
2698    throw new UnsupportedOperationException(getClass().getSimpleName()
2699        + " doesn't support getAllStoragePolicies");
2700  }
2701
2702  /**
2703   * Get the root directory of Trash for current user when the path specified
2704   * is deleted.
2705   *
2706   * @param path the trash root of the path to be determined.
2707   * @return the default implementation returns "/user/$USER/.Trash".
2708   */
2709  public Path getTrashRoot(Path path) {
2710    return this.makeQualified(new Path(getHomeDirectory().toUri().getPath(),
2711        TRASH_PREFIX));
2712  }
2713
2714  /**
2715   * Get all the trash roots for current user or all users.
2716   *
2717   * @param allUsers return trash roots for all users if true.
2718   * @return all the trash root directories.
2719   *         Default FileSystem returns .Trash under users' home directories if
2720   *         /user/$USER/.Trash exists.
2721   */
2722  public Collection<FileStatus> getTrashRoots(boolean allUsers) {
2723    Path userHome = new Path(getHomeDirectory().toUri().getPath());
2724    List<FileStatus> ret = new ArrayList<>();
2725    try {
2726      if (!allUsers) {
2727        Path userTrash = new Path(userHome, TRASH_PREFIX);
2728        if (exists(userTrash)) {
2729          ret.add(getFileStatus(userTrash));
2730        }
2731      } else {
2732        Path homeParent = userHome.getParent();
2733        if (exists(homeParent)) {
2734          FileStatus[] candidates = listStatus(homeParent);
2735          for (FileStatus candidate : candidates) {
2736            Path userTrash = new Path(candidate.getPath(), TRASH_PREFIX);
2737            if (exists(userTrash)) {
2738              candidate.setPath(userTrash);
2739              ret.add(candidate);
2740            }
2741          }
2742        }
2743      }
2744    } catch (IOException e) {
2745      LOG.warn("Cannot get all trash roots", e);
2746    }
2747    return ret;
2748  }
2749
2750  // making it volatile to be able to do a double checked locking
2751  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2752
2753  private static final Map<String, Class<? extends FileSystem>>
2754    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2755
2756  private static void loadFileSystems() {
2757    synchronized (FileSystem.class) {
2758      if (!FILE_SYSTEMS_LOADED) {
2759        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2760        Iterator<FileSystem> it = serviceLoader.iterator();
2761        while (it.hasNext()) {
2762          FileSystem fs = null;
2763          try {
2764            fs = it.next();
2765            try {
2766              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2767            } catch (Exception e) {
2768              LOG.warn("Cannot load: " + fs + " from " +
2769                  ClassUtil.findContainingJar(fs.getClass()), e);
2770            }
2771          } catch (ServiceConfigurationError ee) {
2772            LOG.warn("Cannot load filesystem", ee);
2773          }
2774        }
2775        FILE_SYSTEMS_LOADED = true;
2776      }
2777    }
2778  }
2779
2780  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2781      Configuration conf) throws IOException {
2782    if (!FILE_SYSTEMS_LOADED) {
2783      loadFileSystems();
2784    }
2785    Class<? extends FileSystem> clazz = null;
2786    if (conf != null) {
2787      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2788    }
2789    if (clazz == null) {
2790      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2791    }
2792    if (clazz == null) {
2793      throw new IOException("No FileSystem for scheme: " + scheme);
2794    }
2795    return clazz;
2796  }
2797
2798  private static FileSystem createFileSystem(URI uri, Configuration conf
2799      ) throws IOException {
2800    Tracer tracer = FsTracer.get(conf);
2801    TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
2802    scope.addKVAnnotation("scheme", uri.getScheme());
2803    try {
2804      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2805      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2806      fs.initialize(uri, conf);
2807      return fs;
2808    } finally {
2809      scope.close();
2810    }
2811  }
2812
2813  /** Caching FileSystem objects */
2814  static class Cache {
2815    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2816
2817    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2818    private final Set<Key> toAutoClose = new HashSet<Key>();
2819
2820    /** A variable that makes all objects in the cache unique */
2821    private static AtomicLong unique = new AtomicLong(1);
2822
2823    FileSystem get(URI uri, Configuration conf) throws IOException{
2824      Key key = new Key(uri, conf);
2825      return getInternal(uri, conf, key);
2826    }
2827
2828    /** The objects inserted into the cache using this method are all unique */
2829    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2830      Key key = new Key(uri, conf, unique.getAndIncrement());
2831      return getInternal(uri, conf, key);
2832    }
2833
2834    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2835      FileSystem fs;
2836      synchronized (this) {
2837        fs = map.get(key);
2838      }
2839      if (fs != null) {
2840        return fs;
2841      }
2842
2843      fs = createFileSystem(uri, conf);
2844      synchronized (this) { // refetch the lock again
2845        FileSystem oldfs = map.get(key);
2846        if (oldfs != null) { // a file system is created while lock is releasing
2847          fs.close(); // close the new file system
2848          return oldfs;  // return the old file system
2849        }
2850        
2851        // now insert the new file system into the map
2852        if (map.isEmpty()
2853                && !ShutdownHookManager.get().isShutdownInProgress()) {
2854          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2855        }
2856        fs.key = key;
2857        map.put(key, fs);
2858        if (conf.getBoolean("fs.automatic.close", true)) {
2859          toAutoClose.add(key);
2860        }
2861        return fs;
2862      }
2863    }
2864
2865    synchronized void remove(Key key, FileSystem fs) {
2866      FileSystem cachedFs = map.remove(key);
2867      if (fs == cachedFs) {
2868        toAutoClose.remove(key);
2869      } else if (cachedFs != null) {
2870        map.put(key, cachedFs);
2871      }
2872    }
2873
2874    synchronized void closeAll() throws IOException {
2875      closeAll(false);
2876    }
2877
2878    /**
2879     * Close all FileSystem instances in the Cache.
2880     * @param onlyAutomatic only close those that are marked for automatic closing
2881     */
2882    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2883      List<IOException> exceptions = new ArrayList<IOException>();
2884
2885      // Make a copy of the keys in the map since we'll be modifying
2886      // the map while iterating over it, which isn't safe.
2887      List<Key> keys = new ArrayList<Key>();
2888      keys.addAll(map.keySet());
2889
2890      for (Key key : keys) {
2891        final FileSystem fs = map.get(key);
2892
2893        if (onlyAutomatic && !toAutoClose.contains(key)) {
2894          continue;
2895        }
2896
2897        //remove from cache
2898        map.remove(key);
2899        toAutoClose.remove(key);
2900
2901        if (fs != null) {
2902          try {
2903            fs.close();
2904          }
2905          catch(IOException ioe) {
2906            exceptions.add(ioe);
2907          }
2908        }
2909      }
2910
2911      if (!exceptions.isEmpty()) {
2912        throw MultipleIOException.createIOException(exceptions);
2913      }
2914    }
2915
2916    private class ClientFinalizer implements Runnable {
2917      @Override
2918      public synchronized void run() {
2919        try {
2920          closeAll(true);
2921        } catch (IOException e) {
2922          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2923        }
2924      }
2925    }
2926
2927    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2928      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2929      //Make a pass over the list and collect the filesystems to close
2930      //we cannot close inline since close() removes the entry from the Map
2931      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2932        final Key key = entry.getKey();
2933        final FileSystem fs = entry.getValue();
2934        if (ugi.equals(key.ugi) && fs != null) {
2935          targetFSList.add(fs);   
2936        }
2937      }
2938      List<IOException> exceptions = new ArrayList<IOException>();
2939      //now make a pass over the target list and close each
2940      for (FileSystem fs : targetFSList) {
2941        try {
2942          fs.close();
2943        }
2944        catch(IOException ioe) {
2945          exceptions.add(ioe);
2946        }
2947      }
2948      if (!exceptions.isEmpty()) {
2949        throw MultipleIOException.createIOException(exceptions);
2950      }
2951    }
2952
2953    /** FileSystem.Cache.Key */
2954    static class Key {
2955      final String scheme;
2956      final String authority;
2957      final UserGroupInformation ugi;
2958      final long unique;   // an artificial way to make a key unique
2959
2960      Key(URI uri, Configuration conf) throws IOException {
2961        this(uri, conf, 0);
2962      }
2963
2964      Key(URI uri, Configuration conf, long unique) throws IOException {
2965        scheme = uri.getScheme()==null ?
2966            "" : StringUtils.toLowerCase(uri.getScheme());
2967        authority = uri.getAuthority()==null ?
2968            "" : StringUtils.toLowerCase(uri.getAuthority());
2969        this.unique = unique;
2970        
2971        this.ugi = UserGroupInformation.getCurrentUser();
2972      }
2973
2974      @Override
2975      public int hashCode() {
2976        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2977      }
2978
2979      static boolean isEqual(Object a, Object b) {
2980        return a == b || (a != null && a.equals(b));        
2981      }
2982
2983      @Override
2984      public boolean equals(Object obj) {
2985        if (obj == this) {
2986          return true;
2987        }
2988        if (obj != null && obj instanceof Key) {
2989          Key that = (Key)obj;
2990          return isEqual(this.scheme, that.scheme)
2991                 && isEqual(this.authority, that.authority)
2992                 && isEqual(this.ugi, that.ugi)
2993                 && (this.unique == that.unique);
2994        }
2995        return false;        
2996      }
2997
2998      @Override
2999      public String toString() {
3000        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
3001      }
3002    }
3003  }
3004  
3005  /**
3006   * Tracks statistics about how many reads, writes, and so forth have been
3007   * done in a FileSystem.
3008   * 
3009   * Since there is only one of these objects per FileSystem, there will 
3010   * typically be many threads writing to this object.  Almost every operation
3011   * on an open file will involve a write to this object.  In contrast, reading
3012   * statistics is done infrequently by most programs, and not at all by others.
3013   * Hence, this is optimized for writes.
3014   * 
3015   * Each thread writes to its own thread-local area of memory.  This removes 
3016   * contention and allows us to scale up to many, many threads.  To read
3017   * statistics, the reader thread totals up the contents of all of the 
3018   * thread-local data areas.
3019   */
3020  public static final class Statistics {
3021    /**
3022     * Statistics data.
3023     * 
3024     * There is only a single writer to thread-local StatisticsData objects.
3025     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
3026     * to prevent lost updates.
3027     * The Java specification guarantees that updates to volatile longs will
3028     * be perceived as atomic with respect to other threads, which is all we
3029     * need.
3030     */
3031    public static class StatisticsData {
3032      private volatile long bytesRead;
3033      private volatile long bytesWritten;
3034      private volatile int readOps;
3035      private volatile int largeReadOps;
3036      private volatile int writeOps;
3037      private volatile long bytesReadLocalHost;
3038      private volatile long bytesReadDistanceOfOneOrTwo;
3039      private volatile long bytesReadDistanceOfThreeOrFour;
3040      private volatile long bytesReadDistanceOfFiveOrLarger;
3041
3042      /**
3043       * Add another StatisticsData object to this one.
3044       */
3045      void add(StatisticsData other) {
3046        this.bytesRead += other.bytesRead;
3047        this.bytesWritten += other.bytesWritten;
3048        this.readOps += other.readOps;
3049        this.largeReadOps += other.largeReadOps;
3050        this.writeOps += other.writeOps;
3051        this.bytesReadLocalHost += other.bytesReadLocalHost;
3052        this.bytesReadDistanceOfOneOrTwo += other.bytesReadDistanceOfOneOrTwo;
3053        this.bytesReadDistanceOfThreeOrFour +=
3054            other.bytesReadDistanceOfThreeOrFour;
3055        this.bytesReadDistanceOfFiveOrLarger +=
3056            other.bytesReadDistanceOfFiveOrLarger;
3057      }
3058
3059      /**
3060       * Negate the values of all statistics.
3061       */
3062      void negate() {
3063        this.bytesRead = -this.bytesRead;
3064        this.bytesWritten = -this.bytesWritten;
3065        this.readOps = -this.readOps;
3066        this.largeReadOps = -this.largeReadOps;
3067        this.writeOps = -this.writeOps;
3068        this.bytesReadLocalHost = -this.bytesReadLocalHost;
3069        this.bytesReadDistanceOfOneOrTwo = -this.bytesReadDistanceOfOneOrTwo;
3070        this.bytesReadDistanceOfThreeOrFour =
3071            -this.bytesReadDistanceOfThreeOrFour;
3072        this.bytesReadDistanceOfFiveOrLarger =
3073            -this.bytesReadDistanceOfFiveOrLarger;
3074      }
3075
3076      @Override
3077      public String toString() {
3078        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
3079            + readOps + " read ops, " + largeReadOps + " large read ops, "
3080            + writeOps + " write ops";
3081      }
3082      
3083      public long getBytesRead() {
3084        return bytesRead;
3085      }
3086      
3087      public long getBytesWritten() {
3088        return bytesWritten;
3089      }
3090      
3091      public int getReadOps() {
3092        return readOps;
3093      }
3094      
3095      public int getLargeReadOps() {
3096        return largeReadOps;
3097      }
3098      
3099      public int getWriteOps() {
3100        return writeOps;
3101      }
3102
3103      public long getBytesReadLocalHost() {
3104        return bytesReadLocalHost;
3105      }
3106
3107      public long getBytesReadDistanceOfOneOrTwo() {
3108        return bytesReadDistanceOfOneOrTwo;
3109      }
3110
3111      public long getBytesReadDistanceOfThreeOrFour() {
3112        return bytesReadDistanceOfThreeOrFour;
3113      }
3114
3115      public long getBytesReadDistanceOfFiveOrLarger() {
3116        return bytesReadDistanceOfFiveOrLarger;
3117      }
3118    }
3119
    /**
     * Visitor applied by {@code visitAll} to the StatisticsData objects of a
     * Statistics instance.
     *
     * @param <T> type of the aggregated result
     */
    private interface StatisticsAggregator<T> {
      /** Called by {@code visitAll} for the root data and for each tracked data object. */
      void accept(StatisticsData data);
      /** Called by {@code visitAll} after all accept calls; its result is what visitAll returns. */
      T aggregate();
    }
3124
    /** Scheme given to the constructor (or copied by the copy constructor). */
    private final String scheme;

    /**
     * rootData is data that doesn't belong to any thread, but will be added
     * to the totals.  This is useful for making copies of Statistics objects,
     * and for storing data that pertains to threads that have been garbage
     * collected.  Protected by the Statistics lock.
     */
    private final StatisticsData rootData;

    /**
     * Thread-local data.  Populated lazily, per thread, by
     * getThreadStatistics().
     */
    private final ThreadLocal<StatisticsData> threadData;

    /**
     * Set of all thread-local data areas.  Protected by the Statistics lock.
     * The references to the statistics data are kept using weak references
     * to the associated threads. Proper clean-up is performed by the cleaner
     * thread when the threads are garbage collected.
     */
    private final Set<StatisticsDataReference> allData;
3147
3148    /**
3149     * Global reference queue and a cleaner thread that manage statistics data
3150     * references from all filesystem instances.
3151     */
3152    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
3153    private static final Thread STATS_DATA_CLEANER;
3154
3155    static {
3156      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
3157      // start a single daemon cleaner thread
3158      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
3159      STATS_DATA_CLEANER.
3160          setName(StatisticsDataReferenceCleaner.class.getName());
3161      STATS_DATA_CLEANER.setDaemon(true);
3162      STATS_DATA_CLEANER.start();
3163    }
3164
3165    public Statistics(String scheme) {
3166      this.scheme = scheme;
3167      this.rootData = new StatisticsData();
3168      this.threadData = new ThreadLocal<StatisticsData>();
3169      this.allData = new HashSet<StatisticsDataReference>();
3170    }
3171
3172    /**
3173     * Copy constructor.
3174     * 
3175     * @param other    The input Statistics object which is cloned.
3176     */
3177    public Statistics(Statistics other) {
3178      this.scheme = other.scheme;
3179      this.rootData = new StatisticsData();
3180      other.visitAll(new StatisticsAggregator<Void>() {
3181        @Override
3182        public void accept(StatisticsData data) {
3183          rootData.add(data);
3184        }
3185
3186        public Void aggregate() {
3187          return null;
3188        }
3189      });
3190      this.threadData = new ThreadLocal<StatisticsData>();
3191      this.allData = new HashSet<StatisticsDataReference>();
3192    }
3193
3194    /**
3195     * A weak reference to a thread that also includes the data associated
3196     * with that thread. On the thread being garbage collected, it is enqueued
3197     * to the reference queue for clean-up.
3198     */
3199    private class StatisticsDataReference extends WeakReference<Thread> {
3200      private final StatisticsData data;
3201
3202      public StatisticsDataReference(StatisticsData data, Thread thread) {
3203        super(thread, STATS_DATA_REF_QUEUE);
3204        this.data = data;
3205      }
3206
3207      public StatisticsData getData() {
3208        return data;
3209      }
3210
3211      /**
3212       * Performs clean-up action when the associated thread is garbage
3213       * collected.
3214       */
3215      public void cleanUp() {
3216        // use the statistics lock for safety
3217        synchronized (Statistics.this) {
3218          /*
3219           * If the thread that created this thread-local data no longer exists,
3220           * remove the StatisticsData from our list and fold the values into
3221           * rootData.
3222           */
3223          rootData.add(data);
3224          allData.remove(this);
3225        }
3226      }
3227    }
3228
3229    /**
3230     * Background action to act on references being removed.
3231     */
3232    private static class StatisticsDataReferenceCleaner implements Runnable {
3233      @Override
3234      public void run() {
3235        while (!Thread.interrupted()) {
3236          try {
3237            StatisticsDataReference ref =
3238                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
3239            ref.cleanUp();
3240          } catch (InterruptedException ie) {
3241            LOG.warn("Cleaner thread interrupted, will stop", ie);
3242            Thread.currentThread().interrupt();
3243          } catch (Throwable th) {
3244            LOG.warn("Exception in the cleaner thread but it will continue to "
3245                + "run", th);
3246          }
3247        }
3248      }
3249    }
3250
3251    /**
3252     * Get or create the thread-local data associated with the current thread.
3253     */
3254    public StatisticsData getThreadStatistics() {
3255      StatisticsData data = threadData.get();
3256      if (data == null) {
3257        data = new StatisticsData();
3258        threadData.set(data);
3259        StatisticsDataReference ref =
3260            new StatisticsDataReference(data, Thread.currentThread());
3261        synchronized(this) {
3262          allData.add(ref);
3263        }
3264      }
3265      return data;
3266    }
3267
3268    /**
3269     * Increment the bytes read in the statistics
3270     * @param newBytes the additional bytes read
3271     */
3272    public void incrementBytesRead(long newBytes) {
3273      getThreadStatistics().bytesRead += newBytes;
3274    }
3275    
3276    /**
3277     * Increment the bytes written in the statistics
3278     * @param newBytes the additional bytes written
3279     */
3280    public void incrementBytesWritten(long newBytes) {
3281      getThreadStatistics().bytesWritten += newBytes;
3282    }
3283    
3284    /**
3285     * Increment the number of read operations
3286     * @param count number of read operations
3287     */
3288    public void incrementReadOps(int count) {
3289      getThreadStatistics().readOps += count;
3290    }
3291
3292    /**
3293     * Increment the number of large read operations
3294     * @param count number of large read operations
3295     */
3296    public void incrementLargeReadOps(int count) {
3297      getThreadStatistics().largeReadOps += count;
3298    }
3299
3300    /**
3301     * Increment the number of write operations
3302     * @param count number of write operations
3303     */
3304    public void incrementWriteOps(int count) {
3305      getThreadStatistics().writeOps += count;
3306    }
3307
3308    /**
3309     * Increment the bytes read by the network distance in the statistics
3310     * In the common network topology setup, distance value should be an even
3311     * number such as 0, 2, 4, 6. To make it more general, we group distance
3312     * by {1, 2}, {3, 4} and {5 and beyond} for accounting.
3313     * @param distance the network distance
3314     * @param newBytes the additional bytes read
3315     */
3316    public void incrementBytesReadByDistance(int distance, long newBytes) {
3317      switch (distance) {
3318      case 0:
3319        getThreadStatistics().bytesReadLocalHost += newBytes;
3320        break;
3321      case 1:
3322      case 2:
3323        getThreadStatistics().bytesReadDistanceOfOneOrTwo += newBytes;
3324        break;
3325      case 3:
3326      case 4:
3327        getThreadStatistics().bytesReadDistanceOfThreeOrFour += newBytes;
3328        break;
3329      default:
3330        getThreadStatistics().bytesReadDistanceOfFiveOrLarger += newBytes;
3331        break;
3332      }
3333    }
3334
3335    /**
3336     * Apply the given aggregator to all StatisticsData objects associated with
3337     * this Statistics object.
3338     *
3339     * For each StatisticsData object, we will call accept on the visitor.
3340     * Finally, at the end, we will call aggregate to get the final total. 
3341     *
3342     * @param         visitor to use.
3343     * @return        The total.
3344     */
3345    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3346      visitor.accept(rootData);
3347      for (StatisticsDataReference ref: allData) {
3348        StatisticsData data = ref.getData();
3349        visitor.accept(data);
3350      }
3351      return visitor.aggregate();
3352    }
3353
3354    /**
3355     * Get the total number of bytes read
3356     * @return the number of bytes
3357     */
3358    public long getBytesRead() {
3359      return visitAll(new StatisticsAggregator<Long>() {
3360        private long bytesRead = 0;
3361
3362        @Override
3363        public void accept(StatisticsData data) {
3364          bytesRead += data.bytesRead;
3365        }
3366
3367        public Long aggregate() {
3368          return bytesRead;
3369        }
3370      });
3371    }
3372    
3373    /**
3374     * Get the total number of bytes written
3375     * @return the number of bytes
3376     */
3377    public long getBytesWritten() {
3378      return visitAll(new StatisticsAggregator<Long>() {
3379        private long bytesWritten = 0;
3380
3381        @Override
3382        public void accept(StatisticsData data) {
3383          bytesWritten += data.bytesWritten;
3384        }
3385
3386        public Long aggregate() {
3387          return bytesWritten;
3388        }
3389      });
3390    }
3391    
3392    /**
3393     * Get the number of file system read operations such as list files
3394     * @return number of read operations
3395     */
3396    public int getReadOps() {
3397      return visitAll(new StatisticsAggregator<Integer>() {
3398        private int readOps = 0;
3399
3400        @Override
3401        public void accept(StatisticsData data) {
3402          readOps += data.readOps;
3403          readOps += data.largeReadOps;
3404        }
3405
3406        public Integer aggregate() {
3407          return readOps;
3408        }
3409      });
3410    }
3411
3412    /**
3413     * Get the number of large file system read operations such as list files
3414     * under a large directory
3415     * @return number of large read operations
3416     */
3417    public int getLargeReadOps() {
3418      return visitAll(new StatisticsAggregator<Integer>() {
3419        private int largeReadOps = 0;
3420
3421        @Override
3422        public void accept(StatisticsData data) {
3423          largeReadOps += data.largeReadOps;
3424        }
3425
3426        public Integer aggregate() {
3427          return largeReadOps;
3428        }
3429      });
3430    }
3431
3432    /**
3433     * Get the number of file system write operations such as create, append 
3434     * rename etc.
3435     * @return number of write operations
3436     */
3437    public int getWriteOps() {
3438      return visitAll(new StatisticsAggregator<Integer>() {
3439        private int writeOps = 0;
3440
3441        @Override
3442        public void accept(StatisticsData data) {
3443          writeOps += data.writeOps;
3444        }
3445
3446        public Integer aggregate() {
3447          return writeOps;
3448        }
3449      });
3450    }
3451
3452    /**
3453     * In the common network topology setup, distance value should be an even
3454     * number such as 0, 2, 4, 6. To make it more general, we group distance
3455     * by {1, 2}, {3, 4} and {5 and beyond} for accounting. So if the caller
3456     * ask for bytes read for distance 2, the function will return the value
3457     * for group {1, 2}.
3458     * @param distance the network distance
3459     * @return the total number of bytes read by the network distance
3460     */
3461    public long getBytesReadByDistance(int distance) {
3462      long bytesRead;
3463      switch (distance) {
3464      case 0:
3465        bytesRead = getData().getBytesReadLocalHost();
3466        break;
3467      case 1:
3468      case 2:
3469        bytesRead = getData().getBytesReadDistanceOfOneOrTwo();
3470        break;
3471      case 3:
3472      case 4:
3473        bytesRead = getData().getBytesReadDistanceOfThreeOrFour();
3474        break;
3475      default:
3476        bytesRead = getData().getBytesReadDistanceOfFiveOrLarger();
3477        break;
3478      }
3479      return bytesRead;
3480    }
3481
3482    /**
3483     * Get all statistics data
3484     * MR or other frameworks can use the method to get all statistics at once.
3485     * @return the StatisticsData
3486     */
3487    public StatisticsData getData() {
3488      return visitAll(new StatisticsAggregator<StatisticsData>() {
3489        private StatisticsData all = new StatisticsData();
3490
3491        @Override
3492        public void accept(StatisticsData data) {
3493          all.add(data);
3494        }
3495
3496        public StatisticsData aggregate() {
3497          return all;
3498        }
3499      });
3500    }
3501
3502    @Override
3503    public String toString() {
3504      return visitAll(new StatisticsAggregator<String>() {
3505        private StatisticsData total = new StatisticsData();
3506
3507        @Override
3508        public void accept(StatisticsData data) {
3509          total.add(data);
3510        }
3511
3512        public String aggregate() {
3513          return total.toString();
3514        }
3515      });
3516    }
3517
3518    /**
3519     * Resets all statistics to 0.
3520     *
3521     * In order to reset, we add up all the thread-local statistics data, and
3522     * set rootData to the negative of that.
3523     *
3524     * This may seem like a counterintuitive way to reset the statistics.  Why
3525     * can't we just zero out all the thread-local data?  Well, thread-local
3526     * data can only be modified by the thread that owns it.  If we tried to
3527     * modify the thread-local data from this thread, our modification might get
3528     * interleaved with a read-modify-write operation done by the thread that
3529     * owns the data.  That would result in our update getting lost.
3530     *
3531     * The approach used here avoids this problem because it only ever reads
3532     * (not writes) the thread-local data.  Both reads and writes to rootData
3533     * are done under the lock, so we're free to modify rootData from any thread
3534     * that holds the lock.
3535     */
3536    public void reset() {
3537      visitAll(new StatisticsAggregator<Void>() {
3538        private StatisticsData total = new StatisticsData();
3539
3540        @Override
3541        public void accept(StatisticsData data) {
3542          total.add(data);
3543        }
3544
3545        public Void aggregate() {
3546          total.negate();
3547          rootData.add(total);
3548          return null;
3549        }
3550      });
3551    }
3552    
3553    /**
3554     * Get the uri scheme associated with this statistics object.
3555     * @return the schema associated with this set of statistics
3556     */
3557    public String getScheme() {
3558      return scheme;
3559    }
3560
3561    @VisibleForTesting
3562    synchronized int getAllThreadLocalDataSize() {
3563      return allData.size();
3564    }
3565  }
3566  
3567  /**
3568   * Get the Map of Statistics object indexed by URI Scheme.
3569   * @return a Map having a key as URI scheme and value as Statistics object
3570   * @deprecated use {@link #getGlobalStorageStatistics()}
3571   */
3572  @Deprecated
3573  public static synchronized Map<String, Statistics> getStatistics() {
3574    Map<String, Statistics> result = new HashMap<String, Statistics>();
3575    for(Statistics stat: statisticsTable.values()) {
3576      result.put(stat.getScheme(), stat);
3577    }
3578    return result;
3579  }
3580
3581  /**
3582   * Return the FileSystem classes that have Statistics.
3583   * @deprecated use {@link #getGlobalStorageStatistics()}
3584   */
3585  @Deprecated
3586  public static synchronized List<Statistics> getAllStatistics() {
3587    return new ArrayList<Statistics>(statisticsTable.values());
3588  }
3589  
3590  /**
3591   * Get the statistics for a particular file system
3592   * @param cls the class to lookup
3593   * @return a statistics object
3594   * @deprecated use {@link #getGlobalStorageStatistics()}
3595   */
3596  @Deprecated
3597  public static synchronized Statistics getStatistics(final String scheme,
3598      Class<? extends FileSystem> cls) {
3599    checkArgument(scheme != null,
3600        "No statistics is allowed for a file system with null scheme!");
3601    Statistics result = statisticsTable.get(cls);
3602    if (result == null) {
3603      final Statistics newStats = new Statistics(scheme);
3604      statisticsTable.put(cls, newStats);
3605      result = newStats;
3606      GlobalStorageStatistics.INSTANCE.put(scheme,
3607          new StorageStatisticsProvider() {
3608            @Override
3609            public StorageStatistics provide() {
3610              return new FileSystemStorageStatistics(scheme, newStats);
3611            }
3612          });
3613    }
3614    return result;
3615  }
3616  
3617  /**
3618   * Reset all statistics for all file systems
3619   */
3620  public static synchronized void clearStatistics() {
3621    final Iterator<StorageStatistics> iterator =
3622        GlobalStorageStatistics.INSTANCE.iterator();
3623    while (iterator.hasNext()) {
3624      final StorageStatistics statistics = iterator.next();
3625      statistics.reset();
3626    }
3627  }
3628
3629  /**
3630   * Print all statistics for all file systems
3631   */
3632  public static synchronized
3633  void printStatistics() throws IOException {
3634    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3635            statisticsTable.entrySet()) {
3636      System.out.println("  FileSystem " + pair.getKey().getName() + 
3637                         ": " + pair.getValue());
3638    }
3639  }
3640
  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052.
  // Static flag returned by areSymlinksEnabled() and switched to true by
  // enableSymlinks(); no code in this part of the file sets it back to false.
  private static boolean symlinksEnabled = false;

  // NOTE(review): no use of this static Configuration is visible in this
  // part of the file - confirm where (and whether) it is read or assigned.
  private static Configuration conf = null;
3645
3646  @VisibleForTesting
3647  public static boolean areSymlinksEnabled() {
3648    return symlinksEnabled;
3649  }
3650
3651  @VisibleForTesting
3652  public static void enableSymlinks() {
3653    symlinksEnabled = true;
3654  }
3655
3656  /**
3657   * Get the StorageStatistics for this FileSystem object.  These statistics are
3658   * per-instance.  They are not shared with any other FileSystem object.
3659   *
3660   * <p>This is a default method which is intended to be overridden by
3661   * subclasses. The default implementation returns an empty storage statistics
3662   * object.</p>
3663   *
3664   * @return    The StorageStatistics for this FileSystem instance.
3665   *            Will never be null.
3666   */
3667  public StorageStatistics getStorageStatistics() {
3668    return new EmptyStorageStatistics(getUri().toString());
3669  }
3670
3671  /**
3672   * Get the global storage statistics.
3673   */
3674  public static GlobalStorageStatistics getGlobalStorageStatistics() {
3675    return GlobalStorageStatistics.INSTANCE;
3676  }
3677}