EMMA Coverage Report (generated Sat Aug 20 11:00:51 CDT 2011)
[all classes][felix.dstruct]

COVERAGE SUMMARY FOR SOURCE FILE [ConcurrentOperatorsBucket.java]

nameclass, %method, %block, %line, %
ConcurrentOperatorsBucket.java100% (1/1)82%  (14/17)41%  (540/1329)49%  (118.1/241)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ConcurrentOperatorsBucket100% (1/1)82%  (14/17)41%  (540/1329)49%  (118.1/241)
addCommonOutput (FelixPredicate): void 0%   (0/1)0%   (0/23)0%   (0/4)
dumpMapAnswerForPredicate (RDB, String, FelixPredicate): void 0%   (0/1)0%   (0/195)0%   (0/32)
toNoParString (): String 0%   (0/1)0%   (0/34)0%   (0/5)
prepareDB44DD (): void 100% (1/1)8%   (34/452)11%  (6.6/61)
runNextOperatorInBucket (): void 100% (1/1)28%  (5/18)40%  (2/5)
cleanUp (): void 100% (1/1)66%  (73/110)61%  (12.2/20)
run (): void 100% (1/1)74%  (186/252)73%  (39.3/54)
myJoin (): void 100% (1/1)92%  (37/40)83%  (10/12)
ConcurrentOperatorsBucket (boolean): void 100% (1/1)100% (76/76)100% (21/21)
addMLNRelTable (String): void 100% (1/1)100% (6/6)100% (2/2)
addOperator (StatOperator): void 100% (1/1)100% (58/58)100% (12/12)
getOperators (): HashSet 100% (1/1)100% (3/3)100% (1/1)
getPrecedence (): int 100% (1/1)100% (3/3)100% (1/1)
isMarginal (): boolean 100% (1/1)100% (3/3)100% (1/1)
pushPredicateScopes (HashSet): void 100% (1/1)100% (18/18)100% (3/3)
setNCore (int): void 100% (1/1)100% (4/4)100% (2/2)
toString (): String 100% (1/1)100% (34/34)100% (5/5)

1package felix.dstruct;
2 
3 
4import java.io.BufferedWriter;
5import java.io.FileInputStream;
6import java.sql.ResultSet;
7import java.util.ArrayList;
8import java.util.HashMap;
9import java.util.HashSet;
10import java.util.concurrent.ExecutorService;
11import java.util.concurrent.Executors;
12 
13import felix.dstruct.StatOperator.OPType;
14import felix.main.Felix;
15import felix.parser.FelixCommandOptions;
16import felix.society.TaskList;
17import felix.society.TaskSet;
18import felix.task.ExecuteOperatorTask;
19import felix.task.OptimizeDMOTask;
20import felix.util.FelixConfig;
21 
22 
23import tuffy.db.RDB;
24import tuffy.infer.DataMover;
25import tuffy.mln.Literal;
26import tuffy.mln.MarkovLogicNetwork;
27import tuffy.mln.Predicate;
28import tuffy.mln.Term;
29import tuffy.mln.Type;
30import tuffy.util.Config;
31import tuffy.util.ExceptionMan;
32import tuffy.util.FileMan;
33import tuffy.util.StringMan;
34import tuffy.util.UIMan;
35 
36 
37/**
38 * An object of ConcurrentOperatorsBucket contains 
39 * multiple statistical operators that can be executed in
40 * parallel. ConcurrentOperatorsBucket is the basic unit
41 * of the {@link ExecutionPlan}.
42 */
43public class ConcurrentOperatorsBucket extends Thread{
44 
45        /**
46         * Set of statistical operators in this bucket.
47         */
48        HashSet<StatOperator> concurrentOperators = new HashSet<StatOperator>();
49        
50        /**
51         * When executing this bucket, this variable records the remaining operators.
52         */
53        ArrayList<StatOperator> operatorStacks = new ArrayList<StatOperator>();
54        
55        /**
56         * Predicates used as inputs.
57         */
58        public HashSet<FelixPredicate> inputPredicates = new HashSet<FelixPredicate>();
59        
60        /**
61         * Auxiliary structure for clearing the old data table and
62         * merge the output of statistical operators in this bucket.
63         * This is used for not clearing a table twice.
64         */
65        public HashSet<String> cleared = new HashSet<String>();
66        
67        /**
68         * Set of predicates that can potentially share with others (through
69         * dual decomposition).
70         */
71        public HashSet<FelixPredicate> commonCandidates = new HashSet<FelixPredicate>();
72        
73        /**
74         * Predicates output by this bucket.
75         */
76        public HashSet<FelixPredicate> outputPredicates = new HashSet<FelixPredicate>();
77        
78        /**
79         * If this bucket is a tuffy bucket, this set contains the result table 
80         * name of their corresponding MLN.
81         */
82        HashSet<String> subTuffyMLNRelTable = new HashSet<String>();
83        
84        /**
85         * Register a table in {@link ConcurrentOperatorsBucket#subTuffyMLNRelTable}.
86         * @param tableName
87         */
88        public void addMLNRelTable(String tableName){
89                this.subTuffyMLNRelTable.add(tableName);
90        }
91        
92        /**
93         * Predicates as input.
94         */
95        public HashSet<String> inputPredicateScope = new HashSet<String>();
96        
97        /**
98         * Adds input predicate scopes.
99         * @param _predicates
100         * @deprecated
101         */
102        public void pushPredicateScopes(HashSet<FelixPredicate> _predicates){
103                for(FelixPredicate fp : _predicates){
104                        this.inputPredicateScope.add(fp.getName());
105                }
106        }
107        
108        /**
109         * Degree of concurrent running of operators.
110         * @deprecated
111         */
112        int nCore = 1;
113                
114        /**
115         * MAP/marginal.
116         */
117        boolean isMarginal = false;
118        
119        /**
120         * Whether this bucket has ran its first operator. This variable is used to
121         * control some consistency issues.
122         * @deprecated
123         */
124        boolean started = false;
125        
126        /**
127         * ID of this bucket.
128         */
129        public int id = -1;
130        
131        /**
132         * Type of operators in this bucket. The construction of buckets
133         * ensures all operators in it are with the same type.
134         */
135        public OPType type;
136        
137        /**
138         * Precedence of this bucket. The construction of buckets
139         * ensures all operators in it are with the same precedence.
140         */
141        public int precedence = -1;
142        
143        /**
144         * Command line options.
145         */
146        FelixCommandOptions options;
147        
148        /**
149         * Whether this bucket runs in marginal mode.
150         * @return
151         */
152        public boolean isMarginal(){
153                return this.isMarginal;
154        }
155        
156        /**
157         * The constructor.
158         * @param isMarginal
159         */
160        public ConcurrentOperatorsBucket(boolean isMarginal){
161                this.isMarginal = isMarginal;
162        }
163        
164        /**
165         * Get all operators in this bucket.
166         * @return
167         */
168        public HashSet<StatOperator> getOperators(){
169                return concurrentOperators;
170        }
171        
172        /**
173         * Get the precedence of this bucket.
174         * @return
175         */
176        public int getPrecedence(){
177                return precedence;
178        }
179        
180        /**
181         * Number of rules that only contains evidence relations.
182         * Intuitively, the larger this number, the earlier
183         * we should solve it while scheduling.
184         */
185        public int nStartingRule;
186        
187        /**
188         * Add an operator into this bucket.
189         * @param sop
190         */
191        public void addOperator(StatOperator sop){
192                this.concurrentOperators.add(sop);
193                outputPredicates.addAll(sop.outputPredicates);
194                inputPredicates.addAll(sop.inputPredicates);
195                
196                for(FelixPredicate fp : sop.outputPredicates){
197                        fp.belongsTo = this;
198                }
199                
200                this.commonCandidates.addAll(sop.commonCandidate);
201                
202                this.nStartingRule = sop.nStartingRules;
203                options = sop.options;
204                this.type = sop.type;
205                this.precedence = sop.getPrecedence();
206                sop.belongsToBucket = this;
207        }
208        
209        /**
210         * Set the degree of concurrency.
211         * @param _nCore
212         */
213        public void setNCore(int _nCore){
214                nCore = _nCore;
215        }
216        
217        /**
218         * Relations that are shared with other buckets through 
219         * dual decomposition.
220         */
221        public HashSet<FelixPredicate> dd_CommonOutput = new HashSet<FelixPredicate>();
222        
223        /**
224         * Map from relations in {@link #dd_CommonOutput}
225         * to tables containing their temporary output result in
226         * each iteration.
227         */
228        public HashMap<FelixPredicate, String> dd_commonOutputPredicate_2_tableName = 
229                                                                                        new HashMap<FelixPredicate, String>();
230        
231        /**
232         * Map from relations in {@link #dd_CommonOutput}
233         * to tables containing their corresponding Langragian
234         * Multipliers.
235         */
236        public HashMap<FelixPredicate, String> dd_commonOutputPredicate_2_priorTableName = 
237                new HashMap<FelixPredicate, String>();
238 
239        /**
240         * Adds predicate to {@link #dd_CommonOutput}.
241         * @param _fp
242         */
243        public void addCommonOutput(FelixPredicate _fp){
244                for(StatOperator sop : this.concurrentOperators){
245                        sop.dd_CommonOutput.add(_fp);
246                }
247                this.dd_CommonOutput.add(_fp);
248        }
249        
250        /**
251         * Prepares database for dual decomposition -- 1) add
252         * new rules for Lagaragian Multipliers; 2) create tables
253         * for intermediate output and priors etc..
254         */
255        public void prepareDB44DD(){
256                if(options.useDualDecomposition){
257                        
258                        RDB newDB = RDB.getRDBbyConfig(Config.db_schema);
259                        
260                        for(FelixPredicate fp : this.dd_CommonOutput){
261                                
262                                // generate output table and build the map from predicate name to table name
263                                if(fp.isCorefPredicate){
264                                        String tableName = "_dd_" + fp.getName() + "_map_of_op" +this.id + "_" + this.type;
265                                        String viewName = "_dd_" + fp.getName() + "_of_op" +this.id + "_" + this.type;
266                                        
267                                        newDB.dropTable(tableName);
268                                        newDB.dropView(viewName);
269                                        newDB.execute("CREATE TABLE " + tableName + " AS (SELECT * FROM " + fp.corefMAPPredicate.getRelName() + " WHERE 1=2)");
270                                        
271                                        this.dd_commonOutputPredicate_2_tableName.put(fp, viewName);
272                                        this.dd_commonOutputPredicate_2_tableName.put(fp.corefMAPPredicate, tableName);
273                                                                                
274                                }else{
275                                        String tableName = "_dd_" + fp.getName() + "_of_op" +this.id + "_" + this.type;
276                                        this.dd_commonOutputPredicate_2_tableName.put(fp, tableName);
277                                        
278                                        newDB.dropTable(tableName);
279                                        newDB.execute("CREATE TABLE " + tableName + " AS (SELECT * FROM " + fp.getRelName() + " WHERE 1=2)");
280                                }
281        
282                                // generate prior predicate and new rules
283                                String priorTableName = "_dd_prior_" + fp.getName() + "_of_op" + this.id + "_" + this.type;
284                                FelixPredicate tmpPredicate = new FelixPredicate(priorTableName, true);
285                                for(int i=0;i<fp.getArgs().size();i++){
286                                        tmpPredicate.appendArgument(fp.getTypeAt(i));
287                                }
288                                tmpPredicate.appendArgument(new Type("float_"));
289                                tmpPredicate.prepareDB(Felix.db);
290                                dd_commonOutputPredicate_2_priorTableName.put(fp, tmpPredicate.getRelName());
291                                
292                                Literal tmpLiteral = new Literal(tmpPredicate, false);
293                                for(int i=0;i<fp.getArgs().size();i++){
294                                        tmpLiteral.appendTerm(new Term(String.valueOf((char)('a' + i))));
295                                }
296                                tmpLiteral.appendTerm(new Term("wgt"));
297                                
298                                Literal tmpLiteral2 = new Literal(fp, true);
299                                for(int i=0;i<fp.getArgs().size();i++){
300                                        tmpLiteral2.appendTerm(new Term(String.valueOf((char)('a' + i))));
301                                }
302                                
303                                FelixClause fc = new FelixClause();
304                                fc.addLiteral(tmpLiteral);
305                                fc.addLiteral(tmpLiteral2);
306                                fc.setVarWeight("wgt");
307                                
308                                for(StatOperator sop : this.concurrentOperators){
309                                        sop.dd_PriorClauses.add(fc);
310                                }
311                                
312                                
313                                if(fp.isCorefPredicate && this.type == OPType.COREF){
314                                        
315                                        FelixPredicate pmap = fp.corefMAPPredicate;
316                                        
317                                        priorTableName = "_dd_prior_" + pmap.getName() + "_of_op" + this.id + "_" + this.type;
318                                        tmpPredicate = new FelixPredicate(priorTableName, true);
319                                        for(int i=0;i<fp.getArgs().size();i++){
320                                                tmpPredicate.appendArgument(fp.getTypeAt(i));
321                                        }
322                                        tmpPredicate.appendArgument(new Type("float_"));
323                                        tmpPredicate.prepareDB(Felix.db);
324                                        dd_commonOutputPredicate_2_priorTableName.put(pmap, tmpPredicate.getRelName());
325                                        
326                                        tmpLiteral = new Literal(tmpPredicate, false);
327                                        for(int i=0;i<fp.getArgs().size();i++){
328                                                tmpLiteral.appendTerm(new Term(String.valueOf((char)('a' + i))));
329                                        }
330                                        tmpLiteral.appendTerm(new Term("wgt"));
331                                        
332                                        tmpLiteral2 = new Literal(fp, true);
333                                        for(int i=0;i<fp.getArgs().size();i++){
334                                                tmpLiteral2.appendTerm(new Term(String.valueOf((char)('a' + i))));
335                                        }
336                                        
337                                        fc = new FelixClause();
338                                        fc.addLiteral(tmpLiteral);
339                                        fc.addLiteral(tmpLiteral2);
340                                        fc.setVarWeight("wgt");
341                                        
342                                        for(StatOperator sop : this.concurrentOperators){
343                                                sop.dd_PriorClauses.add(fc);
344                                        }
345                                }
346                                
347                                
348                        }
349                        
350                        
351                        newDB.close();
352                        
353                        for(StatOperator sop : this.concurrentOperators){
354                                sop.dd_commonOutputPredicate_2_tableName = this.dd_commonOutputPredicate_2_tableName;
355                        }
356                        
357                }
358        }
359        
360        /**
361         * Run all operators in this bucket.
362         */
363        public void run(){
364                                
365                cleared.clear();
366                subTuffyMLNRelTable.clear();
367                
368                for(Predicate p : outputPredicates){
369                        RDB newDB = RDB.getRDBbyConfig(Config.db_schema);
370                        
371                        if(p.isCurrentlyView == false && p.isClosedWorld() == false){
372                                if(FelixConfig.isFirstRunOfDD || !this.options.useDualDecomposition){
373                                        String sql = "DELETE FROM " + p.getRelName();
374                                        newDB.execute(sql);
375                                }
376                        }
377                        if(this.type.equals(OPType.COREF)){
378                                if(!options.isDLearningMode){
379                                        if(FelixConfig.isFirstRunOfDD || !this.options.useDualDecomposition){
380                                                String sql = "DELETE FROM " + p.getRelName() + "_map";
381                                                newDB.execute(sql);
382                                        }
383                                }
384                        }
385                        newDB.close();
386                        //p.nextTupleID = 0;
387                }
388                
389                operatorStacks.clear();
390                operatorStacks.addAll(concurrentOperators);
391                
392                if(this.type.equals(OPType.TUFFY)){
393                        nCore = 1;
394                }
395                
396 
397                
398                if(options.useDualDecomposition){
399                        
400                        RDB newDB = RDB.getRDBbyConfig(Config.db_schema);
401                        for(FelixPredicate fp : this.dd_CommonOutput){
402                                String tableName = this.dd_commonOutputPredicate_2_tableName.get(fp);
403                                String sql = "DELETE from " + tableName;
404                                newDB.execute(sql);
405                        }
406                        newDB.close();
407                        
408                        if(this.type == OPType.TUFFY){
409                                
410                                if(FelixConfig.isFirstRunOfDD){
411                                        for(StatOperator sop : concurrentOperators){
412                                                sop.currentState = false;
413                                                sop.run();
414                                        }
415                                        this.cleanUp();
416                                }else{
417                                                                        
418                                        // second, run tuffy on the output table,
419                                        // take prior as evidences
420                                        for(StatOperator sop : concurrentOperators){
421                                                sop.currentState = false;
422                                                sop.run();
423                                        }
424                                        this.cleanUp();
425                                        
426                                }
427                                
428                        }else{
429                        
430                                ExecutorService pool = Executors.newFixedThreadPool(Config.getNumThreads());
431                                
432                                TaskList tasks = new TaskList();
433                                
434                                TaskSet taskset1 = new TaskSet();
435                                        
436                                for(StatOperator sop : concurrentOperators){
437                                        taskset1.addSubTask(new ExecuteOperatorTask(sop));
438                                }
439                                
440                                tasks.addSubTask(taskset1);
441                                try {
442                                        tasks.execute(pool);
443                                } catch (Exception e) {
444                                        e.printStackTrace();
445                                }
446                                
447                                pool.shutdown();
448                                
449                                this.cleanUp();
450                        }
451                        
452                        newDB.close();
453                        
454                }else{
455                        for(int i=0;i<nCore; i++){
456                                if(operatorStacks.size() == 0){
457                                        continue;
458                                }
459                                StatOperator sop = operatorStacks.remove(operatorStacks.size() - 1);
460                                sop.start();
461                                started = true;
462                        }
463                }
464        }
465        
466        /**
467         * Merge the output of operators into a consistent DB table 
468         * and dump them into files.
469         */
470        public void cleanUp(){
471                try{
472                        RDB newDB = RDB.getRDBbyConfig();
473                        newDB.execute("SET search_path = " + Config.db_schema);
474                        
475                        if(this.type.equals(OPType.TUFFY)){
476                                MarkovLogicNetwork tmpMLN = new MarkovLogicNetwork();
477                                tmpMLN.setDB(newDB);
478                                DataMover datamover = new DataMover(tmpMLN);
479                                
480                                for(FelixPredicate p : outputPredicates){
481                                        
482                                        if(p.isClosedWorld() == true){
483                                                continue;
484                                        }
485                                        
486                                        datamover.updateOriTable(subTuffyMLNRelTable, p);
487                                        datamover.updateOriTable(subTuffyMLNRelTable, p, "_copy_of_" + p.getRelName());
488                                }
489                                
490                                for(FelixPredicate p : this.dd_CommonOutput){
491                                        
492                                        
493                                        String tableName = this.dd_commonOutputPredicate_2_tableName.get(p);
494                                                
495                                        
496                                        newDB.execute("DELETE FROM " + this.dd_commonOutputPredicate_2_tableName.get(p));
497                                                
498                                        datamover.updateOriTable(subTuffyMLNRelTable, p, 
499                                                        this.dd_commonOutputPredicate_2_tableName.get(p));        
500 
501                                        
502                                }
503                                
504                        }
505                        
506                        newDB.close();
507                }catch(Exception e){
508                        e.printStackTrace();
509                }
510        }
511        
512        /**
513         * Run the next operator in this bucket.
514         * @deprecated
515         */
516        public synchronized void runNextOperatorInBucket(){
517                
518                if(operatorStacks.size() == 0){
519                        return;
520                }
521                
522                StatOperator sop = operatorStacks.remove(operatorStacks.size() - 1);
523                sop.start();
524        }
525        
526        /**
527         * Return only if all the operators have been executed.
528         * @deprecated
529         */
530        public void myJoin(){
531 
532                try{
533                        int goal = this.concurrentOperators.size();
534                        
535                        while(true){
536                                int ct = 0;
537                                for(StatOperator sop : this.concurrentOperators){
538                                        if(operatorStacks.contains(sop) || !started){
539                                                continue;
540                                        }
541                                        sop.join();
542                                        ct ++;
543                                }
544                                if(ct == goal){
545                                        break;
546                                }
547                        }
548                        
549                        this.cleanUp();
550                }catch(Exception e){
551                        e.printStackTrace();
552                }
553        
554        }
555        
556        /**
557         * Returns string representation of bucket.
558         */
559        public String toString(){
560 
561                String ret = "";
562                
563                for(StatOperator sop : this.concurrentOperators){
564                        ret += sop;
565                        ret += "\n";
566                }
567                
568                return ret;
569        }
570        
571        /**
572         * Returns string representation of bucket without partitioning info..
573         * @return
574         */
575        public String toNoParString(){
576 
577                String ret = "";
578                
579                for(StatOperator sop : this.concurrentOperators){
580                        ret += sop.toNoParString();
581                        ret += "\n";
582                        break;
583                }
584                
585                return ret;
586        }
587        
588        /**
589         * Output the results of this bucket.
590         * @param db
591         * @param fout
592         * @param p
593         * @deprecated
594         */
595        public void dumpMapAnswerForPredicate(RDB db, String fout, FelixPredicate p) {
596        //        spreadTruth();
597                
598                int digits = 4;
599                
600                HashMap<Long,String> cmap = db.loadIdSymbolMapFromTable();
601                try {
602                        BufferedWriter bufferedWriter = FileMan.getBufferedWriterMaybeGZ(fout);
603                        String sql;
604                        String tableName = p.getRelName();
605                        String predName = p.getName();
606                        
607                        if(this.type.equals(OPType.COREF)){
608                                tableName += "_map";
609                                predName += "_map";
610                        }
611                        
612                        sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE";
613                        
614                        
615                        ResultSet rs = db.query(sql);
616                        while(rs == null){
617                                rs = db.query(sql);
618                        }
619                        while(rs.next()) {
620                                String line = "";
621                                ArrayList<String> cs = new ArrayList<String>();
622                                for(String a : p.getArgs()) {
623                                        int c = rs.getInt(a);
624                                        cs.add("\"" + StringMan.escapeJavaString(cmap.get(c)) + "\"");
625                                }
626                                line += StringMan.commaList(cs) + ")";
627                                
628                                if(isMarginal){
629                                        
630                                        if(Config.output_prolog_format){
631                                                line = "tuffyPrediction(" + UIMan.decimalRound(digits, rs.getDouble("prior")) +
632                                                        ", " + predName + "(" + line + ".";
633                                        }else{
634                                                if(!this.type.equals(OPType.COREF)){
635                                                        line = rs.getString("prior") + "\t" + predName + "(" + line;
636                                                }
637                                        }
638                                        
639                                }else{
640                                        line =  predName + "(" + line;
641                                }
642                                
643                                bufferedWriter.append(line + "\n");
644                        }
645                        rs.close();
646                        bufferedWriter.close();
647                } catch (Exception e) {
648                        ExceptionMan.handle(e);
649                }
650        }
651                
652        
653}

[all classes][felix.dstruct]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov