EMMA Coverage Report (generated Sat Aug 20 11:00:51 CDT 2011)
[all classes][felix.executor]

COVERAGE SUMMARY FOR SOURCE FILE [DDExecutor.java]

name | class, % | method, % | block, % | line, %
DDExecutor.java | 100% (1/1) | 100% (3/3) | 45% (538/1184) | 54% (88.8/163)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class DDExecutor | 100% (1/1) | 100% (3/3) | 45% (538/1184) | 54% (88.8/163)
run (): void | | 100% (1/1) | 36% (331/912) | 43% (49.8/115)
dumpMapAnswerForPredicate (RDB, FelixPredicate, BufferedWriter): void | | 100% (1/1) | 74% (187/252) | 79% (33/42)
DDExecutor (ExecutionPlan, FelixQuery, FelixCommandOptions): void | | 100% (1/1) | 100% (20/20) | 100% (6/6)

1package felix.executor;
2 
3import java.io.BufferedWriter;
4import java.sql.ResultSet;
5import java.util.ArrayList;
6import java.util.HashMap;
7import java.util.HashSet;
8 
9import tuffy.db.RDB;
10import tuffy.infer.DataMover;
11import tuffy.mln.MarkovLogicNetwork;
12import tuffy.mln.Predicate;
13import tuffy.parse.CommandOptions;
14import tuffy.util.Config;
15import tuffy.util.ExceptionMan;
16import tuffy.util.FileMan;
17import tuffy.util.StringMan;
18import tuffy.util.UIMan;
19 
20import felix.dstruct.ConcurrentOperatorsBucket;
21import felix.dstruct.ExecutionPlan;
22import felix.dstruct.FelixPredicate;
23import felix.dstruct.FelixQuery;
24import felix.dstruct.StatOperator.OPType;
25import felix.main.Felix;
26import felix.optimizer.DMOOptimizer;
27import felix.parser.FelixCommandOptions;
28import felix.util.FelixConfig;
29import felix.util.FelixUIMan;
30 
31 
32 
33/**
34 * Class for executing a given physical {@link ExecutionPlan} using 
35 * dual decomposition.
36 * 
37 * @author Ce Zhang
38 *
39 */
40public class DDExecutor {
41 
42        /**
43         * The execution plan to be executed.
44         */
45        ExecutionPlan ep;
46        
47        /**
48         * The FelixQuery used by this Executor.
49         */
50        FelixQuery fq;
51        
52        /**
53         * The FelixCommandOptions used by this Executor.
54         */
55        FelixCommandOptions options;
56        
57        /**
58         * The DMOOptimizer used by this Executor.
59         */
60        public DMOOptimizer dmoo;
61                
62        /**
63         * Execute this plan. If Felix runs in
64         * explain mode, just prints out the physical plan.
65         * 
66         * <p> TODO: Find a better way to the explain mode (e.g., a graph).
67         */
68        @SuppressWarnings("unchecked")
69        public void run(){
70                
71                try{
72                        
73                        int sum = -1;
74                        int nIt = FelixConfig.nDDIT;
75                        
76                        for(int i = ep.operators.size()-1; i >=0; i --){
77                                ep.operators.get(i).setNCore(Config.getNumThreads());
78                                ep.operators.get(i).prepareDB44DD();                                
79                        }
80 
81                        // the first run of DD replicates the NON-DD executor.
82                        // this is used to decides the scope of the downstream MLN
83                        // operator.
84                        FelixConfig.isFirstRunOfDD = true;
85                        
86                        for(int i = ep.operators.size()-1; i >=0; i --){
87                                dmoo.optimizeDMO(ep.operators.get(i));
88                                ep.operators.get(i).run();
89                                
90                        }                        
91                        
92                        BufferedWriter bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_0" );
93                        RDB newDB = RDB.getRDBbyConfig(Config.db_schema);
94                        for(FelixPredicate fp : fq.getPredicates()){
95                                if(fp.hasQuery()){
96                                        UIMan.println(">>> Dumping results for " + fp + "\n");
97                                        this.dumpMapAnswerForPredicate(newDB, fp, bw);
98                                }
99                        }
100                        bw.close();
101                        
102                        FelixConfig.isFirstRunOfDD = false;
103                        
104                        // copy state of each table
105                        HashSet<FelixPredicate> sharedPreds = new HashSet<FelixPredicate>();
106                        for(int i = ep.operators.size()-1; i >=0; i --){
107                                sharedPreds.addAll(ep.operators.get(i).dd_CommonOutput);
108                        }
109                        for(FelixPredicate fp : sharedPreds){
110                                newDB.dropTable("_copy_ori_" + fp.getRelName());
111                                String sql = "CREATE TABLE _copy_ori_" + fp.getRelName()
112                                                + " AS SELECT * FROM " + fp.getRelName() + ";";
113                                newDB.execute(sql);
114                        }
115                        newDB.close();
116                        
117                        
118                        while(sum != 0 && nIt-- > 0 ){
119                                
120                                newDB = RDB.getRDBbyConfig(Config.db_schema);
121                                
122                                for(FelixPredicate fp : sharedPreds){
123                                        newDB.dropTable(fp.getRelName());
124                                        String sql = "CREATE TABLE " + fp.getRelName()
125                                                        + " AS SELECT * FROM _copy_ori_" + fp.getRelName() + ";";
126                                        newDB.execute(sql);
127                                }
128                                for(FelixPredicate fp : fq.getAllPred()){
129                                        if(fp.isCorefPredicate && !sharedPreds.contains(fp)){
130                                                newDB.dropTable(fp.getRelName());
131                                                newDB.dropView(fp.getRelName());
132                                                newDB.execute(fp.viewDef);
133                                        }                                        
134                                }
135                                newDB.commit();
136                                newDB.close();
137                        
138                                for(int i = ep.operators.size()-1; i >=0; i --){
139                                        dmoo.optimizeDMO(ep.operators.get(i));
140                                        ep.operators.get(i).run();
141                                }
142                                
143                                bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt) );
144                                newDB = RDB.getRDBbyConfig(Config.db_schema);
145                                for(FelixPredicate fp : fq.getPredicates()){
146                                        if(fp.hasQuery()){
147                                                UIMan.println(">>> Dumping results for " + fp + "\n");
148                                                this.dumpMapAnswerForPredicate(newDB, fp, bw);
149                                        }
150                                }
151                                newDB.close();
152                                bw.close();
153                                
154                                sum = 0;
155                                
156                                //CURRENTLY ASSUME ONE PREDICATE IS SHARED BY ONLY TWO OPERATORS 
157                                // this is ensured by Felix's compiler.
158                                for(FelixPredicate fp : fq.getAllPred()){
159                                        
160                                        ConcurrentOperatorsBucket cob1 = null;
161                                        ConcurrentOperatorsBucket cob2 = null;
162                                        
163                                        for(int i = ep.operators.size()-1; i >=0; i --){
164                                                if(ep.operators.get(i).dd_CommonOutput.contains(fp)){
165                                                        if(cob1 == null){
166                                                                cob1 = ep.operators.get(i);
167                                                        }else if(cob2 == null){
168                                                                cob2 = ep.operators.get(i);
169                                                        }else{
170                                                                ExceptionMan.die("ERROR 12: " +
171                                                                                "There must be something wrong with the compiler");
172                                                        }
173                                                }
174                                        }
175                                        
176                                        if(cob1==null || cob2==null){
177                                                continue;
178                                        }
179                                        
180                                        String tableName1;
181                                        String tableName2;
182                                        String priorTable1;
183                                        String priorTable2;
184                                        
185                                        tableName1 = cob1.dd_commonOutputPredicate_2_tableName.get(fp);
186                                        tableName2 = cob2.dd_commonOutputPredicate_2_tableName.get(fp);
187                                        
188                                        priorTable1 = cob1.dd_commonOutputPredicate_2_priorTableName.get(fp);
189                                        priorTable2 = cob2.dd_commonOutputPredicate_2_priorTableName.get(fp);
190                                        
191                                        
192                                        String args = StringMan.commaList(fp.getArgs());
193                    ArrayList<String> whereargsarray = new ArrayList<String>();
194                    for(String a : fp.getArgs()){
195                            whereargsarray.add("t0." + a + "=" + "t1." + a);
196                    }
197                    String whereargs = StringMan.join(" AND ", whereargsarray);
198                    ArrayList<String> t0argsArray = new ArrayList<String>();
199                    for(String a : fp.getArgs()){
200                            t0argsArray.add("t0." + a);
201                    }
202                    String t0args = StringMan.commaList(t0argsArray);
203 
204                                        String priorArgs = StringMan.commaList(fp.getArgs());
205                                        priorArgs = priorArgs + "," + "float_" + (fp.arity()+1);
206                                        priorArgs = "truth, club, " + priorArgs;
207                                        
208                                        if(options.marginal == true){
209 
210                                                String deltaCase = "(CASE WHEN abs(t0.prior-t1.prior)<0.1 THEN " + 0 +
211                                                                " ELSE 0.9*(t0.prior-t1.prior) END) AS prior11";
212                                                String negDeltaCase = "(CASE WHEN abs(t1.prior-t0.prior)<0.1 THEN " + 0 +
213                                                                " ELSE 0.9*(t1.prior-t0.prior) END) AS prior11";
214                                                
215                                                
216                         String sql = "SELECT TRUE, 2, " + t0args + ", " + deltaCase +" FROM " + tableName1 + " t0, " + tableName2  + " t1 " + " WHERE "
217                                 + whereargs;
218 
219                         sql =  "INSERT INTO " + priorTable2 + "(" + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0";
220                         Felix.db.update(sql);
221                         UIMan.warn("INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2);
222 
223                         sql = "SELECT TRUE, 2, " + t0args + "," + negDeltaCase + " FROM " + tableName1 + " t0, " + tableName2  + " t1 " + " WHERE "
224                         + whereargs;
225                         
226                         sql = "INSERT INTO " + priorTable1 + "(" + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0";
227                         Felix.db.update(sql);
228                         UIMan.warn("INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1);
229 
230                         sum += Felix.db.getLastUpdateRowCount();
231 
232                                        }else{
233                                                String sql = "SELECT TRUE, 2, " + args + ", " + (1.0/2) + " FROM " + tableName1 + " WHERE ("
234                                                        + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")"
235                                                        + " UNION ALL " + 
236                                                        "SELECT TRUE, 2, " + args + ", " + (-1.0/2) + " FROM " + tableName2 + " WHERE ("
237                                                        + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")";
238                                                sql = "INSERT INTO " + priorTable2 + "(" + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt";
239                                                Felix.db.update(sql);
240                                                UIMan.warn("INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2);
241                                                
242                                                sql = "SELECT TRUE, 2, " + args + ", " + (-1.0/2) + " FROM " + tableName1 + " WHERE ("
243                                                + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")"
244                                                + " UNION ALL " + 
245                                                "SELECT TRUE, 2, " + args + ", " + (1.0/2) + " FROM " + tableName2 + " WHERE ("
246                                                + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")";
247                                                sql = "INSERT INTO " + priorTable1 + "(" + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt";
248                                                Felix.db.update(sql);
249                                                UIMan.warn("INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1);
250                                                
251                        sum += Felix.db.getLastUpdateRowCount();
252                                                
253                     }
254                                }
255                                
256                                Felix.db.commit();
257                                
258                        }
259                        
260                
261                }catch(Exception e){
262                        e.printStackTrace();
263                }
264                
265                        
266        }
267        
268        /**
269         * The constructor.
270         * @param _ep
271         */
272        public DDExecutor(ExecutionPlan _ep, FelixQuery _fq, FelixCommandOptions _options){
273                ep = _ep;
274                dmoo = new DMOOptimizer(ep.getCostModel());
275                fq = _fq;
276                options = _options;
277        }
278 
279        /**
280         * Output the results of this bucket.
281         * @param db
282         * @param fout
283         * @param p
284         */
285        public void dumpMapAnswerForPredicate(RDB db, FelixPredicate p, BufferedWriter bufferedWriter) {
286                //        spreadTruth();
287                        HashMap<Long,String> cmap = db.loadIdSymbolMapFromTable();
288                        try {
289                                
290                                int digits = 4;
291                                
292                                String sql;
293                                String tableName = p.getRelName();
294                                String predName = p.getName();
295                                
296                                if(options.useDualDecomposition){
297                                        //sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC";
298                                        if(!p.isCorefMapPredicate){
299                                                if(p.belongsTo.dd_commonOutputPredicate_2_tableName.containsKey(p)){
300                                                        sql = "SELECT * FROM " + p.belongsTo.dd_commonOutputPredicate_2_tableName.get(p) + " WHERE truth=TRUE ORDER BY prior DESC";
301                                                }else{
302                                                        sql = "SELECT * FROM " + p.getRelName() + " WHERE truth=TRUE ORDER BY prior DESC";
303                                                }
304                                        }else{
305                                                sql = "SELECT * FROM " + tableName + " ORDER BY prior DESC";
306                                        }
307                                }else{
308                                        sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC";
309                                }
310                                
311                                ResultSet rs = db.query(sql);
312                                while(rs == null){
313                                        rs = db.query(sql);
314                                }
315                                while(rs.next()) {
316                                        String line = predName + "(";
317                                        ArrayList<String> cs = new ArrayList<String>();
318                                        int ct = 0;
319                                        for(String a : p.getArgs()) {
320                                                
321                                                
322                                                if(p.getTypeAt(ct).isProbArg == true || p.getTypeAt(ct).isNonSymbolicType()){
323                                                        cs.add(rs.getDouble(a)+"");
324                                                }else{
325                                                        long c = rs.getLong(a);
326                                                
327                                                
328                                                        String v = StringMan.escapeJavaString(cmap.get(c));
329                                                
330                                                //if(v.matches("^[0-9].*$") && !StringMan.escapeJavaString(v).contains(" ")){
331                                                //        cs.add("" + StringMan.escapeJavaString(v) + "");
332                                                //}else{
333                                                        cs.add("\"" + StringMan.escapeJavaString(v) + "\"");
334                                                //}
335                                                }
336                                                ct ++;
337                                        }
338                                        line += StringMan.commaList(cs) + ")";
339                                        
340                                        double prior = 1;
341                                        if(options.marginal){
342                                                
343                                                double prob;
344                                                if(rs.getString("prior") == null){
345                                                        prob = 1;
346                                                }else{
347                                                        prob = Double.valueOf(rs.getString("prior"));
348                                                }
349                                                
350                                                if(Config.output_prolog_format){
351                                                
352                                                        line = "tuffyPrediction(" + UIMan.decimalRound(digits, prob) +
353                                                                ", " + line + ").";
354                                                }else{
355                                                        line = UIMan.decimalRound(digits, prob) + "\t" + line;
356 
357                                                }
358                                                
359                                        }else{
360                                                line =  line;
361                                        }
362                                        
363                                        if(prior >= options.minProb){
364                                                bufferedWriter.append(line + "\n");
365                                        }
366                                        
367                                }
368                                rs.close();
369                                //bufferedWriter.close();
370                        } catch (Exception e) {
371                                ExceptionMan.handle(e);
372                        }
373                }
374}
375 
376 
377 
378 

[all classes][felix.executor]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov