1 """
2 This is the parallel execution for factors
3 """
4
5 from multiprocessing import Pool
6 import sys_appends
7 import time
8 import sys, getopt
9 import random
10
11 import psycopg2
12
14 epoch_sign = epoch % 2
15 linear = roundid*nSplits + chunkid
16 s = "%s%d%d" % ("staging_data", epoch_sign, linear)
17 return s
18
20 connect_str = 'dbname=' + sys_appends.dbname
21 conn = psycopg2.connect(connect_str)
22 psql = conn.cursor()
23 psql.execute("SELECT tablename from pg_tables where tablename LIKE 'staging_data%'")
24 for (tname,) in psql.fetchall():
25 print "DROP TABLE %s;" % (tname)
26
27
28
def __init__(self, mid, max_rank, nRows, nCols, nSplits, connect_str, table_name):
    """Store the model/partition parameters and open an autocommit cursor.

    mid         -- model instance id
    max_rank    -- model rank
    nRows/nCols -- matrix dimensions
    nSplits     -- number of splits the data is divided into
    connect_str -- psycopg2 connection string used for this object's own
                   connection
    table_name  -- table name kept for later staging-table lookups
    """
    # Plain bookkeeping; nothing here touches the database.
    self.mid, self.max_rank = mid, max_rank
    self.nRows, self.nCols, self.nSplits = nRows, nCols, nSplits
    self.table_name = table_name

    # Private connection for this object, switched to autocommit before the
    # cursor is created.
    connection = psycopg2.connect(connect_str)
    connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    self.conn = connection
    self.psql = connection.cursor()
40
def take_step(self, seed, epoch, roundid, chunkid):
    """Run the SQL ``take_step`` over one staging chunk and store the result.

    Sequence, all on this object's cursor:
      1. ``library_setup()`` and ``retrieve_model_instance_string(mid)``.
      2. ``take_step(row - 1, col - 1, rating)`` for every tuple of the
         staging table named by ``get_tablename`` for
         (epoch, roundid, chunkid); the number of tuples is read back.
      3. ``overwrite_fragment(max_rank, nSplits, nRows, nCols, chunkid,
         roundid, mid)``.

    Progress and timings are printed to stdout.  ``seed`` is accepted for
    interface compatibility but is not used here.  Returns None.
    """
    started = time.time()
    print("[take_step:%s,%s] Enter" % (roundid, chunkid))
    self.psql.execute("SELECT library_setup();")
    self.psql.execute("SELECT retrieve_model_instance_string(%s)" % (self.mid))
    print("[take_step:%s,%s] Retrieved Model %d in %s" % (roundid, chunkid, self.mid, time.time() - started))

    step_started = time.time()
    tname = get_tablename(self.table_name, epoch, self.nSplits, roundid, chunkid)
    # Wrapping the call in COUNT(*) returns a single number (the tuple
    # count) instead of one result row per input tuple.
    exec_str = "SELECT COUNT(*) FROM (SELECT take_step(row - 1, col - 1, rating) FROM %s) AS t;" % (tname)
    print("[take_step:%s,%s] %s" % (roundid, chunkid, exec_str))

    self.psql.execute(exec_str)
    (nCount,) = self.psql.fetchone()
    print("[take_step:%s,%s] Finished Step (%d tuples) in %s secs. Storing" % (roundid, chunkid, nCount, time.time() - step_started))

    store_started = time.time()
    ow = "SELECT overwrite_fragment(%s,%s,%s,%s,%s,%s,%s);" % (self.max_rank, self.nSplits, self.nRows, self.nCols, chunkid, roundid, self.mid)
    print(ow)
    self.psql.execute(ow)
    print("[take_step:%s,%s] Complete in %f sec (update=%s)" % (roundid, chunkid, time.time() - started, time.time() - store_started))
69
72
73
def chunk_data(psql, query_str, nsplits, nRows, nCols, epoch):
    """Have the database slice the data for epoch ``epoch + 1``.

    psql      -- open cursor the ``perm_slice`` call is executed on
    query_str -- SQL text producing the tuples to slice; it is embedded in
                 the statement inside single quotes without escaping, so it
                 must not itself contain a single quote
    nsplits   -- number of splits handed to ``perm_slice``
    nRows     -- row count handed to ``perm_slice``
    nCols     -- column count handed to ``perm_slice``
    epoch     -- current epoch; the staging prefix alternates between
                 ``staging_data0`` and ``staging_data1`` with the parity of
                 ``epoch + 1`` (pass -1 to prepare epoch 0)

    Prints the statement and the elapsed time.  Returns None.
    """
    started = time.time()
    next_epoch = epoch + 1
    staging_prefix = "%s%d" % ("staging_data", next_epoch % 2)
    print("[chunk_data] Chunking the data for epoch %d" % (next_epoch))
    # One format expression for the whole statement, so the result does not
    # depend on '%' binding tighter than '+'.
    exec_str = "SELECT perm_slice('%s', '%s', '%s', %d, %d, %d);" % (
        sys_appends.dbname, query_str, staging_prefix, nsplits, nRows, nCols)
    print(exec_str)
    psql.execute(exec_str)
    print("[chunk_data] Finished Chunking in %s secs." % (time.time() - started))
83
def __init__(self, mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, chunkid, roundid, table_name):
    """Record every argument as a same-named attribute; no other work is done.

    The object is a plain value holder: model id, epoch, rank, matrix
    dimensions, split count, connection string, seed, the (chunkid,
    roundid) pair and the table name.
    """
    # Model identity and shape.
    self.mid, self.max_rank = mid, max_rank
    self.nRows, self.nCols, self.nsplits = nRows, nCols, nsplits

    # Position in the schedule.
    self.epoch, self.roundid, self.chunkid = epoch, roundid, chunkid
    self.seed = seed

    # Where the data lives.
    self.connect_str = connect_str
    self.table_name = table_name
97
def chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, roundid, table_name):
    """Build the list of ``chunk_desc`` work items for one round.

    Every round gets one descriptor per chunk id in ``range(nsplits)``.
    Round 0 additionally starts the list with a descriptor whose chunkid
    and roundid are both -1.

    Returns the list of descriptors.
    """
    work_items = []
    if roundid == 0:
        # Extra leading item, marked by the (-1, -1) chunk/round pair.
        work_items.append(chunk_desc(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, -1, -1, table_name))
    work_items.extend(
        chunk_desc(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, chunkid, roundid, table_name)
        for chunkid in range(nsplits)
    )
    return work_items
106
121
123 print "[INIT] Installing fresh model instance to mid=%d with rank=%d b=%f stepsize=%f" % (mid, max_rank, b, stepsize)
124 table_query = "SELECT MAX(row), MAX(col), AVG(rating) FROM %s" % table_name
125 psql.execute(table_query)
126 (nRows, nCols, mean,) = psql.fetchone()
127
128 psql.execute("DELETE FROM lr_model_instance WHERE MID=%d;" % (mid))
129 psql.execute("SELECT initialize_model(%d,%d,%d,%f,%f,%f)" % (max_rank, nRows, nCols, b, mean, stepsize))
130 psql.execute("SELECT store_model(%d);" % (mid))
131 print "[INIT] Stored model mid=%d" % mid
132 return (nRows, nCols)
133
135 print "[INIT] Getting stats about mid=%d and table=%s" % (mid, table_name)
136 table_query = "SELECT MAX(row), MAX(col) FROM %s" % table_name
137 psql.execute(table_query)
138 (nrows, ncols) = psql.fetchone()
139 psql.execute("SELECT array_upper(L,2) as l_rank FROM lr_model_instance WHERE mid=%d LIMIT 1" % (mid))
140 (max_rank,) = psql.fetchone()
141 return (nrows, ncols, max_rank)
142
def fresh_execute(mid, nsplits, nepochs, table_name, probe_table, max_rank, b, initial_stepsize, diminish):
    """Install a fresh model instance and train it for ``nepochs`` epochs.

    mid              -- model instance id to (re)create and train
    nsplits          -- number of splits; also the number of rounds per epoch
    nepochs          -- number of epochs to run
    table_name       -- table providing (row, col, rating) training tuples
    probe_table      -- table used for RMSE reporting, or None to skip it
    max_rank, b, initial_stepsize -- forwarded to install_fresh_database
    diminish         -- factor the stored stepsize is multiplied by after
                        every epoch

    Each epoch runs ``nsplits`` rounds; every round maps ``perform_work``
    over the descriptors from ``chunk_maker`` on a process pool.  The pool
    and the database connection are released when the function exits,
    whether it finishes normally or raises.  Returns None.
    """
    connect_str = 'dbname=' + sys_appends.dbname
    conn = psycopg2.connect(connect_str)
    pool = None
    try:
        psql = conn.cursor()
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        psql.execute("SELECT library_setup();")

        (nRows, nCols) = install_fresh_database(psql, mid, max_rank, b, initial_stepsize, table_name)

        # The probe query does not change between epochs; build it once.
        rmse_query = None
        if probe_table is not None:
            rmse_query = "select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table
            psql.execute(rmse_query)
            (rmse, ) = psql.fetchone()
            print("[Initial RMSE] %f" % (rmse))

        qstr = "SELECT row, col, rating FROM %s" % (table_name)
        # epoch=-1 makes chunk_data prepare the staging data for epoch 0.
        chunk_data(psql, qstr, nsplits, nRows, nCols, -1)

        pool = Pool(processes=nsplits+1)
        for epoch in range(nepochs):
            start_epoch = time.time()
            seed = random.randint(1,1000000)
            print("[EPOCH] %d using seed=%d" % (epoch, seed))
            for i in range(nsplits):
                print(pool.map(perform_work, chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name)))
            epoch_time = time.time() - start_epoch
            print("[End Epoch] %s" % (epoch_time))

            # Fetch the model on this connection before measuring RMSE
            # (presumably picks up what the workers stored -- confirm).
            psql.execute("SELECT retrieve_model_instance_string(%d)" % mid)
            if rmse_query is not None:
                psql.execute(rmse_query)
                (rmse, ) = psql.fetchone()
                print("[RMSE] epoch=%d time=%f rmse=%f" % (epoch, epoch_time, rmse))
            psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%f WHERE MID=%d" % (diminish, mid))
    finally:
        # Release the worker processes and the connection on every exit path.
        if pool is not None:
            pool.close()
            pool.join()
        conn.close()
    return
175
176
177
def stored_execute(mid, nsplits, nepochs, table_name, probe_table, diminish):
    """Continue training an already stored model instance for ``nepochs`` epochs.

    mid         -- id of the existing model instance
    nsplits     -- number of splits; also the number of rounds per epoch and
                   the size of the process pool
    nepochs     -- number of epochs to run
    table_name  -- table providing (row, col, rating) training tuples
    probe_table -- table used for RMSE reporting, or None to skip it
    diminish    -- factor the stored stepsize is multiplied by after every
                   epoch

    Dimensions and rank come from ``get_database_stats``.  The pool and the
    database connection are released when the function exits, whether it
    finishes normally or raises.  Returns None.
    """
    connect_str = 'dbname=' + sys_appends.dbname
    conn = psycopg2.connect(connect_str)
    pool = None
    try:
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        psql = conn.cursor()
        psql.execute("SELECT library_setup();")

        (nRows, nCols, max_rank) = get_database_stats(psql, mid, table_name)
        qstr = "SELECT row, col, rating FROM %s" % (table_name)
        # epoch=-1 makes chunk_data prepare the staging data for epoch 0.
        chunk_data(psql, qstr, nsplits, nRows, nCols, -1)

        # The probe query does not change between epochs; build it once.
        rmse_query = None
        if probe_table is not None:
            rmse_query = "select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table

        # NOTE(review): fresh_execute sizes its pool nsplits+1; confirm
        # whether the difference is intentional.
        pool = Pool(processes=nsplits)
        for epoch in range(nepochs):
            start_epoch = time.time()
            seed = random.randint(1,1000000)
            print("[EPOCH] using seed=%d" % (seed))
            for i in range(nsplits):
                print(pool.map(perform_work, chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name)))
            print("[End Epoch] %s" % (time.time() - start_epoch))

            psql.execute("SELECT retrieve_model_instance_string(%d)" % mid)
            if rmse_query is not None:
                psql.execute(rmse_query)
                (rmse, ) = psql.fetchone()
                print("[RMSE] %f" % (rmse))
            psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%f WHERE MID=%d" % (diminish, mid))

        psql.close()
    finally:
        # Release the worker processes and the connection on every exit path.
        if pool is not None:
            pool.close()
            pool.join()
        conn.close()
205
207 print "%s: <mid> <table_name>" % (argv[0])
208
210 try:
211 long_opts = ["mid=", "parallel=", "epochs=", "table=", "probe=", "rank=", "stepsize=", "diminish="]
212 opts, args = getopt.getopt(sys.argv[1:], "fm:p:e:t:r:b:s:d:", long_opts)
213 except getopt.GetoptError:
214 print str(err)
215 usage(sys.argv)
216 sys.exit(2)
217 output = None
218 verbose = False
219
220 mid = None
221 nthreads = 1
222 epochs = 10
223 rank = 10
224 table_name = None
225 probe_name = None
226 stepsize = 5e-2
227 diminish = 0.8
228 B = 1.5
229 exec_fresh = True
230
231 for o,a in opts:
232 if o in ("-m", "--mid"):
233 mid = int(a)
234 elif o == "-f":
235 exec_fresh = False
236 elif o in ("--parallel"):
237 nthreads = int(a)
238 elif o in ("-e", "--epochs"):
239 epochs = int(a)
240 elif o in ("-t", "--table"):
241 table_name = a
242 elif o in ("-p", "--probe"):
243 probe_name = a
244 elif o in ("-r", "--rank"):
245 rank = int(a)
246 elif o in ("-b"):
247 B = float(a)
248 elif o in ("-s", "--stepsize"):
249 stepsize = float(a)
250 elif o in ("-d", "--diminish"):
251 diminish = float(a)
252
253 if (mid is None) or (table_name is None):
254 print "mid and table name are required"
255 usage(sys.argv)
256 sys.exit()
257 else:
258 print "Execute on table %s with mid %d using %d processes" % (table_name,mid, nthreads)
259
260 if exec_fresh:
261 fresh_execute(mid, nthreads, epochs, table_name, probe_name, rank, B, stepsize, diminish)
262 else:
263 stored_execute(mid, nthreads, epochs, table_name, probe_name, diminish)
264
265 clean_tables()
266 main()
267