Package vsql_core :: Package executors :: Package FACTOR_EXECUTE :: Module victor_factor_execution
[hide private]
[frames] | no frames]

Source Code for Module vsql_core.executors.FACTOR_EXECUTE.victor_factor_execution

  1  """ 
  2  This module runs the factor-model gradient steps in parallel, one worker process per chunk. 
  3  """ 
  4   
  5  from multiprocessing import Pool 
  6  import sys_appends 
  7  import time 
  8  import sys, getopt 
  9  import random 
 10   
 11  ## Parallel execution of factor updates. 
 12  ## Known issue: this implementation suffers from the convoy problem. 
 13  ## Fixing the convoy problem will require significant effort. 
 14   
 15  import psycopg2 
 16   
class WorkerThread:
    """Run one (round, chunk) gradient step against the database.

    Despite the name this is not a thread: perform_work creates one instance
    per chunk inside a pool worker process. Each instance owns its own
    psycopg2 connection and cursor, which close() releases.
    """

    def __init__(self, mid, max_rank, nRows, nCols, nSplits, connect_str, table_name):
        """Store the model/matrix parameters and open an autocommit connection.

        :param mid: model instance id.
        :param max_rank: model rank, forwarded to overwrite_fragment.
        :param nRows: number of matrix rows.
        :param nCols: number of matrix columns.
        :param nSplits: number of splits (rounds and chunks per round).
        :param connect_str: psycopg2 connection string.
        :param table_name: ratings table read in take_step.
        """
        self.mid = mid
        self.nRows = nRows
        self.nCols = nCols
        self.nSplits = nSplits
        self.conn = psycopg2.connect(connect_str)
        # Autocommit: every statement below is committed as soon as it runs.
        self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.psql = self.conn.cursor()
        self.max_rank = max_rank
        self.table_name = table_name

    def take_step(self, seed, roundid, chunkid):
        """Apply one gradient step to the ratings of chunk ``chunkid`` in round ``roundid``.

        Step 1 loads the model and creates the permutations for ``seed``.
        Step 2 calls the SQL ``take_step`` UDF on every rating that the SQL
        ``round()``/``chunk()`` functions assign to this round and chunk.
        Step 3 stores the resulting fragment with ``overwrite_fragment``.
        Progress and timings are printed to stdout.
        """
        t0 = time.time()
        print("[take_step:%s,%s] Enter" % (roundid, chunkid))

        # Step 1. Retrieve the model and set up the permutations.
        self.psql.execute("SELECT library_setup();")
        self.psql.execute("SELECT retrieve_model_instance_string(%s)" % (self.mid))
        print("[take_step:%s,%s] Retrieved Model in %s" % (roundid, chunkid, time.time() - t0))

        self.psql.execute("SELECT create_permutations(%s,%s,%s)" % (self.nRows, self.nCols, seed))
        print("[take_step:%s,%s] Permutations Created. Executing Gradient." % (roundid, chunkid))

        # Step 2. Execute the step. The WHERE clause is formatted on its own so
        # that the table name is never run through the % operator.
        t2 = time.time()
        where = ("round(%s,%s,%s,row-1,col-1) = %s AND chunk(%s,%s,%s,row-1,col-1) = %s"
                 % (self.nSplits, self.nRows, self.nCols, roundid,
                    self.nSplits, self.nRows, self.nCols, chunkid))
        exec_str = ("SELECT COUNT(*) FROM (SELECT take_step(row - 1, col - 1, rating) FROM "
                    + self.table_name + " WHERE " + where + ") as t;")
        self.psql.execute(exec_str)
        (nCount,) = self.psql.fetchone()
        print("[take_step:%s,%s] Finished Step (%d tuples) in %s secs. Storing" % (roundid, chunkid, nCount, time.time() - t2))

        # Step 3. Store the updated fragment. An update_fragment(...) call with
        # the same arguments was used here previously.
        t3 = time.time()
        self.psql.execute("SELECT overwrite_fragment(%s,%s,%s,%s,%s,%s,%s);" % (self.max_rank, self.nSplits, self.nRows, self.nCols, chunkid, roundid, self.mid))
        print("[take_step:%s,%s] Complete in %f sec (update=%s)" % (roundid, chunkid, time.time() - t0, time.time() - t3))

    def close(self):
        """Close the cursor and the connection opened in __init__."""
        try:
            self.psql.close()
        finally:
            # Closing only the cursor would leave one server connection open
            # per processed chunk.
            self.conn.close()
63
class chunk_desc:
    """Plain record describing one chunk of one round of work.

    chunk_maker builds these, and pool.map ships them to the worker
    processes, where perform_work consumes them.
    """

    def __init__(self, mid, max_rank, nRows, nCols, nsplits, connect_str, seed, chunkid, roundid, table_name):
        # Model and matrix description.
        self.mid, self.max_rank = mid, max_rank
        self.nRows, self.nCols, self.nsplits = nRows, nCols, nsplits
        # Where the data lives.
        self.connect_str, self.table_name = connect_str, table_name
        # Which slice of the work this record stands for.
        self.seed, self.roundid, self.chunkid = seed, roundid, chunkid
76
def chunk_maker(mid, max_rank, nRows, nCols, nsplits, connect_str, seed, roundid,table_name):
    """Return the work list for round ``roundid``: one chunk_desc per chunk id 0..nsplits-1."""
    return [chunk_desc(mid, max_rank, nRows, nCols, nsplits, connect_str, seed, cid, roundid, table_name)
            for cid in range(nsplits)]
82
def perform_work(cd):
    """Pool worker entry point: run the gradient step described by ``cd``.

    Opens a WorkerThread (with its own database connection) from the fields
    of the chunk_desc ``cd``, runs take_step for cd's seed/round/chunk, and
    always closes the WorkerThread, even when the step raises.

    :returns: ``(roundid, chunkid)`` of the processed chunk.
    """
    wt = WorkerThread(cd.mid, cd.max_rank, cd.nRows, cd.nCols, cd.nsplits, cd.connect_str, cd.table_name)
    try:
        wt.take_step(cd.seed, cd.roundid, cd.chunkid)
    finally:
        # Without this, a failing step would leak the worker's connection.
        wt.close()
    return (cd.roundid, cd.chunkid)
88
def install_fresh_database(psql, mid, max_rank, b, stepsize, table_name):
    """Replace model instance ``mid`` with a freshly initialized model.

    Reads MAX(row), MAX(col) and AVG(rating) from ``table_name`` and deletes
    any lr_model_instance row for ``mid``. It then calls the SQL functions
    initialize_model(max_rank, nRows, nCols, b, mean, stepsize) and
    store_model(mid).

    :param psql: open DB-API cursor. The caller in this file uses an
        autocommit connection.
    :param mid: model instance id to (re)create.
    :param max_rank: model rank passed to initialize_model.
    :param b: value passed to initialize_model.
    :param stepsize: initial step size passed to initialize_model.
    :param table_name: ratings table with row, col and rating columns.
    :returns: ``(nRows, nCols)`` as read from ``table_name``.
    :raises ValueError: if ``table_name`` has no rows. This is checked before
        the existing model is deleted.
    """
    print("[INIT] Installing fresh model instance to mid=%d with rank=%d b=%f stepsize=%f" % (mid, max_rank, b, stepsize))
    psql.execute("SELECT MAX(row), MAX(col), AVG(rating) FROM %s" % table_name)
    (nRows, nCols, mean) = psql.fetchone()

    # Aggregates over an empty table are NULL (None). Check before the DELETE
    # so that an empty or mistyped table cannot wipe the stored model and
    # then crash on the %d formatting below.
    if nRows is None or nCols is None or mean is None:
        raise ValueError("table %s has no ratings; refusing to reinitialize mid=%d" % (table_name, mid))

    psql.execute("DELETE FROM lr_model_instance WHERE MID=%d;" % (mid))
    psql.execute("SELECT initialize_model(%d,%d,%d,%f,%f,%f)" % (max_rank, nRows, nCols, b, mean, stepsize))
    psql.execute("SELECT store_model(%d);" % (mid))
    print("[INIT] Stored model mid=%d" % mid)
    return (nRows, nCols)
100
def get_database_stats(psql, mid, table_name):
    """Return ``(nrows, ncols, max_rank)`` for an already-stored model.

    ``nrows``/``ncols`` are MAX(row)/MAX(col) of ``table_name``. ``max_rank``
    is ``array_upper(L, 2)`` of the lr_model_instance row for ``mid``.

    :param psql: open DB-API cursor.
    :param mid: model instance id that must already exist.
    :param table_name: ratings table with row and col columns.
    :raises ValueError: if lr_model_instance has no row for ``mid``.
    """
    print("[INIT] Getting stats about mid=%d and table=%s" % (mid, table_name))
    psql.execute("SELECT MAX(row), MAX(col) FROM %s" % table_name)
    (nrows, ncols) = psql.fetchone()

    psql.execute("SELECT array_upper(L,2) as l_rank FROM lr_model_instance WHERE mid=%d LIMIT 1" % (mid))
    model_row = psql.fetchone()
    # fetchone() returns None when no row matched. Report that clearly
    # instead of failing on tuple-unpacking of None.
    if model_row is None:
        raise ValueError("no stored model instance with mid=%d" % mid)
    (max_rank,) = model_row
    return (nrows, ncols, max_rank)
109
def fresh_execute(mid, nsplits, nepochs, table_name, probe_table, max_rank, b, initial_stepsize, diminish):
    """Install a fresh model instance ``mid`` and train it for ``nepochs`` epochs.

    Connects to the database named by ``sys_appends.dbname`` and installs the
    model with install_fresh_database. Each epoch draws a random seed and
    runs ``nsplits`` rounds. Every round hands its ``nsplits`` chunks to a
    process pool of ``nsplits`` workers (perform_work) and waits for all of
    them before the next round starts. After each epoch the RMSE over
    ``probe_table`` is printed (when one is given), and the stored step size
    is multiplied by ``diminish``.

    The pool, cursor and connection are released on exit, including when an
    exception propagates.
    """
    connect_str = 'dbname=' + sys_appends.dbname
    conn = psycopg2.connect(connect_str)
    psql = conn.cursor()
    try:
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        psql.execute("SELECT library_setup();")

        (nRows, nCols) = install_fresh_database(psql, mid, max_rank, b, initial_stepsize, table_name)

        pool = Pool(processes=nsplits)
        try:
            for epoch in range(nepochs):
                start_epoch = time.time()
                seed = random.randint(1, 1000000)
                print("[EPOCH] using seed=%d" % (seed))
                for i in range(nsplits):
                    # map() blocks until every chunk of round i has finished.
                    print(pool.map(perform_work, chunk_maker(mid, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name)))
                print("[End Epoch] %s" % (time.time() - start_epoch))

                psql.execute("SELECT retrieve_model_instance_string(%d)" % mid)
                if probe_table is not None:
                    psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table)
                    (rmse, ) = psql.fetchone()
                    print("[RMSE] %f" % (rmse))
                psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%f WHERE MID=%d" % (diminish, mid))
        finally:
            # Every map() call has returned or raised by now, so no queued work
            # is lost. Stop the workers (as Pool's context-manager exit does in
            # Python 3) and reap them.
            pool.terminate()
            pool.join()
    finally:
        psql.close()
        conn.close()
    return
135
def stored_execute(mid, nsplits, nepochs, table_name, probe_table, diminish):
    """Continue training the already-stored model instance ``mid`` for ``nepochs`` epochs.

    Connects to the database named by ``sys_appends.dbname``. Matrix size and
    rank come from get_database_stats rather than from a fresh install. Each
    epoch draws a random seed and runs ``nsplits`` rounds. Every round hands
    its ``nsplits`` chunks to a process pool of ``nsplits`` workers
    (perform_work) and waits for all of them. After each epoch the RMSE over
    ``probe_table`` is printed (when one is given), and the stored step size
    is multiplied by ``diminish``.

    The pool, cursor and connection are released on exit, including when an
    exception propagates.
    """
    connect_str = 'dbname=' + sys_appends.dbname
    conn = psycopg2.connect(connect_str)
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    psql = conn.cursor()
    try:
        psql.execute("SELECT library_setup();")

        (nRows, nCols, max_rank) = get_database_stats(psql, mid, table_name)

        pool = Pool(processes=nsplits)
        try:
            for epoch in range(nepochs):
                start_epoch = time.time()
                seed = random.randint(1, 1000000)
                print("[EPOCH] using seed=%d" % (seed))
                for i in range(nsplits):
                    # map() blocks until every chunk of round i has finished.
                    print(pool.map(perform_work, chunk_maker(mid, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name)))
                print("[End Epoch] %s" % (time.time() - start_epoch))
                psql.execute("SELECT retrieve_model_instance_string(%d)" % mid)
                if probe_table is not None:
                    psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table)
                    (rmse, ) = psql.fetchone()
                    print("[RMSE] %f" % (rmse))
                psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%f WHERE MID=%d" % (diminish, mid))
        finally:
            # All map() calls are finished here. Stop and reap the workers.
            pool.terminate()
            pool.join()
    finally:
        psql.close()
        conn.close()
161
def usage(argv):
    """Print a summary of the command-line options for program ``argv[0]``."""
    print("%s -m <mid> -t <table_name> [options]\n"
          "  -m, --mid=N         model instance id (required)\n"
          "  -t, --table=NAME    ratings table with row, col, rating columns (required)\n"
          "  -p, --probe=NAME    probe table; RMSE over it is printed after each epoch\n"
          "      --parallel=N    number of worker processes and chunks per round (default 1)\n"
          "  -e, --epochs=N      number of epochs (default 10)\n"
          "  -r, --rank=N        rank of a freshly installed model (default 10)\n"
          "  -b B                b value passed to initialize_model (default 1.5)\n"
          "  -s, --stepsize=X    initial step size of a fresh model (default 0.01)\n"
          "  -d, --diminish=X    step-size multiplier applied after each epoch (default 0.8)\n"
          "  -f                  continue training the stored model for mid instead of\n"
          "                      installing a fresh one"
          % (argv[0]))
164
def main():
    """Command-line entry point: parse options and run training.

    Options are parsed with getopt. Short options: ``-f -m -p -e -t -r -b -s -d``.
    Long options: ``--mid --parallel --epochs --table --probe --rank
    --stepsize --diminish``. ``-m``/``--mid`` and ``-t``/``--table`` are
    required.

    Without ``-f``, a fresh model is installed and trained (fresh_execute).
    With ``-f``, the stored model for ``mid`` is trained further
    (stored_execute), and ``--rank``, ``-b`` and ``--stepsize`` are not used.

    Exits with status 2 on an option-parsing error or a missing required
    option.
    """
    long_opts = ["mid=", "parallel=", "epochs=", "table=", "probe=", "rank=", "stepsize=", "diminish="]
    try:
        opts, _args = getopt.getopt(sys.argv[1:], "fm:p:e:t:r:b:s:d:", long_opts)
    except getopt.GetoptError as err:
        print(str(err))
        usage(sys.argv)
        sys.exit(2)

    mid = None
    nthreads = 1
    epochs = 10
    rank = 10
    table_name = None
    probe_name = None
    stepsize = 1e-2
    diminish = 0.8
    B = 1.5
    exec_fresh = True

    for o, a in opts:
        if o in ("-m", "--mid"):
            mid = int(a)
        elif o == "-f":
            exec_fresh = False
        elif o == "--parallel":
            # Exact comparison on purpose: `o in ("--parallel")` is a substring
            # test on a plain string and would also capture "-p" (--probe).
            nthreads = int(a)
        elif o in ("-e", "--epochs"):
            epochs = int(a)
        elif o in ("-t", "--table"):
            table_name = a
        elif o in ("-p", "--probe"):
            probe_name = a
        elif o in ("-r", "--rank"):
            rank = int(a)
        elif o == "-b":
            B = float(a)
        elif o in ("-s", "--stepsize"):
            stepsize = float(a)
        elif o in ("-d", "--diminish"):
            diminish = float(a)

    if (mid is None) or (table_name is None):
        print("mid and table name are required")
        usage(sys.argv)
        # Non-zero status so scripts can detect the usage error.
        sys.exit(2)

    print("Execute on table %s with mid %d using %d processes" % (table_name, mid, nthreads))

    if exec_fresh:
        fresh_execute(mid, nthreads, epochs, table_name, probe_name, rank, B, stepsize, diminish)
    else:
        stored_execute(mid, nthreads, epochs, table_name, probe_name, diminish)
# Guard the entry point: with multiprocessing's "spawn" start method, pool
# workers re-import this module, and an unguarded main() would re-run in
# every child.
if __name__ == "__main__":
    main()