EMMA Coverage Report (generated Tue Aug 23 05:57:12 CDT 2011)
[all classes][felix.executor]

COVERAGE SUMMARY FOR SOURCE FILE [DDExecutor.java]

nameclass, %method, %block, %line, %
DDExecutor.java100% (1/1)100% (3/3)96%  (1350/1405)94%  (174/186)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class DDExecutor100% (1/1)100% (3/3)96%  (1350/1405)94%  (174/186)
dumpMapAnswerForPredicate (RDB, FelixPredicate, BufferedWriter): void 100% (1/1)81%  (206/255)81%  (34/42)
run (): void 100% (1/1)99%  (1124/1130)97%  (134/138)
DDExecutor (ExecutionPlan, FelixQuery, FelixCommandOptions): void 100% (1/1)100% (20/20)100% (6/6)

1package felix.executor;
2 
3import java.io.BufferedWriter;
4import java.io.File;
5import java.sql.ResultSet;
6import java.util.ArrayList;
7import java.util.HashMap;
8import java.util.HashSet;
9 
10import com.google.common.io.Files;
11 
12import tuffy.db.RDB;
13import tuffy.infer.DataMover;
14import tuffy.mln.MarkovLogicNetwork;
15import tuffy.mln.Predicate;
16import tuffy.parse.CommandOptions;
17import tuffy.util.Config;
18import tuffy.util.ExceptionMan;
19import tuffy.util.FileMan;
20import tuffy.util.StringMan;
21import tuffy.util.UIMan;
22 
23import felix.dstruct.ConcurrentOperatorsBucket;
24import felix.dstruct.ExecutionPlan;
25import felix.dstruct.FelixPredicate;
26import felix.dstruct.FelixQuery;
27import felix.dstruct.StatOperator.OPType;
28import felix.main.Felix;
29import felix.optimizer.DMOOptimizer;
30import felix.parser.FelixCommandOptions;
31import felix.util.FelixConfig;
32import felix.util.FelixUIMan;
33 
34 
35 
36/**
37 * Class for executing a given physical {@link ExecutionPlan} using 
38 * dual decomposition.
39 * 
40 * @author Ce Zhang
41 *
42 */
43public class DDExecutor {
44 
45        /**
46         * The execution plan to be executed.
47         */
48        ExecutionPlan ep;
49        
50        /**
51         * The FelixQuery used by this Executor.
52         */
53        FelixQuery fq;
54        
55        /**
56         * The FelixCommandOptions used by this Executor.
57         */
58        FelixCommandOptions options;
59        
60        /**
61         * The DMOOptimizer used by this Executor.
62         */
63        public DMOOptimizer dmoo;
64                
65        /**
66         * Execute this plan. If Felix runs in
67         * explain mode, just prints out the physical plan.
68         * 
69         * <p> TODO: Find a better way to the explain mode (e.g., a graph).
70         */
71        @SuppressWarnings("unchecked")
72        public void run(){
73                
74                try{
75                        
76                        int sum = -1;
77                        int nIt = FelixConfig.nDDIT;
78                        
79                        for(int i = ep.operators.size()-1; i >=0; i --){
80                                ep.operators.get(i).setNCore(Config.getNumThreads());
81                                ep.operators.get(i).prepareDB44DD();                                
82                        }
83 
84                        // the first run of DD replicates the NON-DD executor.
85                        // this is used to decides the scope of the downstream MLN
86                        // operator.
87                        FelixConfig.isFirstRunOfDD = true;
88                        
89                        for(int i = ep.operators.size()-1; i >=0; i --){
90                                dmoo.optimizeDMO(ep.operators.get(i));
91                                ep.operators.get(i).run();
92                                
93                        }                        
94                        
95                        BufferedWriter bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_0" );
96                        RDB newDB = RDB.getRDBbyConfig(Config.db_schema);
97                        for(FelixPredicate fp : fq.getPredicates()){
98                                if(fp.hasQuery()){
99                                        UIMan.println(">>> Dumping results for " + fp + "\n");
100                                        this.dumpMapAnswerForPredicate(newDB, fp, bw);
101                                }
102                        }
103                        bw.close();
104                        
105                        File from = new File(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt));
106                        File to = new File(options.fout);
107                        if(to.exists()){
108                                to.delete();
109                        }
110                        Files.copy(from, to);
111                        
112                        FelixConfig.isFirstRunOfDD = false;
113                        
114                        // copy state of each table
115                        HashSet<FelixPredicate> sharedPreds = new HashSet<FelixPredicate>();
116                        for(int i = ep.operators.size()-1; i >=0; i --){
117                                sharedPreds.addAll(ep.operators.get(i).dd_CommonOutput);
118                                if(ep.operators.get(i).type == OPType.TUFFY){
119                                        sharedPreds.addAll(ep.operators.get(i).outputPredicates);
120                                }
121                        }
122                        for(FelixPredicate fp : sharedPreds){
123                                UIMan.println(">>> Backing up table " + fp.getName());
124                                newDB.dropTable("_copy_ori_" + fp.getRelName());
125                                String sql = "CREATE TABLE _copy_ori_" + fp.getRelName()
126                                                + " AS SELECT * FROM " + fp.getRelName() + ";";
127                                newDB.execute(sql);
128                        }
129                        newDB.close();
130                        
131                        
132                        while(sum != 0 && nIt-- > 0 ){
133                                
134                                newDB = RDB.getRDBbyConfig(Config.db_schema);
135                                
136                                for(FelixPredicate fp : sharedPreds){
137                                        UIMan.println(">>> Restoring table " + fp.getName());
138                                        newDB.dropTable(fp.getRelName());
139                                        newDB.dropView(fp.getRelName());
140                                        String sql = "CREATE TABLE " + fp.getRelName()
141                                                        + " AS SELECT * FROM _copy_ori_" + fp.getRelName() + ";";
142                                        newDB.execute(sql);
143                                }
144                                for(FelixPredicate fp : fq.getAllPred()){
145                                        if(fp.isCorefPredicate && !sharedPreds.contains(fp)){
146                                                newDB.dropTable(fp.getRelName());
147                                                newDB.dropView(fp.getRelName());
148                                                newDB.execute(fp.viewDef);
149                                        }                                        
150                                }
151                                newDB.commit();
152                                newDB.close();
153                        
154                                for(int i = ep.operators.size()-1; i >=0; i --){
155                                        dmoo.optimizeDMO(ep.operators.get(i));
156                                        ep.operators.get(i).run();
157                                }
158                                
159                                bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt) );
160                                newDB = RDB.getRDBbyConfig(Config.db_schema);
161                                for(FelixPredicate fp : fq.getPredicates()){
162                                        if(fp.hasQuery()){
163                                                UIMan.println(">>> Dumping results for " + fp + "\n");
164                                                this.dumpMapAnswerForPredicate(newDB, fp, bw);
165                                        }
166                                }
167                                newDB.close();
168                                bw.close();
169                                
170                                from = new File(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt));
171                                to = new File(options.fout);
172                                if(to.exists()){
173                                        to.delete();
174                                }
175                                Files.copy(from, to);
176                                
177                                
178                                
179                                sum = 0;
180                                
181                                //CURRENTLY ASSUME ONE PREDICATE IS SHARED BY ONLY TWO OPERATORS 
182                                // this is ensured by Felix's compiler.
183                                for(FelixPredicate fp : fq.getAllPred()){
184                                        
185                                        ConcurrentOperatorsBucket cob1 = null;
186                                        ConcurrentOperatorsBucket cob2 = null;
187                                        
188                                        for(int i = ep.operators.size()-1; i >=0; i --){
189                                                if(ep.operators.get(i).dd_CommonOutput.contains(fp)){
190                                                        if(cob1 == null){
191                                                                cob1 = ep.operators.get(i);
192                                                        }else if(cob2 == null){
193                                                                cob2 = ep.operators.get(i);
194                                                        }else{
195                                                                ExceptionMan.die("ERROR 12: " +
196                                                                                "There must be something wrong with the compiler");
197                                                        }
198                                                }
199                                        }
200                                        
201                                        if(cob1==null || cob2==null){
202                                                continue;
203                                        }
204                                        
205                                        String tableName1;
206                                        String tableName2;
207                                        String priorTable1;
208                                        String priorTable2;
209                                        
210                                        tableName1 = cob1.dd_commonOutputPredicate_2_tableName.get(fp);
211                                        tableName2 = cob2.dd_commonOutputPredicate_2_tableName.get(fp);
212                                        
213                                        priorTable1 = cob1.dd_commonOutputPredicate_2_priorTableName.get(fp);
214                                        priorTable2 = cob2.dd_commonOutputPredicate_2_priorTableName.get(fp);
215                                        
216                                        
217                                        String args = StringMan.commaList(fp.getArgs());
218                    ArrayList<String> whereargsarray = new ArrayList<String>();
219                    for(String a : fp.getArgs()){
220                            whereargsarray.add("t0." + a + "=" + "t1." + a);
221                    }
222                    String whereargs = StringMan.join(" AND ", whereargsarray);
223                    ArrayList<String> t0argsArray = new ArrayList<String>();
224                    for(String a : fp.getArgs()){
225                            t0argsArray.add("t0." + a);
226                    }
227                    String t0args = StringMan.commaList(t0argsArray);
228                    
229                    ArrayList<String> t1argsArray = new ArrayList<String>();
230                    for(String a : fp.getArgs()){
231                            t1argsArray.add("t1." + a);
232                    }
233                    String t1args = StringMan.commaList(t1argsArray);
234 
235                                        String priorArgs = StringMan.commaList(fp.getArgs());
236                                        priorArgs = priorArgs + "," + "float_" + (fp.arity()+1);
237                                        priorArgs = "truth, club, " + priorArgs;
238                                        
239                                        if(options.marginal == true){
240 
241                                                String deltaCase = "(CASE WHEN abs(t0.prior-t1.prior)<0.1 THEN " + 0 +
242                                                                " ELSE 0.9*(t0.prior-t1.prior) END) AS prior11";
243                                                String negDeltaCase = "(CASE WHEN abs(t1.prior-t0.prior)<0.1 THEN " + 0 +
244                                                                " ELSE 0.9*(t1.prior-t0.prior) END) AS prior11";
245                                                
246                                                
247                         String sql = "(SELECT NULL::Float, TRUE, 2, " + t0args + ", " + deltaCase +" FROM " + tableName1 + " t0, " + tableName2  + " t1 " + " WHERE "
248                                 + whereargs+")";
249                         
250                         sql = sql + " UNION ALL " + "(SELECT NULL::Float, TRUE, 2, " + t0args + ", t0.prior FROM " + tableName1 + " t0 " +
251                                         " WHERE (" + t0args + ") NOT IN (SELECT " + t1args + " FROM " + tableName2 + " t1) AND t0.prior>0.1)";
252 
253                         sql =  "INSERT INTO " + priorTable2 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0";
254                         Felix.db.update(sql);
255                         
256                         
257                         UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2);
258 
259                         sql = "(SELECT NULL::Float, TRUE, 2, " + t0args + "," + negDeltaCase + " FROM " + tableName1 + " t0, " + tableName2  + " t1 " + " WHERE "
260                         + whereargs+")";
261                         
262                         sql = sql + " UNION ALL " + "(SELECT NULL::Float, TRUE, 2, " + t0args + ", -t0.prior FROM " + tableName2 + " t0 " +
263                                         " WHERE (" + t0args + ") NOT IN (SELECT " + t1args + " FROM " + tableName1 + " t1) AND t0.prior>0.1)";
264                         
265                         sql = "INSERT INTO " + priorTable1 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0";
266                         Felix.db.update(sql);
267                         UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1);
268 
269                         sum += Felix.db.getLastUpdateRowCount();
270 
271                                        }else{
272                                                String sql = "SELECT NULL::Float, TRUE, 2, " + args + ", " + (0.9) + " FROM " + tableName1 + " WHERE ("
273                                                        + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")"
274                                                        + " UNION ALL " + 
275                                                        "SELECT NULL::Float, TRUE, 2, " + args + ", " + (-0.9) + " FROM " + tableName2 + " WHERE ("
276                                                        + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")";
277                                                sql = "INSERT INTO " + priorTable2 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt";
278                                                Felix.db.update(sql);
279                                                UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2);
280                                                
281                                                sql = "SELECT NULL::Float, TRUE, 2, " + args + ", " + (-0.9) + " FROM " + tableName1 + " WHERE ("
282                                                + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")"
283                                                + " UNION ALL " + 
284                                                "SELECT NULL::Float, TRUE, 2, " + args + ", " + (0.9) + " FROM " + tableName2 + " WHERE ("
285                                                + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")";
286                                                sql = "INSERT INTO " + priorTable1 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt";
287                                                Felix.db.update(sql);
288                                                UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1);
289                                                
290                        sum += Felix.db.getLastUpdateRowCount();
291                                                
292                     }
293                                }
294                                
295                                Felix.db.commit();
296                                
297                        }
298                        
299                
300                }catch(Exception e){
301                        e.printStackTrace();
302                }
303                
304                        
305        }
306        
307        /**
308         * The constructor.
309         * @param _ep
310         */
311        public DDExecutor(ExecutionPlan _ep, FelixQuery _fq, FelixCommandOptions _options){
312                ep = _ep;
313                dmoo = new DMOOptimizer(ep.getCostModel());
314                fq = _fq;
315                options = _options;
316        }
317 
318        /**
319         * Output the results of this bucket.
320         * @param db
321         * @param fout
322         * @param p
323         */
324        public void dumpMapAnswerForPredicate(RDB db, FelixPredicate p, BufferedWriter bufferedWriter) {
325                //        spreadTruth();
326                        HashMap<Long,String> cmap = db.loadIdSymbolMapFromTable();
327                        try {
328                                
329                                int digits = 4;
330                                
331                                String sql;
332                                String tableName = p.getRelName();
333                                String predName = p.getName();
334                                
335                                if(options.useDualDecomposition){
336                                        //sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC";
337                                        if(!p.isCorefMapPredicate){
338                                                if(p.belongsTo != null && p.belongsTo.dd_commonOutputPredicate_2_tableName.containsKey(p)){
339                                                        sql = "SELECT * FROM " + p.belongsTo.dd_commonOutputPredicate_2_tableName.get(p) + " WHERE truth=TRUE ORDER BY prior DESC";
340                                                }else{
341                                                        sql = "SELECT * FROM " + p.getRelName() + " WHERE truth=TRUE ORDER BY prior DESC";
342                                                }
343                                        }else{
344                                                sql = "SELECT * FROM " + tableName + " ORDER BY prior DESC";
345                                        }
346                                }else{
347                                        sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC";
348                                }
349                                
350                                ResultSet rs = db.query(sql);
351                                while(rs == null){
352                                        rs = db.query(sql);
353                                }
354                                while(rs.next()) {
355                                        String line = predName + "(";
356                                        ArrayList<String> cs = new ArrayList<String>();
357                                        int ct = 0;
358                                        for(String a : p.getArgs()) {
359                                                
360                                                
361                                                if(p.getTypeAt(ct).isProbArg == true || p.getTypeAt(ct).isNonSymbolicType()){
362                                                        cs.add(rs.getDouble(a)+"");
363                                                }else{
364                                                        long c = rs.getLong(a);
365                                                
366                                                
367                                                        String v = StringMan.escapeJavaString(cmap.get(c));
368                                                
369                                                //if(v.matches("^[0-9].*$") && !StringMan.escapeJavaString(v).contains(" ")){
370                                                //        cs.add("" + StringMan.escapeJavaString(v) + "");
371                                                //}else{
372                                                        cs.add("\"" + StringMan.escapeJavaString(v) + "\"");
373                                                //}
374                                                }
375                                                ct ++;
376                                        }
377                                        line += StringMan.commaList(cs) + ")";
378                                        
379                                        double prior = 1;
380                                        if(options.marginal){
381                                                
382                                                double prob;
383                                                if(rs.getString("prior") == null){
384                                                        prob = 1;
385                                                }else{
386                                                        prob = Double.valueOf(rs.getString("prior"));
387                                                }
388                                                
389                                                if(Config.output_prolog_format){
390                                                
391                                                        line = "tuffyPrediction(" + UIMan.decimalRound(digits, prob) +
392                                                                ", " + line + ").";
393                                                }else{
394                                                        line = UIMan.decimalRound(digits, prob) + "\t" + line;
395 
396                                                }
397                                                
398                                        }else{
399                                                line =  line;
400                                        }
401                                        
402                                        if(prior >= options.minProb){
403                                                bufferedWriter.append(line + "\n");
404                                        }
405                                        
406                                }
407                                rs.close();
408                                //bufferedWriter.close();
409                        } catch (Exception e) {
410                                ExceptionMan.handle(e);
411                        }
412                }
413}
414 
415 
416 
417 

[all classes][felix.executor]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov