| 1 | package felix.executor; |
| 2 | |
| 3 | import java.io.BufferedWriter; |
| 4 | import java.io.File; |
| 5 | import java.sql.ResultSet; |
| 6 | import java.util.ArrayList; |
| 7 | import java.util.HashMap; |
| 8 | import java.util.HashSet; |
| 9 | |
| 10 | import com.google.common.io.Files; |
| 11 | |
| 12 | import tuffy.db.RDB; |
| 13 | import tuffy.infer.DataMover; |
| 14 | import tuffy.mln.MarkovLogicNetwork; |
| 15 | import tuffy.mln.Predicate; |
| 16 | import tuffy.parse.CommandOptions; |
| 17 | import tuffy.util.Config; |
| 18 | import tuffy.util.ExceptionMan; |
| 19 | import tuffy.util.FileMan; |
| 20 | import tuffy.util.StringMan; |
| 21 | import tuffy.util.UIMan; |
| 22 | |
| 23 | import felix.dstruct.ConcurrentOperatorsBucket; |
| 24 | import felix.dstruct.ExecutionPlan; |
| 25 | import felix.dstruct.FelixPredicate; |
| 26 | import felix.dstruct.FelixQuery; |
| 27 | import felix.dstruct.StatOperator.OPType; |
| 28 | import felix.main.Felix; |
| 29 | import felix.optimizer.DMOOptimizer; |
| 30 | import felix.parser.FelixCommandOptions; |
| 31 | import felix.util.FelixConfig; |
| 32 | import felix.util.FelixUIMan; |
| 33 | |
| 34 | |
| 35 | |
| 36 | /** |
| 37 | * Class for executing a given physical {@link ExecutionPlan} using |
| 38 | * dual decomposition. |
| 39 | * |
| 40 | * @author Ce Zhang |
| 41 | * |
| 42 | */ |
| 43 | public class DDExecutor { |
| 44 | |
| 45 | /** |
| 46 | * The execution plan to be executed. |
| 47 | */ |
| 48 | ExecutionPlan ep; |
| 49 | |
| 50 | /** |
| 51 | * The FelixQuery used by this Executor. |
| 52 | */ |
| 53 | FelixQuery fq; |
| 54 | |
| 55 | /** |
| 56 | * The FelixCommandOptions used by this Executor. |
| 57 | */ |
| 58 | FelixCommandOptions options; |
| 59 | |
| 60 | /** |
| 61 | * The DMOOptimizer used by this Executor. |
| 62 | */ |
| 63 | public DMOOptimizer dmoo; |
| 64 | |
| 65 | /** |
| 66 | * Execute this plan. If Felix runs in |
| 67 | * explain mode, just prints out the physical plan. |
| 68 | * |
| 69 | * <p> TODO: Find a better way to the explain mode (e.g., a graph). |
| 70 | */ |
| 71 | @SuppressWarnings("unchecked") |
| 72 | public void run(){ |
| 73 | |
| 74 | try{ |
| 75 | |
| 76 | int sum = -1; |
| 77 | int nIt = FelixConfig.nDDIT; |
| 78 | |
| 79 | for(int i = ep.operators.size()-1; i >=0; i --){ |
| 80 | ep.operators.get(i).setNCore(Config.getNumThreads()); |
| 81 | ep.operators.get(i).prepareDB44DD(); |
| 82 | } |
| 83 | |
| 84 | // the first run of DD replicates the NON-DD executor. |
| 85 | // this is used to decides the scope of the downstream MLN |
| 86 | // operator. |
| 87 | FelixConfig.isFirstRunOfDD = true; |
| 88 | |
| 89 | for(int i = ep.operators.size()-1; i >=0; i --){ |
| 90 | dmoo.optimizeDMO(ep.operators.get(i)); |
| 91 | ep.operators.get(i).run(); |
| 92 | |
| 93 | } |
| 94 | |
| 95 | BufferedWriter bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_0" ); |
| 96 | RDB newDB = RDB.getRDBbyConfig(Config.db_schema); |
| 97 | for(FelixPredicate fp : fq.getPredicates()){ |
| 98 | if(fp.hasQuery()){ |
| 99 | UIMan.println(">>> Dumping results for " + fp + "\n"); |
| 100 | this.dumpMapAnswerForPredicate(newDB, fp, bw); |
| 101 | } |
| 102 | } |
| 103 | bw.close(); |
| 104 | |
| 105 | File from = new File(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt)); |
| 106 | File to = new File(options.fout); |
| 107 | if(to.exists()){ |
| 108 | to.delete(); |
| 109 | } |
| 110 | Files.copy(from, to); |
| 111 | |
| 112 | FelixConfig.isFirstRunOfDD = false; |
| 113 | |
| 114 | // copy state of each table |
| 115 | HashSet<FelixPredicate> sharedPreds = new HashSet<FelixPredicate>(); |
| 116 | for(int i = ep.operators.size()-1; i >=0; i --){ |
| 117 | sharedPreds.addAll(ep.operators.get(i).dd_CommonOutput); |
| 118 | if(ep.operators.get(i).type == OPType.TUFFY){ |
| 119 | sharedPreds.addAll(ep.operators.get(i).outputPredicates); |
| 120 | } |
| 121 | } |
| 122 | for(FelixPredicate fp : sharedPreds){ |
| 123 | UIMan.println(">>> Backing up table " + fp.getName()); |
| 124 | newDB.dropTable("_copy_ori_" + fp.getRelName()); |
| 125 | String sql = "CREATE TABLE _copy_ori_" + fp.getRelName() |
| 126 | + " AS SELECT * FROM " + fp.getRelName() + ";"; |
| 127 | newDB.execute(sql); |
| 128 | } |
| 129 | newDB.close(); |
| 130 | |
| 131 | |
| 132 | while(sum != 0 && nIt-- > 0 ){ |
| 133 | |
| 134 | newDB = RDB.getRDBbyConfig(Config.db_schema); |
| 135 | |
| 136 | for(FelixPredicate fp : sharedPreds){ |
| 137 | UIMan.println(">>> Restoring table " + fp.getName()); |
| 138 | newDB.dropTable(fp.getRelName()); |
| 139 | newDB.dropView(fp.getRelName()); |
| 140 | String sql = "CREATE TABLE " + fp.getRelName() |
| 141 | + " AS SELECT * FROM _copy_ori_" + fp.getRelName() + ";"; |
| 142 | newDB.execute(sql); |
| 143 | } |
| 144 | for(FelixPredicate fp : fq.getAllPred()){ |
| 145 | if(fp.isCorefPredicate && !sharedPreds.contains(fp)){ |
| 146 | newDB.dropTable(fp.getRelName()); |
| 147 | newDB.dropView(fp.getRelName()); |
| 148 | newDB.execute(fp.viewDef); |
| 149 | } |
| 150 | } |
| 151 | newDB.commit(); |
| 152 | newDB.close(); |
| 153 | |
| 154 | for(int i = ep.operators.size()-1; i >=0; i --){ |
| 155 | dmoo.optimizeDMO(ep.operators.get(i)); |
| 156 | ep.operators.get(i).run(); |
| 157 | } |
| 158 | |
| 159 | bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt) ); |
| 160 | newDB = RDB.getRDBbyConfig(Config.db_schema); |
| 161 | for(FelixPredicate fp : fq.getPredicates()){ |
| 162 | if(fp.hasQuery()){ |
| 163 | UIMan.println(">>> Dumping results for " + fp + "\n"); |
| 164 | this.dumpMapAnswerForPredicate(newDB, fp, bw); |
| 165 | } |
| 166 | } |
| 167 | newDB.close(); |
| 168 | bw.close(); |
| 169 | |
| 170 | from = new File(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt)); |
| 171 | to = new File(options.fout); |
| 172 | if(to.exists()){ |
| 173 | to.delete(); |
| 174 | } |
| 175 | Files.copy(from, to); |
| 176 | |
| 177 | |
| 178 | |
| 179 | sum = 0; |
| 180 | |
| 181 | //CURRENTLY ASSUME ONE PREDICATE IS SHARED BY ONLY TWO OPERATORS |
| 182 | // this is ensured by Felix's compiler. |
| 183 | for(FelixPredicate fp : fq.getAllPred()){ |
| 184 | |
| 185 | ConcurrentOperatorsBucket cob1 = null; |
| 186 | ConcurrentOperatorsBucket cob2 = null; |
| 187 | |
| 188 | for(int i = ep.operators.size()-1; i >=0; i --){ |
| 189 | if(ep.operators.get(i).dd_CommonOutput.contains(fp)){ |
| 190 | if(cob1 == null){ |
| 191 | cob1 = ep.operators.get(i); |
| 192 | }else if(cob2 == null){ |
| 193 | cob2 = ep.operators.get(i); |
| 194 | }else{ |
| 195 | ExceptionMan.die("ERROR 12: " + |
| 196 | "There must be something wrong with the compiler"); |
| 197 | } |
| 198 | } |
| 199 | } |
| 200 | |
| 201 | if(cob1==null || cob2==null){ |
| 202 | continue; |
| 203 | } |
| 204 | |
| 205 | String tableName1; |
| 206 | String tableName2; |
| 207 | String priorTable1; |
| 208 | String priorTable2; |
| 209 | |
| 210 | tableName1 = cob1.dd_commonOutputPredicate_2_tableName.get(fp); |
| 211 | tableName2 = cob2.dd_commonOutputPredicate_2_tableName.get(fp); |
| 212 | |
| 213 | priorTable1 = cob1.dd_commonOutputPredicate_2_priorTableName.get(fp); |
| 214 | priorTable2 = cob2.dd_commonOutputPredicate_2_priorTableName.get(fp); |
| 215 | |
| 216 | |
| 217 | String args = StringMan.commaList(fp.getArgs()); |
| 218 | ArrayList<String> whereargsarray = new ArrayList<String>(); |
| 219 | for(String a : fp.getArgs()){ |
| 220 | whereargsarray.add("t0." + a + "=" + "t1." + a); |
| 221 | } |
| 222 | String whereargs = StringMan.join(" AND ", whereargsarray); |
| 223 | ArrayList<String> t0argsArray = new ArrayList<String>(); |
| 224 | for(String a : fp.getArgs()){ |
| 225 | t0argsArray.add("t0." + a); |
| 226 | } |
| 227 | String t0args = StringMan.commaList(t0argsArray); |
| 228 | |
| 229 | ArrayList<String> t1argsArray = new ArrayList<String>(); |
| 230 | for(String a : fp.getArgs()){ |
| 231 | t1argsArray.add("t1." + a); |
| 232 | } |
| 233 | String t1args = StringMan.commaList(t1argsArray); |
| 234 | |
| 235 | String priorArgs = StringMan.commaList(fp.getArgs()); |
| 236 | priorArgs = priorArgs + "," + "float_" + (fp.arity()+1); |
| 237 | priorArgs = "truth, club, " + priorArgs; |
| 238 | |
| 239 | if(options.marginal == true){ |
| 240 | |
| 241 | String deltaCase = "(CASE WHEN abs(t0.prior-t1.prior)<0.1 THEN " + 0 + |
| 242 | " ELSE 0.9*(t0.prior-t1.prior) END) AS prior11"; |
| 243 | String negDeltaCase = "(CASE WHEN abs(t1.prior-t0.prior)<0.1 THEN " + 0 + |
| 244 | " ELSE 0.9*(t1.prior-t0.prior) END) AS prior11"; |
| 245 | |
| 246 | |
| 247 | String sql = "(SELECT NULL::Float, TRUE, 2, " + t0args + ", " + deltaCase +" FROM " + tableName1 + " t0, " + tableName2 + " t1 " + " WHERE " |
| 248 | + whereargs+")"; |
| 249 | |
| 250 | sql = sql + " UNION ALL " + "(SELECT NULL::Float, TRUE, 2, " + t0args + ", t0.prior FROM " + tableName1 + " t0 " + |
| 251 | " WHERE (" + t0args + ") NOT IN (SELECT " + t1args + " FROM " + tableName2 + " t1) AND t0.prior>0.1)"; |
| 252 | |
| 253 | sql = "INSERT INTO " + priorTable2 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0"; |
| 254 | Felix.db.update(sql); |
| 255 | |
| 256 | |
| 257 | UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2); |
| 258 | |
| 259 | sql = "(SELECT NULL::Float, TRUE, 2, " + t0args + "," + negDeltaCase + " FROM " + tableName1 + " t0, " + tableName2 + " t1 " + " WHERE " |
| 260 | + whereargs+")"; |
| 261 | |
| 262 | sql = sql + " UNION ALL " + "(SELECT NULL::Float, TRUE, 2, " + t0args + ", -t0.prior FROM " + tableName2 + " t0 " + |
| 263 | " WHERE (" + t0args + ") NOT IN (SELECT " + t1args + " FROM " + tableName1 + " t1) AND t0.prior>0.1)"; |
| 264 | |
| 265 | sql = "INSERT INTO " + priorTable1 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0"; |
| 266 | Felix.db.update(sql); |
| 267 | UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1); |
| 268 | |
| 269 | sum += Felix.db.getLastUpdateRowCount(); |
| 270 | |
| 271 | }else{ |
| 272 | String sql = "SELECT NULL::Float, TRUE, 2, " + args + ", " + (0.9) + " FROM " + tableName1 + " WHERE (" |
| 273 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")" |
| 274 | + " UNION ALL " + |
| 275 | "SELECT NULL::Float, TRUE, 2, " + args + ", " + (-0.9) + " FROM " + tableName2 + " WHERE (" |
| 276 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")"; |
| 277 | sql = "INSERT INTO " + priorTable2 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt"; |
| 278 | Felix.db.update(sql); |
| 279 | UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2); |
| 280 | |
| 281 | sql = "SELECT NULL::Float, TRUE, 2, " + args + ", " + (-0.9) + " FROM " + tableName1 + " WHERE (" |
| 282 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")" |
| 283 | + " UNION ALL " + |
| 284 | "SELECT NULL::Float, TRUE, 2, " + args + ", " + (0.9) + " FROM " + tableName2 + " WHERE (" |
| 285 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")"; |
| 286 | sql = "INSERT INTO " + priorTable1 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt"; |
| 287 | Felix.db.update(sql); |
| 288 | UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1); |
| 289 | |
| 290 | sum += Felix.db.getLastUpdateRowCount(); |
| 291 | |
| 292 | } |
| 293 | } |
| 294 | |
| 295 | Felix.db.commit(); |
| 296 | |
| 297 | } |
| 298 | |
| 299 | |
| 300 | }catch(Exception e){ |
| 301 | e.printStackTrace(); |
| 302 | } |
| 303 | |
| 304 | |
| 305 | } |
| 306 | |
| 307 | /** |
| 308 | * The constructor. |
| 309 | * @param _ep |
| 310 | */ |
| 311 | public DDExecutor(ExecutionPlan _ep, FelixQuery _fq, FelixCommandOptions _options){ |
| 312 | ep = _ep; |
| 313 | dmoo = new DMOOptimizer(ep.getCostModel()); |
| 314 | fq = _fq; |
| 315 | options = _options; |
| 316 | } |
| 317 | |
| 318 | /** |
| 319 | * Output the results of this bucket. |
| 320 | * @param db |
| 321 | * @param fout |
| 322 | * @param p |
| 323 | */ |
| 324 | public void dumpMapAnswerForPredicate(RDB db, FelixPredicate p, BufferedWriter bufferedWriter) { |
| 325 | // spreadTruth(); |
| 326 | HashMap<Long,String> cmap = db.loadIdSymbolMapFromTable(); |
| 327 | try { |
| 328 | |
| 329 | int digits = 4; |
| 330 | |
| 331 | String sql; |
| 332 | String tableName = p.getRelName(); |
| 333 | String predName = p.getName(); |
| 334 | |
| 335 | if(options.useDualDecomposition){ |
| 336 | //sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC"; |
| 337 | if(!p.isCorefMapPredicate){ |
| 338 | if(p.belongsTo != null && p.belongsTo.dd_commonOutputPredicate_2_tableName.containsKey(p)){ |
| 339 | sql = "SELECT * FROM " + p.belongsTo.dd_commonOutputPredicate_2_tableName.get(p) + " WHERE truth=TRUE ORDER BY prior DESC"; |
| 340 | }else{ |
| 341 | sql = "SELECT * FROM " + p.getRelName() + " WHERE truth=TRUE ORDER BY prior DESC"; |
| 342 | } |
| 343 | }else{ |
| 344 | sql = "SELECT * FROM " + tableName + " ORDER BY prior DESC"; |
| 345 | } |
| 346 | }else{ |
| 347 | sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC"; |
| 348 | } |
| 349 | |
| 350 | ResultSet rs = db.query(sql); |
| 351 | while(rs == null){ |
| 352 | rs = db.query(sql); |
| 353 | } |
| 354 | while(rs.next()) { |
| 355 | String line = predName + "("; |
| 356 | ArrayList<String> cs = new ArrayList<String>(); |
| 357 | int ct = 0; |
| 358 | for(String a : p.getArgs()) { |
| 359 | |
| 360 | |
| 361 | if(p.getTypeAt(ct).isProbArg == true || p.getTypeAt(ct).isNonSymbolicType()){ |
| 362 | cs.add(rs.getDouble(a)+""); |
| 363 | }else{ |
| 364 | long c = rs.getLong(a); |
| 365 | |
| 366 | |
| 367 | String v = StringMan.escapeJavaString(cmap.get(c)); |
| 368 | |
| 369 | //if(v.matches("^[0-9].*$") && !StringMan.escapeJavaString(v).contains(" ")){ |
| 370 | // cs.add("" + StringMan.escapeJavaString(v) + ""); |
| 371 | //}else{ |
| 372 | cs.add("\"" + StringMan.escapeJavaString(v) + "\""); |
| 373 | //} |
| 374 | } |
| 375 | ct ++; |
| 376 | } |
| 377 | line += StringMan.commaList(cs) + ")"; |
| 378 | |
| 379 | double prior = 1; |
| 380 | if(options.marginal){ |
| 381 | |
| 382 | double prob; |
| 383 | if(rs.getString("prior") == null){ |
| 384 | prob = 1; |
| 385 | }else{ |
| 386 | prob = Double.valueOf(rs.getString("prior")); |
| 387 | } |
| 388 | |
| 389 | if(Config.output_prolog_format){ |
| 390 | |
| 391 | line = "tuffyPrediction(" + UIMan.decimalRound(digits, prob) + |
| 392 | ", " + line + ")."; |
| 393 | }else{ |
| 394 | line = UIMan.decimalRound(digits, prob) + "\t" + line; |
| 395 | |
| 396 | } |
| 397 | |
| 398 | }else{ |
| 399 | line = line; |
| 400 | } |
| 401 | |
| 402 | if(prior >= options.minProb){ |
| 403 | bufferedWriter.append(line + "\n"); |
| 404 | } |
| 405 | |
| 406 | } |
| 407 | rs.close(); |
| 408 | //bufferedWriter.close(); |
| 409 | } catch (Exception e) { |
| 410 | ExceptionMan.handle(e); |
| 411 | } |
| 412 | } |
| 413 | } |
| 414 | |
| 415 | |
| 416 | |
| 417 | |