EMMA Coverage Report (generated Tue Aug 23 05:57:12 CDT 2011)
[all classes][felix.thirdpart]

COVERAGE SUMMARY FOR SOURCE FILE [XmlInputFormat.java]

nameclass, %method, %block, %line, %
XmlInputFormat.java100% (2/2)100% (12/12)91%  (197/217)87%  (42.6/49)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class XmlInputFormat100% (1/1)100% (3/3)68%  (15/22)50%  (3/6)
createRecordReader (InputSplit, TaskAttemptContext): RecordReader 100% (1/1)53%  (8/15)25%  (1/4)
<static initializer> 100% (1/1)100% (4/4)100% (2/2)
XmlInputFormat (): void 100% (1/1)100% (3/3)100% (1/1)
     
class XmlInputFormat$XmlRecordReader100% (1/1)100% (9/9)93%  (182/195)92%  (39.6/43)
next (LongWritable, Text): boolean 100% (1/1)81%  (46/57)76%  (7.6/10)
readUntilMatch (byte [], boolean): boolean 100% (1/1)95%  (42/44)92%  (12/13)
XmlInputFormat$XmlRecordReader (FileSplit, Configuration): void 100% (1/1)100% (51/51)100% (11/11)
close (): void 100% (1/1)100% (4/4)100% (2/2)
getCurrentKey (): LongWritable 100% (1/1)100% (3/3)100% (1/1)
getCurrentValue (): Text 100% (1/1)100% (3/3)100% (1/1)
getProgress (): float 100% (1/1)100% (15/15)100% (1/1)
initialize (InputSplit, TaskAttemptContext): void 100% (1/1)100% (1/1)100% (1/1)
nextKeyValue (): boolean 100% (1/1)100% (17/17)100% (3/3)

1/**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 
18package felix.thirdpart;
19 
20import java.io.IOException;
21 
22import com.google.common.base.Charsets;
23import com.google.common.io.Closeables;
24import org.apache.hadoop.conf.Configuration;
25import org.apache.hadoop.fs.FSDataInputStream;
26import org.apache.hadoop.fs.FileSystem;
27import org.apache.hadoop.fs.Path;
28import org.apache.hadoop.io.DataOutputBuffer;
29import org.apache.hadoop.io.LongWritable;
30import org.apache.hadoop.io.Text;
31import org.apache.hadoop.mapreduce.InputSplit;
32import org.apache.hadoop.mapreduce.RecordReader;
33import org.apache.hadoop.mapreduce.TaskAttemptContext;
34import org.apache.hadoop.mapreduce.lib.input.FileSplit;
35import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
36import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38 
39/**
40 * Reads records that are delimited by a specific begin/end tag
41 * -- ACK: THIS THIRD-PART CLASS IS NOT WRITTEN BY FELIX'S AUTHORS.
42 * 
43 */
44public class XmlInputFormat extends TextInputFormat {
45 
46  private static final Logger log = LoggerFactory.getLogger(XmlInputFormat.class);
47 
48  public static final String START_TAG_KEY = "xmlinput.start";
49  public static final String END_TAG_KEY = "xmlinput.end";
50 
51  /**
52   * Returns XMLRecord reader to read xml document.
53   */
54  @Override
55  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
56    try {
57      return new XmlRecordReader((FileSplit) split, context.getConfiguration());
58    } catch (IOException ioe) {
59      log.warn("Error while creating XmlRecordReader", ioe);
60      return null;
61    }
62  }
63 
64  /**
65   * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified
66   * by the start tag and end tag
67   * 
68   */
69  public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
70 
71    private final byte[] startTag;
72    private final byte[] endTag;
73    private final long start;
74    private final long end;
75    private final FSDataInputStream fsin;
76    private final DataOutputBuffer buffer = new DataOutputBuffer();
77    private LongWritable currentKey;
78    private Text currentValue;
79 
80    /**
81     * The constructor.
82     * @param split
83     * @param conf
84     * @throws IOException
85     */
86    public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
87      startTag = conf.get(START_TAG_KEY).getBytes(Charsets.UTF_8);
88      endTag = conf.get(END_TAG_KEY).getBytes(Charsets.UTF_8);
89 
90      // open the file and seek to the start of the split
91      start = split.getStart();
92      end = start + split.getLength();
93      Path file = split.getPath();
94      FileSystem fs = file.getFileSystem(conf);
95      fsin = fs.open(split.getPath());
96      fsin.seek(start);
97    }
98 
99    /**
100     * Sets next key, value and returns true if possible.
101     * @param key
102     * @param value
103     * @return
104     * @throws IOException
105     */
106    private boolean next(LongWritable key, Text value) throws IOException {
107      if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
108        try {
109          buffer.write(startTag);
110          if (readUntilMatch(endTag, true)) {
111            key.set(fsin.getPos());
112            value.set(buffer.getData(), 0, buffer.getLength());
113            return true;
114          }
115        } finally {
116          buffer.reset();
117        }
118      }
119      return false;
120    }
121 
122    @Override
123    public void close() throws IOException {
124      Closeables.closeQuietly(fsin);
125    }
126 
127    /**
128     * Returns progress through file.
129     */
130    @Override
131    public float getProgress() throws IOException {
132      return (fsin.getPos() - start) / (float) (end - start);
133    }
134 
135    /**
136     * Reads until given match is found.
137     * @param match
138     * @param withinBlock
139     * @return
140     * @throws IOException
141     */
142    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
143      int i = 0;
144      while (true) {
145        int b = fsin.read();
146        // end of file:
147        if (b == -1) {
148          return false;
149        }
150        // save to buffer:
151        if (withinBlock) {
152          buffer.write(b);
153        }
154 
155        // check if we're matching:
156        if (b == match[i]) {
157          i++;
158          if (i >= match.length) {
159            return true;
160          }
161        } else {
162          i = 0;
163        }
164        // see if we've passed the stop point:
165        if (!withinBlock && i == 0 && fsin.getPos() >= end) {
166          return false;
167        }
168      }
169    }
170 
171    /**
172     * Returns current key.
173     */
174    @Override
175    public LongWritable getCurrentKey() throws IOException, InterruptedException {
176      return currentKey;
177    }
178 
179    /**
180     * Returns current value.
181     */
182    @Override
183    public Text getCurrentValue() throws IOException, InterruptedException {
184      return currentValue;
185    }
186 
187    @Override
188    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
189    }
190 
191    /**
192     * Sets next key, value.
193     */
194    @Override
195    public boolean nextKeyValue() throws IOException, InterruptedException {
196      currentKey = new LongWritable();
197      currentValue = new Text();
198      return next(currentKey, currentValue);
199    }
200  }
201}

[all classes][felix.thirdpart]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov