1 | package felix.dstruct; |
2 | |
3 | |
4 | import java.io.BufferedWriter; |
5 | import java.io.FileInputStream; |
6 | import java.sql.ResultSet; |
7 | import java.util.ArrayList; |
8 | import java.util.HashMap; |
9 | import java.util.HashSet; |
10 | import java.util.concurrent.ExecutorService; |
11 | import java.util.concurrent.Executors; |
12 | |
13 | import felix.dstruct.StatOperator.OPType; |
14 | import felix.main.Felix; |
15 | import felix.parser.FelixCommandOptions; |
16 | import felix.society.TaskList; |
17 | import felix.society.TaskSet; |
18 | import felix.task.ExecuteOperatorTask; |
19 | import felix.task.OptimizeDMOTask; |
20 | import felix.util.FelixConfig; |
21 | |
22 | |
23 | import tuffy.db.RDB; |
24 | import tuffy.infer.DataMover; |
25 | import tuffy.mln.Literal; |
26 | import tuffy.mln.MarkovLogicNetwork; |
27 | import tuffy.mln.Predicate; |
28 | import tuffy.mln.Term; |
29 | import tuffy.mln.Type; |
30 | import tuffy.util.Config; |
31 | import tuffy.util.ExceptionMan; |
32 | import tuffy.util.FileMan; |
33 | import tuffy.util.StringMan; |
34 | import tuffy.util.UIMan; |
35 | |
36 | |
37 | /** |
38 | * An object of ConcurrentOperatorsBucket contains |
39 | * multiple statistical operators that can be executed in |
40 | * parallel. ConcurrentOperatorsBucket is the basic unit |
41 | * of the {@link ExecutionPlan}. |
42 | */ |
43 | public class ConcurrentOperatorsBucket extends Thread{ |
44 | |
/**
 * Statistical operators that belong to this bucket. Filled by
 * {@link #addOperator(StatOperator)} and returned by {@link #getOperators()}.
 */
HashSet<StatOperator> concurrentOperators = new HashSet<StatOperator>();

/**
 * Operators of this bucket that have not been started yet. {@link #run()}
 * refills this list from {@link #concurrentOperators}; operators are taken
 * from the end of the list when they are started.
 */
ArrayList<StatOperator> operatorStacks = new ArrayList<StatOperator>();

/**
 * Predicates used as inputs: the union of the input predicates of every
 * operator added through {@link #addOperator(StatOperator)}.
 */
public HashSet<FelixPredicate> inputPredicates = new HashSet<FelixPredicate>();

/**
 * Auxiliary structure used while clearing old data tables and merging the
 * output of the statistical operators in this bucket, so that no table is
 * cleared twice. Emptied at the beginning of {@link #run()}.
 */
public HashSet<String> cleared = new HashSet<String>();

/**
 * Predicates that can potentially be shared with other buckets (through
 * dual decomposition): the union of the common candidates of every operator
 * added through {@link #addOperator(StatOperator)}.
 */
public HashSet<FelixPredicate> commonCandidates = new HashSet<FelixPredicate>();

/**
 * Predicates output by this bucket: the union of the output predicates of
 * every operator added through {@link #addOperator(StatOperator)}.
 */
public HashSet<FelixPredicate> outputPredicates = new HashSet<FelixPredicate>();

/**
 * If this bucket is a Tuffy bucket, the names of the result tables of the
 * corresponding MLNs. Filled through {@link #addMLNRelTable(String)},
 * emptied at the beginning of {@link #run()} and read by {@link #cleanUp()}.
 */
HashSet<String> subTuffyMLNRelTable = new HashSet<String>();
83 | |
84 | /** |
85 | * Register a table in {@link ConcurrentOperatorsBucket#subTuffyMLNRelTable}. |
86 | * @param tableName |
87 | */ |
88 | public void addMLNRelTable(String tableName){ |
89 | this.subTuffyMLNRelTable.add(tableName); |
90 | } |
91 | |
/**
 * Names of the predicates registered as input through
 * {@link #pushPredicateScopes(HashSet)}.
 */
public HashSet<String> inputPredicateScope = new HashSet<String>();
96 | |
97 | /** |
98 | * Adds input predicate scopes. |
99 | * @param _predicates |
100 | * @deprecated |
101 | */ |
102 | public void pushPredicateScopes(HashSet<FelixPredicate> _predicates){ |
103 | for(FelixPredicate fp : _predicates){ |
104 | this.inputPredicateScope.add(fp.getName()); |
105 | } |
106 | } |
107 | |
/**
 * Degree of concurrent running of operators. Set through
 * {@link #setNCore(int)}; {@link #run()} resets it to 1 for TUFFY buckets
 * and, when dual decomposition is off, starts at most this many operators.
 * @deprecated
 */
int nCore = 1;

/**
 * Inference mode of this bucket: {@code true} for marginal inference,
 * {@code false} for MAP inference. Set once by the constructor.
 */
boolean isMarginal = false;

/**
 * Whether this bucket has run its first operator. Set by {@link #run()}
 * when it starts an operator with dual decomposition off, and consulted by
 * {@link #myJoin()}. This variable is used to control some consistency
 * issues.
 * @deprecated
 */
boolean started = false;

/**
 * ID of this bucket; -1 until assigned. It is part of the names of the
 * tables created by {@link #prepareDB44DD()}.
 */
public int id = -1;

/**
 * Type of operators in this bucket. The construction of buckets
 * ensures that all operators in it have the same type. Overwritten by
 * every call of {@link #addOperator(StatOperator)}.
 */
public OPType type;

/**
 * Precedence of this bucket; -1 until an operator is added. The
 * construction of buckets ensures that all operators in it have the same
 * precedence. Overwritten by every call of
 * {@link #addOperator(StatOperator)}.
 */
public int precedence = -1;

/**
 * Command line options, taken from the operator most recently added
 * through {@link #addOperator(StatOperator)}.
 */
FelixCommandOptions options;
147 | |
148 | /** |
149 | * Whether this bucket runs in marginal mode. |
150 | * @return |
151 | */ |
152 | public boolean isMarginal(){ |
153 | return this.isMarginal; |
154 | } |
155 | |
/**
 * Creates an empty bucket; operators are added afterwards through
 * {@link #addOperator(StatOperator)}.
 * @param isMarginal whether the bucket runs in marginal mode
 */
public ConcurrentOperatorsBucket(boolean isMarginal){
    this.isMarginal = isMarginal;
}
163 | |
164 | /** |
165 | * Get all operators in this bucket. |
166 | * @return |
167 | */ |
168 | public HashSet<StatOperator> getOperators(){ |
169 | return concurrentOperators; |
170 | } |
171 | |
172 | /** |
173 | * Get the precedence of this bucket. |
174 | * @return |
175 | */ |
176 | public int getPrecedence(){ |
177 | return precedence; |
178 | } |
179 | |
/**
 * Number of rules that only contain evidence relations.
 * Intuitively, the larger this number, the earlier
 * we should solve this bucket while scheduling. The value is copied from
 * the operator most recently added through
 * {@link #addOperator(StatOperator)}.
 */
public int nStartingRule;
186 | |
187 | /** |
188 | * Add an operator into this bucket. |
189 | * @param sop |
190 | */ |
191 | public void addOperator(StatOperator sop){ |
192 | this.concurrentOperators.add(sop); |
193 | outputPredicates.addAll(sop.outputPredicates); |
194 | inputPredicates.addAll(sop.inputPredicates); |
195 | |
196 | for(FelixPredicate fp : sop.outputPredicates){ |
197 | fp.belongsTo = this; |
198 | } |
199 | |
200 | this.commonCandidates.addAll(sop.commonCandidate); |
201 | |
202 | this.nStartingRule = sop.nStartingRules; |
203 | options = sop.options; |
204 | this.type = sop.type; |
205 | this.precedence = sop.getPrecedence(); |
206 | sop.belongsToBucket = this; |
207 | } |
208 | |
209 | /** |
210 | * Set the degree of concurrency. |
211 | * @param _nCore |
212 | */ |
213 | public void setNCore(int _nCore){ |
214 | nCore = _nCore; |
215 | } |
216 | |
/**
 * Relations that are shared with other buckets through
 * dual decomposition. Filled by {@link #addCommonOutput(FelixPredicate)}.
 */
public HashSet<FelixPredicate> dd_CommonOutput = new HashSet<FelixPredicate>();

/**
 * Map from relations in {@link #dd_CommonOutput}
 * to the tables containing their temporary output result in
 * each iteration. Filled by {@link #prepareDB44DD()}, which also hands the
 * very same map object to every operator of this bucket.
 */
public HashMap<FelixPredicate, String> dd_commonOutputPredicate_2_tableName =
new HashMap<FelixPredicate, String>();

/**
 * Map from relations in {@link #dd_CommonOutput}
 * to the tables containing their corresponding Lagrangian
 * multipliers. Filled by {@link #prepareDB44DD()}.
 */
public HashMap<FelixPredicate, String> dd_commonOutputPredicate_2_priorTableName =
new HashMap<FelixPredicate, String>();
238 | |
239 | /** |
240 | * Adds predicate to {@link #dd_CommonOutput}. |
241 | * @param _fp |
242 | */ |
243 | public void addCommonOutput(FelixPredicate _fp){ |
244 | for(StatOperator sop : this.concurrentOperators){ |
245 | sop.dd_CommonOutput.add(_fp); |
246 | } |
247 | this.dd_CommonOutput.add(_fp); |
248 | } |
249 | |
250 | /** |
251 | * Prepares database for dual decomposition -- 1) add |
252 | * new rules for Lagaragian Multipliers; 2) create tables |
253 | * for intermediate output and priors etc.. |
254 | */ |
255 | public void prepareDB44DD(){ |
256 | if(options.useDualDecomposition){ |
257 | |
258 | RDB newDB = RDB.getRDBbyConfig(Config.db_schema); |
259 | |
260 | for(FelixPredicate fp : this.dd_CommonOutput){ |
261 | |
262 | // generate output table and build the map from predicate name to table name |
263 | if(fp.isCorefPredicate){ |
264 | String tableName = "_dd_" + fp.getName() + "_map_of_op" +this.id + "_" + this.type; |
265 | String viewName = "_dd_" + fp.getName() + "_of_op" +this.id + "_" + this.type; |
266 | |
267 | newDB.dropTable(tableName); |
268 | newDB.dropView(viewName); |
269 | newDB.execute("CREATE TABLE " + tableName + " AS (SELECT * FROM " + fp.corefMAPPredicate.getRelName() + " WHERE 1=2)"); |
270 | |
271 | this.dd_commonOutputPredicate_2_tableName.put(fp, viewName); |
272 | this.dd_commonOutputPredicate_2_tableName.put(fp.corefMAPPredicate, tableName); |
273 | |
274 | }else{ |
275 | String tableName = "_dd_" + fp.getName() + "_of_op" +this.id + "_" + this.type; |
276 | this.dd_commonOutputPredicate_2_tableName.put(fp, tableName); |
277 | |
278 | newDB.dropTable(tableName); |
279 | newDB.execute("CREATE TABLE " + tableName + " AS (SELECT * FROM " + fp.getRelName() + " WHERE 1=2)"); |
280 | } |
281 | |
282 | // generate prior predicate and new rules |
283 | String priorTableName = "_dd_prior_" + fp.getName() + "_of_op" + this.id + "_" + this.type; |
284 | FelixPredicate tmpPredicate = new FelixPredicate(priorTableName, true); |
285 | for(int i=0;i<fp.getArgs().size();i++){ |
286 | tmpPredicate.appendArgument(fp.getTypeAt(i)); |
287 | } |
288 | tmpPredicate.appendArgument(new Type("float_")); |
289 | tmpPredicate.prepareDB(Felix.db); |
290 | dd_commonOutputPredicate_2_priorTableName.put(fp, tmpPredicate.getRelName()); |
291 | |
292 | Literal tmpLiteral = new Literal(tmpPredicate, false); |
293 | for(int i=0;i<fp.getArgs().size();i++){ |
294 | tmpLiteral.appendTerm(new Term(String.valueOf((char)('a' + i)))); |
295 | } |
296 | tmpLiteral.appendTerm(new Term("wgt")); |
297 | |
298 | Literal tmpLiteral2 = new Literal(fp, true); |
299 | for(int i=0;i<fp.getArgs().size();i++){ |
300 | tmpLiteral2.appendTerm(new Term(String.valueOf((char)('a' + i)))); |
301 | } |
302 | |
303 | FelixClause fc = new FelixClause(); |
304 | fc.addLiteral(tmpLiteral); |
305 | fc.addLiteral(tmpLiteral2); |
306 | fc.setVarWeight("wgt"); |
307 | |
308 | for(StatOperator sop : this.concurrentOperators){ |
309 | sop.dd_PriorClauses.add(fc); |
310 | } |
311 | |
312 | |
313 | if(fp.isCorefPredicate && this.type == OPType.COREF){ |
314 | |
315 | FelixPredicate pmap = fp.corefMAPPredicate; |
316 | |
317 | priorTableName = "_dd_prior_" + pmap.getName() + "_of_op" + this.id + "_" + this.type; |
318 | tmpPredicate = new FelixPredicate(priorTableName, true); |
319 | for(int i=0;i<fp.getArgs().size();i++){ |
320 | tmpPredicate.appendArgument(fp.getTypeAt(i)); |
321 | } |
322 | tmpPredicate.appendArgument(new Type("float_")); |
323 | tmpPredicate.prepareDB(Felix.db); |
324 | dd_commonOutputPredicate_2_priorTableName.put(pmap, tmpPredicate.getRelName()); |
325 | |
326 | tmpLiteral = new Literal(tmpPredicate, false); |
327 | for(int i=0;i<fp.getArgs().size();i++){ |
328 | tmpLiteral.appendTerm(new Term(String.valueOf((char)('a' + i)))); |
329 | } |
330 | tmpLiteral.appendTerm(new Term("wgt")); |
331 | |
332 | tmpLiteral2 = new Literal(fp, true); |
333 | for(int i=0;i<fp.getArgs().size();i++){ |
334 | tmpLiteral2.appendTerm(new Term(String.valueOf((char)('a' + i)))); |
335 | } |
336 | |
337 | fc = new FelixClause(); |
338 | fc.addLiteral(tmpLiteral); |
339 | fc.addLiteral(tmpLiteral2); |
340 | fc.setVarWeight("wgt"); |
341 | |
342 | for(StatOperator sop : this.concurrentOperators){ |
343 | sop.dd_PriorClauses.add(fc); |
344 | } |
345 | } |
346 | |
347 | |
348 | } |
349 | |
350 | |
351 | newDB.close(); |
352 | |
353 | for(StatOperator sop : this.concurrentOperators){ |
354 | sop.dd_commonOutputPredicate_2_tableName = this.dd_commonOutputPredicate_2_tableName; |
355 | } |
356 | |
357 | } |
358 | } |
359 | |
360 | /** |
361 | * Run all operators in this bucket. |
362 | */ |
363 | public void run(){ |
364 | |
365 | cleared.clear(); |
366 | subTuffyMLNRelTable.clear(); |
367 | |
368 | for(Predicate p : outputPredicates){ |
369 | RDB newDB = RDB.getRDBbyConfig(Config.db_schema); |
370 | |
371 | if(p.isCurrentlyView == false && p.isClosedWorld() == false){ |
372 | if(FelixConfig.isFirstRunOfDD || !this.options.useDualDecomposition){ |
373 | String sql = "DELETE FROM " + p.getRelName(); |
374 | newDB.execute(sql); |
375 | } |
376 | } |
377 | if(this.type.equals(OPType.COREF)){ |
378 | if(!options.isDLearningMode){ |
379 | if(FelixConfig.isFirstRunOfDD || !this.options.useDualDecomposition){ |
380 | String sql = "DELETE FROM " + p.getRelName() + "_map"; |
381 | newDB.execute(sql); |
382 | } |
383 | } |
384 | } |
385 | newDB.close(); |
386 | //p.nextTupleID = 0; |
387 | } |
388 | |
389 | operatorStacks.clear(); |
390 | operatorStacks.addAll(concurrentOperators); |
391 | |
392 | if(this.type.equals(OPType.TUFFY)){ |
393 | nCore = 1; |
394 | } |
395 | |
396 | |
397 | |
398 | if(options.useDualDecomposition){ |
399 | |
400 | RDB newDB = RDB.getRDBbyConfig(Config.db_schema); |
401 | for(FelixPredicate fp : this.dd_CommonOutput){ |
402 | String tableName = this.dd_commonOutputPredicate_2_tableName.get(fp); |
403 | String sql = "DELETE from " + tableName; |
404 | newDB.execute(sql); |
405 | } |
406 | newDB.close(); |
407 | |
408 | if(this.type == OPType.TUFFY){ |
409 | |
410 | if(FelixConfig.isFirstRunOfDD){ |
411 | for(StatOperator sop : concurrentOperators){ |
412 | sop.currentState = false; |
413 | sop.run(); |
414 | } |
415 | this.cleanUp(); |
416 | }else{ |
417 | |
418 | // second, run tuffy on the output table, |
419 | // take prior as evidences |
420 | for(StatOperator sop : concurrentOperators){ |
421 | sop.currentState = false; |
422 | sop.run(); |
423 | } |
424 | this.cleanUp(); |
425 | |
426 | } |
427 | |
428 | }else{ |
429 | |
430 | ExecutorService pool = Executors.newFixedThreadPool(Config.getNumThreads()); |
431 | |
432 | TaskList tasks = new TaskList(); |
433 | |
434 | TaskSet taskset1 = new TaskSet(); |
435 | |
436 | for(StatOperator sop : concurrentOperators){ |
437 | taskset1.addSubTask(new ExecuteOperatorTask(sop)); |
438 | } |
439 | |
440 | tasks.addSubTask(taskset1); |
441 | try { |
442 | tasks.execute(pool); |
443 | } catch (Exception e) { |
444 | e.printStackTrace(); |
445 | } |
446 | |
447 | pool.shutdown(); |
448 | |
449 | this.cleanUp(); |
450 | } |
451 | |
452 | newDB.close(); |
453 | |
454 | }else{ |
455 | for(int i=0;i<nCore; i++){ |
456 | if(operatorStacks.size() == 0){ |
457 | continue; |
458 | } |
459 | StatOperator sop = operatorStacks.remove(operatorStacks.size() - 1); |
460 | sop.start(); |
461 | started = true; |
462 | } |
463 | } |
464 | } |
465 | |
466 | /** |
467 | * Merge the output of operators into a consistent DB table |
468 | * and dump them into files. |
469 | */ |
470 | public void cleanUp(){ |
471 | try{ |
472 | RDB newDB = RDB.getRDBbyConfig(); |
473 | newDB.execute("SET search_path = " + Config.db_schema); |
474 | |
475 | if(this.type.equals(OPType.TUFFY)){ |
476 | MarkovLogicNetwork tmpMLN = new MarkovLogicNetwork(); |
477 | tmpMLN.setDB(newDB); |
478 | DataMover datamover = new DataMover(tmpMLN); |
479 | |
480 | for(FelixPredicate p : outputPredicates){ |
481 | |
482 | if(p.isClosedWorld() == true){ |
483 | continue; |
484 | } |
485 | |
486 | datamover.updateOriTable(subTuffyMLNRelTable, p); |
487 | datamover.updateOriTable(subTuffyMLNRelTable, p, "_copy_of_" + p.getRelName()); |
488 | } |
489 | |
490 | for(FelixPredicate p : this.dd_CommonOutput){ |
491 | |
492 | |
493 | String tableName = this.dd_commonOutputPredicate_2_tableName.get(p); |
494 | |
495 | |
496 | newDB.execute("DELETE FROM " + this.dd_commonOutputPredicate_2_tableName.get(p)); |
497 | |
498 | datamover.updateOriTable(subTuffyMLNRelTable, p, |
499 | this.dd_commonOutputPredicate_2_tableName.get(p)); |
500 | |
501 | |
502 | } |
503 | |
504 | } |
505 | |
506 | newDB.close(); |
507 | }catch(Exception e){ |
508 | e.printStackTrace(); |
509 | } |
510 | } |
511 | |
512 | /** |
513 | * Run the next operator in this bucket. |
514 | * @deprecated |
515 | */ |
516 | public synchronized void runNextOperatorInBucket(){ |
517 | |
518 | if(operatorStacks.size() == 0){ |
519 | return; |
520 | } |
521 | |
522 | StatOperator sop = operatorStacks.remove(operatorStacks.size() - 1); |
523 | sop.start(); |
524 | } |
525 | |
526 | /** |
527 | * Return only if all the operators have been executed. |
528 | * @deprecated |
529 | */ |
530 | public void myJoin(){ |
531 | |
532 | try{ |
533 | int goal = this.concurrentOperators.size(); |
534 | |
535 | while(true){ |
536 | int ct = 0; |
537 | for(StatOperator sop : this.concurrentOperators){ |
538 | if(operatorStacks.contains(sop) || !started){ |
539 | continue; |
540 | } |
541 | sop.join(); |
542 | ct ++; |
543 | } |
544 | if(ct == goal){ |
545 | break; |
546 | } |
547 | } |
548 | |
549 | this.cleanUp(); |
550 | }catch(Exception e){ |
551 | e.printStackTrace(); |
552 | } |
553 | |
554 | } |
555 | |
556 | /** |
557 | * Returns string representation of bucket. |
558 | */ |
559 | public String toString(){ |
560 | |
561 | String ret = ""; |
562 | |
563 | for(StatOperator sop : this.concurrentOperators){ |
564 | ret += sop; |
565 | ret += "\n"; |
566 | } |
567 | |
568 | return ret; |
569 | } |
570 | |
571 | /** |
572 | * Returns string representation of bucket without partitioning info.. |
573 | * @return |
574 | */ |
575 | public String toNoParString(){ |
576 | |
577 | String ret = ""; |
578 | |
579 | for(StatOperator sop : this.concurrentOperators){ |
580 | ret += sop.toNoParString(); |
581 | ret += "\n"; |
582 | break; |
583 | } |
584 | |
585 | return ret; |
586 | } |
587 | |
588 | /** |
589 | * Output the results of this bucket. |
590 | * @param db |
591 | * @param fout |
592 | * @param p |
593 | * @deprecated |
594 | */ |
595 | public void dumpMapAnswerForPredicate(RDB db, String fout, FelixPredicate p) { |
596 | // spreadTruth(); |
597 | |
598 | int digits = 4; |
599 | |
600 | HashMap<Long,String> cmap = db.loadIdSymbolMapFromTable(); |
601 | try { |
602 | BufferedWriter bufferedWriter = FileMan.getBufferedWriterMaybeGZ(fout); |
603 | String sql; |
604 | String tableName = p.getRelName(); |
605 | String predName = p.getName(); |
606 | |
607 | if(this.type.equals(OPType.COREF)){ |
608 | tableName += "_map"; |
609 | predName += "_map"; |
610 | } |
611 | |
612 | sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE"; |
613 | |
614 | |
615 | ResultSet rs = db.query(sql); |
616 | while(rs == null){ |
617 | rs = db.query(sql); |
618 | } |
619 | while(rs.next()) { |
620 | String line = ""; |
621 | ArrayList<String> cs = new ArrayList<String>(); |
622 | for(String a : p.getArgs()) { |
623 | int c = rs.getInt(a); |
624 | cs.add("\"" + StringMan.escapeJavaString(cmap.get(c)) + "\""); |
625 | } |
626 | line += StringMan.commaList(cs) + ")"; |
627 | |
628 | if(isMarginal){ |
629 | |
630 | if(Config.output_prolog_format){ |
631 | line = "tuffyPrediction(" + UIMan.decimalRound(digits, rs.getDouble("prior")) + |
632 | ", " + predName + "(" + line + "."; |
633 | }else{ |
634 | if(!this.type.equals(OPType.COREF)){ |
635 | line = rs.getString("prior") + "\t" + predName + "(" + line; |
636 | } |
637 | } |
638 | |
639 | }else{ |
640 | line = predName + "(" + line; |
641 | } |
642 | |
643 | bufferedWriter.append(line + "\n"); |
644 | } |
645 | rs.close(); |
646 | bufferedWriter.close(); |
647 | } catch (Exception e) { |
648 | ExceptionMan.handle(e); |
649 | } |
650 | } |
651 | |
652 | |
653 | } |