EMMA Coverage Report (generated Tue Aug 23 05:57:12 CDT 2011)
[all classes][felix.dstruct]

COVERAGE SUMMARY FOR SOURCE FILE [ConcurrentOperatorsBucket.java]

name | class, % | method, % | block, % | line, %
ConcurrentOperatorsBucket.java | 100% (1/1) | 88% (15/17) | 65% (883/1358) | 69% (171.4/247)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
class ConcurrentOperatorsBucket | 100% (1/1) | 88% (15/17) | 65% (883/1358) | 69% (171.4/247)
dumpMapAnswerForPredicate (RDB, String, FelixPredicate): void |  | 0% (0/1) | 0% (0/195) | 0% (0/32)
toNoParString (): String |  | 0% (0/1) | 0% (0/34) | 0% (0/5)
runNextOperatorInBucket (): void |  | 100% (1/1) | 28% (5/18) | 40% (2/5)
prepareDB44DD (): void |  | 100% (1/1) | 54% (258/481) | 57% (38.4/67)
myJoin (): void |  | 100% (1/1) | 92% (37/40) | 83% (10/12)
cleanUp (): void |  | 100% (1/1) | 96% (106/110) | 85% (17/20)
run (): void |  | 100% (1/1) | 99% (249/252) | 96% (52/54)
ConcurrentOperatorsBucket (boolean): void |  | 100% (1/1) | 100% (76/76) | 100% (21/21)
addCommonOutput (FelixPredicate): void |  | 100% (1/1) | 100% (23/23) | 100% (4/4)
addMLNRelTable (String): void |  | 100% (1/1) | 100% (6/6) | 100% (2/2)
addOperator (StatOperator): void |  | 100% (1/1) | 100% (58/58) | 100% (12/12)
getOperators (): HashSet |  | 100% (1/1) | 100% (3/3) | 100% (1/1)
getPrecedence (): int |  | 100% (1/1) | 100% (3/3) | 100% (1/1)
isMarginal (): boolean |  | 100% (1/1) | 100% (3/3) | 100% (1/1)
pushPredicateScopes (HashSet): void |  | 100% (1/1) | 100% (18/18) | 100% (3/3)
setNCore (int): void |  | 100% (1/1) | 100% (4/4) | 100% (2/2)
toString (): String |  | 100% (1/1) | 100% (34/34) | 100% (5/5)

1package felix.dstruct;
2 
3 
4import java.io.BufferedWriter;
5import java.io.FileInputStream;
6import java.sql.ResultSet;
7import java.util.ArrayList;
8import java.util.HashMap;
9import java.util.HashSet;
10import java.util.concurrent.ExecutorService;
11import java.util.concurrent.Executors;
12 
13import felix.dstruct.StatOperator.OPType;
14import felix.main.Felix;
15import felix.parser.FelixCommandOptions;
16import felix.society.TaskList;
17import felix.society.TaskSet;
18import felix.task.ExecuteOperatorTask;
19import felix.task.OptimizeDMOTask;
20import felix.util.FelixConfig;
21 
22 
23import tuffy.db.RDB;
24import tuffy.infer.DataMover;
25import tuffy.mln.Clause;
26import tuffy.mln.Literal;
27import tuffy.mln.MarkovLogicNetwork;
28import tuffy.mln.Predicate;
29import tuffy.mln.Term;
30import tuffy.mln.Type;
31import tuffy.ra.ConjunctiveQuery;
32import tuffy.ra.Expression;
33import tuffy.util.Config;
34import tuffy.util.ExceptionMan;
35import tuffy.util.FileMan;
36import tuffy.util.StringMan;
37import tuffy.util.UIMan;
38 
39 
40/**
41 * An object of ConcurrentOperatorsBucket contains 
42 * multiple statistical operators that can be executed in
43 * parallel. ConcurrentOperatorsBucket is the basic unit
44 * of the {@link ExecutionPlan}.
45 */
46public class ConcurrentOperatorsBucket extends Thread{
47 
48        /**
49         * Set of statistical operators in this bucket.
50         */
51        HashSet<StatOperator> concurrentOperators = new HashSet<StatOperator>();
52        
53        /**
54         * When executing this bucket, this variable records the remaining operators.
55         */
56        ArrayList<StatOperator> operatorStacks = new ArrayList<StatOperator>();
57        
58        /**
59         * Predicates used as inputs.
60         */
61        public HashSet<FelixPredicate> inputPredicates = new HashSet<FelixPredicate>();
62        
63        /**
64         * Auxiliary structure for clearing the old data table and
65         * merge the output of statistical operators in this bucket.
66         * This is used for not clearing a table twice.
67         */
68        public HashSet<String> cleared = new HashSet<String>();
69        
70        /**
71         * Set of predicates that can potentially share with others (through
72         * dual decomposition).
73         */
74        public HashSet<FelixPredicate> commonCandidates = new HashSet<FelixPredicate>();
75        
76        /**
77         * Predicates output by this bucket.
78         */
79        public HashSet<FelixPredicate> outputPredicates = new HashSet<FelixPredicate>();
80        
81        /**
82         * If this bucket is a tuffy bucket, this set contains the result table 
83         * name of their corresponding MLN.
84         */
85        HashSet<String> subTuffyMLNRelTable = new HashSet<String>();
86        
87        /**
88         * Register a table in {@link ConcurrentOperatorsBucket#subTuffyMLNRelTable}.
89         * @param tableName
90         */
91        public void addMLNRelTable(String tableName){
92                this.subTuffyMLNRelTable.add(tableName);
93        }
94        
95        /**
96         * Predicates as input.
97         */
98        public HashSet<String> inputPredicateScope = new HashSet<String>();
99        
100        /**
101         * Adds input predicate scopes.
102         * @param _predicates
103         * @deprecated
104         */
105        public void pushPredicateScopes(HashSet<FelixPredicate> _predicates){
106                for(FelixPredicate fp : _predicates){
107                        this.inputPredicateScope.add(fp.getName());
108                }
109        }
110        
111        /**
112         * Degree of concurrent running of operators.
113         * @deprecated
114         */
115        int nCore = 1;
116                
117        /**
118         * MAP/marginal.
119         */
120        boolean isMarginal = false;
121        
122        /**
123         * Whether this bucket has ran its first operator. This variable is used to
124         * control some consistency issues.
125         * @deprecated
126         */
127        boolean started = false;
128        
129        /**
130         * ID of this bucket.
131         */
132        public int id = -1;
133        
134        /**
135         * Type of operators in this bucket. The construction of buckets
136         * ensures all operators in it are with the same type.
137         */
138        public OPType type;
139        
140        /**
141         * Precedence of this bucket. The construction of buckets
142         * ensures all operators in it are with the same precedence.
143         */
144        public int precedence = -1;
145        
146        /**
147         * Command line options.
148         */
149        FelixCommandOptions options;
150        
151        
152        /**
153         * Whether this bucket runs in marginal mode.
154         * @return
155         */
156        public boolean isMarginal(){
157                return this.isMarginal;
158        }
159        
160        /**
161         * The constructor.
162         * @param isMarginal
163         */
164        public ConcurrentOperatorsBucket(boolean isMarginal){
165                this.isMarginal = isMarginal;
166        }
167        
168        /**
169         * Get all operators in this bucket.
170         * @return
171         */
172        public HashSet<StatOperator> getOperators(){
173                return concurrentOperators;
174        }
175        
176        /**
177         * Get the precedence of this bucket.
178         * @return
179         */
180        public int getPrecedence(){
181                return precedence;
182        }
183        
184        /**
185         * Number of rules that only contains evidence relations.
186         * Intuitively, the larger this number, the earlier
187         * we should solve it while scheduling.
188         */
189        public int nStartingRule;
190        
191        /**
192         * Add an operator into this bucket.
193         * @param sop
194         */
195        public void addOperator(StatOperator sop){
196                this.concurrentOperators.add(sop);
197                outputPredicates.addAll(sop.outputPredicates);
198                inputPredicates.addAll(sop.inputPredicates);
199                
200                for(FelixPredicate fp : sop.outputPredicates){
201                        fp.belongsTo = this;
202                }
203                
204                this.commonCandidates.addAll(sop.commonCandidate);
205                
206                this.nStartingRule = sop.nStartingRules;
207                options = sop.options;
208                this.type = sop.type;
209                this.precedence = sop.getPrecedence();
210                sop.belongsToBucket = this;
211        }
212        
213        /**
214         * Set the degree of concurrency.
215         * @param _nCore
216         */
217        public void setNCore(int _nCore){
218                nCore = _nCore;
219        }
220        
221        /**
222         * Relations that are shared with other buckets through 
223         * dual decomposition.
224         */
225        public HashSet<FelixPredicate> dd_CommonOutput = new HashSet<FelixPredicate>();
226        
227        /**
228         * Map from relations in {@link #dd_CommonOutput}
229         * to tables containing their temporary output result in
230         * each iteration.
231         */
232        public HashMap<FelixPredicate, String> dd_commonOutputPredicate_2_tableName = 
233                                                                                        new HashMap<FelixPredicate, String>();
234        
235        /**
236         * Map from relations in {@link #dd_CommonOutput}
237         * to tables containing their corresponding Langragian
238         * Multipliers.
239         */
240        public HashMap<FelixPredicate, String> dd_commonOutputPredicate_2_priorTableName = 
241                new HashMap<FelixPredicate, String>();
242 
243        /**
244         * Adds predicate to {@link #dd_CommonOutput}.
245         * @param _fp
246         */
247        public void addCommonOutput(FelixPredicate _fp){
248                for(StatOperator sop : this.concurrentOperators){
249                        sop.dd_CommonOutput.add(_fp);
250                }
251                this.dd_CommonOutput.add(_fp);
252        }
253        
254        /**
255         * Prepares database for dual decomposition -- 1) add
256         * new rules for Lagaragian Multipliers; 2) create tables
257         * for intermediate output and priors etc..
258         */
259        public void prepareDB44DD(){
260                if(options.useDualDecomposition){
261                        
262                        RDB newDB = RDB.getRDBbyConfig(Config.db_schema);
263                        
264                        for(FelixPredicate fp : this.dd_CommonOutput){
265                                
266                                // generate output table and build the map from predicate name to table name
267                                if(fp.isCorefPredicate){
268                                        String tableName = "_dd_" + fp.getName() + "_map_of_op" +this.id + "_" + this.type;
269                                        String viewName = "_dd_" + fp.getName() + "_of_op" +this.id + "_" + this.type;
270                                        
271                                        newDB.dropTable(tableName);
272                                        newDB.dropView(viewName);
273                                        newDB.execute("CREATE TABLE " + tableName + " AS (SELECT * FROM " + fp.corefMAPPredicate.getRelName() + " WHERE 1=2)");
274                                        
275                                        this.dd_commonOutputPredicate_2_tableName.put(fp, viewName);
276                                        this.dd_commonOutputPredicate_2_tableName.put(fp.corefMAPPredicate, tableName);
277                                                                                
278                                }else{
279                                        String tableName = "_dd_" + fp.getName() + "_of_op" +this.id + "_" + this.type;
280                                        this.dd_commonOutputPredicate_2_tableName.put(fp, tableName);
281                                        
282                                        newDB.dropTable(tableName);
283                                        newDB.execute("CREATE TABLE " + tableName + " AS (SELECT * FROM " + fp.getRelName() + " WHERE 1=2)");
284                                }
285        
286                                // generate prior predicate and new rules
287                                String priorTableName = "_dd_prior_" + fp.getName() + "_of_op" + this.id + "_" + this.type;
288                                FelixPredicate tmpPredicate = new FelixPredicate(priorTableName, true);
289                                for(int i=0;i<fp.getArgs().size();i++){
290                                        tmpPredicate.appendArgument(fp.getTypeAt(i));
291                                }
292                                tmpPredicate.appendArgument(new Type("float_"));
293                                tmpPredicate.prepareDB(Felix.db);
294                                dd_commonOutputPredicate_2_priorTableName.put(fp, tmpPredicate.getRelName());
295                                
296                                Literal tmpLiteral = new Literal(tmpPredicate, false);
297                                for(int i=0;i<fp.getArgs().size();i++){
298                                        tmpLiteral.appendTerm(new Term(String.valueOf((char)('a' + i))));
299                                }
300                                tmpLiteral.appendTerm(new Term("wgt"));
301                                
302                                Literal tmpLiteral2 = new Literal(fp, true);
303                                for(int i=0;i<fp.getArgs().size();i++){
304                                        tmpLiteral2.appendTerm(new Term(String.valueOf((char)('a' + i))));
305                                }
306                                
307                                FelixClause fc = new FelixClause();
308                                fc.addLiteral(tmpLiteral);
309                                fc.addLiteral(tmpLiteral2);
310                                fc.setVarWeight("wgt");
311                                
312                                
313                                
314                                for(StatOperator sop : this.concurrentOperators){
315                                        
316                                        FelixClause fff = (FelixClause) fc.clone();
317                                        
318                                        if(sop.dc == null){
319                                                sop.dd_PriorClauses.add(fff);
320                                        }else{
321                                                HashSet<Expression> e = 
322                                                                sop.dc.getExpressions(fff, null, sop.dPart, sop.nPart, false);
323                                                sop.clauseConstraints.put(fff, e);
324                                                sop.dd_PriorClauses.add(fff);
325                                        }
326                                        
327                                        
328                                }
329                                
330                                
331                                if(fp.isCorefPredicate && this.type == OPType.COREF){
332                                        
333                                        FelixPredicate pmap = fp.corefMAPPredicate;
334                                        
335                                        priorTableName = "_dd_prior_" + pmap.getName() + "_of_op" + this.id + "_" + this.type;
336                                        tmpPredicate = new FelixPredicate(priorTableName, true);
337                                        for(int i=0;i<fp.getArgs().size();i++){
338                                                tmpPredicate.appendArgument(fp.getTypeAt(i));
339                                        }
340                                        tmpPredicate.appendArgument(new Type("float_"));
341                                        tmpPredicate.prepareDB(Felix.db);
342                                        dd_commonOutputPredicate_2_priorTableName.put(pmap, tmpPredicate.getRelName());
343                                        
344                                        tmpLiteral = new Literal(tmpPredicate, false);
345                                        for(int i=0;i<fp.getArgs().size();i++){
346                                                tmpLiteral.appendTerm(new Term(String.valueOf((char)('a' + i))));
347                                        }
348                                        tmpLiteral.appendTerm(new Term("wgt"));
349                                        
350                                        tmpLiteral2 = new Literal(fp, true);
351                                        for(int i=0;i<fp.getArgs().size();i++){
352                                                tmpLiteral2.appendTerm(new Term(String.valueOf((char)('a' + i))));
353                                        }
354                                        
355                                        fc = new FelixClause();
356                                        fc.addLiteral(tmpLiteral);
357                                        fc.addLiteral(tmpLiteral2);
358                                        fc.setVarWeight("wgt");
359                                        
360                                        for(StatOperator sop : this.concurrentOperators){
361                                                sop.dd_PriorClauses.add(fc);
362                                        }
363                                }
364                                
365                                
366                        }
367                        
368                        
369                        newDB.close();
370                        
371                        for(StatOperator sop : this.concurrentOperators){
372                                sop.dd_commonOutputPredicate_2_tableName = this.dd_commonOutputPredicate_2_tableName;
373                        }
374                        
375                }
376        }
377        
378        /**
379         * Run all operators in this bucket.
380         */
381        public void run(){
382                                
383                cleared.clear();
384                subTuffyMLNRelTable.clear();
385                
386                for(Predicate p : outputPredicates){
387                        RDB newDB = RDB.getRDBbyConfig(Config.db_schema);
388                        
389                        if(p.isCurrentlyView == false && p.isClosedWorld() == false){
390                                if(FelixConfig.isFirstRunOfDD || !this.options.useDualDecomposition){
391                                        String sql = "DELETE FROM " + p.getRelName();
392                                        newDB.execute(sql);
393                                }
394                        }
395                        if(this.type.equals(OPType.COREF)){
396                                if(!options.isDLearningMode){
397                                        if(FelixConfig.isFirstRunOfDD || !this.options.useDualDecomposition){
398                                                String sql = "DELETE FROM " + p.getRelName() + "_map";
399                                                newDB.execute(sql);
400                                        }
401                                }
402                        }
403                        newDB.close();
404                        //p.nextTupleID = 0;
405                }
406                
407                operatorStacks.clear();
408                operatorStacks.addAll(concurrentOperators);
409                
410                if(this.type.equals(OPType.TUFFY)){
411                        nCore = 1;
412                }
413                
414 
415                
416                if(options.useDualDecomposition){
417                        
418                        RDB newDB = RDB.getRDBbyConfig(Config.db_schema);
419                        for(FelixPredicate fp : this.dd_CommonOutput){
420                                String tableName = this.dd_commonOutputPredicate_2_tableName.get(fp);
421                                String sql = "DELETE from " + tableName;
422                                newDB.execute(sql);
423                        }
424                        newDB.close();
425                        
426                        if(this.type == OPType.TUFFY){
427                                
428                                if(FelixConfig.isFirstRunOfDD){
429                                        for(StatOperator sop : concurrentOperators){
430                                                sop.currentState = false;
431                                                sop.run();
432                                        }
433                                        this.cleanUp();
434                                }else{
435                                                                        
436                                        // second, run tuffy on the output table,
437                                        // take prior as evidences
438                                        for(StatOperator sop : concurrentOperators){
439                                                sop.currentState = false;
440                                                sop.run();
441                                        }
442                                        this.cleanUp();
443                                        
444                                }
445                                
446                        }else{
447                        
448                                ExecutorService pool = Executors.newFixedThreadPool(Config.getNumThreads());
449                                
450                                TaskList tasks = new TaskList();
451                                
452                                TaskSet taskset1 = new TaskSet();
453                                        
454                                for(StatOperator sop : concurrentOperators){
455                                        taskset1.addSubTask(new ExecuteOperatorTask(sop));
456                                }
457                                
458                                tasks.addSubTask(taskset1);
459                                try {
460                                        tasks.execute(pool);
461                                } catch (Exception e) {
462                                        e.printStackTrace();
463                                }
464                                
465                                pool.shutdown();
466                                
467                                this.cleanUp();
468                        }
469                        
470                        newDB.close();
471                        
472                }else{
473                        for(int i=0;i<nCore; i++){
474                                if(operatorStacks.size() == 0){
475                                        continue;
476                                }
477                                StatOperator sop = operatorStacks.remove(operatorStacks.size() - 1);
478                                sop.start();
479                                started = true;
480                        }
481                }
482        }
483        
484        /**
485         * Merge the output of operators into a consistent DB table 
486         * and dump them into files.
487         */
488        public void cleanUp(){
489                try{
490                        RDB newDB = RDB.getRDBbyConfig();
491                        newDB.execute("SET search_path = " + Config.db_schema);
492                        
493                        if(this.type.equals(OPType.TUFFY)){
494                                MarkovLogicNetwork tmpMLN = new MarkovLogicNetwork();
495                                tmpMLN.setDB(newDB);
496                                DataMover datamover = new DataMover(tmpMLN);
497                                
498                                for(FelixPredicate p : outputPredicates){
499                                        
500                                        if(p.isClosedWorld() == true){
501                                                continue;
502                                        }
503                                        
504                                        datamover.updateOriTable(subTuffyMLNRelTable, p);
505                                        datamover.updateOriTable(subTuffyMLNRelTable, p, "_copy_of_" + p.getRelName());
506                                }
507                                
508                                for(FelixPredicate p : this.dd_CommonOutput){
509                                        
510                                        
511                                        String tableName = this.dd_commonOutputPredicate_2_tableName.get(p);
512                                                
513                                        
514                                        newDB.execute("DELETE FROM " + this.dd_commonOutputPredicate_2_tableName.get(p));
515                                                
516                                        datamover.updateOriTable(subTuffyMLNRelTable, p, 
517                                                        this.dd_commonOutputPredicate_2_tableName.get(p));        
518 
519                                        
520                                }
521                                
522                        }
523                        
524                        newDB.close();
525                }catch(Exception e){
526                        e.printStackTrace();
527                }
528        }
529        
530        /**
531         * Run the next operator in this bucket.
532         * @deprecated
533         */
534        public synchronized void runNextOperatorInBucket(){
535                
536                if(operatorStacks.size() == 0){
537                        return;
538                }
539                
540                StatOperator sop = operatorStacks.remove(operatorStacks.size() - 1);
541                sop.start();
542        }
543        
544        /**
545         * Return only if all the operators have been executed.
546         * @deprecated
547         */
548        public void myJoin(){
549 
550                try{
551                        int goal = this.concurrentOperators.size();
552                        
553                        while(true){
554                                int ct = 0;
555                                for(StatOperator sop : this.concurrentOperators){
556                                        if(operatorStacks.contains(sop) || !started){
557                                                continue;
558                                        }
559                                        sop.join();
560                                        ct ++;
561                                }
562                                if(ct == goal){
563                                        break;
564                                }
565                        }
566                        
567                        this.cleanUp();
568                }catch(Exception e){
569                        e.printStackTrace();
570                }
571        
572        }
573        
574        /**
575         * Returns string representation of bucket.
576         */
577        public String toString(){
578 
579                String ret = "";
580                
581                for(StatOperator sop : this.concurrentOperators){
582                        ret += sop;
583                        ret += "\n";
584                }
585                
586                return ret;
587        }
588        
589        /**
590         * Returns string representation of bucket without partitioning info..
591         * @return
592         */
593        public String toNoParString(){
594 
595                String ret = "";
596                
597                for(StatOperator sop : this.concurrentOperators){
598                        ret += sop.toNoParString();
599                        ret += "\n";
600                        break;
601                }
602                
603                return ret;
604        }
605        
606        /**
607         * Output the results of this bucket.
608         * @param db
609         * @param fout
610         * @param p
611         * @deprecated
612         */
613        public void dumpMapAnswerForPredicate(RDB db, String fout, FelixPredicate p) {
614        //        spreadTruth();
615                
616                int digits = 4;
617                
618                HashMap<Long,String> cmap = db.loadIdSymbolMapFromTable();
619                try {
620                        BufferedWriter bufferedWriter = FileMan.getBufferedWriterMaybeGZ(fout);
621                        String sql;
622                        String tableName = p.getRelName();
623                        String predName = p.getName();
624                        
625                        if(this.type.equals(OPType.COREF)){
626                                tableName += "_map";
627                                predName += "_map";
628                        }
629                        
630                        sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE";
631                        
632                        
633                        ResultSet rs = db.query(sql);
634                        while(rs == null){
635                                rs = db.query(sql);
636                        }
637                        while(rs.next()) {
638                                String line = "";
639                                ArrayList<String> cs = new ArrayList<String>();
640                                for(String a : p.getArgs()) {
641                                        int c = rs.getInt(a);
642                                        cs.add("\"" + StringMan.escapeJavaString(cmap.get(c)) + "\"");
643                                }
644                                line += StringMan.commaList(cs) + ")";
645                                
646                                if(isMarginal){
647                                        
648                                        if(Config.output_prolog_format){
649                                                line = "tuffyPrediction(" + UIMan.decimalRound(digits, rs.getDouble("prior")) +
650                                                        ", " + predName + "(" + line + ".";
651                                        }else{
652                                                if(!this.type.equals(OPType.COREF)){
653                                                        line = rs.getString("prior") + "\t" + predName + "(" + line;
654                                                }
655                                        }
656                                        
657                                }else{
658                                        line =  predName + "(" + line;
659                                }
660                                
661                                bufferedWriter.append(line + "\n");
662                        }
663                        rs.close();
664                        bufferedWriter.close();
665                } catch (Exception e) {
666                        ExceptionMan.handle(e);
667                }
668        }
669                
670        
671}

[all classes][felix.dstruct]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov