001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.azure; 020 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.EOFException; 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.charset.Charset; 031import java.text.SimpleDateFormat; 032import java.util.ArrayList; 033import java.util.Date; 034import java.util.EnumSet; 035import java.util.Set; 036import java.util.TimeZone; 037import java.util.TreeSet; 038import java.util.UUID; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.regex.Matcher; 041import java.util.regex.Pattern; 042 043import org.apache.commons.lang.StringUtils; 044import org.apache.hadoop.classification.InterfaceAudience; 045import org.apache.hadoop.classification.InterfaceStability; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.BlockLocation; 048import org.apache.hadoop.fs.BufferedFSInputStream; 049import org.apache.hadoop.fs.CreateFlag; 050import 
org.apache.hadoop.fs.FSDataInputStream; 051import org.apache.hadoop.fs.FSDataOutputStream; 052import org.apache.hadoop.fs.FSExceptionMessages; 053import org.apache.hadoop.fs.FSInputStream; 054import org.apache.hadoop.fs.FileAlreadyExistsException; 055import org.apache.hadoop.fs.FileStatus; 056import org.apache.hadoop.fs.FileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; 059import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; 060import org.apache.hadoop.fs.permission.FsPermission; 061import org.apache.hadoop.fs.permission.PermissionStatus; 062import org.apache.hadoop.io.IOUtils; 063import org.apache.hadoop.security.UserGroupInformation; 064import org.apache.hadoop.util.Progressable; 065import org.apache.hadoop.util.Time; 066import org.codehaus.jackson.JsonNode; 067import org.codehaus.jackson.JsonParseException; 068import org.codehaus.jackson.JsonParser; 069import org.codehaus.jackson.map.JsonMappingException; 070import org.codehaus.jackson.map.ObjectMapper; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import com.google.common.annotations.VisibleForTesting; 075import com.microsoft.azure.storage.StorageException; 076 077/** 078 * A {@link FileSystem} for reading and writing files stored on <a 079 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 080 * blob-based and stores files on Azure in their native form so they can be read 081 * by other Azure tools. 082 */ 083@InterfaceAudience.Public 084@InterfaceStability.Stable 085public class NativeAzureFileSystem extends FileSystem { 086 private static final int USER_WX_PERMISION = 0300; 087 /** 088 * A description of a folder rename operation, including the source and 089 * destination keys, and descriptions of the files in the source folder. 
090 */ 091 092 public static class FolderRenamePending { 093 private SelfRenewingLease folderLease; 094 private String srcKey; 095 private String dstKey; 096 private FileMetadata[] fileMetadata = null; // descriptions of source files 097 private ArrayList<String> fileStrings = null; 098 private NativeAzureFileSystem fs; 099 private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000; 100 private static final int FORMATTING_BUFFER = 10000; 101 private boolean committed; 102 public static final String SUFFIX = "-RenamePending.json"; 103 104 // Prepare in-memory information needed to do or redo a folder rename. 105 public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease, 106 NativeAzureFileSystem fs) throws IOException { 107 this.srcKey = srcKey; 108 this.dstKey = dstKey; 109 this.folderLease = lease; 110 this.fs = fs; 111 ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>(); 112 113 // List all the files in the folder. 114 long start = Time.monotonicNow(); 115 String priorLastKey = null; 116 do { 117 PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL, 118 AZURE_UNBOUNDED_DEPTH, priorLastKey); 119 for(FileMetadata file : listing.getFiles()) { 120 fileMetadataList.add(file); 121 } 122 priorLastKey = listing.getPriorLastKey(); 123 } while (priorLastKey != null); 124 fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]); 125 long end = Time.monotonicNow(); 126 LOG.debug("Time taken to list {} blobs for rename operation is: {} ms", fileMetadata.length, (end - start)); 127 128 this.committed = true; 129 } 130 131 // Prepare in-memory information needed to do or redo folder rename from 132 // a -RenamePending.json file read from storage. This constructor is to use during 133 // redo processing. 
134 public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs) 135 throws IllegalArgumentException, IOException { 136 137 this.fs = fs; 138 139 // open redo file 140 Path f = redoFile; 141 FSDataInputStream input = fs.open(f); 142 byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE]; 143 int l = input.read(bytes); 144 if (l <= 0) { 145 // Jira HADOOP-12678 -Handle empty rename pending metadata file during 146 // atomic rename in redo path. If during renamepending file is created 147 // but not written yet, then this means that rename operation 148 // has not started yet. So we should delete rename pending metadata file. 149 LOG.error("Deleting empty rename pending file " 150 + redoFile + " -- no data available"); 151 deleteRenamePendingFile(fs, redoFile); 152 return; 153 } 154 if (l == MAX_RENAME_PENDING_FILE_SIZE) { 155 throw new IOException( 156 "Error reading pending rename file contents -- " 157 + "maximum file size exceeded"); 158 } 159 String contents = new String(bytes, 0, l, Charset.forName("UTF-8")); 160 161 // parse the JSON 162 ObjectMapper objMapper = new ObjectMapper(); 163 objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); 164 JsonNode json = null; 165 try { 166 json = objMapper.readValue(contents, JsonNode.class); 167 this.committed = true; 168 } catch (JsonMappingException e) { 169 170 // The -RedoPending.json file is corrupted, so we assume it was 171 // not completely written 172 // and the redo operation did not commit. 
173 this.committed = false; 174 } catch (JsonParseException e) { 175 this.committed = false; 176 } catch (IOException e) { 177 this.committed = false; 178 } 179 180 if (!this.committed) { 181 LOG.error("Deleting corruped rename pending file {} \n {}", 182 redoFile, contents); 183 184 // delete the -RenamePending.json file 185 deleteRenamePendingFile(fs, redoFile); 186 return; 187 } 188 189 // initialize this object's fields 190 ArrayList<String> fileStrList = new ArrayList<String>(); 191 JsonNode oldFolderName = json.get("OldFolderName"); 192 JsonNode newFolderName = json.get("NewFolderName"); 193 if (oldFolderName == null || newFolderName == null) { 194 this.committed = false; 195 } else { 196 this.srcKey = oldFolderName.getTextValue(); 197 this.dstKey = newFolderName.getTextValue(); 198 if (this.srcKey == null || this.dstKey == null) { 199 this.committed = false; 200 } else { 201 JsonNode fileList = json.get("FileList"); 202 if (fileList == null) { 203 this.committed = false; 204 } else { 205 for (int i = 0; i < fileList.size(); i++) { 206 fileStrList.add(fileList.get(i).getTextValue()); 207 } 208 } 209 } 210 } 211 this.fileStrings = fileStrList; 212 } 213 214 public FileMetadata[] getFiles() { 215 return fileMetadata; 216 } 217 218 public SelfRenewingLease getFolderLease() { 219 return folderLease; 220 } 221 222 /** 223 * Deletes rename pending metadata file 224 * @param fs -- the file system 225 * @param redoFile - rename pending metadata file path 226 * @throws IOException - If deletion fails 227 */ 228 @VisibleForTesting 229 void deleteRenamePendingFile(FileSystem fs, Path redoFile) 230 throws IOException { 231 try { 232 fs.delete(redoFile, false); 233 } catch (IOException e) { 234 // If the rename metadata was not found then somebody probably 235 // raced with us and finished the delete first 236 Throwable t = e.getCause(); 237 if (t != null && t instanceof StorageException 238 && "BlobNotFound".equals(((StorageException) t).getErrorCode())) { 239 
LOG.warn("rename pending file " + redoFile + " is already deleted"); 240 } else { 241 throw e; 242 } 243 } 244 } 245 246 /** 247 * Write to disk the information needed to redo folder rename, 248 * in JSON format. The file name will be 249 * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json} 250 * The file format will be: 251 * <pre>{@code 252 * { 253 * FormatVersion: "1.0", 254 * OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>", 255 * OldFolderName: "<key>", 256 * NewFolderName: "<key>", 257 * FileList: [ <string> , <string> , ... ] 258 * } 259 * 260 * Here's a sample: 261 * { 262 * FormatVersion: "1.0", 263 * OperationUTCTime: "2014-07-01 23:50:35.572", 264 * OldFolderName: "user/ehans/folderToRename", 265 * NewFolderName: "user/ehans/renamedFolder", 266 * FileList: [ 267 * "innerFile", 268 * "innerFile2" 269 * ] 270 * } }</pre> 271 * @throws IOException 272 */ 273 public void writeFile(FileSystem fs) throws IOException { 274 Path path = getRenamePendingFilePath(); 275 LOG.debug("Preparing to write atomic rename state to {}", path.toString()); 276 OutputStream output = null; 277 278 String contents = makeRenamePendingFileContents(); 279 280 // Write file. 281 try { 282 output = fs.create(path); 283 output.write(contents.getBytes(Charset.forName("UTF-8"))); 284 } catch (IOException e) { 285 throw new IOException("Unable to write RenamePending file for folder rename from " 286 + srcKey + " to " + dstKey, e); 287 } finally { 288 NativeAzureFileSystemHelper.cleanup(LOG, output); 289 } 290 } 291 292 /** 293 * Return the contents of the JSON file to represent the operations 294 * to be performed for a folder rename. 
295 */ 296 public String makeRenamePendingFileContents() { 297 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 298 sdf.setTimeZone(TimeZone.getTimeZone("UTC")); 299 String time = sdf.format(new Date()); 300 301 // Make file list string 302 StringBuilder builder = new StringBuilder(); 303 builder.append("[\n"); 304 for (int i = 0; i != fileMetadata.length; i++) { 305 if (i > 0) { 306 builder.append(",\n"); 307 } 308 builder.append(" "); 309 String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/"); 310 311 // Quote string file names, escaping any possible " characters or other 312 // necessary characters in the name. 313 builder.append(quote(noPrefix)); 314 if (builder.length() >= 315 MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) { 316 317 // Give up now to avoid using too much memory. 318 LOG.error("Internal error: Exceeded maximum rename pending file size of {} bytes.", 319 MAX_RENAME_PENDING_FILE_SIZE); 320 321 // return some bad JSON with an error message to make it human readable 322 return "exceeded maximum rename pending file size"; 323 } 324 } 325 builder.append("\n ]"); 326 String fileList = builder.toString(); 327 328 // Make file contents as a string. Again, quote file names, escaping 329 // characters as appropriate. 330 String contents = "{\n" 331 + " FormatVersion: \"1.0\",\n" 332 + " OperationUTCTime: \"" + time + "\",\n" 333 + " OldFolderName: " + quote(srcKey) + ",\n" 334 + " NewFolderName: " + quote(dstKey) + ",\n" 335 + " FileList: " + fileList + "\n" 336 + "}\n"; 337 338 return contents; 339 } 340 341 /** 342 * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 343 * method. 344 * 345 * Produce a string in double quotes with backslash sequences in all the 346 * right places. A backslash will be inserted within </, allowing JSON 347 * text to be delivered in HTML. In JSON text, a string cannot contain a 348 * control character or an unescaped quote or backslash. 
349 * @param string A String 350 * @return A String correctly formatted for insertion in a JSON text. 351 */ 352 private String quote(String string) { 353 if (string == null || string.length() == 0) { 354 return "\"\""; 355 } 356 357 char c = 0; 358 int i; 359 int len = string.length(); 360 StringBuilder sb = new StringBuilder(len + 4); 361 String t; 362 363 sb.append('"'); 364 for (i = 0; i < len; i += 1) { 365 c = string.charAt(i); 366 switch (c) { 367 case '\\': 368 case '"': 369 sb.append('\\'); 370 sb.append(c); 371 break; 372 case '/': 373 sb.append('\\'); 374 sb.append(c); 375 break; 376 case '\b': 377 sb.append("\\b"); 378 break; 379 case '\t': 380 sb.append("\\t"); 381 break; 382 case '\n': 383 sb.append("\\n"); 384 break; 385 case '\f': 386 sb.append("\\f"); 387 break; 388 case '\r': 389 sb.append("\\r"); 390 break; 391 default: 392 if (c < ' ') { 393 t = "000" + Integer.toHexString(c); 394 sb.append("\\u" + t.substring(t.length() - 4)); 395 } else { 396 sb.append(c); 397 } 398 } 399 } 400 sb.append('"'); 401 return sb.toString(); 402 } 403 404 public String getSrcKey() { 405 return srcKey; 406 } 407 408 public String getDstKey() { 409 return dstKey; 410 } 411 412 public FileMetadata getSourceMetadata() throws IOException { 413 return fs.getStoreInterface().retrieveMetadata(srcKey); 414 } 415 416 /** 417 * Execute a folder rename. This is the execution path followed 418 * when everything is working normally. See redo() for the alternate 419 * execution path for the case where we're recovering from a folder rename 420 * failure. 
421 * @throws IOException 422 */ 423 public void execute() throws IOException { 424 425 AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() { 426 @Override 427 public boolean execute(FileMetadata file) throws IOException{ 428 renameFile(file); 429 return true; 430 } 431 }; 432 433 AzureFileSystemThreadPoolExecutor executor = this.fs.getThreadPoolExecutor(this.fs.renameThreadCount, 434 "AzureBlobRenameThread", "Rename", getSrcKey(), AZURE_RENAME_THREADS); 435 436 executor.executeParallel(this.getFiles(), task); 437 438 // Rename the source folder 0-byte root file itself. 439 FileMetadata srcMetadata2 = this.getSourceMetadata(); 440 if (srcMetadata2.getBlobMaterialization() == 441 BlobMaterialization.Explicit) { 442 443 // It already has a lease on it from the "prepare" phase so there's no 444 // need to get one now. Pass in existing lease to allow file delete. 445 fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(), 446 false, folderLease); 447 } 448 449 // Update the last-modified time of the parent folders of both source and 450 // destination. 451 fs.updateParentFolderLastModifiedTime(srcKey); 452 fs.updateParentFolderLastModifiedTime(dstKey); 453 } 454 455 // Rename a single file 456 @VisibleForTesting 457 void renameFile(FileMetadata file) throws IOException{ 458 // Rename all materialized entries under the folder to point to the 459 // final destination. 460 if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { 461 String srcName = file.getKey(); 462 String suffix = srcName.substring((this.getSrcKey()).length()); 463 String dstName = this.getDstKey() + suffix; 464 465 // Rename gets exclusive access (via a lease) for files 466 // designated for atomic rename. 467 // The main use case is for HBase write-ahead log (WAL) and data 468 // folder processing correctness. See the rename code for details. 
469 boolean acquireLease = this.fs.getStoreInterface().isAtomicRenameKey(srcName); 470 this.fs.getStoreInterface().rename(srcName, dstName, acquireLease, null); 471 } 472 } 473 474 /** Clean up after execution of rename. 475 * @throws IOException */ 476 public void cleanup() throws IOException { 477 478 if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) { 479 480 // Remove RenamePending file 481 fs.delete(getRenamePendingFilePath(), false); 482 483 // Freeing source folder lease is not necessary since the source 484 // folder file was deleted. 485 } 486 } 487 488 private Path getRenamePendingFilePath() { 489 String fileName = srcKey + SUFFIX; 490 Path fileNamePath = keyToPath(fileName); 491 Path path = fs.makeAbsolute(fileNamePath); 492 return path; 493 } 494 495 /** 496 * Recover from a folder rename failure by redoing the intended work, 497 * as recorded in the -RenamePending.json file. 498 * 499 * @throws IOException 500 */ 501 public void redo() throws IOException { 502 503 if (!committed) { 504 505 // Nothing to do. The -RedoPending.json file should have already been 506 // deleted. 507 return; 508 } 509 510 // Try to get a lease on source folder to block concurrent access to it. 511 // It may fail if the folder is already gone. We don't check if the 512 // source exists explicitly because that could recursively trigger redo 513 // and give an infinite recursion. 514 SelfRenewingLease lease = null; 515 boolean sourceFolderGone = false; 516 try { 517 lease = fs.leaseSourceFolder(srcKey); 518 } catch (AzureException e) { 519 520 // If the source folder was not found then somebody probably 521 // raced with us and finished the rename first, or the 522 // first rename failed right before deleting the rename pending 523 // file. 
524 String errorCode = ""; 525 try { 526 StorageException se = (StorageException) e.getCause(); 527 errorCode = se.getErrorCode(); 528 } catch (Exception e2) { 529 ; // do nothing -- could not get errorCode 530 } 531 if (errorCode.equals("BlobNotFound")) { 532 sourceFolderGone = true; 533 } else { 534 throw new IOException( 535 "Unexpected error when trying to lease source folder name during " 536 + "folder rename redo", 537 e); 538 } 539 } 540 541 if (!sourceFolderGone) { 542 // Make sure the target folder exists. 543 Path dst = fullPath(dstKey); 544 if (!fs.exists(dst)) { 545 fs.mkdirs(dst); 546 } 547 548 // For each file inside the folder to be renamed, 549 // make sure it has been renamed. 550 for(String fileName : fileStrings) { 551 finishSingleFileRename(fileName); 552 } 553 554 // Remove the source folder. Don't check explicitly if it exists, 555 // to avoid triggering redo recursively. 556 try { 557 fs.getStoreInterface().delete(srcKey, lease); 558 } catch (Exception e) { 559 LOG.info("Unable to delete source folder during folder rename redo. " 560 + "If the source folder is already gone, this is not an error " 561 + "condition. Continuing with redo.", e); 562 } 563 564 // Update the last-modified time of the parent folders of both source 565 // and destination. 566 fs.updateParentFolderLastModifiedTime(srcKey); 567 fs.updateParentFolderLastModifiedTime(dstKey); 568 } 569 570 // Remove the -RenamePending.json file. 571 fs.delete(getRenamePendingFilePath(), false); 572 } 573 574 // See if the source file is still there, and if it is, rename it. 
575 private void finishSingleFileRename(String fileName) 576 throws IOException { 577 Path srcFile = fullPath(srcKey, fileName); 578 Path dstFile = fullPath(dstKey, fileName); 579 String srcName = fs.pathToKey(srcFile); 580 String dstName = fs.pathToKey(dstFile); 581 boolean srcExists = fs.getStoreInterface().explicitFileExists(srcName); 582 boolean dstExists = fs.getStoreInterface().explicitFileExists(dstName); 583 if(srcExists) { 584 // Rename gets exclusive access (via a lease) for HBase write-ahead log 585 // (WAL) file processing correctness. See the rename code for details. 586 fs.getStoreInterface().rename(srcName, dstName, true, null); 587 } else if (!srcExists && dstExists) { 588 // The rename already finished, so do nothing. 589 ; 590 } else { 591 throw new IOException( 592 "Attempting to complete rename of file " + srcKey + "/" + fileName 593 + " during folder rename redo, and file was not found in source " 594 + "or destination."); 595 } 596 } 597 598 // Return an absolute path for the specific fileName within the folder 599 // specified by folderKey. 600 private Path fullPath(String folderKey, String fileName) { 601 return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName); 602 } 603 604 private Path fullPath(String fileKey) { 605 return new Path(new Path(fs.getUri()), "/" + fileKey); 606 } 607 } 608 609 private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]"; 610 private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN = 611 Pattern.compile("\\[\\[\\.\\]\\](?=$|/)"); 612 private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)"); 613 614 @Override 615 public String getScheme() { 616 return "wasb"; 617 } 618 619 620 /** 621 * <p> 622 * A {@link FileSystem} for reading and writing files stored on <a 623 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 624 * blob-based and stores files on Azure in their native form so they can be read 625 * by other Azure tools. 
* This implementation uses HTTPS for secure network communication.
   * </p>
   * <p>
   * It differs from the parent class only in the scheme it reports.
   * </p>
   */
  public static class Secure extends NativeAzureFileSystem {
    /** @return the URI scheme {@code "wasbs"}. */
    @Override
    public String getScheme() {
      return "wasbs";
    }
  }

  public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class);

  /** Configuration key for the block size; see also MAX_AZURE_BLOCK_SIZE. */
  static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
  /**
   * The time span in seconds before which we consider a temp blob to be
   * dangling (not being actively uploaded to) and up for reclamation.
   *
   * So e.g. if this is 60, then any temporary blobs more than a minute old
   * would be considered dangling.
   */
  static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds";
  // Presumably the default for AZURE_TEMP_EXPIRY_PROPERTY_NAME (3600 s = one
  // hour) -- confirm where it is read.
  private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600;
  static final String PATH_DELIMITER = Path.SEPARATOR;
  // Name of the temporary folder; NOTE(review): usage is outside this view.
  static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$";

  // Sentinel arguments for the store's listAll(): list every entry, at any depth.
  private static final int AZURE_LIST_ALL = -1;
  private static final int AZURE_UNBOUNDED_DEPTH = -1;

  // 512 MiB; also the initial value of the blockSize field.
  private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;

  /**
   * The configuration property that determines which group owns files created
   * in WASB.
   */
  private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup";
  /**
   * The default value for fs.azure.permissions.supergroup. Chosen as the same
   * default as DFS.
*/
  static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";

  /** Configuration key for the host name reported in block locations. */
  static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
      "fs.azure.block.location.impersonatedhost";
  /** Default value for AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME. */
  private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
      "localhost";
  /** Configuration key for the ring buffer capacity. */
  static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
      "fs.azure.ring.buffer.capacity";
  /** Configuration key for the output stream buffer size. */
  static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
      "fs.azure.output.stream.buffer.size";

  /** Configuration key controlling whether Azure metrics are skipped. */
  public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";

  /**
   * Property to enable Append API.
   */
  public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";

  /**
   * The configuration property to set number of threads to be used for rename operation.
   */
  public static final String AZURE_RENAME_THREADS = "fs.azure.rename.threads";

  /**
   * The default number of threads to be used for rename operation.
   * NOTE(review): how 0 is interpreted is decided by getThreadPoolExecutor(),
   * which is outside this view.
   */
  public static final int DEFAULT_AZURE_RENAME_THREADS = 0;

  /**
   * The configuration property to set number of threads to be used for delete operation.
   */
  public static final String AZURE_DELETE_THREADS = "fs.azure.delete.threads";

  /**
   * The default number of threads to be used for delete operation.
   */
  public static final int DEFAULT_AZURE_DELETE_THREADS = 0;

  /**
   * The number of threads to be used for delete operation after reading user configuration.
   */
  private int deleteThreadCount = 0;

  /**
   * The number of threads to be used for rename operation after reading user configuration.
709 */ 710 private int renameThreadCount = 0; 711 712 private class NativeAzureFsInputStream extends FSInputStream { 713 private InputStream in; 714 private final String key; 715 private long pos = 0; 716 private boolean closed = false; 717 private boolean isPageBlob; 718 719 // File length, valid only for streams over block blobs. 720 private long fileLength; 721 722 public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) { 723 this.in = in; 724 this.key = key; 725 this.isPageBlob = store.isPageBlobKey(key); 726 this.fileLength = fileLength; 727 } 728 729 /** 730 * Return the size of the remaining available bytes 731 * if the size is less than or equal to {@link Integer#MAX_VALUE}, 732 * otherwise, return {@link Integer#MAX_VALUE}. 733 * 734 * This is to match the behavior of DFSInputStream.available(), 735 * which some clients may rely on (HBase write-ahead log reading in 736 * particular). 737 */ 738 @Override 739 public synchronized int available() throws IOException { 740 if (isPageBlob) { 741 return in.available(); 742 } else { 743 if (closed) { 744 throw new IOException("Stream closed"); 745 } 746 final long remaining = this.fileLength - pos; 747 return remaining <= Integer.MAX_VALUE ? 748 (int) remaining : Integer.MAX_VALUE; 749 } 750 } 751 752 /* 753 * Reads the next byte of data from the input stream. The value byte is 754 * returned as an integer in the range 0 to 255. If no byte is available 755 * because the end of the stream has been reached, the value -1 is returned. 756 * This method blocks until input data is available, the end of the stream 757 * is detected, or an exception is thrown. 758 * 759 * @returns int An integer corresponding to the byte read. 
760 */ 761 @Override 762 public synchronized int read() throws FileNotFoundException, IOException { 763 try { 764 int result = 0; 765 result = in.read(); 766 if (result != -1) { 767 pos++; 768 if (statistics != null) { 769 statistics.incrementBytesRead(1); 770 } 771 } 772 // Return to the caller with the result. 773 // 774 return result; 775 } catch(EOFException e) { 776 return -1; 777 } catch(IOException e) { 778 779 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 780 781 if (innerException instanceof StorageException) { 782 783 LOG.error("Encountered Storage Exception for read on Blob : {}" 784 + " Exception details: {} Error Code : {}", 785 key, e, ((StorageException) innerException).getErrorCode()); 786 787 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 788 throw new FileNotFoundException(String.format("%s is not found", key)); 789 } 790 } 791 792 throw e; 793 } 794 } 795 796 /* 797 * Reads up to len bytes of data from the input stream into an array of 798 * bytes. An attempt is made to read as many as len bytes, but a smaller 799 * number may be read. The number of bytes actually read is returned as an 800 * integer. This method blocks until input data is available, end of file is 801 * detected, or an exception is thrown. If len is zero, then no bytes are 802 * read and 0 is returned; otherwise, there is an attempt to read at least 803 * one byte. If no byte is available because the stream is at end of file, 804 * the value -1 is returned; otherwise, at least one byte is read and stored 805 * into b. 806 * 807 * @param b -- the buffer into which data is read 808 * 809 * @param off -- the start offset in the array b at which data is written 810 * 811 * @param len -- the maximum number of bytes read 812 * 813 * @ returns int The total number of byes read into the buffer, or -1 if 814 * there is no more data because the end of stream is reached. 
815 */ 816 @Override 817 public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException { 818 try { 819 int result = 0; 820 result = in.read(b, off, len); 821 if (result > 0) { 822 pos += result; 823 } 824 825 if (null != statistics && result > 0) { 826 statistics.incrementBytesRead(result); 827 } 828 829 // Return to the caller with the result. 830 return result; 831 } catch(IOException e) { 832 833 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 834 835 if (innerException instanceof StorageException) { 836 837 LOG.error("Encountered Storage Exception for read on Blob : {}" 838 + " Exception details: {} Error Code : {}", 839 key, e, ((StorageException) innerException).getErrorCode()); 840 841 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 842 throw new FileNotFoundException(String.format("%s is not found", key)); 843 } 844 } 845 846 throw e; 847 } 848 } 849 850 @Override 851 public synchronized void close() throws IOException { 852 if (!closed) { 853 closed = true; 854 IOUtils.closeStream(in); 855 in = null; 856 } 857 } 858 859 @Override 860 public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException { 861 try { 862 checkNotClosed(); 863 if (pos < 0) { 864 throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); 865 } 866 IOUtils.closeStream(in); 867 in = store.retrieve(key); 868 this.pos = in.skip(pos); 869 LOG.debug("Seek to position {}. 
Bytes skipped {}", pos, 870 this.pos); 871 } catch(IOException e) { 872 873 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 874 875 if (innerException instanceof StorageException 876 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 877 throw new FileNotFoundException(String.format("%s is not found", key)); 878 } 879 880 throw e; 881 } catch(IndexOutOfBoundsException e) { 882 throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); 883 } 884 } 885 886 @Override 887 public synchronized long getPos() throws IOException { 888 return pos; 889 } 890 891 @Override 892 public boolean seekToNewSource(long targetPos) throws IOException { 893 return false; 894 } 895 896 897 /* 898 * Helper method to check if a stream is closed. 899 */ 900 private void checkNotClosed() throws IOException { 901 if (closed) { 902 throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); 903 } 904 } 905 } 906 907 private class NativeAzureFsOutputStream extends OutputStream { 908 // We should not override flush() to actually close current block and flush 909 // to DFS, this will break applications that assume flush() is a no-op. 910 // Applications are advised to use Syncable.hflush() for that purpose. 911 // NativeAzureFsOutputStream needs to implement Syncable if needed. 912 private String key; 913 private String keyEncoded; 914 private OutputStream out; 915 916 public NativeAzureFsOutputStream(OutputStream out, String aKey, 917 String anEncodedKey) throws IOException { 918 // Check input arguments. The output stream should be non-null and the 919 // keys 920 // should be valid strings. 
921 if (null == out) { 922 throw new IllegalArgumentException( 923 "Illegal argument: the output stream is null."); 924 } 925 926 if (null == aKey || 0 == aKey.length()) { 927 throw new IllegalArgumentException( 928 "Illegal argument the key string is null or empty"); 929 } 930 931 if (null == anEncodedKey || 0 == anEncodedKey.length()) { 932 throw new IllegalArgumentException( 933 "Illegal argument the encoded key string is null or empty"); 934 } 935 936 // Initialize the member variables with the incoming parameters. 937 this.out = out; 938 939 setKey(aKey); 940 setEncodedKey(anEncodedKey); 941 } 942 943 @Override 944 public synchronized void close() throws IOException { 945 if (out != null) { 946 // Close the output stream and decode the key for the output stream 947 // before returning to the caller. 948 // 949 out.close(); 950 restoreKey(); 951 out = null; 952 } 953 } 954 955 /** 956 * Writes the specified byte to this output stream. The general contract for 957 * write is that one byte is written to the output stream. The byte to be 958 * written is the eight low-order bits of the argument b. The 24 high-order 959 * bits of b are ignored. 960 * 961 * @param b 962 * 32-bit integer of block of 4 bytes 963 */ 964 @Override 965 public void write(int b) throws IOException { 966 try { 967 out.write(b); 968 } catch(IOException e) { 969 if (e.getCause() instanceof StorageException) { 970 StorageException storageExcp = (StorageException) e.getCause(); 971 LOG.error("Encountered Storage Exception for write on Blob : {}" 972 + " Exception details: {} Error Code : {}", 973 key, e.getMessage(), storageExcp.getErrorCode()); 974 } 975 throw e; 976 } 977 } 978 979 /** 980 * Writes b.length bytes from the specified byte array to this output 981 * stream. The general contract for write(b) is that it should have exactly 982 * the same effect as the call write(b, 0, b.length). 983 * 984 * @param b 985 * Block of bytes to be written to the output stream. 
986 */ 987 @Override 988 public void write(byte[] b) throws IOException { 989 try { 990 out.write(b); 991 } catch(IOException e) { 992 if (e.getCause() instanceof StorageException) { 993 StorageException storageExcp = (StorageException) e.getCause(); 994 LOG.error("Encountered Storage Exception for write on Blob : {}" 995 + " Exception details: {} Error Code : {}", 996 key, e.getMessage(), storageExcp.getErrorCode()); 997 } 998 throw e; 999 } 1000 } 1001 1002 /** 1003 * Writes <code>len</code> from the specified byte array starting at offset 1004 * <code>off</code> to the output stream. The general contract for write(b, 1005 * off, len) is that some of the bytes in the array <code> 1006 * b</code b> are written to the output stream in order; element 1007 * <code>b[off]</code> is the first byte written and 1008 * <code>b[off+len-1]</code> is the last byte written by this operation. 1009 * 1010 * @param b 1011 * Byte array to be written. 1012 * @param off 1013 * Write this offset in stream. 1014 * @param len 1015 * Number of bytes to be written. 1016 */ 1017 @Override 1018 public void write(byte[] b, int off, int len) throws IOException { 1019 try { 1020 out.write(b, off, len); 1021 } catch(IOException e) { 1022 if (e.getCause() instanceof StorageException) { 1023 StorageException storageExcp = (StorageException) e.getCause(); 1024 LOG.error("Encountered Storage Exception for write on Blob : {}" 1025 + " Exception details: {} Error Code : {}", 1026 key, e.getMessage(), storageExcp.getErrorCode()); 1027 } 1028 throw e; 1029 } 1030 } 1031 1032 /** 1033 * Get the blob name. 1034 * 1035 * @return String Blob name. 1036 */ 1037 public String getKey() { 1038 return key; 1039 } 1040 1041 /** 1042 * Set the blob name. 1043 * 1044 * @param key 1045 * Blob name. 1046 */ 1047 public void setKey(String key) { 1048 this.key = key; 1049 } 1050 1051 /** 1052 * Get the blob name. 1053 * 1054 * @return String Blob name. 
1055 */ 1056 public String getEncodedKey() { 1057 return keyEncoded; 1058 } 1059 1060 /** 1061 * Set the blob name. 1062 * 1063 * @param anEncodedKey 1064 * Blob name. 1065 */ 1066 public void setEncodedKey(String anEncodedKey) { 1067 this.keyEncoded = anEncodedKey; 1068 } 1069 1070 /** 1071 * Restore the original key name from the m_key member variable. Note: The 1072 * output file stream is created with an encoded blob store key to guarantee 1073 * load balancing on the front end of the Azure storage partition servers. 1074 * The create also includes the name of the original key value which is 1075 * stored in the m_key member variable. This method should only be called 1076 * when the stream is closed. 1077 */ 1078 private void restoreKey() throws IOException { 1079 store.rename(getEncodedKey(), getKey()); 1080 } 1081 } 1082 1083 private URI uri; 1084 private NativeFileSystemStore store; 1085 private AzureNativeFileSystemStore actualStore; 1086 private Path workingDir; 1087 private long blockSize = MAX_AZURE_BLOCK_SIZE; 1088 private AzureFileSystemInstrumentation instrumentation; 1089 private String metricsSourceName; 1090 private boolean isClosed = false; 1091 private static boolean suppressRetryPolicy = false; 1092 // A counter to create unique (within-process) names for my metrics sources. 1093 private static AtomicInteger metricsSourceNameCounter = new AtomicInteger(); 1094 private boolean appendSupportEnabled = false; 1095 1096 public NativeAzureFileSystem() { 1097 // set store in initialize() 1098 } 1099 1100 public NativeAzureFileSystem(NativeFileSystemStore store) { 1101 this.store = store; 1102 } 1103 1104 /** 1105 * Suppress the default retry policy for the Storage, useful in unit tests to 1106 * test negative cases without waiting forever. 1107 */ 1108 @VisibleForTesting 1109 static void suppressRetryPolicy() { 1110 suppressRetryPolicy = true; 1111 } 1112 1113 /** 1114 * Undo the effect of suppressRetryPolicy. 
1115 */ 1116 @VisibleForTesting 1117 static void resumeRetryPolicy() { 1118 suppressRetryPolicy = false; 1119 } 1120 1121 /** 1122 * Creates a new metrics source name that's unique within this process. 1123 */ 1124 @VisibleForTesting 1125 public static String newMetricsSourceName() { 1126 int number = metricsSourceNameCounter.incrementAndGet(); 1127 final String baseName = "AzureFileSystemMetrics"; 1128 if (number == 1) { // No need for a suffix for the first one 1129 return baseName; 1130 } else { 1131 return baseName + number; 1132 } 1133 } 1134 1135 /** 1136 * Checks if the given URI scheme is a scheme that's affiliated with the Azure 1137 * File System. 1138 * 1139 * @param scheme 1140 * The URI scheme. 1141 * @return true iff it's an Azure File System URI scheme. 1142 */ 1143 private static boolean isWasbScheme(String scheme) { 1144 // The valid schemes are: asv (old name), asvs (old name over HTTPS), 1145 // wasb (new name), wasbs (new name over HTTPS). 1146 return scheme != null 1147 && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs") 1148 || scheme.equalsIgnoreCase("wasb") || scheme 1149 .equalsIgnoreCase("wasbs")); 1150 } 1151 1152 /** 1153 * Puts in the authority of the default file system if it is a WASB file 1154 * system and the given URI's authority is null. 1155 * 1156 * @return The URI with reconstructed authority if necessary and possible. 1157 */ 1158 private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) { 1159 if (null == uri.getAuthority()) { 1160 // If WASB is the default file system, get the authority from there 1161 URI defaultUri = FileSystem.getDefaultUri(conf); 1162 if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) { 1163 try { 1164 // Reconstruct the URI with the authority from the default URI. 1165 return new URI(uri.getScheme(), defaultUri.getAuthority(), 1166 uri.getPath(), uri.getQuery(), uri.getFragment()); 1167 } catch (URISyntaxException e) { 1168 // This should never happen. 
1169 throw new Error("Bad URI construction", e); 1170 } 1171 } 1172 } 1173 return uri; 1174 } 1175 1176 @Override 1177 protected void checkPath(Path path) { 1178 // Make sure to reconstruct the path's authority if needed 1179 super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(), 1180 getConf()))); 1181 } 1182 1183 @Override 1184 public void initialize(URI uri, Configuration conf) 1185 throws IOException, IllegalArgumentException { 1186 // Check authority for the URI to guarantee that it is non-null. 1187 uri = reconstructAuthorityIfNeeded(uri, conf); 1188 if (null == uri.getAuthority()) { 1189 final String errMsg = String 1190 .format("Cannot initialize WASB file system, URI authority not recognized."); 1191 throw new IllegalArgumentException(errMsg); 1192 } 1193 super.initialize(uri, conf); 1194 1195 if (store == null) { 1196 store = createDefaultStore(conf); 1197 } 1198 1199 instrumentation = new AzureFileSystemInstrumentation(conf); 1200 if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { 1201 // Make sure the metrics system is available before interacting with Azure 1202 AzureFileSystemMetricsSystem.fileSystemStarted(); 1203 metricsSourceName = newMetricsSourceName(); 1204 String sourceDesc = "Azure Storage Volume File System metrics"; 1205 AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, 1206 instrumentation); 1207 } 1208 1209 store.initialize(uri, conf, instrumentation); 1210 setConf(conf); 1211 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 1212 this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() 1213 .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); 1214 this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, 1215 MAX_AZURE_BLOCK_SIZE); 1216 1217 this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false); 1218 LOG.debug("NativeAzureFileSystem. 
Initializing."); 1219 LOG.debug(" blockSize = {}", 1220 conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); 1221 1222 // Initialize thread counts from user configuration 1223 deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS); 1224 renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS); 1225 } 1226 1227 private NativeFileSystemStore createDefaultStore(Configuration conf) { 1228 actualStore = new AzureNativeFileSystemStore(); 1229 1230 if (suppressRetryPolicy) { 1231 actualStore.suppressRetryPolicy(); 1232 } 1233 return actualStore; 1234 } 1235 1236 /** 1237 * Azure Storage doesn't allow the blob names to end in a period, 1238 * so encode this here to work around that limitation. 1239 */ 1240 private static String encodeTrailingPeriod(String toEncode) { 1241 Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode); 1242 return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER); 1243 } 1244 1245 /** 1246 * Reverse the encoding done by encodeTrailingPeriod(). 1247 */ 1248 private static String decodeTrailingPeriod(String toDecode) { 1249 Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode); 1250 return matcher.replaceAll("."); 1251 } 1252 1253 /** 1254 * Convert the path to a key. By convention, any leading or trailing slash is 1255 * removed, except for the special case of a single slash. 1256 */ 1257 @VisibleForTesting 1258 public String pathToKey(Path path) { 1259 // Convert the path to a URI to parse the scheme, the authority, and the 1260 // path from the path object. 1261 URI tmpUri = path.toUri(); 1262 String pathUri = tmpUri.getPath(); 1263 1264 // The scheme and authority is valid. If the path does not exist add a "/" 1265 // separator to list the root of the container. 
1266 Path newPath = path; 1267 if ("".equals(pathUri)) { 1268 newPath = new Path(tmpUri.toString() + Path.SEPARATOR); 1269 } 1270 1271 // Verify path is absolute if the path refers to a windows drive scheme. 1272 if (!newPath.isAbsolute()) { 1273 throw new IllegalArgumentException("Path must be absolute: " + path); 1274 } 1275 1276 String key = null; 1277 key = newPath.toUri().getPath(); 1278 key = removeTrailingSlash(key); 1279 key = encodeTrailingPeriod(key); 1280 if (key.length() == 1) { 1281 return key; 1282 } else { 1283 return key.substring(1); // remove initial slash 1284 } 1285 } 1286 1287 // Remove any trailing slash except for the case of a single slash. 1288 private static String removeTrailingSlash(String key) { 1289 if (key.length() == 0 || key.length() == 1) { 1290 return key; 1291 } 1292 if (key.charAt(key.length() - 1) == '/') { 1293 return key.substring(0, key.length() - 1); 1294 } else { 1295 return key; 1296 } 1297 } 1298 1299 private static Path keyToPath(String key) { 1300 if (key.equals("/")) { 1301 return new Path("/"); // container 1302 } 1303 return new Path("/" + decodeTrailingPeriod(key)); 1304 } 1305 1306 /** 1307 * Get the absolute version of the path (fully qualified). 1308 * This is public for testing purposes. 1309 * 1310 * @param path 1311 * @return fully qualified path 1312 */ 1313 @VisibleForTesting 1314 public Path makeAbsolute(Path path) { 1315 if (path.isAbsolute()) { 1316 return path; 1317 } 1318 return new Path(workingDir, path); 1319 } 1320 1321 /** 1322 * For unit test purposes, retrieves the AzureNativeFileSystemStore store 1323 * backing this file system. 1324 * 1325 * @return The store object. 1326 */ 1327 @VisibleForTesting 1328 public AzureNativeFileSystemStore getStore() { 1329 return actualStore; 1330 } 1331 1332 NativeFileSystemStore getStoreInterface() { 1333 return store; 1334 } 1335 1336 /** 1337 * Gets the metrics source for this file system. 1338 * This is mainly here for unit testing purposes. 
1339 * 1340 * @return the metrics source. 1341 */ 1342 public AzureFileSystemInstrumentation getInstrumentation() { 1343 return instrumentation; 1344 } 1345 1346 /** This optional operation is not yet supported. */ 1347 @Override 1348 public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) 1349 throws IOException { 1350 1351 if (!appendSupportEnabled) { 1352 throw new UnsupportedOperationException("Append Support not enabled"); 1353 } 1354 1355 LOG.debug("Opening file: {} for append", f); 1356 1357 Path absolutePath = makeAbsolute(f); 1358 String key = pathToKey(absolutePath); 1359 FileMetadata meta = null; 1360 try { 1361 meta = store.retrieveMetadata(key); 1362 } catch(Exception ex) { 1363 1364 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 1365 1366 if (innerException instanceof StorageException 1367 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1368 1369 throw new FileNotFoundException(String.format("%s is not found", key)); 1370 } else { 1371 throw ex; 1372 } 1373 } 1374 1375 if (meta == null) { 1376 throw new FileNotFoundException(f.toString()); 1377 } 1378 1379 if (meta.isDir()) { 1380 throw new FileNotFoundException(f.toString() 1381 + " is a directory not a file."); 1382 } 1383 1384 if (store.isPageBlobKey(key)) { 1385 throw new IOException("Append not supported for Page Blobs"); 1386 } 1387 1388 DataOutputStream appendStream = null; 1389 1390 try { 1391 appendStream = store.retrieveAppendStream(key, bufferSize); 1392 } catch (Exception ex) { 1393 1394 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 1395 1396 if (innerException instanceof StorageException 1397 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1398 throw new FileNotFoundException(String.format("%s is not found", key)); 1399 } else { 1400 throw ex; 1401 } 1402 } 1403 1404 return new 
FSDataOutputStream(appendStream, statistics); 1405 } 1406 1407 @Override 1408 public FSDataOutputStream create(Path f, FsPermission permission, 1409 boolean overwrite, int bufferSize, short replication, long blockSize, 1410 Progressable progress) throws IOException { 1411 return create(f, permission, overwrite, true, 1412 bufferSize, replication, blockSize, progress, 1413 (SelfRenewingLease) null); 1414 } 1415 1416 /** 1417 * Get a self-renewing lease on the specified file. 1418 */ 1419 public SelfRenewingLease acquireLease(Path path) throws AzureException { 1420 String fullKey = pathToKey(makeAbsolute(path)); 1421 return getStore().acquireLease(fullKey); 1422 } 1423 1424 @Override 1425 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1426 boolean overwrite, int bufferSize, short replication, long blockSize, 1427 Progressable progress) throws IOException { 1428 1429 Path parent = f.getParent(); 1430 1431 // Get exclusive access to folder if this is a directory designated 1432 // for atomic rename. The primary use case of for HBase write-ahead 1433 // log file management. 1434 SelfRenewingLease lease = null; 1435 if (store.isAtomicRenameKey(pathToKey(f))) { 1436 try { 1437 lease = acquireLease(parent); 1438 } catch (AzureException e) { 1439 1440 String errorCode = ""; 1441 try { 1442 StorageException e2 = (StorageException) e.getCause(); 1443 errorCode = e2.getErrorCode(); 1444 } catch (Exception e3) { 1445 // do nothing if cast fails 1446 } 1447 if (errorCode.equals("BlobNotFound")) { 1448 throw new FileNotFoundException("Cannot create file " + 1449 f.getName() + " because parent folder does not exist."); 1450 } 1451 1452 LOG.warn("Got unexpected exception trying to get lease on {} . {}", 1453 pathToKey(parent), e.getMessage()); 1454 throw e; 1455 } 1456 } 1457 1458 // See if the parent folder exists. If not, throw error. 
1459 // The exists() check will push any pending rename operation forward, 1460 // if there is one, and return false. 1461 // 1462 // At this point, we have exclusive access to the source folder 1463 // via the lease, so we will not conflict with an active folder 1464 // rename operation. 1465 if (!exists(parent)) { 1466 try { 1467 1468 // This'll let the keep-alive thread exit as soon as it wakes up. 1469 lease.free(); 1470 } catch (Exception e) { 1471 LOG.warn("Unable to free lease because: {}", e.getMessage()); 1472 } 1473 throw new FileNotFoundException("Cannot create file " + 1474 f.getName() + " because parent folder does not exist."); 1475 } 1476 1477 // Create file inside folder. 1478 FSDataOutputStream out = null; 1479 try { 1480 out = create(f, permission, overwrite, false, 1481 bufferSize, replication, blockSize, progress, lease); 1482 } finally { 1483 // Release exclusive access to folder. 1484 try { 1485 if (lease != null) { 1486 lease.free(); 1487 } 1488 } catch (Exception e) { 1489 NativeAzureFileSystemHelper.cleanup(LOG, out); 1490 String msg = "Unable to free lease on " + parent.toUri(); 1491 LOG.error(msg); 1492 throw new IOException(msg, e); 1493 } 1494 } 1495 return out; 1496 } 1497 1498 @Override 1499 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1500 EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, 1501 Progressable progress) throws IOException { 1502 1503 // Check if file should be appended or overwritten. Assume that the file 1504 // is overwritten on if the CREATE and OVERWRITE create flags are set. Note 1505 // that any other combinations of create flags will result in an open new or 1506 // open with append. 1507 final EnumSet<CreateFlag> createflags = 1508 EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); 1509 boolean overwrite = flags.containsAll(createflags); 1510 1511 // Delegate the create non-recursive call. 
1512 return this.createNonRecursive(f, permission, overwrite, 1513 bufferSize, replication, blockSize, progress); 1514 } 1515 1516 @Override 1517 public FSDataOutputStream createNonRecursive(Path f, 1518 boolean overwrite, int bufferSize, short replication, long blockSize, 1519 Progressable progress) throws IOException { 1520 return this.createNonRecursive(f, FsPermission.getFileDefault(), 1521 overwrite, bufferSize, replication, blockSize, progress); 1522 } 1523 1524 1525 /** 1526 * Create an Azure blob and return an output stream to use 1527 * to write data to it. 1528 * 1529 * @param f 1530 * @param permission 1531 * @param overwrite 1532 * @param createParent 1533 * @param bufferSize 1534 * @param replication 1535 * @param blockSize 1536 * @param progress 1537 * @param parentFolderLease Lease on parent folder (or null if 1538 * no lease). 1539 * @return 1540 * @throws IOException 1541 */ 1542 private FSDataOutputStream create(Path f, FsPermission permission, 1543 boolean overwrite, boolean createParent, int bufferSize, 1544 short replication, long blockSize, Progressable progress, 1545 SelfRenewingLease parentFolderLease) 1546 throws FileAlreadyExistsException, IOException { 1547 1548 LOG.debug("Creating file: {}", f.toString()); 1549 1550 if (containsColon(f)) { 1551 throw new IOException("Cannot create file " + f 1552 + " through WASB that has colons in the name"); 1553 } 1554 1555 Path absolutePath = makeAbsolute(f); 1556 String key = pathToKey(absolutePath); 1557 1558 FileMetadata existingMetadata = store.retrieveMetadata(key); 1559 if (existingMetadata != null) { 1560 if (existingMetadata.isDir()) { 1561 throw new FileAlreadyExistsException("Cannot create file " + f 1562 + "; already exists as a directory."); 1563 } 1564 if (!overwrite) { 1565 throw new FileAlreadyExistsException("File already exists:" + f); 1566 } 1567 } 1568 1569 Path parentFolder = absolutePath.getParent(); 1570 if (parentFolder != null && parentFolder.getParent() != null) { // skip 
root 1571 // Update the parent folder last modified time if the parent folder 1572 // already exists. 1573 String parentKey = pathToKey(parentFolder); 1574 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 1575 if (parentMetadata != null && parentMetadata.isDir() && 1576 parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { 1577 if (parentFolderLease != null) { 1578 store.updateFolderLastModifiedTime(parentKey, parentFolderLease); 1579 } else { 1580 updateParentFolderLastModifiedTime(key); 1581 } 1582 } else { 1583 // Make sure that the parent folder exists. 1584 // Create it using inherited permissions from the first existing directory going up the path 1585 Path firstExisting = parentFolder.getParent(); 1586 FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting)); 1587 while(metadata == null) { 1588 // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata 1589 firstExisting = firstExisting.getParent(); 1590 metadata = store.retrieveMetadata(pathToKey(firstExisting)); 1591 } 1592 mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true); 1593 } 1594 } 1595 1596 // Mask the permission first (with the default permission mask as well). 1597 FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile); 1598 PermissionStatus permissionStatus = createPermissionStatus(masked); 1599 1600 OutputStream bufOutStream; 1601 if (store.isPageBlobKey(key)) { 1602 // Store page blobs directly in-place without renames. 1603 bufOutStream = store.storefile(key, permissionStatus); 1604 } else { 1605 // This is a block blob, so open the output blob stream based on the 1606 // encoded key. 1607 // 1608 String keyEncoded = encodeKey(key); 1609 1610 1611 // First create a blob at the real key, pointing back to the temporary file 1612 // This accomplishes a few things: 1613 // 1. Makes sure we can create a file there. 1614 // 2. 
Makes it visible to other concurrent threads/processes/nodes what 1615 // we're 1616 // doing. 1617 // 3. Makes it easier to restore/cleanup data in the event of us crashing. 1618 store.storeEmptyLinkFile(key, keyEncoded, permissionStatus); 1619 1620 // The key is encoded to point to a common container at the storage server. 1621 // This reduces the number of splits on the server side when load balancing. 1622 // Ingress to Azure storage can take advantage of earlier splits. We remove 1623 // the root path to the key and prefix a random GUID to the tail (or leaf 1624 // filename) of the key. Keys are thus broadly and randomly distributed over 1625 // a single container to ease load balancing on the storage server. When the 1626 // blob is committed it is renamed to its earlier key. Uncommitted blocks 1627 // are not cleaned up and we leave it to Azure storage to garbage collect 1628 // these 1629 // blocks. 1630 bufOutStream = new NativeAzureFsOutputStream(store.storefile( 1631 keyEncoded, permissionStatus), key, keyEncoded); 1632 } 1633 // Construct the data output stream from the buffered output stream. 1634 FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); 1635 1636 1637 // Increment the counter 1638 instrumentation.fileCreated(); 1639 1640 // Return data output stream to caller. 1641 return fsOut; 1642 } 1643 1644 @Override 1645 @Deprecated 1646 public boolean delete(Path path) throws IOException { 1647 return delete(path, true); 1648 } 1649 1650 @Override 1651 public boolean delete(Path f, boolean recursive) throws IOException { 1652 return delete(f, recursive, false); 1653 } 1654 1655 /** 1656 * Delete the specified file or folder. The parameter 1657 * skipParentFolderLastModifidedTimeUpdate 1658 * is used in the case of atomic folder rename redo. 
In that case, there is 1659 * a lease on the parent folder, so (without reworking the code) modifying 1660 * the parent folder update time will fail because of a conflict with the 1661 * lease. Since we are going to delete the folder soon anyway so accurate 1662 * modified time is not necessary, it's easier to just skip 1663 * the modified time update. 1664 * 1665 * @param f 1666 * @param recursive 1667 * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last 1668 * modified time. 1669 * @return true if and only if the file is deleted 1670 * @throws IOException 1671 */ 1672 public boolean delete(Path f, boolean recursive, 1673 boolean skipParentFolderLastModifidedTimeUpdate) throws IOException { 1674 1675 LOG.debug("Deleting file: {}", f.toString()); 1676 1677 Path absolutePath = makeAbsolute(f); 1678 String key = pathToKey(absolutePath); 1679 1680 // Capture the metadata for the path. 1681 // 1682 FileMetadata metaFile = null; 1683 try { 1684 metaFile = store.retrieveMetadata(key); 1685 } catch (IOException e) { 1686 1687 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1688 1689 if (innerException instanceof StorageException 1690 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1691 1692 return false; 1693 } 1694 throw e; 1695 } 1696 1697 if (null == metaFile) { 1698 // The path to be deleted does not exist. 1699 return false; 1700 } 1701 1702 // The path exists, determine if it is a folder containing objects, 1703 // an empty folder, or a simple file and take the appropriate actions. 1704 if (!metaFile.isDir()) { 1705 // The path specifies a file. We need to check the parent path 1706 // to make sure it's a proper materialized directory before we 1707 // delete the file. Otherwise we may get into a situation where 1708 // the file we were deleting was the last one in an implicit directory 1709 // (e.g. 
the blob store only contains the blob a/b and there's no 1710 // corresponding directory blob a) and that would implicitly delete 1711 // the directory as well, which is not correct. 1712 Path parentPath = absolutePath.getParent(); 1713 if (parentPath.getParent() != null) {// Not root 1714 String parentKey = pathToKey(parentPath); 1715 1716 FileMetadata parentMetadata = null; 1717 try { 1718 parentMetadata = store.retrieveMetadata(parentKey); 1719 } catch (IOException e) { 1720 1721 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1722 1723 if (innerException instanceof StorageException) { 1724 // Invalid State. 1725 // A FileNotFoundException is not thrown here as the API returns false 1726 // if the file not present. But not retrieving metadata here is an 1727 // unrecoverable state and can only happen if there is a race condition 1728 // hence throwing a IOException 1729 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1730 throw new IOException("File " + f + " has a parent directory " 1731 + parentPath + " whose metadata cannot be retrieved. Can't resolve"); 1732 } 1733 } 1734 throw e; 1735 } 1736 1737 // Invalid State. 1738 // A FileNotFoundException is not thrown here as the API returns false 1739 // if the file not present. But not retrieving metadata here is an 1740 // unrecoverable state and can only happen if there is a race condition 1741 // hence throwing a IOException 1742 if (parentMetadata == null) { 1743 throw new IOException("File " + f + " has a parent directory " 1744 + parentPath + " whose metadata cannot be retrieved. Can't resolve"); 1745 } 1746 1747 if (!parentMetadata.isDir()) { 1748 // Invalid state: the parent path is actually a file. Throw. 1749 throw new AzureException("File " + f + " has a parent directory " 1750 + parentPath + " which is also a file. 
Can't resolve."); 1751 } 1752 1753 if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 1754 LOG.debug("Found an implicit parent directory while trying to" 1755 + " delete the file {}. Creating the directory blob for" 1756 + " it in {}.", f, parentKey); 1757 1758 store.storeEmptyFolder(parentKey, 1759 createPermissionStatus(FsPermission.getDefault())); 1760 } else { 1761 if (!skipParentFolderLastModifidedTimeUpdate) { 1762 updateParentFolderLastModifiedTime(key); 1763 } 1764 } 1765 } 1766 1767 try { 1768 store.delete(key); 1769 instrumentation.fileDeleted(); 1770 } catch(IOException e) { 1771 1772 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1773 1774 if (innerException instanceof StorageException 1775 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1776 return false; 1777 } 1778 1779 throw e; 1780 } 1781 } else { 1782 // The path specifies a folder. Recursively delete all entries under the 1783 // folder. 1784 LOG.debug("Directory Delete encountered: {}", f.toString()); 1785 Path parentPath = absolutePath.getParent(); 1786 if (parentPath.getParent() != null) { 1787 String parentKey = pathToKey(parentPath); 1788 FileMetadata parentMetadata = null; 1789 1790 try { 1791 parentMetadata = store.retrieveMetadata(parentKey); 1792 } catch (IOException e) { 1793 1794 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1795 1796 if (innerException instanceof StorageException) { 1797 // Invalid State. 1798 // A FileNotFoundException is not thrown here as the API returns false 1799 // if the file not present. 
But not retrieving metadata here is an 1800 // unrecoverable state and can only happen if there is a race condition 1801 // hence throwing a IOException 1802 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1803 throw new IOException("File " + f + " has a parent directory " 1804 + parentPath + " whose metadata cannot be retrieved. Can't resolve"); 1805 } 1806 } 1807 throw e; 1808 } 1809 1810 // Invalid State. 1811 // A FileNotFoundException is not thrown here as the API returns false 1812 // if the file not present. But not retrieving metadata here is an 1813 // unrecoverable state and can only happen if there is a race condition 1814 // hence throwing a IOException 1815 if (parentMetadata == null) { 1816 throw new IOException("File " + f + " has a parent directory " 1817 + parentPath + " whose metadata cannot be retrieved. Can't resolve"); 1818 } 1819 1820 if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 1821 LOG.debug("Found an implicit parent directory while trying to" 1822 + " delete the directory {}. Creating the directory blob for" 1823 + " it in {}. ", f, parentKey); 1824 1825 store.storeEmptyFolder(parentKey, 1826 createPermissionStatus(FsPermission.getDefault())); 1827 } 1828 } 1829 1830 // List all the blobs in the current folder. 1831 String priorLastKey = null; 1832 1833 // Start time for list operation 1834 long start = Time.monotonicNow(); 1835 ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>(); 1836 1837 // List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth. 
1838 do { 1839 try { 1840 PartialListing listing = store.listAll(key, AZURE_LIST_ALL, 1841 AZURE_UNBOUNDED_DEPTH, priorLastKey); 1842 for(FileMetadata file : listing.getFiles()) { 1843 fileMetadataList.add(file); 1844 } 1845 priorLastKey = listing.getPriorLastKey(); 1846 } catch (IOException e) { 1847 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1848 1849 if (innerException instanceof StorageException 1850 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1851 return false; 1852 } 1853 1854 throw e; 1855 } 1856 } while (priorLastKey != null); 1857 1858 long end = Time.monotonicNow(); 1859 LOG.debug("Time taken to list {} blobs for delete operation: {} ms", fileMetadataList.size(), (end - start)); 1860 1861 final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]); 1862 1863 if (!recursive && contents.length > 0) { 1864 // The folder is non-empty and recursive delete was not specified. 1865 // Throw an exception indicating that a non-recursive delete was 1866 // specified for a non-empty folder. 1867 throw new IOException("Non-recursive delete of non-empty directory " 1868 + f.toString()); 1869 } 1870 1871 // Delete all files / folders in current directory stored as list in 'contents'. 
1872 AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() { 1873 @Override 1874 public boolean execute(FileMetadata file) throws IOException{ 1875 return deleteFile(file.getKey(), file.isDir()); 1876 } 1877 }; 1878 1879 AzureFileSystemThreadPoolExecutor executor = getThreadPoolExecutor(this.deleteThreadCount, 1880 "AzureBlobDeleteThread", "Delete", key, AZURE_DELETE_THREADS); 1881 1882 if (!executor.executeParallel(contents, task)) { 1883 LOG.error("Failed to delete files / subfolders in blob {}", key); 1884 return false; 1885 } 1886 1887 // Delete the current directory 1888 if (!deleteFile(metaFile.getKey(), metaFile.isDir())) { 1889 LOG.error("Failed delete directory {}", f.toString()); 1890 return false; 1891 } 1892 1893 // Update parent directory last modified time 1894 Path parent = absolutePath.getParent(); 1895 if (parent != null && parent.getParent() != null) { // not root 1896 if (!skipParentFolderLastModifidedTimeUpdate) { 1897 updateParentFolderLastModifiedTime(key); 1898 } 1899 } 1900 } 1901 1902 // File or directory was successfully deleted. 1903 LOG.debug("Delete Successful for : {}", f.toString()); 1904 return true; 1905 } 1906 1907 public AzureFileSystemThreadPoolExecutor getThreadPoolExecutor(int threadCount, 1908 String threadNamePrefix, String operation, String key, String config) { 1909 return new AzureFileSystemThreadPoolExecutor(threadCount, threadNamePrefix, operation, key, config); 1910 } 1911 1912 // Delete single file / directory from key. 
1913 @VisibleForTesting 1914 boolean deleteFile(String key, boolean isDir) throws IOException { 1915 try { 1916 store.delete(key); 1917 if (isDir) { 1918 instrumentation.directoryDeleted(); 1919 } else { 1920 instrumentation.fileDeleted(); 1921 } 1922 } catch(IOException e) { 1923 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 1924 1925 if (innerException instanceof StorageException 1926 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1927 return false; 1928 } 1929 1930 throw e; 1931 } 1932 1933 return true; 1934 } 1935 1936 @Override 1937 public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException { 1938 1939 LOG.debug("Getting the file status for {}", f.toString()); 1940 1941 // Capture the absolute path and the path to key. 1942 Path absolutePath = makeAbsolute(f); 1943 String key = pathToKey(absolutePath); 1944 if (key.length() == 0) { // root always exists 1945 return newDirectory(null, absolutePath); 1946 } 1947 1948 // The path is either a folder or a file. Retrieve metadata to 1949 // determine if it is a directory or file. 1950 FileMetadata meta = null; 1951 try { 1952 meta = store.retrieveMetadata(key); 1953 } catch(Exception ex) { 1954 1955 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 1956 1957 if (innerException instanceof StorageException 1958 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1959 1960 throw new FileNotFoundException(String.format("%s is not found", key)); 1961 } 1962 1963 throw ex; 1964 } 1965 1966 if (meta != null) { 1967 if (meta.isDir()) { 1968 // The path is a folder with files in it. 1969 // 1970 1971 LOG.debug("Path {} is a folder.", f.toString()); 1972 1973 // If a rename operation for the folder was pending, redo it. 1974 // Then the file does not exist, so signal that. 
1975 if (conditionalRedoFolderRename(f)) { 1976 throw new FileNotFoundException( 1977 absolutePath + ": No such file or directory."); 1978 } 1979 1980 // Return reference to the directory object. 1981 return newDirectory(meta, absolutePath); 1982 } 1983 1984 // The path is a file. 1985 LOG.debug("Found the path: {} as a file.", f.toString()); 1986 1987 // Return with reference to a file object. 1988 return newFile(meta, absolutePath); 1989 } 1990 1991 // File not found. Throw exception no such file or directory. 1992 // 1993 throw new FileNotFoundException( 1994 absolutePath + ": No such file or directory."); 1995 } 1996 1997 // Return true if there is a rename pending and we redo it, otherwise false. 1998 private boolean conditionalRedoFolderRename(Path f) throws IOException { 1999 2000 // Can't rename /, so return immediately in that case. 2001 if (f.getName().equals("")) { 2002 return false; 2003 } 2004 2005 // Check if there is a -RenamePending.json file for this folder, and if so, 2006 // redo the rename. 2007 Path absoluteRenamePendingFile = renamePendingFilePath(f); 2008 if (exists(absoluteRenamePendingFile)) { 2009 FolderRenamePending pending = 2010 new FolderRenamePending(absoluteRenamePendingFile, this); 2011 pending.redo(); 2012 return true; 2013 } else { 2014 return false; 2015 } 2016 } 2017 2018 // Return the path name that would be used for rename of folder with path f. 2019 private Path renamePendingFilePath(Path f) { 2020 Path absPath = makeAbsolute(f); 2021 String key = pathToKey(absPath); 2022 key += "-RenamePending.json"; 2023 return keyToPath(key); 2024 } 2025 2026 @Override 2027 public URI getUri() { 2028 return uri; 2029 } 2030 2031 /** 2032 * Retrieve the status of a given path if it is a file, or of all the 2033 * contained files if it is a directory. 
2034 */ 2035 @Override 2036 public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { 2037 2038 LOG.debug("Listing status for {}", f.toString()); 2039 2040 Path absolutePath = makeAbsolute(f); 2041 String key = pathToKey(absolutePath); 2042 Set<FileStatus> status = new TreeSet<FileStatus>(); 2043 FileMetadata meta = null; 2044 try { 2045 meta = store.retrieveMetadata(key); 2046 } catch (IOException ex) { 2047 2048 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2049 2050 if (innerException instanceof StorageException 2051 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2052 2053 throw new FileNotFoundException(String.format("%s is not found", f)); 2054 } 2055 2056 throw ex; 2057 } 2058 2059 if (meta != null) { 2060 if (!meta.isDir()) { 2061 2062 LOG.debug("Found path as a file"); 2063 2064 return new FileStatus[] { newFile(meta, absolutePath) }; 2065 } 2066 2067 String partialKey = null; 2068 PartialListing listing = null; 2069 2070 try { 2071 listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); 2072 } catch (IOException ex) { 2073 2074 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2075 2076 if (innerException instanceof StorageException 2077 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2078 2079 throw new FileNotFoundException(String.format("%s is not found", key)); 2080 } 2081 2082 throw ex; 2083 } 2084 // NOTE: We don't check for Null condition as the Store API should return 2085 // an empty list if there are not listing. 2086 2087 // For any -RenamePending.json files in the listing, 2088 // push the rename forward. 2089 boolean renamed = conditionalRedoFolderRenames(listing); 2090 2091 // If any renames were redone, get another listing, 2092 // since the current one may have changed due to the redo. 
2093 if (renamed) { 2094 listing = null; 2095 try { 2096 listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); 2097 } catch (IOException ex) { 2098 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2099 2100 if (innerException instanceof StorageException 2101 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2102 2103 throw new FileNotFoundException(String.format("%s is not found", key)); 2104 } 2105 2106 throw ex; 2107 } 2108 } 2109 2110 // NOTE: We don't check for Null condition as the Store API should return 2111 // and empty list if there are not listing. 2112 2113 for (FileMetadata fileMetadata : listing.getFiles()) { 2114 Path subpath = keyToPath(fileMetadata.getKey()); 2115 2116 // Test whether the metadata represents a file or directory and 2117 // add the appropriate metadata object. 2118 // 2119 // Note: There was a very old bug here where directories were added 2120 // to the status set as files flattening out recursive listings 2121 // using "-lsr" down the file system hierarchy. 2122 if (fileMetadata.isDir()) { 2123 // Make sure we hide the temp upload folder 2124 if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) { 2125 // Don't expose that. 2126 continue; 2127 } 2128 status.add(newDirectory(fileMetadata, subpath)); 2129 } else { 2130 status.add(newFile(fileMetadata, subpath)); 2131 } 2132 } 2133 2134 LOG.debug("Found path as a directory with {}" 2135 + " files in it.", status.size()); 2136 2137 } else { 2138 // There is no metadata found for the path. 2139 LOG.debug("Did not find any metadata for path: {}", key); 2140 2141 throw new FileNotFoundException("File" + f + " does not exist."); 2142 } 2143 2144 return status.toArray(new FileStatus[0]); 2145 } 2146 2147 // Redo any folder renames needed if there are rename pending files in the 2148 // directory listing. Return true if one or more redo operations were done. 
2149 private boolean conditionalRedoFolderRenames(PartialListing listing) 2150 throws IllegalArgumentException, IOException { 2151 boolean renamed = false; 2152 for (FileMetadata fileMetadata : listing.getFiles()) { 2153 Path subpath = keyToPath(fileMetadata.getKey()); 2154 if (isRenamePendingFile(subpath)) { 2155 FolderRenamePending pending = 2156 new FolderRenamePending(subpath, this); 2157 pending.redo(); 2158 renamed = true; 2159 } 2160 } 2161 return renamed; 2162 } 2163 2164 // True if this is a folder rename pending file, else false. 2165 private boolean isRenamePendingFile(Path path) { 2166 return path.toString().endsWith(FolderRenamePending.SUFFIX); 2167 } 2168 2169 private FileStatus newFile(FileMetadata meta, Path path) { 2170 return new FileStatus ( 2171 meta.getLength(), 2172 false, 2173 1, 2174 blockSize, 2175 meta.getLastModified(), 2176 0, 2177 meta.getPermissionStatus().getPermission(), 2178 meta.getPermissionStatus().getUserName(), 2179 meta.getPermissionStatus().getGroupName(), 2180 path.makeQualified(getUri(), getWorkingDirectory())); 2181 } 2182 2183 private FileStatus newDirectory(FileMetadata meta, Path path) { 2184 return new FileStatus ( 2185 0, 2186 true, 2187 1, 2188 blockSize, 2189 meta == null ? 0 : meta.getLastModified(), 2190 0, 2191 meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(), 2192 meta == null ? "" : meta.getPermissionStatus().getUserName(), 2193 meta == null ? "" : meta.getPermissionStatus().getGroupName(), 2194 path.makeQualified(getUri(), getWorkingDirectory())); 2195 } 2196 2197 private static enum UMaskApplyMode { 2198 NewFile, 2199 NewDirectory, 2200 NewDirectoryNoUmask, 2201 ChangeExistingFile, 2202 ChangeExistingDirectory, 2203 } 2204 2205 /** 2206 * Applies the applicable UMASK's on the given permission. 2207 * 2208 * @param permission 2209 * The permission to mask. 2210 * @param applyMode 2211 * Whether to also apply the default umask. 2212 * @return The masked persmission. 
2213 */ 2214 private FsPermission applyUMask(final FsPermission permission, 2215 final UMaskApplyMode applyMode) { 2216 FsPermission newPermission = new FsPermission(permission); 2217 // Apply the default umask - this applies for new files or directories. 2218 if (applyMode == UMaskApplyMode.NewFile 2219 || applyMode == UMaskApplyMode.NewDirectory) { 2220 newPermission = newPermission 2221 .applyUMask(FsPermission.getUMask(getConf())); 2222 } 2223 return newPermission; 2224 } 2225 2226 /** 2227 * Creates the PermissionStatus object to use for the given permission, based 2228 * on the current user in context. 2229 * 2230 * @param permission 2231 * The permission for the file. 2232 * @return The permission status object to use. 2233 * @throws IOException 2234 * If login fails in getCurrentUser 2235 */ 2236 @VisibleForTesting 2237 PermissionStatus createPermissionStatus(FsPermission permission) 2238 throws IOException { 2239 // Create the permission status for this file based on current user 2240 return new PermissionStatus( 2241 UserGroupInformation.getCurrentUser().getShortUserName(), 2242 getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME, 2243 AZURE_DEFAULT_GROUP_DEFAULT), 2244 permission); 2245 } 2246 2247 @Override 2248 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 2249 return mkdirs(f, permission, false); 2250 } 2251 2252 public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException { 2253 2254 2255 LOG.debug("Creating directory: {}", f.toString()); 2256 2257 if (containsColon(f)) { 2258 throw new IOException("Cannot create directory " + f 2259 + " through WASB that has colons in the name"); 2260 } 2261 2262 Path absolutePath = makeAbsolute(f); 2263 PermissionStatus permissionStatus = null; 2264 if(noUmask) { 2265 // ensure owner still has wx permissions at the minimum 2266 permissionStatus = createPermissionStatus( 2267 applyUMask(FsPermission.createImmutable((short) (permission.toShort() | 
USER_WX_PERMISION)), 2268 UMaskApplyMode.NewDirectoryNoUmask)); 2269 } else { 2270 permissionStatus = createPermissionStatus( 2271 applyUMask(permission, UMaskApplyMode.NewDirectory)); 2272 } 2273 2274 2275 ArrayList<String> keysToCreateAsFolder = new ArrayList<String>(); 2276 ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>(); 2277 boolean childCreated = false; 2278 // Check that there is no file in the parent chain of the given path. 2279 for (Path current = absolutePath, parent = current.getParent(); 2280 parent != null; // Stop when you get to the root 2281 current = parent, parent = current.getParent()) { 2282 String currentKey = pathToKey(current); 2283 FileMetadata currentMetadata = store.retrieveMetadata(currentKey); 2284 if (currentMetadata != null && !currentMetadata.isDir()) { 2285 throw new FileAlreadyExistsException("Cannot create directory " + f + " because " 2286 + current + " is an existing file."); 2287 } else if (currentMetadata == null) { 2288 keysToCreateAsFolder.add(currentKey); 2289 childCreated = true; 2290 } else { 2291 // The directory already exists. Its last modified time need to be 2292 // updated if there is a child directory created under it. 
2293 if (childCreated) { 2294 keysToUpdateAsFolder.add(currentKey); 2295 } 2296 childCreated = false; 2297 } 2298 } 2299 2300 for (String currentKey : keysToCreateAsFolder) { 2301 store.storeEmptyFolder(currentKey, permissionStatus); 2302 } 2303 2304 instrumentation.directoryCreated(); 2305 2306 // otherwise throws exception 2307 return true; 2308 } 2309 2310 @Override 2311 public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException { 2312 2313 LOG.debug("Opening file: {}", f.toString()); 2314 2315 Path absolutePath = makeAbsolute(f); 2316 String key = pathToKey(absolutePath); 2317 FileMetadata meta = null; 2318 try { 2319 meta = store.retrieveMetadata(key); 2320 } catch(Exception ex) { 2321 2322 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2323 2324 if (innerException instanceof StorageException 2325 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2326 2327 throw new FileNotFoundException(String.format("%s is not found", key)); 2328 } 2329 2330 throw ex; 2331 } 2332 2333 if (meta == null) { 2334 throw new FileNotFoundException(f.toString()); 2335 } 2336 if (meta.isDir()) { 2337 throw new FileNotFoundException(f.toString() 2338 + " is a directory not a file."); 2339 } 2340 2341 DataInputStream inputStream = null; 2342 try { 2343 inputStream = store.retrieve(key); 2344 } catch(Exception ex) { 2345 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2346 2347 if (innerException instanceof StorageException 2348 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2349 2350 throw new FileNotFoundException(String.format("%s is not found", key)); 2351 } 2352 2353 throw ex; 2354 } 2355 2356 return new FSDataInputStream(new BufferedFSInputStream( 2357 new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize)); 2358 } 2359 2360 @Override 2361 public 
boolean rename(Path src, Path dst) throws FileNotFoundException, IOException { 2362 2363 FolderRenamePending renamePending = null; 2364 2365 LOG.debug("Moving {} to {}", src, dst); 2366 2367 if (containsColon(dst)) { 2368 throw new IOException("Cannot rename to file " + dst 2369 + " through WASB that has colons in the name"); 2370 } 2371 2372 String srcKey = pathToKey(makeAbsolute(src)); 2373 2374 if (srcKey.length() == 0) { 2375 // Cannot rename root of file system 2376 return false; 2377 } 2378 2379 // Figure out the final destination 2380 Path absoluteDst = makeAbsolute(dst); 2381 String dstKey = pathToKey(absoluteDst); 2382 FileMetadata dstMetadata = null; 2383 try { 2384 dstMetadata = store.retrieveMetadata(dstKey); 2385 } catch (IOException ex) { 2386 2387 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2388 2389 // A BlobNotFound storage exception in only thrown from retrieveMetdata API when 2390 // there is a race condition. If there is another thread which deletes the destination 2391 // file or folder, then this thread calling rename should be able to continue with 2392 // rename gracefully. Hence the StorageException is swallowed here. 2393 if (innerException instanceof StorageException) { 2394 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2395 LOG.debug("BlobNotFound exception encountered for Destination key : {}. " 2396 + "Swallowin the exception to handle race condition gracefully", dstKey); 2397 } 2398 } else { 2399 throw ex; 2400 } 2401 } 2402 2403 if (dstMetadata != null && dstMetadata.isDir()) { 2404 // It's an existing directory. 
2405 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); 2406 LOG.debug("Destination {} " 2407 + " is a directory, adjusted the destination to be {}", dst, dstKey); 2408 } else if (dstMetadata != null) { 2409 // Attempting to overwrite a file using rename() 2410 LOG.debug("Destination {}" 2411 + " is an already existing file, failing the rename.", dst); 2412 return false; 2413 } else { 2414 // Check that the parent directory exists. 2415 FileMetadata parentOfDestMetadata = null; 2416 try { 2417 parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent())); 2418 } catch (IOException ex) { 2419 2420 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2421 2422 if (innerException instanceof StorageException 2423 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2424 2425 LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst); 2426 return false; 2427 } 2428 2429 throw ex; 2430 } 2431 2432 if (parentOfDestMetadata == null) { 2433 LOG.debug("Parent of the destination {}" 2434 + " doesn't exist, failing the rename.", dst); 2435 return false; 2436 } else if (!parentOfDestMetadata.isDir()) { 2437 LOG.debug("Parent of the destination {}" 2438 + " is a file, failing the rename.", dst); 2439 return false; 2440 } 2441 } 2442 FileMetadata srcMetadata = null; 2443 try { 2444 srcMetadata = store.retrieveMetadata(srcKey); 2445 } catch (IOException ex) { 2446 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2447 2448 if (innerException instanceof StorageException 2449 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2450 2451 LOG.debug("Source {} doesn't exists. 
Failing rename", src); 2452 return false; 2453 } 2454 2455 throw ex; 2456 } 2457 2458 if (srcMetadata == null) { 2459 // Source doesn't exist 2460 LOG.debug("Source {} doesn't exist, failing the rename.", src); 2461 return false; 2462 } else if (!srcMetadata.isDir()) { 2463 LOG.debug("Source {} found as a file, renaming.", src); 2464 try { 2465 store.rename(srcKey, dstKey); 2466 } catch(IOException ex) { 2467 2468 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2469 2470 if (innerException instanceof StorageException 2471 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2472 2473 LOG.debug("BlobNotFoundException encountered. Failing rename", src); 2474 return false; 2475 } 2476 2477 throw ex; 2478 } 2479 } else { 2480 2481 // Prepare for, execute and clean up after of all files in folder, and 2482 // the root file, and update the last modified time of the source and 2483 // target parent folders. The operation can be redone if it fails part 2484 // way through, by applying the "Rename Pending" file. 2485 2486 // The following code (internally) only does atomic rename preparation 2487 // and lease management for page blob folders, limiting the scope of the 2488 // operation to HBase log file folders, where atomic rename is required. 2489 // In the future, we could generalize it easily to all folders. 2490 renamePending = prepareAtomicFolderRename(srcKey, dstKey); 2491 renamePending.execute(); 2492 2493 LOG.debug("Renamed {} to {} successfully.", src, dst); 2494 renamePending.cleanup(); 2495 return true; 2496 } 2497 2498 // Update the last-modified time of the parent folders of both source 2499 // and destination. 
2500 updateParentFolderLastModifiedTime(srcKey); 2501 updateParentFolderLastModifiedTime(dstKey); 2502 2503 LOG.debug("Renamed {} to {} successfully.", src, dst); 2504 return true; 2505 } 2506 2507 /** 2508 * Update the last-modified time of the parent folder of the file 2509 * identified by key. 2510 * @param key 2511 * @throws IOException 2512 */ 2513 private void updateParentFolderLastModifiedTime(String key) 2514 throws IOException { 2515 Path parent = makeAbsolute(keyToPath(key)).getParent(); 2516 if (parent != null && parent.getParent() != null) { // not root 2517 String parentKey = pathToKey(parent); 2518 2519 // ensure the parent is a materialized folder 2520 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 2521 // The metadata could be null if the implicit folder only contains a 2522 // single file. In this case, the parent folder no longer exists if the 2523 // file is renamed; so we can safely ignore the null pointer case. 2524 if (parentMetadata != null) { 2525 if (parentMetadata.isDir() 2526 && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2527 store.storeEmptyFolder(parentKey, 2528 createPermissionStatus(FsPermission.getDefault())); 2529 } 2530 2531 if (store.isAtomicRenameKey(parentKey)) { 2532 SelfRenewingLease lease = null; 2533 try { 2534 lease = leaseSourceFolder(parentKey); 2535 store.updateFolderLastModifiedTime(parentKey, lease); 2536 } catch (AzureException e) { 2537 String errorCode = ""; 2538 try { 2539 StorageException e2 = (StorageException) e.getCause(); 2540 errorCode = e2.getErrorCode(); 2541 } catch (Exception e3) { 2542 // do nothing if cast fails 2543 } 2544 if (errorCode.equals("BlobNotFound")) { 2545 throw new FileNotFoundException("Folder does not exist: " + parentKey); 2546 } 2547 LOG.warn("Got unexpected exception trying to get lease on {}. 
{}", 2548 parentKey, e.getMessage()); 2549 throw e; 2550 } finally { 2551 try { 2552 if (lease != null) { 2553 lease.free(); 2554 } 2555 } catch (Exception e) { 2556 LOG.error("Unable to free lease on {}", parentKey, e); 2557 } 2558 } 2559 } else { 2560 store.updateFolderLastModifiedTime(parentKey, null); 2561 } 2562 } 2563 } 2564 } 2565 2566 /** 2567 * If the source is a page blob folder, 2568 * prepare to rename this folder atomically. This means to get exclusive 2569 * access to the source folder, and record the actions to be performed for 2570 * this rename in a "Rename Pending" file. This code was designed to 2571 * meet the needs of HBase, which requires atomic rename of write-ahead log 2572 * (WAL) folders for correctness. 2573 * 2574 * Before calling this method, the caller must ensure that the source is a 2575 * folder. 2576 * 2577 * For non-page-blob directories, prepare the in-memory information needed, 2578 * but don't take the lease or write the redo file. This is done to limit the 2579 * scope of atomic folder rename to HBase, at least at the time of writing 2580 * this code. 2581 * 2582 * @param srcKey Source folder name. 2583 * @param dstKey Destination folder name. 2584 * @throws IOException 2585 */ 2586 @VisibleForTesting 2587 FolderRenamePending prepareAtomicFolderRename( 2588 String srcKey, String dstKey) throws IOException { 2589 2590 if (store.isAtomicRenameKey(srcKey)) { 2591 2592 // Block unwanted concurrent access to source folder. 2593 SelfRenewingLease lease = leaseSourceFolder(srcKey); 2594 2595 // Prepare in-memory information needed to do or redo a folder rename. 2596 FolderRenamePending renamePending = 2597 new FolderRenamePending(srcKey, dstKey, lease, this); 2598 2599 // Save it to persistent storage to help recover if the operation fails. 
2600 renamePending.writeFile(this); 2601 return renamePending; 2602 } else { 2603 FolderRenamePending renamePending = 2604 new FolderRenamePending(srcKey, dstKey, null, this); 2605 return renamePending; 2606 } 2607 } 2608 2609 /** 2610 * Get a self-renewing Azure blob lease on the source folder zero-byte file. 2611 */ 2612 private SelfRenewingLease leaseSourceFolder(String srcKey) 2613 throws AzureException { 2614 return store.acquireLease(srcKey); 2615 } 2616 2617 /** 2618 * Return an array containing hostnames, offset and size of 2619 * portions of the given file. For WASB we'll just lie and give 2620 * fake hosts to make sure we get many splits in MR jobs. 2621 */ 2622 @Override 2623 public BlockLocation[] getFileBlockLocations(FileStatus file, 2624 long start, long len) throws IOException { 2625 if (file == null) { 2626 return null; 2627 } 2628 2629 if ((start < 0) || (len < 0)) { 2630 throw new IllegalArgumentException("Invalid start or len parameter"); 2631 } 2632 2633 if (file.getLen() < start) { 2634 return new BlockLocation[0]; 2635 } 2636 final String blobLocationHost = getConf().get( 2637 AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, 2638 AZURE_BLOCK_LOCATION_HOST_DEFAULT); 2639 final String[] name = { blobLocationHost }; 2640 final String[] host = { blobLocationHost }; 2641 long blockSize = file.getBlockSize(); 2642 if (blockSize <= 0) { 2643 throw new IllegalArgumentException( 2644 "The block size for the given file is not a positive number: " 2645 + blockSize); 2646 } 2647 int numberOfLocations = (int) (len / blockSize) 2648 + ((len % blockSize == 0) ? 
0 : 1); 2649 BlockLocation[] locations = new BlockLocation[numberOfLocations]; 2650 for (int i = 0; i < locations.length; i++) { 2651 long currentOffset = start + (i * blockSize); 2652 long currentLength = Math.min(blockSize, start + len - currentOffset); 2653 locations[i] = new BlockLocation(name, host, currentOffset, currentLength); 2654 } 2655 return locations; 2656 } 2657 2658 /** 2659 * Set the working directory to the given directory. 2660 */ 2661 @Override 2662 public void setWorkingDirectory(Path newDir) { 2663 workingDir = makeAbsolute(newDir); 2664 } 2665 2666 @Override 2667 public Path getWorkingDirectory() { 2668 return workingDir; 2669 } 2670 2671 @Override 2672 public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException { 2673 Path absolutePath = makeAbsolute(p); 2674 String key = pathToKey(absolutePath); 2675 FileMetadata metadata = null; 2676 try { 2677 metadata = store.retrieveMetadata(key); 2678 } catch (IOException ex) { 2679 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2680 2681 if (innerException instanceof StorageException 2682 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2683 2684 throw new FileNotFoundException(String.format("File %s doesn't exists.", p)); 2685 } 2686 2687 throw ex; 2688 } 2689 2690 if (metadata == null) { 2691 throw new FileNotFoundException("File doesn't exist: " + p); 2692 } 2693 permission = applyUMask(permission, 2694 metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory 2695 : UMaskApplyMode.ChangeExistingFile); 2696 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2697 // It's an implicit folder, need to materialize it. 2698 store.storeEmptyFolder(key, createPermissionStatus(permission)); 2699 } else if (!metadata.getPermissionStatus().getPermission(). 
2700 equals(permission)) { 2701 store.changePermissionStatus(key, new PermissionStatus( 2702 metadata.getPermissionStatus().getUserName(), 2703 metadata.getPermissionStatus().getGroupName(), 2704 permission)); 2705 } 2706 } 2707 2708 @Override 2709 public void setOwner(Path p, String username, String groupname) 2710 throws IOException { 2711 Path absolutePath = makeAbsolute(p); 2712 String key = pathToKey(absolutePath); 2713 FileMetadata metadata = null; 2714 2715 try { 2716 metadata = store.retrieveMetadata(key); 2717 } catch (IOException ex) { 2718 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2719 2720 if (innerException instanceof StorageException 2721 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2722 2723 throw new FileNotFoundException(String.format("File %s doesn't exists.", p)); 2724 } 2725 2726 throw ex; 2727 } 2728 2729 if (metadata == null) { 2730 throw new FileNotFoundException("File doesn't exist: " + p); 2731 } 2732 2733 PermissionStatus newPermissionStatus = new PermissionStatus( 2734 username == null ? 2735 metadata.getPermissionStatus().getUserName() : username, 2736 groupname == null ? 2737 metadata.getPermissionStatus().getGroupName() : groupname, 2738 metadata.getPermissionStatus().getPermission()); 2739 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2740 // It's an implicit folder, need to materialize it. 2741 store.storeEmptyFolder(key, newPermissionStatus); 2742 } else { 2743 store.changePermissionStatus(key, newPermissionStatus); 2744 } 2745 } 2746 2747 @Override 2748 public synchronized void close() throws IOException { 2749 if (isClosed) { 2750 return; 2751 } 2752 2753 // Call the base close() to close any resources there. 2754 super.close(); 2755 // Close the store to close any resources there - e.g. the bandwidth 2756 // updater thread would be stopped at this time. 
2757 store.close(); 2758 // Notify the metrics system that this file system is closed, which may 2759 // trigger one final metrics push to get the accurate final file system 2760 // metrics out. 2761 2762 long startTime = System.currentTimeMillis(); 2763 2764 if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { 2765 AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName); 2766 AzureFileSystemMetricsSystem.fileSystemClosed(); 2767 } 2768 2769 LOG.debug("Submitting metrics when file system closed took {} ms.", 2770 (System.currentTimeMillis() - startTime)); 2771 isClosed = true; 2772 } 2773 2774 /** 2775 * A handler that defines what to do with blobs whose upload was 2776 * interrupted. 2777 */ 2778 private abstract class DanglingFileHandler { 2779 abstract void handleFile(FileMetadata file, FileMetadata tempFile) 2780 throws IOException; 2781 } 2782 2783 /** 2784 * Handler implementation for just deleting dangling files and cleaning 2785 * them up. 2786 */ 2787 private class DanglingFileDeleter extends DanglingFileHandler { 2788 @Override 2789 void handleFile(FileMetadata file, FileMetadata tempFile) 2790 throws IOException { 2791 2792 LOG.debug("Deleting dangling file {}", file.getKey()); 2793 store.delete(file.getKey()); 2794 store.delete(tempFile.getKey()); 2795 } 2796 } 2797 2798 /** 2799 * Handler implementation for just moving dangling files to recovery 2800 * location (/lost+found). 
2801 */ 2802 private class DanglingFileRecoverer extends DanglingFileHandler { 2803 private final Path destination; 2804 2805 DanglingFileRecoverer(Path destination) { 2806 this.destination = destination; 2807 } 2808 2809 @Override 2810 void handleFile(FileMetadata file, FileMetadata tempFile) 2811 throws IOException { 2812 2813 LOG.debug("Recovering {}", file.getKey()); 2814 // Move to the final destination 2815 String finalDestinationKey = 2816 pathToKey(new Path(destination, file.getKey())); 2817 store.rename(tempFile.getKey(), finalDestinationKey); 2818 if (!finalDestinationKey.equals(file.getKey())) { 2819 // Delete the empty link file now that we've restored it. 2820 store.delete(file.getKey()); 2821 } 2822 } 2823 } 2824 2825 /** 2826 * Check if a path has colons in its name 2827 */ 2828 private boolean containsColon(Path p) { 2829 return p.toUri().getPath().toString().contains(":"); 2830 } 2831 2832 /** 2833 * Implements recover and delete (-move and -delete) behaviors for handling 2834 * dangling files (blobs whose upload was interrupted). 2835 * 2836 * @param root 2837 * The root path to check from. 2838 * @param handler 2839 * The handler that deals with dangling files. 2840 */ 2841 private void handleFilesWithDanglingTempData(Path root, 2842 DanglingFileHandler handler) throws IOException { 2843 // Calculate the cut-off for when to consider a blob to be dangling. 2844 long cutoffForDangling = new Date().getTime() 2845 - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME, 2846 AZURE_TEMP_EXPIRY_DEFAULT) * 1000; 2847 // Go over all the blobs under the given root and look for blobs to 2848 // recover. 
2849 String priorLastKey = null; 2850 do { 2851 PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL, 2852 AZURE_UNBOUNDED_DEPTH, priorLastKey); 2853 2854 for (FileMetadata file : listing.getFiles()) { 2855 if (!file.isDir()) { // We don't recover directory blobs 2856 // See if this blob has a link in it (meaning it's a place-holder 2857 // blob for when the upload to the temp blob is complete). 2858 String link = store.getLinkInFileMetadata(file.getKey()); 2859 if (link != null) { 2860 // It has a link, see if the temp blob it is pointing to is 2861 // existent and old enough to be considered dangling. 2862 FileMetadata linkMetadata = store.retrieveMetadata(link); 2863 if (linkMetadata != null 2864 && linkMetadata.getLastModified() >= cutoffForDangling) { 2865 // Found one! 2866 handler.handleFile(file, linkMetadata); 2867 } 2868 } 2869 } 2870 } 2871 priorLastKey = listing.getPriorLastKey(); 2872 } while (priorLastKey != null); 2873 } 2874 2875 /** 2876 * Looks under the given root path for any blob that are left "dangling", 2877 * meaning that they are place-holder blobs that we created while we upload 2878 * the data to a temporary blob, but for some reason we crashed in the middle 2879 * of the upload and left them there. If any are found, we move them to the 2880 * destination given. 2881 * 2882 * @param root 2883 * The root path to consider. 2884 * @param destination 2885 * The destination path to move any recovered files to. 
2886 * @throws IOException 2887 */ 2888 public void recoverFilesWithDanglingTempData(Path root, Path destination) 2889 throws IOException { 2890 2891 LOG.debug("Recovering files with dangling temp data in {}", root); 2892 handleFilesWithDanglingTempData(root, 2893 new DanglingFileRecoverer(destination)); 2894 } 2895 2896 /** 2897 * Looks under the given root path for any blob that are left "dangling", 2898 * meaning that they are place-holder blobs that we created while we upload 2899 * the data to a temporary blob, but for some reason we crashed in the middle 2900 * of the upload and left them there. If any are found, we delete them. 2901 * 2902 * @param root 2903 * The root path to consider. 2904 * @throws IOException 2905 */ 2906 public void deleteFilesWithDanglingTempData(Path root) throws IOException { 2907 2908 LOG.debug("Deleting files with dangling temp data in {}", root); 2909 handleFilesWithDanglingTempData(root, new DanglingFileDeleter()); 2910 } 2911 2912 @Override 2913 protected void finalize() throws Throwable { 2914 LOG.debug("finalize() called."); 2915 close(); 2916 super.finalize(); 2917 } 2918 2919 /** 2920 * Encode the key with a random prefix for load balancing in Azure storage. 2921 * Upload data to a random temporary file then do storage side renaming to 2922 * recover the original key. 2923 * 2924 * @param aKey 2925 * @return Encoded version of the original key. 2926 */ 2927 private static String encodeKey(String aKey) { 2928 // Get the tail end of the key name. 2929 // 2930 String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1, 2931 aKey.length()); 2932 2933 // Construct the randomized prefix of the file name. The prefix ensures the 2934 // file always drops into the same folder but with a varying tail key name. 2935 String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR 2936 + UUID.randomUUID().toString(); 2937 2938 // Concatenate the randomized prefix with the tail of the key name. 
2939 String randomizedKey = filePrefix + fileName; 2940 2941 // Return to the caller with the randomized key. 2942 return randomizedKey; 2943 } 2944}