EMMA Coverage Report (generated Sat Aug 20 11:00:51 CDT 2011)
[all classes][felix.optimizer]

COVERAGE SUMMARY FOR SOURCE FILE [Scheduler.java]

name | class, % | method, % | block, % | line, %
Scheduler.java | 100% (2/2) | 100% (10/10) | 91% (607/666) | 92% (116.2/126)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class Scheduler | 100% (1/1) | 100% (8/8) | 91% (595/654) | 92% (114.2/124)
parseCommonPredicates (ExecutionPlan): void | | 100% (1/1) | 78% (111/142) | 81% (17.1/21)
orderOperators (OperatorBucketGraph): ExecutionPlan | | 100% (1/1) | 84% (147/175) | 85% (34.1/40)
Scheduler (Felix, FelixQuery, FelixCommandOptions): void | | 100% (1/1) | 100% (12/12) | 100% (5/5)
dataDecomposition (HashSet, CostModel): OperatorBucketGraph | | 100% (1/1) | 100% (248/248) | 100% (40/40)
getOperatorBucketGraph (): OperatorBucketGraph | | 100% (1/1) | 100% (3/3) | 100% (1/1)
rank (Collection): ArrayList | | 100% (1/1) | 100% (16/16) | 100% (4/4)
ruleDecomposition (CostModel): HashSet | | 100% (1/1) | 100% (14/14) | 100% (3/3)
schedule (): ExecutionPlan | | 100% (1/1) | 100% (44/44) | 100% (10/10)

class Scheduler$1 | 100% (1/1) | 100% (2/2) | 100% (12/12) | 100% (3/3)
Scheduler$1 (Scheduler): void | | 100% (1/1) | 100% (6/6) | 100% (2/2)
compare (ConcurrentOperatorsBucket, ConcurrentOperatorsBucket): int | | 100% (1/1) | 100% (6/6) | 100% (1/1)

1package felix.optimizer;
2 
3import java.util.ArrayList;
4import java.util.Collection;
5import java.util.Collections;
6import java.util.Comparator;
7import java.util.HashMap;
8import java.util.HashSet;
9 
10import tuffy.mln.Literal;
11import tuffy.mln.Predicate;
12import tuffy.ra.Expression;
13import tuffy.util.Config;
14 
15 
16import felix.dstruct.ConcurrentOperatorsBucket;
17import felix.dstruct.ExecutionPlan;
18import felix.dstruct.FelixClause;
19import felix.dstruct.FelixPredicate;
20import felix.dstruct.FelixQuery;
21import felix.dstruct.OperatorBucketGraph;
22import felix.dstruct.StatOperator;
23import felix.dstruct.StatOperator.OPType;
24import felix.main.Felix;
25import felix.parser.FelixCommandOptions;
26import felix.util.FelixUIMan;
27 
28/**
29 * The class of a scheduler, which takes as input a FelixQuery, and 
30 * outputs an physical execution plan that is ready to be fed into {@link Executor}.
31 * The
32 * output of a scheduler, i.e., {@link ExecutionPlan}, can be regarded
33 * as a description of the physical plan, which contains the execution
34 * order of different statistical operators.
35 * 
36 * <br/>
37 * TODO: + better cycle support for operator graph.
38 * 
39 * 
40 * @author Ce Zhang
41 *
42 */
43public class Scheduler {
44 
45        /**
46         * Felix object.
47         */
48        Felix parentFelix;
49        
50        /**
51         * Felix query to be scheduled.
52         */
53        FelixQuery fq;
54        
55        /**
56         * Command line options.
57         */
58        FelixCommandOptions options;
59 
60        /**
61         * Dependency graph between different operator buckets.
62         */
63        OperatorBucketGraph obg;
64        
65        /**
66         * Partition the rules into different operators.
67         * @param cm
68         * @return
69         */
70        public HashSet<StatOperator> ruleDecomposition(CostModel cm){
71                OperatorSelector os = new OperatorSelector(fq, cm, options);
72                HashSet<StatOperator> ops = os.getOperators();
73                return ops;
74        }
75        
76        /**
77         * Partition the data into different parts. This method will partition
78         * one operator returned by {@link Scheduler#ruleDecomposition} into different ones,
79         * with each of them deals with different portions of data. These operators
80         * will be put into a ConcurrentOperatorsBucket, which means they can be 
81         * executed in parallel.
82         * @param ops
83         * @param cm
84         * @return
85         */
86        public OperatorBucketGraph dataDecomposition(HashSet<StatOperator> ops, CostModel cm){
87                
88                OperatorBucketGraph obg = new OperatorBucketGraph();
89                int dpart = Config.getNumThreads()-1;
90                
91                if(Config.getNumThreads() > 1){
92                        for(StatOperator op : ops){
93                                DataCracker1991 dc = new DataCracker1991();
94                                dc.decompose(op);
95                                
96                                if(dc.isDecomposable == true && (options.decomposeTuffy || op.type != OPType.TUFFY)){
97                                        
98                                        FelixUIMan.println(0, 0, ">>> Decomposing the following operator into " + dpart + " parts:");
99                                        FelixUIMan.printobj(0, 1, op);
100                                        ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal);
101                                        
102                                        for(int i=0;i<dpart;i++){
103                                        
104                                                StatOperator newOp = op.clone();
105                                                
106                                                HashMap<Predicate, Expression> toSig = new HashMap<Predicate, Expression>();
107                                                for(FelixClause fc : op.allRelevantFelixClause){
108                                                        HashSet<Expression> e = dc.getExpressions(fc, null, dpart, i, false);
109                                                        newOp.clauseConstraints.put(fc, e);
110                                                        
111                                                        for(FelixPredicate fop : newOp.outputPredicates){
112                                                                
113                                                                Predicate _p = fop;
114                                                                if(_p.isClosedWorld() || toSig.containsKey(_p) ){
115                                                                        continue;
116                                                                }
117                                                                
118                                                                HashSet<Expression> expForClause = dc.getExpressions(null, _p, dpart, i, true);
119                                                                
120                                                                if(expForClause.size() != 0){
121                                                                        toSig.put(_p, expForClause.iterator().next());
122                                                                }
123                                                                
124                                                        }
125                                                }
126                                                newOp.dataCrackerSignature = toSig.toString();
127                                                
128                                                newOp.partitionedInto = dpart;
129                                                
130                                                FelixUIMan.printobj(2, 2, newOp);
131                                                bucket.addOperator(newOp);
132                                        }
133                                        
134                                        obg.addOperator(bucket);
135                                        
136                                }else{
137                                        
138                                        FelixUIMan.println(0, 0, ">>> The following operator is not decomposable:\n\t" + op.toString() + "\n");
139                                        
140                                        ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal);
141                                        bucket.addOperator(op);
142                                        obg.addOperator(bucket);
143                                }
144                                
145                        }
146                }else{
147                        // do not partition
148                        for(StatOperator op : ops){
149 
150                                FelixUIMan.println(0, 0, ">>> Decomposing the following operator into " + dpart + " parts:");
151                                FelixUIMan.printobj(0, 1, op);
152                                        
153                                ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal);
154                                bucket.addOperator(op);
155                                obg.addOperator(bucket);
156                        }
157                }
158                
159                obg.parseDependency();
160                
161                //FelixUIMan.println(2, 0, obg.toString());
162                
163                return obg;
164        }
165        
166        /**
167         * Returns list of operator buckets sorted by precedence.
168         * @param torank
169         * @return
170         */
171        public ArrayList<ConcurrentOperatorsBucket> rank(Collection<ConcurrentOperatorsBucket> torank){
172                ArrayList<ConcurrentOperatorsBucket> ret = new ArrayList<ConcurrentOperatorsBucket>();
173                
174                ret.addAll(torank);
175                
176                Collections.sort(ret, new Comparator<ConcurrentOperatorsBucket>(){
177 
178                        @Override
179                        public int compare(ConcurrentOperatorsBucket o1,
180                                        ConcurrentOperatorsBucket o2) {
181                                // TODO Auto-generated method stub
182                                return o1.precedence - o2.precedence;
183                        }
184                        
185                });
186                
187                
188                return ret;
189        }
190        
191        /**
192         * Schedule the order of running the operators.
193         * @param obg
194         * @return
195         */
196        public ExecutionPlan orderOperators(OperatorBucketGraph obg){
197                ExecutionPlan ep = new ExecutionPlan();
198                
199                // generate the order of execution
200                ArrayList<ConcurrentOperatorsBucket> HotList = new ArrayList<ConcurrentOperatorsBucket>();
201                HashSet<ConcurrentOperatorsBucket> notFinished = new HashSet<ConcurrentOperatorsBucket>();
202                notFinished.addAll(obg.getOperators());
203                
204                while(notFinished.isEmpty() != true){
205                        ConcurrentOperatorsBucket op = null;
206                        
207                        int lowest = 10000;
208                        int numberOfClause = 0;
209                        // pick one with lowest precedence and put it in the last
210                        for(ConcurrentOperatorsBucket aop : notFinished){
211                                if(lowest > aop.getPrecedence()){
212                                        numberOfClause = aop.nStartingRule;
213                                        lowest = aop.getPrecedence();
214                                        op = aop;
215                                }
216                                
217                                if(lowest == aop.getPrecedence() && 
218                                                aop.nStartingRule > numberOfClause){
219                                        numberOfClause = aop.nStartingRule;
220                                        lowest = aop.getPrecedence();
221                                        op = aop;
222                                }
223                        }
224                        
225                        HotList = new ArrayList<ConcurrentOperatorsBucket>();
226                        HotList.add(op);
227                        
228                        while(HotList.isEmpty() != true){
229                        
230                                // TODO: CHANGE TO A SMARTER VERSION
231                                ConcurrentOperatorsBucket current = rank(HotList).get(0);
232                                HotList.remove(0);
233                                
234                                //put in the last position
235                                ep.addOperatorAfter(current);
236                                notFinished.remove(current);
237                                
238                                //put upstream opts in HotList, which will be added after current position
239                                //in next iterations
240                                HashSet<ConcurrentOperatorsBucket> upstreams = obg.upStreams.get(current);
241                                HotList.addAll(upstreams);
242                                HotList.retainAll(notFinished);
243                                                                
244                                //put downstream operators before current position
245                                HashSet<ConcurrentOperatorsBucket> downstreams = obg.downStreams.get(current);
246                                for(ConcurrentOperatorsBucket ooo : rank(downstreams)){
247                                        if(notFinished.contains(ooo) && !HotList.contains(ooo)){
248                                                ep.addOperatorBefore(ooo);
249                                        }
250                                        if(HotList.contains(ooo) && ooo.getPrecedence() == current.getPrecedence()
251                                                        && ooo.nStartingRule < current.nStartingRule){
252                                                ep.addOperatorBefore(ooo);
253                                                HotList.remove(ooo);
254                                                notFinished.remove(ooo);
255                                        }
256                                        if(!HotList.contains(ooo)){
257                                                notFinished.remove(ooo);
258                                        }
259                                }
260 
261                        }
262                }
263                
264                return ep;
265        }
266        
267        /**
268         * Returns operator bucket graph.
269         * @return
270         */
271        public OperatorBucketGraph getOperatorBucketGraph(){
272                return obg;
273        }
274        
275        /**
276         * Parses common predicates among buckets.
277         * @param _ep
278         */
279        public void parseCommonPredicates(ExecutionPlan _ep){
280                
281                HashMap<FelixPredicate, HashSet<ConcurrentOperatorsBucket>>
282                        commonPredicates = new HashMap<FelixPredicate, HashSet<ConcurrentOperatorsBucket>>();
283                
284                for(ConcurrentOperatorsBucket cob : _ep.operators){
285                                                
286                        for(FelixPredicate fp : cob.commonCandidates){
287                                if(fp.isClosedWorld() && !fp.isCorefMap()) continue;
288                                FelixPredicate _fp = fp;
289                                //if(fp.isCorefMap()){
290                                //        _fp = fp.getOriCorefPredicate();
291                                //}
292                                if(!commonPredicates.containsKey(_fp)){
293                                        commonPredicates.put(_fp, new HashSet<ConcurrentOperatorsBucket>());
294                                }
295                                commonPredicates.get(_fp).add(cob);
296                                
297                                if(_fp.isCorefPredicate && cob.outputPredicates.contains(_fp)){
298                                        _fp = fp.corefMAPPredicate;
299                                        if(!commonPredicates.containsKey(_fp)){
300                                                commonPredicates.put(_fp, new HashSet<ConcurrentOperatorsBucket>());
301                                        }
302                                        commonPredicates.get(_fp).add(cob);
303                                }
304                                        
305                        }
306                }
307                
308                for(FelixPredicate fp : commonPredicates.keySet()){
309                        if(commonPredicates.get(fp).size() > 1){
310                                _ep.dd_CommonPredicates.add(fp);
311                        }
312                }
313                
314                _ep.dd_Predicate2OperatorBucket = commonPredicates;
315                
316                for(FelixPredicate fp : _ep.dd_CommonPredicates){
317                        for(ConcurrentOperatorsBucket cob : _ep.dd_Predicate2OperatorBucket.get(fp)){
318                                cob.addCommonOutput(fp);
319                        }
320                }
321                
322        }
323        
324        /**
325         * The entry of this optimizer.
326         * @return
327         */
328        public ExecutionPlan schedule(){
329                
330                // decompose the whole MLN into different operators
331                HashSet<StatOperator> ops = this.ruleDecomposition(null);
332 
333                // decompose each operators into smaller operators dealing
334                // with different portion of data.
335                obg = this.dataDecomposition(ops, null);
336                
337                // optimize execution plan.
338                        //note that, in the future we way want to merge 1) operator ordering and 2) DMO optimization part
339                        //into an unified optimization model. however, currently, we only worry about them separately.
340                // therefore, current assumption is, the ordering of operators does not rely on the cost model.
341                ExecutionPlan ep = this.orderOperators(obg);
342                                
343                if(options.useDualDecomposition){
344                        this.parseCommonPredicates(ep);
345                }
346                
347                FelixUIMan.println(">>> Serialized Execution Plan:");
348                FelixUIMan.printobj(0, 1, ep);
349 
350                
351                CostModel cm = new CostModel(this.parentFelix);
352                
353                ep.setCostModel(cm);
354                
355                //DMOOptimizer dmoo = new DMOOptimizer(cm);
356                //this.optimizeDMO(ep, dmoo);
357                //dmoo.close();
358        
359                return ep;
360                
361        }
362        
363        /**
364         * The constructor.
365         * @param _felix
366         * @param _fq
367         * @param _options
368         */
369        public Scheduler(Felix _felix, FelixQuery _fq, FelixCommandOptions _options){
370                parentFelix = _felix;
371                fq = _fq;
372                options = _options;
373        }
374        
375        
376        
377}
378 
379 
380 

[all classes][felix.optimizer]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov