001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs; 020 021import java.io.EOFException; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InputStream; 025import java.nio.channels.ClosedChannelException; 026import java.util.Arrays; 027import java.util.List; 028 029import com.google.common.base.Preconditions; 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.permission.AclEntry; 034import org.apache.hadoop.fs.permission.FsPermission; 035import org.apache.hadoop.util.DataChecksum; 036import org.apache.hadoop.util.Progressable; 037 038/**************************************************************** 039 * Abstract Checksumed FileSystem. 040 * It provide a basic implementation of a Checksumed FileSystem, 041 * which creates a checksum file for each raw file. 042 * It generates & verifies checksums at the client side. 043 * 044 *****************************************************************/ 045@InterfaceAudience.Public 046@InterfaceStability.Stable 047public abstract class ChecksumFileSystem extends FilterFileSystem { 048 private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0}; 049 private int bytesPerChecksum = 512; 050 private boolean verifyChecksum = true; 051 private boolean writeChecksum = true; 052 053 public static double getApproxChkSumLength(long size) { 054 return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size; 055 } 056 057 public ChecksumFileSystem(FileSystem fs) { 058 super(fs); 059 } 060 061 @Override 062 public void setConf(Configuration conf) { 063 super.setConf(conf); 064 if (conf != null) { 065 bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY, 066 LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT); 067 Preconditions.checkState(bytesPerChecksum > 0, 068 "bytes per checksum should be positive but was %s", 069 bytesPerChecksum); 070 } 071 } 072 073 /** 074 * Set whether to verify checksum. 075 */ 076 @Override 077 public void setVerifyChecksum(boolean verifyChecksum) { 078 this.verifyChecksum = verifyChecksum; 079 } 080 081 @Override 082 public void setWriteChecksum(boolean writeChecksum) { 083 this.writeChecksum = writeChecksum; 084 } 085 086 /** get the raw file system */ 087 @Override 088 public FileSystem getRawFileSystem() { 089 return fs; 090 } 091 092 /** Return the name of the checksum file associated with a file.*/ 093 public Path getChecksumFile(Path file) { 094 return new Path(file.getParent(), "." + file.getName() + ".crc"); 095 } 096 097 /** Return true iff file is a checksum file name.*/ 098 public static boolean isChecksumFile(Path file) { 099 String name = file.getName(); 100 return name.startsWith(".") && name.endsWith(".crc"); 101 } 102 103 /** Return the length of the checksum file given the size of the 104 * actual file. 105 **/ 106 public long getChecksumFileLength(Path file, long fileSize) { 107 return getChecksumLength(fileSize, getBytesPerSum()); 108 } 109 110 /** Return the bytes Per Checksum */ 111 public int getBytesPerSum() { 112 return bytesPerChecksum; 113 } 114 115 private int getSumBufferSize(int bytesPerSum, int bufferSize) { 116 int defaultBufferSize = getConf().getInt( 117 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 118 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT); 119 int proportionalBufferSize = bufferSize / bytesPerSum; 120 return Math.max(bytesPerSum, 121 Math.max(proportionalBufferSize, defaultBufferSize)); 122 } 123 124 /******************************************************* 125 * For open()'s FSInputStream 126 * It verifies that data matches checksums. 127 *******************************************************/ 128 private static class ChecksumFSInputChecker extends FSInputChecker { 129 private ChecksumFileSystem fs; 130 private FSDataInputStream datas; 131 private FSDataInputStream sums; 132 133 private static final int HEADER_LENGTH = 8; 134 135 private int bytesPerSum = 1; 136 137 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file) 138 throws IOException { 139 this(fs, file, fs.getConf().getInt( 140 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 141 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT)); 142 } 143 144 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) 145 throws IOException { 146 super( file, fs.getFileStatus(file).getReplication() ); 147 this.datas = fs.getRawFileSystem().open(file, bufferSize); 148 this.fs = fs; 149 Path sumFile = fs.getChecksumFile(file); 150 try { 151 int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize); 152 sums = fs.getRawFileSystem().open(sumFile, sumBufferSize); 153 154 byte[] version = new byte[CHECKSUM_VERSION.length]; 155 sums.readFully(version); 156 if (!Arrays.equals(version, CHECKSUM_VERSION)) 157 throw new IOException("Not a checksum file: "+sumFile); 158 this.bytesPerSum = sums.readInt(); 159 set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4); 160 } catch (IOException e) { 161 // mincing the message is terrible, but java throws permission 162 // exceptions as FNF because that's all the method signatures allow! 163 if (!(e instanceof FileNotFoundException) || 164 e.getMessage().endsWith(" (Permission denied)")) { 165 LOG.warn("Problem opening checksum file: "+ file + 166 ". Ignoring exception: " , e); 167 } 168 set(fs.verifyChecksum, null, 1, 0); 169 } 170 } 171 172 private long getChecksumFilePos( long dataPos ) { 173 return HEADER_LENGTH + 4*(dataPos/bytesPerSum); 174 } 175 176 @Override 177 protected long getChunkPosition( long dataPos ) { 178 return dataPos/bytesPerSum*bytesPerSum; 179 } 180 181 @Override 182 public int available() throws IOException { 183 return datas.available() + super.available(); 184 } 185 186 @Override 187 public int read(long position, byte[] b, int off, int len) 188 throws IOException { 189 // parameter check 190 validatePositionedReadArgs(position, b, off, len); 191 if (len == 0) { 192 return 0; 193 } 194 195 int nread; 196 try (ChecksumFSInputChecker checker = 197 new ChecksumFSInputChecker(fs, file)) { 198 checker.seek(position); 199 nread = checker.read(b, off, len); 200 } 201 return nread; 202 } 203 204 @Override 205 public void close() throws IOException { 206 datas.close(); 207 if( sums != null ) { 208 sums.close(); 209 } 210 set(fs.verifyChecksum, null, 1, 0); 211 } 212 213 214 @Override 215 public boolean seekToNewSource(long targetPos) throws IOException { 216 long sumsPos = getChecksumFilePos(targetPos); 217 fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos); 218 boolean newDataSource = datas.seekToNewSource(targetPos); 219 return sums.seekToNewSource(sumsPos) || newDataSource; 220 } 221 222 @Override 223 protected int readChunk(long pos, byte[] buf, int offset, int len, 224 byte[] checksum) throws IOException { 225 226 boolean eof = false; 227 if (needChecksum()) { 228 assert checksum != null; // we have a checksum buffer 229 assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length 230 assert len >= bytesPerSum; // we must read at least one chunk 231 232 final int checksumsToRead = Math.min( 233 len/bytesPerSum, // number of checksums based on len to read 234 checksum.length / CHECKSUM_SIZE); // size of checksum buffer 235 long checksumPos = getChecksumFilePos(pos); 236 if(checksumPos != sums.getPos()) { 237 sums.seek(checksumPos); 238 } 239 240 int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead); 241 if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) { 242 throw new ChecksumException( 243 "Checksum file not a length multiple of checksum size " + 244 "in " + file + " at " + pos + " checksumpos: " + checksumPos + 245 " sumLenread: " + sumLenRead, 246 pos); 247 } 248 if (sumLenRead <= 0) { // we're at the end of the file 249 eof = true; 250 } else { 251 // Adjust amount of data to read based on how many checksum chunks we read 252 len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE)); 253 } 254 } 255 if(pos != datas.getPos()) { 256 datas.seek(pos); 257 } 258 int nread = readFully(datas, buf, offset, len); 259 if (eof && nread > 0) { 260 throw new ChecksumException("Checksum error: "+file+" at "+pos, pos); 261 } 262 return nread; 263 } 264 } 265 266 private static class FSDataBoundedInputStream extends FSDataInputStream { 267 private FileSystem fs; 268 private Path file; 269 private long fileLen = -1L; 270 271 FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) { 272 super(in); 273 this.fs = fs; 274 this.file = file; 275 } 276 277 @Override 278 public boolean markSupported() { 279 return false; 280 } 281 282 /* Return the file length */ 283 private long getFileLength() throws IOException { 284 if( fileLen==-1L ) { 285 fileLen = fs.getContentSummary(file).getLength(); 286 } 287 return fileLen; 288 } 289 290 /** 291 * Skips over and discards <code>n</code> bytes of data from the 292 * input stream. 293 * 294 *The <code>skip</code> method skips over some smaller number of bytes 295 * when reaching end of file before <code>n</code> bytes have been skipped. 296 * The actual number of bytes skipped is returned. If <code>n</code> is 297 * negative, no bytes are skipped. 298 * 299 * @param n the number of bytes to be skipped. 300 * @return the actual number of bytes skipped. 301 * @exception IOException if an I/O error occurs. 302 * ChecksumException if the chunk to skip to is corrupted 303 */ 304 @Override 305 public synchronized long skip(long n) throws IOException { 306 long curPos = getPos(); 307 long fileLength = getFileLength(); 308 if( n+curPos > fileLength ) { 309 n = fileLength - curPos; 310 } 311 return super.skip(n); 312 } 313 314 /** 315 * Seek to the given position in the stream. 316 * The next read() will be from that position. 317 * 318 * <p>This method does not allow seek past the end of the file. 319 * This produces IOException. 320 * 321 * @param pos the postion to seek to. 322 * @exception IOException if an I/O error occurs or seeks after EOF 323 * ChecksumException if the chunk to seek to is corrupted 324 */ 325 326 @Override 327 public synchronized void seek(long pos) throws IOException { 328 if (pos > getFileLength()) { 329 throw new EOFException("Cannot seek after EOF"); 330 } 331 super.seek(pos); 332 } 333 334 } 335 336 /** 337 * Opens an FSDataInputStream at the indicated Path. 338 * @param f the file name to open 339 * @param bufferSize the size of the buffer to be used. 340 */ 341 @Override 342 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 343 FileSystem fs; 344 InputStream in; 345 if (verifyChecksum) { 346 fs = this; 347 in = new ChecksumFSInputChecker(this, f, bufferSize); 348 } else { 349 fs = getRawFileSystem(); 350 in = fs.open(f, bufferSize); 351 } 352 return new FSDataBoundedInputStream(fs, f, in); 353 } 354 355 @Override 356 public FSDataOutputStream append(Path f, int bufferSize, 357 Progressable progress) throws IOException { 358 throw new UnsupportedOperationException("Append is not supported " 359 + "by ChecksumFileSystem"); 360 } 361 362 @Override 363 public boolean truncate(Path f, long newLength) throws IOException { 364 throw new UnsupportedOperationException("Truncate is not supported " 365 + "by ChecksumFileSystem"); 366 } 367 368 /** 369 * Calculated the length of the checksum file in bytes. 370 * @param size the length of the data file in bytes 371 * @param bytesPerSum the number of bytes in a checksum block 372 * @return the number of bytes in the checksum file 373 */ 374 public static long getChecksumLength(long size, int bytesPerSum) { 375 //the checksum length is equal to size passed divided by bytesPerSum + 376 //bytes written in the beginning of the checksum file. 377 return ((size + bytesPerSum - 1) / bytesPerSum) * 4 + 378 CHECKSUM_VERSION.length + 4; 379 } 380 381 /** This class provides an output stream for a checksummed file. 382 * It generates checksums for data. */ 383 private static class ChecksumFSOutputSummer extends FSOutputSummer { 384 private FSDataOutputStream datas; 385 private FSDataOutputStream sums; 386 private static final float CHKSUM_AS_FRACTION = 0.01f; 387 private boolean isClosed = false; 388 389 public ChecksumFSOutputSummer(ChecksumFileSystem fs, 390 Path file, 391 boolean overwrite, 392 int bufferSize, 393 short replication, 394 long blockSize, 395 Progressable progress, 396 FsPermission permission) 397 throws IOException { 398 super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 399 fs.getBytesPerSum())); 400 int bytesPerSum = fs.getBytesPerSum(); 401 this.datas = fs.getRawFileSystem().create(file, permission, overwrite, 402 bufferSize, replication, blockSize, 403 progress); 404 int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize); 405 this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), 406 permission, true, sumBufferSize, 407 replication, blockSize, null); 408 sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length); 409 sums.writeInt(bytesPerSum); 410 } 411 412 @Override 413 public void close() throws IOException { 414 try { 415 flushBuffer(); 416 sums.close(); 417 datas.close(); 418 } finally { 419 isClosed = true; 420 } 421 } 422 423 @Override 424 protected void writeChunk(byte[] b, int offset, int len, byte[] checksum, 425 int ckoff, int cklen) 426 throws IOException { 427 datas.write(b, offset, len); 428 sums.write(checksum, ckoff, cklen); 429 } 430 431 @Override 432 protected void checkClosed() throws IOException { 433 if (isClosed) { 434 throw new ClosedChannelException(); 435 } 436 } 437 } 438 439 @Override 440 public FSDataOutputStream create(Path f, FsPermission permission, 441 boolean overwrite, int bufferSize, short replication, long blockSize, 442 Progressable progress) throws IOException { 443 return create(f, permission, overwrite, true, bufferSize, 444 replication, blockSize, progress); 445 } 446 447 private FSDataOutputStream create(Path f, FsPermission permission, 448 boolean overwrite, boolean createParent, int bufferSize, 449 short replication, long blockSize, 450 Progressable progress) throws IOException { 451 Path parent = f.getParent(); 452 if (parent != null) { 453 if (!createParent && !exists(parent)) { 454 throw new FileNotFoundException("Parent directory doesn't exist: " 455 + parent); 456 } else if (!mkdirs(parent)) { 457 throw new IOException("Mkdirs failed to create " + parent 458 + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory() 459 + ")"); 460 } 461 } 462 final FSDataOutputStream out; 463 if (writeChecksum) { 464 out = new FSDataOutputStream( 465 new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication, 466 blockSize, progress, permission), null); 467 } else { 468 out = fs.create(f, permission, overwrite, bufferSize, replication, 469 blockSize, progress); 470 // remove the checksum file since we aren't writing one 471 Path checkFile = getChecksumFile(f); 472 if (fs.exists(checkFile)) { 473 fs.delete(checkFile, true); 474 } 475 } 476 return out; 477 } 478 479 @Override 480 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 481 boolean overwrite, int bufferSize, short replication, long blockSize, 482 Progressable progress) throws IOException { 483 return create(f, permission, overwrite, false, bufferSize, replication, 484 blockSize, progress); 485 } 486 487 abstract class FsOperation { 488 boolean run(Path p) throws IOException { 489 boolean status = apply(p); 490 if (status) { 491 Path checkFile = getChecksumFile(p); 492 if (fs.exists(checkFile)) { 493 apply(checkFile); 494 } 495 } 496 return status; 497 } 498 abstract boolean apply(Path p) throws IOException; 499 } 500 501 502 @Override 503 public void setPermission(Path src, final FsPermission permission) 504 throws IOException { 505 new FsOperation(){ 506 @Override 507 boolean apply(Path p) throws IOException { 508 fs.setPermission(p, permission); 509 return true; 510 } 511 }.run(src); 512 } 513 514 @Override 515 public void setOwner(Path src, final String username, final String groupname) 516 throws IOException { 517 new FsOperation(){ 518 @Override 519 boolean apply(Path p) throws IOException { 520 fs.setOwner(p, username, groupname); 521 return true; 522 } 523 }.run(src); 524 } 525 526 @Override 527 public void setAcl(Path src, final List<AclEntry> aclSpec) 528 throws IOException { 529 new FsOperation(){ 530 @Override 531 boolean apply(Path p) throws IOException { 532 fs.setAcl(p, aclSpec); 533 return true; 534 } 535 }.run(src); 536 } 537 538 @Override 539 public void modifyAclEntries(Path src, final List<AclEntry> aclSpec) 540 throws IOException { 541 new FsOperation(){ 542 @Override 543 boolean apply(Path p) throws IOException { 544 fs.modifyAclEntries(p, aclSpec); 545 return true; 546 } 547 }.run(src); 548 } 549 550 @Override 551 public void removeAcl(Path src) throws IOException { 552 new FsOperation(){ 553 @Override 554 boolean apply(Path p) throws IOException { 555 fs.removeAcl(p); 556 return true; 557 } 558 }.run(src); 559 } 560 561 @Override 562 public void removeAclEntries(Path src, final List<AclEntry> aclSpec) 563 throws IOException { 564 new FsOperation(){ 565 @Override 566 boolean apply(Path p) throws IOException { 567 fs.removeAclEntries(p, aclSpec); 568 return true; 569 } 570 }.run(src); 571 } 572 573 @Override 574 public void removeDefaultAcl(Path src) throws IOException { 575 new FsOperation(){ 576 @Override 577 boolean apply(Path p) throws IOException { 578 fs.removeDefaultAcl(p); 579 return true; 580 } 581 }.run(src); 582 } 583 584 /** 585 * Set replication for an existing file. 586 * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt> 587 * @param src file name 588 * @param replication new replication 589 * @throws IOException 590 * @return true if successful; 591 * false if file does not exist or is a directory 592 */ 593 @Override 594 public boolean setReplication(Path src, final short replication) 595 throws IOException { 596 return new FsOperation(){ 597 @Override 598 boolean apply(Path p) throws IOException { 599 return fs.setReplication(p, replication); 600 } 601 }.run(src); 602 } 603 604 /** 605 * Rename files/dirs 606 */ 607 @Override 608 public boolean rename(Path src, Path dst) throws IOException { 609 if (fs.isDirectory(src)) { 610 return fs.rename(src, dst); 611 } else { 612 if (fs.isDirectory(dst)) { 613 dst = new Path(dst, src.getName()); 614 } 615 616 boolean value = fs.rename(src, dst); 617 if (!value) 618 return false; 619 620 Path srcCheckFile = getChecksumFile(src); 621 Path dstCheckFile = getChecksumFile(dst); 622 if (fs.exists(srcCheckFile)) { //try to rename checksum 623 value = fs.rename(srcCheckFile, dstCheckFile); 624 } else if (fs.exists(dstCheckFile)) { 625 // no src checksum, so remove dst checksum 626 value = fs.delete(dstCheckFile, true); 627 } 628 629 return value; 630 } 631 } 632 633 /** 634 * Implement the delete(Path, boolean) in checksum 635 * file system. 636 */ 637 @Override 638 public boolean delete(Path f, boolean recursive) throws IOException{ 639 FileStatus fstatus = null; 640 try { 641 fstatus = fs.getFileStatus(f); 642 } catch(FileNotFoundException e) { 643 return false; 644 } 645 if (fstatus.isDirectory()) { 646 //this works since the crcs are in the same 647 //directories and the files. so we just delete 648 //everything in the underlying filesystem 649 return fs.delete(f, recursive); 650 } else { 651 Path checkFile = getChecksumFile(f); 652 if (fs.exists(checkFile)) { 653 fs.delete(checkFile, true); 654 } 655 return fs.delete(f, true); 656 } 657 } 658 659 final private static PathFilter DEFAULT_FILTER = new PathFilter() { 660 @Override 661 public boolean accept(Path file) { 662 return !isChecksumFile(file); 663 } 664 }; 665 666 /** 667 * List the statuses of the files/directories in the given path if the path is 668 * a directory. 669 * 670 * @param f 671 * given path 672 * @return the statuses of the files/directories in the given path 673 * @throws IOException 674 */ 675 @Override 676 public FileStatus[] listStatus(Path f) throws IOException { 677 return fs.listStatus(f, DEFAULT_FILTER); 678 } 679 680 /** 681 * List the statuses of the files/directories in the given path if the path is 682 * a directory. 683 * 684 * @param f 685 * given path 686 * @return the statuses of the files/directories in the given patch 687 * @throws IOException 688 */ 689 @Override 690 public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) 691 throws IOException { 692 return fs.listLocatedStatus(f, DEFAULT_FILTER); 693 } 694 695 @Override 696 public boolean mkdirs(Path f) throws IOException { 697 return fs.mkdirs(f); 698 } 699 700 @Override 701 public void copyFromLocalFile(boolean delSrc, Path src, Path dst) 702 throws IOException { 703 Configuration conf = getConf(); 704 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf); 705 } 706 707 /** 708 * The src file is under FS, and the dst is on the local disk. 709 * Copy it from FS control to the local dst name. 710 */ 711 @Override 712 public void copyToLocalFile(boolean delSrc, Path src, Path dst) 713 throws IOException { 714 Configuration conf = getConf(); 715 FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf); 716 } 717 718 /** 719 * The src file is under FS, and the dst is on the local disk. 720 * Copy it from FS control to the local dst name. 721 * If src and dst are directories, the copyCrc parameter 722 * determines whether to copy CRC files. 723 */ 724 public void copyToLocalFile(Path src, Path dst, boolean copyCrc) 725 throws IOException { 726 if (!fs.isDirectory(src)) { // source is a file 727 fs.copyToLocalFile(src, dst); 728 FileSystem localFs = getLocal(getConf()).getRawFileSystem(); 729 if (localFs.isDirectory(dst)) { 730 dst = new Path(dst, src.getName()); 731 } 732 dst = getChecksumFile(dst); 733 if (localFs.exists(dst)) { //remove old local checksum file 734 localFs.delete(dst, true); 735 } 736 Path checksumFile = getChecksumFile(src); 737 if (copyCrc && fs.exists(checksumFile)) { //copy checksum file 738 fs.copyToLocalFile(checksumFile, dst); 739 } 740 } else { 741 FileStatus[] srcs = listStatus(src); 742 for (FileStatus srcFile : srcs) { 743 copyToLocalFile(srcFile.getPath(), 744 new Path(dst, srcFile.getPath().getName()), copyCrc); 745 } 746 } 747 } 748 749 @Override 750 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 751 throws IOException { 752 return tmpLocalFile; 753 } 754 755 @Override 756 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) 757 throws IOException { 758 moveFromLocalFile(tmpLocalFile, fsOutputFile); 759 } 760 761 /** 762 * Report a checksum error to the file system. 763 * @param f the file name containing the error 764 * @param in the stream open on the file 765 * @param inPos the position of the beginning of the bad data in the file 766 * @param sums the stream open on the checksum file 767 * @param sumsPos the position of the beginning of the bad data in the checksum file 768 * @return if retry is necessary 769 */ 770 public boolean reportChecksumFailure(Path f, FSDataInputStream in, 771 long inPos, FSDataInputStream sums, long sumsPos) { 772 return false; 773 } 774}