1 | package felix.dstruct; |
2 | |
3 | import java.util.ArrayList; |
4 | import java.util.Collection; |
5 | import java.util.HashMap; |
6 | import java.util.HashSet; |
7 | import java.util.List; |
8 | |
9 | import felix.dstruct.FelixPredicate.FPProperty; |
10 | import felix.parser.FelixCommandOptions; |
11 | import felix.util.FelixStringMan; |
12 | import felix.util.FelixUIMan; |
13 | |
14 | import tuffy.db.RDB; |
15 | import tuffy.mln.Predicate; |
16 | import tuffy.ra.ConjunctiveQuery; |
17 | import tuffy.ra.Expression; |
18 | import tuffy.util.ExceptionMan; |
19 | import tuffy.util.StringMan; |
20 | |
21 | |
22 | /** |
23 | * Abstract class of a statistical operator (e.g., Tuffy, Coref, CRF etc. ). |
24 | * A valid Felix operator should extend this class and implement methods |
25 | * like run(), prepare(), etc. |
26 | * |
27 | * @author Ce Zhang |
28 | * |
29 | */ |
30 | public abstract class StatOperator extends Thread implements Cloneable{ |
31 | |
32 | /** |
33 | * Parameter assigning the degree of data partitioning. |
34 | */ |
35 | public int partitionedInto = 1; |
36 | |
37 | /** |
38 | * @deprecated |
39 | */ |
40 | public boolean currentState = false; |
41 | |
42 | /** |
43 | * The set of predicates that are shared with other |
44 | * operators via dual decomposition. |
45 | */ |
46 | protected HashSet<FelixPredicate> dd_CommonOutput = new HashSet<FelixPredicate>(); |
47 | |
48 | /** |
49 | * Map from predicate to database table name |
50 | */ |
51 | protected HashMap<FelixPredicate, String> dd_commonOutputPredicate_2_tableName = |
52 | new HashMap<FelixPredicate, String>(); |
53 | |
54 | /** |
55 | * Type of StatOperator. |
56 | */ |
57 | public enum OPType {CRF, LR, COREF, TUFFY}; |
58 | |
59 | /** |
60 | * The human-readable representation of data partitions. This is only used in toString() |
61 | */ |
62 | public String dataCrackerSignature = null; |
63 | |
64 | /** |
65 | * Rules for Lagrangian Multipliers. |
66 | */ |
67 | public HashSet<FelixClause> dd_PriorClauses = new HashSet<FelixClause>(); |
68 | |
69 | /** |
70 | * Map from clauses to expressions partitioning the data. |
71 | */ |
72 | public HashMap<FelixClause, HashSet<Expression>> clauseConstraints = |
73 | new HashMap<FelixClause, HashSet<Expression>>(); |
74 | |
75 | /** |
76 | * The ConcurrentOperatorsBucket this StatOperator belongs to. |
77 | * StatOperators in one ConcurrentOperatorsBucket can be executed |
78 | * in parallel. |
79 | */ |
80 | public ConcurrentOperatorsBucket belongsToBucket = null; |
81 | |
82 | /** |
83 | * Whether this operator works in marginal or MAP mode. |
84 | */ |
85 | public boolean isMarginal = true; |
86 | |
87 | /** |
88 | * Options parsed from command line or configuration file. |
89 | */ |
90 | protected FelixCommandOptions options = null; |
91 | |
92 | /** |
93 | * Predicates computed by previous operators. |
94 | */ |
95 | protected HashSet<String> inputPredicateScope = new HashSet<String>(); |
96 | |
97 | /** |
98 | * Max time this operator can run. This parameter will be useful |
99 | * when there are cycles in the operator graph. |
100 | * @deprecated |
101 | */ |
102 | int maxRunTime = 5; |
103 | |
104 | /** |
105 | * Set of predicates that can potentially be shared with |
106 | * other operators via dual decomposition. |
107 | */ |
108 | public HashSet<FelixPredicate> commonCandidate = new HashSet<FelixPredicate>(); |
109 | |
110 | /** |
111 | * How many times this operator has run. |
112 | * @deprecated |
113 | */ |
114 | int currentRunTime = 0; |
115 | |
116 | /** |
117 | * Global counter of StatOperators ID. |
118 | */ |
119 | static int idCounter = 1; |
120 | |
121 | /** |
122 | * ID of this operator. Note that, the statistical operator returned by |
123 | * .clone() has a different ID. |
124 | */ |
125 | int id = -1; |
126 | |
127 | /** |
128 | * Type of this operator. |
129 | */ |
130 | public OPType type = null; |
131 | |
132 | /** |
133 | * Set of predicates whose values are used by this operator as inputs. |
134 | */ |
135 | public HashSet<FelixPredicate> inputPredicates = new HashSet<FelixPredicate>(); |
136 | |
137 | /** |
138 | * Set of predicates whose values are output by this operator. |
139 | */ |
140 | public HashSet<FelixPredicate> outputPredicates = new HashSet<FelixPredicate>(); |
141 | |
142 | /** |
143 | * Set of clauses assigned to this operator. |
144 | */ |
145 | public HashSet<FelixClause> allRelevantFelixClause = new HashSet<FelixClause>(); |
146 | |
147 | /** |
148 | * Database connection. |
149 | */ |
150 | public RDB db; |
151 | |
152 | /** |
153 | * Felix query. |
154 | */ |
155 | public FelixQuery fq; |
156 | |
157 | /** |
158 | * @deprecated |
159 | */ |
160 | public boolean isBinaryArbLR = false; |
161 | |
162 | /** |
163 | * The precedence of this operator. The larger this value, the earlier this operator |
164 | * will be run in the final physical plan. |
165 | */ |
166 | protected int precedence = -1; |
167 | |
168 | /** |
169 | * @deprecated |
170 | * @param _predicates |
171 | */ |
172 | public void pushPredicateScopes(HashSet<String> _predicates){ |
173 | this.inputPredicateScope.addAll(_predicates); |
174 | } |
175 | |
176 | /** |
177 | * Returns a clone of this statistical operator. |
178 | */ |
179 | @SuppressWarnings("unchecked") |
180 | public StatOperator clone(){ |
181 | try{ |
182 | StatOperator ret; |
183 | |
184 | if(this.type == OPType.LR){ |
185 | ret = this.getClass().getConstructor( |
186 | this.fq.getClass(), |
187 | this.outputPredicates.getClass(), |
188 | this.options.getClass()) |
189 | .newInstance(this.fq, this.outputPredicates, this.options); |
190 | }else{ |
191 | ret = this.getClass().getConstructor( |
192 | this.fq.getClass(), |
193 | this.outputPredicates.getClass(), |
194 | this.options.getClass()) |
195 | .newInstance(this.fq, this.outputPredicates, this.options); |
196 | } |
197 | |
198 | |
199 | for(FelixClause fc : this.allRelevantFelixClause){ |
200 | ret.registerRelevantClause(fc); |
201 | } |
202 | ret.sealDefinition(); |
203 | |
204 | |
205 | ret.clauseConstraints = (HashMap<FelixClause, HashSet<Expression>>) |
206 | this.clauseConstraints.clone(); |
207 | ret.belongsToBucket = this.belongsToBucket; |
208 | ret.isMarginal = this.isMarginal; |
209 | ret.precedence = this.precedence; |
210 | |
211 | return ret; |
212 | }catch(Exception e){ |
213 | e.printStackTrace(); |
214 | return null; |
215 | } |
216 | |
217 | } |
218 | |
219 | /** |
220 | * See {@link ConcurrentOperatorsBucket#nStartingRule}. |
221 | */ |
222 | public int nStartingRules = 0; |
223 | |
224 | /** |
225 | * Sets precedence of this statistical operator. |
226 | * @param _value |
227 | */ |
228 | public void setPrecedence(int _value){ |
229 | this.precedence = _value; |
230 | } |
231 | |
232 | /** |
233 | * Returns set of all clauses associated with the given property. |
234 | * @param col |
235 | * @param prop |
236 | * @return |
237 | */ |
238 | HashSet<FelixClause> getPropertyClausesUnion(Collection<FelixPredicate> col, FPProperty prop){ |
239 | HashSet<FelixClause> ret = new HashSet<FelixClause>(); |
240 | for(FelixPredicate fp : col){ |
241 | ret.addAll(fp.getPropertyClauses(prop)); |
242 | } |
243 | return ret; |
244 | } |
245 | |
246 | /** |
247 | * the constructor. |
248 | * @param _oriMLN |
249 | * @param _opt |
250 | */ |
251 | public StatOperator(FelixQuery _fq, HashSet<FelixPredicate> _goalPredicates, FelixCommandOptions _opt){ |
252 | this.outputPredicates.clear(); |
253 | this.outputPredicates.addAll(_goalPredicates); |
254 | options = _opt; |
255 | id = idCounter++; |
256 | fq = _fq; |
257 | this.isMarginal = _opt.marginal; |
258 | |
259 | } |
260 | |
261 | /** |
262 | * Registers clause to this statistical operator. |
263 | * @param fc |
264 | */ |
265 | public void registerRelevantClause(FelixClause fc){ |
266 | this.allRelevantFelixClause.add(fc); |
267 | } |
268 | |
269 | /** |
270 | * According to the clauses added via {@link #registerRelevantClause(FelixClause)}, |
271 | * parse {@link #outputPredicates}, {@link #inputPredicates}, {@link #nStartingRules}, |
272 | * and {@link #commonCandidate}. |
273 | * |
274 | */ |
275 | public void sealDefinition(){ |
276 | |
277 | for(FelixClause fc : this.allRelevantFelixClause){ |
278 | |
279 | int nOpen = 0; |
280 | |
281 | for(Predicate p : fc.getReferencedPredicates()){ |
282 | |
283 | if(!p.isClosedWorld() && !outputPredicates.contains(p)){ |
284 | nOpen ++; |
285 | } |
286 | |
287 | FelixPredicate fp = fq.getPredByName(p.getName()); |
288 | |
289 | if(this.outputPredicates.contains(fp)){ |
290 | continue; |
291 | } |
292 | |
293 | if(fp.isCorefMapPredicate){ |
294 | |
295 | this.inputPredicates.add(fp.getOriCorefPredicate()); |
296 | |
297 | }else{ |
298 | |
299 | if(fp.isClosedWorld() == true){ |
300 | continue; |
301 | } |
302 | |
303 | if(this.isBinaryArbLR == false){ |
304 | this.inputPredicates.add(fp); |
305 | }else{ |
306 | if(this.getPropertyClausesUnion(outputPredicates, FPProperty.NON_RECUR).contains(fc)){ |
307 | this.inputPredicates.add(fp); |
308 | } |
309 | } |
310 | } |
311 | |
312 | } |
313 | |
314 | for(Predicate p : fc.getReferencedPredicates()){ |
315 | if(!p.isClosedWorld() || fq.getPredByName(p.getName()).isCorefMapPredicate){ |
316 | this.commonCandidate.add(fq.getPredByName(p.getName())); |
317 | } |
318 | } |
319 | |
320 | if(nOpen == 0){ |
321 | this.nStartingRules ++; |
322 | } |
323 | |
324 | } |
325 | } |
326 | |
327 | |
328 | /** |
329 | * All {@link DataMovementOperator}s used in this statistical operator. |
330 | */ |
331 | protected List<DataMovementOperator> allDMOs = new ArrayList<DataMovementOperator>(); |
332 | |
333 | |
334 | /** |
335 | * Generate the operator-specified logic plan, i.e., all data movement operators |
336 | * that will be used for inference. |
337 | * |
338 | * This function should be invoked after a new instance of operator is created. |
339 | * Any valid Felix operator should implement this method. |
340 | */ |
341 | public abstract void prepare(); |
342 | |
343 | /** |
344 | * Method that executes this operator. Any valid Felix operator should implement |
345 | * this method. |
346 | */ |
347 | abstract public void run(); |
348 | |
349 | /** |
350 | * @deprecated |
351 | */ |
352 | abstract public void learn(); |
353 | |
354 | /** |
355 | * Human-readable representation of the logic plan. Any valid Felix operator should |
356 | * implement this method. |
357 | * |
358 | * TODO: need to think out a better to explain physical plan. (e.g., a graph?) |
359 | */ |
360 | abstract public String explain(); |
361 | |
362 | /** |
363 | * @deprecated |
364 | */ |
365 | public HashSet<String> throwAwayPredicatesNames = new HashSet<String>(); |
366 | |
367 | /** |
368 | * Get the target predicate of this StatOperator if this operator is |
369 | * CRF, LR or COREF. |
370 | * @return |
371 | */ |
372 | public FelixPredicate getTargetPredicateIfHasOnlyOne(){ |
373 | if(this.outputPredicates.size() != 1){ |
374 | ExceptionMan.die("Why this operator has >two or =zero output predicates?"); |
375 | } |
376 | return this.outputPredicates.iterator().next(); |
377 | } |
378 | |
379 | /** |
380 | * @deprecated |
381 | * Given a first order logic clause and a target predicate, translate it into |
382 | * a conjunctive query with properly assigned weight. |
383 | * |
384 | * @param target |
385 | * @param forceRecursive If there are multiple literals of the given predicate, |
386 | * this parameter specifies whether the generated conjunctive queries are recursive. |
387 | * CRF will set this parameter as FALSE, while COREF will set it as TRUE. |
388 | * @param props The property selected for the target predicate. |
389 | * @return |
390 | */ |
391 | public HashSet<ConjunctiveQuery> translateFelixClasesIntoLearningQueriesForVictor( |
392 | FelixPredicate target, |
393 | FPProperty... props){ |
394 | |
395 | HashSet<FelixClause> rules = new HashSet<FelixClause>(); |
396 | |
397 | for(FPProperty prop : props){ |
398 | rules.addAll(target.getPropertyClauses(prop)); |
399 | } |
400 | |
401 | HashSet<ConjunctiveQuery> ret = new HashSet<ConjunctiveQuery> (); |
402 | |
403 | FelixUIMan.println(2, 0, "{" + StringMan.join(",", FelixStringMan.colToStringArray(props)) + |
404 | "} ConjunctiveQueries for " + this.toString()); |
405 | for(FelixClause rule : rules){ |
406 | |
407 | if(this.isBinaryArbLR == true){ |
408 | rule.isBinaryLRRules = true; |
409 | } |
410 | |
411 | ConjunctiveQuery returnedCQ = rule.toLearningQueryForVictor(this, target); |
412 | ret.add(returnedCQ); |
413 | FelixUIMan.printobj(2, 0, returnedCQ); |
414 | } |
415 | |
416 | return ret; |
417 | } |
418 | |
419 | /** |
420 | * Given a first order logic clause and a target predicate, translate it into |
421 | * a conjunctive query with properly assigned weight. |
422 | * |
423 | * @param target |
424 | * @param forceRecursive If there are multiple literals of the given predicate, |
425 | * this parameter specifies whether the generated conjunctive queries are recursive. |
426 | * CRF will set this parameter as FALSE, while COREF will set it as TRUE. |
427 | * @param props The property selected for the target predicate. |
428 | * @return |
429 | */ |
430 | public HashSet<ConjunctiveQuery> translateFelixClasesIntoFactorGraphEdgeQueries( |
431 | FelixPredicate target, |
432 | boolean forceRecursive, |
433 | HashSet<String> allowedOpenPredicates, |
434 | FPProperty... props){ |
435 | |
436 | HashSet<FelixClause> rules = new HashSet<FelixClause>(); |
437 | |
438 | for(FPProperty prop : props){ |
439 | rules.addAll(target.getPropertyClauses(prop)); |
440 | } |
441 | |
442 | if(options.useDualDecomposition){ |
443 | for(FPProperty prop : props){ |
444 | if(prop == FPProperty.NON_RECUR){ |
445 | rules.addAll(this.dd_PriorClauses); |
446 | } |
447 | } |
448 | } |
449 | |
450 | HashSet<ConjunctiveQuery> ret = new HashSet<ConjunctiveQuery> (); |
451 | |
452 | FelixUIMan.println(2, 0, "{" + StringMan.join(",", FelixStringMan.colToStringArray(props)) + |
453 | "} ConjunctiveQueries for " + this.toString()); |
454 | for(FelixClause rule : rules){ |
455 | |
456 | boolean flag = false; |
457 | boolean notRelated = true; |
458 | for(Predicate fp : rule.getReferencedPredicates()){ |
459 | if(!fp.isClosedWorld() && !fp.equals(target)){ |
460 | if(!allowedOpenPredicates.contains(fp.getName())){ |
461 | flag = true; |
462 | } |
463 | |
464 | if(this.throwAwayPredicatesNames.contains(fp.getName())){ |
465 | flag = true; |
466 | } |
467 | } |
468 | |
469 | if(fp.equals(target)){ |
470 | notRelated = false; |
471 | } |
472 | } |
473 | |
474 | if(notRelated == true){ |
475 | continue; |
476 | } |
477 | |
478 | if(flag == true){ |
479 | continue; |
480 | } |
481 | |
482 | if(this.isBinaryArbLR == true){ |
483 | rule.isBinaryLRRules = true; |
484 | } |
485 | |
486 | ConjunctiveQuery returnedCQ = rule.toSimplifiedFactorGraphQuery(this, target, forceRecursive); |
487 | ret.add(returnedCQ); |
488 | FelixUIMan.printobj(2, 0, returnedCQ); |
489 | } |
490 | |
491 | return ret; |
492 | } |
493 | |
494 | /** |
495 | * Get all DMOs used by this operator. |
496 | * @return |
497 | */ |
498 | public List<DataMovementOperator> getAllDMOs(){ |
499 | return this.allDMOs; |
500 | } |
501 | |
502 | /** |
503 | * Get the precedence of this operator. |
504 | * @return |
505 | */ |
506 | public int getPrecedence(){ |
507 | return this.precedence; |
508 | } |
509 | |
510 | public String toString(){ |
511 | String ret = FelixStringMan.indentHead(); |
512 | |
513 | ret += "{" + this.type + "} "; |
514 | ret += "Operator of {"; |
515 | ret += StringMan.join(",", FelixStringMan.colToStringArray(this.outputPredicates)); |
516 | ret += "} with {" + this.allRelevantFelixClause.size() + "} Relevant Clauses"; |
517 | |
518 | if(this.clauseConstraints.size() == 0){ |
519 | ret += " Communicate { " + |
520 | StringMan.join(",", FelixStringMan.colToStringArray(this.dd_CommonOutput)) |
521 | + "}"; |
522 | return ret; |
523 | } |
524 | |
525 | ret += " Partitioned by: "; |
526 | |
527 | if(dataCrackerSignature != null){ |
528 | ret += dataCrackerSignature; |
529 | } |
530 | |
531 | ret += " Communicating { " + |
532 | StringMan.join(",", FelixStringMan.colToStringArray(this.dd_CommonOutput)) |
533 | + "}"; |
534 | |
535 | return ret; |
536 | } |
537 | |
538 | public String toNoParString(){ |
539 | String ret = FelixStringMan.indentHead(); |
540 | |
541 | ret += "{" + this.type + "} "; |
542 | ret += "Operator of {"; |
543 | ret += StringMan.join(",", FelixStringMan.colToStringArray(this.outputPredicates)); |
544 | ret += "} "; |
545 | |
546 | ret += " Communicating { " + |
547 | StringMan.join(",", FelixStringMan.colToStringArray(this.dd_CommonOutput)) |
548 | + "}"; |
549 | |
550 | return ret; |
551 | } |
552 | |
553 | //TODO: Maybe add legibility-checking of rules? |
554 | } |
555 | |
556 | |
557 | |
558 | |
559 | |
560 | |
561 | |
562 | |
563 | |
564 | |