001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs;
020
021import java.io.EOFException;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.channels.ClosedChannelException;
026import java.util.Arrays;
027import java.util.List;
028
029import com.google.common.base.Preconditions;
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.permission.AclEntry;
034import org.apache.hadoop.fs.permission.FsPermission;
035import org.apache.hadoop.util.DataChecksum;
036import org.apache.hadoop.util.Progressable;
037
038/****************************************************************
039 * Abstract Checksumed FileSystem.
040 * It provide a basic implementation of a Checksumed FileSystem,
041 * which creates a checksum file for each raw file.
042 * It generates & verifies checksums at the client side.
043 *
044 *****************************************************************/
045@InterfaceAudience.Public
046@InterfaceStability.Stable
047public abstract class ChecksumFileSystem extends FilterFileSystem {
048  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
049  private int bytesPerChecksum = 512;
050  private boolean verifyChecksum = true;
051  private boolean writeChecksum = true;
052
053  public static double getApproxChkSumLength(long size) {
054    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
055  }
056  
057  public ChecksumFileSystem(FileSystem fs) {
058    super(fs);
059  }
060
061  @Override
062  public void setConf(Configuration conf) {
063    super.setConf(conf);
064    if (conf != null) {
065      bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
066                                     LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
067      Preconditions.checkState(bytesPerChecksum > 0,
068          "bytes per checksum should be positive but was %s",
069          bytesPerChecksum);
070    }
071  }
072  
073  /**
074   * Set whether to verify checksum.
075   */
076  @Override
077  public void setVerifyChecksum(boolean verifyChecksum) {
078    this.verifyChecksum = verifyChecksum;
079  }
080
081  @Override
082  public void setWriteChecksum(boolean writeChecksum) {
083    this.writeChecksum = writeChecksum;
084  }
085  
086  /** get the raw file system */
087  @Override
088  public FileSystem getRawFileSystem() {
089    return fs;
090  }
091
092  /** Return the name of the checksum file associated with a file.*/
093  public Path getChecksumFile(Path file) {
094    return new Path(file.getParent(), "." + file.getName() + ".crc");
095  }
096
097  /** Return true iff file is a checksum file name.*/
098  public static boolean isChecksumFile(Path file) {
099    String name = file.getName();
100    return name.startsWith(".") && name.endsWith(".crc");
101  }
102
103  /** Return the length of the checksum file given the size of the 
104   * actual file.
105   **/
106  public long getChecksumFileLength(Path file, long fileSize) {
107    return getChecksumLength(fileSize, getBytesPerSum());
108  }
109
110  /** Return the bytes Per Checksum */
111  public int getBytesPerSum() {
112    return bytesPerChecksum;
113  }
114
115  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
116    int defaultBufferSize = getConf().getInt(
117                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
118                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
119    int proportionalBufferSize = bufferSize / bytesPerSum;
120    return Math.max(bytesPerSum,
121                    Math.max(proportionalBufferSize, defaultBufferSize));
122  }
123
124  /*******************************************************
125   * For open()'s FSInputStream
126   * It verifies that data matches checksums.
127   *******************************************************/
128  private static class ChecksumFSInputChecker extends FSInputChecker {
129    private ChecksumFileSystem fs;
130    private FSDataInputStream datas;
131    private FSDataInputStream sums;
132    
133    private static final int HEADER_LENGTH = 8;
134    
135    private int bytesPerSum = 1;
136    
137    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
138      throws IOException {
139      this(fs, file, fs.getConf().getInt(
140                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
141                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
142    }
143    
144    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
145      throws IOException {
146      super( file, fs.getFileStatus(file).getReplication() );
147      this.datas = fs.getRawFileSystem().open(file, bufferSize);
148      this.fs = fs;
149      Path sumFile = fs.getChecksumFile(file);
150      try {
151        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
152        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
153
154        byte[] version = new byte[CHECKSUM_VERSION.length];
155        sums.readFully(version);
156        if (!Arrays.equals(version, CHECKSUM_VERSION))
157          throw new IOException("Not a checksum file: "+sumFile);
158        this.bytesPerSum = sums.readInt();
159        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
160      } catch (IOException e) {
161        // mincing the message is terrible, but java throws permission
162        // exceptions as FNF because that's all the method signatures allow!
163        if (!(e instanceof FileNotFoundException) ||
164            e.getMessage().endsWith(" (Permission denied)")) {
165          LOG.warn("Problem opening checksum file: "+ file +
166              ".  Ignoring exception: " , e);
167        }
168        set(fs.verifyChecksum, null, 1, 0);
169      }
170    }
171    
172    private long getChecksumFilePos( long dataPos ) {
173      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
174    }
175    
176    @Override
177    protected long getChunkPosition( long dataPos ) {
178      return dataPos/bytesPerSum*bytesPerSum;
179    }
180    
181    @Override
182    public int available() throws IOException {
183      return datas.available() + super.available();
184    }
185    
186    @Override
187    public int read(long position, byte[] b, int off, int len)
188      throws IOException {
189      // parameter check
190      validatePositionedReadArgs(position, b, off, len);
191      if (len == 0) {
192        return 0;
193      }
194
195      int nread;
196      try (ChecksumFSInputChecker checker =
197               new ChecksumFSInputChecker(fs, file)) {
198        checker.seek(position);
199        nread = checker.read(b, off, len);
200      }
201      return nread;
202    }
203    
204    @Override
205    public void close() throws IOException {
206      datas.close();
207      if( sums != null ) {
208        sums.close();
209      }
210      set(fs.verifyChecksum, null, 1, 0);
211    }
212    
213
214    @Override
215    public boolean seekToNewSource(long targetPos) throws IOException {
216      long sumsPos = getChecksumFilePos(targetPos);
217      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
218      boolean newDataSource = datas.seekToNewSource(targetPos);
219      return sums.seekToNewSource(sumsPos) || newDataSource;
220    }
221
222    @Override
223    protected int readChunk(long pos, byte[] buf, int offset, int len,
224        byte[] checksum) throws IOException {
225
226      boolean eof = false;
227      if (needChecksum()) {
228        assert checksum != null; // we have a checksum buffer
229        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
230        assert len >= bytesPerSum; // we must read at least one chunk
231
232        final int checksumsToRead = Math.min(
233          len/bytesPerSum, // number of checksums based on len to read
234          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
235        long checksumPos = getChecksumFilePos(pos); 
236        if(checksumPos != sums.getPos()) {
237          sums.seek(checksumPos);
238        }
239
240        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
241        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
242          throw new ChecksumException(
243            "Checksum file not a length multiple of checksum size " +
244            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
245            " sumLenread: " + sumLenRead,
246            pos);
247        }
248        if (sumLenRead <= 0) { // we're at the end of the file
249          eof = true;
250        } else {
251          // Adjust amount of data to read based on how many checksum chunks we read
252          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
253        }
254      }
255      if(pos != datas.getPos()) {
256        datas.seek(pos);
257      }
258      int nread = readFully(datas, buf, offset, len);
259      if (eof && nread > 0) {
260        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
261      }
262      return nread;
263    }
264  }
265  
266  private static class FSDataBoundedInputStream extends FSDataInputStream {
267    private FileSystem fs;
268    private Path file;
269    private long fileLen = -1L;
270
271    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
272      super(in);
273      this.fs = fs;
274      this.file = file;
275    }
276    
277    @Override
278    public boolean markSupported() {
279      return false;
280    }
281    
282    /* Return the file length */
283    private long getFileLength() throws IOException {
284      if( fileLen==-1L ) {
285        fileLen = fs.getContentSummary(file).getLength();
286      }
287      return fileLen;
288    }
289    
290    /**
291     * Skips over and discards <code>n</code> bytes of data from the
292     * input stream.
293     *
294     *The <code>skip</code> method skips over some smaller number of bytes
295     * when reaching end of file before <code>n</code> bytes have been skipped.
296     * The actual number of bytes skipped is returned.  If <code>n</code> is
297     * negative, no bytes are skipped.
298     *
299     * @param      n   the number of bytes to be skipped.
300     * @return     the actual number of bytes skipped.
301     * @exception  IOException  if an I/O error occurs.
302     *             ChecksumException if the chunk to skip to is corrupted
303     */
304    @Override
305    public synchronized long skip(long n) throws IOException {
306      long curPos = getPos();
307      long fileLength = getFileLength();
308      if( n+curPos > fileLength ) {
309        n = fileLength - curPos;
310      }
311      return super.skip(n);
312    }
313    
314    /**
315     * Seek to the given position in the stream.
316     * The next read() will be from that position.
317     * 
318     * <p>This method does not allow seek past the end of the file.
319     * This produces IOException.
320     *
321     * @param      pos   the postion to seek to.
322     * @exception  IOException  if an I/O error occurs or seeks after EOF
323     *             ChecksumException if the chunk to seek to is corrupted
324     */
325
326    @Override
327    public synchronized void seek(long pos) throws IOException {
328      if (pos > getFileLength()) {
329        throw new EOFException("Cannot seek after EOF");
330      }
331      super.seek(pos);
332    }
333
334  }
335
336  /**
337   * Opens an FSDataInputStream at the indicated Path.
338   * @param f the file name to open
339   * @param bufferSize the size of the buffer to be used.
340   */
341  @Override
342  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
343    FileSystem fs;
344    InputStream in;
345    if (verifyChecksum) {
346      fs = this;
347      in = new ChecksumFSInputChecker(this, f, bufferSize);
348    } else {
349      fs = getRawFileSystem();
350      in = fs.open(f, bufferSize);
351    }
352    return new FSDataBoundedInputStream(fs, f, in);
353  }
354
355  @Override
356  public FSDataOutputStream append(Path f, int bufferSize,
357      Progressable progress) throws IOException {
358    throw new UnsupportedOperationException("Append is not supported "
359        + "by ChecksumFileSystem");
360  }
361
362  @Override
363  public boolean truncate(Path f, long newLength) throws IOException {
364    throw new UnsupportedOperationException("Truncate is not supported "
365        + "by ChecksumFileSystem");
366  }
367
368  /**
369   * Calculated the length of the checksum file in bytes.
370   * @param size the length of the data file in bytes
371   * @param bytesPerSum the number of bytes in a checksum block
372   * @return the number of bytes in the checksum file
373   */
374  public static long getChecksumLength(long size, int bytesPerSum) {
375    //the checksum length is equal to size passed divided by bytesPerSum +
376    //bytes written in the beginning of the checksum file.  
377    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
378             CHECKSUM_VERSION.length + 4;  
379  }
380
381  /** This class provides an output stream for a checksummed file.
382   * It generates checksums for data. */
383  private static class ChecksumFSOutputSummer extends FSOutputSummer {
384    private FSDataOutputStream datas;    
385    private FSDataOutputStream sums;
386    private static final float CHKSUM_AS_FRACTION = 0.01f;
387    private boolean isClosed = false;
388    
389    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
390                          Path file, 
391                          boolean overwrite,
392                          int bufferSize,
393                          short replication,
394                          long blockSize,
395                          Progressable progress,
396                          FsPermission permission)
397      throws IOException {
398      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
399          fs.getBytesPerSum()));
400      int bytesPerSum = fs.getBytesPerSum();
401      this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
402                                         bufferSize, replication, blockSize,
403                                         progress);
404      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
405      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
406                                               permission, true, sumBufferSize,
407                                               replication, blockSize, null);
408      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
409      sums.writeInt(bytesPerSum);
410    }
411    
412    @Override
413    public void close() throws IOException {
414      try {
415        flushBuffer();
416        sums.close();
417        datas.close();
418      } finally {
419        isClosed = true;
420      }
421    }
422    
423    @Override
424    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
425        int ckoff, int cklen)
426    throws IOException {
427      datas.write(b, offset, len);
428      sums.write(checksum, ckoff, cklen);
429    }
430
431    @Override
432    protected void checkClosed() throws IOException {
433      if (isClosed) {
434        throw new ClosedChannelException();
435      }
436    }
437  }
438
439  @Override
440  public FSDataOutputStream create(Path f, FsPermission permission,
441      boolean overwrite, int bufferSize, short replication, long blockSize,
442      Progressable progress) throws IOException {
443    return create(f, permission, overwrite, true, bufferSize,
444        replication, blockSize, progress);
445  }
446
447  private FSDataOutputStream create(Path f, FsPermission permission,
448      boolean overwrite, boolean createParent, int bufferSize,
449      short replication, long blockSize,
450      Progressable progress) throws IOException {
451    Path parent = f.getParent();
452    if (parent != null) {
453      if (!createParent && !exists(parent)) {
454        throw new FileNotFoundException("Parent directory doesn't exist: "
455            + parent);
456      } else if (!mkdirs(parent)) {
457        throw new IOException("Mkdirs failed to create " + parent
458            + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
459            + ")");
460      }
461    }
462    final FSDataOutputStream out;
463    if (writeChecksum) {
464      out = new FSDataOutputStream(
465          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
466              blockSize, progress, permission), null);
467    } else {
468      out = fs.create(f, permission, overwrite, bufferSize, replication,
469          blockSize, progress);
470      // remove the checksum file since we aren't writing one
471      Path checkFile = getChecksumFile(f);
472      if (fs.exists(checkFile)) {
473        fs.delete(checkFile, true);
474      }
475    }
476    return out;
477  }
478
479  @Override
480  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
481      boolean overwrite, int bufferSize, short replication, long blockSize,
482      Progressable progress) throws IOException {
483    return create(f, permission, overwrite, false, bufferSize, replication,
484        blockSize, progress);
485  }
486
487  abstract class FsOperation {
488    boolean run(Path p) throws IOException {
489      boolean status = apply(p);
490      if (status) {
491        Path checkFile = getChecksumFile(p);
492        if (fs.exists(checkFile)) {
493          apply(checkFile);
494        }
495      }
496      return status;
497    }
498    abstract boolean apply(Path p) throws IOException;
499  }
500
501
502  @Override
503  public void setPermission(Path src, final FsPermission permission)
504      throws IOException {
505    new FsOperation(){
506      @Override
507      boolean apply(Path p) throws IOException {
508        fs.setPermission(p, permission);
509        return true;
510      }
511    }.run(src);
512  }
513
514  @Override
515  public void setOwner(Path src, final String username, final String groupname)
516      throws IOException {
517    new FsOperation(){
518      @Override
519      boolean apply(Path p) throws IOException {
520        fs.setOwner(p, username, groupname);
521        return true;
522      }
523    }.run(src);
524  }
525
526  @Override
527  public void setAcl(Path src, final List<AclEntry> aclSpec)
528      throws IOException {
529    new FsOperation(){
530      @Override
531      boolean apply(Path p) throws IOException {
532        fs.setAcl(p, aclSpec);
533        return true;
534      }
535    }.run(src);
536  }
537
538  @Override
539  public void modifyAclEntries(Path src, final List<AclEntry> aclSpec)
540      throws IOException {
541    new FsOperation(){
542      @Override
543      boolean apply(Path p) throws IOException {
544        fs.modifyAclEntries(p, aclSpec);
545        return true;
546      }
547    }.run(src);
548  }
549
550  @Override
551  public void removeAcl(Path src) throws IOException {
552    new FsOperation(){
553      @Override
554      boolean apply(Path p) throws IOException {
555        fs.removeAcl(p);
556        return true;
557      }
558    }.run(src);
559  }
560
561  @Override
562  public void removeAclEntries(Path src, final List<AclEntry> aclSpec)
563      throws IOException {
564    new FsOperation(){
565      @Override
566      boolean apply(Path p) throws IOException {
567        fs.removeAclEntries(p, aclSpec);
568        return true;
569      }
570    }.run(src);
571  }
572
573  @Override
574  public void removeDefaultAcl(Path src) throws IOException {
575    new FsOperation(){
576      @Override
577      boolean apply(Path p) throws IOException {
578        fs.removeDefaultAcl(p);
579        return true;
580      }
581    }.run(src);
582  }
583
584  /**
585   * Set replication for an existing file.
586   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
587   * @param src file name
588   * @param replication new replication
589   * @throws IOException
590   * @return true if successful;
591   *         false if file does not exist or is a directory
592   */
593  @Override
594  public boolean setReplication(Path src, final short replication)
595      throws IOException {
596    return new FsOperation(){
597      @Override
598      boolean apply(Path p) throws IOException {
599        return fs.setReplication(p, replication);
600      }
601    }.run(src);
602  }
603
604  /**
605   * Rename files/dirs
606   */
607  @Override
608  public boolean rename(Path src, Path dst) throws IOException {
609    if (fs.isDirectory(src)) {
610      return fs.rename(src, dst);
611    } else {
612      if (fs.isDirectory(dst)) {
613        dst = new Path(dst, src.getName());
614      }
615
616      boolean value = fs.rename(src, dst);
617      if (!value)
618        return false;
619
620      Path srcCheckFile = getChecksumFile(src);
621      Path dstCheckFile = getChecksumFile(dst);
622      if (fs.exists(srcCheckFile)) { //try to rename checksum
623        value = fs.rename(srcCheckFile, dstCheckFile);
624      } else if (fs.exists(dstCheckFile)) {
625        // no src checksum, so remove dst checksum
626        value = fs.delete(dstCheckFile, true); 
627      }
628
629      return value;
630    }
631  }
632
633  /**
634   * Implement the delete(Path, boolean) in checksum
635   * file system.
636   */
637  @Override
638  public boolean delete(Path f, boolean recursive) throws IOException{
639    FileStatus fstatus = null;
640    try {
641      fstatus = fs.getFileStatus(f);
642    } catch(FileNotFoundException e) {
643      return false;
644    }
645    if (fstatus.isDirectory()) {
646      //this works since the crcs are in the same
647      //directories and the files. so we just delete
648      //everything in the underlying filesystem
649      return fs.delete(f, recursive);
650    } else {
651      Path checkFile = getChecksumFile(f);
652      if (fs.exists(checkFile)) {
653        fs.delete(checkFile, true);
654      }
655      return fs.delete(f, true);
656    }
657  }
658    
659  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
660    @Override
661    public boolean accept(Path file) {
662      return !isChecksumFile(file);
663    }
664  };
665
666  /**
667   * List the statuses of the files/directories in the given path if the path is
668   * a directory.
669   * 
670   * @param f
671   *          given path
672   * @return the statuses of the files/directories in the given path
673   * @throws IOException
674   */
675  @Override
676  public FileStatus[] listStatus(Path f) throws IOException {
677    return fs.listStatus(f, DEFAULT_FILTER);
678  }
679  
680  /**
681   * List the statuses of the files/directories in the given path if the path is
682   * a directory.
683   * 
684   * @param f
685   *          given path
686   * @return the statuses of the files/directories in the given patch
687   * @throws IOException
688   */
689  @Override
690  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
691  throws IOException {
692    return fs.listLocatedStatus(f, DEFAULT_FILTER);
693  }
694  
695  @Override
696  public boolean mkdirs(Path f) throws IOException {
697    return fs.mkdirs(f);
698  }
699
700  @Override
701  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
702    throws IOException {
703    Configuration conf = getConf();
704    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
705  }
706
707  /**
708   * The src file is under FS, and the dst is on the local disk.
709   * Copy it from FS control to the local dst name.
710   */
711  @Override
712  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
713    throws IOException {
714    Configuration conf = getConf();
715    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
716  }
717
718  /**
719   * The src file is under FS, and the dst is on the local disk.
720   * Copy it from FS control to the local dst name.
721   * If src and dst are directories, the copyCrc parameter
722   * determines whether to copy CRC files.
723   */
724  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
725    throws IOException {
726    if (!fs.isDirectory(src)) { // source is a file
727      fs.copyToLocalFile(src, dst);
728      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
729      if (localFs.isDirectory(dst)) {
730        dst = new Path(dst, src.getName());
731      }
732      dst = getChecksumFile(dst);
733      if (localFs.exists(dst)) { //remove old local checksum file
734        localFs.delete(dst, true);
735      }
736      Path checksumFile = getChecksumFile(src);
737      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
738        fs.copyToLocalFile(checksumFile, dst);
739      }
740    } else {
741      FileStatus[] srcs = listStatus(src);
742      for (FileStatus srcFile : srcs) {
743        copyToLocalFile(srcFile.getPath(), 
744                        new Path(dst, srcFile.getPath().getName()), copyCrc);
745      }
746    }
747  }
748
749  @Override
750  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
751    throws IOException {
752    return tmpLocalFile;
753  }
754
755  @Override
756  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
757    throws IOException {
758    moveFromLocalFile(tmpLocalFile, fsOutputFile);
759  }
760
761  /**
762   * Report a checksum error to the file system.
763   * @param f the file name containing the error
764   * @param in the stream open on the file
765   * @param inPos the position of the beginning of the bad data in the file
766   * @param sums the stream open on the checksum file
767   * @param sumsPos the position of the beginning of the bad data in the checksum file
768   * @return if retry is necessary
769   */
770  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
771                                       long inPos, FSDataInputStream sums, long sumsPos) {
772    return false;
773  }
774}