1 | package felix.executor; |
2 | |
3 | import java.io.BufferedWriter; |
4 | import java.sql.ResultSet; |
5 | import java.util.ArrayList; |
6 | import java.util.HashMap; |
7 | import java.util.HashSet; |
8 | |
9 | import tuffy.db.RDB; |
10 | import tuffy.infer.DataMover; |
11 | import tuffy.mln.MarkovLogicNetwork; |
12 | import tuffy.mln.Predicate; |
13 | import tuffy.parse.CommandOptions; |
14 | import tuffy.util.Config; |
15 | import tuffy.util.ExceptionMan; |
16 | import tuffy.util.FileMan; |
17 | import tuffy.util.StringMan; |
18 | import tuffy.util.UIMan; |
19 | |
20 | import felix.dstruct.ConcurrentOperatorsBucket; |
21 | import felix.dstruct.ExecutionPlan; |
22 | import felix.dstruct.FelixPredicate; |
23 | import felix.dstruct.FelixQuery; |
24 | import felix.dstruct.StatOperator.OPType; |
25 | import felix.main.Felix; |
26 | import felix.optimizer.DMOOptimizer; |
27 | import felix.parser.FelixCommandOptions; |
28 | import felix.util.FelixConfig; |
29 | import felix.util.FelixUIMan; |
30 | |
31 | |
32 | |
33 | /** |
34 | * Class for executing a given physical {@link ExecutionPlan} using |
35 | * dual decomposition. |
36 | * |
37 | * @author Ce Zhang |
38 | * |
39 | */ |
40 | public class DDExecutor { |
41 | |
42 | /** |
43 | * The execution plan to be executed. |
44 | */ |
45 | ExecutionPlan ep; |
46 | |
47 | /** |
48 | * The FelixQuery used by this Executor. |
49 | */ |
50 | FelixQuery fq; |
51 | |
52 | /** |
53 | * The FelixCommandOptions used by this Executor. |
54 | */ |
55 | FelixCommandOptions options; |
56 | |
57 | /** |
58 | * The DMOOptimizer used by this Executor. |
59 | */ |
60 | public DMOOptimizer dmoo; |
61 | |
62 | /** |
63 | * Execute this plan. If Felix runs in |
64 | * explain mode, just prints out the physical plan. |
65 | * |
66 | * <p> TODO: Find a better way to the explain mode (e.g., a graph). |
67 | */ |
68 | @SuppressWarnings("unchecked") |
69 | public void run(){ |
70 | |
71 | try{ |
72 | |
73 | int sum = -1; |
74 | int nIt = FelixConfig.nDDIT; |
75 | |
76 | for(int i = ep.operators.size()-1; i >=0; i --){ |
77 | ep.operators.get(i).setNCore(Config.getNumThreads()); |
78 | ep.operators.get(i).prepareDB44DD(); |
79 | } |
80 | |
81 | // the first run of DD replicates the NON-DD executor. |
82 | // this is used to decides the scope of the downstream MLN |
83 | // operator. |
84 | FelixConfig.isFirstRunOfDD = true; |
85 | |
86 | for(int i = ep.operators.size()-1; i >=0; i --){ |
87 | dmoo.optimizeDMO(ep.operators.get(i)); |
88 | ep.operators.get(i).run(); |
89 | |
90 | } |
91 | |
92 | BufferedWriter bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_0" ); |
93 | RDB newDB = RDB.getRDBbyConfig(Config.db_schema); |
94 | for(FelixPredicate fp : fq.getPredicates()){ |
95 | if(fp.hasQuery()){ |
96 | UIMan.println(">>> Dumping results for " + fp + "\n"); |
97 | this.dumpMapAnswerForPredicate(newDB, fp, bw); |
98 | } |
99 | } |
100 | bw.close(); |
101 | |
102 | FelixConfig.isFirstRunOfDD = false; |
103 | |
104 | // copy state of each table |
105 | HashSet<FelixPredicate> sharedPreds = new HashSet<FelixPredicate>(); |
106 | for(int i = ep.operators.size()-1; i >=0; i --){ |
107 | sharedPreds.addAll(ep.operators.get(i).dd_CommonOutput); |
108 | } |
109 | for(FelixPredicate fp : sharedPreds){ |
110 | newDB.dropTable("_copy_ori_" + fp.getRelName()); |
111 | String sql = "CREATE TABLE _copy_ori_" + fp.getRelName() |
112 | + " AS SELECT * FROM " + fp.getRelName() + ";"; |
113 | newDB.execute(sql); |
114 | } |
115 | newDB.close(); |
116 | |
117 | |
118 | while(sum != 0 && nIt-- > 0 ){ |
119 | |
120 | newDB = RDB.getRDBbyConfig(Config.db_schema); |
121 | |
122 | for(FelixPredicate fp : sharedPreds){ |
123 | newDB.dropTable(fp.getRelName()); |
124 | String sql = "CREATE TABLE " + fp.getRelName() |
125 | + " AS SELECT * FROM _copy_ori_" + fp.getRelName() + ";"; |
126 | newDB.execute(sql); |
127 | } |
128 | for(FelixPredicate fp : fq.getAllPred()){ |
129 | if(fp.isCorefPredicate && !sharedPreds.contains(fp)){ |
130 | newDB.dropTable(fp.getRelName()); |
131 | newDB.dropView(fp.getRelName()); |
132 | newDB.execute(fp.viewDef); |
133 | } |
134 | } |
135 | newDB.commit(); |
136 | newDB.close(); |
137 | |
138 | for(int i = ep.operators.size()-1; i >=0; i --){ |
139 | dmoo.optimizeDMO(ep.operators.get(i)); |
140 | ep.operators.get(i).run(); |
141 | } |
142 | |
143 | bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt) ); |
144 | newDB = RDB.getRDBbyConfig(Config.db_schema); |
145 | for(FelixPredicate fp : fq.getPredicates()){ |
146 | if(fp.hasQuery()){ |
147 | UIMan.println(">>> Dumping results for " + fp + "\n"); |
148 | this.dumpMapAnswerForPredicate(newDB, fp, bw); |
149 | } |
150 | } |
151 | newDB.close(); |
152 | bw.close(); |
153 | |
154 | sum = 0; |
155 | |
156 | //CURRENTLY ASSUME ONE PREDICATE IS SHARED BY ONLY TWO OPERATORS |
157 | // this is ensured by Felix's compiler. |
158 | for(FelixPredicate fp : fq.getAllPred()){ |
159 | |
160 | ConcurrentOperatorsBucket cob1 = null; |
161 | ConcurrentOperatorsBucket cob2 = null; |
162 | |
163 | for(int i = ep.operators.size()-1; i >=0; i --){ |
164 | if(ep.operators.get(i).dd_CommonOutput.contains(fp)){ |
165 | if(cob1 == null){ |
166 | cob1 = ep.operators.get(i); |
167 | }else if(cob2 == null){ |
168 | cob2 = ep.operators.get(i); |
169 | }else{ |
170 | ExceptionMan.die("ERROR 12: " + |
171 | "There must be something wrong with the compiler"); |
172 | } |
173 | } |
174 | } |
175 | |
176 | if(cob1==null || cob2==null){ |
177 | continue; |
178 | } |
179 | |
180 | String tableName1; |
181 | String tableName2; |
182 | String priorTable1; |
183 | String priorTable2; |
184 | |
185 | tableName1 = cob1.dd_commonOutputPredicate_2_tableName.get(fp); |
186 | tableName2 = cob2.dd_commonOutputPredicate_2_tableName.get(fp); |
187 | |
188 | priorTable1 = cob1.dd_commonOutputPredicate_2_priorTableName.get(fp); |
189 | priorTable2 = cob2.dd_commonOutputPredicate_2_priorTableName.get(fp); |
190 | |
191 | |
192 | String args = StringMan.commaList(fp.getArgs()); |
193 | ArrayList<String> whereargsarray = new ArrayList<String>(); |
194 | for(String a : fp.getArgs()){ |
195 | whereargsarray.add("t0." + a + "=" + "t1." + a); |
196 | } |
197 | String whereargs = StringMan.join(" AND ", whereargsarray); |
198 | ArrayList<String> t0argsArray = new ArrayList<String>(); |
199 | for(String a : fp.getArgs()){ |
200 | t0argsArray.add("t0." + a); |
201 | } |
202 | String t0args = StringMan.commaList(t0argsArray); |
203 | |
204 | String priorArgs = StringMan.commaList(fp.getArgs()); |
205 | priorArgs = priorArgs + "," + "float_" + (fp.arity()+1); |
206 | priorArgs = "truth, club, " + priorArgs; |
207 | |
208 | if(options.marginal == true){ |
209 | |
210 | String deltaCase = "(CASE WHEN abs(t0.prior-t1.prior)<0.1 THEN " + 0 + |
211 | " ELSE 0.9*(t0.prior-t1.prior) END) AS prior11"; |
212 | String negDeltaCase = "(CASE WHEN abs(t1.prior-t0.prior)<0.1 THEN " + 0 + |
213 | " ELSE 0.9*(t1.prior-t0.prior) END) AS prior11"; |
214 | |
215 | |
216 | String sql = "SELECT TRUE, 2, " + t0args + ", " + deltaCase +" FROM " + tableName1 + " t0, " + tableName2 + " t1 " + " WHERE " |
217 | + whereargs; |
218 | |
219 | sql = "INSERT INTO " + priorTable2 + "(" + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0"; |
220 | Felix.db.update(sql); |
221 | UIMan.warn("INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2); |
222 | |
223 | sql = "SELECT TRUE, 2, " + t0args + "," + negDeltaCase + " FROM " + tableName1 + " t0, " + tableName2 + " t1 " + " WHERE " |
224 | + whereargs; |
225 | |
226 | sql = "INSERT INTO " + priorTable1 + "(" + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0"; |
227 | Felix.db.update(sql); |
228 | UIMan.warn("INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1); |
229 | |
230 | sum += Felix.db.getLastUpdateRowCount(); |
231 | |
232 | }else{ |
233 | String sql = "SELECT TRUE, 2, " + args + ", " + (1.0/2) + " FROM " + tableName1 + " WHERE (" |
234 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")" |
235 | + " UNION ALL " + |
236 | "SELECT TRUE, 2, " + args + ", " + (-1.0/2) + " FROM " + tableName2 + " WHERE (" |
237 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")"; |
238 | sql = "INSERT INTO " + priorTable2 + "(" + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt"; |
239 | Felix.db.update(sql); |
240 | UIMan.warn("INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2); |
241 | |
242 | sql = "SELECT TRUE, 2, " + args + ", " + (-1.0/2) + " FROM " + tableName1 + " WHERE (" |
243 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")" |
244 | + " UNION ALL " + |
245 | "SELECT TRUE, 2, " + args + ", " + (1.0/2) + " FROM " + tableName2 + " WHERE (" |
246 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")"; |
247 | sql = "INSERT INTO " + priorTable1 + "(" + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt"; |
248 | Felix.db.update(sql); |
249 | UIMan.warn("INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1); |
250 | |
251 | sum += Felix.db.getLastUpdateRowCount(); |
252 | |
253 | } |
254 | } |
255 | |
256 | Felix.db.commit(); |
257 | |
258 | } |
259 | |
260 | |
261 | }catch(Exception e){ |
262 | e.printStackTrace(); |
263 | } |
264 | |
265 | |
266 | } |
267 | |
268 | /** |
269 | * The constructor. |
270 | * @param _ep |
271 | */ |
272 | public DDExecutor(ExecutionPlan _ep, FelixQuery _fq, FelixCommandOptions _options){ |
273 | ep = _ep; |
274 | dmoo = new DMOOptimizer(ep.getCostModel()); |
275 | fq = _fq; |
276 | options = _options; |
277 | } |
278 | |
279 | /** |
280 | * Output the results of this bucket. |
281 | * @param db |
282 | * @param fout |
283 | * @param p |
284 | */ |
285 | public void dumpMapAnswerForPredicate(RDB db, FelixPredicate p, BufferedWriter bufferedWriter) { |
286 | // spreadTruth(); |
287 | HashMap<Long,String> cmap = db.loadIdSymbolMapFromTable(); |
288 | try { |
289 | |
290 | int digits = 4; |
291 | |
292 | String sql; |
293 | String tableName = p.getRelName(); |
294 | String predName = p.getName(); |
295 | |
296 | if(options.useDualDecomposition){ |
297 | //sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC"; |
298 | if(!p.isCorefMapPredicate){ |
299 | if(p.belongsTo.dd_commonOutputPredicate_2_tableName.containsKey(p)){ |
300 | sql = "SELECT * FROM " + p.belongsTo.dd_commonOutputPredicate_2_tableName.get(p) + " WHERE truth=TRUE ORDER BY prior DESC"; |
301 | }else{ |
302 | sql = "SELECT * FROM " + p.getRelName() + " WHERE truth=TRUE ORDER BY prior DESC"; |
303 | } |
304 | }else{ |
305 | sql = "SELECT * FROM " + tableName + " ORDER BY prior DESC"; |
306 | } |
307 | }else{ |
308 | sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC"; |
309 | } |
310 | |
311 | ResultSet rs = db.query(sql); |
312 | while(rs == null){ |
313 | rs = db.query(sql); |
314 | } |
315 | while(rs.next()) { |
316 | String line = predName + "("; |
317 | ArrayList<String> cs = new ArrayList<String>(); |
318 | int ct = 0; |
319 | for(String a : p.getArgs()) { |
320 | |
321 | |
322 | if(p.getTypeAt(ct).isProbArg == true || p.getTypeAt(ct).isNonSymbolicType()){ |
323 | cs.add(rs.getDouble(a)+""); |
324 | }else{ |
325 | long c = rs.getLong(a); |
326 | |
327 | |
328 | String v = StringMan.escapeJavaString(cmap.get(c)); |
329 | |
330 | //if(v.matches("^[0-9].*$") && !StringMan.escapeJavaString(v).contains(" ")){ |
331 | // cs.add("" + StringMan.escapeJavaString(v) + ""); |
332 | //}else{ |
333 | cs.add("\"" + StringMan.escapeJavaString(v) + "\""); |
334 | //} |
335 | } |
336 | ct ++; |
337 | } |
338 | line += StringMan.commaList(cs) + ")"; |
339 | |
340 | double prior = 1; |
341 | if(options.marginal){ |
342 | |
343 | double prob; |
344 | if(rs.getString("prior") == null){ |
345 | prob = 1; |
346 | }else{ |
347 | prob = Double.valueOf(rs.getString("prior")); |
348 | } |
349 | |
350 | if(Config.output_prolog_format){ |
351 | |
352 | line = "tuffyPrediction(" + UIMan.decimalRound(digits, prob) + |
353 | ", " + line + ")."; |
354 | }else{ |
355 | line = UIMan.decimalRound(digits, prob) + "\t" + line; |
356 | |
357 | } |
358 | |
359 | }else{ |
360 | line = line; |
361 | } |
362 | |
363 | if(prior >= options.minProb){ |
364 | bufferedWriter.append(line + "\n"); |
365 | } |
366 | |
367 | } |
368 | rs.close(); |
369 | //bufferedWriter.close(); |
370 | } catch (Exception e) { |
371 | ExceptionMan.handle(e); |
372 | } |
373 | } |
374 | } |
375 | |
376 | |
377 | |
378 | |