00001 #include <float.h>
00002
00011 template<class T>
00012 Hybrid_Storage_Manager<T>::
00013 Hybrid_Storage_Manager(Hazy_Database *_db_conn, ondisk_storage_spec *spec, Ondisk_Storage_Manager<T> *disk, int nBuffer, bool epsused) : _disk(disk), _epsused(epsused) {
00014 unique_id_for_ps = spec->unique_id;
00015 _conn = _db_conn;
00016 _buffer_size = nBuffer;
00017 int half_buf = _buffer_size / 2;
00018 eps_high = 0; eps_low = 0;
00019
00020 _force_monotone = true;
00021 assert(spec->strategy == hazy_model::EAGER_HAZY || spec->strategy == hazy_model::LAZY_HAZY);
00022 _lazy = spec->strategy == hazy_model::LAZY_HAZY;
00023 std::ostringstream ostr;
00024 ostr << "PREPARE buffer_query" << unique_id_for_ps << " AS (SELECT id,feature_vector,eps FROM " << spec->intermediate_table_name << " WHERE eps > 0 ORDER BY eps LIMIT " << half_buf << " ) " << std::endl;
00025 ostr << " UNION ALL " << std::endl;
00026 ostr << " (SELECT id, feature_vector,eps FROM " << spec->intermediate_table_name << " WHERE eps <= 0 ORDER BY eps DESC LIMIT " << half_buf << ")" << std::endl;
00027 ostr << " ORDER BY eps; " << std::endl;
00028
00029 query_buffer_fill << "EXECUTE buffer_query" << unique_id_for_ps << ";";
00030 ostr << "PREPARE eps_map_query" << unique_id_for_ps << " AS SELECT id,eps FROM " << spec->intermediate_table_name << ";";
00031 query_eps_exec << "EXECUTE eps_map_query" << unique_id_for_ps << ";";
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 ostr << "PREPARE search_disk_region_one_lazy" << unique_id_for_ps << "(float8, float8) AS SELECT COUNT(*) FROM " << spec->intermediate_table_name << " WHERE eps BETWEEN $1 AND $2 AND dotprdct_cached_waste(feature_vector) > 0;" << std::endl;
00042 ostr << "PREPARE search_disk_region_one_eager" << unique_id_for_ps << "(float8, float8) AS SELECT COUNT(*) FROM " << spec->external_table_name << " WHERE eps BETWEEN $1 AND $2 AND class = 1;" << std::endl;
00043
00044
00045 ostr << "PREPARE search_disk_region_three_lazy" << unique_id_for_ps << "(float8,float8) AS SELECT (SELECT COUNT(*) FROM " << spec->intermediate_table_name << " WHERE $1 < eps AND eps <= $2 AND dotprdct_cached_waste(feature_vector) > 0) + " << std::endl;
00046 ostr << " (SELECT COUNT(*) FROM " << spec->intermediate_table_name << " WHERE eps > $2);" << std::endl;
00047 ostr << "PREPARE search_disk_region_three_eager" << unique_id_for_ps << "(float8,float8) AS SELECT (SELECT COUNT(*) FROM " << spec->external_table_name << " WHERE $1 < eps AND eps <= $2 AND class = 1) + " << std::endl;
00048 ostr << " (SELECT COUNT(*) FROM " << spec->external_table_name << " WHERE eps > $2)" << std::endl;
00049
00050
00051
00052
00053 DEBUG_ONLY(std::cout << "[HYBRID PREPARED QUERIES] " << std::endl << ostr.str() << std::endl << "[END HYBRID PREPARED QUERIES]" << std::endl; );
00054 int retVal = _conn->execute_statement_msg(ostr.str().c_str(), "[Hybrid_Storage_Manager::model_prepared_statements:__FILE__: __LINE__");
00055 checkQueryReturnValue(retVal, ostr.str());
00056
00057
00058 LOGGING_ONLY(log_se_eps_hit = 0; log_se_eps_miss = 0;);
00059 LOGGING_ONLY(log_se_buffer_hit = 0; log_se_buffer_miss = 0;);
00060 LOGGING_ONLY(log_iu_buffer_hit = 0; log_iu_buffer_miss = 0;);
00061 }
00062
00067 template<class T>
00068 void
00069 Hybrid_Storage_Manager<T>::
00070 resort(struct hazy_model &m) {
00071 _disk->resort(m);
00072 LOGGING_ONLY(std::cout << "[RESORT]" << std::endl;);
00073
00074 _entity_map.clear(); _buffer.clear();
00075 typedef typename Hybrid_Storage_Manager<T>::_full_entity_entry t_fee;
00076 _conn->postgresTupleParser<t_fee>(query_buffer_fill.str().c_str(), t_fee::parse_tuple, _buffer);
00077
00078
00079
00080
00081
00082
00083
00084 eps_high = 0.0; eps_low = 0.0;
00085 int n = _buffer.size();
00086 LOGGING_ONLY(std::cout << "[RESORT] buffer size: " << n << std::endl;);
00087 eps_high = (n > 0) ? _buffer[0].eps : 0.0;
00088 eps_low = (n > 1) ? _buffer[n-1].eps : eps_high;
00089
00090 VERBOSE_ONLY(std::cout << "[RESORT] entities" << std::endl;);
00091 for(int i = 0; i < n; i++) {
00092 const struct _full_entity_entry &e = _buffer[i];
00093 _entity_map[e.k] = i;
00094 VERBOSE_ONLY(std::cout << " " << e.k << "@" << i << " eps=" << e.eps << std::endl;);
00095 }
00096 VERBOSE_ONLY(std::cout << std::endl;);
00097
00098 _full_entity_entry l;
00099 l.eps = 0;
00100
00101
00102 low_water_it = std::lower_bound( _buffer.begin(), _buffer.end(), l,
00103 Hybrid_Storage_Manager<T>::_full_entity_entry::_full_entity_compare);
00104 high_water_it = std::upper_bound( _buffer.begin(), _buffer.end(), l,
00105 Hybrid_Storage_Manager<T>::_full_entity_entry::_full_entity_compare);
00106
00107
00108 if(_epsused) {
00109 _conn->postgresMapParser<key, double>(query_eps_exec.str().c_str(), Hybrid_Storage_Manager::parse_eps_map, _epsmap);
00110 }
00111 LOGGING_ONLY(double space_used_in_hybrid = 0; calculateSpaceUsed(space_used_in_hybrid););
00112 LOGGING_ONLY(std::cout << "[Hybrid::resort] storage used: " << space_used_in_hybrid << std::endl;);
00113 LOGGING_ONLY(double entity_map_used = 0; calculateSpaceUsedByEntityMap(entity_map_used););
00114 LOGGING_ONLY(double eps_map_used = 0; calculateSpaceUsedByEpsMap(eps_map_used););
00115 LOGGING_ONLY(double buffer_used = 0; calculateSpaceUsedByBuffer(buffer_used););
00116 LOGGING_ONLY(std::cout << "[Hybrid::resort] entity_map_used: " << entity_map_used << std::endl;);
00117 LOGGING_ONLY(std::cout << "[Hybrid::resort] eps_map_used: " << eps_map_used << std::endl;);
00118 LOGGING_ONLY(std::cout << "[Hybrid::resort] buffer_map_used: " << buffer_used << std::endl;);
00119 }
00120
00121
00127 template<class T>
00128 void
00129 Hybrid_Storage_Manager<T>::
00130 incrementalUpdate(struct hazy_model &m, double &waste_time) {
00131 m.invalidate_db_model();
00132
00133
00134 if(isLazy()) {
00135 LOGGING_ONLY(std::cout << "[Hybrid::IU] LazyUpdate" << std::endl;);
00136 return;
00137 }
00138 LOGGING_ONLY(std::cout << "[Hybrid::IU] Buffer size: " << _buffer.size() << " model=" << m.low_water << "," << m.high_water << std::endl;);
00139
00140
00141
00142 Timer waste_timer(true);
00143 int nTotalTuples = 0, nWasteTuples = 0;
00144
00145
00146
00147
00148
00149
00150 _tbuffer_it new_low, new_high;
00151 for(new_low = low_water_it;
00152 new_low != _buffer.begin() && new_low->eps >= m.low_water; new_low--) {
00153 double val = IncrementalSGD<T>::classifyExample(m._model, new_low->x);
00154 VERBOSE_ONLY(std::cout << "\t update: " << new_low->k << " eps=" << new_low->eps << " to " << val << std::endl;);
00155 new_low->label = val > 0;
00156 nTotalTuples++; nWasteTuples += val > 0 ? 0 : 1;
00157 }
00158
00159
00160
00161 if(new_low == _buffer.begin() && new_low != _buffer.end() && new_low->eps >= m.low_water) {
00162 double val = IncrementalSGD<T>::classifyExample(m._model, new_low->x);
00163 VERBOSE_ONLY(std::cout << "\t update: " << new_low->k << " eps=" << new_low->eps << " to " << val << " *" <<std::endl;);
00164 new_low->label = val > 0;
00165 nTotalTuples++; nWasteTuples += val > 0 ? 0 : 1;
00166 }
00167
00168
00169 for(new_high = low_water_it;
00170 new_high != _buffer.end() && new_high->eps <= m.high_water; new_high++) {
00171 double val = IncrementalSGD<T>::classifyExample(m._model, new_high->x);
00172 VERBOSE_ONLY(std::cout << "\t update: " << new_high->k << " eps=" << new_high->eps << " to " << val <<std::endl;);
00173 new_high->label = val > 0;
00174 nTotalTuples++; nWasteTuples += val > 0 ? 0 : 1;
00175 }
00176
00177
00178 if(_force_monotone && new_high != _buffer.end()) {
00179 LOGGING_ONLY(int nMonotoneWaste = 0;);
00180 while(new_high != high_water_it && new_high->eps <= high_water_it->eps) { nTotalTuples++; new_high->label = true; new_high++; LOGGING_ONLY(nMonotoneWaste++;); }
00181 LOGGING_ONLY(std::cout << "[Hybrid] monotone fixes " << nMonotoneWaste << std::endl;);
00182 }
00183
00184 double total_time = waste_timer.stop();
00185
00186
00187 if( (new_low == _buffer.begin() || new_high == _buffer.end()) || (_buffer.size() == 0)) {
00188
00189
00190 LOGGING_ONLY(log_iu_buffer_miss++;);
00191 LOGGING_ONLY(std::cout << "[Hybrid::IU] miss: " << log_iu_buffer_miss << std::endl;);
00192
00193
00194
00195
00196 _disk->incrementalUpdate(m,waste_time);
00197 LOGGING_ONLY(std::cout << "[Hybrid::IU] \t waste=" << waste_time << std::endl;);
00198
00199 } else {
00200
00201 LOGGING_ONLY(log_iu_buffer_hit++;);
00202
00203 LOGGING_ONLY(std::cout << "[Hybrid::IU] hit: " << log_iu_buffer_hit << std::endl;);
00204 LOGGING_ONLY(std::cout << "\t [lw,hw]=[" << m.low_water << "," << m.high_water << "] iterators=[" << new_low->eps << "," << new_high->eps << "]" << std::endl;);
00205 }
00206
00207
00208 waste_time += ((nTotalTuples == 0) ? 0 : total_time * (double) nWasteTuples / (double) nTotalTuples);
00209 low_water_it = new_low; high_water_it = new_high;
00210 }
00211
00218 template<class T>
00219 void
00220 Hybrid_Storage_Manager<T>::
00221 insertEntity(struct hazy_model &m, key e, T f) {
00222
00223
00224 _disk->insertEntity(m, e, f);
00225 }
00226
00233 template<class T>
00234 void
00235 Hybrid_Storage_Manager<T>::
00236 getEntityClass(key e, sClass &c, struct hazy_model &m) {
00237
00238
00239
00240 if(_epsused) {
00241 _tepsmap_iterator it = _epsmap.find(e);
00242
00243 if(it != _epsmap.end()) {
00244 double eps = it->second;
00245 DEBUG_ONLY(std::cout << "[Hybrid::SE] e=" << e << " eps=" << eps << std::endl;);
00246 if(m.low_water > eps) {
00247 c = 0;
00248 LOGGING_ONLY(log_se_eps_hit++;);
00249 LOGGING_ONLY(std::cout << "[Hybrid::SE] pos: " << log_se_eps_hit << std::endl;);
00250 return; }
00251
00252 if(m.high_water < eps) {
00253 c = 1;
00254 LOGGING_ONLY(log_se_eps_hit++;);
00255 LOGGING_ONLY(std::cout << "[Hybrid::SE] neg: " << log_se_eps_hit << std::endl;);
00256 return;
00257 }
00258
00259 } else {
00260
00261
00262 }
00263 }
00264
00265 LOGGING_ONLY(log_se_eps_miss++;);
00266 LOGGING_ONLY(std::cout << "[Hybrid::SE] eps miss: " << log_se_eps_miss << std::endl;);
00267
00268
00269 _tentitymap::iterator i = _entity_map.find(e);
00270 if(i == _entity_map.end()) {
00271 LOGGING_ONLY(log_se_buffer_miss++;);
00272 LOGGING_ONLY(std::cout << "[Hybrid::SE] buffer miss: " << log_se_buffer_miss << std::endl;);
00273
00274
00275 _disk->getEntityClass(e, c, m);
00276 return;
00277
00278 } else {
00279
00280 LOGGING_ONLY(log_se_buffer_hit++;);
00281 _full_entity_entry &fee = _buffer[i->second];
00282
00283 if(isEager()) {
00284
00285
00286
00287 DEBUG_ONLY(assert(m.low_water <= 0 && m.high_water >= 0););
00288 LOGGING_ONLY(std::cout << "[Hybrid::SE] buffer_hit " << log_se_buffer_hit << " for " << i->second << " fee.k " << fee.k << " fee.eps " << fee.eps << " label" << fee.label << std::endl;);
00289 c = fee.label;
00290 } else {
00291
00292
00293
00294 if(! _epsused) {
00295 if(fee.eps > m.high_water) {
00296 LOGGING_ONLY(log_se_eps_hit++;);
00297 LOGGING_ONLY(std::cout << "[Hybrid::SE] hit: " << log_se_buffer_hit << std::endl;);
00298 c = 1; return;
00299 }
00300 if(fee.eps < m.low_water) {
00301 LOGGING_ONLY(log_se_eps_hit++;);
00302 LOGGING_ONLY(std::cout << "[Hybrid::SE] hit: " << log_se_buffer_hit << std::endl;);
00303 c = 0;
00304 return;
00305 }
00306 }
00307 c = (IncrementalSGD<T>::classifyExample(m._model, fee.x) > 0);
00308
00309 }
00310 }
00311 }
00312
00313
00321 template<class T>
00322 void
00323 Hybrid_Storage_Manager<T>::
00324 getNumInClass(sClass c, int &nClass, struct hazy_model &m, double &waste_time) {
00325 LOGGING_ONLY(double space_used_in_hybrid = 0; calculateSpaceUsed(space_used_in_hybrid););
00326 LOGGING_ONLY(std::cout << "[Hybrid::GetNum] storage used: " << space_used_in_hybrid << std::endl;);
00327 LOGGING_ONLY(double entity_map_used = 0; calculateSpaceUsedByEntityMap(entity_map_used););
00328 LOGGING_ONLY(double eps_map_used = 0; calculateSpaceUsedByEpsMap(eps_map_used););
00329 LOGGING_ONLY(double buffer_used = 0; calculateSpaceUsedByBuffer(buffer_used););
00330 LOGGING_ONLY(std::cout << "[Hybrid::GetNum] entity_map_used: " << entity_map_used << std::endl;);
00331 LOGGING_ONLY(std::cout << "[Hybrid::GetNum] eps_map_used: " << eps_map_used << std::endl;);
00332 LOGGING_ONLY(std::cout << "[Hybrid::GetNum] buffer_map_used: " << buffer_used << std::endl;);
00333
00334
00335
00336
00337
00338
00339
00340 int region_one_count = 0, region_three_count =0;
00341 double region_one_waste = 0.0, region_three_waste = 0.0;
00342
00343 Timer _lazy_waste(true);
00344 _tbuffer_it it = low_water_it;
00345 bool model_is_cached = false;
00346 LOGGING_ONLY(std::cout << "[Hybrid::GetNum]" << std::endl);
00347
00348
00349 if(low_water_it == _buffer.begin() ) {
00350 VERBOSE_ONLY(std::cout << "[Hybrid::GetNum] Searching Region (I)" << std::endl;);
00351 _full_entity_entry l; l.eps = m.low_water;
00352 it = upper_bound( low_water_it, _buffer.end(), l, Hybrid_Storage_Manager<T>::_full_entity_entry::_full_entity_compare);
00353 std::ostringstream ostr;
00354 ostr.precision(16);
00355
00356 double top_of_region_one = (low_water_it == _buffer.end()) ? 0.0 : low_water_it->eps;
00357 if (top_of_region_one < 0.0) { top_of_region_one -= FLT_EPSILON;}
00358 if(isEager()) {
00359 ostr << "EXECUTE search_disk_region_one_eager" << unique_id_for_ps << "(" << m.low_water << ", " << top_of_region_one << ");";
00360 } else {
00361 std::string model_string, bias_string;
00362 ostr << "EXECUTE model_initialization" << unique_id_for_ps << ";";
00363 if(!m.test_and_set_model_in_db()) {
00364 model_to_dbstring(m._model, model_string, bias_string);
00365 ostr << "EXECUTE model_caching" << unique_id_for_ps << "(" << model_string << ", " << bias_string << ");";
00366 }
00367 ostr << "EXECUTE search_disk_region_one_lazy" << unique_id_for_ps << "(" << m.low_water << ", " << top_of_region_one << ");";
00368 model_is_cached = true;
00369 }
00370 VERBOSE_ONLY(std::cout << "[Hybrid::GetNum] Region (I): " << ostr.str() << std::endl;);
00371 int retVal = _conn->execute_query_msg_int(ostr.str().c_str(), "[Hybrid::GetRegion]", region_one_count);
00372 checkQueryReturnValue(retVal, ostr.str());
00373 LOGGING_ONLY(std::cout << "[Hybrid::GetNum] region_one_count=" << region_one_count << std::endl);
00374 if(isLazy()) {
00375
00376 std::ostringstream waste_str;
00377 waste_str << "EXECUTE retrieve_ratio" << unique_id_for_ps << ";";
00378 int retVal = _conn->execute_query_msg_double(waste_str.str().c_str(), "[Hybrid::GetWaste]", region_one_waste);
00379 checkQueryReturnValue(retVal, waste_str.str());
00380 }
00381 } else {
00382 LOGGING_ONLY(std::cout << "[Hybrid::GetNum] Avoiding Region (I)" << std::endl;);
00383 }
00384
00385
00386
00387
00388 _full_entity_entry h; h.eps = m.high_water;
00389
00390
00391 int n = _buffer.size();
00392 int max_count_index = n-1;
00393 while(max_count_index >= 0 && _buffer[max_count_index].eps == _buffer[n-1].eps) { max_count_index--; }
00394 double bottom_of_region_three = (max_count_index >= 0) ? _buffer[max_count_index].eps : 0.0;
00395 if (bottom_of_region_three > 0.0) { bottom_of_region_three += FLT_EPSILON; }
00396
00397 std::ostringstream ostr;
00398 ostr.precision(16);
00399 double top_of_region_three = std::max(m.high_water, bottom_of_region_three);
00400 if(isEager()) {
00401 ostr << "EXECUTE search_disk_region_three_eager" << unique_id_for_ps << "(" << bottom_of_region_three << ", " << top_of_region_three << ");";
00402 } else {
00403 if(!model_is_cached) {
00404 std::string model_string, bias_string;
00405 ostr << "EXECUTE model_initialization" << unique_id_for_ps << ";";
00406 if(!m.test_and_set_model_in_db()) {
00407 model_to_dbstring(m._model, model_string, bias_string);
00408 ostr << "EXECUTE model_caching" << unique_id_for_ps << "(" << model_string << ", " << bias_string << ");";
00409 }
00410 }
00411 ostr << "EXECUTE search_disk_region_three_lazy" << unique_id_for_ps << "(" << bottom_of_region_three << "," << top_of_region_three << ");";
00412 }
00413 VERBOSE_ONLY(std::cout << "Region III query: " << ostr.str() << std::endl;);
00414 int retVal = _conn->execute_query_msg_int(ostr.str().c_str(), "[Hybrid::GetRegion]", region_three_count);
00415 checkQueryReturnValue(retVal, ostr.str());
00416 LOGGING_ONLY(std::cout << "[Hybrid::GetNum] region_three_count=" << region_three_count << std::endl);
00417
00418
00419 if(isLazy()) {
00420
00421 std::ostringstream waste_str;
00422 waste_str << "EXECUTE retrieve_ratio" << unique_id_for_ps << ";";
00423 int retVal = _conn->execute_query_msg_double(waste_str.str().c_str(), "[Hybrid::GetWaste]", region_three_waste);
00424 checkQueryReturnValue(retVal, waste_str.str());
00425 }
00426
00427 LOGGING_ONLY(std::cout << "[Hybrid::GetNum] Search Region (II)" << std::endl;);
00428
00429 nClass = region_one_count + region_three_count;
00430 waste_time = region_one_waste + region_three_waste;
00431
00432
00433 if(isEager()) {
00434 for(; it != _buffer.end() && it->eps <= bottom_of_region_three; it++) {
00435 nClass += it->label ? 1 : 0;
00436 }
00437 waste_time = 0.0;
00438 } else {
00439 int nTotalTuples = _buffer.size();
00440 int region_two_count = 0;
00441 for(; it != _buffer.end() && it->eps <= bottom_of_region_three; it++) {
00442 region_two_count += (IncrementalSGD<T>::classifyExample(m._model, it->x) > 0) ? 1 : 0;
00443 }
00444 nClass += region_two_count;
00445 waste_time += ((nTotalTuples == 0) ? 0 : _lazy_waste.stop() * (double) (nTotalTuples - region_two_count) / (double) nTotalTuples);
00446 }
00447
00448 }
00449
00454 template <class T>
00455 void
00456 Hybrid_Storage_Manager<T>::calculateSpaceUsedByEpsMap(double &spaceUsed) {
00457 spaceUsed = _epsmap.size() * (sizeof(key) + sizeof(double));
00458 }
00459
00464 template <class T>
00465 void
00466 Hybrid_Storage_Manager<T>::calculateSpaceUsedByEntityMap(double &spaceUsed) {
00467 spaceUsed = _entity_map.size() * (sizeof(key) + sizeof(int));
00468 }
00469
00474 template <class T>
00475 void
00476 Hybrid_Storage_Manager<T>::calculateSpaceUsedByBuffer(double &spaceUsed) {
00477 spaceUsed = 0;
00478 for(unsigned int i = 0; i < _buffer.size(); i ++) {
00479 double record_at_i_size = sizeof(key) + sizeof(double) + sizeof(bool);
00480 record_at_i_size += _buffer[i].x.getSpaceUsed();
00481 spaceUsed += record_at_i_size;
00482 }
00483 }
00484
00485
00490 template <class T>
00491 void
00492 Hybrid_Storage_Manager<T>::calculateSpaceUsed(double &spaceUsed) {
00493 spaceUsed = 0;
00494 double spaceUsedByStructure = 0;
00495 calculateSpaceUsedByEpsMap(spaceUsedByStructure);
00496 spaceUsed += spaceUsedByStructure;
00497 calculateSpaceUsedByEntityMap(spaceUsedByStructure);
00498 spaceUsed += spaceUsedByStructure;
00499 calculateSpaceUsedByBuffer(spaceUsedByStructure);
00500 spaceUsed += spaceUsedByStructure;
00501 }