/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.ftp;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.URI;

import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A {@link FileSystem} backed by an FTP client provided by <a
 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
 * </p>
 * <p>
 * No connection is kept between calls: every public operation opens its own
 * connection through {@link #connect()} and releases it when it finishes.
 * The streams returned by {@code open} and {@code create} keep their
 * connection until they are closed.
 * </p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FTPFileSystem extends FileSystem {

  public static final Log LOG = LogFactory
      .getLog(FTPFileSystem.class);

  public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;

  public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
  public static final String FS_FTP_USER_PREFIX = "fs.ftp.user.";
  public static final String FS_FTP_HOST = "fs.ftp.host";
  public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
  public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
  public static final String E_SAME_DIRECTORY_ONLY =
      "only same directory renames are supported";

  /** URI this instance was initialized with; returned by {@link #getUri()}. */
  private URI uri;

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>ftp</code>
   */
  @Override
  public String getScheme() {
    return "ftp";
  }

  /**
   * Get the default port for this FTPFileSystem.
   *
   * @return the default port
   */
  @Override
  protected int getDefaultPort() {
    return FTP.DEFAULT_PORT;
  }

  /**
   * Record host, port and credentials in the configuration so that
   * {@link #connect()} can read them back later.
   * <p>
   * Values present in the URI take precedence over values already in
   * {@code conf}. The user info is split at the first {@code ':'} only, so
   * a password may itself contain colons.
   *
   * @param uri URI naming the FTP server, optionally with
   *          {@code user:password} user info
   * @param conf configuration to read fallbacks from and write the resolved
   *          values to
   * @throws IOException if no host is given in either the URI or the
   *           configuration
   * @throws IllegalStateException if no non-empty password can be extracted
   *           from the user info
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    // get host information from uri (overrides info in conf)
    String host = uri.getHost();
    host = (host == null) ? conf.get(FS_FTP_HOST, null) : host;
    if (host == null) {
      throw new IOException("Invalid host specified");
    }
    conf.set(FS_FTP_HOST, host);

    // get port information from uri; a URI without a port selects the FTP
    // default port, and the result always replaces the value in conf
    int port = uri.getPort();
    port = (port == -1) ? FTP.DEFAULT_PORT : port;
    conf.setInt(FS_FTP_HOST_PORT, port);

    // get user/password information from URI (overrides info in conf)
    String userAndPassword = uri.getUserInfo();
    if (userAndPassword == null) {
      // NOTE(review): if neither key is configured this presumably yields the
      // literal text "null:null", which only fails later at login - confirm
      // whether that should be rejected here instead.
      userAndPassword = (conf.get(FS_FTP_USER_PREFIX + host, null) + ":" + conf
          .get(FS_FTP_PASSWORD_PREFIX + host, null));
    }
    // Limit of 2: everything after the first ':' belongs to the password.
    String[] userPasswdInfo = userAndPassword.split(":", 2);
    Preconditions.checkState(
        userPasswdInfo.length > 1 && !userPasswdInfo[1].isEmpty(),
        "Invalid username / password");
    conf.set(FS_FTP_USER_PREFIX + host, userPasswdInfo[0]);
    conf.set(FS_FTP_PASSWORD_PREFIX + host, userPasswdInfo[1]);
    setConf(conf);
    this.uri = uri;
  }

  /**
   * Connect to the FTP server using configuration parameters and log in.
   * <p>
   * If the server greeting is negative or the login is refused, the socket
   * is closed again before the exception is thrown.
   *
   * @return An FTPClient instance that is connected, logged in and set to
   *         binary block-mode transfers
   * @throws IOException if the connection or the login fails
   */
  private FTPClient connect() throws IOException {
    Configuration conf = getConf();
    String host = conf.get(FS_FTP_HOST);
    int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
    String user = conf.get(FS_FTP_USER_PREFIX + host);
    String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
    FTPClient client = new FTPClient();
    boolean ready = false;
    try {
      client.connect(host, port);
      int reply = client.getReplyCode();
      if (!FTPReply.isPositiveCompletion(reply)) {
        throw NetUtils.wrapException(host, port,
            NetUtils.UNKNOWN_HOST, 0,
            new ConnectException("Server response " + reply));
      }
      if (!client.login(user, password)) {
        throw new IOException("Login failed on server - " + host + ", port - "
            + port + " as user '" + user + "'");
      }
      client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
      client.setFileType(FTP.BINARY_FILE_TYPE);
      client.setBufferSize(DEFAULT_BUFFER_SIZE);
      ready = true;
    } finally {
      // Do not leave a half-initialized connection open on failure.
      if (!ready && client.isConnected()) {
        try {
          client.disconnect();
        } catch (IOException ioe) {
          LOG.debug("Ignoring failure while closing unusable FTP connection",
              ioe);
        }
      }
    }
    return client;
  }

  /**
   * Logout and disconnect the given FTPClient.
   * <p>
   * The socket is closed even when the logout command itself throws.
   *
   * @param client client to release; <code>null</code> is ignored
   * @throws IOException if logout or disconnect fails
   * @throws FTPException if the client is not connected
   */
  private void disconnect(FTPClient client) throws IOException {
    if (client != null) {
      if (!client.isConnected()) {
        throw new FTPException("Client not connected");
      }
      boolean logoutSuccess;
      try {
        logoutSuccess = client.logout();
      } finally {
        client.disconnect();
      }
      if (!logoutSuccess) {
        LOG.warn("Logout failed while disconnecting, error code - "
            + client.getReplyCode());
      }
    }
  }

  /**
   * Release a connection on a failure path without replacing the exception
   * that is already propagating. Clients that are <code>null</code> or
   * already disconnected are ignored.
   *
   * @param client client to release, may be <code>null</code>
   */
  private void disconnectQuietly(FTPClient client) {
    if (client == null || !client.isConnected()) {
      return;
    }
    try {
      disconnect(client);
    } catch (IOException ioe) {
      LOG.debug("Ignoring failure while releasing FTP connection", ioe);
    }
  }

  /**
   * Resolve against given working directory.
   *
   * @param workDir directory that relative paths are resolved against
   * @param path path to resolve
   * @return <code>path</code> itself if it is absolute, otherwise
   *         <code>path</code> resolved under <code>workDir</code>
   */
  private Path makeAbsolute(Path workDir, Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workDir, path);
  }

  /**
   * Open a file for reading.
   * <p>
   * The connection stays in use until the returned stream is closed. If
   * anything fails before the stream has been created, the connection is
   * released here.
   *
   * @param file file to open
   * @param bufferSize value passed to the server through
   *          {@code FTPClient.allocate}
   * @return a stream over the remote file
   * @throws FileNotFoundException if the path is missing or is a directory
   * @throws IOException on any other failure
   */
  @Override
  public FSDataInputStream open(Path file, int bufferSize) throws IOException {
    FTPClient client = connect();
    FSDataInputStream fis = null;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      FileStatus fileStat = getFileStatus(client, absolute);
      if (fileStat.isDirectory()) {
        throw new FileNotFoundException("Path " + file + " is a directory.");
      }
      client.allocate(bufferSize);
      Path parent = absolute.getParent();
      // Change to parent directory on the server. Only then can we read the
      // file on the server by opening up an InputStream. As a side effect the
      // working directory on the server is changed to the parent directory of
      // the file. The FTP client connection is closed when close() is called
      // on the FSDataInputStream.
      client.changeWorkingDirectory(parent.toUri().getPath());
      InputStream is = client.retrieveFileStream(file.getName());
      fis = new FSDataInputStream(new FTPInputStream(is,
          client, statistics));
    } finally {
      if (fis == null) {
        // No stream was handed out, so nobody else will close the client.
        disconnectQuietly(client);
      }
    }
    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
      // The ftpClient is an inconsistent state. Must close the stream
      // which in turn will logout and disconnect from FTP server
      fis.close();
      throw new IOException("Unable to open file: " + file + ", Aborting");
    }
    return fis;
  }

  /**
   * A stream obtained via this call must be closed before using other APIs of
   * this class or else the invocation will block.
   * <p>
   * Closing the returned stream completes the pending transfer and releases
   * the connection. If anything fails before the stream has been created,
   * the connection is released here. The <code>permission</code>,
   * <code>replication</code>, <code>blockSize</code> and
   * <code>progress</code> arguments are not used.
   *
   * @throws FileAlreadyExistsException if the path exists and either
   *           <code>overwrite</code> is false or the path is a directory
   * @throws IOException if the parent directory cannot be created or the
   *           transfer cannot be started
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    final FTPClient client = connect();
    FSDataOutputStream fos = null;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      FileStatus status;
      try {
        status = getFileStatus(client, file);
      } catch (FileNotFoundException fnfe) {
        status = null;
      }
      if (status != null) {
        if (overwrite && !status.isDirectory()) {
          delete(client, file, false);
        } else {
          throw new FileAlreadyExistsException("File already exists: " + file);
        }
      }

      Path parent = absolute.getParent();
      if (parent == null
          || !mkdirs(client, parent, FsPermission.getDirDefault())) {
        parent = (parent == null) ? new Path("/") : parent;
        throw new IOException("create(): Mkdirs failed to create: " + parent);
      }
      client.allocate(bufferSize);
      // Change to parent directory on the server. Only then can we write to
      // the file on the server by opening up an OutputStream. As a side
      // effect the working directory on the server is changed to the parent
      // directory of the file. The FTP client connection is closed when
      // close() is called on the FSDataOutputStream.
      client.changeWorkingDirectory(parent.toUri().getPath());
      fos = new FSDataOutputStream(client.storeFileStream(file
          .getName()), statistics) {
        @Override
        public void close() throws IOException {
          super.close();
          if (!client.isConnected()) {
            throw new FTPException("Client not connected");
          }
          boolean cmdCompleted = client.completePendingCommand();
          disconnect(client);
          if (!cmdCompleted) {
            throw new FTPException("Could not complete transfer, Reply Code - "
                + client.getReplyCode());
          }
        }
      };
    } finally {
      if (fos == null) {
        // No stream was handed out, so nobody else will close the client.
        disconnectQuietly(client);
      }
    }
    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
      // The ftpClient is an inconsistent state. Must close the stream
      // which in turn will logout and disconnect from FTP server
      fos.close();
      throw new IOException("Unable to create file: " + file + ", Aborting");
    }
    return fos;
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new UnsupportedOperationException("Append is not supported "
        + "by FTPFileSystem");
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return true if a status can be obtained for <code>file</code>
   * @throws IOException on IO problems other than FileNotFoundException
   */
  private boolean exists(FTPClient client, Path file) throws IOException {
    try {
      getFileStatus(client, file);
      return true;
    } catch (FileNotFoundException fnfe) {
      return false;
    }
  }

  /**
   * Delete a file or directory over a dedicated connection.
   *
   * @param file path to delete
   * @param recursive whether a non-empty directory may be deleted
   * @return the server's result for the final delete command, or false if
   *         the path does not exist
   * @throws IOException if the directory is non-empty and
   *           <code>recursive</code> is false, or on IO failure
   */
  @Override
  public boolean delete(Path file, boolean recursive) throws IOException {
    FTPClient client = connect();
    try {
      boolean success = delete(client, file, recursive);
      return success;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * Results of deleting the individual children of a directory are not
   * inspected; only the result of removing the directory itself is returned.
   */
  private boolean delete(FTPClient client, Path file, boolean recursive)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.toUri().getPath();
    try {
      FileStatus fileStat = getFileStatus(client, absolute);
      if (fileStat.isFile()) {
        return client.deleteFile(pathName);
      }
    } catch (FileNotFoundException e) {
      //the file is not there
      return false;
    }
    FileStatus[] dirEntries = listStatus(client, absolute);
    if (dirEntries != null) {
      if (dirEntries.length > 0 && !(recursive)) {
        throw new IOException("Directory: " + file + " is not empty.");
      }
      for (FileStatus dirEntry : dirEntries) {
        delete(client, new Path(absolute, dirEntry.getPath()), recursive);
      }
    }
    return client.removeDirectory(pathName);
  }

  /**
   * Translate the read/write/execute bits that an FTP listing reports for one
   * access class into an {@link FsAction}.
   *
   * @param accessGroup one of the <code>FTPFile.*_ACCESS</code> constants
   * @param ftpFile listing entry to inspect
   * @return the combination of all permissions granted to that access class
   */
  private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
    FsAction action = FsAction.NONE;
    // or() returns the combined value; it has to be assigned back or every
    // permission bit is dropped.
    if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
      action = action.or(FsAction.READ);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
      action = action.or(FsAction.WRITE);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
      action = action.or(FsAction.EXECUTE);
    }
    return action;
  }

  /**
   * Build an {@link FsPermission} from the user, group and world permissions
   * of an FTP listing entry.
   *
   * @param ftpFile listing entry to inspect
   * @return the equivalent permission
   */
  private FsPermission getPermissions(FTPFile ftpFile) {
    FsAction user, group, others;
    user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
    group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
    others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
    return new FsPermission(user, group, others);
  }

  /**
   * @return the URI passed to {@link #initialize(URI, Configuration)}
   */
  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * List a path over a dedicated connection.
   *
   * @param file path to list
   * @return the status of <code>file</code> itself if it is a file,
   *         otherwise the statuses of the entries in the directory
   * @throws IOException on IO failure, including a missing path
   */
  @Override
  public FileStatus[] listStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      FileStatus[] stats = listStatus(client, file);
      return stats;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   */
  private FileStatus[] listStatus(FTPClient client, Path file)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    FileStatus fileStat = getFileStatus(client, absolute);
    if (fileStat.isFile()) {
      return new FileStatus[] { fileStat };
    }
    FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
    FileStatus[] fileStats = new FileStatus[ftpFiles.length];
    for (int i = 0; i < ftpFiles.length; i++) {
      fileStats[i] = getFileStatus(ftpFiles[i], absolute);
    }
    return fileStats;
  }

  /**
   * Look up the status of a path over a dedicated connection.
   *
   * @param file path to look up
   * @return the status of the path
   * @throws FileNotFoundException if the path does not exist
   * @throws IOException on any other IO failure
   */
  @Override
  public FileStatus getFileStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      FileStatus status = getFileStatus(client, file);
      return status;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * The status is found by listing the parent directory and matching on the
   * entry name. The root directory has no parent, so a synthetic directory
   * status with unknown length and modification time is returned for it.
   */
  private FileStatus getFileStatus(FTPClient client, Path file)
      throws IOException {
    FileStatus fileStat = null;
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    Path parentPath = absolute.getParent();
    if (parentPath == null) { // root dir
      long length = -1; // Length of root dir on server not known
      boolean isDir = true;
      int blockReplication = 1;
      long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
      long modTime = -1; // Modification time of root dir not known.
      Path root = new Path("/");
      return new FileStatus(length, isDir, blockReplication, blockSize,
          modTime, root.makeQualified(this));
    }
    String pathName = parentPath.toUri().getPath();
    FTPFile[] ftpFiles = client.listFiles(pathName);
    if (ftpFiles != null) {
      for (FTPFile ftpFile : ftpFiles) {
        if (ftpFile.getName().equals(file.getName())) { // file found in dir
          fileStat = getFileStatus(ftpFile, parentPath);
          break;
        }
      }
      if (fileStat == null) {
        throw new FileNotFoundException("File " + file + " does not exist.");
      }
    } else {
      throw new FileNotFoundException("File " + file + " does not exist.");
    }
    return fileStat;
  }

  /**
   * Convert the file information in FTPFile to a {@link FileStatus} object.
   *
   * @param ftpFile listing entry to convert
   * @param parentPath directory containing the entry
   * @return FileStatus with a replication of 1, the default block size and an
   *         access time of 0
   */
  private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
    long length = ftpFile.getSize();
    boolean isDir = ftpFile.isDirectory();
    int blockReplication = 1;
    // Using default block size since there is no way in FTP client to know of
    // block sizes on server. The assumption could be less than ideal.
    long blockSize = DEFAULT_BLOCK_SIZE;
    // NOTE(review): assumes the listing entry always carries a timestamp -
    // confirm getTimestamp() cannot return null for unparsable entries.
    long modTime = ftpFile.getTimestamp().getTimeInMillis();
    long accessTime = 0;
    FsPermission permission = getPermissions(ftpFile);
    String user = ftpFile.getUser();
    String group = ftpFile.getGroup();
    Path filePath = new Path(parentPath, ftpFile.getName());
    return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
        accessTime, permission, user, group, filePath.makeQualified(this));
  }

  /**
   * Create a directory and any missing ancestors over a dedicated connection.
   *
   * @param file directory to create
   * @param permission not applied; see the private overload
   * @return true if the directory exists when the call returns
   * @throws IOException on IO failure or if the path is an existing file
   */
  @Override
  public boolean mkdirs(Path file, FsPermission permission) throws IOException {
    FTPClient client = connect();
    try {
      boolean success = mkdirs(client, file, permission);
      return success;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * Missing ancestors are created first, recursively. The
   * <code>permission</code> argument is never read. As a side effect the
   * working directory on the server is changed to the parent of each
   * directory that gets created.
   *
   * @throws ParentNotDirectoryException if the path exists and is a file
   */
  private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
      throws IOException {
    boolean created = true;
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.getName();
    if (!exists(client, absolute)) {
      Path parent = absolute.getParent();
      created = (parent == null || mkdirs(client, parent, FsPermission
          .getDirDefault()));
      if (created) {
        String parentDir = parent.toUri().getPath();
        client.changeWorkingDirectory(parentDir);
        created = created && client.makeDirectory(pathName);
      }
    } else if (isFile(client, absolute)) {
      throw new ParentNotDirectoryException(String.format(
          "Can't make directory for path %s since it is a file.", absolute));
    }
    return created;
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return true if the path exists and is a file, false if it is missing or
   *         is not a file
   * @throws FTPException wrapping any IOException other than
   *           FileNotFoundException
   */
  private boolean isFile(FTPClient client, Path file) {
    try {
      return getFileStatus(client, file).isFile();
    } catch (FileNotFoundException e) {
      return false; // file does not exist
    } catch (IOException ioe) {
      throw new FTPException("File check failed", ioe);
    }
  }

  /**
   * Directory probe that reuses an existing connection instead of opening a
   * second one.
   *
   * @return true if the path exists and is a directory, false if it is
   *         missing or is not a directory
   * @throws IOException on IO problems other than FileNotFoundException
   */
  private boolean isDirectory(FTPClient client, Path file) throws IOException {
    try {
      return getFileStatus(client, file).isDirectory();
    } catch (FileNotFoundException e) {
      return false; // path does not exist
    }
  }

  /**
   * Rename a path over a dedicated connection.
   * <p>
   * Source and destination must share the same parent directory; this is
   * not a general 'move'.
   *
   * @param src existing path
   * @param dst new path, or an existing directory to place the source under
   * @return the server's result for the rename command
   * @throws IOException if the source is missing, the destination exists,
   *           the parents differ, or on IO failure
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    FTPClient client = connect();
    try {
      boolean success = rename(client, src, dst);
      return success;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Probe for a path being a parent of another
   * @param parent parent path
   * @param child possible child path
   * @return true if the parent's path matches the start of the child's
   */
  private boolean isParentOf(Path parent, Path child) {
    URI parentURI = parent.toUri();
    String parentPath = parentURI.getPath();
    if (!parentPath.endsWith("/")) {
      parentPath += "/";
    }
    URI childURI = child.toUri();
    String childPath = childURI.getPath();
    return childPath.startsWith(parentPath);
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * If the destination is an existing directory the source is renamed to an
   * entry of the same name underneath it. The rename is then only carried
   * out when both paths have the same parent directory.
   *
   * @param client connected client to issue the commands on
   * @param src existing path
   * @param dst new path, or an existing directory
   * @return the server's result for the rename command
   * @throws FileNotFoundException if the source does not exist
   * @throws FileAlreadyExistsException if the final destination exists
   * @throws IOException if the destination lies under the source, the
   *           parents differ, or on IO failure
   */
  private boolean rename(FTPClient client, Path src, Path dst)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absoluteSrc = makeAbsolute(workDir, src);
    Path absoluteDst = makeAbsolute(workDir, dst);
    if (!exists(client, absoluteSrc)) {
      throw new FileNotFoundException("Source path " + src + " does not exist");
    }
    // Probe on the connection we already hold rather than opening another.
    if (isDirectory(client, absoluteDst)) {
      // destination is a directory: rename goes underneath it with the
      // source name
      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
    }
    if (exists(client, absoluteDst)) {
      throw new FileAlreadyExistsException("Destination path " + dst
          + " already exists");
    }
    Path srcParent = absoluteSrc.getParent();
    String parentSrc = srcParent.toUri().toString();
    String parentDst = absoluteDst.getParent().toUri().toString();
    if (isParentOf(absoluteSrc, absoluteDst)) {
      throw new IOException("Cannot rename " + absoluteSrc + " under itself"
          + " : "+ absoluteDst);
    }

    if (!parentSrc.equals(parentDst)) {
      throw new IOException("Cannot rename source: " + absoluteSrc
          + " to " + absoluteDst
          + " -"+ E_SAME_DIRECTORY_ONLY);
    }
    String from = absoluteSrc.getName();
    String to = absoluteDst.getName();
    // Pass only the path component to the server, as open(), create() and
    // mkdirs() do; the full URI text may carry a scheme and authority.
    client.changeWorkingDirectory(srcParent.toUri().getPath());
    boolean renamed = client.rename(from, to);
    return renamed;
  }

  /**
   * @return the home directory, since no working-directory state is kept
   */
  @Override
  public Path getWorkingDirectory() {
    // Return home directory always since we do not maintain state.
    return getHomeDirectory();
  }

  /**
   * Ask the server for the directory a fresh login starts in.
   *
   * @return the working directory reported right after connecting
   * @throws FTPException if connecting, querying or disconnecting fails
   */
  @Override
  public Path getHomeDirectory() {
    FTPClient client = null;
    try {
      client = connect();
      Path homeDir = new Path(client.printWorkingDirectory());
      return homeDir;
    } catch (IOException ioe) {
      throw new FTPException("Failed to get home directory", ioe);
    } finally {
      try {
        disconnect(client);
      } catch (IOException ioe) {
        throw new FTPException("Failed to disconnect", ioe);
      }
    }
  }

  /**
   * No-op: the working directory state is not maintained.
   *
   * @param newDir ignored
   */
  @Override
  public void setWorkingDirectory(Path newDir) {
    // we do not maintain the working directory state
  }
}