/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3native;

import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_BUFFER_DIR_DEFAULT;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_BUFFER_DIR_KEY;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_MAX_RETRIES_DEFAUL;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_MAX_RETRIES_KEY;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_SLEEP_TIME_DEFAULT;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_SLEEP_TIME_KEY;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.addDeprecatedConfigKeys;

/**
 * A {@link FileSystem} for reading and writing files stored on
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 * This implementation stores files on S3 in their native form so they can be
 * read by other S3 tools.
 * <p>
 * A note about directories. S3 of course has no "native" support for them.
 * The idiom we choose then is: for any directory created by this class,
 * we use an empty object "#{dirpath}_$folder$" as a marker.
 * Further, to interoperate with other S3 tools, we also accept the following:
 * <ul>
 *   <li>an object "#{dirpath}/" denoting a directory marker</li>
 *   <li>
 *     if there exists any objects with the prefix "#{dirpath}/", then the
 *     directory is said to exist
 *   </li>
 *   <li>
 *     if both a file with the name of a directory and a marker for that
 *     directory exists, then the *file masks the directory*, and the directory
 *     is never returned.
 *   </li>
 * </ul>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class NativeS3FileSystem extends FileSystem {

  public static final Logger LOG =
      LoggerFactory.getLogger(NativeS3FileSystem.class);

  /** Suffix appended to a directory key to form its empty marker object. */
  private static final String FOLDER_SUFFIX = "_$folder$";
  static final String PATH_DELIMITER = Path.SEPARATOR;
  /** Maximum number of entries requested from the store per listing call. */
  private static final int S3_MAX_LISTING_LENGTH = 1000;

  static {
    // Add the deprecated config keys
    addDeprecatedConfigKeys();
  }

  /**
   * Input stream over a single S3 object. It tracks the current position and,
   * when a read raises an {@link IOException}, re-requests the object from
   * the store at that position and retries the read once.
   */
  static class NativeS3FsInputStream extends FSInputStream {

    private final NativeFileSystemStore store;
    private final Statistics statistics;
    /** The wrapped stream; null once this stream has been closed. */
    private InputStream in;
    private final String key;
    private long pos = 0;

    /**
     * @param store store used to re-open the object after a failure or seek
     * @param statistics statistics to update with bytes read; may be null
     * @param in the initial stream over the object; must not be null
     * @param key key of the object being read
     */
    public NativeS3FsInputStream(NativeFileSystemStore store,
        Statistics statistics, InputStream in, String key) {
      Preconditions.checkNotNull(in, "Null input stream");
      this.store = store;
      this.statistics = statistics;
      this.in = in;
      this.key = key;
    }

    /**
     * Fail if the inner stream has been released by {@link #close()}.
     * @throws EOFException if the stream is closed
     */
    private void checkNotClosed() throws EOFException {
      if (in == null) {
        throw new EOFException("Cannot read closed stream");
      }
    }

    /**
     * Read a single byte, reopening the object at the current position and
     * retrying once if the first attempt raises an {@link IOException}.
     * An {@link EOFException} during the retry is reported as end of stream.
     *
     * @return the byte read, or -1 at the end of the stream
     * @throws IOException if the stream is closed or the retry also fails
     */
    @Override
    public synchronized int read() throws IOException {
      // Same guard as read(byte[], int, int): without it a read after
      // close() would dereference the null inner stream.
      checkNotClosed();
      int result;
      try {
        result = in.read();
      } catch (IOException e) {
        LOG.info("Received IOException while reading '{}', attempting to reopen",
            key);
        LOG.debug("{}", e, e);
        try {
          reopen(pos);
          result = in.read();
        } catch (EOFException eof) {
          LOG.debug("EOF on input stream read: {}", eof, eof);
          result = -1;
        }
      }
      if (result != -1) {
        pos++;
        if (statistics != null) {
          statistics.incrementBytesRead(1);
        }
      }
      return result;
    }

    /**
     * Read up to {@code len} bytes into {@code b}. An {@link EOFException}
     * is rethrown as-is; any other {@link IOException} triggers one reopen at
     * the current position followed by a single retry.
     *
     * @return the number of bytes read, as returned by the inner stream
     * @throws IOException if the stream is closed or the retry also fails
     */
    @Override
    public synchronized int read(byte[] b, int off, int len)
        throws IOException {
      checkNotClosed();
      int result = -1;
      try {
        result = in.read(b, off, len);
      } catch (EOFException eof) {
        throw eof;
      } catch (IOException e) {
        LOG.info("Received IOException while reading '{}',"
            + " attempting to reopen.", key);
        LOG.debug("{}", e, e);
        reopen(pos);
        result = in.read(b, off, len);
      }
      if (result > 0) {
        pos += result;
        if (statistics != null) {
          statistics.incrementBytesRead(result);
        }
      }
      return result;
    }

    @Override
    public synchronized void close() throws IOException {
      closeInnerStream();
    }

    /**
     * Close the inner stream if not null. Even if an exception
     * is raised during the close, the field is set to null
     */
    private void closeInnerStream() {
      IOUtils.closeStream(in);
      in = null;
    }

    /**
     * Reopen a new input stream with the specified position
     * @param pos the position to reopen a new stream
     * @throws IOException
     */
    private synchronized void reopen(long pos) throws IOException {
      LOG.debug("Reopening key '{}' for reading at position '{}'", key, pos);
      InputStream newStream = store.retrieve(key, pos);
      updateInnerStream(newStream, pos);
    }

    /**
     * Update inner stream with a new stream and position
     * @param newStream new stream -must not be null
     * @param newpos new position
     * @throws IOException IO exception on a failure to close the existing
     * stream.
     */
    private synchronized void updateInnerStream(InputStream newStream,
        long newpos) throws IOException {
      Preconditions.checkNotNull(newStream, "Null newstream argument");
      closeInnerStream();
      in = newStream;
      this.pos = newpos;
    }

    /**
     * Move to {@code newpos} by reopening the object at that offset.
     * A seek to the current position is a no-op.
     *
     * @throws EOFException if {@code newpos} is negative
     */
    @Override
    public synchronized void seek(long newpos) throws IOException {
      if (newpos < 0) {
        throw new EOFException(
            FSExceptionMessages.NEGATIVE_SEEK);
      }
      if (pos != newpos) {
        // the seek is attempting to move the current position
        reopen(newpos);
      }
    }

    @Override
    public synchronized long getPos() throws IOException {
      return pos;
    }

    /** Always returns false: this stream never switches source. */
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;
    }
  }

  /**
   * Output stream that buffers everything written to a local temporary file
   * and hands that file to the store when the stream is closed. When the MD5
   * algorithm is available, a digest of the written bytes is computed and
   * passed along with the file.
   */
  private class NativeS3FsOutputStream extends OutputStream {

    private final Configuration conf;
    private final String key;
    private File backupFile;
    private OutputStream backupStream;
    /** Digest of the bytes written; stays null if MD5 is unavailable. */
    private MessageDigest digest;
    private boolean closed;
    private LocalDirAllocator lDirAlloc;

    /**
     * Create the temporary buffer file and open it for writing.
     * The {@code store}, {@code progress} and {@code bufferSize} arguments
     * are accepted but not used by this constructor; the upload in
     * {@link #close()} goes through the enclosing file system's store.
     *
     * @param conf configuration used to locate the buffer directory
     * @param key destination key of the object
     * @throws IOException if the temporary file cannot be created or opened
     */
    public NativeS3FsOutputStream(Configuration conf,
        NativeFileSystemStore store, String key, Progressable progress,
        int bufferSize) throws IOException {
      this.conf = conf;
      this.key = key;
      this.backupFile = newBackupFile();
      LOG.info("OutputStream for key '{}' writing to tempfile '{}'",
          key, this.backupFile);
      try {
        this.digest = MessageDigest.getInstance("MD5");
        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
            new FileOutputStream(backupFile), this.digest));
      } catch (NoSuchAlgorithmException e) {
        LOG.warn("Cannot load MD5 digest algorithm, "
            + "skipping message integrity check.", e);
        this.backupStream = new BufferedOutputStream(
            new FileOutputStream(backupFile));
      }
    }

    /**
     * Allocate a temporary file through a {@link LocalDirAllocator} and
     * register it for deletion on JVM exit.
     */
    private File newBackupFile() throws IOException {
      if (conf.get(S3_NATIVE_BUFFER_DIR_KEY, null) != null) {
        lDirAlloc = new LocalDirAllocator(S3_NATIVE_BUFFER_DIR_KEY);
      } else {
        lDirAlloc = new LocalDirAllocator(S3_NATIVE_BUFFER_DIR_DEFAULT);
      }
      File result = lDirAlloc.createTmpFileForWrite("output-",
          LocalDirAllocator.SIZE_UNKNOWN, conf);
      result.deleteOnExit();
      return result;
    }

    @Override
    public void flush() throws IOException {
      backupStream.flush();
    }

    /**
     * Close the local buffer file, upload it under the destination key and
     * delete the temporary file. Repeated calls are no-ops.
     *
     * @throws IOException if closing the buffer or storing the file fails;
     * the temporary file is removed in either case
     */
    @Override
    public synchronized void close() throws IOException {
      if (closed) {
        return;
      }
      // Mark closed up front so that a failure below cannot lead to a
      // second upload attempt from a later close() call.
      closed = true;

      try {
        // Closing the buffer is inside the try so that the temporary file
        // is still cleaned up if the final flush fails.
        backupStream.close();
        LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);

        byte[] md5Hash = digest == null ? null : digest.digest();
        store.storeFile(key, backupFile, md5Hash);
      } finally {
        if (!backupFile.delete()) {
          LOG.warn("Could not delete temporary s3n file: {}", backupFile);
        }
        super.close();
      }
      LOG.info("OutputStream for key '{}' upload complete", key);
    }

    @Override
    public void write(int b) throws IOException {
      backupStream.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      backupStream.write(b, off, len);
    }
  }

  private URI uri;
  private NativeFileSystemStore store;
  private Path workingDir;

  public NativeS3FileSystem() {
    // set store in initialize()
  }

  public NativeS3FileSystem(NativeFileSystemStore store) {
    this.store = store;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>s3n</code>
   */
  @Override
  public String getScheme() {
    return "s3n";
  }

  /**
   * Initialize the file system: create the default store if none was
   * supplied to the constructor, initialize the store, and set the URI and
   * the working directory (<code>/user/&lt;user.name&gt;</code>).
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = S3xLoginHelper.buildFSURI(uri);
    this.workingDir =
        new Path("/user", System.getProperty("user.name"))
            .makeQualified(this.uri, this.getWorkingDirectory());
  }

  /**
   * Build a {@link Jets3tNativeFileSystemStore} wrapped in a retry proxy.
   * Only the <code>storeFile</code> and <code>rename</code> methods get a
   * retry policy: on {@link IOException} or {@link S3Exception} they are
   * retried with a fixed sleep, using the retry count and sleep time read
   * from the configuration.
   */
  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt(S3_NATIVE_MAX_RETRIES_KEY, S3_NATIVE_MAX_RETRIES_DEFAUL),
        conf.getLong(S3_NATIVE_SLEEP_TIME_KEY, S3_NATIVE_SLEEP_TIME_DEFAULT),
        TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap =
        new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("storeFile", methodPolicy);
    methodNameToPolicyMap.put("rename", methodPolicy);

    return (NativeFileSystemStore)
        RetryProxy.create(NativeFileSystemStore.class, store,
            methodNameToPolicyMap);
  }

  /**
   * Convert an absolute path to an S3 key: the URI path without its leading
   * slash and, except when that would leave nothing, without a trailing one.
   *
   * @param path path to convert
   * @return the key; the empty string denotes the bucket root
   * @throws IllegalArgumentException if the path is not absolute
   */
  private static String pathToKey(Path path) {
    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
      // allow uris without trailing slash after bucket to refer to root,
      // like s3n://mybucket
      return "";
    }
    if (!path.isAbsolute()) {
      throw new IllegalArgumentException("Path must be absolute: " + path);
    }
    String ret = path.toUri().getPath().substring(1); // remove initial slash
    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
      ret = ret.substring(0, ret.length() - 1);
    }
    return ret;
  }

  /** Convert a key back to an absolute path by prefixing a slash. */
  private static Path keyToPath(String key) {
    return new Path("/" + key);
  }

  /** Resolve a relative path against the working directory. */
  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * Check that a Path belongs to this FileSystem.
   * Unlike the superclass, this version does not look at authority,
   * only hostnames.
   * @param path to check
   * @throws IllegalArgumentException if there is an FS mismatch
   */
  @Override
  protected void checkPath(Path path) {
    S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
  }

  @Override
  protected URI canonicalizeUri(URI rawUri) {
    return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new UnsupportedOperationException("Append is not supported "
        + "by NativeS3FileSystem");
  }

  /**
   * Create a file. Data written to the returned stream is buffered locally
   * and stored under the path's key when the stream is closed.
   *
   * @throws FileAlreadyExistsException if the path exists and
   * {@code overwrite} is false
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {

    // Test the flag first so the existence probe (and its calls to the
    // store) is skipped when the caller is overwriting anyway.
    if (!overwrite && exists(f)) {
      throw new FileAlreadyExistsException("File already exists: " + f);
    }

    LOG.debug("Creating new file '{}' in S3", f);
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
        key, progress, bufferSize), statistics);
  }

  /**
   * Delete a file or directory. Before anything is removed a marker for the
   * parent directory is written so that the parent does not vanish.
   *
   * @return false if the path does not exist, true otherwise
   * @throws IOException if the path is a non-empty directory and
   * {@code recurse} is false
   */
  @Override
  public boolean delete(Path f, boolean recurse) throws IOException {
    FileStatus status;
    try {
      status = getFileStatus(f);
    } catch (FileNotFoundException e) {
      LOG.debug("Delete called for '{}' but file does not exist,"
          + " so returning false", f);
      return false;
    }
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    if (status.isDirectory()) {
      if (!recurse && listStatus(f).length > 0) {
        throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
      }

      createParent(f);

      LOG.debug("Deleting directory '{}'", f);
      String priorLastKey = null;
      do {
        PartialListing listing =
            store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
        for (FileMetadata file : listing.getFiles()) {
          store.delete(file.getKey());
        }
        priorLastKey = listing.getPriorLastKey();
      } while (priorLastKey != null);

      try {
        store.delete(key + FOLDER_SUFFIX);
      } catch (FileNotFoundException ignored) {
        //this is fine, we don't require a marker
      }
    } else {
      LOG.debug("Deleting file '{}'", f);
      createParent(f);
      store.delete(key);
    }
    return true;
  }

  /**
   * Resolve the status of a path. The probes are made in this order: the
   * root key, an object with the exact key (a file), the folder marker for
   * the key (a directory), and finally a listing under the key (a directory
   * if anything is found).
   *
   * @throws FileNotFoundException if none of the probes finds anything
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() == 0) { // root always exists
      return newDirectory(absolutePath);
    }

    LOG.debug("getFileStatus retrieving metadata for key '{}'", key);
    FileMetadata meta = store.retrieveMetadata(key);
    if (meta != null) {
      LOG.debug("getFileStatus returning 'file' for key '{}'", key);
      return newFile(meta, absolutePath);
    }
    String markerKey = key + FOLDER_SUFFIX;
    if (store.retrieveMetadata(markerKey) != null) {
      LOG.debug("getFileStatus returning 'directory' for key '{}' as '{}' exists",
          key, markerKey);
      return newDirectory(absolutePath);
    }

    LOG.debug("getFileStatus listing key '{}'", key);
    PartialListing listing = store.list(key, 1);
    if (listing.getFiles().length > 0 ||
        listing.getCommonPrefixes().length > 0) {
      LOG.debug("getFileStatus returning 'directory' for key '{}'"
          + " as it has contents", key);
      return newDirectory(absolutePath);
    }

    LOG.debug("getFileStatus could not find key '{}'", key);
    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
  }

  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * <p>
   * If <code>f</code> is a file, this method will make a single call to S3.
   * If <code>f</code> is a directory, this method will make a maximum of
   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
   * files and directories contained directly in <code>f</code>.
   * </p>
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() > 0) {
      FileMetadata meta = store.retrieveMetadata(key);
      if (meta != null) {
        return new FileStatus[] { newFile(meta, absolutePath) };
      }
    }

    URI pathUri = absolutePath.toUri();
    Set<FileStatus> status = new TreeSet<FileStatus>();
    String priorLastKey = null;
    do {
      PartialListing listing =
          store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
      for (FileMetadata fileMetadata : listing.getFiles()) {
        Path subpath = keyToPath(fileMetadata.getKey());
        String relativePath = pathUri.relativize(subpath.toUri()).getPath();

        if (fileMetadata.getKey().equals(key + "/")) {
          // this is just the directory we have been asked to list
        } else if (relativePath.endsWith(FOLDER_SUFFIX)) {
          // Strip only the trailing marker suffix. Cutting at the first
          // occurrence would mangle names that contain the suffix text
          // earlier in the name as well.
          String dirName = relativePath.substring(0,
              relativePath.length() - FOLDER_SUFFIX.length());
          status.add(newDirectory(new Path(absolutePath, dirName)));
        } else {
          status.add(newFile(fileMetadata, subpath));
        }
      }
      for (String commonPrefix : listing.getCommonPrefixes()) {
        Path subpath = keyToPath(commonPrefix);
        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
        status.add(newDirectory(new Path(absolutePath, relativePath)));
      }
      priorLastKey = listing.getPriorLastKey();
    } while (priorLastKey != null);

    // Nothing listed, not the root and no folder marker: the path is absent.
    if (status.isEmpty() &&
        key.length() > 0 &&
        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }

    return status.toArray(new FileStatus[status.size()]);
  }

  /** Build the status of a file from its store metadata. */
  private FileStatus newFile(FileMetadata meta, Path path) {
    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
        meta.getLastModified(),
        path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  /** Build the status of a directory; length and times are reported as 0. */
  private FileStatus newDirectory(Path path) {
    return new FileStatus(0, true, 1, 0, 0,
        path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  /**
   * Create a directory and all of its missing ancestors, working from the
   * root downwards. The {@code permission} argument is not used.
   *
   * @throws FileAlreadyExistsException if any path element is a file
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(f);
    List<Path> paths = new ArrayList<Path>();
    do {
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (Path path : paths) {
      result &= mkdir(path);
    }
    return result;
  }

  /**
   * Create a single directory by storing its empty folder marker, unless
   * something already exists at the path.
   *
   * @return always true
   * @throws FileAlreadyExistsException if the path is an existing file
   */
  private boolean mkdir(Path f) throws IOException {
    try {
      FileStatus fileStatus = getFileStatus(f);
      if (fileStatus.isFile()) {
        throw new FileAlreadyExistsException(String.format(
            "Can't make directory for path '%s' since it is a file.", f));
      }
    } catch (FileNotFoundException e) {
      LOG.debug("Making dir '{}' in S3", f);
      String key = pathToKey(f) + FOLDER_SUFFIX;
      store.storeEmptyFile(key);
    }
    return true;
  }

  /**
   * Open a file for reading.
   *
   * @throws FileNotFoundException if the path does not exist or is a
   * directory
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
    if (fs.isDirectory()) {
      throw new FileNotFoundException("'" + f + "' is a directory");
    }
    LOG.info("Opening '{}' for reading", f);
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    return new FSDataInputStream(new BufferedFSInputStream(
        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key),
        bufferSize));
  }

  // Stores a folder marker for the parent of the given path (unless the
  // parent is the root) so that the parent directory does not vanish when
  // its last child is removed. Called from delete().
  private void createParent(Path path) throws IOException {
    Path parent = path.getParent();
    if (parent != null) {
      String key = pathToKey(makeAbsolute(parent));
      if (key.length() > 0) {
        store.storeEmptyFile(key + FOLDER_SUFFIX);
      }
    }
  }

  /**
   * Rename a file or directory, implemented as copy followed by delete.
   *
   * @return false if the source is the root or does not exist, if the
   * destination is an existing file other than the source, if the
   * destination's parent is missing or is a file, or if a directory would be
   * moved underneath itself; true otherwise, including a rename onto self
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {

    String srcKey = pathToKey(makeAbsolute(src));
    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";

    if (srcKey.length() == 0) {
      // Cannot rename root of file system
      LOG.debug("{}returning false as cannot rename the root of a filesystem",
          debugPreamble);
      return false;
    }

    //get status of source
    boolean srcIsFile;
    try {
      srcIsFile = getFileStatus(src).isFile();
    } catch (FileNotFoundException e) {
      //bail out fast if the source does not exist
      LOG.debug("{}returning false as src does not exist", debugPreamble);
      return false;
    }
    // Figure out the final destination
    String dstKey = pathToKey(makeAbsolute(dst));

    try {
      boolean dstIsFile = getFileStatus(dst).isFile();
      if (dstIsFile) {
        //destination is a file.
        //you can't copy a file or a directory onto an existing file
        //except for the special case of dest==src, which is a no-op
        LOG.debug("{}returning without rename as dst is an already existing file",
            debugPreamble);
        //exit, returning true iff the rename is onto self
        return srcKey.equals(dstKey);
      } else {
        //destination exists and is a directory
        LOG.debug("{}using dst as output directory", debugPreamble);
        //destination goes under the dst path, with the name of the
        //source entry
        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
      }
    } catch (FileNotFoundException e) {
      //destination does not exist => the source file or directory
      //is copied over with the name of the destination
      LOG.debug("{}using dst as output destination", debugPreamble);
      try {
        if (getFileStatus(dst.getParent()).isFile()) {
          LOG.debug("{}returning false as dst parent exists and is a file",
              debugPreamble);
          return false;
        }
      } catch (FileNotFoundException ex) {
        LOG.debug("{}returning false as dst parent does not exist",
            debugPreamble);
        return false;
      }
    }

    //rename to self behavior follows Posix rules and is different
    //for directories and files -the return code is driven by src type
    if (srcKey.equals(dstKey)) {
      //fully resolved destination key matches source: nothing to do,
      //report success
      LOG.debug("{}renamingToSelf; returning true", debugPreamble);
      return true;
    }
    if (srcIsFile) {
      //source is a file; COPY then DELETE
      LOG.debug("{}src is file, so doing copy then delete in S3",
          debugPreamble);
      store.copy(srcKey, dstKey);
      store.delete(srcKey);
    } else {
      //src is a directory
      LOG.debug("{}src is directory, so copying contents", debugPreamble);
      //Verify dest is not a child of the source
      if (dstKey.startsWith(srcKey + "/")) {
        LOG.debug("{}cannot rename a directory to a subdirectory of self",
            debugPreamble);
        return false;
      }
      //create the subdir under the destination
      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);

      List<String> keysToDelete = new ArrayList<String>();
      String priorLastKey = null;
      do {
        PartialListing listing =
            store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
        for (FileMetadata file : listing.getFiles()) {
          keysToDelete.add(file.getKey());
          store.copy(file.getKey(),
              dstKey + file.getKey().substring(srcKey.length()));
        }
        priorLastKey = listing.getPriorLastKey();
      } while (priorLastKey != null);

      LOG.debug("{}all files in src copied, now removing src files",
          debugPreamble);
      for (String key : keysToDelete) {
        store.delete(key);
      }

      try {
        store.delete(srcKey + FOLDER_SUFFIX);
      } catch (FileNotFoundException ignored) {
        //this is fine, we don't require a marker
      }
      LOG.debug("{}done", debugPreamble);
    }

    return true;
  }

  /**
   * @return the value of <code>fs.s3n.block.size</code>, defaulting to 64 MB
   */
  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
  }

  /**
   * Set the working directory to the given directory.
   */
  @Override
  public void setWorkingDirectory(Path newDir) {
    workingDir = newDir;
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }
}