1 from multiprocessing import Pool
2 import time
3 import sys_appends
4 import sys, getopt
5 import random
6
7 import psycopg2
8 import low_rank_helper
9
10 import mmap
11 import cStringIO
12
13
# Base filenames for the binary COPY dumps written when a fresh model is
# installed; the model id (mid) is appended to each to form "<base><mid>.dat".
model_simple_base = "model_simple"   # dims + moviemean/stepsize/b hyper-parameters
model_l_r_base = "model_l_r"         # the L and R factor matrices themselves
stats_simple_base = "stats_simple"   # max_cols / max_rows / max_rank only
stats_base = "stats"                 # dims plus moviemean, stepsize, b
18
19
20
def __init__(self, mid, max_rank, nRows, nCols, nSplits, connect_str, table_name):
    """Per-worker state for running SGD steps against one model instance.

    mid         -- model instance id (key into lr_model_instance)
    max_rank    -- rank of the low-rank factorization
    nRows/nCols -- dimensions of the rating matrix
    nSplits     -- number of data partitions per epoch
    connect_str -- psycopg2 connection string; each worker opens its own
                   connection so they can run concurrently
    table_name  -- name of the ratings table
    """
    self.mid = mid
    self.nRows = nRows
    self.nCols = nCols
    self.nSplits = nSplits
    # Private connection in autocommit mode so each step's server-side
    # statements take effect immediately rather than inside one long txn.
    self.conn = psycopg2.connect(connect_str)
    self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    self.psql = self.conn.cursor()
    self.max_rank = max_rank
    self.table_name = table_name
33 - def take_step(self, seed, epoch, nsplits, roundid, chunkid):
34
35
36 t0 = time.time()
37 print "[take_step:%s,%s] Enter" % (roundid, chunkid)
38 self.psql.execute("SELECT library_setup();")
39 self.psql.execute("SELECT retrieve_sparse_model(%d,%d,%d,%d,%d)" % (self.mid,epoch %2,nsplits,roundid, chunkid))
40 print "[take_step:%s,%s] Retrieved Sparse Model %d for epoch %d in %s" % (chunkid,roundid, self.mid, epoch, time.time() - t0)
41 t1 = time.time()
42
43
44
45
46 t2 = time.time()
47 tname = low_rank_helper.tablename_data(self.mid, epoch % 2, low_rank_helper.get_part(nsplits,roundid, chunkid))
48 exec_str = "SELECT COUNT(*) FROM (SELECT take_step_sparse(row - 1, col - 1, rating) FROM %s) AS t;" % (tname)
49 print "[take_step:%s,%s] %s" % (roundid, chunkid, exec_str)
50
51 self.psql.execute(exec_str)
52 (nCount,) = self.psql.fetchone()
53
54 print "[take_step:%s,%s] Finished Step (%d tuples) in %s secs. Storing" % (roundid,chunkid, nCount, time.time() - t2)
55 t3 = time.time()
56
57 ow = "SELECT update_segment_sparse(%d,%d,%d,%d,%d);" % (self.mid,epoch %2,nsplits,roundid,chunkid)
58 print ow
59 self.psql.execute(ow)
60 print "[take_step:%s,%s] Complete in %f sec (update=%s) for epoch %d" % (roundid,chunkid, time.time() - t0, time.time() - t3,epoch)
61
64
65
66 -def chunk_data(psql, seed, query_str, nsplits, epoch, mid):
67 t0 = time.time()
68 epoch_sign = (epoch + 1) % 2
69 table_name = "%s%d" % ("staging_data", epoch_sign)
70 print "[chunk_data] Chunking the data for epoch %d which has %d sign" % (epoch+1, epoch_sign)
71 exec_str = "SELECT permute_and_slice_data('" + sys_appends.dbname + "', %d, '%s', %d, %d, %d);" % (seed, query_str, nsplits, epoch_sign, mid)
72 print exec_str
73 psql.execute(exec_str)
74 print "[chunk_data] Finished Chunking in %s secs." % (time.time() - t0)
75
def __init__(self, mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, chunkid, roundid, table_name):
    """Plain value object describing one unit of work for a pool worker.

    Instances are handed to pool.apply_async / pool.map (see fresh_execute),
    so they carry everything a worker needs to open its own connection and
    run a step: model id, epoch, matrix dims, split layout, seed, and the
    (roundid, chunkid) coordinates of the partition.  roundid/chunkid of -1
    appear to mark the special "chunk the next epoch's data" task — confirm
    against perform_work (not visible in this listing).
    """
    self.mid = mid
    self.epoch = epoch
    self.max_rank = max_rank
    self.nRows = nRows
    self.nCols = nCols
    self.nsplits = nsplits
    self.connect_str = connect_str
    self.seed = seed
    self.chunkid = chunkid
    self.roundid = roundid
    self.table_name = table_name
89
def chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, roundid, table_name):
    """Build one chunk_desc work item per split for the given round."""
    descriptors = []
    for chunkid in range(nsplits):
        descriptors.append(chunk_desc(mid, epoch, max_rank, nRows, nCols,
                                      nsplits, connect_str, seed, chunkid,
                                      roundid, table_name))
    return descriptors
92
93
109
111 print "[INIT] Installing fresh model instance to mid=%d with rank=%d b=%f stepsize=%f" % (mid, max_rank, b, stepsize)
112 table_query = "SELECT MAX(row), MAX(col), AVG(rating) FROM %s" % table_name
113 psql.execute(table_query)
114 (nRows, nCols, mean,) = psql.fetchone()
115
116 psql.execute("DELETE FROM lr_model_instance WHERE MID=%d;" % (mid))
117 psql.execute("SELECT initialize_model(%d,%d,%d,%f,%f,%f)" % (max_rank, nRows, nCols, b, mean, stepsize))
118 psql.execute("SELECT store_model(%d);" % (mid))
119
120 b = cStringIO.StringIO()
121 psql.copy_expert("COPY (SELECT array_upper(L,1) as nRows, array_upper(L,2) as l_rank, array_upper(R,1) as ncols, array_upper(R,2) as r_rank, moviemean as moviemean, stepsize as stepsize, b as b FROM lr_model_instance WHERE mid={0}) TO STDOUT WITH BINARY".format(mid), b)
122 f = open("{0}{1}.dat".format(model_simple_base, mid), "wb")
123 f.write(b.getvalue())
124 f.close()
125
126 b = cStringIO.StringIO()
127 psql.copy_expert("COPY (select L,R from lr_model_instance where mid={0}) TO STDOUT WITH BINARY".format(mid), b)
128 f = open("{0}{1}.dat".format(model_l_r_base, mid), "wb")
129 f.write(b.getvalue())
130 f.close()
131
132 b = cStringIO.StringIO()
133 psql.copy_expert("COPY (SELECT array_upper(R,1) as max_cols, array_upper(L,1) as max_rows, array_upper(L,2) as max_rank from lr_model_instance where mid = {0}) TO STDOUT WITH BINARY".format(mid), b)
134 f = open("{0}{1}.dat".format(stats_simple_base, mid), "wb")
135 f.write(b.getvalue())
136 f.close()
137
138 b = cStringIO.StringIO()
139 psql.copy_expert("COPY (SELECT array_upper(R,1) as max_cols, array_upper(L,1) as max_rows, array_upper(L,2) as max_rank, moviemean, stepsize, b from lr_model_instance where mid = {0}) TO STDOUT WITH BINARY".format(mid), b)
140 f = open("{0}{1}.dat".format(stats_base, mid), "wb")
141 f.write(b.getvalue())
142 f.close()
143
144
145 print "[INIT] Stored model mid=%d" % mid
146 return (nRows, nCols)
147
    # NOTE(review): the "def" line for this function is missing from this
    # listing (the embedded numbering jumps 147 -> 149), so its name cannot
    # be recovered here.  From the body it takes (psql, mid, table_name) and
    # returns (nrows, ncols, max_rank) — presumably the stored-model
    # counterpart to install_fresh_database; confirm against the full source.
    print "[INIT] Getting stats about mid=%d and table=%s" % (mid, table_name)
    # Dimensions come from the data table itself...
    table_query = "SELECT MAX(row), MAX(col) FROM %s" % table_name
    psql.execute(table_query)
    (nrows, ncols) = psql.fetchone()
    # ...while the rank comes from the stored model instance.
    psql.execute("SELECT array_upper(L,2) as l_rank FROM lr_model_instance WHERE mid=%d LIMIT 1" % (mid))
    (max_rank,) = psql.fetchone()
    return (nrows, ncols, max_rank)
156
157 -def fresh_execute(mid, nsplits, nepochs, table_name, probe_table, max_rank, b, initial_stepsize, diminish):
158 connect_str = 'dbname=' + sys_appends.dbname
159 conn = psycopg2.connect(connect_str)
160 psql = conn.cursor()
161 conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
162 psql.execute("SELECT library_setup();")
163
164 (nRows, nCols) = install_fresh_database(psql, mid, max_rank, b, initial_stepsize, table_name)
165 if not (probe_table is None):
166 psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table)
167 (rmse, ) = psql.fetchone()
168 print "[Initial RMSE] %f" % (rmse)
169
170 qstr = "SELECT row, col, rating FROM %s" % (table_name)
171
172
173
174
175 last_seed = random.randint(1,1000000)
176 chunk_data(psql, last_seed, qstr, nsplits, -1, mid)
177
178
179 pool = Pool(processes=nsplits+1)
180 for epoch in range(nepochs):
181 start_epoch = time.time()
182
183 model_q = "SELECT permute_and_slice_model('" + sys_appends.dbname + "', %d, '%s', %d, %d,%d)" % (last_seed, qstr, nsplits, epoch % 2, mid)
184 print model_q
185 psql.execute(model_q)
186 print "[EPOCH] %d partitioning the model with seed=%d" % (epoch,last_seed)
187
188
189
190 seed = random.randint(1,1000000)
191 last_seed = seed
192 print "[EPOCH] Partitioning the data for epoch %d with seed=%d" % (epoch+1,seed)
193
194
195
196 chunk_result = pool.apply_async(perform_work, (chunk_desc(mid,epoch,max_rank, nRows, nCols, nsplits, connect_str, seed, -1, -1, table_name),))
197 for i in range(nsplits):
198 print pool.map(perform_work, chunk_maker(mid, epoch, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name))
199 v = chunk_result.get()
200
201
202
203
204 z = "SELECT sparse_rebuild_model(%d, %d, %d);" % (mid, epoch % 2, nsplits)
205 psql.execute(z)
206 epoch_time = time.time() - start_epoch
207 print "[End Epoch] %s rebuilt for epoch %d" % (epoch_time, epoch %2)
208
209
210 if not (probe_table is None):
211 psql.execute("SELECT retrieve_model_instance_string(%d)" % mid)
212 psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table)
213 (rmse, ) = psql.fetchone()
214
215 print "[RMSE] epoch=%d time=%f rmse=%f" % (epoch, epoch_time,rmse)
216
217
218 psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%f WHERE MID=%d" % (diminish, mid))
219 return
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
252 print "%s: <mid> <table_name>" % (argv[0])
253
255 try:
256 long_opts = ["mid=", "parallel=", "epochs=", "table=", "probe=", "rank=", "stepsize=", "diminish="]
257 opts, args = getopt.getopt(sys.argv[1:], "fm:p:e:t:r:b:s:d:", long_opts)
258 except getopt.GetoptError:
259 print str(err)
260 usage(sys.argv)
261 sys.exit(2)
262 output = None
263 verbose = False
264
265 mid = None
266 nthreads = 1
267 epochs = 10
268 rank = 10
269 table_name = None
270 probe_name = None
271 stepsize = 5e-2
272 diminish = 0.8
273 B = 1.5
274 exec_fresh = True
275
276 for o,a in opts:
277 if o in ("-m", "--mid"):
278 mid = int(a)
279 elif o == "-f":
280 exec_fresh = False
281 elif o in ("--parallel"):
282 nthreads = int(a)
283 elif o in ("-e", "--epochs"):
284 epochs = int(a)
285 elif o in ("-t", "--table"):
286 table_name = a
287 elif o in ("-p", "--probe"):
288 probe_name = a
289 elif o in ("-r", "--rank"):
290 rank = int(a)
291 elif o in ("-b"):
292 B = float(a)
293 elif o in ("-s", "--stepsize"):
294 stepsize = float(a)
295 elif o in ("-d", "--diminish"):
296 diminish = float(a)
297
298 if (mid is None) or (table_name is None):
299 print "mid and table name are required"
300 usage(sys.argv)
301 sys.exit()
302 else:
303 print "Execute on table %s with mid %d using %d processes" % (table_name,mid, nthreads)
304
305 if exec_fresh:
306 fresh_execute(mid, nthreads, epochs, table_name, probe_name, rank, B, stepsize, diminish)
307 else:
308 stored_execute(mid, nthreads, epochs, table_name, probe_name, diminish)
309
310 main()
311