001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3native;
020
021import java.io.BufferedOutputStream;
022import java.io.EOFException;
023import java.io.File;
024import java.io.FileNotFoundException;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.net.URI;
030import java.security.DigestOutputStream;
031import java.security.MessageDigest;
032import java.security.NoSuchAlgorithmException;
033import java.util.ArrayList;
034import java.util.HashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.TimeUnit;
040
041import com.google.common.base.Preconditions;
042import org.apache.hadoop.classification.InterfaceAudience;
043import org.apache.hadoop.classification.InterfaceStability;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.BufferedFSInputStream;
046import org.apache.hadoop.fs.FSDataInputStream;
047import org.apache.hadoop.fs.FSDataOutputStream;
048import org.apache.hadoop.fs.FSExceptionMessages;
049import org.apache.hadoop.fs.FSInputStream;
050import org.apache.hadoop.fs.FileAlreadyExistsException;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.LocalDirAllocator;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.permission.FsPermission;
056import org.apache.hadoop.io.IOUtils;
057import org.apache.hadoop.io.retry.RetryPolicies;
058import org.apache.hadoop.io.retry.RetryPolicy;
059import org.apache.hadoop.io.retry.RetryProxy;
060import org.apache.hadoop.util.Progressable;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_BUFFER_DIR_DEFAULT;
065import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_BUFFER_DIR_KEY;
066import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_MAX_RETRIES_DEFAUL;
067import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_MAX_RETRIES_KEY;
068import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_SLEEP_TIME_DEFAULT;
069import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_SLEEP_TIME_KEY;
070import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.addDeprecatedConfigKeys;
071
072/**
073 * A {@link FileSystem} for reading and writing files stored on
074 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
075 * This implementation stores files on S3 in their native form so they can be
076 * read by other S3 tools.
077 * <p>
078 * A note about directories. S3 of course has no "native" support for them.
079 * The idiom we choose then is: for any directory created by this class,
080 * we use an empty object "#{dirpath}_$folder$" as a marker.
081 * Further, to interoperate with other S3 tools, we also accept the following:
082 * <ul>
 *   <li>an object "#{dirpath}/" denoting a directory marker</li>
084 *   <li>
 *     if there exist any objects with the prefix "#{dirpath}/", then the
 *     directory is said to exist
087 *   </li>
088 *   <li>
089 *     if both a file with the name of a directory and a marker for that
090 *     directory exists, then the *file masks the directory*, and the directory
091 *     is never returned.
092 *   </li>
093 * </ul>
094 */
095@InterfaceAudience.Public
096@InterfaceStability.Stable
097public class NativeS3FileSystem extends FileSystem {
098  
099  public static final Logger LOG =
100      LoggerFactory.getLogger(NativeS3FileSystem.class);
101  
102  private static final String FOLDER_SUFFIX = "_$folder$";
103  static final String PATH_DELIMITER = Path.SEPARATOR;
104  private static final int S3_MAX_LISTING_LENGTH = 1000;
105
static {
  // Register the deprecated configuration keys when this class is loaded.
  addDeprecatedConfigKeys();
}
110
111  static class NativeS3FsInputStream extends FSInputStream {
112    
113    private NativeFileSystemStore store;
114    private Statistics statistics;
115    private InputStream in;
116    private final String key;
117    private long pos = 0;
118    
119    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
120      Preconditions.checkNotNull(in, "Null input stream");
121      this.store = store;
122      this.statistics = statistics;
123      this.in = in;
124      this.key = key;
125    }
126    
127    @Override
128    public synchronized int read() throws IOException {
129      int result;
130      try {
131        result = in.read();
132      } catch (IOException e) {
133        LOG.info("Received IOException while reading '{}', attempting to reopen",
134            key);
135        LOG.debug("{}", e, e);
136        try {
137          reopen(pos);
138          result = in.read();
139        } catch (EOFException eof) {
140          LOG.debug("EOF on input stream read: {}", eof, eof);
141          result = -1;
142        }
143      } 
144      if (result != -1) {
145        pos++;
146      }
147      if (statistics != null && result != -1) {
148        statistics.incrementBytesRead(1);
149      }
150      return result;
151    }
152    @Override
153    public synchronized int read(byte[] b, int off, int len)
154      throws IOException {
155      if (in == null) {
156        throw new EOFException("Cannot read closed stream");
157      }
158      int result = -1;
159      try {
160        result = in.read(b, off, len);
161      } catch (EOFException eof) {
162        throw eof;
163      } catch (IOException e) {
164        LOG.info( "Received IOException while reading '{}'," +
165                  " attempting to reopen.", key);
166        reopen(pos);
167        result = in.read(b, off, len);
168      }
169      if (result > 0) {
170        pos += result;
171      }
172      if (statistics != null && result > 0) {
173        statistics.incrementBytesRead(result);
174      }
175      return result;
176    }
177
178    @Override
179    public synchronized void close() throws IOException {
180      closeInnerStream();
181    }
182
183    /**
184     * Close the inner stream if not null. Even if an exception
185     * is raised during the close, the field is set to null
186     */
187    private void closeInnerStream() {
188      IOUtils.closeStream(in);
189      in = null;
190    }
191
192    /**
193     * Reopen a new input stream with the specified position
194     * @param pos the position to reopen a new stream
195     * @throws IOException
196     */
197    private synchronized void reopen(long pos) throws IOException {
198        LOG.debug("Reopening key '{}' for reading at position '{}", key, pos);
199        InputStream newStream = store.retrieve(key, pos);
200        updateInnerStream(newStream, pos);
201    }
202
203    /**
204     * Update inner stream with a new stream and position
205     * @param newStream new stream -must not be null
206     * @param newpos new position
207     * @throws IOException IO exception on a failure to close the existing
208     * stream.
209     */
210    private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
211      Preconditions.checkNotNull(newStream, "Null newstream argument");
212      closeInnerStream();
213      in = newStream;
214      this.pos = newpos;
215    }
216
217    @Override
218    public synchronized void seek(long newpos) throws IOException {
219      if (newpos < 0) {
220        throw new EOFException(
221            FSExceptionMessages.NEGATIVE_SEEK);
222      }
223      if (pos != newpos) {
224        // the seek is attempting to move the current position
225        reopen(newpos);
226      }
227    }
228
229    @Override
230    public synchronized long getPos() throws IOException {
231      return pos;
232    }
233    @Override
234    public boolean seekToNewSource(long targetPos) throws IOException {
235      return false;
236    }
237  }
238  
239  private class NativeS3FsOutputStream extends OutputStream {
240    
241    private Configuration conf;
242    private String key;
243    private File backupFile;
244    private OutputStream backupStream;
245    private MessageDigest digest;
246    private boolean closed;
247    private LocalDirAllocator lDirAlloc;
248    
249    public NativeS3FsOutputStream(Configuration conf,
250        NativeFileSystemStore store, String key, Progressable progress,
251        int bufferSize) throws IOException {
252      this.conf = conf;
253      this.key = key;
254      this.backupFile = newBackupFile();
255      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
256      try {
257        this.digest = MessageDigest.getInstance("MD5");
258        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
259            new FileOutputStream(backupFile), this.digest));
260      } catch (NoSuchAlgorithmException e) {
261        LOG.warn("Cannot load MD5 digest algorithm," +
262            "skipping message integrity check.", e);
263        this.backupStream = new BufferedOutputStream(
264            new FileOutputStream(backupFile));
265      }
266    }
267
268    private File newBackupFile() throws IOException {
269      if (conf.get(S3_NATIVE_BUFFER_DIR_KEY, null) != null) {
270        lDirAlloc = new LocalDirAllocator(S3_NATIVE_BUFFER_DIR_KEY);
271      } else {
272        lDirAlloc = new LocalDirAllocator(S3_NATIVE_BUFFER_DIR_DEFAULT);
273      }
274      File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
275      result.deleteOnExit();
276      return result;
277    }
278    
279    @Override
280    public void flush() throws IOException {
281      backupStream.flush();
282    }
283    
284    @Override
285    public synchronized void close() throws IOException {
286      if (closed) {
287        return;
288      }
289
290      backupStream.close();
291      LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
292      
293      try {
294        byte[] md5Hash = digest == null ? null : digest.digest();
295        store.storeFile(key, backupFile, md5Hash);
296      } finally {
297        if (!backupFile.delete()) {
298          LOG.warn("Could not delete temporary s3n file: " + backupFile);
299        }
300        super.close();
301        closed = true;
302      } 
303      LOG.info("OutputStream for key '{}' upload complete", key);
304    }
305
306    @Override
307    public void write(int b) throws IOException {
308      backupStream.write(b);
309    }
310
311    @Override
312    public void write(byte[] b, int off, int len) throws IOException {
313      backupStream.write(b, off, len);
314    }
315  }
316  
317  private URI uri;
318  private NativeFileSystemStore store;
319  private Path workingDir;
320  
321  public NativeS3FileSystem() {
322    // set store in initialize()
323  }
324  
/**
 * Creates a file system backed by the given store.
 *
 * @param store store to use instead of the default one
 */
public NativeS3FileSystem(NativeFileSystemStore store) {
  this.store = store;
}
328
329  /**
330   * Return the protocol scheme for the FileSystem.
331   *
332   * @return <code>s3n</code>
333   */
334  @Override
335  public String getScheme() {
336    return "s3n";
337  }
338
339  @Override
340  public void initialize(URI uri, Configuration conf) throws IOException {
341    super.initialize(uri, conf);
342    if (store == null) {
343      store = createDefaultStore(conf);
344    }
345    store.initialize(uri, conf);
346    setConf(conf);
347    this.uri = S3xLoginHelper.buildFSURI(uri);
348    this.workingDir =
349      new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
350  }
351  
352  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
353    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
354    
355    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
356        conf.getInt(S3_NATIVE_MAX_RETRIES_KEY, S3_NATIVE_MAX_RETRIES_DEFAUL),
357        conf.getLong(S3_NATIVE_SLEEP_TIME_KEY, S3_NATIVE_SLEEP_TIME_DEFAULT),
358        TimeUnit.SECONDS);
359    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
360      new HashMap<Class<? extends Exception>, RetryPolicy>();
361    exceptionToPolicyMap.put(IOException.class, basePolicy);
362    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
363    
364    RetryPolicy methodPolicy = RetryPolicies.retryByException(
365        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
366    Map<String, RetryPolicy> methodNameToPolicyMap =
367      new HashMap<String, RetryPolicy>();
368    methodNameToPolicyMap.put("storeFile", methodPolicy);
369    methodNameToPolicyMap.put("rename", methodPolicy);
370    
371    return (NativeFileSystemStore)
372      RetryProxy.create(NativeFileSystemStore.class, store,
373          methodNameToPolicyMap);
374  }
375  
376  private static String pathToKey(Path path) {
377    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
378      // allow uris without trailing slash after bucket to refer to root,
379      // like s3n://mybucket
380      return "";
381    }
382    if (!path.isAbsolute()) {
383      throw new IllegalArgumentException("Path must be absolute: " + path);
384    }
385    String ret = path.toUri().getPath().substring(1); // remove initial slash
386    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
387      ret = ret.substring(0, ret.length() -1);
388  }
389    return ret;
390  }
391  
392  private static Path keyToPath(String key) {
393    return new Path("/" + key);
394  }
395  
396  private Path makeAbsolute(Path path) {
397    if (path.isAbsolute()) {
398      return path;
399    }
400    return new Path(workingDir, path);
401  }
402
403  /**
404   * Check that a Path belongs to this FileSystem.
405   * Unlike the superclass, this version does not look at authority,
406   * only hostnames.
407   * @param path to check
408   * @throws IllegalArgumentException if there is an FS mismatch
409   */
410  @Override
411  protected void checkPath(Path path) {
412    S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
413  }
414
415  @Override
416  protected URI canonicalizeUri(URI rawUri) {
417    return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
418  }
419
420  /** This optional operation is not yet supported. */
421  @Override
422  public FSDataOutputStream append(Path f, int bufferSize,
423      Progressable progress) throws IOException {
424    throw new UnsupportedOperationException("Append is not supported "
425        + "by NativeS3FileSystem");
426  }
427  
428  @Override
429  public FSDataOutputStream create(Path f, FsPermission permission,
430      boolean overwrite, int bufferSize, short replication, long blockSize,
431      Progressable progress) throws IOException {
432
433    if (exists(f) && !overwrite) {
434      throw new FileAlreadyExistsException("File already exists: " + f);
435    }
436    
437    if(LOG.isDebugEnabled()) {
438      LOG.debug("Creating new file '" + f + "' in S3");
439    }
440    Path absolutePath = makeAbsolute(f);
441    String key = pathToKey(absolutePath);
442    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
443        key, progress, bufferSize), statistics);
444  }
445  
446  @Override
447  public boolean delete(Path f, boolean recurse) throws IOException {
448    FileStatus status;
449    try {
450      status = getFileStatus(f);
451    } catch (FileNotFoundException e) {
452      if(LOG.isDebugEnabled()) {
453        LOG.debug("Delete called for '" + f +
454            "' but file does not exist, so returning false");
455      }
456      return false;
457    }
458    Path absolutePath = makeAbsolute(f);
459    String key = pathToKey(absolutePath);
460    if (status.isDirectory()) {
461      if (!recurse && listStatus(f).length > 0) {
462        throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
463      }
464
465      createParent(f);
466
467      if(LOG.isDebugEnabled()) {
468        LOG.debug("Deleting directory '" + f  + "'");
469      }
470      String priorLastKey = null;
471      do {
472        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
473        for (FileMetadata file : listing.getFiles()) {
474          store.delete(file.getKey());
475        }
476        priorLastKey = listing.getPriorLastKey();
477      } while (priorLastKey != null);
478
479      try {
480        store.delete(key + FOLDER_SUFFIX);
481      } catch (FileNotFoundException e) {
482        //this is fine, we don't require a marker
483      }
484    } else {
485      if(LOG.isDebugEnabled()) {
486        LOG.debug("Deleting file '" + f + "'");
487      }
488      createParent(f);
489      store.delete(key);
490    }
491    return true;
492  }
493
494  @Override
495  public FileStatus getFileStatus(Path f) throws IOException {
496    Path absolutePath = makeAbsolute(f);
497    String key = pathToKey(absolutePath);
498    
499    if (key.length() == 0) { // root always exists
500      return newDirectory(absolutePath);
501    }
502    
503    if(LOG.isDebugEnabled()) {
504      LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
505    }
506    FileMetadata meta = store.retrieveMetadata(key);
507    if (meta != null) {
508      if(LOG.isDebugEnabled()) {
509        LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
510      }
511      return newFile(meta, absolutePath);
512    }
513    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
514      if(LOG.isDebugEnabled()) {
515        LOG.debug("getFileStatus returning 'directory' for key '" + key +
516            "' as '" + key + FOLDER_SUFFIX + "' exists");
517      }
518      return newDirectory(absolutePath);
519    }
520    
521    if(LOG.isDebugEnabled()) {
522      LOG.debug("getFileStatus listing key '" + key + "'");
523    }
524    PartialListing listing = store.list(key, 1);
525    if (listing.getFiles().length > 0 ||
526        listing.getCommonPrefixes().length > 0) {
527      if(LOG.isDebugEnabled()) {
528        LOG.debug("getFileStatus returning 'directory' for key '" + key +
529            "' as it has contents");
530      }
531      return newDirectory(absolutePath);
532    }
533    
534    if(LOG.isDebugEnabled()) {
535      LOG.debug("getFileStatus could not find key '" + key + "'");
536    }
537    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
538  }
539
540  @Override
541  public URI getUri() {
542    return uri;
543  }
544
545  /**
546   * <p>
547   * If <code>f</code> is a file, this method will make a single call to S3.
548   * If <code>f</code> is a directory, this method will make a maximum of
549   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
550   * files and directories contained directly in <code>f</code>.
551   * </p>
552   */
553  @Override
554  public FileStatus[] listStatus(Path f) throws IOException {
555
556    Path absolutePath = makeAbsolute(f);
557    String key = pathToKey(absolutePath);
558    
559    if (key.length() > 0) {
560      FileMetadata meta = store.retrieveMetadata(key);
561      if (meta != null) {
562        return new FileStatus[] { newFile(meta, absolutePath) };
563      }
564    }
565    
566    URI pathUri = absolutePath.toUri();
567    Set<FileStatus> status = new TreeSet<FileStatus>();
568    String priorLastKey = null;
569    do {
570      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
571      for (FileMetadata fileMetadata : listing.getFiles()) {
572        Path subpath = keyToPath(fileMetadata.getKey());
573        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
574
575        if (fileMetadata.getKey().equals(key + "/")) {
576          // this is just the directory we have been asked to list
577        }
578        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
579          status.add(newDirectory(new Path(
580              absolutePath,
581              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
582        }
583        else {
584          status.add(newFile(fileMetadata, subpath));
585        }
586      }
587      for (String commonPrefix : listing.getCommonPrefixes()) {
588        Path subpath = keyToPath(commonPrefix);
589        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
590        status.add(newDirectory(new Path(absolutePath, relativePath)));
591      }
592      priorLastKey = listing.getPriorLastKey();
593    } while (priorLastKey != null);
594    
595    if (status.isEmpty() &&
596        key.length() > 0 &&
597        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
598      throw new FileNotFoundException("File " + f + " does not exist.");
599    }
600    
601    return status.toArray(new FileStatus[status.size()]);
602  }
603  
604  private FileStatus newFile(FileMetadata meta, Path path) {
605    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
606        meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
607  }
608  
609  private FileStatus newDirectory(Path path) {
610    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
611  }
612
613  @Override
614  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
615    Path absolutePath = makeAbsolute(f);
616    List<Path> paths = new ArrayList<Path>();
617    do {
618      paths.add(0, absolutePath);
619      absolutePath = absolutePath.getParent();
620    } while (absolutePath != null);
621    
622    boolean result = true;
623    for (Path path : paths) {
624      result &= mkdir(path);
625    }
626    return result;
627  }
628  
629  private boolean mkdir(Path f) throws IOException {
630    try {
631      FileStatus fileStatus = getFileStatus(f);
632      if (fileStatus.isFile()) {
633        throw new FileAlreadyExistsException(String.format(
634            "Can't make directory for path '%s' since it is a file.", f));
635
636      }
637    } catch (FileNotFoundException e) {
638      if(LOG.isDebugEnabled()) {
639        LOG.debug("Making dir '" + f + "' in S3");
640      }
641      String key = pathToKey(f) + FOLDER_SUFFIX;
642      store.storeEmptyFile(key);    
643    }
644    return true;
645  }
646
647  @Override
648  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
649    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
650    if (fs.isDirectory()) {
651      throw new FileNotFoundException("'" + f + "' is a directory");
652    }
653    LOG.info("Opening '" + f + "' for reading");
654    Path absolutePath = makeAbsolute(f);
655    String key = pathToKey(absolutePath);
656    return new FSDataInputStream(new BufferedFSInputStream(
657        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
658  }
659  
660  // rename() and delete() use this method to ensure that the parent directory
661  // of the source does not vanish.
662  private void createParent(Path path) throws IOException {
663    Path parent = path.getParent();
664    if (parent != null) {
665      String key = pathToKey(makeAbsolute(parent));
666      if (key.length() > 0) {
667          store.storeEmptyFile(key + FOLDER_SUFFIX);
668      }
669    }
670  }
671  
672    
673  @Override
674  public boolean rename(Path src, Path dst) throws IOException {
675
676    String srcKey = pathToKey(makeAbsolute(src));
677    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
678
679    if (srcKey.length() == 0) {
680      // Cannot rename root of file system
681      if (LOG.isDebugEnabled()) {
682        LOG.debug(debugPreamble +
683                  "returning false as cannot rename the root of a filesystem");
684      }
685      return false;
686    }
687
688    //get status of source
689    boolean srcIsFile;
690    try {
691      srcIsFile = getFileStatus(src).isFile();
692    } catch (FileNotFoundException e) {
693      //bail out fast if the source does not exist
694      if (LOG.isDebugEnabled()) {
695        LOG.debug(debugPreamble + "returning false as src does not exist");
696      }
697      return false;
698    }
699    // Figure out the final destination
700    String dstKey = pathToKey(makeAbsolute(dst));
701
702    try {
703      boolean dstIsFile = getFileStatus(dst).isFile();
704      if (dstIsFile) {
705        //destination is a file.
706        //you can't copy a file or a directory onto an existing file
707        //except for the special case of dest==src, which is a no-op
708        if(LOG.isDebugEnabled()) {
709          LOG.debug(debugPreamble +
710              "returning without rename as dst is an already existing file");
711        }
712        //exit, returning true iff the rename is onto self
713        return srcKey.equals(dstKey);
714      } else {
715        //destination exists and is a directory
716        if(LOG.isDebugEnabled()) {
717          LOG.debug(debugPreamble + "using dst as output directory");
718        }
719        //destination goes under the dst path, with the name of the
720        //source entry
721        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
722      }
723    } catch (FileNotFoundException e) {
724      //destination does not exist => the source file or directory
725      //is copied over with the name of the destination
726      if(LOG.isDebugEnabled()) {
727        LOG.debug(debugPreamble + "using dst as output destination");
728      }
729      try {
730        if (getFileStatus(dst.getParent()).isFile()) {
731          if(LOG.isDebugEnabled()) {
732            LOG.debug(debugPreamble +
733                "returning false as dst parent exists and is a file");
734          }
735          return false;
736        }
737      } catch (FileNotFoundException ex) {
738        if(LOG.isDebugEnabled()) {
739          LOG.debug(debugPreamble +
740              "returning false as dst parent does not exist");
741        }
742        return false;
743      }
744    }
745
746    //rename to self behavior follows Posix rules and is different
747    //for directories and files -the return code is driven by src type
748    if (srcKey.equals(dstKey)) {
749      //fully resolved destination key matches source: fail
750      if (LOG.isDebugEnabled()) {
751        LOG.debug(debugPreamble + "renamingToSelf; returning true");
752      }
753      return true;
754    }
755    if (srcIsFile) {
756      //source is a file; COPY then DELETE
757      if(LOG.isDebugEnabled()) {
758        LOG.debug(debugPreamble +
759            "src is file, so doing copy then delete in S3");
760      }
761      store.copy(srcKey, dstKey);
762      store.delete(srcKey);
763    } else {
764      //src is a directory
765      if(LOG.isDebugEnabled()) {
766        LOG.debug(debugPreamble + "src is directory, so copying contents");
767      }
768      //Verify dest is not a child of the parent
769      if (dstKey.startsWith(srcKey + "/")) {
770        if (LOG.isDebugEnabled()) {
771          LOG.debug(
772            debugPreamble + "cannot rename a directory to a subdirectory of self");
773        }
774        return false;
775      }
776      //create the subdir under the destination
777      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
778
779      List<String> keysToDelete = new ArrayList<String>();
780      String priorLastKey = null;
781      do {
782        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
783        for (FileMetadata file : listing.getFiles()) {
784          keysToDelete.add(file.getKey());
785          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
786        }
787        priorLastKey = listing.getPriorLastKey();
788      } while (priorLastKey != null);
789
790      if(LOG.isDebugEnabled()) {
791        LOG.debug(debugPreamble +
792            "all files in src copied, now removing src files");
793      }
794      for (String key: keysToDelete) {
795        store.delete(key);
796      }
797
798      try {
799        store.delete(srcKey + FOLDER_SUFFIX);
800      } catch (FileNotFoundException e) {
801        //this is fine, we don't require a marker
802      }
803      if(LOG.isDebugEnabled()) {
804        LOG.debug(debugPreamble + "done");
805      }
806    }
807
808    return true;
809  }
810  
811  @Override
812  public long getDefaultBlockSize() {
813    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
814  }
815
816  /**
817   * Set the working directory to the given directory.
818   */
819  @Override
820  public void setWorkingDirectory(Path newDir) {
821    workingDir = newDir;
822  }
823  
824  @Override
825  public Path getWorkingDirectory() {
826    return workingDir;
827  }
828
829  @Override
830  public String getCanonicalServiceName() {
831    // Does not support Token
832    return null;
833  }
834}