Package vsql_core :: Package executors :: Package FACTOR_EXECUTE :: Module victor_factor_execution_chunk
[hide private]
[frames] | [no frames]

Source Code for Module vsql_core.executors.FACTOR_EXECUTE.victor_factor_execution_chunk

  1  """ 
  2  This is the parallel execution for factors 
  3  """ 
  4   
  5  from multiprocessing import Pool 
  6  import sys_appends 
  7  import time 
  8  import sys, getopt 
  9  import random 
 10   
 11  import psycopg2 
 12   
def get_tablename(tablename, epoch, nSplits, roundid, chunkid):
    """Return the staging-table name holding one (round, chunk) slice.

    NOTE(review): the ``tablename`` argument is currently ignored; the
    prefix is hard-coded to "staging_data" to match the tables produced
    by chunk_data / perm_slice -- confirm before removing the parameter.
    """
    parity = epoch % 2                     # staging tables alternate suffix 0/1 per epoch
    flat_index = roundid * nSplits + chunkid
    return "staging_data%d%d" % (parity, flat_index)
18
19 -def clean_tables():
20 connect_str = 'dbname=' + sys_appends.dbname 21 conn = psycopg2.connect(connect_str) 22 psql = conn.cursor() 23 psql.execute("SELECT tablename from pg_tables where tablename LIKE 'staging_data%'") 24 for (tname,) in psql.fetchall(): 25 print "DROP TABLE %s;" % (tname)
26 27 ## This is the parallel execution for 28 ## for factors.
29 -class WorkerThread:
30 - def __init__(self, mid, max_rank, nRows, nCols, nSplits, connect_str,table_name):
31 self.mid = mid 32 self.nRows = nRows 33 self.nCols = nCols 34 self.nSplits = nSplits 35 self.conn = psycopg2.connect(connect_str) 36 self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 37 self.psql = self.conn.cursor() 38 self.max_rank = max_rank 39 self.table_name = table_name
40
41 - def take_step(self, seed, epoch, roundid, chunkid):
42 # step 1. Retrieve model and setup permtuations 43 #execute(self.retrieve_model_plan, [mid]) 44 t0 = time.time() 45 print "[take_step:%s,%s] Enter" % (roundid,chunkid) 46 self.psql.execute("SELECT library_setup();") 47 self.psql.execute("SELECT retrieve_model_instance_string(%s)" % (self.mid)) 48 print "[take_step:%s,%s] Retrieved Model %d in %s" % (roundid,chunkid, self.mid, time.time() - t0) 49 t1 = time.time() 50 51 #self.psql.execute("SELECT create_permutations(%s,%s,%s)" % (self.nRows, self.nCols, seed)) 52 #print "[take_step:%s,%s] Permutations Created. Executing Gradient." % (roundid,chunkid) 53 54 t2 = time.time() 55 tname = get_tablename(self.table_name, epoch, self.nSplits, roundid, chunkid) 56 exec_str = "SELECT COUNT(*) FROM (SELECT take_step(row - 1, col - 1, rating) FROM %s) AS t;" % (tname) 57 print "[take_step:%s,%s] %s" % (roundid, chunkid, exec_str) 58 # Step 2. execute the step 59 self.psql.execute(exec_str) 60 (nCount,) = self.psql.fetchone() 61 print "[take_step:%s,%s] Finished Step (%d tuples) in %s secs. Storing" % (roundid,chunkid, nCount, time.time() - t2) 62 63 t3 = time.time() 64 # Step 3. transactionally execute the update 65 ow = "SELECT overwrite_fragment(%s,%s,%s,%s,%s,%s,%s);" % (self.max_rank, self.nSplits, self.nRows, self.nCols, chunkid, roundid, self.mid) 66 print ow 67 self.psql.execute(ow) 68 print "[take_step:%s,%s] Complete in %f sec (update=%s)" % (roundid,chunkid, time.time() - t0, time.time() - t3)
69
70 - def close(self):
71 self.psql.close()
72 73
74 -def chunk_data(psql, query_str, nsplits, nRows,nCols, epoch):
75 t0 = time.time() 76 epoch_sign = (epoch + 1) % 2 77 table_name = "%s%d" % ("staging_data", epoch_sign) 78 print "[chunk_data] Chunking the data for epoch %d" % (epoch+1) 79 exec_str = "SELECT perm_slice('" + sys_appends.dbname + "', '%s', 'staging_data%d', %d, %d, %d);" % (query_str, epoch_sign, nsplits, nRows, nCols) 80 print exec_str 81 psql.execute(exec_str) 82 print "[chunk_data] Finished Chunking in %s secs." % (time.time() - t0)
83
class chunk_desc:
    """Plain, picklable value object describing one unit of pool work.

    A chunkid/roundid of -1 marks a "re-chunk the data" task rather
    than a gradient-step task (see chunk_maker and perform_work).
    """

    def __init__(self, mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, chunkid, roundid, table_name):
        # Copy every constructor argument onto the instance verbatim so
        # the descriptor can travel across the process pool.
        self.__dict__.update(
            mid=mid, epoch=epoch, max_rank=max_rank,
            nRows=nRows, nCols=nCols, nsplits=nsplits,
            connect_str=connect_str, seed=seed,
            chunkid=chunkid, roundid=roundid, table_name=table_name)
97
def chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, roundid, table_name):
    """Build the list of chunk_desc work items for one round.

    Round 0 is special: it yields a single sentinel descriptor
    (chunkid = roundid = -1) telling perform_work to re-chunk the data
    instead of taking a step; later rounds yield one descriptor per split.
    """
    if roundid == 0:
        return [chunk_desc(mid, epoch, max_rank, nRows, nCols, nsplits,
                           connect_str, seed, -1, -1, table_name)]
    return [chunk_desc(mid, epoch, max_rank, nRows, nCols, nsplits,
                       connect_str, seed, cid, roundid, table_name)
            for cid in range(nsplits)]
106
def perform_work(cd):
    """Pool worker entry point: execute one chunk_desc.

    A negative roundid means "re-chunk the data"; otherwise run one
    gradient step on the described chunk.  Returns (roundid, chunkid)
    so the parent can log completion.
    """
    if cd.roundid < 0:
        # Chunking task: open a short-lived autocommit connection.
        conn = psycopg2.connect(cd.connect_str)
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = conn.cursor()
        chunk_data(cursor, "SELECT row, col, rating FROM %s" % cd.table_name,
                   cd.nsplits, cd.nRows, cd.nCols, cd.epoch)
        cursor.close()
        conn.close()
    else:
        worker = WorkerThread(cd.mid, cd.max_rank, cd.nRows, cd.nCols,
                              cd.nsplits, cd.connect_str, cd.table_name)
        worker.take_step(cd.seed, cd.epoch, cd.roundid, cd.chunkid)
        worker.close()
    return (cd.roundid, cd.chunkid)
121
122 -def install_fresh_database(psql, mid, max_rank, b, stepsize, table_name):
123 print "[INIT] Installing fresh model instance to mid=%d with rank=%d b=%f stepsize=%f" % (mid, max_rank, b, stepsize) 124 table_query = "SELECT MAX(row), MAX(col), AVG(rating) FROM %s" % table_name 125 psql.execute(table_query) 126 (nRows, nCols, mean,) = psql.fetchone() 127 128 psql.execute("DELETE FROM lr_model_instance WHERE MID=%d;" % (mid)) 129 psql.execute("SELECT initialize_model(%d,%d,%d,%f,%f,%f)" % (max_rank, nRows, nCols, b, mean, stepsize)) 130 psql.execute("SELECT store_model(%d);" % (mid)) 131 print "[INIT] Stored model mid=%d" % mid 132 return (nRows, nCols)
133
134 -def get_database_stats(psql, mid, table_name):
135 print "[INIT] Getting stats about mid=%d and table=%s" % (mid, table_name) 136 table_query = "SELECT MAX(row), MAX(col) FROM %s" % table_name 137 psql.execute(table_query) 138 (nrows, ncols) = psql.fetchone() 139 psql.execute("SELECT array_upper(L,2) as l_rank FROM lr_model_instance WHERE mid=%d LIMIT 1" % (mid)) 140 (max_rank,) = psql.fetchone() 141 return (nrows, ncols, max_rank)
142
143 -def fresh_execute(mid, nsplits, nepochs, table_name, probe_table, max_rank, b, initial_stepsize, diminish):
144 connect_str = 'dbname=' + sys_appends.dbname 145 conn = psycopg2.connect(connect_str) 146 psql = conn.cursor() 147 conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 148 psql.execute("SELECT library_setup();") 149 150 (nRows, nCols) = install_fresh_database(psql, mid, max_rank, b, initial_stepsize, table_name) 151 if not (probe_table is None): 152 psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table) 153 (rmse, ) = psql.fetchone() 154 print "[Initial RMSE] %f" % (rmse) 155 156 qstr = "SELECT row, col, rating FROM %s" % (table_name) 157 chunk_data(psql, qstr, nsplits, nRows, nCols, -1) 158 pool = Pool(processes=nsplits+1) 159 for epoch in range(nepochs): 160 start_epoch = time.time() 161 seed = random.randint(1,1000000) 162 print "[EPOCH] %d using seed=%d" % (epoch,seed) 163 for i in range(nsplits): 164 print pool.map(perform_work, chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name)) 165 epoch_time = time.time() - start_epoch 166 print "[End Epoch] %s" % (epoch_time) 167 168 psql.execute("SELECT retrieve_model_instance_string(%d)" % mid) 169 if not (probe_table is None): 170 psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table) 171 (rmse, ) = psql.fetchone() 172 print "[RMSE] epoch=%d time=%f rmse=%f" % (epoch, epoch_time,rmse) 173 psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%f WHERE MID=%d" % (diminish, mid)) 174 return
175 176 177
178 -def stored_execute(mid, nsplits, nepochs, table_name, probe_table, diminish):
179 connect_str = 'dbname=' + sys_appends.dbname 180 conn = psycopg2.connect(connect_str) 181 conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 182 psql = conn.cursor() 183 psql.execute("SELECT library_setup();") 184 185 (nRows, nCols, max_rank) = get_database_stats(psql, mid, table_name) 186 qstr = "SELECT row, col, rating FROM %s" % (table_name) 187 chunk_data(psql, qstr, nsplits, nRows, nCols, -1) 188 189 pool = Pool(processes=nsplits) 190 for epoch in range(nepochs): 191 start_epoch = time.time() 192 seed = random.randint(1,1000000) 193 print "[EPOCH] using seed=%d" % (seed) 194 for i in range(nsplits): 195 print pool.map(perform_work, chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name)) 196 print "[End Epoch] %s" % (time.time() - start_epoch) 197 psql.execute("SELECT retrieve_model_instance_string(%d)" % mid) 198 if not (probe_table is None): 199 psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table) 200 (rmse, ) = psql.fetchone() 201 print "[RMSE] %f" % (rmse) 202 psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%f WHERE MID=%d" % (diminish, mid)) 203 204 psql.close()
205
206 -def usage(argv):
207 print "%s: <mid> <table_name>" % (argv[0])
208
209 -def main():
210 try: 211 long_opts = ["mid=", "parallel=", "epochs=", "table=", "probe=", "rank=", "stepsize=", "diminish="] 212 opts, args = getopt.getopt(sys.argv[1:], "fm:p:e:t:r:b:s:d:", long_opts) 213 except getopt.GetoptError: 214 print str(err) 215 usage(sys.argv) 216 sys.exit(2) 217 output = None 218 verbose = False 219 220 mid = None 221 nthreads = 1 222 epochs = 10 223 rank = 10 224 table_name = None 225 probe_name = None 226 stepsize = 5e-2 227 diminish = 0.8 228 B = 1.5 229 exec_fresh = True 230 231 for o,a in opts: 232 if o in ("-m", "--mid"): 233 mid = int(a) 234 elif o == "-f": 235 exec_fresh = False 236 elif o in ("--parallel"): 237 nthreads = int(a) 238 elif o in ("-e", "--epochs"): 239 epochs = int(a) 240 elif o in ("-t", "--table"): 241 table_name = a 242 elif o in ("-p", "--probe"): 243 probe_name = a 244 elif o in ("-r", "--rank"): 245 rank = int(a) 246 elif o in ("-b"): 247 B = float(a) 248 elif o in ("-s", "--stepsize"): 249 stepsize = float(a) 250 elif o in ("-d", "--diminish"): 251 diminish = float(a) 252 253 if (mid is None) or (table_name is None): 254 print "mid and table name are required" 255 usage(sys.argv) 256 sys.exit() 257 else: 258 print "Execute on table %s with mid %d using %d processes" % (table_name,mid, nthreads) 259 260 if exec_fresh: 261 fresh_execute(mid, nthreads, epochs, table_name, probe_name, rank, B, stepsize, diminish) 262 else: 263 stored_execute(mid, nthreads, epochs, table_name, probe_name, diminish)
# Bug fix: the original called clean_tables() and main() unconditionally
# at import time.  A __main__ guard is required here because this module
# uses multiprocessing.Pool, whose worker processes re-import the module
# (and would otherwise re-run the whole script).
if __name__ == "__main__":
    clean_tables()
    main()