/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;
/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 * 
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 * 
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp; 
 *                                      values are collected in 'blocks' 
 *                                      separately and compressed. The size of 
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 * 
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 * 
 * <p>The recommended way is to use the static <code>createWriter</code> methods
 * provided by <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
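 *
 * <p>For example, a writer and a reader might be used as follows (an
 * illustrative sketch only; the path and the key/value types are
 * arbitrary):</p>
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Path path = new Path("/tmp/example.seq");   // hypothetical path
 * try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *          SequenceFile.Writer.file(path),
 *          SequenceFile.Writer.keyClass(Text.class),
 *          SequenceFile.Writer.valueClass(IntWritable.class))) {
 *   writer.append(new Text("key"), new IntWritable(1));
 * }
 * try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
 *          SequenceFile.Reader.file(path))) {
 *   Text key = new Text();
 *   IntWritable value = new IntWritable();
 *   while (reader.next(key, value)) {
 *     // process key/value pair
 *   }
 * }
 * }</pre>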
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 * 
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 * 
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for 
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is 
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for 
 *                       compression of keys and/or values (if compression is 
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote the end of the header.
 *   </li>
 * </ul>
 * 
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every few hundred bytes or so.
 * </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every few hundred bytes or so.
 * </li>
 * </ul>
 * 
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every block.
 * </li>
 * </ul>
 * 
 * <p>The compressed blocks of key lengths and value lengths consist of the 
 * actual lengths of individual keys/values encoded in zero-compressed 
 * integer (VInt) format.</p>
 * 
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static final byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /** 
   * The compression type used to compress key/value pairs in the 
   * {@link SequenceFile}.
   * 
   * @see SequenceFile.Writer
   */
  public enum CompressionType {
    /** Do not compress records. */
    NONE, 
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  public static CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD : 
      CompressionType.valueOf(name);
  }
  
  /**
   * Set the default compression type for sequence files.
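   * <p>For example, <code>setDefaultCompressionType(conf,
   * CompressionType.BLOCK)</code> is equivalent to setting
   * <code>io.seqfile.compression.type</code> to <code>BLOCK</code> in the
   * configuration.</p>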
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  public static void setDefaultCompressionType(Configuration job, 
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
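   * <p>A minimal usage sketch (the path and the key/value classes are
   * arbitrary examples):</p>
   * <pre>{@code
   * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *     SequenceFile.Writer.file(new Path("/tmp/data.seq")),  // hypothetical
   *     SequenceFile.Writer.keyClass(Text.class),
   *     SequenceFile.Writer.valueClass(LongWritable.class),
   *     SequenceFile.Writer.compression(CompressionType.BLOCK));
   * }</pre>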
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption = 
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType, 
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.bufferSize(bufferSize), 
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
  createWriter(FileSystem fs, Configuration conf, Path name,
               Class keyClass, Class valClass, int bufferSize,
               short replication, long blockSize, boolean createParent,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append, etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
  createWriter(FileContext fc, Configuration conf, Path name,
               Class keyClass, Class valClass,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata,
               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
               throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }
  
  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /** The interface to 'raw' values of SequenceFiles. */
  public interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream. 
     * Note that it will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException;

    /**
     * Size of the stored data.
     * @return the number of bytes of stored data
     */
    public int getSize();
  }
  
  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    
    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }
    
    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      throw 
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes
  
  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      } 
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes
  
  /**
   * A class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
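   * <p>For example (an illustrative sketch; the attribute name and value
   * are arbitrary):</p>
   * <pre>{@code
   * SequenceFile.Metadata metadata = new SequenceFile.Metadata();
   * metadata.set(new Text("created-by"), new Text("example-job"));
   * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *     SequenceFile.Writer.file(path),
   *     SequenceFile.Writer.keyClass(Text.class),
   *     SequenceFile.Writer.valueClass(Text.class),
   *     SequenceFile.Writer.metadata(metadata));
   * }</pre>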
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;
    
    public Metadata() {
      this(new TreeMap<Text, Text>());
    }
    
    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }
    
    public Text get(Text name) {
      return this.theMetadata.get(name);
    }
    
    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }
    
    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) {
        throw new IOException(
            "Invalid size: " + sz + " for file metadata object");
      }
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }
    
    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      // Consistent with equals(): equal metadata maps yield equal hashes.
      return this.theMetadata.hashCode();
    }
    
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t")
          .append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
  
  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;
    
    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(StandardCharsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public interface Option {}
    
    static class FileOption extends Options.PathOption 
                                    implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption 
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }
    
    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption implements
        Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                          implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }
    
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }
    
    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }
    
    public static Option replication(short value) {
      return new ReplicationOption(value);
    }
    
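    /**
     * Create an option to append to a file if it already exists. When the
     * file exists, the writer validates that the file's key/value classes,
     * version and compression settings match the options supplied before
     * appending to it; otherwise a new file is created.
     */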
    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }
    
    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }
    
    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }
    
    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }
    
    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf, 
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption = 
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption = 
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption = 
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption = 
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
          AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption = 
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption = 
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption = 
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException(
            "exactly one of file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ? 
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("MetaData Option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            // Codec comparison will be ignored if the compression is NONE
            if (readerCompressionOption.value != compressionTypeOption.value
                || (readerCompressionOption.value != CompressionType.NONE
                    && readerCompressionOption.codec
                        .getClass() != compressionTypeOption.codec
                            .getClass())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null, 
           new Metadata());
    }
    
    /** Create the named file with write-progress reporter.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }
    
    /** Create the named file with write-progress reporter. 
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
    
    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader() 
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());
      
      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());
      
      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata) 
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut = 
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }

      if (appendMode) {
        sync();
      } else {
        writeFileHeader();
      }
    }
    
    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }
    
    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }

    /**
     * Flush all currently written data to the file system.
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.hflush();  // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }
    
    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }
    
    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;
      
      if (out != null) {
        
        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();
      
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
     * the key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
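     *
     * <p>A sketch of the intended use, with a reader and a writer over the
     * same file:</p>
     * <pre>{@code
     * long pos = writer.getLength();  // always a synchronized position
     * reader.seek(pos);               // safe to seek here
     * reader.next(key);               // reads the record at or after pos
     * }</pre>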
1432     */
1433    public synchronized long getLength() throws IOException {
1434      return out.getPos();
1435    }
1436
1437  } // class Writer
1438
1439  /** Write key/compressed-value pairs to a sequence-format file. */
1440  static class RecordCompressWriter extends Writer {
1441    
1442    RecordCompressWriter(Configuration conf, 
1443                         Option... options) throws IOException {
1444      super(conf, options);
1445    }
1446
1447    /** Append a key/value pair. */
1448    @Override
1449    @SuppressWarnings("unchecked")
1450    public synchronized void append(Object key, Object val)
1451      throws IOException {
1452      if (key.getClass() != keyClass)
1453        throw new IOException("wrong key class: "+key.getClass().getName()
1454                              +" is not "+keyClass);
1455      if (val.getClass() != valClass)
1456        throw new IOException("wrong value class: "+val.getClass().getName()
1457                              +" is not "+valClass);
1458
1459      buffer.reset();
1460
1461      // Append the 'key'
1462      keySerializer.serialize(key);
1463      int keyLength = buffer.getLength();
1464      if (keyLength < 0)
1465        throw new IOException("negative length keys not allowed: " + key);
1466
1467      // Compress 'value' and append it
1468      deflateFilter.resetState();
1469      compressedValSerializer.serialize(val);
1470      deflateOut.flush();
1471      deflateFilter.finish();
1472
1473      // Write the record out
1474      checkAndWriteSync();                                // sync
1475      out.writeInt(buffer.getLength());                   // total record length
1476      out.writeInt(keyLength);                            // key portion length
1477      out.write(buffer.getData(), 0, buffer.getLength()); // data
1478    }
1479
    /** Append a raw key and pre-compressed value. */
1481    @Override
1482    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1483        int keyLength, ValueBytes val) throws IOException {
1484
1485      if (keyLength < 0)
1486        throw new IOException("negative length keys not allowed: " + keyLength);
1487
1488      int valLength = val.getSize();
1489      
1490      checkAndWriteSync();                        // sync
1491      out.writeInt(keyLength+valLength);          // total record length
1492      out.writeInt(keyLength);                    // key portion length
1493      out.write(keyData, keyOffset, keyLength);   // 'key' data
1494      val.writeCompressedBytes(out);              // 'value' data
1495    }
1496    
  } // RecordCompressWriter
1498
1499  /** Write compressed key/value blocks to a sequence-format file. */
1500  static class BlockCompressWriter extends Writer {
1501    
1502    private int noBufferedRecords = 0;
1503    
1504    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1505    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1506
1507    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1508    private DataOutputBuffer valBuffer = new DataOutputBuffer();
1509
1510    private final int compressionBlockSize;
1511    
1512    BlockCompressWriter(Configuration conf,
1513                        Option... options) throws IOException {
1514      super(conf, options);
1515      compressionBlockSize = 
1516        conf.getInt("io.seqfile.compress.blocksize", 1000000);
1517      keySerializer.close();
1518      keySerializer.open(keyBuffer);
1519      uncompressedValSerializer.close();
1520      uncompressedValSerializer.open(valBuffer);
1521    }
1522
1523    /** Workhorse to check and write out compressed data/lengths */
1524    private synchronized 
1525      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1526      throws IOException {
1527      deflateFilter.resetState();
1528      buffer.reset();
1529      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1530                       uncompressedDataBuffer.getLength());
1531      deflateOut.flush();
1532      deflateFilter.finish();
1533      
1534      WritableUtils.writeVInt(out, buffer.getLength());
1535      out.write(buffer.getData(), 0, buffer.getLength());
1536    }
1537    
    /** Compress and flush the buffered key/value block to the underlying stream. */
1539    @Override
1540    public synchronized void sync() throws IOException {
1541      if (noBufferedRecords > 0) {
1542        super.sync();
1543        
1544        // No. of records
1545        WritableUtils.writeVInt(out, noBufferedRecords);
1546        
1547        // Write 'keys' and lengths
1548        writeBuffer(keyLenBuffer);
1549        writeBuffer(keyBuffer);
1550        
1551        // Write 'values' and lengths
1552        writeBuffer(valLenBuffer);
1553        writeBuffer(valBuffer);
1554        
1555        // Flush the file-stream
1556        out.flush();
1557        
1558        // Reset internal states
1559        keyLenBuffer.reset();
1560        keyBuffer.reset();
1561        valLenBuffer.reset();
1562        valBuffer.reset();
1563        noBufferedRecords = 0;
1564      }
1565      
1566    }
1567    
1568    /** Close the file. */
1569    @Override
1570    public synchronized void close() throws IOException {
1571      if (out != null) {
1572        sync();
1573      }
1574      super.close();
1575    }
1576
1577    /** Append a key/value pair. */
1578    @Override
1579    @SuppressWarnings("unchecked")
1580    public synchronized void append(Object key, Object val)
1581      throws IOException {
1582      if (key.getClass() != keyClass)
1583        throw new IOException("wrong key class: "+key+" is not "+keyClass);
1584      if (val.getClass() != valClass)
1585        throw new IOException("wrong value class: "+val+" is not "+valClass);
1586
1587      // Save key/value into respective buffers 
1588      int oldKeyLength = keyBuffer.getLength();
1589      keySerializer.serialize(key);
1590      int keyLength = keyBuffer.getLength() - oldKeyLength;
1591      if (keyLength < 0)
1592        throw new IOException("negative length keys not allowed: " + key);
1593      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1594
1595      int oldValLength = valBuffer.getLength();
1596      uncompressedValSerializer.serialize(val);
1597      int valLength = valBuffer.getLength() - oldValLength;
1598      WritableUtils.writeVInt(valLenBuffer, valLength);
1599      
1600      // Added another key/value pair
1601      ++noBufferedRecords;
1602      
1603      // Compress and flush?
1604      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1605      if (currentBlockSize >= compressionBlockSize) {
1606        sync();
1607      }
1608    }
1609    
    /** Append a raw key/value pair. */
1611    @Override
1612    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1613        int keyLength, ValueBytes val) throws IOException {
1614      
1615      if (keyLength < 0)
1616        throw new IOException("negative length keys not allowed");
1617
1618      int valLength = val.getSize();
1619      
1620      // Save key/value data in relevant buffers
1621      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1622      keyBuffer.write(keyData, keyOffset, keyLength);
1623      WritableUtils.writeVInt(valLenBuffer, valLength);
1624      val.writeUncompressedBytes(valBuffer);
1625
1626      // Added another key/value pair
1627      ++noBufferedRecords;
1628
1629      // Compress and flush?
1630      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1631      if (currentBlockSize >= compressionBlockSize) {
1632        sync();
1633      }
1634    }
1635  
  } // BlockCompressWriter
1637
1638  /** Get the configured buffer size */
1639  private static int getBufferSize(Configuration conf) {
1640    return conf.getInt("io.file.buffer.size", 4096);
1641  }
1642
1643  /** Reads key/value pairs from a sequence-format file. */
1644  public static class Reader implements java.io.Closeable {
1645    private String filename;
1646    private FSDataInputStream in;
1647    private DataOutputBuffer outBuf = new DataOutputBuffer();
1648
1649    private byte version;
1650
1651    private String keyClassName;
1652    private String valClassName;
1653    private Class keyClass;
1654    private Class valClass;
1655
1656    private CompressionCodec codec = null;
1657    private Metadata metadata = null;
1658    
1659    private byte[] sync = new byte[SYNC_HASH_SIZE];
1660    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1661    private boolean syncSeen;
1662
1663    private long headerEnd;
1664    private long end;
1665    private int keyLength;
1666    private int recordLength;
1667
1668    private boolean decompress;
1669    private boolean blockCompressed;
1670    
1671    private Configuration conf;
1672
1673    private int noBufferedRecords = 0;
1674    private boolean lazyDecompress = true;
1675    private boolean valuesDecompressed = true;
1676    
1677    private int noBufferedKeys = 0;
1678    private int noBufferedValues = 0;
1679    
1680    private DataInputBuffer keyLenBuffer = null;
1681    private CompressionInputStream keyLenInFilter = null;
1682    private DataInputStream keyLenIn = null;
1683    private Decompressor keyLenDecompressor = null;
1684    private DataInputBuffer keyBuffer = null;
1685    private CompressionInputStream keyInFilter = null;
1686    private DataInputStream keyIn = null;
1687    private Decompressor keyDecompressor = null;
1688
1689    private DataInputBuffer valLenBuffer = null;
1690    private CompressionInputStream valLenInFilter = null;
1691    private DataInputStream valLenIn = null;
1692    private Decompressor valLenDecompressor = null;
1693    private DataInputBuffer valBuffer = null;
1694    private CompressionInputStream valInFilter = null;
1695    private DataInputStream valIn = null;
1696    private Decompressor valDecompressor = null;
1697    
1698    private Deserializer keyDeserializer;
1699    private Deserializer valDeserializer;
1700
1701    /**
1702     * A tag interface for all of the Reader options
1703     */
1704    public static interface Option {}
1705    
1706    /**
1707     * Create an option to specify the path name of the sequence file.
1708     * @param value the path to read
1709     * @return a new option
1710     */
1711    public static Option file(Path value) {
1712      return new FileOption(value);
1713    }
1714    
1715    /**
1716     * Create an option to specify the stream with the sequence file.
1717     * @param value the stream to read.
1718     * @return a new option
1719     */
1720    public static Option stream(FSDataInputStream value) {
1721      return new InputStreamOption(value);
1722    }
1723    
1724    /**
1725     * Create an option to specify the starting byte to read.
1726     * @param value the number of bytes to skip over
1727     * @return a new option
1728     */
1729    public static Option start(long value) {
1730      return new StartOption(value);
1731    }
1732    
1733    /**
1734     * Create an option to specify the number of bytes to read.
1735     * @param value the number of bytes to read
1736     * @return a new option
1737     */
1738    public static Option length(long value) {
1739      return new LengthOption(value);
1740    }
1741    
1742    /**
1743     * Create an option with the buffer size for reading the given pathname.
1744     * @param value the number of bytes to buffer
1745     * @return a new option
1746     */
1747    public static Option bufferSize(int value) {
1748      return new BufferSizeOption(value);
1749    }
1750
1751    private static class FileOption extends Options.PathOption 
1752                                    implements Option {
1753      private FileOption(Path value) {
1754        super(value);
1755      }
1756    }
1757    
1758    private static class InputStreamOption
1759        extends Options.FSDataInputStreamOption 
1760        implements Option {
1761      private InputStreamOption(FSDataInputStream value) {
1762        super(value);
1763      }
1764    }
1765
1766    private static class StartOption extends Options.LongOption
1767                                     implements Option {
1768      private StartOption(long value) {
1769        super(value);
1770      }
1771    }
1772
1773    private static class LengthOption extends Options.LongOption
1774                                      implements Option {
1775      private LengthOption(long value) {
1776        super(value);
1777      }
1778    }
1779
1780    private static class BufferSizeOption extends Options.IntegerOption
1781                                      implements Option {
1782      private BufferSizeOption(int value) {
1783        super(value);
1784      }
1785    }
1786
    // used only internally; deliberately has no public factory method
1788    private static class OnlyHeaderOption extends Options.BooleanOption 
1789                                          implements Option {
1790      private OnlyHeaderOption() {
1791        super(true);
1792      }
1793    }
1794
1795    public Reader(Configuration conf, Option... opts) throws IOException {
1796      // Look up the options, these are null if not set
1797      FileOption fileOpt = Options.getOption(FileOption.class, opts);
1798      InputStreamOption streamOpt = 
1799        Options.getOption(InputStreamOption.class, opts);
1800      StartOption startOpt = Options.getOption(StartOption.class, opts);
1801      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1802      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1803      OnlyHeaderOption headerOnly = 
1804        Options.getOption(OnlyHeaderOption.class, opts);
1805      // check for consistency
1806      if ((fileOpt == null) == (streamOpt == null)) {
1807        throw new 
1808          IllegalArgumentException("File or stream option must be specified");
1809      }
1810      if (fileOpt == null && bufOpt != null) {
1811        throw new IllegalArgumentException("buffer size can only be set when" +
1812                                           " a file is specified.");
1813      }
1814      // figure out the real values
1815      Path filename = null;
1816      FSDataInputStream file;
1817      final long len;
1818      if (fileOpt != null) {
1819        filename = fileOpt.getValue();
1820        FileSystem fs = filename.getFileSystem(conf);
1821        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1822        len = null == lenOpt
1823          ? fs.getFileStatus(filename).getLen()
1824          : lenOpt.getValue();
1825        file = openFile(fs, filename, bufSize, len);
1826      } else {
1827        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1828        file = streamOpt.getValue();
1829      }
1830      long start = startOpt == null ? 0 : startOpt.getValue();
1831      // really set up
1832      initialize(filename, file, start, len, conf, headerOnly != null);
1833    }
1834
1835    /**
1836     * Construct a reader by opening a file from the given file system.
1837     * @param fs The file system used to open the file.
1838     * @param file The file being read.
1839     * @param conf Configuration
1840     * @throws IOException
1841     * @deprecated Use Reader(Configuration, Option...) instead.
1842     */
1843    @Deprecated
1844    public Reader(FileSystem fs, Path file, 
1845                  Configuration conf) throws IOException {
1846      this(conf, file(file.makeQualified(fs)));
1847    }
1848
1849    /**
1850     * Construct a reader by the given input stream.
1851     * @param in An input stream.
1852     * @param buffersize unused
1853     * @param start The starting position.
1854     * @param length The length being read.
1855     * @param conf Configuration
1856     * @throws IOException
1857     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1858     */
1859    @Deprecated
1860    public Reader(FSDataInputStream in, int buffersize,
1861        long start, long length, Configuration conf) throws IOException {
1862      this(conf, stream(in), start(start), length(length));
1863    }
1864
1865    /** Common work of the constructors. */
1866    private void initialize(Path filename, FSDataInputStream in,
1867                            long start, long length, Configuration conf,
1868                            boolean tempReader) throws IOException {
1869      if (in == null) {
1870        throw new IllegalArgumentException("in == null");
1871      }
1872      this.filename = filename == null ? "<unknown>" : filename.toString();
1873      this.in = in;
1874      this.conf = conf;
1875      boolean succeeded = false;
1876      try {
1877        seek(start);
1878        this.end = this.in.getPos() + length;
1879        // if it wrapped around, use the max
1880        if (end < length) {
1881          end = Long.MAX_VALUE;
1882        }
1883        init(tempReader);
1884        succeeded = true;
1885      } finally {
1886        if (!succeeded) {
1887          IOUtils.cleanup(LOG, this.in);
1888        }
1889      }
1890    }
1891
1892    /**
1893     * Override this method to specialize the type of
1894     * {@link FSDataInputStream} returned.
1895     * @param fs The file system used to open the file.
1896     * @param file The file being read.
1897     * @param bufferSize The buffer size used to read the file.
     * @param length The length being read if it is &gt;= 0.  Otherwise,
     *               the length is not available.
1900     * @return The opened stream.
1901     * @throws IOException
1902     */
1903    protected FSDataInputStream openFile(FileSystem fs, Path file,
1904        int bufferSize, long length) throws IOException {
1905      return fs.open(file, bufferSize);
1906    }
1907    
1908    /**
1909     * Initialize the {@link Reader}
     * @param tempReader <code>true</code> if we are constructing a temporary
     *                   reader (see {@link SequenceFile.Sorter#cloneFileAttributes}),
     *                   and hence do not initialize every component;
     *                   <code>false</code> otherwise.
1914     * @throws IOException
1915     */
1916    private void init(boolean tempReader) throws IOException {
1917      byte[] versionBlock = new byte[VERSION.length];
1918      String exceptionMsg = this + " not a SequenceFile";
1919
1920      // Try to read sequence file header.
1921      try {
1922        in.readFully(versionBlock);
1923      } catch (EOFException e) {
1924        throw new EOFException(exceptionMsg);
1925      }
1926
1927      if ((versionBlock[0] != VERSION[0]) ||
1928          (versionBlock[1] != VERSION[1]) ||
1929          (versionBlock[2] != VERSION[2])) {
1930        throw new IOException(this + " not a SequenceFile");
1931      }
1932
1933      // Set 'version'
1934      version = versionBlock[3];
1935      if (version > VERSION[3]) {
1936        throw new VersionMismatchException(VERSION[3], version);
1937      }
1938
1939      if (version < BLOCK_COMPRESS_VERSION) {
1940        UTF8 className = new UTF8();
1941
1942        className.readFields(in);
1943        keyClassName = className.toStringChecked(); // key class name
1944
1945        className.readFields(in);
1946        valClassName = className.toStringChecked(); // val class name
1947      } else {
1948        keyClassName = Text.readString(in);
1949        valClassName = Text.readString(in);
1950      }
1951
1952      if (version > 2) {                          // if version > 2
1953        this.decompress = in.readBoolean();       // is compressed?
1954      } else {
1955        decompress = false;
1956      }
1957
1958      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1959        this.blockCompressed = in.readBoolean();  // is block-compressed?
1960      } else {
1961        blockCompressed = false;
1962      }
1963      
1964      // if version >= 5
1965      // setup the compression codec
1966      if (decompress) {
1967        if (version >= CUSTOM_COMPRESS_VERSION) {
1968          String codecClassname = Text.readString(in);
1969          try {
1970            Class<? extends CompressionCodec> codecClass
1971              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1972            this.codec = ReflectionUtils.newInstance(codecClass, conf);
1973          } catch (ClassNotFoundException cnfe) {
1974            throw new IllegalArgumentException("Unknown codec: " + 
1975                                               codecClassname, cnfe);
1976          }
1977        } else {
1978          codec = new DefaultCodec();
1979          ((Configurable)codec).setConf(conf);
1980        }
1981      }
1982      
1983      this.metadata = new Metadata();
1984      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1985        this.metadata.readFields(in);
1986      }
1987      
1988      if (version > 1) {                          // if version > 1
1989        in.readFully(sync);                       // read sync bytes
1990        headerEnd = in.getPos();                  // record end of header
1991      }
1992      
      // Initialize... *not* if we are constructing a temporary Reader
1994      if (!tempReader) {
1995        valBuffer = new DataInputBuffer();
1996        if (decompress) {
1997          valDecompressor = CodecPool.getDecompressor(codec);
1998          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1999          valIn = new DataInputStream(valInFilter);
2000        } else {
2001          valIn = valBuffer;
2002        }
2003
2004        if (blockCompressed) {
2005          keyLenBuffer = new DataInputBuffer();
2006          keyBuffer = new DataInputBuffer();
2007          valLenBuffer = new DataInputBuffer();
2008
2009          keyLenDecompressor = CodecPool.getDecompressor(codec);
2010          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
2011                                                   keyLenDecompressor);
2012          keyLenIn = new DataInputStream(keyLenInFilter);
2013
2014          keyDecompressor = CodecPool.getDecompressor(codec);
2015          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
2016          keyIn = new DataInputStream(keyInFilter);
2017
2018          valLenDecompressor = CodecPool.getDecompressor(codec);
2019          valLenInFilter = codec.createInputStream(valLenBuffer, 
2020                                                   valLenDecompressor);
2021          valLenIn = new DataInputStream(valLenInFilter);
2022        }
2023        
2024        SerializationFactory serializationFactory =
2025          new SerializationFactory(conf);
2026        this.keyDeserializer =
2027          getDeserializer(serializationFactory, getKeyClass());
2028        if (this.keyDeserializer == null) {
2029          throw new IOException(
2030              "Could not find a deserializer for the Key class: '"
2031                  + getKeyClass().getCanonicalName() + "'. "
2032                  + "Please ensure that the configuration '" +
2033                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2034                  + "properly configured, if you're using "
2035                  + "custom serialization.");
2036        }
2037        if (!blockCompressed) {
2038          this.keyDeserializer.open(valBuffer);
2039        } else {
2040          this.keyDeserializer.open(keyIn);
2041        }
2042        this.valDeserializer =
2043          getDeserializer(serializationFactory, getValueClass());
2044        if (this.valDeserializer == null) {
2045          throw new IOException(
2046              "Could not find a deserializer for the Value class: '"
2047                  + getValueClass().getCanonicalName() + "'. "
2048                  + "Please ensure that the configuration '" +
2049                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2050                  + "properly configured, if you're using "
2051                  + "custom serialization.");
2052        }
2053        this.valDeserializer.open(valIn);
2054      }
2055    }
2056    
2057    @SuppressWarnings("unchecked")
2058    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
2059      return sf.getDeserializer(c);
2060    }
2061    
2062    /** Close the file. */
2063    @Override
2064    public synchronized void close() throws IOException {
2065      // Return the decompressors to the pool
2066      CodecPool.returnDecompressor(keyLenDecompressor);
2067      CodecPool.returnDecompressor(keyDecompressor);
2068      CodecPool.returnDecompressor(valLenDecompressor);
2069      CodecPool.returnDecompressor(valDecompressor);
2070      keyLenDecompressor = keyDecompressor = null;
2071      valLenDecompressor = valDecompressor = null;
2072      
2073      if (keyDeserializer != null) {
2074        keyDeserializer.close();
2075      }
2076      if (valDeserializer != null) {
2077        valDeserializer.close();
2078      }
2079      
2080      // Close the input-stream
2081      in.close();
2082    }
2083
2084    /** Returns the name of the key class. */
2085    public String getKeyClassName() {
2086      return keyClassName;
2087    }
2088
2089    /** Returns the class of keys in this file. */
2090    public synchronized Class<?> getKeyClass() {
2091      if (null == keyClass) {
2092        try {
2093          keyClass = WritableName.getClass(getKeyClassName(), conf);
2094        } catch (IOException e) {
2095          throw new RuntimeException(e);
2096        }
2097      }
2098      return keyClass;
2099    }
2100
2101    /** Returns the name of the value class. */
2102    public String getValueClassName() {
2103      return valClassName;
2104    }
2105
2106    /** Returns the class of values in this file. */
2107    public synchronized Class<?> getValueClass() {
2108      if (null == valClass) {
2109        try {
2110          valClass = WritableName.getClass(getValueClassName(), conf);
2111        } catch (IOException e) {
2112          throw new RuntimeException(e);
2113        }
2114      }
2115      return valClass;
2116    }
2117
2118    /** Returns true if values are compressed. */
2119    public boolean isCompressed() { return decompress; }
2120    
2121    /** Returns true if records are block-compressed. */
2122    public boolean isBlockCompressed() { return blockCompressed; }
2123    
2124    /** Returns the compression codec of data in this file. */
2125    public CompressionCodec getCompressionCodec() { return codec; }
2126    
2127    private byte[] getSync() {
2128      return sync;
2129    }
2130
2131    private byte getVersion() {
2132      return version;
2133    }
2134
2135    /**
2136     * Get the compression type for this file.
2137     * @return the compression type
2138     */
2139    public CompressionType getCompressionType() {
2140      if (decompress) {
2141        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2142      } else {
2143        return CompressionType.NONE;
2144      }
2145    }
2146
2147    /** Returns the metadata object of the file */
2148    public Metadata getMetadata() {
2149      return this.metadata;
2150    }
2151    
2152    /** Returns the configuration used for this file. */
2153    Configuration getConf() { return conf; }
2154    
2155    /** Read a compressed buffer */
2156    private synchronized void readBuffer(DataInputBuffer buffer, 
2157                                         CompressionInputStream filter) throws IOException {
2158      // Read data into a temporary buffer
2159      DataOutputBuffer dataBuffer = new DataOutputBuffer();
2160
2161      try {
2162        int dataBufferLength = WritableUtils.readVInt(in);
2163        dataBuffer.write(in, dataBufferLength);
2164      
2165        // Set up 'buffer' connected to the input-stream
2166        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2167      } finally {
2168        dataBuffer.close();
2169      }
2170
2171      // Reset the codec
2172      filter.resetState();
2173    }
2174    
2175    /** Read the next 'compressed' block */
2176    private synchronized void readBlock() throws IOException {
2177      // Check if we need to throw away a whole block of 
2178      // 'values' due to 'lazy decompression' 
2179      if (lazyDecompress && !valuesDecompressed) {
        in.seek(WritableUtils.readVInt(in)+in.getPos()); // skip value lengths
        in.seek(WritableUtils.readVInt(in)+in.getPos()); // skip values
2182      }
2183      
2184      // Reset internal states
2185      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2186      valuesDecompressed = false;
2187
2188      //Process sync
2189      if (sync != null) {
2190        in.readInt();
2191        in.readFully(syncCheck);                // read syncCheck
2192        if (!Arrays.equals(sync, syncCheck))    // check it
2193          throw new IOException("File is corrupt!");
2194      }
2195      syncSeen = true;
2196
2197      // Read number of records in this block
2198      noBufferedRecords = WritableUtils.readVInt(in);
2199      
2200      // Read key lengths and keys
2201      readBuffer(keyLenBuffer, keyLenInFilter);
2202      readBuffer(keyBuffer, keyInFilter);
2203      noBufferedKeys = noBufferedRecords;
2204      
2205      // Read value lengths and values
2206      if (!lazyDecompress) {
2207        readBuffer(valLenBuffer, valLenInFilter);
2208        readBuffer(valBuffer, valInFilter);
2209        noBufferedValues = noBufferedRecords;
2210        valuesDecompressed = true;
2211      }
2212    }
2213
2214    /** 
2215     * Position valLenIn/valIn to the 'value' 
2216     * corresponding to the 'current' key 
2217     */
2218    private synchronized void seekToCurrentValue() throws IOException {
2219      if (!blockCompressed) {
2220        if (decompress) {
2221          valInFilter.resetState();
2222        }
2223        valBuffer.reset();
2224      } else {
2225        // Check if this is the first value in the 'block' to be read
2226        if (lazyDecompress && !valuesDecompressed) {
2227          // Read the value lengths and values
2228          readBuffer(valLenBuffer, valLenInFilter);
2229          readBuffer(valBuffer, valInFilter);
2230          noBufferedValues = noBufferedRecords;
2231          valuesDecompressed = true;
2232        }
2233        
2234        // Calculate the no. of bytes to skip
2235        // Note: 'current' key has already been read!
2236        int skipValBytes = 0;
2237        int currentKey = noBufferedKeys + 1;          
2238        for (int i=noBufferedValues; i > currentKey; --i) {
2239          skipValBytes += WritableUtils.readVInt(valLenIn);
2240          --noBufferedValues;
2241        }
2242        
2243        // Skip to the 'val' corresponding to 'current' key
2244        if (skipValBytes > 0) {
2245          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2246            throw new IOException("Failed to seek to " + currentKey + 
2247                                  "(th) value!");
2248          }
2249        }
2250      }
2251    }
2252
2253    /**
2254     * Get the 'value' corresponding to the last read 'key'.
2255     * @param val : The 'value' to be read.
2256     * @throws IOException
2257     */
2258    public synchronized void getCurrentValue(Writable val) 
2259      throws IOException {
2260      if (val instanceof Configurable) {
2261        ((Configurable) val).setConf(this.conf);
2262      }
2263
2264      // Position stream to 'current' value
2265      seekToCurrentValue();
2266
2267      if (!blockCompressed) {
2268        val.readFields(valIn);
2269        
2270        if (valIn.read() > 0) {
2271          LOG.info("available bytes: " + valIn.available());
2272          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2273                                + " bytes, should read " +
2274                                (valBuffer.getLength()-keyLength));
2275        }
2276      } else {
2277        // Get the value
2278        int valLength = WritableUtils.readVInt(valLenIn);
2279        val.readFields(valIn);
2280        
2281        // Read another compressed 'value'
2282        --noBufferedValues;
2283        
        // Sanity check: a negative length indicates a corrupt record
        if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " has a negative value length: " + valLength);
        }
2288      }
2289
2290    }
2291    
2292    /**
2293     * Get the 'value' corresponding to the last read 'key'.
2294     * @param val : The 'value' to be read.
2295     * @throws IOException
2296     */
2297    public synchronized Object getCurrentValue(Object val) 
2298      throws IOException {
2299      if (val instanceof Configurable) {
2300        ((Configurable) val).setConf(this.conf);
2301      }
2302
2303      // Position stream to 'current' value
2304      seekToCurrentValue();
2305
2306      if (!blockCompressed) {
2307        val = deserializeValue(val);
2308        
2309        if (valIn.read() > 0) {
2310          LOG.info("available bytes: " + valIn.available());
2311          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2312                                + " bytes, should read " +
2313                                (valBuffer.getLength()-keyLength));
2314        }
2315      } else {
2316        // Get the value
2317        int valLength = WritableUtils.readVInt(valLenIn);
2318        val = deserializeValue(val);
2319        
2320        // Read another compressed 'value'
2321        --noBufferedValues;
2322        
        // Sanity check: a negative length indicates a corrupt record
        if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " has a negative value length: " + valLength);
        }
2327      }
2328      return val;
2329
2330    }
2331
2332    @SuppressWarnings("unchecked")
2333    private Object deserializeValue(Object val) throws IOException {
2334      return valDeserializer.deserialize(val);
2335    }
2336    
    /** Read the next key in the file into <code>key</code>, skipping its
     * value.  Returns true if another entry exists, and false at end of file. */
2339    public synchronized boolean next(Writable key) throws IOException {
2340      if (key.getClass() != getKeyClass())
2341        throw new IOException("wrong key class: "+key.getClass().getName()
2342                              +" is not "+keyClass);
2343
2344      if (!blockCompressed) {
2345        outBuf.reset();
2346        
2347        keyLength = next(outBuf);
2348        if (keyLength < 0)
2349          return false;
2350        
2351        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2352        
2353        key.readFields(valBuffer);
2354        valBuffer.mark(0);
2355        if (valBuffer.getPosition() != keyLength)
2356          throw new IOException(key + " read " + valBuffer.getPosition()
2357                                + " bytes, should read " + keyLength);
2358      } else {
2359        //Reset syncSeen
2360        syncSeen = false;
2361        
2362        if (noBufferedKeys == 0) {
2363          try {
2364            readBlock();
2365          } catch (EOFException eof) {
2366            return false;
2367          }
2368        }
2369        
2370        int keyLength = WritableUtils.readVInt(keyLenIn);
2371        
2372        // Sanity check
2373        if (keyLength < 0) {
2374          return false;
2375        }
2376        
2377        //Read another compressed 'key'
2378        key.readFields(keyIn);
2379        --noBufferedKeys;
2380      }
2381
2382      return true;
2383    }
2384
2385    /** Read the next key/value pair in the file into <code>key</code> and
2386     * <code>val</code>.  Returns true if such a pair exists and false when at
2387     * end of file */
2388    public synchronized boolean next(Writable key, Writable val)
2389      throws IOException {
2390      if (val.getClass() != getValueClass())
2391        throw new IOException("wrong value class: "+val+" is not "+valClass);
2392
2393      boolean more = next(key);
2394      
2395      if (more) {
2396        getCurrentValue(val);
2397      }
2398
2399      return more;
2400    }
2401    
2402    /**
2403     * Read and return the next record length, potentially skipping over 
2404     * a sync block.
2405     * @return the length of the next record or -1 if there is no next record
2406     * @throws IOException
2407     */
2408    private synchronized int readRecordLength() throws IOException {
2409      if (in.getPos() >= end) {
2410        return -1;
2411      }      
2412      int length = in.readInt();
2413      if (version > 1 && sync != null &&
2414          length == SYNC_ESCAPE) {              // process a sync entry
2415        in.readFully(syncCheck);                // read syncCheck
2416        if (!Arrays.equals(sync, syncCheck))    // check it
2417          throw new IOException("File is corrupt!");
2418        syncSeen = true;
2419        if (in.getPos() >= end) {
2420          return -1;
2421        }
2422        length = in.readInt();                  // re-read length
2423      } else {
2424        syncSeen = false;
2425      }
2426      
2427      return length;
2428    }
2429    
    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method.
     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
     */
    @Deprecated
    synchronized int next(DataOutputBuffer buffer) throws IOException {
      // Unsupported for block-compressed sequence files
      if (blockCompressed) {
        throw new IOException("Unsupported call for block-compressed" +
            " SequenceFiles - use nextRaw(DataOutputBuffer, ValueBytes)");
      }
2442      try {
2443        int length = readRecordLength();
2444        if (length == -1) {
2445          return -1;
2446        }
2447        int keyLength = in.readInt();
2448        buffer.write(in, length);
2449        return keyLength;
2450      } catch (ChecksumException e) {             // checksum failure
2451        handleChecksumException(e);
2452        return next(buffer);
2453      }
2454    }
2455
2456    public ValueBytes createValueBytes() {
2457      ValueBytes val = null;
2458      if (!decompress || blockCompressed) {
2459        val = new UncompressedBytes();
2460      } else {
2461        val = new CompressedBytes(codec);
2462      }
2463      return val;
2464    }
2465
2466    /**
2467     * Read 'raw' records.
2468     * @param key - The buffer into which the key is read
2469     * @param val - The 'raw' value
2470     * @return Returns the total record length or -1 for end of file
2471     * @throws IOException
2472     */
2473    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2474      throws IOException {
2475      if (!blockCompressed) {
2476        int length = readRecordLength();
2477        if (length == -1) {
2478          return -1;
2479        }
2480        int keyLength = in.readInt();
2481        int valLength = length - keyLength;
2482        key.write(in, keyLength);
2483        if (decompress) {
2484          CompressedBytes value = (CompressedBytes)val;
2485          value.reset(in, valLength);
2486        } else {
2487          UncompressedBytes value = (UncompressedBytes)val;
2488          value.reset(in, valLength);
2489        }
2490        
2491        return length;
2492      } else {
2493        //Reset syncSeen
2494        syncSeen = false;
2495        
2496        // Read 'key'
2497        if (noBufferedKeys == 0) {
2498          if (in.getPos() >= end) 
2499            return -1;
2500
2501          try { 
2502            readBlock();
2503          } catch (EOFException eof) {
2504            return -1;
2505          }
2506        }
2507        int keyLength = WritableUtils.readVInt(keyLenIn);
        if (keyLength < 0) {
          throw new IOException("negative length key found: " + keyLength);
        }
2511        key.write(keyIn, keyLength);
2512        --noBufferedKeys;
2513        
2514        // Read raw 'value'
2515        seekToCurrentValue();
2516        int valLength = WritableUtils.readVInt(valLenIn);
2517        UncompressedBytes rawValue = (UncompressedBytes)val;
2518        rawValue.reset(valIn, valLength);
2519        --noBufferedValues;
2520        
2521        return (keyLength+valLength);
2522      }
2523      
2524    }
2525
2526    /**
2527     * Read 'raw' keys.
2528     * @param key - The buffer into which the key is read
2529     * @return Returns the key length or -1 for end of file
2530     * @throws IOException
2531     */
2532    public synchronized int nextRawKey(DataOutputBuffer key) 
2533      throws IOException {
2534      if (!blockCompressed) {
2535        recordLength = readRecordLength();
2536        if (recordLength == -1) {
2537          return -1;
2538        }
2539        keyLength = in.readInt();
2540        key.write(in, keyLength);
2541        return keyLength;
2542      } else {
2543        //Reset syncSeen
2544        syncSeen = false;
2545        
2546        // Read 'key'
2547        if (noBufferedKeys == 0) {
2548          if (in.getPos() >= end) 
2549            return -1;
2550
2551          try { 
2552            readBlock();
2553          } catch (EOFException eof) {
2554            return -1;
2555          }
2556        }
2557        int keyLength = WritableUtils.readVInt(keyLenIn);
2558        if (keyLength < 0) {
2559          throw new IOException("zero length key found!");
2560        }
2561        key.write(keyIn, keyLength);
2562        --noBufferedKeys;
2563        
2564        return keyLength;
2565      }
2566      
2567    }
2568
2569    /** Read the next key in the file, skipping its
2570     * value.  Return null at end of file. */
2571    public synchronized Object next(Object key) throws IOException {
2572      if (key != null && key.getClass() != getKeyClass()) {
2573        throw new IOException("wrong key class: "+key.getClass().getName()
2574                              +" is not "+keyClass);
2575      }
2576
2577      if (!blockCompressed) {
2578        outBuf.reset();
2579        
2580        keyLength = next(outBuf);
2581        if (keyLength < 0)
2582          return null;
2583        
2584        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2585        
2586        key = deserializeKey(key);
2587        valBuffer.mark(0);
2588        if (valBuffer.getPosition() != keyLength)
2589          throw new IOException(key + " read " + valBuffer.getPosition()
2590                                + " bytes, should read " + keyLength);
2591      } else {
2592        //Reset syncSeen
2593        syncSeen = false;
2594        
2595        if (noBufferedKeys == 0) {
2596          try {
2597            readBlock();
2598          } catch (EOFException eof) {
2599            return null;
2600          }
2601        }
2602        
2603        int keyLength = WritableUtils.readVInt(keyLenIn);
2604        
2605        // Sanity check
2606        if (keyLength < 0) {
2607          return null;
2608        }
2609        
2610        //Read another compressed 'key'
2611        key = deserializeKey(key);
2612        --noBufferedKeys;
2613      }
2614
2615      return key;
2616    }
2617
2618    @SuppressWarnings("unchecked")
2619    private Object deserializeKey(Object key) throws IOException {
2620      return keyDeserializer.deserialize(key);
2621    }
2622
2623    /**
2624     * Read 'raw' values.
2625     * @param val - The 'raw' value
2626     * @return Returns the value length
2627     * @throws IOException
2628     */
2629    public synchronized int nextRawValue(ValueBytes val) 
2630      throws IOException {
2631      
2632      // Position stream to current value
2633      seekToCurrentValue();
2634 
2635      if (!blockCompressed) {
2636        int valLength = recordLength - keyLength;
2637        if (decompress) {
2638          CompressedBytes value = (CompressedBytes)val;
2639          value.reset(in, valLength);
2640        } else {
2641          UncompressedBytes value = (UncompressedBytes)val;
2642          value.reset(in, valLength);
2643        }
2644         
2645        return valLength;
2646      } else {
2647        int valLength = WritableUtils.readVInt(valLenIn);
2648        UncompressedBytes rawValue = (UncompressedBytes)val;
2649        rawValue.reset(valIn, valLength);
2650        --noBufferedValues;
2651        return valLength;
2652      }
2653      
2654    }
2655
2656    private void handleChecksumException(ChecksumException e)
2657      throws IOException {
2658      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2659        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2660        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2661      } else {
2662        throw e;
2663      }
2664    }
2665
    /** Disables sync; often invoked for temporary files. */
2667    synchronized void ignoreSync() {
2668      sync = null;
2669    }
2670    
2671    /** Set the current byte position in the input file.
2672     *
2673     * <p>The position passed must be a position returned by {@link
2674     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2675     * position, use {@link SequenceFile.Reader#sync(long)}.
2676     */
2677    public synchronized void seek(long position) throws IOException {
2678      in.seek(position);
2679      if (blockCompressed) {                      // trigger block read
2680        noBufferedKeys = 0;
2681        valuesDecompressed = true;
2682      }
2683    }
2684
2685    /** Seek to the next sync mark past a given position.*/
2686    public synchronized void sync(long position) throws IOException {
2687      if (position+SYNC_SIZE >= end) {
2688        seek(end);
2689        return;
2690      }
2691
2692      if (position < headerEnd) {
2693        // seek directly to first record
2694        in.seek(headerEnd);
2695        // note the sync marker "seen" in the header
2696        syncSeen = true;
2697        return;
2698      }
2699
2700      try {
2701        seek(position+4);                         // skip escape
2702        in.readFully(syncCheck);
2703        int syncLen = sync.length;
2704        for (int i = 0; in.getPos() < end; i++) {
2705          int j = 0;
2706          for (; j < syncLen; j++) {
2707            if (sync[j] != syncCheck[(i+j)%syncLen])
2708              break;
2709          }
2710          if (j == syncLen) {
2711            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2712            return;
2713          }
2714          syncCheck[i%syncLen] = in.readByte();
2715        }
2716      } catch (ChecksumException e) {             // checksum failure
2717        handleChecksumException(e);
2718      }
2719    }
2720
2721    /** Returns true iff the previous call to next passed a sync mark.*/
2722    public synchronized boolean syncSeen() { return syncSeen; }
2723
2724    /** Return the current byte position in the input file. */
2725    public synchronized long getPosition() throws IOException {
2726      return in.getPos();
2727    }
2728
2729    /** Returns the name of the file. */
2730    @Override
2731    public String toString() {
2732      return filename;
2733    }
2734
2735  }
2736
2737  /** Sorts key/value pairs in a sequence-format file.
2738   *
2739   * <p>For best performance, applications should make sure that the {@link
2740   * Writable#readFields(DataInput)} implementation of their keys is
2741   * very efficient.  In particular, it should avoid allocating memory.
2742   */
2743  public static class Sorter {
2744
2745    private RawComparator comparator;
2746
2747    private MergeSort mergeSort; //the implementation of merge sort
2748    
2749    private Path[] inFiles;                     // when merging or sorting
2750
2751    private Path outFile;
2752
2753    private int memory; // bytes
2754    private int factor; // merged per pass
2755
2756    private FileSystem fs = null;
2757
2758    private Class keyClass;
2759    private Class valClass;
2760
2761    private Configuration conf;
2762    private Metadata metadata;
2763    
2764    private Progressable progressable = null;
2765
2766    /** Sort and merge files containing the named classes. */
2767    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2768                  Class valClass, Configuration conf)  {
2769      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
2770    }
2771
2772    /** Sort and merge using an arbitrary {@link RawComparator}. */
2773    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2774                  Class valClass, Configuration conf) {
2775      this(fs, comparator, keyClass, valClass, conf, new Metadata());
2776    }
2777
2778    /** Sort and merge using an arbitrary {@link RawComparator}. */
2779    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2780                  Class valClass, Configuration conf, Metadata metadata) {
2781      this.fs = fs;
2782      this.comparator = comparator;
2783      this.keyClass = keyClass;
2784      this.valClass = valClass;
2785      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2786      this.factor = conf.getInt("io.sort.factor", 100);
2787      this.conf = conf;
2788      this.metadata = metadata;
2789    }
2790
2791    /** Set the number of streams to merge at once.*/
2792    public void setFactor(int factor) { this.factor = factor; }
2793
2794    /** Get the number of streams to merge at once.*/
2795    public int getFactor() { return factor; }
2796
2797    /** Set the total amount of buffer memory, in bytes.*/
2798    public void setMemory(int memory) { this.memory = memory; }
2799
2800    /** Get the total amount of buffer memory, in bytes.*/
2801    public int getMemory() { return memory; }
2802
2803    /** Set the progressable object in order to report progress. */
2804    public void setProgressable(Progressable progressable) {
2805      this.progressable = progressable;
2806    }
2807    
2808    /** 
2809     * Perform a file sort from a set of input files into an output file.
2810     * @param inFiles the files to be sorted
2811     * @param outFile the sorted output file
2812     * @param deleteInput should the input files be deleted as they are read?
2813     */
2814    public void sort(Path[] inFiles, Path outFile,
2815                     boolean deleteInput) throws IOException {
2816      if (fs.exists(outFile)) {
2817        throw new IOException("already exists: " + outFile);
2818      }
2819
2820      this.inFiles = inFiles;
2821      this.outFile = outFile;
2822
2823      int segments = sortPass(deleteInput);
2824      if (segments > 1) {
2825        mergePass(outFile.getParent());
2826      }
2827    }
2828
2829    /** 
2830     * Perform a file sort from a set of input files and return an iterator.
2831     * @param inFiles the files to be sorted
2832     * @param tempDir the directory where temp files are created during sort
2833     * @param deleteInput should the input files be deleted as they are read?
2834     * @return iterator the RawKeyValueIterator
2835     */
2836    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2837                                              boolean deleteInput) throws IOException {
2838      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2839      if (fs.exists(outFile)) {
2840        throw new IOException("already exists: " + outFile);
2841      }
2842      this.inFiles = inFiles;
2843      //outFile will basically be used as prefix for temp files in the cases
2844      //where sort outputs multiple sorted segments. For the single segment
2845      //case, the outputFile itself will contain the sorted data for that
2846      //segment
2847      this.outFile = outFile;
2848
2849      int segments = sortPass(deleteInput);
2850      if (segments > 1)
2851        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2852                     tempDir);
2853      else if (segments == 1)
2854        return merge(new Path[]{outFile}, true, tempDir);
2855      else return null;
2856    }
2857
2858    /**
2859     * The backwards compatible interface to sort.
2860     * @param inFile the input file to sort
2861     * @param outFile the sorted output file
2862     */
2863    public void sort(Path inFile, Path outFile) throws IOException {
2864      sort(new Path[]{inFile}, outFile, false);
2865    }
2866    
2867    private int sortPass(boolean deleteInput) throws IOException {
2868      if(LOG.isDebugEnabled()) {
2869        LOG.debug("running sort pass");
2870      }
2871      SortPass sortPass = new SortPass();         // make the SortPass
2872      sortPass.setProgressable(progressable);
2873      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2874      try {
2875        return sortPass.run(deleteInput);         // run it
2876      } finally {
2877        sortPass.close();                         // close it
2878      }
2879    }
2880
2881    private class SortPass {
2882      private int memoryLimit = memory/4;
2883      private int recordLimit = 1000000;
2884      
2885      private DataOutputBuffer rawKeys = new DataOutputBuffer();
2886      private byte[] rawBuffer;
2887
2888      private int[] keyOffsets = new int[1024];
2889      private int[] pointers = new int[keyOffsets.length];
2890      private int[] pointersCopy = new int[keyOffsets.length];
2891      private int[] keyLengths = new int[keyOffsets.length];
2892      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2893      
2894      private ArrayList segmentLengths = new ArrayList();
2895      
2896      private Reader in = null;
2897      private FSDataOutputStream out = null;
2898      private FSDataOutputStream indexOut = null;
2899      private Path outName;
2900
2901      private Progressable progressable = null;
2902
2903      public int run(boolean deleteInput) throws IOException {
2904        int segments = 0;
2905        int currentFile = 0;
2906        boolean atEof = (currentFile >= inFiles.length);
2907        CompressionType compressionType;
2908        CompressionCodec codec = null;
2909        segmentLengths.clear();
2910        if (atEof) {
2911          return 0;
2912        }
2913        
2914        // Initialize
2915        in = new Reader(fs, inFiles[currentFile], conf);
2916        compressionType = in.getCompressionType();
2917        codec = in.getCompressionCodec();
2918        
2919        for (int i=0; i < rawValues.length; ++i) {
2920          rawValues[i] = null;
2921        }
2922        
2923        while (!atEof) {
2924          int count = 0;
2925          int bytesProcessed = 0;
2926          rawKeys.reset();
2927          while (!atEof && 
2928                 bytesProcessed < memoryLimit && count < recordLimit) {
2929
2930            // Read a record into buffer
2931            // Note: Attempt to re-use 'rawValue' as far as possible
2932            int keyOffset = rawKeys.getLength();       
2933            ValueBytes rawValue = 
2934              (count == keyOffsets.length || rawValues[count] == null) ? 
2935              in.createValueBytes() : 
2936              rawValues[count];
2937            int recordLength = in.nextRaw(rawKeys, rawValue);
2938            if (recordLength == -1) {
2939              in.close();
2940              if (deleteInput) {
2941                fs.delete(inFiles[currentFile], true);
2942              }
2943              currentFile += 1;
2944              atEof = currentFile >= inFiles.length;
2945              if (!atEof) {
2946                in = new Reader(fs, inFiles[currentFile], conf);
2947              } else {
2948                in = null;
2949              }
2950              continue;
2951            }
2952
2953            int keyLength = rawKeys.getLength() - keyOffset;
2954
2955            if (count == keyOffsets.length)
2956              grow();
2957
2958            keyOffsets[count] = keyOffset;                // update pointers
2959            pointers[count] = count;
2960            keyLengths[count] = keyLength;
2961            rawValues[count] = rawValue;
2962
2963            bytesProcessed += recordLength; 
2964            count++;
2965          }
2966
2967          // buffer is full -- sort & flush it
2968          if(LOG.isDebugEnabled()) {
2969            LOG.debug("flushing segment " + segments);
2970          }
2971          rawBuffer = rawKeys.getData();
2972          sort(count);
2973          // indicate we're making progress
2974          if (progressable != null) {
2975            progressable.progress();
2976          }
2977          flush(count, bytesProcessed, compressionType, codec, 
2978                segments==0 && atEof);
2979          segments++;
2980        }
2981        return segments;
2982      }
2983
2984      public void close() throws IOException {
2985        if (in != null) {
2986          in.close();
2987        }
2988        if (out != null) {
2989          out.close();
2990        }
2991        if (indexOut != null) {
2992          indexOut.close();
2993        }
2994      }
2995
2996      private void grow() {
2997        int newLength = keyOffsets.length * 3 / 2;
2998        keyOffsets = grow(keyOffsets, newLength);
2999        pointers = grow(pointers, newLength);
3000        pointersCopy = new int[newLength];
3001        keyLengths = grow(keyLengths, newLength);
3002        rawValues = grow(rawValues, newLength);
3003      }
3004
3005      private int[] grow(int[] old, int newLength) {
3006        int[] result = new int[newLength];
3007        System.arraycopy(old, 0, result, 0, old.length);
3008        return result;
3009      }
3010      
      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
        ValueBytes[] result = new ValueBytes[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        // slots beyond old.length stay null (Java's default for new arrays)
        // and are lazily replaced via createValueBytes() on the next fill
        return result;
      }
3019
3020      private void flush(int count, int bytesProcessed, 
3021                         CompressionType compressionType, 
3022                         CompressionCodec codec, 
3023                         boolean done) throws IOException {
3024        if (out == null) {
3025          outName = done ? outFile : outFile.suffix(".0");
3026          out = fs.create(outName);
3027          if (!done) {
3028            indexOut = fs.create(outName.suffix(".index"));
3029          }
3030        }
3031
3032        long segmentStart = out.getPos();
3033        Writer writer = createWriter(conf, Writer.stream(out), 
3034            Writer.keyClass(keyClass), Writer.valueClass(valClass),
3035            Writer.compression(compressionType, codec),
3036            Writer.metadata(done ? metadata : new Metadata()));
3037        
3038        if (!done) {
3039          writer.sync = null;                     // disable sync on temp files
3040        }
3041
3042        for (int i = 0; i < count; i++) {         // write in sorted order
3043          int p = pointers[i];
3044          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
3045        }
3046        writer.close();
3047        
3048        if (!done) {
3049          // Save the segment length
3050          WritableUtils.writeVLong(indexOut, segmentStart);
3051          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
3052          indexOut.flush();
3053        }
3054      }
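
      /* Illustrative only: each call to flush() above appends one
       * (segmentStart, segmentLength) pair of VLongs to the ".index" stream.
       * SegmentContainer near the end of this file reads the index back with
       * the mirror-image loop, roughly:
       *
       *   FSDataInputStream idx = fs.open(indexPath);   // hypothetical names
       *   long end = fs.getFileStatus(indexPath).getLen();
       *   while (idx.getPos() < end) {
       *     long segOffset = WritableUtils.readVLong(idx);
       *     long segLength = WritableUtils.readVLong(idx);
       *     // each pair delimits one sorted segment in the data file
       *   }
       */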
3055
      private void sort(int count) {
        // MergeSort reads from the src copy and writes the sorted order
        // into 'pointers'
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
      }

      class SeqFileComparator implements Comparator<IntWritable> {
        @Override
        public int compare(IntWritable i, IntWritable j) {
          return comparator.compare(rawBuffer, keyOffsets[i.get()], 
                                    keyLengths[i.get()], rawBuffer, 
                                    keyOffsets[j.get()], keyLengths[j.get()]);
        }
      }
3068      
      /** Sets the progressable object in order to report progress. */
      public void setProgressable(Progressable progressable) {
        this.progressable = progressable;
      }
3074      
3075    } // SequenceFile.Sorter.SortPass
3076
3077    /** The interface to iterate over raw keys/values of SequenceFiles. */
3078    public static interface RawKeyValueIterator {
      /** Gets the current raw key.
       * @return the current raw key as a DataOutputBuffer
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException; 
      /** Gets the current raw value.
       * @return the current raw value as ValueBytes 
       * @throws IOException
       */
      ValueBytes getValue() throws IOException; 
      /** Sets up the current key and value (for getKey and getValue).
       * @return true if there exists a key/value pair, false otherwise 
       * @throws IOException
       */
      boolean next() throws IOException;
      /** Closes the iterator so that the underlying streams can be closed.
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; its float value (0.0 - 1.0) indicates
       * the fraction of the bytes that the iterator has processed so far
       */
      Progress getProgress();
3102    }    
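
    /* Illustrative only: the typical consumption pattern for a
     * RawKeyValueIterator, assuming 'records' came from one of the merge()
     * overloads below and 'writer' is an open Writer:
     *
     *   while (records.next()) {
     *     DataOutputBuffer key = records.getKey();
     *     writer.appendRaw(key.getData(), 0, key.getLength(),
     *                      records.getValue());
     *   }
     *   records.close();
     *
     * writeFile() below implements essentially this loop.
     */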
3103    
3104    /**
3105     * Merges the list of segments of type <code>SegmentDescriptor</code>
3106     * @param segments the list of SegmentDescriptors
3107     * @param tmpDir the directory to write temporary files into
3108     * @return RawKeyValueIterator
3109     * @throws IOException
3110     */
3111    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
3112                                     Path tmpDir) 
3113      throws IOException {
3114      // pass in object to report progress, if present
3115      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3116      return mQueue.merge();
3117    }
3118
    /**
     * Merges the contents of files passed in Path[] using the merge
     * factor that has already been set
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when 
     * no longer needed
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
3129    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3130                                     Path tmpDir) 
3131      throws IOException {
3132      return merge(inNames, deleteInputs, 
3133                   (inNames.length < factor) ? inNames.length : factor,
3134                   tmpDir);
3135    }
3136
    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when 
     * no longer needed
     * @param factor the factor that will be used as the maximum merge fan-in
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
3147    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3148                                     int factor, Path tmpDir) 
3149      throws IOException {
3150      //get the segments from inNames
3151      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3152      for (int i = 0; i < inNames.length; i++) {
3153        SegmentDescriptor s = new SegmentDescriptor(0,
3154            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3155        s.preserveInput(!deleteInputs);
3156        s.doSync();
3157        a.add(s);
3158      }
3159      this.factor = factor;
3160      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3161      return mQueue.merge();
3162    }
3163
    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param tempDir the directory for creating temp files during merge
     * @param deleteInputs true if the input files should be deleted when 
     * no longer needed
     * @return RawKeyValueIterator
     * @throws IOException
     */
3173    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3174                                     boolean deleteInputs) 
3175      throws IOException {
      //outFile is used as the prefix for the temp files that hold the
      //intermediate merge outputs
3178      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3179      //get the segments from inNames
3180      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3181      for (int i = 0; i < inNames.length; i++) {
3182        SegmentDescriptor s = new SegmentDescriptor(0,
3183            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3184        s.preserveInput(!deleteInputs);
3185        s.doSync();
3186        a.add(s);
3187      }
3188      factor = (inNames.length < factor) ? inNames.length : factor;
3189      // pass in object to report progress, if present
3190      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3191      return mQueue.merge();
3192    }
3193
3194    /**
     * Clones the attributes (like compression) of the input file and creates
     * a corresponding Writer
3197     * @param inputFile the path of the input file whose attributes should be 
3198     * cloned
3199     * @param outputFile the path of the output file 
3200     * @param prog the Progressable to report status during the file write
3201     * @return Writer
3202     * @throws IOException
3203     */
3204    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3205                                      Progressable prog) throws IOException {
3206      Reader reader = new Reader(conf,
3207                                 Reader.file(inputFile),
3208                                 new Reader.OnlyHeaderOption());
3209      CompressionType compress = reader.getCompressionType();
3210      CompressionCodec codec = reader.getCompressionCodec();
3211      reader.close();
3212
3213      Writer writer = createWriter(conf, 
3214                                   Writer.file(outputFile), 
3215                                   Writer.keyClass(keyClass), 
3216                                   Writer.valueClass(valClass), 
3217                                   Writer.compression(compress, codec), 
3218                                   Writer.progressable(prog));
3219      return writer;
3220    }
3221
3222    /**
3223     * Writes records from RawKeyValueIterator into a file represented by the 
3224     * passed writer
3225     * @param records the RawKeyValueIterator
3226     * @param writer the Writer created earlier 
3227     * @throws IOException
3228     */
3229    public void writeFile(RawKeyValueIterator records, Writer writer) 
3230      throws IOException {
3231      while(records.next()) {
3232        writer.appendRaw(records.getKey().getData(), 0, 
3233                         records.getKey().getLength(), records.getValue());
3234      }
3235      writer.sync();
3236    }
3237        
3238    /** Merge the provided files.
3239     * @param inFiles the array of input path names
3240     * @param outFile the final output file
3241     * @throws IOException
3242     */
3243    public void merge(Path[] inFiles, Path outFile) throws IOException {
3244      if (fs.exists(outFile)) {
3245        throw new IOException("already exists: " + outFile);
3246      }
3247      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3248      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3249      
3250      writeFile(r, writer);
3251
3252      writer.close();
3253    }
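
    /* Illustrative only: an end-to-end use of this Sorter to merge
     * pre-sorted SequenceFiles, assuming an existing FileSystem 'fs' and
     * Configuration 'conf'; the paths are hypothetical:
     *
     *   SequenceFile.Sorter sorter =
     *     new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);
     *   Path[] inputs = { new Path("/data/part-0"), new Path("/data/part-1") };
     *   sorter.merge(inputs, new Path("/data/merged"));
     *
     * All inputs must share key/value classes, and MergeQueue.put() below
     * rejects inputs whose compression settings disagree.
     */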
3254
3255    /** sort calls this to generate the final merged output */
3256    private int mergePass(Path tmpDir) throws IOException {
3257      if(LOG.isDebugEnabled()) {
3258        LOG.debug("running merge pass");
3259      }
3260      Writer writer = cloneFileAttributes(
3261                                          outFile.suffix(".0"), outFile, null);
3262      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3263                                    outFile.suffix(".0.index"), tmpDir);
3264      writeFile(r, writer);
3265
3266      writer.close();
3267      return 0;
3268    }
3269
3270    /** Used by mergePass to merge the output of the sort
3271     * @param inName the name of the input file containing sorted segments
3272     * @param indexIn the offsets of the sorted segments
3273     * @param tmpDir the relative directory to store intermediate results in
3274     * @return RawKeyValueIterator
3275     * @throws IOException
3276     */
3277    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3278      throws IOException {
      //get the segments from indexIn
      //a SegmentContainer is created to track the segments belonging to
      //inName, so that inName can be deleted as soon as all of its contained
      //segments have been consumed by the merge and are no longer needed
      SegmentContainer container = new SegmentContainer(inName, indexIn);
      MergeQueue mQueue = 
        new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3286      return mQueue.merge();
3287    }
3288    
3289    /** This class implements the core of the merge logic */
3290    private class MergeQueue extends PriorityQueue 
3291      implements RawKeyValueIterator {
3292      private boolean compress;
3293      private boolean blockCompress;
3294      private DataOutputBuffer rawKey = new DataOutputBuffer();
3295      private ValueBytes rawValue;
3296      private long totalBytesProcessed;
3297      private float progPerByte;
3298      private Progress mergeProgress = new Progress();
3299      private Path tmpDir;
3300      private Progressable progress = null; //handle to the progress reporting object
3301      private SegmentDescriptor minSegment;
3302      
      //a TreeMap used to store the segments sorted by size (segment offset
      //and segment path name are used to break ties between segments of the
      //same size)
3305      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3306        new TreeMap<SegmentDescriptor, Void>();
3307            
3308      @SuppressWarnings("unchecked")
3309      public void put(SegmentDescriptor stream) throws IOException {
3310        if (size() == 0) {
3311          compress = stream.in.isCompressed();
3312          blockCompress = stream.in.isBlockCompressed();
3313        } else if (compress != stream.in.isCompressed() || 
3314                   blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException(
              "All merged files must use the same compression settings " +
              "(all compressed or all uncompressed).");
3316        } 
3317        super.put(stream);
3318      }
3319      
3320      /**
3321       * A queue of file segments to merge
3322       * @param segments the file segments to merge
3323       * @param tmpDir a relative local directory to save intermediate files in
3324       * @param progress the reference to the Progressable object
3325       */
3326      public MergeQueue(List <SegmentDescriptor> segments,
3327          Path tmpDir, Progressable progress) {
3328        int size = segments.size();
3329        for (int i = 0; i < size; i++) {
3330          sortedSegmentSizes.put(segments.get(i), null);
3331        }
3332        this.tmpDir = tmpDir;
3333        this.progress = progress;
3334      }
3335      @Override
3336      protected boolean lessThan(Object a, Object b) {
3337        // indicate we're making progress
3338        if (progress != null) {
3339          progress.progress();
3340        }
3341        SegmentDescriptor msa = (SegmentDescriptor)a;
3342        SegmentDescriptor msb = (SegmentDescriptor)b;
3343        return comparator.compare(msa.getKey().getData(), 0, 
3344                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
3345                                  msb.getKey().getLength()) < 0;
3346      }
3347      @Override
3348      public void close() throws IOException {
3349        SegmentDescriptor ms;                           // close inputs
3350        while ((ms = (SegmentDescriptor)pop()) != null) {
3351          ms.cleanup();
3352        }
3353        minSegment = null;
3354      }
3355      @Override
3356      public DataOutputBuffer getKey() throws IOException {
3357        return rawKey;
3358      }
3359      @Override
3360      public ValueBytes getValue() throws IOException {
3361        return rawValue;
3362      }
3363      @Override
3364      public boolean next() throws IOException {
3365        if (size() == 0)
3366          return false;
3367        if (minSegment != null) {
3368          //minSegment is non-null for all invocations of next except the first
3369          //one. For the first invocation, the priority queue is ready for use
3370          //but for the subsequent invocations, first adjust the queue 
3371          adjustPriorityQueue(minSegment);
3372          if (size() == 0) {
3373            minSegment = null;
3374            return false;
3375          }
3376        }
3377        minSegment = (SegmentDescriptor)top();
3378        long startPos = minSegment.in.getPosition(); // Current position in stream
3379        //save the raw key reference
3380        rawKey = minSegment.getKey();
3381        //load the raw value. Re-use the existing rawValue buffer
3382        if (rawValue == null) {
3383          rawValue = minSegment.in.createValueBytes();
3384        }
3385        minSegment.nextRawValue(rawValue);
3386        long endPos = minSegment.in.getPosition(); // End position after reading value
3387        updateProgress(endPos - startPos);
3388        return true;
3389      }
3390      
3391      @Override
3392      public Progress getProgress() {
3393        return mergeProgress; 
3394      }
3395
3396      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3397        long startPos = ms.in.getPosition(); // Current position in stream
3398        boolean hasNext = ms.nextRawKey();
3399        long endPos = ms.in.getPosition(); // End position after reading key
3400        updateProgress(endPos - startPos);
3401        if (hasNext) {
3402          adjustTop();
3403        } else {
3404          pop();
3405          ms.cleanup();
3406        }
3407      }
3408
3409      private void updateProgress(long bytesProcessed) {
3410        totalBytesProcessed += bytesProcessed;
3411        if (progPerByte > 0) {
3412          mergeProgress.set(totalBytesProcessed * progPerByte);
3413        }
3414      }
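
      /* Illustrative only: progPerByte is set in merge() below to
       * 1.0f / totalBytes of the final segments, so with, say,
       * totalBytes == 1000 and 250 bytes consumed so far, mergeProgress
       * is set to 250 * (1.0f / 1000) = 0.25, i.e. a quarter done.
       */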
3415      
3416      /** This is the single level merge that is called multiple times 
3417       * depending on the factor size and the number of segments
3418       * @return RawKeyValueIterator
3419       * @throws IOException
3420       */
3421      public RawKeyValueIterator merge() throws IOException {
3422        //create the MergeStreams from the sorted map created in the constructor
3423        //and dump the final output to a file
3424        int numSegments = sortedSegmentSizes.size();
3425        int origFactor = factor;
3426        int passNo = 1;
3427        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3428        do {
3429          //get the factor for this pass of merge
3430          factor = getPassFactor(passNo, numSegments);
3431          List<SegmentDescriptor> segmentsToMerge =
3432            new ArrayList<SegmentDescriptor>();
3433          int segmentsConsidered = 0;
3434          int numSegmentsToConsider = factor;
3435          while (true) {
3436            //extract the smallest 'factor' number of segment pointers from the 
3437            //TreeMap. Call cleanup on the empty segments (no key/value data)
3438            SegmentDescriptor[] mStream = 
3439              getSegmentDescriptors(numSegmentsToConsider);
3440            for (int i = 0; i < mStream.length; i++) {
3441              if (mStream[i].nextRawKey()) {
3442                segmentsToMerge.add(mStream[i]);
3443                segmentsConsidered++;
                // account for the bytes read by the nextRawKey() call above
3445                updateProgress(mStream[i].in.getPosition());
3446              }
3447              else {
3448                mStream[i].cleanup();
3449                numSegments--; //we ignore this segment for the merge
3450              }
3451            }
3452            //if we have the desired number of segments
3453            //or looked at all available segments, we break
3454            if (segmentsConsidered == factor || 
3455                sortedSegmentSizes.size() == 0) {
3456              break;
3457            }
3458              
3459            numSegmentsToConsider = factor - segmentsConsidered;
3460          }
3461          //feed the streams to the priority queue
          initialize(segmentsToMerge.size());
          clear();
3463          for (int i = 0; i < segmentsToMerge.size(); i++) {
3464            put(segmentsToMerge.get(i));
3465          }
          //if few enough segments remain, just return the iterator;
          //otherwise do another single-level merge
3468          if (numSegments <= factor) {
3469            //calculate the length of the remaining segments. Required for 
3470            //calculating the merge progress
3471            long totalBytes = 0;
3472            for (int i = 0; i < segmentsToMerge.size(); i++) {
3473              totalBytes += segmentsToMerge.get(i).segmentLength;
3474            }
3475            if (totalBytes != 0) //being paranoid
3476              progPerByte = 1.0f / (float)totalBytes;
3477            //reset factor to what it originally was
3478            factor = origFactor;
3479            return this;
3480          } else {
3481            //we want to spread the creation of temp files on multiple disks if 
3482            //available under the space constraints
3483            long approxOutputSize = 0; 
3484            for (SegmentDescriptor s : segmentsToMerge) {
3485              approxOutputSize += s.segmentLength + 
3486                                  ChecksumFileSystem.getApproxChkSumLength(
3487                                  s.segmentLength);
3488            }
3489            Path tmpFilename = 
3490              new Path(tmpDir, "intermediate").suffix("." + passNo);
3491
3492            Path outputFile =  lDirAlloc.getLocalPathForWrite(
3493                                                tmpFilename.toString(),
3494                                                approxOutputSize, conf);
3495            if(LOG.isDebugEnabled()) { 
3496              LOG.debug("writing intermediate results to " + outputFile);
3497            }
3498            Writer writer = cloneFileAttributes(
3499                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3500                                                fs.makeQualified(outputFile), null);
3501            writer.sync = null; //disable sync for temp files
3502            writeFile(this, writer);
3503            writer.close();
3504            
3505            //we finished one single level merge; now clean up the priority 
3506            //queue
3507            this.close();
3508            
3509            SegmentDescriptor tempSegment = 
3510              new SegmentDescriptor(0,
3511                  fs.getFileStatus(outputFile).getLen(), outputFile);
3512            //put the segment back in the TreeMap
3513            sortedSegmentSizes.put(tempSegment, null);
3514            numSegments = sortedSegmentSizes.size();
3515            passNo++;
3516          }
          //only the first merge pass uses an adjusted factor, so reset the 
          //factor to what it originally was
3519          factor = origFactor;
3520        } while(true);
3521      }
3522  
      /** Determines the number of segments to merge in a given pass
       * (see HADOOP-591). Every pass after the first merges a full
       * 'factor' segments. On the first pass, if (numSegments - 1) is not
       * divisible by (factor - 1), only enough segments are merged to make
       * it so; every later pass can then merge exactly 'factor' segments
       * without leaving an undersized final merge.
       */
3524      public int getPassFactor(int passNo, int numSegments) {
3525        if (passNo > 1 || numSegments <= factor || factor == 1) 
3526          return factor;
3527        int mod = (numSegments - 1) % (factor - 1);
3528        if (mod == 0)
3529          return factor;
3530        return mod + 1;
3531      }
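
      /* Illustrative only: a worked example of the first-pass factor. With
       * numSegments == 30 and factor == 10, mod = (30 - 1) % (10 - 1) = 2,
       * so the first pass merges only mod + 1 = 3 segments (30 -> 28).
       * Each later full merge of 10 segments replaces them with 1
       * (28 -> 19 -> 10), ending at exactly 'factor' segments with no
       * undersized final merge.
       */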
3532      
3533      /** Return (& remove) the requested number of segment descriptors from the
3534       * sorted map.
3535       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] segmentDescriptors = 
          new SegmentDescriptor[numDescriptors];
        Iterator<SegmentDescriptor> iter =
          sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          segmentDescriptors[i++] = iter.next();
          iter.remove();
        }
        return segmentDescriptors;
      }
3549    } // SequenceFile.Sorter.MergeQueue
3550
3551    /** This class defines a merge segment. This class can be subclassed to 
3552     * provide a customized cleanup method implementation. In this 
3553     * implementation, cleanup closes the file handle and deletes the file 
3554     */
3555    public class SegmentDescriptor implements Comparable {
3556      
3557      long segmentOffset; //the start of the segment in the file
3558      long segmentLength; //the length of the segment
3559      Path segmentPathName; //the path name of the file containing the segment
3560      boolean ignoreSync = true; //set to true for temp files
3561      private Reader in = null; 
3562      private DataOutputBuffer rawKey = null; //this will hold the current key
3563      private boolean preserveInput = false; //delete input segment files?
3564      
3565      /** Constructs a segment
3566       * @param segmentOffset the offset of the segment in the file
3567       * @param segmentLength the length of the segment
3568       * @param segmentPathName the path name of the file containing the segment
3569       */
3570      public SegmentDescriptor (long segmentOffset, long segmentLength, 
3571                                Path segmentPathName) {
3572        this.segmentOffset = segmentOffset;
3573        this.segmentLength = segmentLength;
3574        this.segmentPathName = segmentPathName;
3575      }
3576      
      /** Enables sync-marker checks for this segment. */
      public void doSync() { ignoreSync = false; }
3579      
      /** Sets whether the input segment file should be preserved
       * (i.e. not deleted) when it is no longer needed */
      public void preserveInput(boolean preserve) {
3582        preserveInput = preserve;
3583      }
3584
3585      public boolean shouldPreserveInput() {
3586        return preserveInput;
3587      }
3588      
3589      @Override
3590      public int compareTo(Object o) {
3591        SegmentDescriptor that = (SegmentDescriptor)o;
3592        if (this.segmentLength != that.segmentLength) {
3593          return (this.segmentLength < that.segmentLength ? -1 : 1);
3594        }
3595        if (this.segmentOffset != that.segmentOffset) {
3596          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3597        }
3598        return (this.segmentPathName.toString()).
3599          compareTo(that.segmentPathName.toString());
3600      }
3601
3602      @Override
3603      public boolean equals(Object o) {
3604        if (!(o instanceof SegmentDescriptor)) {
3605          return false;
3606        }
3607        SegmentDescriptor that = (SegmentDescriptor)o;
3608        if (this.segmentLength == that.segmentLength &&
3609            this.segmentOffset == that.segmentOffset &&
3610            this.segmentPathName.toString().equals(
3611              that.segmentPathName.toString())) {
3612          return true;
3613        }
3614        return false;
3615      }
3616
3617      @Override
3618      public int hashCode() {
3619        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3620      }
3621
      /** Fills up the rawKey object with the key returned by the Reader
       * @return true if a key was read; false otherwise
       * @throws IOException
       */
3626      public boolean nextRawKey() throws IOException {
3627        if (in == null) {
3628          int bufferSize = getBufferSize(conf); 
3629          Reader reader = new Reader(conf,
3630                                     Reader.file(segmentPathName), 
3631                                     Reader.bufferSize(bufferSize),
3632                                     Reader.start(segmentOffset), 
3633                                     Reader.length(segmentLength));
3634        
3635          //sometimes we ignore syncs especially for temp merge files
3636          if (ignoreSync) reader.ignoreSync();
3637
3638          if (reader.getKeyClass() != keyClass)
3639            throw new IOException("wrong key class: " + reader.getKeyClass() +
3640                                  " is not " + keyClass);
3641          if (reader.getValueClass() != valClass)
3642            throw new IOException("wrong value class: "+reader.getValueClass()+
3643                                  " is not " + valClass);
3644          this.in = reader;
3645          rawKey = new DataOutputBuffer();
3646        }
3647        rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
3650        return (keyLength >= 0);
3651      }
3652
      /** Fills up the passed rawValue with the value corresponding to the key
       * read earlier
       * @param rawValue the ValueBytes object to fill in
       * @return the length of the value
       * @throws IOException
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        return in.nextRawValue(rawValue);
      }
3663      
3664      /** Returns the stored rawKey */
3665      public DataOutputBuffer getKey() {
3666        return rawKey;
3667      }
3668      
3669      /** closes the underlying reader */
3670      private void close() throws IOException {
3671        this.in.close();
3672        this.in = null;
3673      }
3674
3675      /** The default cleanup. Subclasses can override this with a custom 
3676       * cleanup 
3677       */
3678      public void cleanup() throws IOException {
3679        close();
3680        if (!preserveInput) {
3681          fs.delete(segmentPathName, true);
3682        }
3683      }
3684    } // SequenceFile.Sorter.SegmentDescriptor
3685    
    /** This class represents multiple segments contained within a single
     *  file
     */
3689    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3690
3691      SegmentContainer parentContainer = null;
3692
3693      /** Constructs a segment
3694       * @param segmentOffset the offset of the segment in the file
3695       * @param segmentLength the length of the segment
3696       * @param segmentPathName the path name of the file containing the segment
3697       * @param parent the parent SegmentContainer that holds the segment
3698       */
3699      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3700                                       Path segmentPathName, SegmentContainer parent) {
3701        super(segmentOffset, segmentLength, segmentPathName);
3702        this.parentContainer = parent;
3703      }
      /** Cleanup for a segment that shares its file with other segments:
       * closes the reader and, unless the input is preserved, notifies the
       * parent container so the shared file can be deleted once all of its
       * segments have been consumed
       */
3707      @Override
3708      public void cleanup() throws IOException {
3709        super.close();
3710        if (super.shouldPreserveInput()) return;
3711        parentContainer.cleanup();
3712      }
3713      
      @Override
      public boolean equals(Object o) {
        if (!(o instanceof LinkedSegmentsDescriptor)) {
          return false;
        }
        return super.equals(o);
      }

      @Override
      public int hashCode() {
        return super.hashCode();  // preserve the equals()/hashCode() contract
      }
3721    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3722
    /** The class that defines a container for segments to be merged. Primarily
     * required so that temp files can be deleted as soon as all the contained
     * segments have been consumed */
3726    private class SegmentContainer {
3727      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3728      private int numSegmentsContained; //# of segments contained
3729      private Path inName; //input file from where segments are created
3730      
3731      //the list of segments read from the file
3732      private ArrayList <SegmentDescriptor> segments = 
3733        new ArrayList <SegmentDescriptor>();
      /** This constructor exists primarily to serve the sort routine that 
       * generates a single output file with an associated index file */
3736      public SegmentContainer(Path inName, Path indexIn) throws IOException {
3737        //get the segments from indexIn
3738        FSDataInputStream fsIndexIn = fs.open(indexIn);
3739        long end = fs.getFileStatus(indexIn).getLen();
3740        while (fsIndexIn.getPos() < end) {
3741          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3742          long segmentLength = WritableUtils.readVLong(fsIndexIn);
3743          Path segmentName = inName;
3744          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3745                                                    segmentLength, segmentName, this));
3746        }
3747        fsIndexIn.close();
3748        fs.delete(indexIn, true);
3749        numSegmentsContained = segments.size();
3750        this.inName = inName;
3751      }
3752
3753      public List <SegmentDescriptor> getSegmentList() {
3754        return segments;
3755      }
3756      public void cleanup() throws IOException {
3757        numSegmentsCleanedUp++;
3758        if (numSegmentsCleanedUp == numSegmentsContained) {
3759          fs.delete(inName, true);
3760        }
3761      }
3762    } //SequenceFile.Sorter.SegmentContainer
3763
3764  } // SequenceFile.Sorter
3765
3766} // SequenceFile