package felix.io;

import java.io.IOException;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.streaming.StreamInputFormat;
import org.apache.hadoop.streaming.StreamXmlRecordReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.python.core.PyArray;
import org.python.core.PyList;
import org.python.core.PyObject;
import org.python.core.PyString;

import edu.umd.cloud9.collection.XMLInputFormat;

import tuffy.util.ExceptionMan;

import felix.thirdpart.XmlInputFormat;
import felix.util.FelixConfig;
43 | |
44 | /** |
45 | * This class communicates with Hadoop. It translates |
46 | * the given MAP and REDUCE codes to Hadoop API. |
47 | * @author czhang |
48 | * |
49 | */ |
50 | public class TestHadoop extends Configured implements Tool{ |
51 | |
52 | /** |
53 | * MAP |
54 | * @author czhang |
55 | * |
56 | */ |
57 | public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>{ |
58 | |
59 | private Text word = new Text(); |
60 | public static PythonExecutor pyMap; |
61 | |
62 | public static String _inputvalue; |
63 | |
64 | @Override |
65 | public void setup(Mapper.Context contex) { |
66 | |
67 | pyMap = new PythonExecutor( |
68 | "_felix_donotusemyname_outkey=[]\n" + "_felix_donotusemyname_outvalue=[]\n"+ |
69 | contex.getConfiguration().get("pyMapScript")); |
70 | _inputvalue = contex.getConfiguration().get("mapinputvalue"); |
71 | |
72 | if(contex.getConfiguration().get("pyMapInitScript") != null){ |
73 | pyMap.execSingle(contex.getConfiguration().get("pyMapInitScript")); |
74 | } |
75 | |
76 | } |
77 | |
78 | public void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { |
79 | |
80 | pyMap.set(_inputvalue, new PyString(value.toString())); |
81 | |
82 | pyMap.run(); |
83 | PyList outKey = (PyList) pyMap.get("_felix_donotusemyname_outkey"); |
84 | PyList outValue = (PyList) pyMap.get("_felix_donotusemyname_outvalue"); |
85 | |
86 | for(int i=0;i<outKey.__len__();i++){ |
87 | PyObject _key = outKey.__getitem__(i); |
88 | PyObject _value = outValue.__getitem__(i); |
89 | |
90 | context.write(new Text(""+_key), new Text(""+_value)); |
91 | // System.out.println(""+key + "\t" + value); |
92 | } |
93 | |
94 | } |
95 | |
96 | } |
97 | |
98 | /** |
99 | * REDUCE |
100 | * @author czhang |
101 | * |
102 | */ |
103 | public static class Reduce extends Reducer<Text, Text, Text, Text>{ |
104 | |
105 | public static PythonExecutor pyReducer; |
106 | |
107 | public static String _inputkey; |
108 | public static String _inputvalues; |
109 | |
110 | @Override |
111 | public void setup(Reducer.Context contex) { |
112 | pyReducer = new PythonExecutor( |
113 | "_felix_donotusemyname_outkey=[]\n" + "_felix_donotusemyname_outvalue=[]\n" + |
114 | contex.getConfiguration().get("pyReduceScript")); |
115 | _inputkey = contex.getConfiguration().get("reduceinputkey"); |
116 | _inputvalues = contex.getConfiguration().get("reduceinputvalues"); |
117 | |
118 | if(contex.getConfiguration().get("pyReduceInitScript") != null){ |
119 | pyReducer.execSingle(contex.getConfiguration().get("pyReduceInitScript")); |
120 | } |
121 | |
122 | } |
123 | |
124 | protected void reduce(Text _key, Iterable _values, Context context) throws IOException, InterruptedException{ |
125 | |
126 | |
127 | ArrayList<PyString> args = new ArrayList<PyString>(); |
128 | Iterator it = _values.iterator(); |
129 | while(it.hasNext()){ |
130 | args.add(new PyString(it.next().toString())); |
131 | } |
132 | |
133 | PyList values = new PyList(args); |
134 | PyObject inputkey = new PyString(_key.toString()); |
135 | |
136 | pyReducer.set(_inputvalues, values); |
137 | pyReducer.set(_inputkey, inputkey); |
138 | |
139 | pyReducer.run(); |
140 | PyList outKey = (PyList) pyReducer.get("_felix_donotusemyname_outkey"); |
141 | PyList outValue = (PyList) pyReducer.get("_felix_donotusemyname_outvalue"); |
142 | |
143 | for(int i=0;i<outKey.__len__();i++){ |
144 | PyObject key = outKey.__getitem__(i); |
145 | PyObject value = outValue.__getitem__(i); |
146 | context.write(new Text("True\t1\t"+key), new Text(""+value)); |
147 | } |
148 | |
149 | } |
150 | |
151 | } |
152 | |
153 | /** |
154 | * @deprecated |
155 | * @param mapScript |
156 | * @param reduceScript |
157 | */ |
158 | public TestHadoop(String mapScript, String reduceScript){ |
159 | Map.pyMap = new PythonExecutor( |
160 | mapScript); |
161 | |
162 | Reduce.pyReducer = new PythonExecutor( |
163 | reduceScript); |
164 | } |
165 | |
166 | /** |
167 | * Execute a given task configuration (via arg0). |
168 | * @param toPass |
169 | * @throws Exception |
170 | */ |
171 | public static void executeHadoopProgram(String[] toPass) throws Exception{ |
172 | |
173 | String[] dirtoPass = toPass.clone(); |
174 | dirtoPass[2] += "_dir"; |
175 | int res = ToolRunner.run(new Configuration(), new TestHadoop(dirtoPass[3], dirtoPass[4]), |
176 | dirtoPass); |
177 | |
178 | } |
179 | |
180 | /** |
181 | * @deprecated |
182 | * @param toPass |
183 | * @throws Exception |
184 | */ |
185 | public static void post(String[] toPass) throws Exception{ |
186 | |
187 | Configuration fsconf = new Configuration(); |
188 | fsconf.set("fs.default.name", FelixConfig.hdfsServer); |
189 | FileSystem fileSystem = FileSystem.get(fsconf); |
190 | |
191 | String[] dirtoPass = toPass.clone(); |
192 | dirtoPass[2] += "_dir"; |
193 | |
194 | FileUtil.copyMerge(fileSystem, new Path(dirtoPass[2]), fileSystem, new Path(toPass[2]), true, |
195 | fsconf, ""); |
196 | |
197 | fileSystem.close(); |
198 | |
199 | } |
200 | |
201 | |
202 | /** |
203 | * Test entry. |
204 | * @param args |
205 | * @throws Exception |
206 | */ |
207 | public static void main(String[] args) throws Exception{ |
208 | |
209 | String[] toPass = { |
210 | |
211 | "hdfs://d-02.cs.wisc.edu:9000/felixNE/Entity.db", |
212 | //"hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut42/part-00000", |
213 | |
214 | "hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut100", |
215 | |
216 | "for k in _input.split(' '):\n"+ |
217 | "\t_outkey.append(k)\n"+ |
218 | "\t_outvalue.append('1')\n", |
219 | |
220 | "_outkey.append(_inputkey)\n"+ |
221 | "_outvalue.append(len(_inputvalues))\n" |
222 | |
223 | }; |
224 | |
225 | TestHadoop.executeHadoopProgram(toPass); |
226 | |
227 | |
228 | } |
229 | |
230 | |
231 | @Override |
232 | /** |
233 | * Execute a given task configuration (via arg0). |
234 | */ |
235 | public int run(String[] arg0) throws Exception { |
236 | |
237 | |
238 | Configuration conf = getConf(); |
239 | |
240 | if(!FelixConfig.hadoopLocal){ |
241 | conf.set("fs.defaultFS", FelixConfig.hdfsServer); |
242 | conf.set("mapred.job.tracker", FelixConfig.mrServer); |
243 | conf.set("mapred.child.java.opts", "-Xmx3192m"); |
244 | } |
245 | conf.set("pyMapScript", arg0[3]); |
246 | conf.set("pyReduceScript", arg0[4]); |
247 | conf.set("mapinputvalue", arg0[5]); |
248 | conf.set("reduceinputkey", arg0[6]); |
249 | conf.set("reduceinputvalues", arg0[7]); |
250 | conf.set("pyMapInitScript", arg0[8]); |
251 | conf.set("pyReduceInitScript", arg0[9]); |
252 | |
253 | |
254 | if(arg0[0].equals("xml")){ |
255 | conf.set("xmlinput.start", arg0[10]); |
256 | conf.set("xmlinput.end", arg0[11]); |
257 | } |
258 | |
259 | Job job = new Job(conf, "Felix_Run_On" + (new Date()).toLocaleString()); |
260 | |
261 | job.setNumReduceTasks(FelixConfig.nReduce); |
262 | |
263 | job.setJarByClass(TestHadoop.class); |
264 | |
265 | job.setMapperClass(Map.class); |
266 | job.setReducerClass(Reduce.class); |
267 | |
268 | job.setOutputKeyClass(Text.class); |
269 | job.setOutputValueClass(Text.class); |
270 | |
271 | FileInputFormat.addInputPath(job, new Path(arg0[1])); |
272 | FileOutputFormat.setOutputPath(job, new Path(arg0[2])); |
273 | |
274 | |
275 | if(arg0[0].equals("xml")){ |
276 | //XML |
277 | |
278 | job.setInputFormatClass(XmlInputFormat.class); |
279 | job.setOutputFormatClass(TextOutputFormat.class); |
280 | |
281 | job.setOutputKeyClass(Text.class); |
282 | job.setOutputValueClass(Text.class); |
283 | |
284 | }else if(arg0[0].equals("standard")){ |
285 | |
286 | job.setInputFormatClass(TextInputFormat.class); |
287 | job.setOutputFormatClass(TextOutputFormat.class); |
288 | |
289 | job.setOutputKeyClass(Text.class); |
290 | job.setOutputValueClass(Text.class); |
291 | } |
292 | |
293 | job.waitForCompletion(true); |
294 | return 0; |
295 | } |
296 | |
297 | } |