| 1 | package felix.optimizer; |
| 2 | |
| 3 | import java.util.ArrayList; |
| 4 | import java.util.Collection; |
| 5 | import java.util.Collections; |
| 6 | import java.util.Comparator; |
| 7 | import java.util.HashMap; |
| 8 | import java.util.HashSet; |
| 9 | |
| 10 | import tuffy.mln.Literal; |
| 11 | import tuffy.mln.Predicate; |
| 12 | import tuffy.ra.Expression; |
| 13 | import tuffy.util.Config; |
| 14 | |
| 15 | |
| 16 | import felix.dstruct.ConcurrentOperatorsBucket; |
| 17 | import felix.dstruct.ExecutionPlan; |
| 18 | import felix.dstruct.FelixClause; |
| 19 | import felix.dstruct.FelixPredicate; |
| 20 | import felix.dstruct.FelixQuery; |
| 21 | import felix.dstruct.OperatorBucketGraph; |
| 22 | import felix.dstruct.StatOperator; |
| 23 | import felix.dstruct.StatOperator.OPType; |
| 24 | import felix.main.Felix; |
| 25 | import felix.parser.FelixCommandOptions; |
| 26 | import felix.util.FelixUIMan; |
| 27 | |
| 28 | /** |
| 29 | * The class of a scheduler, which takes as input a FelixQuery, and |
| 30 | * outputs an physical execution plan that is ready to be fed into {@link Executor}. |
| 31 | * The |
| 32 | * output of a scheduler, i.e., {@link ExecutionPlan}, can be regarded |
| 33 | * as a description of the physical plan, which contains the execution |
| 34 | * order of different statistical operators. |
| 35 | * |
| 36 | * <br/> |
| 37 | * TODO: + better cycle support for operator graph. |
| 38 | * |
| 39 | * |
| 40 | * @author Ce Zhang |
| 41 | * |
| 42 | */ |
| 43 | public class Scheduler { |
| 44 | |
| 45 | /** |
| 46 | * Felix object. |
| 47 | */ |
| 48 | Felix parentFelix; |
| 49 | |
| 50 | /** |
| 51 | * Felix query to be scheduled. |
| 52 | */ |
| 53 | FelixQuery fq; |
| 54 | |
| 55 | /** |
| 56 | * Command line options. |
| 57 | */ |
| 58 | FelixCommandOptions options; |
| 59 | |
| 60 | /** |
| 61 | * Dependency graph between different operator buckets. |
| 62 | */ |
| 63 | OperatorBucketGraph obg; |
| 64 | |
| 65 | /** |
| 66 | * Partition the rules into different operators. |
| 67 | * @param cm |
| 68 | * @return |
| 69 | */ |
| 70 | public HashSet<StatOperator> ruleDecomposition(CostModel cm){ |
| 71 | OperatorSelector os = new OperatorSelector(fq, cm, options); |
| 72 | HashSet<StatOperator> ops = os.getOperators(); |
| 73 | return ops; |
| 74 | } |
| 75 | |
| 76 | /** |
| 77 | * Partition the data into different parts. This method will partition |
| 78 | * one operator returned by {@link Scheduler#ruleDecomposition} into different ones, |
| 79 | * with each of them deals with different portions of data. These operators |
| 80 | * will be put into a ConcurrentOperatorsBucket, which means they can be |
| 81 | * executed in parallel. |
| 82 | * @param ops |
| 83 | * @param cm |
| 84 | * @return |
| 85 | */ |
| 86 | public OperatorBucketGraph dataDecomposition(HashSet<StatOperator> ops, CostModel cm){ |
| 87 | |
| 88 | OperatorBucketGraph obg = new OperatorBucketGraph(); |
| 89 | int dpart = Config.getNumThreads()-1; |
| 90 | |
| 91 | if(Config.getNumThreads() > 1){ |
| 92 | for(StatOperator op : ops){ |
| 93 | DataCracker1991 dc = new DataCracker1991(); |
| 94 | dc.decompose(op); |
| 95 | |
| 96 | if(dc.isDecomposable == true && (options.decomposeTuffy || op.type != OPType.TUFFY)){ |
| 97 | |
| 98 | FelixUIMan.println(0, 0, ">>> Decomposing the following operator into " + dpart + " parts:"); |
| 99 | FelixUIMan.printobj(0, 1, op); |
| 100 | ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal); |
| 101 | |
| 102 | for(int i=0;i<dpart;i++){ |
| 103 | |
| 104 | StatOperator newOp = op.clone(); |
| 105 | newOp.dc = dc; |
| 106 | |
| 107 | HashMap<Predicate, Expression> toSig = new HashMap<Predicate, Expression>(); |
| 108 | for(FelixClause fc : op.allRelevantFelixClause){ |
| 109 | |
| 110 | newOp.dPart = dpart; |
| 111 | newOp.nPart = i; |
| 112 | |
| 113 | HashSet<Expression> e = dc.getExpressions(fc, null, dpart, i, false); |
| 114 | newOp.clauseConstraints.put(fc, e); |
| 115 | |
| 116 | for(FelixPredicate fop : newOp.outputPredicates){ |
| 117 | |
| 118 | Predicate _p = fop; |
| 119 | if(_p.isClosedWorld() || toSig.containsKey(_p) ){ |
| 120 | continue; |
| 121 | } |
| 122 | |
| 123 | HashSet<Expression> expForClause = dc.getExpressions(null, _p, dpart, i, true); |
| 124 | |
| 125 | if(expForClause.size() != 0){ |
| 126 | toSig.put(_p, expForClause.iterator().next()); |
| 127 | } |
| 128 | |
| 129 | } |
| 130 | } |
| 131 | newOp.dataCrackerSignature = toSig.toString(); |
| 132 | |
| 133 | newOp.partitionedInto = dpart; |
| 134 | |
| 135 | FelixUIMan.printobj(2, 2, newOp); |
| 136 | bucket.addOperator(newOp); |
| 137 | } |
| 138 | |
| 139 | obg.addOperator(bucket); |
| 140 | |
| 141 | }else{ |
| 142 | |
| 143 | FelixUIMan.println(0, 0, ">>> The following operator is not decomposable:\n\t" + op.toString() + "\n"); |
| 144 | |
| 145 | ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal); |
| 146 | bucket.addOperator(op); |
| 147 | obg.addOperator(bucket); |
| 148 | } |
| 149 | |
| 150 | } |
| 151 | }else{ |
| 152 | // do not partition |
| 153 | for(StatOperator op : ops){ |
| 154 | |
| 155 | FelixUIMan.println(0, 0, ">>> Decomposing the following operator into " + dpart + " parts:"); |
| 156 | FelixUIMan.printobj(0, 1, op); |
| 157 | |
| 158 | ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal); |
| 159 | bucket.addOperator(op); |
| 160 | obg.addOperator(bucket); |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | obg.parseDependency(); |
| 165 | |
| 166 | //FelixUIMan.println(2, 0, obg.toString()); |
| 167 | |
| 168 | return obg; |
| 169 | } |
| 170 | |
| 171 | /** |
| 172 | * Returns list of operator buckets sorted by precedence. |
| 173 | * @param torank |
| 174 | * @return |
| 175 | */ |
| 176 | public ArrayList<ConcurrentOperatorsBucket> rank(Collection<ConcurrentOperatorsBucket> torank){ |
| 177 | ArrayList<ConcurrentOperatorsBucket> ret = new ArrayList<ConcurrentOperatorsBucket>(); |
| 178 | |
| 179 | ret.addAll(torank); |
| 180 | |
| 181 | Collections.sort(ret, new Comparator<ConcurrentOperatorsBucket>(){ |
| 182 | |
| 183 | @Override |
| 184 | public int compare(ConcurrentOperatorsBucket o1, |
| 185 | ConcurrentOperatorsBucket o2) { |
| 186 | // TODO Auto-generated method stub |
| 187 | return o1.precedence - o2.precedence; |
| 188 | } |
| 189 | |
| 190 | }); |
| 191 | |
| 192 | |
| 193 | return ret; |
| 194 | } |
| 195 | |
| 196 | /** |
| 197 | * Schedule the order of running the operators. |
| 198 | * @param obg |
| 199 | * @return |
| 200 | */ |
| 201 | public ExecutionPlan orderOperators(OperatorBucketGraph obg){ |
| 202 | ExecutionPlan ep = new ExecutionPlan(); |
| 203 | |
| 204 | // generate the order of execution |
| 205 | ArrayList<ConcurrentOperatorsBucket> HotList = new ArrayList<ConcurrentOperatorsBucket>(); |
| 206 | HashSet<ConcurrentOperatorsBucket> notFinished = new HashSet<ConcurrentOperatorsBucket>(); |
| 207 | notFinished.addAll(obg.getOperators()); |
| 208 | |
| 209 | while(notFinished.isEmpty() != true){ |
| 210 | ConcurrentOperatorsBucket op = null; |
| 211 | |
| 212 | int lowest = 10000; |
| 213 | int numberOfClause = 0; |
| 214 | // pick one with lowest precedence and put it in the last |
| 215 | for(ConcurrentOperatorsBucket aop : notFinished){ |
| 216 | if(lowest > aop.getPrecedence()){ |
| 217 | numberOfClause = aop.nStartingRule; |
| 218 | lowest = aop.getPrecedence(); |
| 219 | op = aop; |
| 220 | } |
| 221 | |
| 222 | if(lowest == aop.getPrecedence() && |
| 223 | aop.nStartingRule > numberOfClause){ |
| 224 | numberOfClause = aop.nStartingRule; |
| 225 | lowest = aop.getPrecedence(); |
| 226 | op = aop; |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | HotList = new ArrayList<ConcurrentOperatorsBucket>(); |
| 231 | HotList.add(op); |
| 232 | |
| 233 | while(HotList.isEmpty() != true){ |
| 234 | |
| 235 | // TODO: CHANGE TO A SMARTER VERSION |
| 236 | ConcurrentOperatorsBucket current = rank(HotList).get(0); |
| 237 | HotList.remove(0); |
| 238 | |
| 239 | //put in the last position |
| 240 | ep.addOperatorAfter(current); |
| 241 | notFinished.remove(current); |
| 242 | |
| 243 | //put upstream opts in HotList, which will be added after current position |
| 244 | //in next iterations |
| 245 | HashSet<ConcurrentOperatorsBucket> upstreams = obg.upStreams.get(current); |
| 246 | HotList.addAll(upstreams); |
| 247 | HotList.retainAll(notFinished); |
| 248 | |
| 249 | //put downstream operators before current position |
| 250 | HashSet<ConcurrentOperatorsBucket> downstreams = obg.downStreams.get(current); |
| 251 | for(ConcurrentOperatorsBucket ooo : rank(downstreams)){ |
| 252 | if(notFinished.contains(ooo) && !HotList.contains(ooo)){ |
| 253 | ep.addOperatorBefore(ooo); |
| 254 | } |
| 255 | if(HotList.contains(ooo) && ooo.getPrecedence() == current.getPrecedence() |
| 256 | && ooo.nStartingRule < current.nStartingRule){ |
| 257 | ep.addOperatorBefore(ooo); |
| 258 | HotList.remove(ooo); |
| 259 | notFinished.remove(ooo); |
| 260 | } |
| 261 | if(!HotList.contains(ooo)){ |
| 262 | notFinished.remove(ooo); |
| 263 | } |
| 264 | } |
| 265 | |
| 266 | } |
| 267 | } |
| 268 | |
| 269 | return ep; |
| 270 | } |
| 271 | |
| 272 | /** |
| 273 | * Returns operator bucket graph. |
| 274 | * @return |
| 275 | */ |
| 276 | public OperatorBucketGraph getOperatorBucketGraph(){ |
| 277 | return obg; |
| 278 | } |
| 279 | |
| 280 | /** |
| 281 | * Parses common predicates among buckets. |
| 282 | * @param _ep |
| 283 | */ |
| 284 | public void parseCommonPredicates(ExecutionPlan _ep){ |
| 285 | |
| 286 | HashMap<FelixPredicate, HashSet<ConcurrentOperatorsBucket>> |
| 287 | commonPredicates = new HashMap<FelixPredicate, HashSet<ConcurrentOperatorsBucket>>(); |
| 288 | |
| 289 | for(ConcurrentOperatorsBucket cob : _ep.operators){ |
| 290 | |
| 291 | for(FelixPredicate fp : cob.commonCandidates){ |
| 292 | if(fp.isClosedWorld() && !fp.isCorefMap()) continue; |
| 293 | FelixPredicate _fp = fp; |
| 294 | //if(fp.isCorefMap()){ |
| 295 | // _fp = fp.getOriCorefPredicate(); |
| 296 | //} |
| 297 | if(!commonPredicates.containsKey(_fp)){ |
| 298 | commonPredicates.put(_fp, new HashSet<ConcurrentOperatorsBucket>()); |
| 299 | } |
| 300 | commonPredicates.get(_fp).add(cob); |
| 301 | |
| 302 | if(_fp.isCorefPredicate && cob.outputPredicates.contains(_fp)){ |
| 303 | _fp = fp.corefMAPPredicate; |
| 304 | if(!commonPredicates.containsKey(_fp)){ |
| 305 | commonPredicates.put(_fp, new HashSet<ConcurrentOperatorsBucket>()); |
| 306 | } |
| 307 | commonPredicates.get(_fp).add(cob); |
| 308 | } |
| 309 | |
| 310 | } |
| 311 | } |
| 312 | |
| 313 | for(FelixPredicate fp : commonPredicates.keySet()){ |
| 314 | if(commonPredicates.get(fp).size() > 1){ |
| 315 | _ep.dd_CommonPredicates.add(fp); |
| 316 | } |
| 317 | } |
| 318 | |
| 319 | _ep.dd_Predicate2OperatorBucket = commonPredicates; |
| 320 | |
| 321 | for(FelixPredicate fp : _ep.dd_CommonPredicates){ |
| 322 | for(ConcurrentOperatorsBucket cob : _ep.dd_Predicate2OperatorBucket.get(fp)){ |
| 323 | cob.addCommonOutput(fp); |
| 324 | } |
| 325 | } |
| 326 | |
| 327 | } |
| 328 | |
| 329 | /** |
| 330 | * The entry of this optimizer. |
| 331 | * @return |
| 332 | */ |
| 333 | public ExecutionPlan schedule(){ |
| 334 | |
| 335 | // decompose the whole MLN into different operators |
| 336 | HashSet<StatOperator> ops = this.ruleDecomposition(null); |
| 337 | |
| 338 | // decompose each operators into smaller operators dealing |
| 339 | // with different portion of data. |
| 340 | obg = this.dataDecomposition(ops, null); |
| 341 | |
| 342 | // optimize execution plan. |
| 343 | //note that, in the future we way want to merge 1) operator ordering and 2) DMO optimization part |
| 344 | //into an unified optimization model. however, currently, we only worry about them separately. |
| 345 | // therefore, current assumption is, the ordering of operators does not rely on the cost model. |
| 346 | ExecutionPlan ep = this.orderOperators(obg); |
| 347 | |
| 348 | if(options.useDualDecomposition){ |
| 349 | this.parseCommonPredicates(ep); |
| 350 | } |
| 351 | |
| 352 | FelixUIMan.println(">>> Serialized Execution Plan:"); |
| 353 | FelixUIMan.printobj(0, 1, ep); |
| 354 | |
| 355 | |
| 356 | CostModel cm = new CostModel(this.parentFelix); |
| 357 | |
| 358 | ep.setCostModel(cm); |
| 359 | |
| 360 | //DMOOptimizer dmoo = new DMOOptimizer(cm); |
| 361 | //this.optimizeDMO(ep, dmoo); |
| 362 | //dmoo.close(); |
| 363 | |
| 364 | return ep; |
| 365 | |
| 366 | } |
| 367 | |
| 368 | /** |
| 369 | * The constructor. |
| 370 | * @param _felix |
| 371 | * @param _fq |
| 372 | * @param _options |
| 373 | */ |
| 374 | public Scheduler(Felix _felix, FelixQuery _fq, FelixCommandOptions _options){ |
| 375 | parentFelix = _felix; |
| 376 | fq = _fq; |
| 377 | options = _options; |
| 378 | } |
| 379 | |
| 380 | |
| 381 | |
| 382 | } |
| 383 | |
| 384 | |
| 385 | |