EMMA Coverage Report (generated Sat Aug 20 11:00:51 CDT 2011)

COVERAGE SUMMARY FOR SOURCE FILE [TestHadoop.java]

name             class, %     method, %    block, %        line, %
TestHadoop.java  100% (3/3)   82% (9/11)   82% (421/511)   81% (86/106)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name                                                class, %     method, %    block, %        line, %

class TestHadoop                                    100% (1/1)   60% (3/5)    71% (203/287)   70% (44/63)
  main (String []): void                                         0% (0/1)     0% (0/22)       0% (0/7)
  post (String []): void                                         0% (0/1)     0% (0/50)       0% (0/9)
  run (String []): int                                           100% (1/1)   93% (157/169)   92% (33/36)
  TestHadoop (String, String): void                              100% (1/1)   100% (13/13)    100% (6/6)
  executeHadoopProgram (String []): void                         100% (1/1)   100% (33/33)    100% (5/5)

class TestHadoop$Reduce                             100% (1/1)   100% (3/3)   95% (122/128)   96% (24/25)
  setup (Reducer$Context): void                                  100% (1/1)   83% (30/36)     88% (7/8)
  TestHadoop$Reduce (): void                                     100% (1/1)   100% (3/3)      100% (1/1)
  reduce (Text, Iterable, Reducer$Context): void                 100% (1/1)   100% (89/89)    100% (16/16)

class TestHadoop$Map                                100% (1/1)   100% (3/3)   100% (96/96)    100% (18/18)
  TestHadoop$Map (): void                                        100% (1/1)   100% (8/8)      100% (2/2)
  map (LongWritable, Text, Mapper$Context): void                 100% (1/1)   100% (57/57)    100% (9/9)
  setup (Mapper$Context): void                                   100% (1/1)   100% (31/31)    100% (7/7)

package felix.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.python.core.PyList;
import org.python.core.PyObject;
import org.python.core.PyString;

import felix.thirdpart.XmlInputFormat;
import felix.util.FelixConfig;

/**
 * This class communicates with Hadoop. It translates the given
 * MAP and REDUCE scripts into calls against the Hadoop API.
 * @author czhang
 */
public class TestHadoop extends Configured implements Tool {
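
    // Pipeline sketch (inferred from this file): run() ships the user's
    // Python map/reduce scripts and their input variable names to the tasks
    // through the job Configuration; each task prepends two empty lists,
    // _felix_donotusemyname_outkey and _felix_donotusemyname_outvalue, which
    // the scripts append output pairs to. PythonExecutor is a Felix helper
    // class (not shown here) assumed to wrap an embedded Jython interpreter.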

    /**
     * MAP
     * @author czhang
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        public static PythonExecutor pyMap;

        public static String _inputvalue;

        @Override
        protected void setup(Context context) {

            pyMap = new PythonExecutor(
                    "_felix_donotusemyname_outkey=[]\n" + "_felix_donotusemyname_outvalue=[]\n" +
                    context.getConfiguration().get("pyMapScript"));
            _inputvalue = context.getConfiguration().get("mapinputvalue");

            if (context.getConfiguration().get("pyMapInitScript") != null) {
                pyMap.execSingle(context.getConfiguration().get("pyMapInitScript"));
            }

        }
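
        // For each input record, map() binds the record's value to the Python
        // variable named by "mapinputvalue", runs the map script, and drains
        // the two parallel output lists the script appended to, writing one
        // (key, value) pair per list index.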
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            pyMap.set(_inputvalue, new PyString(value.toString()));

            pyMap.run();
            PyList outKey = (PyList) pyMap.get("_felix_donotusemyname_outkey");
            PyList outValue = (PyList) pyMap.get("_felix_donotusemyname_outvalue");

            for (int i = 0; i < outKey.__len__(); i++) {
                PyObject _key = outKey.__getitem__(i);
                PyObject _value = outValue.__getitem__(i);

                context.write(new Text("" + _key), new Text("" + _value));
            }

        }

    }

    /**
     * REDUCE
     * @author czhang
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        public static PythonExecutor pyReducer;

        public static String _inputkey;
        public static String _inputvalues;

        @Override
        protected void setup(Context context) {
            pyReducer = new PythonExecutor(
                    "_felix_donotusemyname_outkey=[]\n" + "_felix_donotusemyname_outvalue=[]\n" +
                    context.getConfiguration().get("pyReduceScript"));
            _inputkey = context.getConfiguration().get("reduceinputkey");
            _inputvalues = context.getConfiguration().get("reduceinputvalues");

            if (context.getConfiguration().get("pyReduceInitScript") != null) {
                pyReducer.execSingle(context.getConfiguration().get("pyReduceInitScript"));
            }

        }
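
        // reduce() materializes the iterable of grouped values into a Python
        // list, binds it and the key to the variable names configured by
        // run(), executes the reduce script, and emits whatever the script
        // appended to the two output lists. The "True\t1\t" prefix on the
        // output key is Felix-specific formatting carried over from the
        // original code.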
        @Override
        protected void reduce(Text _key, Iterable<Text> _values, Context context)
                throws IOException, InterruptedException {

            ArrayList<PyString> args = new ArrayList<PyString>();
            for (Text v : _values) {
                args.add(new PyString(v.toString()));
            }

            PyList values = new PyList(args);
            PyObject inputkey = new PyString(_key.toString());

            pyReducer.set(_inputvalues, values);
            pyReducer.set(_inputkey, inputkey);

            pyReducer.run();
            PyList outKey = (PyList) pyReducer.get("_felix_donotusemyname_outkey");
            PyList outValue = (PyList) pyReducer.get("_felix_donotusemyname_outvalue");

            for (int i = 0; i < outKey.__len__(); i++) {
                PyObject key = outKey.__getitem__(i);
                PyObject value = outValue.__getitem__(i);
                context.write(new Text("True\t1\t" + key), new Text("" + value));
            }

        }

    }

    /**
     * @deprecated
     * @param mapScript
     * @param reduceScript
     */
    @Deprecated
    public TestHadoop(String mapScript, String reduceScript) {
        Map.pyMap = new PythonExecutor(mapScript);
        Reduce.pyReducer = new PythonExecutor(reduceScript);
    }

    /**
     * Execute a given task configuration (via toPass).
     * @param toPass
     * @throws Exception
     */
    public static void executeHadoopProgram(String[] toPass) throws Exception {

        String[] dirtoPass = toPass.clone();
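        // The job output goes to "<output path>_dir"; the deprecated post()
        // below merges that directory's part files into the single file named
        // by toPass[2].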
        dirtoPass[2] += "_dir";
        ToolRunner.run(new Configuration(), new TestHadoop(dirtoPass[3], dirtoPass[4]),
                dirtoPass);

    }

    /**
     * @deprecated
     * @param toPass
     * @throws Exception
     */
    @Deprecated
    public static void post(String[] toPass) throws Exception {

        Configuration fsconf = new Configuration();
        fsconf.set("fs.default.name", FelixConfig.hdfsServer);
        FileSystem fileSystem = FileSystem.get(fsconf);

        String[] dirtoPass = toPass.clone();
        dirtoPass[2] += "_dir";

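        // copyMerge concatenates every file under dirtoPass[2] into the single
        // file toPass[2]; the "true" argument deletes the source directory
        // afterwards, and the final "" is the string appended between files
        // (i.e., nothing).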
        FileUtil.copyMerge(fileSystem, new Path(dirtoPass[2]), fileSystem, new Path(toPass[2]), true,
                fsconf, "");

        fileSystem.close();

    }

    /**
     * Test entry.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        // run() reads ten entries: mode, input path, output path, map script,
        // reduce script, map input variable, reduce key variable, reduce
        // values variable, map init script, reduce init script. The "standard"
        // mode and the variable-name/init entries below are assumed values
        // inferred from the scripts; only the paths and the two scripts appear
        // in the original test (which, with four entries, could not have run).
        String[] toPass = {

                "standard",

                "hdfs://d-02.cs.wisc.edu:9000/felixNE/Entity.db",
                //"hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut42/part-00000",

                "hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut100",

                "for k in _input.split(' '):\n" +
                "\t_outkey.append(k)\n" +
                "\t_outvalue.append('1')\n",

                "_outkey.append(_inputkey)\n" +
                "_outvalue.append(len(_inputvalues))\n",

                "_input",       // map input variable name (assumed)
                "_inputkey",    // reduce input key variable name (assumed)
                "_inputvalues", // reduce input values variable name (assumed)
                "",             // map init script (none)
                ""              // reduce init script (none)
        };

        TestHadoop.executeHadoopProgram(toPass);

    }

    /**
     * Execute a given task configuration (via arg0).
     */
    @Override
    public int run(String[] arg0) throws Exception {

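        // Expected arg0 layout (inferred from the reads below):
        //   [0] input format ("standard" or "xml")
        //   [1] HDFS input path              [2] HDFS output path
        //   [3] Python map script            [4] Python reduce script
        //   [5] map input variable name      [6] reduce input key variable name
        //   [7] reduce input values variable name
        //   [8] map init script              [9] reduce init script
        //   [10]/[11] XML record start/end tags (xml mode only)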
        Configuration conf = getConf();

        if (!FelixConfig.hadoopLocal) {
            conf.set("fs.defaultFS", FelixConfig.hdfsServer);
            conf.set("mapred.job.tracker", FelixConfig.mrServer);
            conf.set("mapred.child.java.opts", "-Xmx3192m");
        }
        conf.set("pyMapScript", arg0[3]);
        conf.set("pyReduceScript", arg0[4]);
        conf.set("mapinputvalue", arg0[5]);
        conf.set("reduceinputkey", arg0[6]);
        conf.set("reduceinputvalues", arg0[7]);
        conf.set("pyMapInitScript", arg0[8]);
        conf.set("pyReduceInitScript", arg0[9]);

        if (arg0[0].equals("xml")) {
            conf.set("xmlinput.start", arg0[10]);
            conf.set("xmlinput.end", arg0[11]);
        }

        Job job = new Job(conf, "Felix_Run_On" + new Date().toString());

        job.setNumReduceTasks(FelixConfig.nReduce);

        job.setJarByClass(TestHadoop.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(arg0[1]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[2]));

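        // Both modes use TextOutputFormat with Text keys/values (already set
        // above), so only the input format depends on the mode. XmlInputFormat
        // (felix.thirdpart) is assumed to split the input stream on the
        // xmlinput.start/xmlinput.end tags configured earlier, presenting each
        // XML block to map() as a single record.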
        if (arg0[0].equals("xml")) {
            job.setInputFormatClass(XmlInputFormat.class);
        } else if (arg0[0].equals("standard")) {
            job.setInputFormatClass(TextInputFormat.class);
        }
        job.setOutputFormatClass(TextOutputFormat.class);

        // Surface job failure to the caller instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }

}

EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov