Package vsql_core :: Package executors :: Package FACTOR_EXECUTE :: Module victor_factor_execution_sparse
[hide private]
[frames] | [no frames]

Source Code for Module vsql_core.executors.FACTOR_EXECUTE.victor_factor_execution_sparse

  1  from multiprocessing import Pool 
  2  import time 
  3  import sys_appends 
  4  import sys, getopt 
  5  import random 
  6       
  7  import psycopg2 
  8  import low_rank_helper 
  9   
 10  import mmap 
 11  import cStringIO 
 12   
 13  #TODO: Put these to somewhere common 
 14  model_simple_base = "model_simple" 
 15  model_l_r_base = "model_l_r" 
 16  stats_simple_base = "stats_simple" 
 17  stats_base = "stats" 
 18   
 19  ## This is the parallel execution
 20  ## for factors.
class WorkerThread:
    """One parallel worker for sparse factor execution.

    Each worker owns its own autocommit DB connection and runs a single
    stochastic-gradient step over one (roundid, chunkid) partition of the
    sparse model.  (Changes vs. original: removed dead commented-out
    permutation code and the unused timer `t1`; added documentation.)
    """

    def __init__(self, mid, max_rank, nRows, nCols, nSplits, connect_str, table_name):
        # mid: model instance id; nRows/nCols: data matrix extents;
        # nSplits: number of partitions; table_name: source data table.
        self.mid = mid
        self.nRows = nRows
        self.nCols = nCols
        self.nSplits = nSplits
        self.max_rank = max_rank
        self.table_name = table_name
        # Autocommit so every statement takes effect immediately.
        self.conn = psycopg2.connect(connect_str)
        self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.psql = self.conn.cursor()

    def take_step(self, seed, epoch, nsplits, roundid, chunkid):
        """Run one gradient pass over the (roundid, chunkid) partition.

        1. Retrieve the matching slice of the sparse model into the session.
        2. Apply take_step_sparse to every (row, col, rating) tuple.
        3. Store the updated model segment back.
        """
        t_start = time.time()
        print("[take_step:%s,%s] Enter" % (roundid, chunkid))
        self.psql.execute("SELECT library_setup();")
        # epoch % 2 selects the double-buffered model side for this epoch.
        self.psql.execute("SELECT retrieve_sparse_model(%d,%d,%d,%d,%d)"
                          % (self.mid, epoch % 2, nsplits, roundid, chunkid))
        print("[take_step:%s,%s] Retrieved Sparse Model %d for epoch %d in %s"
              % (chunkid, roundid, self.mid, epoch, time.time() - t_start))

        t_step = time.time()
        tname = low_rank_helper.tablename_data(
            self.mid, epoch % 2, low_rank_helper.get_part(nsplits, roundid, chunkid))
        exec_str = "SELECT COUNT(*) FROM (SELECT take_step_sparse(row - 1, col - 1, rating) FROM %s) AS t;" % (tname)
        print("[take_step:%s,%s] %s" % (roundid, chunkid, exec_str))
        # Execute the step; COUNT(*) forces evaluation over every tuple.
        self.psql.execute(exec_str)
        (nCount,) = self.psql.fetchone()

        print("[take_step:%s,%s] Finished Step (%d tuples) in %s secs. Storing"
              % (roundid, chunkid, nCount, time.time() - t_step))
        t_store = time.time()
        # Write the updated segment back (runs under autocommit).
        ow = "SELECT update_segment_sparse(%d,%d,%d,%d,%d);" % (self.mid, epoch % 2, nsplits, roundid, chunkid)
        print(ow)
        self.psql.execute(ow)
        print("[take_step:%s,%s] Complete in %f sec (update=%s) for epoch %d"
              % (roundid, chunkid, time.time() - t_start, time.time() - t_store, epoch))

    def close(self):
        # NOTE(review): only the cursor is closed here; the connection is
        # left open, matching the original behavior.
        self.psql.close()
64 65
def chunk_data(psql, seed, query_str, nsplits, epoch, mid):
    """Permute and slice the source data for the NEXT epoch (epoch + 1).

    psql: open cursor; seed: permutation seed (the caller must reuse the same
    seed when partitioning the model for epoch + 1); query_str: SELECT that
    yields (row, col, rating); nsplits: number of partitions; mid: model id.
    (Change vs. original: removed the unused `table_name` local.)
    """
    t0 = time.time()
    # (epoch + 1) % 2 picks the staging side ("sign") for the upcoming epoch.
    epoch_sign = (epoch + 1) % 2
    print("[chunk_data] Chunking the data for epoch %d which has %d sign" % (epoch+1, epoch_sign))
    # NOTE(review): query_str/dbname are interpolated directly into SQL;
    # callers only pass internally-built trusted strings — do not expose.
    exec_str = "SELECT permute_and_slice_data('" + sys_appends.dbname + "', %d, '%s', %d, %d, %d);" % (seed, query_str, nsplits, epoch_sign, mid)
    print(exec_str)
    psql.execute(exec_str)
    print("[chunk_data] Finished Chunking in %s secs." % (time.time() - t0))
75
class chunk_desc:
    """Plain record describing one unit of pool work.

    roundid/chunkid identify the data partition; roundid < 0 is the sentinel
    used by perform_work to request data chunking instead of a step.
    """

    def __init__(self, mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, chunkid, roundid, table_name):
        # Copy every constructor argument onto the instance verbatim.
        self.mid, self.epoch, self.max_rank = mid, epoch, max_rank
        self.nRows, self.nCols, self.nsplits = nRows, nCols, nsplits
        self.connect_str, self.seed = connect_str, seed
        self.chunkid, self.roundid, self.table_name = chunkid, roundid, table_name
89
def chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, roundid, table_name):
    """Build the list of chunk_desc work items for one round: one per chunk id
    in [0, nsplits)."""
    work_items = []
    for part in range(nsplits):
        work_items.append(chunk_desc(mid, epoch, max_rank, nRows, nCols,
                                     nsplits, connect_str, seed, part,
                                     roundid, table_name))
    return work_items
92 93
def perform_work(cd):
    """Process-pool entry point: dispatch a single chunk_desc.

    A negative roundid is the sentinel meaning "chunk the data for the next
    epoch"; any other roundid runs one gradient step on (roundid, chunkid).
    Returns (roundid, chunkid) so the caller can track completion.
    """
    if cd.roundid >= 0:
        # Gradient-step path: a WorkerThread owns its own connection.
        worker = WorkerThread(cd.mid, cd.max_rank, cd.nRows, cd.nCols,
                              cd.nsplits, cd.connect_str, cd.table_name)
        worker.take_step(cd.seed, cd.epoch, cd.nsplits, cd.roundid, cd.chunkid)
        worker.close()
    else:
        # Chunking path: open a short-lived autocommit connection.
        connection = psycopg2.connect(cd.connect_str)
        connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = connection.cursor()
        cursor.execute("SELECT library_setup();")
        data_query = "SELECT row, col, rating FROM %s" % cd.table_name
        chunk_data(cursor, cd.seed, data_query, cd.nsplits, cd.epoch, cd.mid)
        cursor.close()
        connection.close()
    return (cd.roundid, cd.chunkid)
109
def _dump_copy_binary(psql, copy_sql, out_path):
    """Run a binary COPY and write its raw bytes to out_path (mmap fast path)."""
    buf = cStringIO.StringIO()
    psql.copy_expert(copy_sql, buf)
    out = open(out_path, "wb")
    try:
        out.write(buf.getvalue())
    finally:
        out.close()


def install_fresh_database(psql, mid, max_rank, b, stepsize, table_name):
    """Install a fresh model instance under `mid` and dump its snapshots.

    Deletes any existing lr_model_instance row for mid, initializes and stores
    a new model sized from table_name, then writes four binary COPY snapshot
    files for later mmap-based access.  Returns (nRows, nCols).
    (Changes vs. original: the parameter `b` is no longer shadowed by the
    reused StringIO buffer, the four copy-pasted dump blocks share a helper,
    and output files are closed even on error.)
    """
    print("[INIT] Installing fresh model instance to mid=%d with rank=%d b=%f stepsize=%f" % (mid, max_rank, b, stepsize))
    psql.execute("SELECT MAX(row), MAX(col), AVG(rating) FROM %s" % table_name)
    (nRows, nCols, mean,) = psql.fetchone()

    # Replace any previous model under the same id, then build + persist it.
    psql.execute("DELETE FROM lr_model_instance WHERE MID=%d;" % (mid))
    psql.execute("SELECT initialize_model(%d,%d,%d,%f,%f,%f)" % (max_rank, nRows, nCols, b, mean, stepsize))
    psql.execute("SELECT store_model(%d);" % (mid))

    # Binary snapshots consumed via mmap elsewhere.
    _dump_copy_binary(psql,
        "COPY (SELECT array_upper(L,1) as nRows, array_upper(L,2) as l_rank, array_upper(R,1) as ncols, array_upper(R,2) as r_rank, moviemean as moviemean, stepsize as stepsize, b as b FROM lr_model_instance WHERE mid={0}) TO STDOUT WITH BINARY".format(mid),
        "{0}{1}.dat".format(model_simple_base, mid))
    _dump_copy_binary(psql,
        "COPY (select L,R from lr_model_instance where mid={0}) TO STDOUT WITH BINARY".format(mid),
        "{0}{1}.dat".format(model_l_r_base, mid))
    _dump_copy_binary(psql,
        "COPY (SELECT array_upper(R,1) as max_cols, array_upper(L,1) as max_rows, array_upper(L,2) as max_rank from lr_model_instance where mid = {0}) TO STDOUT WITH BINARY".format(mid),
        "{0}{1}.dat".format(stats_simple_base, mid))
    _dump_copy_binary(psql,
        "COPY (SELECT array_upper(R,1) as max_cols, array_upper(L,1) as max_rows, array_upper(L,2) as max_rank, moviemean, stepsize, b from lr_model_instance where mid = {0}) TO STDOUT WITH BINARY".format(mid),
        "{0}{1}.dat".format(stats_base, mid))

    print("[INIT] Stored model mid=%d" % mid)
    return (nRows, nCols)
147
def get_database_stats(psql, mid, table_name):
    """Return (nrows, ncols, max_rank) for an already-stored model `mid`.

    Row/column extents come from table_name; the rank is read back from the
    stored lr_model_instance row.
    """
    print("[INIT] Getting stats about mid=%d and table=%s" % (mid, table_name))
    psql.execute("SELECT MAX(row), MAX(col) FROM %s" % table_name)
    (nrows, ncols) = psql.fetchone()
    rank_query = "SELECT array_upper(L,2) as l_rank FROM lr_model_instance WHERE mid=%d LIMIT 1" % (mid)
    psql.execute(rank_query)
    (max_rank,) = psql.fetchone()
    return (nrows, ncols, max_rank)
156
def fresh_execute(mid, nsplits, nepochs, table_name, probe_table, max_rank, b, initial_stepsize, diminish):
    """Install a fresh model for `mid` and run `nepochs` of parallel SGD.

    Each epoch: partition the model with the seed used to chunk the data,
    chunk the data for the NEXT epoch under a new seed, run all worker steps
    through a process pool, rebuild the sparse model, optionally probe RMSE,
    and diminish the stepsize.  Statement order here is deliberate — the
    seed handoff (last_seed) couples consecutive epochs.
    """
    connect_str = 'dbname=' + sys_appends.dbname
    conn = psycopg2.connect(connect_str)
    psql = conn.cursor()
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    psql.execute("SELECT library_setup();")

    # Build and store the fresh model; nRows/nCols size every later chunk.
    (nRows, nCols) = install_fresh_database(psql, mid, max_rank, b, initial_stepsize, table_name)
    if not (probe_table is None):
        # Baseline RMSE on the probe set before any training.
        psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table)
        (rmse, ) = psql.fetchone()
        print "[Initial RMSE] %f" % (rmse)

    qstr = "SELECT row, col, rating FROM %s" % (table_name)
    # We save the last seed.
    # The way it works is in epoch -1 here, we setup the data for epoch 0
    # then in epoch 0 we need to partition the model according to the same
    # seed that was used in epoch -1. To do that, we save the seed.
    last_seed = random.randint(1,1000000)
    chunk_data(psql, last_seed, qstr, nsplits, -1, mid)

    # Fire up the process pool (+1 slot for the async data-chunking job).
    pool = Pool(processes=nsplits+1)
    for epoch in range(nepochs):
        start_epoch = time.time()
        # Permute and slice up the model for this epoch — must reuse the
        # seed the data was chunked with (last_seed), and epoch % 2 selects
        # the double-buffered side.
        model_q = "SELECT permute_and_slice_model('" + sys_appends.dbname + "', %d, '%s', %d, %d,%d)" % (last_seed, qstr, nsplits, epoch % 2, mid)
        print model_q
        psql.execute(model_q)
        print "[EPOCH] %d partitioning the model with seed=%d" % (epoch,last_seed)

        # Generate a new seed. This is actually used for the next epoch.
        # We save it now in last_seed, since we have already created the model.
        seed = random.randint(1,1000000)
        last_seed = seed
        print "[EPOCH] Partitioning the data for epoch %d with seed=%d" % (epoch+1,seed)

        # Kick off all the worker threads for this epoch.  The roundid=-1
        # job chunks next epoch's data concurrently with this epoch's steps.
        # TODO: Separately kick off the chunk thread instead of doing it through the map.
        chunk_result = pool.apply_async(perform_work, (chunk_desc(mid,epoch,max_rank, nRows, nCols, nsplits, connect_str, seed, -1, -1, table_name),))
        for i in range(nsplits):
            # One round per split; pool.map blocks until the round finishes.
            print pool.map(perform_work, chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name))
        v = chunk_result.get()
        #print ("[Chunk] gives %s" % (v))

        # The model is now scattered in (potentially) many tables —
        # reconstruct the sparse model and store it back in the instance table.
        z = "SELECT sparse_rebuild_model(%d, %d, %d);" % (mid, epoch % 2, nsplits)
        psql.execute(z)
        epoch_time = time.time() - start_epoch
        print "[End Epoch] %s rebuilt for epoch %d" % (epoch_time, epoch %2)

        # If we are probing then reload the model and compute the RMSE.
        if not (probe_table is None):
            psql.execute("SELECT retrieve_model_instance_string(%d)" % mid)
            psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table)
            (rmse, ) = psql.fetchone()
            # For convenience of experiments we print the time of the epoch — not the time of computing the RMSE.
            print "[RMSE] epoch=%d time=%f rmse=%f" % (epoch, epoch_time,rmse)

        # End-of-epoch bookkeeping: geometrically diminish the stepsize.
        psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%f WHERE MID=%d" % (diminish, mid))
    return
220 221 222 223 # def stored_execute(mid, nsplits, nepochs, table_name, probe_table, diminish): 224 # connect_str = 'dbname=' + sys_appends.dbname 225 # conn = psycopg2.connect(connect_str) 226 # conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 227 # psql = conn.cursor() 228 # psql.execute("SELECT library_setup();") 229 230 # (nRows, nCols, max_rank) = get_database_stats(psql, mid, table_name) 231 # qstr = "SELECT row, col, rating FROM %s" % (table_name) 232 # chunk_data(psql, qstr, nsplits,epoch, mid) 233 234 # pool = Pool(processes=nsplits) 235 # for epoch in range(nepochs): 236 # start_epoch = time.time() 237 # seed = random.randint(1,1000000) 238 # print "[EPOCH] using seed=%d" % (seed) 239 # for i in range(nsplits): 240 # print pool.map(perform_work, chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name)) 241 # print "[End Epoch] %s" % (time.time() - start_epoch) 242 # psql.execute("SELECT retrieve_model_instance_string(%d)" % mid) 243 # if not (probe_table is None): 244 # psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table) 245 # (rmse, ) = psql.fetchone() 246 # print "[RMSE] %f" % (rmse) 247 # psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%f WHERE MID=%d" % (diminish, mid)) 248 249 # psql.close() 250
def usage(argv):
    """Print a one-line usage synopsis; argv[0] is the program name."""
    print("%s: <mid> <table_name>" % (argv[0]))
253
def main():
    """Parse command-line options and launch factor execution.

    Options: -m/--mid (required), -t/--table (required), --parallel,
    -e/--epochs, -p/--probe, -r/--rank, -b, -s/--stepsize, -d/--diminish,
    and -f to use the stored-model path instead of a fresh install.
    """
    try:
        long_opts = ["mid=", "parallel=", "epochs=", "table=", "probe=", "rank=", "stepsize=", "diminish="]
        opts, args = getopt.getopt(sys.argv[1:], "fm:p:e:t:r:b:s:d:", long_opts)
    except getopt.GetoptError as err:
        # BUG FIX: `err` was previously unbound, so this handler itself
        # raised NameError instead of reporting the bad option.
        print(str(err))
        usage(sys.argv)
        sys.exit(2)

    # Defaults (unused `output`/`verbose` locals removed).
    mid = None
    nthreads = 1
    epochs = 10
    rank = 10
    table_name = None
    probe_name = None
    stepsize = 5e-2
    diminish = 0.8
    B = 1.5
    exec_fresh = True

    for o, a in opts:
        if o in ("-m", "--mid"):
            mid = int(a)
        elif o == "-f":
            exec_fresh = False
        elif o == "--parallel":
            # BUG FIX: `o in ("--parallel")` was a substring test on a plain
            # string (not a tuple), so "-p" also matched here and probe
            # options were silently consumed as a thread count.
            nthreads = int(a)
        elif o in ("-e", "--epochs"):
            epochs = int(a)
        elif o in ("-t", "--table"):
            table_name = a
        elif o in ("-p", "--probe"):
            probe_name = a
        elif o in ("-r", "--rank"):
            rank = int(a)
        elif o == "-b":
            # BUG FIX: same tuple/string issue as --parallel above.
            B = float(a)
        elif o in ("-s", "--stepsize"):
            stepsize = float(a)
        elif o in ("-d", "--diminish"):
            diminish = float(a)

    if (mid is None) or (table_name is None):
        print("mid and table name are required")
        usage(sys.argv)
        sys.exit()
    print("Execute on table %s with mid %d using %d processes" % (table_name, mid, nthreads))

    if exec_fresh:
        fresh_execute(mid, nthreads, epochs, table_name, probe_name, rank, B, stepsize, diminish)
    else:
        # NOTE(review): stored_execute is commented out at module level, so
        # running with -f currently raises NameError — confirm before use.
        stored_execute(mid, nthreads, epochs, table_name, probe_name, diminish)


if __name__ == "__main__":
    main()