EMMA Coverage Report (generated Sat Aug 20 11:00:51 CDT 2011)
[all classes][felix.io]

COVERAGE SUMMARY FOR SOURCE FILE [HadoopPostgreSQLPopulator.java]

name | class, % | method, % | block, % | line, %
HadoopPostgreSQLPopulator.java100% (1/1)43%  (3/7)57%  (237/414)55%  (44/80)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class HadoopPostgreSQLPopulator100% (1/1)43%  (3/7)57%  (237/414)55%  (44/80)
HadoopPostgreSQLPopulator (): void 0%   (0/1)0%   (0/3)0%   (0/1)
dumpTableToHDFS (String, String, int, String): void 0%   (0/1)0%   (0/99)0%   (0/19)
main (String []): void 0%   (0/1)0%   (0/27)0%   (0/7)
populateTableFromDir (String, String, List, List, String, int): void 0%   (0/1)0%   (0/42)0%   (0/7)
createAndPopulateTable (String, String, List, List, String): void 100% (1/1)96%  (69/72)92%  (12/13)
populateTable (String, String, List, List, String): void 100% (1/1)97%  (112/115)96%  (23/24)
createAndPopulateTableFromDir (String, String, List, List, String, int): void 100% (1/1)100% (56/56)100% (9/9)

1package felix.io;
2 
3import java.io.BufferedWriter;
4import java.io.OutputStreamWriter;
5import java.sql.ResultSet;
6import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.List;
9import org.apache.hadoop.conf.Configuration;
10import org.apache.hadoop.fs.FSDataInputStream;
11import org.apache.hadoop.fs.FileSystem;
12import org.apache.hadoop.fs.Path;
13import org.postgresql.PGConnection;
14 
15import felix.util.FelixConfig;
16import tuffy.db.RDB;
17import tuffy.util.StringMan;
18import tuffy.util.UIMan;
19 
20/**
21 * Class for populating tables from 
22 * files/directories generated with hadoop.
23 * @author Ce Zhang
24 *
25 */
26public class HadoopPostgreSQLPopulator{
27        
28        /**
29         * Dumps given schema.table in to the given file.
30         * @param schema
31         * @param tableName
32         * @param nColumn
33         * @param filePath
34         */
35        public static void dumpTableToHDFS(
36                                                                                String           schema,
37                                                                                String           tableName,
38                                                                                int              nColumn,
39                                                                                String                         filePath){
40                try{
41                        Path pt=new Path(filePath);
42                        Configuration conf = new Configuration();
43                        
44                        conf.set("fs.default.name", FelixConfig.hdfsServer);
45                        conf.set("fs.defaultFS", FelixConfig.hdfsServer);
46                        
47                        FileSystem fs = FileSystem.get(conf);
48                        BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
49                        
50                        RDB db = RDB.getRDBbyConfig(schema);
51                        db.disableAutoCommitForNow();
52                        
53                        ResultSet rs = db.query("SELECT * FROM " + tableName);
54                        while(rs.next()){
55                                ArrayList<String> tmps = new ArrayList<String>();
56                                for(int i=0;i<nColumn;i++){
57                                        tmps.add(rs.getString(i+1).replaceAll("\t", "\\\\t").replaceAll("\n", "\\\\n").replaceAll("\r", "\\\\r"));
58                                }
59                                br.write(StringMan.join("\t", tmps) + "\n");
60                        }
61                        
62                        db.close();
63                        br.close();
64                }catch(Exception e){
65                        e.printStackTrace();
66                }
67                
68                
69        }
70        
71        
72        
73        /**
74         * Creates and populates the given schema.table from the given file.
75         * @param schema
76         * @param tableName
77         * @param columnNames
78         * @param columnTypes
79         * @param filePath
80         * @throws Exception
81         */
82        public static void createAndPopulateTable(        
83                                                                                String            schema,
84                                                                                String            tableName,
85                                                                                List<String>           columnNames,
86                                                                                List<String>           columnTypes,
87                                                                                String                      filePath      ) throws Exception{
88                
89                RDB db;
90                if(schema == null){
91                        db = RDB.getRDBbyConfig();
92                }else{
93                        db = RDB.getRDBbyConfig(schema);
94                }
95                db.dropTable(tableName);
96                
97                ArrayList<String> schemas = new ArrayList<String>();
98                for(int i=0;i<columnNames.size();i++){
99                        schemas.add(columnNames.get(i) + " " + columnTypes.get(i));
100                }
101                
102                db.dropTable(tableName);
103                
104                String sql = "CREATE TABLE " + tableName + " ( truth BOOL, prior FLOAT, " + StringMan.commaList(schemas) + " )";
105                db.execute(sql);
106                db.close();
107                
108                HadoopPostgreSQLPopulator.populateTable(schema, tableName, columnNames, columnTypes, filePath);
109                
110                
111        }
112        
113        //TODO: CHECK HDFS API AND MOVE TO MORE ELEGANT WAY
114        /**
115         * Populates the given schema.table from the given directory.
116         * @param schema
117         * @param tableName
118         * @param columnNames
119         * @param columnTypes
120         * @param dirPath
121         * @param nReducer
122         * @throws Exception
123         */
124        public static void populateTableFromDir(        
125                                                                        String            schema,
126                                                                        String            tableName,
127                                                                        List<String>           columnNames,
128                                                                        List<String>           columnTypes,
129                                                                        String                      dirPath,
130                                                                        int                           nReducer) throws Exception{
131                
132                for(int i=0;i<nReducer;i++){
133                        String fileName = "" + nReducer;
134                        while(fileName.length() < 5){
135                                fileName = "0" + fileName;
136                        }
137                        fileName = "part-" + fileName;
138                        HadoopPostgreSQLPopulator.populateTable(schema, tableName, columnNames, columnTypes, fileName);
139                }
140        }
141        
142        //TODO: CHECK HDFS API AND MOVE TO MORE ELEGANT WAY
143        /**
144         * Creates and populates the given schema.table from the given directory.
145         * @param schema
146         * @param tableName
147         * @param columnNames
148         * @param columnTypes
149         * @param dirPath
150         * @param nReducer
151         * @throws Exception
152         */
153        public static void createAndPopulateTableFromDir(        
154                                                                        String            schema,
155                                                                        String            tableName,
156                                                                        List<String>           columnNames,
157                                                                        List<String>           columnTypes,
158                                                                        String                      dirPath,
159                                                                        int                           nReducer) throws Exception{
160 
161                for(int i=0;i<nReducer;i++){
162                        String fileName = "" + i;
163                        while(fileName.length() < 5){
164                                fileName = "0" + fileName;
165                        }
166                        fileName = dirPath + "/" + "part-r-" + fileName;
167                        if(i==0){
168                                HadoopPostgreSQLPopulator.createAndPopulateTable(schema, tableName, columnNames, columnTypes, fileName);
169                        }else{
170                                HadoopPostgreSQLPopulator.populateTable(schema, tableName, columnNames, columnTypes, fileName);
171                        }
172                }
173        }
174        
175        /**
176         * Populates the given schema.table from the given file.
177         * @param schema
178         * @param tableName
179         * @param columnNames
180         * @param columnTypes
181         * @param filePath
182         * @throws Exception
183         */
184        public static void populateTable(        
185                                                                                String            schema,
186                                                                                String            tableName,
187                                                                                List<String>           columnNames,
188                                                                                List<String>           columnTypes,
189                                                                                String                      filePath      ) throws Exception{
190 
191                RDB db;
192                if(schema == null){
193                        db = RDB.getRDBbyConfig();
194                }else{
195                        db = RDB.getRDBbyConfig(schema);
196                }
197                
198                ArrayList<String> schemas = new ArrayList<String>();
199                for(int i=0;i<columnNames.size();i++){
200                        schemas.add(columnNames.get(i) + " " + columnTypes.get(i));
201                }
202                String sql;
203                
204                Configuration conf = new Configuration();
205                //TOOD: HARD-WIRED ADDRESS IS NTO GOOD
206                conf.set("fs.default.name", FelixConfig.hdfsServer);
207                conf.set("fs.defaultFS", FelixConfig.hdfsServer);
208                
209        FileSystem fileSystem = FileSystem.get(conf);
210 
211        Path path = new Path(filePath);
212        if (!fileSystem.exists(path)) {
213            UIMan.verbose(2, "File " + filePath + " does not exists");
214            return;
215        }
216 
217        FSDataInputStream in = fileSystem.open(path);
218                PGConnection con = (PGConnection)db.getConnection();
219                sql = "COPY " + tableName + "( truth , prior , " + StringMan.commaList(columnNames) + ") " +
220                                " FROM STDIN WITH DELIMITER '\t'";
221                con.getCopyAPI().copyIn(sql, in);
222                
223                in.close();
224        fileSystem.close();
225                
226        db.commit();
227        db.close();
228        
229        
230        }
231        
232        /**
233         * Main test method.
234         * @param args
235         * @throws Exception
236         */
237        public static void main(String[] args) throws Exception{
238        
239                
240                //HadoopPostgreSQLPopulator.populateTable(
241                //                                                null, 
242                //                                                "test", 
243                //                                                Arrays.asList("c1", "c2"), 
244                //                                                Arrays.asList("TEXT", "TEXT"),
245                //                                                "hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut38/part-00000");
246                
247                
248                HadoopPostgreSQLPopulator.createAndPopulateTable(
249                                        null, 
250                                        "test", 
251                                        Arrays.asList("c1", "c2"), 
252                                        Arrays.asList("TEXT", "TEXT"),
253                                        "hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut42/part-00000");
254        }
255        
256}

[all classes][felix.io]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov