001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.azure;
020
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.EOFException;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.charset.Charset;
031import java.text.SimpleDateFormat;
032import java.util.ArrayList;
033import java.util.Date;
034import java.util.EnumSet;
035import java.util.Set;
036import java.util.TimeZone;
037import java.util.TreeSet;
038import java.util.UUID;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.regex.Matcher;
041import java.util.regex.Pattern;
042
043import org.apache.commons.lang.StringUtils;
044import org.apache.hadoop.classification.InterfaceAudience;
045import org.apache.hadoop.classification.InterfaceStability;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.BlockLocation;
048import org.apache.hadoop.fs.BufferedFSInputStream;
049import org.apache.hadoop.fs.CreateFlag;
050import org.apache.hadoop.fs.FSDataInputStream;
051import org.apache.hadoop.fs.FSDataOutputStream;
052import org.apache.hadoop.fs.FSExceptionMessages;
053import org.apache.hadoop.fs.FSInputStream;
054import org.apache.hadoop.fs.FileAlreadyExistsException;
055import org.apache.hadoop.fs.FileStatus;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
059import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
060import org.apache.hadoop.fs.permission.FsPermission;
061import org.apache.hadoop.fs.permission.PermissionStatus;
062import org.apache.hadoop.io.IOUtils;
063import org.apache.hadoop.security.UserGroupInformation;
064import org.apache.hadoop.util.Progressable;
065import org.apache.hadoop.util.Time;
066import org.codehaus.jackson.JsonNode;
067import org.codehaus.jackson.JsonParseException;
068import org.codehaus.jackson.JsonParser;
069import org.codehaus.jackson.map.JsonMappingException;
070import org.codehaus.jackson.map.ObjectMapper;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import com.google.common.annotations.VisibleForTesting;
075import com.microsoft.azure.storage.StorageException;
076
077/**
078 * A {@link FileSystem} for reading and writing files stored on <a
079 * href="http://store.azure.com/">Windows Azure</a>. This implementation is
080 * blob-based and stores files on Azure in their native form so they can be read
081 * by other Azure tools.
082 */
083@InterfaceAudience.Public
084@InterfaceStability.Stable
085public class NativeAzureFileSystem extends FileSystem {
  // Octal 0300: owner write + execute permission bits (-wx------).
  private static final int USER_WX_PERMISION = 0300;
087  /**
088   * A description of a folder rename operation, including the source and
089   * destination keys, and descriptions of the files in the source folder.
090   */
091
092  public static class FolderRenamePending {
093    private SelfRenewingLease folderLease;
094    private String srcKey;
095    private String dstKey;
096    private FileMetadata[] fileMetadata = null;    // descriptions of source files
097    private ArrayList<String> fileStrings = null;
098    private NativeAzureFileSystem fs;
099    private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000;
100    private static final int FORMATTING_BUFFER = 10000;
101    private boolean committed;
102    public static final String SUFFIX = "-RenamePending.json";
103
104    // Prepare in-memory information needed to do or redo a folder rename.
105    public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease,
106        NativeAzureFileSystem fs) throws IOException {
107      this.srcKey = srcKey;
108      this.dstKey = dstKey;
109      this.folderLease = lease;
110      this.fs = fs;
111      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
112
113      // List all the files in the folder.
114      long start = Time.monotonicNow();
115      String priorLastKey = null;
116      do {
117        PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
118          AZURE_UNBOUNDED_DEPTH, priorLastKey);
119        for(FileMetadata file : listing.getFiles()) {
120          fileMetadataList.add(file);
121        }
122        priorLastKey = listing.getPriorLastKey();
123      } while (priorLastKey != null);
124      fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
125      long end = Time.monotonicNow();
126      LOG.debug("Time taken to list {} blobs for rename operation is: {} ms", fileMetadata.length, (end - start));
127
128      this.committed = true;
129    }
130
131    // Prepare in-memory information needed to do or redo folder rename from
132    // a -RenamePending.json file read from storage. This constructor is to use during
133    // redo processing.
134    public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs)
135        throws IllegalArgumentException, IOException {
136
137      this.fs = fs;
138
139      // open redo file
140      Path f = redoFile;
141      FSDataInputStream input = fs.open(f);
142      byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE];
143      int l = input.read(bytes);
144      if (l <= 0) {
145        // Jira HADOOP-12678 -Handle empty rename pending metadata file during
146        // atomic rename in redo path. If during renamepending file is created
147        // but not written yet, then this means that rename operation
148        // has not started yet. So we should delete rename pending metadata file.
149        LOG.error("Deleting empty rename pending file "
150            + redoFile + " -- no data available");
151        deleteRenamePendingFile(fs, redoFile);
152        return;
153      }
154      if (l == MAX_RENAME_PENDING_FILE_SIZE) {
155        throw new IOException(
156            "Error reading pending rename file contents -- "
157                + "maximum file size exceeded");
158      }
159      String contents = new String(bytes, 0, l, Charset.forName("UTF-8"));
160
161      // parse the JSON
162      ObjectMapper objMapper = new ObjectMapper();
163      objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
164      JsonNode json = null;
165      try {
166        json = objMapper.readValue(contents, JsonNode.class);
167        this.committed = true;
168      } catch (JsonMappingException e) {
169
170        // The -RedoPending.json file is corrupted, so we assume it was
171        // not completely written
172        // and the redo operation did not commit.
173        this.committed = false;
174      } catch (JsonParseException e) {
175        this.committed = false;
176      } catch (IOException e) {
177        this.committed = false;
178      }
179      
180      if (!this.committed) {
181        LOG.error("Deleting corruped rename pending file {} \n {}",
182            redoFile, contents);
183
184        // delete the -RenamePending.json file
185        deleteRenamePendingFile(fs, redoFile);
186        return;
187      }
188
189      // initialize this object's fields
190      ArrayList<String> fileStrList = new ArrayList<String>();
191      JsonNode oldFolderName = json.get("OldFolderName");
192      JsonNode newFolderName = json.get("NewFolderName");
193      if (oldFolderName == null || newFolderName == null) {
194          this.committed = false;
195      } else {
196        this.srcKey = oldFolderName.getTextValue();
197        this.dstKey = newFolderName.getTextValue();
198        if (this.srcKey == null || this.dstKey == null) {
199          this.committed = false;
200        } else {
201          JsonNode fileList = json.get("FileList");
202          if (fileList == null) {
203            this.committed = false;
204          } else {
205            for (int i = 0; i < fileList.size(); i++) {
206              fileStrList.add(fileList.get(i).getTextValue());
207            }
208          }
209        }
210      }
211      this.fileStrings = fileStrList;
212    }
213
214    public FileMetadata[] getFiles() {
215      return fileMetadata;
216    }
217
218    public SelfRenewingLease getFolderLease() {
219      return folderLease;
220    }
221
222    /**
223     * Deletes rename pending metadata file
224     * @param fs -- the file system
225     * @param redoFile - rename pending metadata file path
226     * @throws IOException - If deletion fails
227     */
228    @VisibleForTesting
229    void deleteRenamePendingFile(FileSystem fs, Path redoFile)
230        throws IOException {
231      try {
232        fs.delete(redoFile, false);
233      } catch (IOException e) {
234        // If the rename metadata was not found then somebody probably
235        // raced with us and finished the delete first
236        Throwable t = e.getCause();
237        if (t != null && t instanceof StorageException
238            && "BlobNotFound".equals(((StorageException) t).getErrorCode())) {
239          LOG.warn("rename pending file " + redoFile + " is already deleted");
240        } else {
241          throw e;
242        }
243      }
244    }
245
246    /**
247     * Write to disk the information needed to redo folder rename,
248     * in JSON format. The file name will be
249     * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json}
250     * The file format will be:
251     * <pre>{@code
252     * {
253     *   FormatVersion: "1.0",
254     *   OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>",
255     *   OldFolderName: "<key>",
256     *   NewFolderName: "<key>",
257     *   FileList: [ <string> , <string> , ... ]
258     * }
259     *
260     * Here's a sample:
261     * {
262     *  FormatVersion: "1.0",
263     *  OperationUTCTime: "2014-07-01 23:50:35.572",
264     *  OldFolderName: "user/ehans/folderToRename",
265     *  NewFolderName: "user/ehans/renamedFolder",
266     *  FileList: [
267     *    "innerFile",
268     *    "innerFile2"
269     *  ]
270     * } }</pre>
271     * @throws IOException
272     */
273    public void writeFile(FileSystem fs) throws IOException {
274      Path path = getRenamePendingFilePath();
275      LOG.debug("Preparing to write atomic rename state to {}", path.toString());
276      OutputStream output = null;
277
278      String contents = makeRenamePendingFileContents();
279
280      // Write file.
281      try {
282        output = fs.create(path);
283        output.write(contents.getBytes(Charset.forName("UTF-8")));
284      } catch (IOException e) {
285        throw new IOException("Unable to write RenamePending file for folder rename from "
286            + srcKey + " to " + dstKey, e);
287      } finally {
288        NativeAzureFileSystemHelper.cleanup(LOG, output);
289      }
290    }
291
292    /**
293     * Return the contents of the JSON file to represent the operations
294     * to be performed for a folder rename.
295     */
296    public String makeRenamePendingFileContents() {
297      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
298      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
299      String time = sdf.format(new Date());
300
301      // Make file list string
302      StringBuilder builder = new StringBuilder();
303      builder.append("[\n");
304      for (int i = 0; i != fileMetadata.length; i++) {
305        if (i > 0) {
306          builder.append(",\n");
307        }
308        builder.append("    ");
309        String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/");
310
311        // Quote string file names, escaping any possible " characters or other
312        // necessary characters in the name.
313        builder.append(quote(noPrefix));
314        if (builder.length() >=
315            MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) {
316
317          // Give up now to avoid using too much memory.
318          LOG.error("Internal error: Exceeded maximum rename pending file size of {} bytes.",
319              MAX_RENAME_PENDING_FILE_SIZE);
320
321          // return some bad JSON with an error message to make it human readable
322          return "exceeded maximum rename pending file size";
323        }
324      }
325      builder.append("\n  ]");
326      String fileList = builder.toString();
327
328      // Make file contents as a string. Again, quote file names, escaping
329      // characters as appropriate.
330      String contents = "{\n"
331          + "  FormatVersion: \"1.0\",\n"
332          + "  OperationUTCTime: \"" + time + "\",\n"
333          + "  OldFolderName: " + quote(srcKey) + ",\n"
334          + "  NewFolderName: " + quote(dstKey) + ",\n"
335          + "  FileList: " + fileList + "\n"
336          + "}\n";
337
338      return contents;
339    }
340    
341    /**
342     * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 
343     * method.
344     * 
345     * Produce a string in double quotes with backslash sequences in all the
346     * right places. A backslash will be inserted within </, allowing JSON
347     * text to be delivered in HTML. In JSON text, a string cannot contain a
348     * control character or an unescaped quote or backslash.
349     * @param string A String
350     * @return  A String correctly formatted for insertion in a JSON text.
351     */
352    private String quote(String string) {
353        if (string == null || string.length() == 0) {
354            return "\"\"";
355        }
356
357        char c = 0;
358        int  i;
359        int  len = string.length();
360        StringBuilder sb = new StringBuilder(len + 4);
361        String t;
362
363        sb.append('"');
364        for (i = 0; i < len; i += 1) {
365            c = string.charAt(i);
366            switch (c) {
367            case '\\':
368            case '"':
369                sb.append('\\');
370                sb.append(c);
371                break;
372            case '/':
373                sb.append('\\');
374                sb.append(c);
375                break;
376            case '\b':
377                sb.append("\\b");
378                break;
379            case '\t':
380                sb.append("\\t");
381                break;
382            case '\n':
383                sb.append("\\n");
384                break;
385            case '\f':
386                sb.append("\\f");
387                break;
388            case '\r':
389                sb.append("\\r");
390                break;
391            default:
392                if (c < ' ') {
393                    t = "000" + Integer.toHexString(c);
394                    sb.append("\\u" + t.substring(t.length() - 4));
395                } else {
396                    sb.append(c);
397                }
398            }
399        }
400        sb.append('"');
401        return sb.toString();
402    }
403
404    public String getSrcKey() {
405      return srcKey;
406    }
407
408    public String getDstKey() {
409      return dstKey;
410    }
411
412    public FileMetadata getSourceMetadata() throws IOException {
413      return fs.getStoreInterface().retrieveMetadata(srcKey);
414    }
415
416    /**
417     * Execute a folder rename. This is the execution path followed
418     * when everything is working normally. See redo() for the alternate
419     * execution path for the case where we're recovering from a folder rename
420     * failure.
421     * @throws IOException
422     */
423    public void execute() throws IOException {
424
425      AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
426        @Override
427        public boolean execute(FileMetadata file) throws IOException{
428          renameFile(file);
429          return true;
430        }
431      };
432
433      AzureFileSystemThreadPoolExecutor executor = this.fs.getThreadPoolExecutor(this.fs.renameThreadCount,
434          "AzureBlobRenameThread", "Rename", getSrcKey(), AZURE_RENAME_THREADS);
435
436      executor.executeParallel(this.getFiles(), task);
437
438      // Rename the source folder 0-byte root file itself.
439      FileMetadata srcMetadata2 = this.getSourceMetadata();
440      if (srcMetadata2.getBlobMaterialization() ==
441          BlobMaterialization.Explicit) {
442
443        // It already has a lease on it from the "prepare" phase so there's no
444        // need to get one now. Pass in existing lease to allow file delete.
445        fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(),
446            false, folderLease);
447      }
448
449      // Update the last-modified time of the parent folders of both source and
450      // destination.
451      fs.updateParentFolderLastModifiedTime(srcKey);
452      fs.updateParentFolderLastModifiedTime(dstKey);
453    }
454
455    // Rename a single file
456    @VisibleForTesting
457    void renameFile(FileMetadata file) throws IOException{
458      // Rename all materialized entries under the folder to point to the
459      // final destination.
460      if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
461        String srcName = file.getKey();
462        String suffix  = srcName.substring((this.getSrcKey()).length());
463        String dstName = this.getDstKey() + suffix;
464
465        // Rename gets exclusive access (via a lease) for files
466        // designated for atomic rename.
467        // The main use case is for HBase write-ahead log (WAL) and data
468        // folder processing correctness.  See the rename code for details.
469        boolean acquireLease = this.fs.getStoreInterface().isAtomicRenameKey(srcName);
470        this.fs.getStoreInterface().rename(srcName, dstName, acquireLease, null);
471      }
472    }
473
474    /** Clean up after execution of rename.
475     * @throws IOException */
476    public void cleanup() throws IOException {
477
478      if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) {
479
480        // Remove RenamePending file
481        fs.delete(getRenamePendingFilePath(), false);
482
483        // Freeing source folder lease is not necessary since the source
484        // folder file was deleted.
485      }
486    }
487
488    private Path getRenamePendingFilePath() {
489      String fileName = srcKey + SUFFIX;
490      Path fileNamePath = keyToPath(fileName);
491      Path path = fs.makeAbsolute(fileNamePath);
492      return path;
493    }
494
495    /**
496     * Recover from a folder rename failure by redoing the intended work,
497     * as recorded in the -RenamePending.json file.
498     * 
499     * @throws IOException
500     */
501    public void redo() throws IOException {
502
503      if (!committed) {
504
505        // Nothing to do. The -RedoPending.json file should have already been
506        // deleted.
507        return;
508      }
509
510      // Try to get a lease on source folder to block concurrent access to it.
511      // It may fail if the folder is already gone. We don't check if the
512      // source exists explicitly because that could recursively trigger redo
513      // and give an infinite recursion.
514      SelfRenewingLease lease = null;
515      boolean sourceFolderGone = false;
516      try {
517        lease = fs.leaseSourceFolder(srcKey);
518      } catch (AzureException e) {
519
520        // If the source folder was not found then somebody probably
521        // raced with us and finished the rename first, or the
522        // first rename failed right before deleting the rename pending
523        // file.
524        String errorCode = "";
525        try {
526          StorageException se = (StorageException) e.getCause();
527          errorCode = se.getErrorCode();
528        } catch (Exception e2) {
529          ; // do nothing -- could not get errorCode
530        }
531        if (errorCode.equals("BlobNotFound")) {
532          sourceFolderGone = true;
533        } else {
534          throw new IOException(
535              "Unexpected error when trying to lease source folder name during "
536              + "folder rename redo",
537              e);
538        }
539      }
540
541      if (!sourceFolderGone) {
542        // Make sure the target folder exists.
543        Path dst = fullPath(dstKey);
544        if (!fs.exists(dst)) {
545          fs.mkdirs(dst);
546        }
547
548        // For each file inside the folder to be renamed,
549        // make sure it has been renamed.
550        for(String fileName : fileStrings) {
551          finishSingleFileRename(fileName);
552        }
553
554        // Remove the source folder. Don't check explicitly if it exists,
555        // to avoid triggering redo recursively.
556        try {
557          fs.getStoreInterface().delete(srcKey, lease);
558        } catch (Exception e) {
559          LOG.info("Unable to delete source folder during folder rename redo. "
560              + "If the source folder is already gone, this is not an error "
561              + "condition. Continuing with redo.", e);
562        }
563
564        // Update the last-modified time of the parent folders of both source
565        // and destination.
566        fs.updateParentFolderLastModifiedTime(srcKey);
567        fs.updateParentFolderLastModifiedTime(dstKey);
568      }
569
570      // Remove the -RenamePending.json file.
571      fs.delete(getRenamePendingFilePath(), false);
572    }
573
574    // See if the source file is still there, and if it is, rename it.
575    private void finishSingleFileRename(String fileName)
576        throws IOException {
577      Path srcFile = fullPath(srcKey, fileName);
578      Path dstFile = fullPath(dstKey, fileName);
579      String srcName = fs.pathToKey(srcFile);
580      String dstName = fs.pathToKey(dstFile);
581      boolean srcExists = fs.getStoreInterface().explicitFileExists(srcName);
582      boolean dstExists = fs.getStoreInterface().explicitFileExists(dstName);
583      if(srcExists) {
584        // Rename gets exclusive access (via a lease) for HBase write-ahead log
585        // (WAL) file processing correctness.  See the rename code for details.
586        fs.getStoreInterface().rename(srcName, dstName, true, null);
587      } else if (!srcExists && dstExists) {
588        // The rename already finished, so do nothing.
589        ;
590      } else {
591        throw new IOException(
592            "Attempting to complete rename of file " + srcKey + "/" + fileName
593            + " during folder rename redo, and file was not found in source "
594            + "or destination.");
595      }
596    }
597
598    // Return an absolute path for the specific fileName within the folder
599    // specified by folderKey.
600    private Path fullPath(String folderKey, String fileName) {
601      return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName);
602    }
603
604    private Path fullPath(String fileKey) {
605      return new Path(new Path(fs.getUri()), "/" + fileKey);
606    }
607  }
608
609  private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]";
610  private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN =
611      Pattern.compile("\\[\\[\\.\\]\\](?=$|/)");
612  private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)");
613
614  @Override
615  public String getScheme() {
616    return "wasb";
617  }
618
619  
620  /**
621   * <p>
622   * A {@link FileSystem} for reading and writing files stored on <a
623   * href="http://store.azure.com/">Windows Azure</a>. This implementation is
624   * blob-based and stores files on Azure in their native form so they can be read
625   * by other Azure tools. This implementation uses HTTPS for secure network communication.
626   * </p>
627   */
628  public static class Secure extends NativeAzureFileSystem {
629    @Override
630    public String getScheme() {
631      return "wasbs";
632    }
633  }
634
635  public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class);
636
637  static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
638  /**
639   * The time span in seconds before which we consider a temp blob to be
640   * dangling (not being actively uploaded to) and up for reclamation.
641   * 
642   * So e.g. if this is 60, then any temporary blobs more than a minute old
643   * would be considered dangling.
644   */
645  static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds";
646  private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600;
647  static final String PATH_DELIMITER = Path.SEPARATOR;
648  static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$";
649
650  private static final int AZURE_LIST_ALL = -1;
651  private static final int AZURE_UNBOUNDED_DEPTH = -1;
652
653  private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;
654
655  /**
656   * The configuration property that determines which group owns files created
657   * in WASB.
658   */
659  private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup";
660  /**
661   * The default value for fs.azure.permissions.supergroup. Chosen as the same
662   * default as DFS.
663   */
664  static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";
665
666  static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
667      "fs.azure.block.location.impersonatedhost";
668  private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
669      "localhost";
670  static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
671      "fs.azure.ring.buffer.capacity";
672  static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
673      "fs.azure.output.stream.buffer.size";
674
675  public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";
676
677  /*
678   * Property to enable Append API.
679   */
680  public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
681
682  /**
683   * The configuration property to set number of threads to be used for rename operation.
684   */
685  public static final String AZURE_RENAME_THREADS = "fs.azure.rename.threads";
686
687  /**
688   * The default number of threads to be used for rename operation.
689   */
690  public static final int DEFAULT_AZURE_RENAME_THREADS = 0;
691
692  /**
693   * The configuration property to set number of threads to be used for delete operation.
694   */
695  public static final String AZURE_DELETE_THREADS = "fs.azure.delete.threads";
696
697  /**
698   * The default number of threads to be used for delete operation.
699   */
700  public static final int DEFAULT_AZURE_DELETE_THREADS = 0;
701
702  /**
703   * The number of threads to be used for delete operation after reading user configuration.
704   */
705  private int deleteThreadCount = 0;
706
707  /**
708   * The number of threads to be used for rename operation after reading user configuration.
709   */
710  private int renameThreadCount = 0;
711
712  private class NativeAzureFsInputStream extends FSInputStream {
713    private InputStream in;
714    private final String key;
715    private long pos = 0;
716    private boolean closed = false;
717    private boolean isPageBlob;
718
719    // File length, valid only for streams over block blobs.
720    private long fileLength;
721
722    public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
723      this.in = in;
724      this.key = key;
725      this.isPageBlob = store.isPageBlobKey(key);
726      this.fileLength = fileLength;
727    }
728
729    /**
730     * Return the size of the remaining available bytes
731     * if the size is less than or equal to {@link Integer#MAX_VALUE},
732     * otherwise, return {@link Integer#MAX_VALUE}.
733     *
734     * This is to match the behavior of DFSInputStream.available(),
735     * which some clients may rely on (HBase write-ahead log reading in
736     * particular).
737     */
738    @Override
739    public synchronized int available() throws IOException {
740      if (isPageBlob) {
741        return in.available();
742      } else {
743        if (closed) {
744          throw new IOException("Stream closed");
745        }
746        final long remaining = this.fileLength - pos;
747        return remaining <= Integer.MAX_VALUE ?
748            (int) remaining : Integer.MAX_VALUE;
749      }
750    }
751
752    /*
753     * Reads the next byte of data from the input stream. The value byte is
754     * returned as an integer in the range 0 to 255. If no byte is available
755     * because the end of the stream has been reached, the value -1 is returned.
756     * This method blocks until input data is available, the end of the stream
757     * is detected, or an exception is thrown.
758     *
759     * @returns int An integer corresponding to the byte read.
760     */
761    @Override
762    public synchronized int read() throws FileNotFoundException, IOException {
763      try {
764        int result = 0;
765        result = in.read();
766        if (result != -1) {
767          pos++;
768          if (statistics != null) {
769            statistics.incrementBytesRead(1);
770          }
771        }
772      // Return to the caller with the result.
773      //
774        return result;
775      } catch(EOFException e) {
776        return -1;
777      } catch(IOException e) {
778
779        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
780
781        if (innerException instanceof StorageException) {
782
783          LOG.error("Encountered Storage Exception for read on Blob : {}"
784              + " Exception details: {} Error Code : {}",
785              key, e, ((StorageException) innerException).getErrorCode());
786
787          if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
788            throw new FileNotFoundException(String.format("%s is not found", key));
789          }
790        }
791
792       throw e;
793      }
794    }
795
796    /*
797     * Reads up to len bytes of data from the input stream into an array of
798     * bytes. An attempt is made to read as many as len bytes, but a smaller
799     * number may be read. The number of bytes actually read is returned as an
800     * integer. This method blocks until input data is available, end of file is
801     * detected, or an exception is thrown. If len is zero, then no bytes are
802     * read and 0 is returned; otherwise, there is an attempt to read at least
803     * one byte. If no byte is available because the stream is at end of file,
804     * the value -1 is returned; otherwise, at least one byte is read and stored
805     * into b.
806     *
807     * @param b -- the buffer into which data is read
808     *
809     * @param off -- the start offset in the array b at which data is written
810     *
811     * @param len -- the maximum number of bytes read
812     *
813     * @ returns int The total number of byes read into the buffer, or -1 if
814     * there is no more data because the end of stream is reached.
815     */
816    @Override
817    public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException {
818      try {
819        int result = 0;
820        result = in.read(b, off, len);
821        if (result > 0) {
822          pos += result;
823        }
824
825        if (null != statistics && result > 0) {
826          statistics.incrementBytesRead(result);
827        }
828
829        // Return to the caller with the result.
830        return result;
831      } catch(IOException e) {
832
833        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
834
835        if (innerException instanceof StorageException) {
836
837          LOG.error("Encountered Storage Exception for read on Blob : {}"
838              + " Exception details: {} Error Code : {}",
839              key, e, ((StorageException) innerException).getErrorCode());
840
841          if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
842            throw new FileNotFoundException(String.format("%s is not found", key));
843          }
844        }
845
846       throw e;
847      }
848    }
849
850    @Override
851    public synchronized void close() throws IOException {
852      if (!closed) {
853        closed = true;
854        IOUtils.closeStream(in);
855        in = null;
856      }
857    }
858
859    @Override
860    public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException {
861      try {
862        checkNotClosed();
863        if (pos < 0) {
864          throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
865        }
866        IOUtils.closeStream(in);
867        in = store.retrieve(key);
868        this.pos = in.skip(pos);
869        LOG.debug("Seek to position {}. Bytes skipped {}", pos,
870          this.pos);
871      } catch(IOException e) {
872
873        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
874
875        if (innerException instanceof StorageException
876            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
877          throw new FileNotFoundException(String.format("%s is not found", key));
878        }
879
880        throw e;
881      } catch(IndexOutOfBoundsException e) {
882        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
883      }
884    }
885
886    @Override
887    public synchronized long getPos() throws IOException {
888      return pos;
889    }
890
891    @Override
892    public boolean seekToNewSource(long targetPos) throws IOException {
893      return false;
894    }
895
896
897    /*
898     * Helper method to check if a stream is closed.
899     */
900    private void checkNotClosed() throws IOException {
901      if (closed) {
902        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
903      }
904    }
905  }
906
907  private class NativeAzureFsOutputStream extends OutputStream {
908    // We should not override flush() to actually close current block and flush
909    // to DFS, this will break applications that assume flush() is a no-op.
910    // Applications are advised to use Syncable.hflush() for that purpose.
911    // NativeAzureFsOutputStream needs to implement Syncable if needed.
912    private String key;
913    private String keyEncoded;
914    private OutputStream out;
915
916    public NativeAzureFsOutputStream(OutputStream out, String aKey,
917        String anEncodedKey) throws IOException {
918      // Check input arguments. The output stream should be non-null and the
919      // keys
920      // should be valid strings.
921      if (null == out) {
922        throw new IllegalArgumentException(
923            "Illegal argument: the output stream is null.");
924      }
925
926      if (null == aKey || 0 == aKey.length()) {
927        throw new IllegalArgumentException(
928            "Illegal argument the key string is null or empty");
929      }
930
931      if (null == anEncodedKey || 0 == anEncodedKey.length()) {
932        throw new IllegalArgumentException(
933            "Illegal argument the encoded key string is null or empty");
934      }
935
936      // Initialize the member variables with the incoming parameters.
937      this.out = out;
938
939      setKey(aKey);
940      setEncodedKey(anEncodedKey);
941    }
942
943    @Override
944    public synchronized void close() throws IOException {
945      if (out != null) {
946        // Close the output stream and decode the key for the output stream
947        // before returning to the caller.
948        //
949        out.close();
950        restoreKey();
951        out = null;
952      }
953    }
954
955    /**
956     * Writes the specified byte to this output stream. The general contract for
957     * write is that one byte is written to the output stream. The byte to be
958     * written is the eight low-order bits of the argument b. The 24 high-order
959     * bits of b are ignored.
960     * 
961     * @param b
962     *          32-bit integer of block of 4 bytes
963     */
964    @Override
965    public void write(int b) throws IOException {
966      try {
967        out.write(b);
968      } catch(IOException e) {
969        if (e.getCause() instanceof StorageException) {
970          StorageException storageExcp  = (StorageException) e.getCause();
971          LOG.error("Encountered Storage Exception for write on Blob : {}"
972              + " Exception details: {} Error Code : {}",
973              key, e.getMessage(), storageExcp.getErrorCode());
974        }
975        throw e;
976      }
977    }
978
979    /**
980     * Writes b.length bytes from the specified byte array to this output
981     * stream. The general contract for write(b) is that it should have exactly
982     * the same effect as the call write(b, 0, b.length).
983     * 
984     * @param b
985     *          Block of bytes to be written to the output stream.
986     */
987    @Override
988    public void write(byte[] b) throws IOException {
989      try {
990        out.write(b);
991      } catch(IOException e) {
992        if (e.getCause() instanceof StorageException) {
993          StorageException storageExcp  = (StorageException) e.getCause();
994          LOG.error("Encountered Storage Exception for write on Blob : {}"
995              + " Exception details: {} Error Code : {}",
996              key, e.getMessage(), storageExcp.getErrorCode());
997        }
998        throw e;
999      }
1000    }
1001
1002    /**
1003     * Writes <code>len</code> from the specified byte array starting at offset
1004     * <code>off</code> to the output stream. The general contract for write(b,
1005     * off, len) is that some of the bytes in the array <code>
1006     * b</code b> are written to the output stream in order; element
1007     * <code>b[off]</code> is the first byte written and
1008     * <code>b[off+len-1]</code> is the last byte written by this operation.
1009     * 
1010     * @param b
1011     *          Byte array to be written.
1012     * @param off
1013     *          Write this offset in stream.
1014     * @param len
1015     *          Number of bytes to be written.
1016     */
1017    @Override
1018    public void write(byte[] b, int off, int len) throws IOException {
1019      try {
1020        out.write(b, off, len);
1021      } catch(IOException e) {
1022        if (e.getCause() instanceof StorageException) {
1023          StorageException storageExcp  = (StorageException) e.getCause();
1024          LOG.error("Encountered Storage Exception for write on Blob : {}"
1025              + " Exception details: {} Error Code : {}",
1026              key, e.getMessage(), storageExcp.getErrorCode());
1027        }
1028        throw e;
1029      }
1030    }
1031
1032    /**
1033     * Get the blob name.
1034     * 
1035     * @return String Blob name.
1036     */
1037    public String getKey() {
1038      return key;
1039    }
1040
1041    /**
1042     * Set the blob name.
1043     * 
1044     * @param key
1045     *          Blob name.
1046     */
1047    public void setKey(String key) {
1048      this.key = key;
1049    }
1050
1051    /**
1052     * Get the blob name.
1053     * 
1054     * @return String Blob name.
1055     */
1056    public String getEncodedKey() {
1057      return keyEncoded;
1058    }
1059
1060    /**
1061     * Set the blob name.
1062     * 
1063     * @param anEncodedKey
1064     *          Blob name.
1065     */
1066    public void setEncodedKey(String anEncodedKey) {
1067      this.keyEncoded = anEncodedKey;
1068    }
1069
1070    /**
1071     * Restore the original key name from the m_key member variable. Note: The
1072     * output file stream is created with an encoded blob store key to guarantee
1073     * load balancing on the front end of the Azure storage partition servers.
1074     * The create also includes the name of the original key value which is
1075     * stored in the m_key member variable. This method should only be called
1076     * when the stream is closed.
1077     */
1078    private void restoreKey() throws IOException {
1079      store.rename(getEncodedKey(), getKey());
1080    }
1081  }
1082
  // File system URI reduced to scheme and authority; set in initialize().
  private URI uri;
  // Store used for all blob operations: either injected through the
  // constructor or created by createDefaultStore() during initialize().
  private NativeFileSystemStore store;
  // Concrete store created by createDefaultStore(); stays null when a store
  // was injected through the constructor. Returned by getStore().
  private AzureNativeFileSystemStore actualStore;
  // Working directory; initialize() sets it to /user/<short user name>.
  private Path workingDir;
  // Block size; initialize() reads it from AZURE_BLOCK_SIZE_PROPERTY_NAME,
  // falling back to MAX_AZURE_BLOCK_SIZE.
  private long blockSize = MAX_AZURE_BLOCK_SIZE;
  private AzureFileSystemInstrumentation instrumentation;
  // Name this instance's instrumentation was registered under as a metrics
  // source; only assigned when SKIP_AZURE_METRICS_PROPERTY_NAME is false.
  private String metricsSourceName;
  // NOTE(review): not read or written in this section; presumably maintained
  // by close() elsewhere in the class — confirm.
  private boolean isClosed = false;
  // When true, createDefaultStore() disables the storage retry policy
  // (toggled by tests via suppressRetryPolicy()/resumeRetryPolicy()).
  private static boolean suppressRetryPolicy = false;
  // A counter to create unique (within-process) names for my metrics sources.
  private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
  // Whether append() is allowed; read from APPEND_SUPPORT_ENABLE_PROPERTY_NAME
  // in initialize().
  private boolean appendSupportEnabled = false;
1095  
1096  public NativeAzureFileSystem() {
1097    // set store in initialize()
1098  }
1099
1100  public NativeAzureFileSystem(NativeFileSystemStore store) {
1101    this.store = store;
1102  }
1103
1104  /**
1105   * Suppress the default retry policy for the Storage, useful in unit tests to
1106   * test negative cases without waiting forever.
1107   */
1108  @VisibleForTesting
1109  static void suppressRetryPolicy() {
1110    suppressRetryPolicy = true;
1111  }
1112
1113  /**
1114   * Undo the effect of suppressRetryPolicy.
1115   */
1116  @VisibleForTesting
1117  static void resumeRetryPolicy() {
1118    suppressRetryPolicy = false;
1119  }
1120
1121  /**
1122   * Creates a new metrics source name that's unique within this process.
1123   */
1124  @VisibleForTesting
1125  public static String newMetricsSourceName() {
1126    int number = metricsSourceNameCounter.incrementAndGet();
1127    final String baseName = "AzureFileSystemMetrics";
1128    if (number == 1) { // No need for a suffix for the first one
1129      return baseName;
1130    } else {
1131      return baseName + number;
1132    }
1133  }
1134  
1135  /**
1136   * Checks if the given URI scheme is a scheme that's affiliated with the Azure
1137   * File System.
1138   * 
1139   * @param scheme
1140   *          The URI scheme.
1141   * @return true iff it's an Azure File System URI scheme.
1142   */
1143  private static boolean isWasbScheme(String scheme) {
1144    // The valid schemes are: asv (old name), asvs (old name over HTTPS),
1145    // wasb (new name), wasbs (new name over HTTPS).
1146    return scheme != null
1147        && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs")
1148            || scheme.equalsIgnoreCase("wasb") || scheme
1149              .equalsIgnoreCase("wasbs"));
1150  }
1151
1152  /**
1153   * Puts in the authority of the default file system if it is a WASB file
1154   * system and the given URI's authority is null.
1155   * 
1156   * @return The URI with reconstructed authority if necessary and possible.
1157   */
1158  private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
1159    if (null == uri.getAuthority()) {
1160      // If WASB is the default file system, get the authority from there
1161      URI defaultUri = FileSystem.getDefaultUri(conf);
1162      if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) {
1163        try {
1164          // Reconstruct the URI with the authority from the default URI.
1165          return new URI(uri.getScheme(), defaultUri.getAuthority(),
1166              uri.getPath(), uri.getQuery(), uri.getFragment());
1167        } catch (URISyntaxException e) {
1168          // This should never happen.
1169          throw new Error("Bad URI construction", e);
1170        }
1171      }
1172    }
1173    return uri;
1174  }
1175
1176  @Override
1177  protected void checkPath(Path path) {
1178    // Make sure to reconstruct the path's authority if needed
1179    super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(),
1180        getConf())));
1181  }
1182
1183  @Override
1184  public void initialize(URI uri, Configuration conf)
1185      throws IOException, IllegalArgumentException {
1186    // Check authority for the URI to guarantee that it is non-null.
1187    uri = reconstructAuthorityIfNeeded(uri, conf);
1188    if (null == uri.getAuthority()) {
1189      final String errMsg = String
1190          .format("Cannot initialize WASB file system, URI authority not recognized.");
1191      throw new IllegalArgumentException(errMsg);
1192    }
1193    super.initialize(uri, conf);
1194
1195    if (store == null) {
1196      store = createDefaultStore(conf);
1197    }
1198
1199    instrumentation = new AzureFileSystemInstrumentation(conf);
1200    if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
1201      // Make sure the metrics system is available before interacting with Azure
1202      AzureFileSystemMetricsSystem.fileSystemStarted();
1203      metricsSourceName = newMetricsSourceName();
1204      String sourceDesc = "Azure Storage Volume File System metrics";
1205      AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc,
1206        instrumentation);
1207    }
1208
1209    store.initialize(uri, conf, instrumentation);
1210    setConf(conf);
1211    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
1212    this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
1213        .getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
1214    this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
1215        MAX_AZURE_BLOCK_SIZE);
1216
1217    this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
1218    LOG.debug("NativeAzureFileSystem. Initializing.");
1219    LOG.debug("  blockSize  = {}",
1220        conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
1221
1222    // Initialize thread counts from user configuration
1223    deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
1224    renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS);
1225  }
1226
1227  private NativeFileSystemStore createDefaultStore(Configuration conf) {
1228    actualStore = new AzureNativeFileSystemStore();
1229
1230    if (suppressRetryPolicy) {
1231      actualStore.suppressRetryPolicy();
1232    }
1233    return actualStore;
1234  }
1235
1236  /**
1237   * Azure Storage doesn't allow the blob names to end in a period,
1238   * so encode this here to work around that limitation.
1239   */
1240  private static String encodeTrailingPeriod(String toEncode) {
1241    Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode);
1242    return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER);
1243  }
1244
1245  /**
1246   * Reverse the encoding done by encodeTrailingPeriod().
1247   */
1248  private static String decodeTrailingPeriod(String toDecode) {
1249    Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode);
1250    return matcher.replaceAll(".");
1251  }
1252
1253  /**
1254   * Convert the path to a key. By convention, any leading or trailing slash is
1255   * removed, except for the special case of a single slash.
1256   */
1257  @VisibleForTesting
1258  public String pathToKey(Path path) {
1259    // Convert the path to a URI to parse the scheme, the authority, and the
1260    // path from the path object.
1261    URI tmpUri = path.toUri();
1262    String pathUri = tmpUri.getPath();
1263
1264    // The scheme and authority is valid. If the path does not exist add a "/"
1265    // separator to list the root of the container.
1266    Path newPath = path;
1267    if ("".equals(pathUri)) {
1268      newPath = new Path(tmpUri.toString() + Path.SEPARATOR);
1269    }
1270
1271    // Verify path is absolute if the path refers to a windows drive scheme.
1272    if (!newPath.isAbsolute()) {
1273      throw new IllegalArgumentException("Path must be absolute: " + path);
1274    }
1275
1276    String key = null;
1277    key = newPath.toUri().getPath();
1278    key = removeTrailingSlash(key);
1279    key = encodeTrailingPeriod(key);
1280    if (key.length() == 1) {
1281      return key;
1282    } else {
1283      return key.substring(1); // remove initial slash
1284    }
1285  }
1286
1287  // Remove any trailing slash except for the case of a single slash.
1288  private static String removeTrailingSlash(String key) {
1289    if (key.length() == 0 || key.length() == 1) {
1290      return key;
1291    }
1292    if (key.charAt(key.length() - 1) == '/') {
1293      return key.substring(0, key.length() - 1);
1294    } else {
1295      return key;
1296    }
1297  }
1298
1299  private static Path keyToPath(String key) {
1300    if (key.equals("/")) {
1301      return new Path("/"); // container
1302    }
1303    return new Path("/" + decodeTrailingPeriod(key));
1304  }
1305
1306  /**
1307   * Get the absolute version of the path (fully qualified).
1308   * This is public for testing purposes.
1309   *
1310   * @param path
1311   * @return fully qualified path
1312   */
1313  @VisibleForTesting
1314  public Path makeAbsolute(Path path) {
1315    if (path.isAbsolute()) {
1316      return path;
1317    }
1318    return new Path(workingDir, path);
1319  }
1320
1321  /**
1322   * For unit test purposes, retrieves the AzureNativeFileSystemStore store
1323   * backing this file system.
1324   * 
1325   * @return The store object.
1326   */
1327  @VisibleForTesting
1328  public AzureNativeFileSystemStore getStore() {
1329    return actualStore;
1330  }
1331  
1332  NativeFileSystemStore getStoreInterface() {
1333    return store;
1334  }
1335
1336  /**
1337   * Gets the metrics source for this file system.
1338   * This is mainly here for unit testing purposes.
1339   *
1340   * @return the metrics source.
1341   */
1342  public AzureFileSystemInstrumentation getInstrumentation() {
1343    return instrumentation;
1344  }
1345
1346  /** This optional operation is not yet supported. */
1347  @Override
1348  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
1349      throws IOException {
1350
1351    if (!appendSupportEnabled) {
1352      throw new UnsupportedOperationException("Append Support not enabled");
1353    }
1354
1355    LOG.debug("Opening file: {} for append", f);
1356
1357    Path absolutePath = makeAbsolute(f);
1358    String key = pathToKey(absolutePath);
1359    FileMetadata meta = null;
1360    try {
1361      meta = store.retrieveMetadata(key);
1362    } catch(Exception ex) {
1363
1364      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
1365
1366      if (innerException instanceof StorageException
1367          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1368
1369        throw new FileNotFoundException(String.format("%s is not found", key));
1370      } else {
1371        throw ex;
1372      }
1373    }
1374
1375    if (meta == null) {
1376      throw new FileNotFoundException(f.toString());
1377    }
1378
1379    if (meta.isDir()) {
1380      throw new FileNotFoundException(f.toString()
1381          + " is a directory not a file.");
1382    }
1383
1384    if (store.isPageBlobKey(key)) {
1385      throw new IOException("Append not supported for Page Blobs");
1386    }
1387
1388    DataOutputStream appendStream = null;
1389
1390    try {
1391      appendStream = store.retrieveAppendStream(key, bufferSize);
1392    } catch (Exception ex) {
1393
1394      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
1395
1396      if (innerException instanceof StorageException
1397          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1398        throw new FileNotFoundException(String.format("%s is not found", key));
1399      } else {
1400        throw ex;
1401      }
1402    }
1403
1404    return new FSDataOutputStream(appendStream, statistics);
1405  }
1406
1407  @Override
1408  public FSDataOutputStream create(Path f, FsPermission permission,
1409      boolean overwrite, int bufferSize, short replication, long blockSize,
1410      Progressable progress) throws IOException {
1411    return create(f, permission, overwrite, true,
1412        bufferSize, replication, blockSize, progress,
1413        (SelfRenewingLease) null);
1414  }
1415
1416  /**
1417   * Get a self-renewing lease on the specified file.
1418   */
1419  public SelfRenewingLease acquireLease(Path path) throws AzureException {
1420    String fullKey = pathToKey(makeAbsolute(path));
1421    return getStore().acquireLease(fullKey);
1422  }
1423
1424  @Override
1425  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1426      boolean overwrite, int bufferSize, short replication, long blockSize,
1427      Progressable progress) throws IOException {
1428
1429    Path parent = f.getParent();
1430
1431    // Get exclusive access to folder if this is a directory designated
1432    // for atomic rename. The primary use case of for HBase write-ahead
1433    // log file management.
1434    SelfRenewingLease lease = null;
1435    if (store.isAtomicRenameKey(pathToKey(f))) {
1436      try {
1437        lease = acquireLease(parent);
1438      } catch (AzureException e) {
1439
1440        String errorCode = "";
1441        try {
1442          StorageException e2 = (StorageException) e.getCause();
1443          errorCode = e2.getErrorCode();
1444        } catch (Exception e3) {
1445          // do nothing if cast fails
1446        }
1447        if (errorCode.equals("BlobNotFound")) {
1448          throw new FileNotFoundException("Cannot create file " +
1449              f.getName() + " because parent folder does not exist.");
1450        }
1451
1452        LOG.warn("Got unexpected exception trying to get lease on {} . {}",
1453          pathToKey(parent), e.getMessage());
1454        throw e;
1455      }
1456    }
1457
1458    // See if the parent folder exists. If not, throw error.
1459    // The exists() check will push any pending rename operation forward,
1460    // if there is one, and return false.
1461    //
1462    // At this point, we have exclusive access to the source folder
1463    // via the lease, so we will not conflict with an active folder
1464    // rename operation.
1465    if (!exists(parent)) {
1466      try {
1467
1468        // This'll let the keep-alive thread exit as soon as it wakes up.
1469        lease.free();
1470      } catch (Exception e) {
1471        LOG.warn("Unable to free lease because: {}", e.getMessage());
1472      }
1473      throw new FileNotFoundException("Cannot create file " +
1474          f.getName() + " because parent folder does not exist.");
1475    }
1476
1477    // Create file inside folder.
1478    FSDataOutputStream out = null;
1479    try {
1480      out = create(f, permission, overwrite, false,
1481          bufferSize, replication, blockSize, progress, lease);
1482    } finally {
1483      // Release exclusive access to folder.
1484      try {
1485        if (lease != null) {
1486          lease.free();
1487        }
1488      } catch (Exception e) {
1489        NativeAzureFileSystemHelper.cleanup(LOG, out);
1490        String msg = "Unable to free lease on " + parent.toUri();
1491        LOG.error(msg);
1492        throw new IOException(msg, e);
1493      }
1494    }
1495    return out;
1496  }
1497
1498  @Override
1499  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1500      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1501      Progressable progress) throws IOException {
1502
1503    // Check if file should be appended or overwritten. Assume that the file
1504    // is overwritten on if the CREATE and OVERWRITE create flags are set. Note
1505    // that any other combinations of create flags will result in an open new or
1506    // open with append.
1507    final EnumSet<CreateFlag> createflags =
1508        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
1509    boolean overwrite = flags.containsAll(createflags);
1510
1511    // Delegate the create non-recursive call.
1512    return this.createNonRecursive(f, permission, overwrite,
1513        bufferSize, replication, blockSize, progress);
1514  }
1515
1516  @Override
1517  public FSDataOutputStream createNonRecursive(Path f,
1518      boolean overwrite, int bufferSize, short replication, long blockSize,
1519      Progressable progress) throws IOException {
1520    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1521        overwrite, bufferSize, replication, blockSize, progress);
1522  }
1523
1524
1525  /**
1526   * Create an Azure blob and return an output stream to use
1527   * to write data to it.
1528   *
1529   * @param f
1530   * @param permission
1531   * @param overwrite
1532   * @param createParent
1533   * @param bufferSize
1534   * @param replication
1535   * @param blockSize
1536   * @param progress
1537   * @param parentFolderLease Lease on parent folder (or null if
1538   * no lease).
1539   * @return
1540   * @throws IOException
1541   */
1542  private FSDataOutputStream create(Path f, FsPermission permission,
1543      boolean overwrite, boolean createParent, int bufferSize,
1544      short replication, long blockSize, Progressable progress,
1545      SelfRenewingLease parentFolderLease)
1546          throws FileAlreadyExistsException, IOException {
1547
1548    LOG.debug("Creating file: {}", f.toString());
1549
1550    if (containsColon(f)) {
1551      throw new IOException("Cannot create file " + f
1552          + " through WASB that has colons in the name");
1553    }
1554
1555    Path absolutePath = makeAbsolute(f);
1556    String key = pathToKey(absolutePath);
1557
1558    FileMetadata existingMetadata = store.retrieveMetadata(key);
1559    if (existingMetadata != null) {
1560      if (existingMetadata.isDir()) {
1561        throw new FileAlreadyExistsException("Cannot create file " + f
1562            + "; already exists as a directory.");
1563      }
1564      if (!overwrite) {
1565        throw new FileAlreadyExistsException("File already exists:" + f);
1566      }
1567    }
1568
1569    Path parentFolder = absolutePath.getParent();
1570    if (parentFolder != null && parentFolder.getParent() != null) { // skip root
1571      // Update the parent folder last modified time if the parent folder
1572      // already exists.
1573      String parentKey = pathToKey(parentFolder);
1574      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
1575      if (parentMetadata != null && parentMetadata.isDir() &&
1576        parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
1577        if (parentFolderLease != null) {
1578          store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
1579        } else {
1580          updateParentFolderLastModifiedTime(key);
1581        }
1582      } else {
1583        // Make sure that the parent folder exists.
1584        // Create it using inherited permissions from the first existing directory going up the path
1585        Path firstExisting = parentFolder.getParent();
1586        FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting));
1587        while(metadata == null) {
1588          // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata
1589          firstExisting = firstExisting.getParent();
1590          metadata = store.retrieveMetadata(pathToKey(firstExisting));
1591        }
1592        mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
1593      }
1594    }
1595
1596    // Mask the permission first (with the default permission mask as well).
1597    FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
1598    PermissionStatus permissionStatus = createPermissionStatus(masked);
1599
1600    OutputStream bufOutStream;
1601    if (store.isPageBlobKey(key)) {
1602      // Store page blobs directly in-place without renames.
1603      bufOutStream = store.storefile(key, permissionStatus);
1604    } else {
1605      // This is a block blob, so open the output blob stream based on the
1606      // encoded key.
1607      //
1608      String keyEncoded = encodeKey(key);
1609
1610
1611      // First create a blob at the real key, pointing back to the temporary file
1612      // This accomplishes a few things:
1613      // 1. Makes sure we can create a file there.
1614      // 2. Makes it visible to other concurrent threads/processes/nodes what
1615      // we're
1616      // doing.
1617      // 3. Makes it easier to restore/cleanup data in the event of us crashing.
1618      store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
1619
1620      // The key is encoded to point to a common container at the storage server.
1621      // This reduces the number of splits on the server side when load balancing.
1622      // Ingress to Azure storage can take advantage of earlier splits. We remove
1623      // the root path to the key and prefix a random GUID to the tail (or leaf
1624      // filename) of the key. Keys are thus broadly and randomly distributed over
1625      // a single container to ease load balancing on the storage server. When the
1626      // blob is committed it is renamed to its earlier key. Uncommitted blocks
1627      // are not cleaned up and we leave it to Azure storage to garbage collect
1628      // these
1629      // blocks.
1630      bufOutStream = new NativeAzureFsOutputStream(store.storefile(
1631          keyEncoded, permissionStatus), key, keyEncoded);
1632    }
1633    // Construct the data output stream from the buffered output stream.
1634    FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
1635
1636    
1637    // Increment the counter
1638    instrumentation.fileCreated();
1639    
1640    // Return data output stream to caller.
1641    return fsOut;
1642  }
1643
1644  @Override
1645  @Deprecated
1646  public boolean delete(Path path) throws IOException {
1647    return delete(path, true);
1648  }
1649
1650  @Override
1651  public boolean delete(Path f, boolean recursive) throws IOException {
1652    return delete(f, recursive, false);
1653  }
1654
1655  /**
1656   * Delete the specified file or folder. The parameter
1657   * skipParentFolderLastModifidedTimeUpdate
1658   * is used in the case of atomic folder rename redo. In that case, there is
1659   * a lease on the parent folder, so (without reworking the code) modifying
1660   * the parent folder update time will fail because of a conflict with the
1661   * lease. Since we are going to delete the folder soon anyway so accurate
1662   * modified time is not necessary, it's easier to just skip
1663   * the modified time update.
1664   *
1665   * @param f
1666   * @param recursive
1667   * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last
1668   * modified time.
1669   * @return true if and only if the file is deleted
1670   * @throws IOException
1671   */
1672  public boolean delete(Path f, boolean recursive,
1673      boolean skipParentFolderLastModifidedTimeUpdate) throws IOException {
1674
1675    LOG.debug("Deleting file: {}", f.toString());
1676
1677    Path absolutePath = makeAbsolute(f);
1678    String key = pathToKey(absolutePath);
1679
1680    // Capture the metadata for the path.
1681    //
1682    FileMetadata metaFile = null;
1683    try {
1684      metaFile = store.retrieveMetadata(key);
1685    } catch (IOException e) {
1686
1687      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1688
1689      if (innerException instanceof StorageException
1690          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1691
1692        return false;
1693      }
1694      throw e;
1695    }
1696
1697    if (null == metaFile) {
1698      // The path to be deleted does not exist.
1699      return false;
1700    }
1701
1702    // The path exists, determine if it is a folder containing objects,
1703    // an empty folder, or a simple file and take the appropriate actions.
1704    if (!metaFile.isDir()) {
1705      // The path specifies a file. We need to check the parent path
1706      // to make sure it's a proper materialized directory before we
1707      // delete the file. Otherwise we may get into a situation where
1708      // the file we were deleting was the last one in an implicit directory
1709      // (e.g. the blob store only contains the blob a/b and there's no
1710      // corresponding directory blob a) and that would implicitly delete
1711      // the directory as well, which is not correct.
1712      Path parentPath = absolutePath.getParent();
1713      if (parentPath.getParent() != null) {// Not root
1714        String parentKey = pathToKey(parentPath);
1715
1716        FileMetadata parentMetadata = null;
1717        try {
1718          parentMetadata = store.retrieveMetadata(parentKey);
1719        } catch (IOException e) {
1720
1721          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1722
1723          if (innerException instanceof StorageException) {
1724            // Invalid State.
1725            // A FileNotFoundException is not thrown here as the API returns false
1726            // if the file not present. But not retrieving metadata here is an
1727            // unrecoverable state and can only happen if there is a race condition
1728            // hence throwing a IOException
1729            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1730              throw new IOException("File " + f + " has a parent directory "
1731                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
1732            }
1733          }
1734          throw e;
1735        }
1736
1737        // Invalid State.
1738        // A FileNotFoundException is not thrown here as the API returns false
1739        // if the file not present. But not retrieving metadata here is an
1740        // unrecoverable state and can only happen if there is a race condition
1741        // hence throwing a IOException
1742        if (parentMetadata == null) {
1743          throw new IOException("File " + f + " has a parent directory "
1744              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
1745        }
1746
1747        if (!parentMetadata.isDir()) {
1748          // Invalid state: the parent path is actually a file. Throw.
1749          throw new AzureException("File " + f + " has a parent directory "
1750              + parentPath + " which is also a file. Can't resolve.");
1751        }
1752
1753        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
1754          LOG.debug("Found an implicit parent directory while trying to"
1755              + " delete the file {}. Creating the directory blob for"
1756              + " it in {}.", f, parentKey);
1757
1758          store.storeEmptyFolder(parentKey,
1759              createPermissionStatus(FsPermission.getDefault()));
1760        } else {
1761          if (!skipParentFolderLastModifidedTimeUpdate) {
1762            updateParentFolderLastModifiedTime(key);
1763          }
1764        }
1765      }
1766
1767      try {
1768        store.delete(key);
1769        instrumentation.fileDeleted();
1770      } catch(IOException e) {
1771
1772        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1773
1774        if (innerException instanceof StorageException
1775            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1776          return false;
1777        }
1778
1779       throw e;
1780      }
1781    } else {
1782      // The path specifies a folder. Recursively delete all entries under the
1783      // folder.
1784      LOG.debug("Directory Delete encountered: {}", f.toString());
1785      Path parentPath = absolutePath.getParent();
1786      if (parentPath.getParent() != null) {
1787        String parentKey = pathToKey(parentPath);
1788        FileMetadata parentMetadata = null;
1789
1790        try {
1791          parentMetadata = store.retrieveMetadata(parentKey);
1792        } catch (IOException e) {
1793
1794          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1795
1796          if (innerException instanceof StorageException) {
1797            // Invalid State.
1798            // A FileNotFoundException is not thrown here as the API returns false
1799            // if the file not present. But not retrieving metadata here is an
1800            // unrecoverable state and can only happen if there is a race condition
1801            // hence throwing a IOException
1802            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1803              throw new IOException("File " + f + " has a parent directory "
1804                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
1805            }
1806          }
1807          throw e;
1808        }
1809
1810        // Invalid State.
1811        // A FileNotFoundException is not thrown here as the API returns false
1812        // if the file not present. But not retrieving metadata here is an
1813        // unrecoverable state and can only happen if there is a race condition
1814        // hence throwing a IOException
1815        if (parentMetadata == null) {
1816          throw new IOException("File " + f + " has a parent directory "
1817              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
1818        }
1819
1820        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
1821          LOG.debug("Found an implicit parent directory while trying to"
1822              + " delete the directory {}. Creating the directory blob for"
1823              + " it in {}. ", f, parentKey);
1824
1825          store.storeEmptyFolder(parentKey,
1826              createPermissionStatus(FsPermission.getDefault()));
1827        }
1828      }
1829
1830      // List all the blobs in the current folder.
1831      String priorLastKey = null;
1832
1833      // Start time for list operation
1834      long start = Time.monotonicNow();
1835      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
1836
1837      // List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth.
1838      do {
1839        try {
1840          PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
1841            AZURE_UNBOUNDED_DEPTH, priorLastKey);
1842          for(FileMetadata file : listing.getFiles()) {
1843            fileMetadataList.add(file);
1844          }
1845          priorLastKey = listing.getPriorLastKey();
1846        } catch (IOException e) {
1847          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1848
1849          if (innerException instanceof StorageException
1850              && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1851            return false;
1852          }
1853
1854          throw e;
1855        }
1856      } while (priorLastKey != null);
1857
1858      long end = Time.monotonicNow();
1859      LOG.debug("Time taken to list {} blobs for delete operation: {} ms", fileMetadataList.size(), (end - start));
1860
1861      final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
1862
1863      if (!recursive && contents.length > 0) {
1864          // The folder is non-empty and recursive delete was not specified.
1865          // Throw an exception indicating that a non-recursive delete was
1866          // specified for a non-empty folder.
1867          throw new IOException("Non-recursive delete of non-empty directory "
1868              + f.toString());
1869      }
1870
1871      // Delete all files / folders in current directory stored as list in 'contents'.
1872      AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
1873        @Override
1874        public boolean execute(FileMetadata file) throws IOException{
1875          return deleteFile(file.getKey(), file.isDir());
1876        }
1877      };
1878
1879      AzureFileSystemThreadPoolExecutor executor = getThreadPoolExecutor(this.deleteThreadCount,
1880          "AzureBlobDeleteThread", "Delete", key, AZURE_DELETE_THREADS);
1881
1882      if (!executor.executeParallel(contents, task)) {
1883        LOG.error("Failed to delete files / subfolders in blob {}", key);
1884        return false;
1885      }
1886
1887      // Delete the current directory
1888      if (!deleteFile(metaFile.getKey(), metaFile.isDir())) {
1889        LOG.error("Failed delete directory {}", f.toString());
1890        return false;
1891      }
1892
1893      // Update parent directory last modified time
1894      Path parent = absolutePath.getParent();
1895      if (parent != null && parent.getParent() != null) { // not root
1896        if (!skipParentFolderLastModifidedTimeUpdate) {
1897          updateParentFolderLastModifiedTime(key);
1898        }
1899      }
1900    }
1901
1902    // File or directory was successfully deleted.
1903    LOG.debug("Delete Successful for : {}", f.toString());
1904    return true;
1905  }
1906
1907  public AzureFileSystemThreadPoolExecutor getThreadPoolExecutor(int threadCount,
1908      String threadNamePrefix, String operation, String key, String config) {
1909    return new AzureFileSystemThreadPoolExecutor(threadCount, threadNamePrefix, operation, key, config);
1910  }
1911
1912  // Delete single file / directory from key.
1913  @VisibleForTesting
1914  boolean deleteFile(String key, boolean isDir) throws IOException {
1915    try {
1916      store.delete(key);
1917      if (isDir) {
1918        instrumentation.directoryDeleted();
1919      } else {
1920        instrumentation.fileDeleted();
1921      }
1922    } catch(IOException e) {
1923      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
1924
1925      if (innerException instanceof StorageException
1926          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1927        return false;
1928      }
1929
1930      throw e;
1931    }
1932
1933    return true;
1934  }
1935
1936  @Override
1937  public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException {
1938
1939    LOG.debug("Getting the file status for {}", f.toString());
1940
1941    // Capture the absolute path and the path to key.
1942    Path absolutePath = makeAbsolute(f);
1943    String key = pathToKey(absolutePath);
1944    if (key.length() == 0) { // root always exists
1945      return newDirectory(null, absolutePath);
1946    }
1947
1948    // The path is either a folder or a file. Retrieve metadata to
1949    // determine if it is a directory or file.
1950    FileMetadata meta = null;
1951    try {
1952      meta = store.retrieveMetadata(key);
1953    } catch(Exception ex) {
1954
1955      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
1956
1957      if (innerException instanceof StorageException
1958          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1959
1960          throw new FileNotFoundException(String.format("%s is not found", key));
1961       }
1962
1963      throw ex;
1964    }
1965
1966    if (meta != null) {
1967      if (meta.isDir()) {
1968        // The path is a folder with files in it.
1969        //
1970
1971        LOG.debug("Path {} is a folder.", f.toString());
1972
1973        // If a rename operation for the folder was pending, redo it.
1974        // Then the file does not exist, so signal that.
1975        if (conditionalRedoFolderRename(f)) {
1976          throw new FileNotFoundException(
1977              absolutePath + ": No such file or directory.");
1978        }
1979
1980        // Return reference to the directory object.
1981        return newDirectory(meta, absolutePath);
1982      }
1983
1984      // The path is a file.
1985      LOG.debug("Found the path: {} as a file.", f.toString());
1986
1987      // Return with reference to a file object.
1988      return newFile(meta, absolutePath);
1989    }
1990
1991    // File not found. Throw exception no such file or directory.
1992    //
1993    throw new FileNotFoundException(
1994        absolutePath + ": No such file or directory.");
1995  }
1996
1997  // Return true if there is a rename pending and we redo it, otherwise false.
1998  private boolean conditionalRedoFolderRename(Path f) throws IOException {
1999
2000    // Can't rename /, so return immediately in that case.
2001    if (f.getName().equals("")) {
2002      return false;
2003    }
2004
2005    // Check if there is a -RenamePending.json file for this folder, and if so,
2006    // redo the rename.
2007    Path absoluteRenamePendingFile = renamePendingFilePath(f);
2008    if (exists(absoluteRenamePendingFile)) {
2009      FolderRenamePending pending =
2010          new FolderRenamePending(absoluteRenamePendingFile, this);
2011      pending.redo();
2012      return true;
2013    } else {
2014      return false;
2015    }
2016  }
2017
2018  // Return the path name that would be used for rename of folder with path f.
2019  private Path renamePendingFilePath(Path f) {
2020    Path absPath = makeAbsolute(f);
2021    String key = pathToKey(absPath);
2022    key += "-RenamePending.json";
2023    return keyToPath(key);
2024  }
2025
2026  @Override
2027  public URI getUri() {
2028    return uri;
2029  }
2030
2031  /**
2032   * Retrieve the status of a given path if it is a file, or of all the
2033   * contained files if it is a directory.
2034   */
2035  @Override
2036  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
2037
2038    LOG.debug("Listing status for {}", f.toString());
2039
2040    Path absolutePath = makeAbsolute(f);
2041    String key = pathToKey(absolutePath);
2042    Set<FileStatus> status = new TreeSet<FileStatus>();
2043    FileMetadata meta = null;
2044    try {
2045      meta = store.retrieveMetadata(key);
2046    } catch (IOException ex) {
2047
2048      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2049
2050      if (innerException instanceof StorageException
2051          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2052
2053        throw new FileNotFoundException(String.format("%s is not found", f));
2054      }
2055
2056      throw ex;
2057    }
2058
2059    if (meta != null) {
2060      if (!meta.isDir()) {
2061
2062        LOG.debug("Found path as a file");
2063
2064        return new FileStatus[] { newFile(meta, absolutePath) };
2065      }
2066
2067      String partialKey = null;
2068      PartialListing listing = null;
2069
2070      try {
2071        listing  = store.list(key, AZURE_LIST_ALL, 1, partialKey);
2072      } catch (IOException ex) {
2073
2074        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2075
2076        if (innerException instanceof StorageException
2077            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2078
2079            throw new FileNotFoundException(String.format("%s is not found", key));
2080        }
2081
2082        throw ex;
2083      }
2084      // NOTE: We don't check for Null condition as the Store API should return
2085      // an empty list if there are not listing.
2086
2087      // For any -RenamePending.json files in the listing,
2088      // push the rename forward.
2089      boolean renamed = conditionalRedoFolderRenames(listing);
2090
2091      // If any renames were redone, get another listing,
2092      // since the current one may have changed due to the redo.
2093      if (renamed) {
2094       listing = null;
2095       try {
2096         listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
2097       } catch (IOException ex) {
2098         Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2099
2100         if (innerException instanceof StorageException
2101             && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2102
2103           throw new FileNotFoundException(String.format("%s is not found", key));
2104         }
2105
2106         throw ex;
2107       }
2108      }
2109
2110      // NOTE: We don't check for Null condition as the Store API should return
2111      // and empty list if there are not listing.
2112
2113      for (FileMetadata fileMetadata : listing.getFiles()) {
2114        Path subpath = keyToPath(fileMetadata.getKey());
2115
2116        // Test whether the metadata represents a file or directory and
2117        // add the appropriate metadata object.
2118        //
2119        // Note: There was a very old bug here where directories were added
2120        // to the status set as files flattening out recursive listings
2121        // using "-lsr" down the file system hierarchy.
2122        if (fileMetadata.isDir()) {
2123          // Make sure we hide the temp upload folder
2124          if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
2125            // Don't expose that.
2126            continue;
2127          }
2128          status.add(newDirectory(fileMetadata, subpath));
2129        } else {
2130          status.add(newFile(fileMetadata, subpath));
2131        }
2132      }
2133
2134      LOG.debug("Found path as a directory with {}"
2135          + " files in it.", status.size());
2136
2137    } else {
2138      // There is no metadata found for the path.
2139      LOG.debug("Did not find any metadata for path: {}", key);
2140
2141      throw new FileNotFoundException("File" + f + " does not exist.");
2142    }
2143
2144    return status.toArray(new FileStatus[0]);
2145  }
2146
2147  // Redo any folder renames needed if there are rename pending files in the
2148  // directory listing. Return true if one or more redo operations were done.
2149  private boolean conditionalRedoFolderRenames(PartialListing listing)
2150      throws IllegalArgumentException, IOException {
2151    boolean renamed = false;
2152    for (FileMetadata fileMetadata : listing.getFiles()) {
2153      Path subpath = keyToPath(fileMetadata.getKey());
2154      if (isRenamePendingFile(subpath)) {
2155        FolderRenamePending pending =
2156            new FolderRenamePending(subpath, this);
2157        pending.redo();
2158        renamed = true;
2159      }
2160    }
2161    return renamed;
2162  }
2163
2164  // True if this is a folder rename pending file, else false.
2165  private boolean isRenamePendingFile(Path path) {
2166    return path.toString().endsWith(FolderRenamePending.SUFFIX);
2167  }
2168
2169  private FileStatus newFile(FileMetadata meta, Path path) {
2170    return new FileStatus (
2171        meta.getLength(),
2172        false,
2173        1,
2174        blockSize,
2175        meta.getLastModified(),
2176        0,
2177        meta.getPermissionStatus().getPermission(),
2178        meta.getPermissionStatus().getUserName(),
2179        meta.getPermissionStatus().getGroupName(),
2180        path.makeQualified(getUri(), getWorkingDirectory()));
2181  }
2182
2183  private FileStatus newDirectory(FileMetadata meta, Path path) {
2184    return new FileStatus (
2185        0,
2186        true,
2187        1,
2188        blockSize,
2189        meta == null ? 0 : meta.getLastModified(),
2190        0,
2191        meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
2192        meta == null ? "" : meta.getPermissionStatus().getUserName(),
2193        meta == null ? "" : meta.getPermissionStatus().getGroupName(),
2194        path.makeQualified(getUri(), getWorkingDirectory()));
2195  }
2196
2197  private static enum UMaskApplyMode {
2198    NewFile,
2199    NewDirectory,
2200    NewDirectoryNoUmask,
2201    ChangeExistingFile,
2202    ChangeExistingDirectory,
2203  }
2204
2205  /**
2206   * Applies the applicable UMASK's on the given permission.
2207   * 
2208   * @param permission
2209   *          The permission to mask.
2210   * @param applyMode
2211   *          Whether to also apply the default umask.
2212   * @return The masked persmission.
2213   */
2214  private FsPermission applyUMask(final FsPermission permission,
2215      final UMaskApplyMode applyMode) {
2216    FsPermission newPermission = new FsPermission(permission);
2217    // Apply the default umask - this applies for new files or directories.
2218    if (applyMode == UMaskApplyMode.NewFile
2219        || applyMode == UMaskApplyMode.NewDirectory) {
2220      newPermission = newPermission
2221          .applyUMask(FsPermission.getUMask(getConf()));
2222    }
2223    return newPermission;
2224  }
2225
2226  /**
2227   * Creates the PermissionStatus object to use for the given permission, based
2228   * on the current user in context.
2229   * 
2230   * @param permission
2231   *          The permission for the file.
2232   * @return The permission status object to use.
2233   * @throws IOException
2234   *           If login fails in getCurrentUser
2235   */
2236  @VisibleForTesting
2237  PermissionStatus createPermissionStatus(FsPermission permission)
2238    throws IOException {
2239    // Create the permission status for this file based on current user
2240    return new PermissionStatus(
2241        UserGroupInformation.getCurrentUser().getShortUserName(),
2242        getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
2243            AZURE_DEFAULT_GROUP_DEFAULT),
2244        permission);
2245  }
2246
2247  @Override
2248  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
2249      return mkdirs(f, permission, false);
2250  }
2251
2252  public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {
2253
2254
2255    LOG.debug("Creating directory: {}", f.toString());
2256
2257    if (containsColon(f)) {
2258      throw new IOException("Cannot create directory " + f
2259          + " through WASB that has colons in the name");
2260    }
2261
2262    Path absolutePath = makeAbsolute(f);
2263    PermissionStatus permissionStatus = null;
2264    if(noUmask) {
2265      // ensure owner still has wx permissions at the minimum
2266      permissionStatus = createPermissionStatus(
2267          applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)),
2268              UMaskApplyMode.NewDirectoryNoUmask));
2269    } else {
2270      permissionStatus = createPermissionStatus(
2271          applyUMask(permission, UMaskApplyMode.NewDirectory));
2272    }
2273
2274
2275    ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
2276    ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
2277    boolean childCreated = false;
2278    // Check that there is no file in the parent chain of the given path.
2279    for (Path current = absolutePath, parent = current.getParent();
2280        parent != null; // Stop when you get to the root
2281        current = parent, parent = current.getParent()) {
2282      String currentKey = pathToKey(current);
2283      FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
2284      if (currentMetadata != null && !currentMetadata.isDir()) {
2285        throw new FileAlreadyExistsException("Cannot create directory " + f + " because "
2286            + current + " is an existing file.");
2287      } else if (currentMetadata == null) {
2288        keysToCreateAsFolder.add(currentKey);
2289        childCreated = true;
2290      } else {
2291        // The directory already exists. Its last modified time need to be
2292        // updated if there is a child directory created under it.
2293        if (childCreated) {
2294          keysToUpdateAsFolder.add(currentKey);
2295        }
2296        childCreated = false;
2297      }
2298    }
2299
2300    for (String currentKey : keysToCreateAsFolder) {
2301      store.storeEmptyFolder(currentKey, permissionStatus);
2302    }
2303
2304    instrumentation.directoryCreated();
2305
2306    // otherwise throws exception
2307    return true;
2308  }
2309
2310  @Override
2311  public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
2312
2313    LOG.debug("Opening file: {}", f.toString());
2314
2315    Path absolutePath = makeAbsolute(f);
2316    String key = pathToKey(absolutePath);
2317    FileMetadata meta = null;
2318    try {
2319      meta = store.retrieveMetadata(key);
2320    } catch(Exception ex) {
2321
2322      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2323
2324      if (innerException instanceof StorageException
2325          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2326
2327        throw new FileNotFoundException(String.format("%s is not found", key));
2328      }
2329
2330      throw ex;
2331    }
2332
2333    if (meta == null) {
2334      throw new FileNotFoundException(f.toString());
2335    }
2336    if (meta.isDir()) {
2337      throw new FileNotFoundException(f.toString()
2338          + " is a directory not a file.");
2339    }
2340
2341    DataInputStream inputStream = null;
2342    try {
2343      inputStream = store.retrieve(key);
2344    } catch(Exception ex) {
2345      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2346
2347      if (innerException instanceof StorageException
2348          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2349
2350        throw new FileNotFoundException(String.format("%s is not found", key));
2351      }
2352
2353      throw ex;
2354    }
2355
2356    return new FSDataInputStream(new BufferedFSInputStream(
2357        new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize));
2358  }
2359
2360  @Override
2361  public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {
2362
2363    FolderRenamePending renamePending = null;
2364
2365    LOG.debug("Moving {} to {}", src, dst);
2366
2367    if (containsColon(dst)) {
2368      throw new IOException("Cannot rename to file " + dst
2369          + " through WASB that has colons in the name");
2370    }
2371
2372    String srcKey = pathToKey(makeAbsolute(src));
2373
2374    if (srcKey.length() == 0) {
2375      // Cannot rename root of file system
2376      return false;
2377    }
2378
2379    // Figure out the final destination
2380    Path absoluteDst = makeAbsolute(dst);
2381    String dstKey = pathToKey(absoluteDst);
2382    FileMetadata dstMetadata = null;
2383    try {
2384      dstMetadata = store.retrieveMetadata(dstKey);
2385    } catch (IOException ex) {
2386
2387      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2388
2389      // A BlobNotFound storage exception in only thrown from retrieveMetdata API when
2390      // there is a race condition. If there is another thread which deletes the destination
2391      // file or folder, then this thread calling rename should be able to continue with
2392      // rename gracefully. Hence the StorageException is swallowed here.
2393      if (innerException instanceof StorageException) {
2394        if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2395          LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
2396              + "Swallowin the exception to handle race condition gracefully", dstKey);
2397        }
2398      } else {
2399        throw ex;
2400      }
2401    }
2402
2403    if (dstMetadata != null && dstMetadata.isDir()) {
2404      // It's an existing directory.
2405      dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
2406      LOG.debug("Destination {} "
2407          + " is a directory, adjusted the destination to be {}", dst, dstKey);
2408    } else if (dstMetadata != null) {
2409      // Attempting to overwrite a file using rename()
2410      LOG.debug("Destination {}"
2411          + " is an already existing file, failing the rename.", dst);
2412      return false;
2413    } else {
2414      // Check that the parent directory exists.
2415      FileMetadata parentOfDestMetadata = null;
2416      try {
2417        parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
2418      } catch (IOException ex) {
2419
2420        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2421
2422        if (innerException instanceof StorageException
2423            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2424
2425          LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
2426          return false;
2427        }
2428
2429        throw ex;
2430      }
2431
2432      if (parentOfDestMetadata == null) {
2433        LOG.debug("Parent of the destination {}"
2434            + " doesn't exist, failing the rename.", dst);
2435        return false;
2436      } else if (!parentOfDestMetadata.isDir()) {
2437        LOG.debug("Parent of the destination {}"
2438            + " is a file, failing the rename.", dst);
2439        return false;
2440      }
2441    }
2442    FileMetadata srcMetadata = null;
2443    try {
2444      srcMetadata = store.retrieveMetadata(srcKey);
2445    } catch (IOException ex) {
2446      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2447
2448      if (innerException instanceof StorageException
2449          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2450
2451        LOG.debug("Source {} doesn't exists. Failing rename", src);
2452        return false;
2453      }
2454
2455      throw ex;
2456    }
2457
2458    if (srcMetadata == null) {
2459      // Source doesn't exist
2460      LOG.debug("Source {} doesn't exist, failing the rename.", src);
2461      return false;
2462    } else if (!srcMetadata.isDir()) {
2463      LOG.debug("Source {} found as a file, renaming.", src);
2464      try {
2465        store.rename(srcKey, dstKey);
2466      } catch(IOException ex) {
2467
2468        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2469
2470        if (innerException instanceof StorageException
2471            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2472
2473          LOG.debug("BlobNotFoundException encountered. Failing rename", src);
2474          return false;
2475        }
2476
2477        throw ex;
2478      }
2479    } else {
2480
2481      // Prepare for, execute and clean up after of all files in folder, and
2482      // the root file, and update the last modified time of the source and
2483      // target parent folders. The operation can be redone if it fails part
2484      // way through, by applying the "Rename Pending" file.
2485
2486      // The following code (internally) only does atomic rename preparation
2487      // and lease management for page blob folders, limiting the scope of the
2488      // operation to HBase log file folders, where atomic rename is required.
2489      // In the future, we could generalize it easily to all folders.
2490      renamePending = prepareAtomicFolderRename(srcKey, dstKey);
2491      renamePending.execute();
2492
2493      LOG.debug("Renamed {} to {} successfully.", src, dst);
2494      renamePending.cleanup();
2495      return true;
2496    }
2497
2498    // Update the last-modified time of the parent folders of both source
2499    // and destination.
2500    updateParentFolderLastModifiedTime(srcKey);
2501    updateParentFolderLastModifiedTime(dstKey);
2502
2503    LOG.debug("Renamed {} to {} successfully.", src, dst);
2504    return true;
2505  }
2506
2507  /**
2508   * Update the last-modified time of the parent folder of the file
2509   * identified by key.
2510   * @param key
2511   * @throws IOException
2512   */
2513  private void updateParentFolderLastModifiedTime(String key)
2514      throws IOException {
2515    Path parent = makeAbsolute(keyToPath(key)).getParent();
2516    if (parent != null && parent.getParent() != null) { // not root
2517      String parentKey = pathToKey(parent);
2518
2519      // ensure the parent is a materialized folder
2520      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
2521      // The metadata could be null if the implicit folder only contains a
2522      // single file. In this case, the parent folder no longer exists if the
2523      // file is renamed; so we can safely ignore the null pointer case.
2524      if (parentMetadata != null) {
2525        if (parentMetadata.isDir()
2526            && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2527          store.storeEmptyFolder(parentKey,
2528              createPermissionStatus(FsPermission.getDefault()));
2529        }
2530
2531        if (store.isAtomicRenameKey(parentKey)) {
2532          SelfRenewingLease lease = null;
2533          try {
2534            lease = leaseSourceFolder(parentKey);
2535            store.updateFolderLastModifiedTime(parentKey, lease);
2536          } catch (AzureException e) {
2537            String errorCode = "";
2538            try {
2539              StorageException e2 = (StorageException) e.getCause();
2540              errorCode = e2.getErrorCode();
2541            } catch (Exception e3) {
2542              // do nothing if cast fails
2543            }
2544            if (errorCode.equals("BlobNotFound")) {
2545              throw new FileNotFoundException("Folder does not exist: " + parentKey);
2546            }
2547            LOG.warn("Got unexpected exception trying to get lease on {}. {}",
2548                parentKey, e.getMessage());
2549            throw e;
2550          } finally {
2551            try {
2552              if (lease != null) {
2553                lease.free();
2554              }
2555            } catch (Exception e) {
2556              LOG.error("Unable to free lease on {}", parentKey, e);
2557            }
2558          }
2559        } else {
2560          store.updateFolderLastModifiedTime(parentKey, null);
2561        }
2562      }
2563    }
2564  }
2565
2566  /**
2567   * If the source is a page blob folder,
2568   * prepare to rename this folder atomically. This means to get exclusive
2569   * access to the source folder, and record the actions to be performed for
2570   * this rename in a "Rename Pending" file. This code was designed to
2571   * meet the needs of HBase, which requires atomic rename of write-ahead log
2572   * (WAL) folders for correctness.
2573   *
2574   * Before calling this method, the caller must ensure that the source is a
2575   * folder.
2576   *
2577   * For non-page-blob directories, prepare the in-memory information needed,
2578   * but don't take the lease or write the redo file. This is done to limit the
2579   * scope of atomic folder rename to HBase, at least at the time of writing
2580   * this code.
2581   *
2582   * @param srcKey Source folder name.
2583   * @param dstKey Destination folder name.
2584   * @throws IOException
2585   */
2586  @VisibleForTesting
2587  FolderRenamePending prepareAtomicFolderRename(
2588      String srcKey, String dstKey) throws IOException {
2589
2590    if (store.isAtomicRenameKey(srcKey)) {
2591
2592      // Block unwanted concurrent access to source folder.
2593      SelfRenewingLease lease = leaseSourceFolder(srcKey);
2594
2595      // Prepare in-memory information needed to do or redo a folder rename.
2596      FolderRenamePending renamePending =
2597          new FolderRenamePending(srcKey, dstKey, lease, this);
2598
2599      // Save it to persistent storage to help recover if the operation fails.
2600      renamePending.writeFile(this);
2601      return renamePending;
2602    } else {
2603      FolderRenamePending renamePending =
2604          new FolderRenamePending(srcKey, dstKey, null, this);
2605      return renamePending;
2606    }
2607  }
2608
2609  /**
2610   * Get a self-renewing Azure blob lease on the source folder zero-byte file.
2611   */
2612  private SelfRenewingLease leaseSourceFolder(String srcKey)
2613      throws AzureException {
2614    return store.acquireLease(srcKey);
2615  }
2616
2617  /**
2618   * Return an array containing hostnames, offset and size of
2619   * portions of the given file. For WASB we'll just lie and give
2620   * fake hosts to make sure we get many splits in MR jobs.
2621   */
2622  @Override
2623  public BlockLocation[] getFileBlockLocations(FileStatus file,
2624      long start, long len) throws IOException {
2625    if (file == null) {
2626      return null;
2627    }
2628
2629    if ((start < 0) || (len < 0)) {
2630      throw new IllegalArgumentException("Invalid start or len parameter");
2631    }
2632
2633    if (file.getLen() < start) {
2634      return new BlockLocation[0];
2635    }
2636    final String blobLocationHost = getConf().get(
2637        AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
2638        AZURE_BLOCK_LOCATION_HOST_DEFAULT);
2639    final String[] name = { blobLocationHost };
2640    final String[] host = { blobLocationHost };
2641    long blockSize = file.getBlockSize();
2642    if (blockSize <= 0) {
2643      throw new IllegalArgumentException(
2644          "The block size for the given file is not a positive number: "
2645              + blockSize);
2646    }
2647    int numberOfLocations = (int) (len / blockSize)
2648        + ((len % blockSize == 0) ? 0 : 1);
2649    BlockLocation[] locations = new BlockLocation[numberOfLocations];
2650    for (int i = 0; i < locations.length; i++) {
2651      long currentOffset = start + (i * blockSize);
2652      long currentLength = Math.min(blockSize, start + len - currentOffset);
2653      locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
2654    }
2655    return locations;
2656  }
2657
2658  /**
2659   * Set the working directory to the given directory.
2660   */
2661  @Override
2662  public void setWorkingDirectory(Path newDir) {
2663    workingDir = makeAbsolute(newDir);
2664  }
2665
2666  @Override
2667  public Path getWorkingDirectory() {
2668    return workingDir;
2669  }
2670
2671  @Override
2672  public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
2673    Path absolutePath = makeAbsolute(p);
2674    String key = pathToKey(absolutePath);
2675    FileMetadata metadata = null;
2676    try {
2677      metadata = store.retrieveMetadata(key);
2678    } catch (IOException ex) {
2679      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2680
2681      if (innerException instanceof StorageException
2682          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2683
2684        throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
2685      }
2686
2687      throw ex;
2688    }
2689
2690    if (metadata == null) {
2691      throw new FileNotFoundException("File doesn't exist: " + p);
2692    }
2693    permission = applyUMask(permission,
2694        metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
2695            : UMaskApplyMode.ChangeExistingFile);
2696    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2697      // It's an implicit folder, need to materialize it.
2698      store.storeEmptyFolder(key, createPermissionStatus(permission));
2699    } else if (!metadata.getPermissionStatus().getPermission().
2700        equals(permission)) {
2701      store.changePermissionStatus(key, new PermissionStatus(
2702          metadata.getPermissionStatus().getUserName(),
2703          metadata.getPermissionStatus().getGroupName(),
2704          permission));
2705    }
2706  }
2707
2708  @Override
2709  public void setOwner(Path p, String username, String groupname)
2710      throws IOException {
2711    Path absolutePath = makeAbsolute(p);
2712    String key = pathToKey(absolutePath);
2713    FileMetadata metadata = null;
2714
2715    try {
2716      metadata = store.retrieveMetadata(key);
2717    } catch (IOException ex) {
2718      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2719
2720      if (innerException instanceof StorageException
2721          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2722
2723        throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
2724      }
2725
2726      throw ex;
2727    }
2728
2729    if (metadata == null) {
2730      throw new FileNotFoundException("File doesn't exist: " + p);
2731    }
2732
2733    PermissionStatus newPermissionStatus = new PermissionStatus(
2734        username == null ?
2735            metadata.getPermissionStatus().getUserName() : username,
2736        groupname == null ?
2737            metadata.getPermissionStatus().getGroupName() : groupname,
2738        metadata.getPermissionStatus().getPermission());
2739    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2740      // It's an implicit folder, need to materialize it.
2741      store.storeEmptyFolder(key, newPermissionStatus);
2742    } else {
2743      store.changePermissionStatus(key, newPermissionStatus);
2744    }
2745  }
2746
2747  @Override
2748  public synchronized void close() throws IOException {
2749    if (isClosed) {
2750      return;
2751    }
2752
2753    // Call the base close() to close any resources there.
2754    super.close();
2755    // Close the store to close any resources there - e.g. the bandwidth
2756    // updater thread would be stopped at this time.
2757    store.close();
2758    // Notify the metrics system that this file system is closed, which may
2759    // trigger one final metrics push to get the accurate final file system
2760    // metrics out.
2761
2762    long startTime = System.currentTimeMillis();
2763
2764    if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
2765      AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName);
2766      AzureFileSystemMetricsSystem.fileSystemClosed();
2767    }
2768
2769    LOG.debug("Submitting metrics when file system closed took {} ms.",
2770        (System.currentTimeMillis() - startTime));
2771    isClosed = true;
2772  }
2773
2774  /**
2775   * A handler that defines what to do with blobs whose upload was
2776   * interrupted.
2777   */
2778  private abstract class DanglingFileHandler {
2779    abstract void handleFile(FileMetadata file, FileMetadata tempFile)
2780      throws IOException;
2781  }
2782
2783  /**
2784   * Handler implementation for just deleting dangling files and cleaning
2785   * them up.
2786   */
2787  private class DanglingFileDeleter extends DanglingFileHandler {
2788    @Override
2789    void handleFile(FileMetadata file, FileMetadata tempFile)
2790        throws IOException {
2791
2792      LOG.debug("Deleting dangling file {}", file.getKey());
2793      store.delete(file.getKey());
2794      store.delete(tempFile.getKey());
2795    }
2796  }
2797
2798  /**
2799   * Handler implementation for just moving dangling files to recovery
2800   * location (/lost+found).
2801   */
2802  private class DanglingFileRecoverer extends DanglingFileHandler {
2803    private final Path destination;
2804
2805    DanglingFileRecoverer(Path destination) {
2806      this.destination = destination;
2807    }
2808
2809    @Override
2810    void handleFile(FileMetadata file, FileMetadata tempFile)
2811        throws IOException {
2812
2813      LOG.debug("Recovering {}", file.getKey());
2814      // Move to the final destination
2815      String finalDestinationKey =
2816          pathToKey(new Path(destination, file.getKey()));
2817      store.rename(tempFile.getKey(), finalDestinationKey);
2818      if (!finalDestinationKey.equals(file.getKey())) {
2819        // Delete the empty link file now that we've restored it.
2820        store.delete(file.getKey());
2821      }
2822    }
2823  }
2824
2825  /**
2826   * Check if a path has colons in its name
2827   */
2828  private boolean containsColon(Path p) {
2829    return p.toUri().getPath().toString().contains(":");
2830  }
2831
2832  /**
2833   * Implements recover and delete (-move and -delete) behaviors for handling
2834   * dangling files (blobs whose upload was interrupted).
2835   * 
2836   * @param root
2837   *          The root path to check from.
2838   * @param handler
2839   *          The handler that deals with dangling files.
2840   */
2841  private void handleFilesWithDanglingTempData(Path root,
2842      DanglingFileHandler handler) throws IOException {
2843    // Calculate the cut-off for when to consider a blob to be dangling.
2844    long cutoffForDangling = new Date().getTime()
2845        - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME,
2846            AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
2847    // Go over all the blobs under the given root and look for blobs to
2848    // recover.
2849    String priorLastKey = null;
2850    do {
2851      PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
2852          AZURE_UNBOUNDED_DEPTH, priorLastKey);
2853
2854      for (FileMetadata file : listing.getFiles()) {
2855        if (!file.isDir()) { // We don't recover directory blobs
2856          // See if this blob has a link in it (meaning it's a place-holder
2857          // blob for when the upload to the temp blob is complete).
2858          String link = store.getLinkInFileMetadata(file.getKey());
2859          if (link != null) {
2860            // It has a link, see if the temp blob it is pointing to is
2861            // existent and old enough to be considered dangling.
2862            FileMetadata linkMetadata = store.retrieveMetadata(link);
2863            if (linkMetadata != null
2864                && linkMetadata.getLastModified() >= cutoffForDangling) {
2865              // Found one!
2866              handler.handleFile(file, linkMetadata);
2867            }
2868          }
2869        }
2870      }
2871      priorLastKey = listing.getPriorLastKey();
2872    } while (priorLastKey != null);
2873  }
2874
2875  /**
2876   * Looks under the given root path for any blob that are left "dangling",
2877   * meaning that they are place-holder blobs that we created while we upload
2878   * the data to a temporary blob, but for some reason we crashed in the middle
2879   * of the upload and left them there. If any are found, we move them to the
2880   * destination given.
2881   * 
2882   * @param root
2883   *          The root path to consider.
2884   * @param destination
2885   *          The destination path to move any recovered files to.
2886   * @throws IOException
2887   */
2888  public void recoverFilesWithDanglingTempData(Path root, Path destination)
2889      throws IOException {
2890
2891    LOG.debug("Recovering files with dangling temp data in {}", root);
2892    handleFilesWithDanglingTempData(root,
2893        new DanglingFileRecoverer(destination));
2894  }
2895
2896  /**
2897   * Looks under the given root path for any blob that are left "dangling",
2898   * meaning that they are place-holder blobs that we created while we upload
2899   * the data to a temporary blob, but for some reason we crashed in the middle
2900   * of the upload and left them there. If any are found, we delete them.
2901   * 
2902   * @param root
2903   *          The root path to consider.
2904   * @throws IOException
2905   */
2906  public void deleteFilesWithDanglingTempData(Path root) throws IOException {
2907
2908    LOG.debug("Deleting files with dangling temp data in {}", root);
2909    handleFilesWithDanglingTempData(root, new DanglingFileDeleter());
2910  }
2911
2912  @Override
2913  protected void finalize() throws Throwable {
2914    LOG.debug("finalize() called.");
2915    close();
2916    super.finalize();
2917  }
2918
2919  /**
2920   * Encode the key with a random prefix for load balancing in Azure storage.
2921   * Upload data to a random temporary file then do storage side renaming to
2922   * recover the original key.
2923   * 
2924   * @param aKey
2925   * @return Encoded version of the original key.
2926   */
2927  private static String encodeKey(String aKey) {
2928    // Get the tail end of the key name.
2929    //
2930    String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1,
2931        aKey.length());
2932
2933    // Construct the randomized prefix of the file name. The prefix ensures the
2934    // file always drops into the same folder but with a varying tail key name.
2935    String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR
2936        + UUID.randomUUID().toString();
2937
2938    // Concatenate the randomized prefix with the tail of the key name.
2939    String randomizedKey = filePrefix + fileName;
2940
2941    // Return to the caller with the randomized key.
2942    return randomizedKey;
2943  }
2944}