Stxxl
1.2.1
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/stream/sort_stream.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2006 Johannes Singler <singler@ira.uka.de> 00008 * 00009 * Distributed under the Boost Software License, Version 1.0. 00010 * (See accompanying file LICENSE_1_0.txt or copy at 00011 * http://www.boost.org/LICENSE_1_0.txt) 00012 **************************************************************************/ 00013 00014 #ifndef STXXL_SORT_STREAM_HEADER 00015 #define STXXL_SORT_STREAM_HEADER 00016 00017 #ifdef STXXL_BOOST_CONFIG 00018 #include <boost/config.hpp> 00019 #endif 00020 00021 #include <stxxl/bits/stream/stream.h> 00022 #include <stxxl/sort> 00023 00024 00025 __STXXL_BEGIN_NAMESPACE 00026 00027 namespace stream 00028 { 00031 00032 template <class ValueType, class TriggerEntryType> 00033 struct sorted_runs 00034 { 00035 typedef TriggerEntryType trigger_entry_type; 00036 typedef ValueType value_type; 00037 typedef typename trigger_entry_type::bid_type bid_type; 00038 typedef stxxl::int64 size_type; 00039 typedef std::vector<trigger_entry_type> run_type; 00040 typedef typed_block<bid_type::size, value_type> block_type; 00041 size_type elements; 00042 std::vector<run_type> runs; 00043 std::vector<unsigned_type> runs_sizes; 00044 00045 // Optimization: 00046 // if the input is small such that its total size is 00047 // less than B (block_type::size) 00048 // then input is sorted internally 00049 // and kept in the array "small" 00050 std::vector<ValueType> small_; 00051 00052 sorted_runs() : elements(0) { } 00053 00058 void deallocate_blocks() 00059 { 00060 block_manager * bm = block_manager::get_instance(); 00061 for (unsigned_type i = 0; i < runs.size(); ++i) 00062 bm->delete_blocks( 00063 trigger_entry_iterator<typename run_type::iterator, 
block_type::raw_size>(runs[i].begin()), 00064 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i].end())); 00065 00066 00067 runs.clear(); 00068 } 00069 }; 00070 00078 template < 00079 class Input_, 00080 class Cmp_, 00081 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type), 00082 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 00083 class runs_creator : private noncopyable 00084 { 00085 Input_ & input; 00086 Cmp_ cmp; 00087 00088 public: 00089 typedef Cmp_ cmp_type; 00090 typedef typename Input_::value_type value_type; 00091 typedef BID<BlockSize_> bid_type; 00092 typedef typed_block<BlockSize_, value_type> block_type; 00093 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type; 00094 typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type; 00095 00096 private: 00097 typedef typename sorted_runs_type::run_type run_type; 00098 sorted_runs_type result_; // stores the result (sorted runs) 00099 unsigned_type m_; // memory for internal use in blocks 00100 bool result_computed; // true result is already computed (used in 'result' method) 00101 00102 void compute_result(); 00103 void sort_run(block_type * run, unsigned_type elements) 00104 { 00105 if (block_type::has_filler) 00106 std::sort( 00107 #if 1 00108 ArrayOfSequencesIterator< 00109 block_type, typename block_type::value_type, block_type::size 00110 >(run, 0), 00111 ArrayOfSequencesIterator< 00112 block_type, typename block_type::value_type, block_type::size 00113 >(run, elements), 00114 #else 00115 TwoToOneDimArrayRowAdaptor< 00116 block_type, value_type, block_type::size 00117 >(run, 0), 00118 TwoToOneDimArrayRowAdaptor< 00119 block_type, value_type, block_type::size 00120 >(run, elements), 00121 #endif 00122 cmp); 00123 00124 else 00125 std::sort(run[0].elem, run[0].elem + elements, cmp); 00126 } 00127 00128 public: 00133 runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) : 00134 input(i), cmp(c), 
m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false) 00135 { 00136 assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use); 00137 } 00138 00142 const sorted_runs_type & result() 00143 { 00144 if (!result_computed) 00145 { 00146 compute_result(); 00147 result_computed = true; 00148 #ifdef STXXL_PRINT_STAT_AFTER_RF 00149 STXXL_MSG(*stats::get_instance()); 00150 #endif 00151 } 00152 return result_; 00153 } 00154 }; 00155 00156 00157 template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_> 00158 void runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result() 00159 { 00160 unsigned_type i = 0; 00161 unsigned_type m2 = m_ / 2; 00162 const unsigned_type el_in_run = m2 * block_type::size; // # el in a run 00163 STXXL_VERBOSE1("runs_creator::compute_result m2=" << m2); 00164 unsigned_type pos = 0; 00165 00166 #ifndef STXXL_SMALL_INPUT_PSORT_OPT 00167 block_type * Blocks1 = new block_type[m2 * 2]; 00168 #else 00169 #if 0 00170 block_type * Blocks1 = new block_type[1]; // allocate only one block first 00171 // if needed reallocate 00172 while (!input.empty() && pos != block_type::size) 00173 { 00174 Blocks1[pos / block_type::size][pos % block_type::size] = *input; 00175 ++input; 00176 ++pos; 00177 } 00178 #endif 00179 00180 while (!input.empty() && pos != block_type::size) 00181 { 00182 result_.small_.push_back(*input); 00183 ++input; 00184 ++pos; 00185 } 00186 00187 block_type * Blocks1; 00188 00189 if (pos == block_type::size) 00190 { // enlarge/reallocate Blocks1 array 00191 block_type * NewBlocks = new block_type[m2 * 2]; 00192 std::copy(result_.small_.begin(), result_.small_.end(), NewBlocks[0].begin()); 00193 result_.small_.clear(); 00194 //delete [] Blocks1; 00195 Blocks1 = NewBlocks; 00196 } 00197 else 00198 { 00199 STXXL_VERBOSE1("runs_creator: Small input optimization, input length: " << pos); 00200 result_.elements = pos; 00201 std::sort(result_.small_.begin(), result_.small_.end(), cmp); 00202 
return; 00203 } 00204 #endif 00205 00206 while (!input.empty() && pos != el_in_run) 00207 { 00208 Blocks1[pos / block_type::size][pos % block_type::size] = *input; 00209 ++input; 00210 ++pos; 00211 } 00212 00213 // sort first run 00214 sort_run(Blocks1, pos); 00215 result_.elements = pos; 00216 if (pos < block_type::size && input.empty()) // small input, do not flush it on the disk(s) 00217 { 00218 STXXL_VERBOSE1("runs_creator: Small input optimization, input length: " << pos); 00219 result_.small_.resize(pos); 00220 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + pos, result_.small_.begin()); 00221 delete[] Blocks1; 00222 return; 00223 } 00224 00225 00226 block_type * Blocks2 = Blocks1 + m2; 00227 block_manager * bm = block_manager::get_instance(); 00228 request_ptr * write_reqs = new request_ptr[m2]; 00229 run_type run; 00230 00231 00232 unsigned_type cur_run_size = div_and_round_up(pos, block_type::size); // in blocks 00233 run.resize(cur_run_size); 00234 bm->new_blocks(AllocStr_(), 00235 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()), 00236 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()) 00237 ); 00238 00239 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 00240 00241 result_.runs_sizes.push_back(pos); 00242 00243 // fill the rest of the last block with max values 00244 for ( ; pos != el_in_run; ++pos) 00245 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value(); 00246 00247 00248 for (i = 0; i < cur_run_size; ++i) 00249 { 00250 run[i].value = Blocks1[i][0]; 00251 write_reqs[i] = Blocks1[i].write(run[i].bid); 00252 //STXXL_MSG("BID: "<<run[i].bid<<" val: "<<run[i].value); 00253 } 00254 result_.runs.push_back(run); // # 00255 00256 if (input.empty()) 00257 { 00258 // return 00259 wait_all(write_reqs, write_reqs + cur_run_size); 00260 delete[] write_reqs; 00261 delete[] Blocks1; 00262 return; 00263 } 00264 00265 STXXL_VERBOSE1("Filling the second 
part of the allocated blocks"); 00266 pos = 0; 00267 while (!input.empty() && pos != el_in_run) 00268 { 00269 Blocks2[pos / block_type::size][pos % block_type::size] = *input; 00270 ++input; 00271 ++pos; 00272 } 00273 result_.elements += pos; 00274 00275 if (input.empty()) 00276 { 00277 // (re)sort internally and return 00278 pos += el_in_run; 00279 sort_run(Blocks1, pos); // sort first an second run together 00280 wait_all(write_reqs, write_reqs + cur_run_size); 00281 bm->delete_blocks(trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()), 00282 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())); 00283 00284 cur_run_size = div_and_round_up(pos, block_type::size); 00285 run.resize(cur_run_size); 00286 bm->new_blocks(AllocStr_(), 00287 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()), 00288 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()) 00289 ); 00290 00291 result_.runs_sizes[0] = pos; 00292 // fill the rest of the last block with max values 00293 for ( ; pos != 2 * el_in_run; ++pos) 00294 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value(); 00295 00296 00297 assert(cur_run_size > m2); 00298 00299 for (i = 0; i < m2; ++i) 00300 { 00301 run[i].value = Blocks1[i][0]; 00302 write_reqs[i]->wait(); 00303 write_reqs[i] = Blocks1[i].write(run[i].bid); 00304 } 00305 00306 request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2]; 00307 00308 for ( ; i < cur_run_size; ++i) 00309 { 00310 run[i].value = Blocks1[i][0]; 00311 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid); 00312 } 00313 00314 result_.runs[0] = run; 00315 00316 wait_all(write_reqs, write_reqs + m2); 00317 delete[] write_reqs; 00318 wait_all(write_reqs1, write_reqs1 + cur_run_size - m2); 00319 delete[] write_reqs1; 00320 00321 delete[] Blocks1; 00322 00323 return; 00324 } 00325 00326 sort_run(Blocks2, pos); 00327 00328 cur_run_size = 
div_and_round_up(pos, block_type::size); // in blocks 00329 run.resize(cur_run_size); 00330 bm->new_blocks(AllocStr_(), 00331 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()), 00332 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()) 00333 ); 00334 00335 for (i = 0; i < cur_run_size; ++i) 00336 { 00337 run[i].value = Blocks2[i][0]; 00338 write_reqs[i]->wait(); 00339 write_reqs[i] = Blocks2[i].write(run[i].bid); 00340 } 00341 assert((pos % el_in_run) == 0); 00342 00343 result_.runs.push_back(run); 00344 result_.runs_sizes.push_back(pos); 00345 00346 while (!input.empty()) 00347 { 00348 pos = 0; 00349 while (!input.empty() && pos != el_in_run) 00350 { 00351 Blocks1[pos / block_type::size][pos % block_type::size] = *input; 00352 ++input; 00353 ++pos; 00354 } 00355 result_.elements += pos; 00356 sort_run(Blocks1, pos); 00357 cur_run_size = div_and_round_up(pos, block_type::size); // in blocks 00358 run.resize(cur_run_size); 00359 bm->new_blocks(AllocStr_(), 00360 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()), 00361 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()) 00362 ); 00363 00364 result_.runs_sizes.push_back(pos); 00365 00366 // fill the rest of the last block with max values (occurs only on the last run) 00367 for ( ; pos != el_in_run; ++pos) 00368 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value(); 00369 00370 00371 for (i = 0; i < cur_run_size; ++i) 00372 { 00373 run[i].value = Blocks1[i][0]; 00374 write_reqs[i]->wait(); 00375 write_reqs[i] = Blocks1[i].write(run[i].bid); 00376 } 00377 result_.runs.push_back(run); // # 00378 00379 std::swap(Blocks1, Blocks2); 00380 } 00381 00382 wait_all(write_reqs, write_reqs + m2); 00383 delete[] write_reqs; 00384 delete[] ((Blocks1 < Blocks2) ? 
Blocks1 : Blocks2); 00385 } 00386 00387 00395 template <class ValueType_> 00396 struct use_push 00397 { 00398 typedef ValueType_ value_type; 00399 }; 00400 00412 template < 00413 class ValueType_, 00414 class Cmp_, 00415 unsigned BlockSize_, 00416 class AllocStr_> 00417 class runs_creator< 00418 use_push<ValueType_>, 00419 Cmp_, 00420 BlockSize_, 00421 AllocStr_> 00422 { 00423 Cmp_ cmp; 00424 00425 public: 00426 typedef Cmp_ cmp_type; 00427 typedef ValueType_ value_type; 00428 typedef BID<BlockSize_> bid_type; 00429 typedef typed_block<BlockSize_, value_type> block_type; 00430 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type; 00431 typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type; 00432 00433 private: 00434 typedef typename sorted_runs_type::run_type run_type; 00435 sorted_runs_type result_; // stores the result (sorted runs) 00436 unsigned_type m_; // memory for internal use in blocks 00437 00438 bool output_requested; // true after the result() method was called for the first time 00439 00440 const unsigned_type m2; 00441 const unsigned_type el_in_run; 00442 unsigned_type cur_el; 00443 block_type * Blocks1; 00444 block_type * Blocks2; 00445 request_ptr * write_reqs; 00446 run_type run; 00447 00448 void sort_run(block_type * run, unsigned_type elements) 00449 { 00450 if (block_type::has_filler) 00451 std::sort( 00452 #if 1 00453 ArrayOfSequencesIterator< 00454 block_type, typename block_type::value_type, block_type::size 00455 >(run, 0), 00456 ArrayOfSequencesIterator< 00457 block_type, typename block_type::value_type, block_type::size 00458 >(run, elements), 00459 #else 00460 TwoToOneDimArrayRowAdaptor< 00461 block_type, value_type, block_type::size 00462 >(run, 0), 00463 TwoToOneDimArrayRowAdaptor< 00464 block_type, value_type, block_type::size 00465 >(run, elements), 00466 #endif 00467 cmp); 00468 00469 else 00470 std::sort(run[0].elem, run[0].elem + elements, cmp); 00471 } 00472 void finish_result() 00473 { 00474 
if (cur_el == 0) 00475 return; 00476 00477 00478 unsigned_type cur_el_reg = cur_el; 00479 sort_run(Blocks1, cur_el_reg); 00480 result_.elements += cur_el_reg; 00481 if (cur_el_reg < unsigned_type(block_type::size) && 00482 unsigned_type(result_.elements) == cur_el_reg) // small input, do not flush it on the disk(s) 00483 { 00484 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg); 00485 result_.small_.resize(cur_el_reg); 00486 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin()); 00487 return; 00488 } 00489 00490 const unsigned_type cur_run_size = div_and_round_up(cur_el_reg, block_type::size); // in blocks 00491 run.resize(cur_run_size); 00492 block_manager * bm = block_manager::get_instance(); 00493 bm->new_blocks(AllocStr_(), 00494 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()), 00495 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()) 00496 ); 00497 00498 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 00499 00500 result_.runs_sizes.push_back(cur_el_reg); 00501 00502 // fill the rest of the last block with max values 00503 for ( ; cur_el_reg != el_in_run; ++cur_el_reg) 00504 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = cmp.max_value(); 00505 00506 00507 unsigned_type i = 0; 00508 for ( ; i < cur_run_size; ++i) 00509 { 00510 run[i].value = Blocks1[i][0]; 00511 if (write_reqs[i].get()) 00512 write_reqs[i]->wait(); 00513 00514 write_reqs[i] = Blocks1[i].write(run[i].bid); 00515 } 00516 result_.runs.push_back(run); 00517 00518 for (i = 0; i < m2; ++i) 00519 if (write_reqs[i].get()) 00520 write_reqs[i]->wait(); 00521 } 00522 void cleanup() 00523 { 00524 delete[] write_reqs; 00525 delete[] ((Blocks1 < Blocks2) ? 
Blocks1 : Blocks2); 00526 write_reqs = NULL; 00527 Blocks1 = Blocks2 = NULL; 00528 } 00529 00530 public: 00534 runs_creator(Cmp_ c, unsigned_type memory_to_use) : 00535 cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), output_requested(false), 00536 m2(m_ / 2), 00537 el_in_run(m2 * block_type::size), 00538 cur_el(0), 00539 Blocks1(new block_type[m2 * 2]), 00540 Blocks2(Blocks1 + m2), 00541 write_reqs(new request_ptr[m2]) 00542 { 00543 assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use); 00544 } 00545 00546 ~runs_creator() 00547 { 00548 if (!output_requested) 00549 cleanup(); 00550 } 00551 00554 void push(const value_type & val) 00555 { 00556 assert(output_requested == false); 00557 unsigned_type cur_el_reg = cur_el; 00558 if (cur_el_reg < el_in_run) 00559 { 00560 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val; 00561 ++cur_el; 00562 return; 00563 } 00564 00565 assert(el_in_run == cur_el); 00566 cur_el = 0; 00567 00568 //sort and store Blocks1 00569 sort_run(Blocks1, el_in_run); 00570 result_.elements += el_in_run; 00571 00572 const unsigned_type cur_run_size = div_and_round_up(el_in_run, block_type::size); // in blocks 00573 run.resize(cur_run_size); 00574 block_manager * bm = block_manager::get_instance(); 00575 bm->new_blocks(AllocStr_(), 00576 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()), 00577 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()) 00578 ); 00579 00580 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 00581 00582 result_.runs_sizes.push_back(el_in_run); 00583 00584 for (unsigned_type i = 0; i < cur_run_size; ++i) 00585 { 00586 run[i].value = Blocks1[i][0]; 00587 if (write_reqs[i].get()) 00588 write_reqs[i]->wait(); 00589 00590 write_reqs[i] = Blocks1[i].write(run[i].bid); 00591 } 00592 00593 result_.runs.push_back(run); 00594 00595 std::swap(Blocks1, Blocks2); 00596 00597 push(val); 00598 } 00599 
00603 const sorted_runs_type & result() 00604 { 00605 if (!output_requested) 00606 { 00607 finish_result(); 00608 output_requested = true; 00609 cleanup(); 00610 #ifdef STXXL_PRINT_STAT_AFTER_RF 00611 STXXL_MSG(*stats::get_instance()); 00612 #endif 00613 } 00614 return result_; 00615 } 00616 }; 00617 00618 00625 template <class ValueType_> 00626 struct from_sorted_sequences 00627 { 00628 typedef ValueType_ value_type; 00629 }; 00630 00642 template < 00643 class ValueType_, 00644 class Cmp_, 00645 unsigned BlockSize_, 00646 class AllocStr_> 00647 class runs_creator< 00648 from_sorted_sequences<ValueType_>, 00649 Cmp_, 00650 BlockSize_, 00651 AllocStr_> 00652 { 00653 typedef ValueType_ value_type; 00654 typedef BID<BlockSize_> bid_type; 00655 typedef typed_block<BlockSize_, value_type> block_type; 00656 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type; 00657 typedef AllocStr_ alloc_strategy_type; 00658 Cmp_ cmp; 00659 00660 public: 00661 typedef Cmp_ cmp_type; 00662 typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type; 00663 00664 private: 00665 typedef typename sorted_runs_type::run_type run_type; 00666 sorted_runs_type result_; // stores the result (sorted runs) 00667 unsigned_type m_; // memory for internal use in blocks 00668 buffered_writer<block_type> writer; 00669 block_type * cur_block; 00670 unsigned_type offset; 00671 unsigned_type iblock; 00672 unsigned_type irun; 00673 alloc_strategy_type alloc_strategy; 00674 00675 public: 00680 runs_creator(Cmp_ c, unsigned_type memory_to_use) : 00681 cmp(c), 00682 m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), 00683 writer(m_, m_ / 2), 00684 cur_block(writer.get_free_block()), 00685 offset(0), 00686 iblock(0), 00687 irun(0) 00688 { 00689 assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use); 00690 } 00691 00694 void push(const value_type & val) 00695 { 00696 assert(offset < block_type::size); 00697 00698 (*cur_block)[offset] = val; 00699 ++offset; 
00700 00701 if (offset == block_type::size) 00702 { 00703 // write current block 00704 00705 block_manager * bm = block_manager::get_instance(); 00706 // allocate space for the block 00707 result_.runs.resize(irun + 1); 00708 result_.runs[irun].resize(iblock + 1); 00709 bm->new_blocks( 00710 offset_allocator<alloc_strategy_type>(iblock, alloc_strategy), 00711 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>( 00712 result_.runs[irun].begin() + iblock), 00713 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>( 00714 result_.runs[irun].end()) 00715 ); 00716 00717 result_.runs[irun][iblock].value = (*cur_block)[0]; // init trigger 00718 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid); 00719 ++iblock; 00720 00721 offset = 0; 00722 } 00723 00724 ++result_.elements; 00725 } 00726 00728 void finish() 00729 { 00730 if (offset == 0 && iblock == 0) // current run is empty 00731 return; 00732 00733 00734 result_.runs_sizes.resize(irun + 1); 00735 result_.runs_sizes.back() = iblock * block_type::size + offset; 00736 00737 if (offset) // if current block is partially filled 00738 { 00739 while (offset != block_type::size) 00740 { 00741 (*cur_block)[offset] = cmp.max_value(); 00742 ++offset; 00743 } 00744 offset = 0; 00745 00746 block_manager * bm = block_manager::get_instance(); 00747 // allocate space for the block 00748 result_.runs.resize(irun + 1); 00749 result_.runs[irun].resize(iblock + 1); 00750 bm->new_blocks( 00751 offset_allocator<alloc_strategy_type>(iblock, alloc_strategy), 00752 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>( 00753 result_.runs[irun].begin() + iblock), 00754 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>( 00755 result_.runs[irun].end()) 00756 ); 00757 00758 result_.runs[irun][iblock].value = (*cur_block)[0]; // init trigger 00759 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid); 00760 } 00761 else 00762 { } 
00763 00764 iblock = 0; 00765 ++irun; 00766 } 00767 00771 const sorted_runs_type & result() 00772 { 00773 finish(); 00774 writer.flush(); 00775 00776 return result_; 00777 } 00778 }; 00779 00780 00785 template <class RunsType_, class Cmp_> 00786 bool check_sorted_runs(RunsType_ & sruns, Cmp_ cmp) 00787 { 00788 typedef typename RunsType_::block_type block_type; 00789 typedef typename block_type::value_type value_type; 00790 STXXL_VERBOSE2("Elements: " << sruns.elements); 00791 unsigned_type nruns = sruns.runs.size(); 00792 STXXL_VERBOSE2("Runs: " << nruns); 00793 unsigned_type irun = 0; 00794 for (irun = 0; irun < nruns; ++irun) 00795 { 00796 const unsigned_type nblocks = sruns.runs[irun].size(); 00797 block_type * blocks = new block_type[nblocks]; 00798 request_ptr * reqs = new request_ptr[nblocks]; 00799 for (unsigned_type j = 0; j < nblocks; ++j) 00800 { 00801 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid); 00802 } 00803 wait_all(reqs, reqs + nblocks); 00804 for (unsigned_type j = 0; j < nblocks; ++j) 00805 { 00806 if (cmp(blocks[j][0], sruns.runs[irun][j].value) || 00807 cmp(sruns.runs[irun][j].value, blocks[j][0])) 00808 { 00809 STXXL_ERRMSG("check_sorted_runs wrong trigger in the run"); 00810 return false; 00811 } 00812 } 00813 if (!stxxl::is_sorted( 00814 #if 1 00815 ArrayOfSequencesIterator< 00816 block_type, typename block_type::value_type, block_type::size 00817 >(blocks, 0), 00818 ArrayOfSequencesIterator< 00819 block_type, typename block_type::value_type, block_type::size 00820 >(blocks, sruns.runs_sizes[irun]), 00821 #else 00822 TwoToOneDimArrayRowAdaptor< 00823 block_type, value_type, block_type::size 00824 >(blocks, 0), 00825 TwoToOneDimArrayRowAdaptor< 00826 block_type, value_type, block_type::size 00827 >(blocks, 00828 //nblocks*block_type::size 00829 //(irun<nruns-1)?(nblocks*block_type::size): (sruns.elements%(nblocks*block_type::size)) 00830 sruns.runs_sizes[irun] 00831 ), 00832 #endif 00833 cmp)) 00834 { 00835 STXXL_ERRMSG("check_sorted_runs 
wrong order in the run"); 00836 return false; 00837 } 00838 00839 delete[] reqs; 00840 delete[] blocks; 00841 } 00842 00843 STXXL_MSG("Checking runs finished successfully"); 00844 00845 return true; 00846 } 00847 00848 00856 template <class RunsType_, 00857 class Cmp_, 00858 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 00859 class runs_merger : private noncopyable 00860 { 00861 typedef RunsType_ sorted_runs_type; 00862 typedef AllocStr_ alloc_strategy; 00863 typedef typename sorted_runs_type::size_type size_type; 00864 typedef Cmp_ value_cmp; 00865 typedef typename sorted_runs_type::run_type run_type; 00866 typedef typename sorted_runs_type::block_type block_type; 00867 typedef typename block_type::bid_type bid_type; 00868 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type; 00869 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type; 00870 typedef sort_local::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type; 00871 typedef loser_tree<run_cursor_type, run_cursor2_cmp_type, block_type::size> loser_tree_type; 00872 00873 typedef stxxl::int64 diff_type; 00874 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence; 00875 typedef typename std::vector<sequence>::size_type seqs_size_type; 00876 std::vector<sequence> * seqs; 00877 std::vector<block_type *> * buffers; 00878 00879 00880 sorted_runs_type sruns; 00881 unsigned_type m_; // blocks to use - 1 00882 value_cmp cmp; 00883 size_type elements_remaining; 00884 unsigned_type buffer_pos; 00885 block_type * current_block; 00886 run_type consume_seq; 00887 prefetcher_type * prefetcher; 00888 loser_tree_type * losers; 00889 int_type * prefetch_seq; 00890 unsigned_type nruns; 00891 #ifdef STXXL_CHECK_ORDER_IN_SORTS 00892 typename block_type::value_type last_element; 00893 #endif 00894 00895 void merge_recursively(); 00896 00897 void deallocate_prefetcher() 00898 { 00899 if (prefetcher) 00900 { 00901 delete losers; 00902 
delete seqs; 00903 delete buffers; 00904 delete prefetcher; 00905 delete[] prefetch_seq; 00906 prefetcher = NULL; 00907 } 00908 // free blocks in runs , (or the user should do it?) 00909 sruns.deallocate_blocks(); 00910 } 00911 00912 void initialize_current_block() 00913 { 00914 if (do_parallel_merge()) 00915 { 00916 #if STXXL_PARALLEL_MULTIWAY_MERGE 00917 // begin of STL-style merging 00918 seqs = new std::vector<sequence>(nruns); 00919 buffers = new std::vector<block_type *>(nruns); 00920 00921 for (unsigned_type i = 0; i < nruns; i++) //initialize sequences 00922 { 00923 (*buffers)[i] = prefetcher->pull_block(); //get first block of each run 00924 (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end()); //this memory location stays the same, only the data is exchanged 00925 } 00926 00927 // end of STL-style merging 00928 #else 00929 assert(false); 00930 #endif 00931 } 00932 else 00933 { 00934 // begin of native merging procedure 00935 00936 losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp)); 00937 00938 // end of native merging procedure 00939 } 00940 } 00941 00942 void fill_current_block() 00943 { 00944 if (do_parallel_merge()) 00945 { 00946 #if STXXL_PARALLEL_MULTIWAY_MERGE 00947 // begin of STL-style merging 00948 diff_type rest = block_type::size; // elements still to merge for this output block 00949 00950 do // while rest > 0 and still elements available 00951 { 00952 value_type * min_last_element = NULL; // no element found yet 00953 diff_type total_size = 0; 00954 00955 for (seqs_size_type i = 0; i < (*seqs).size(); i++) 00956 { 00957 if ((*seqs)[i].first == (*seqs)[i].second) 00958 continue; //run empty 00959 00960 if (min_last_element == NULL) 00961 min_last_element = &(*((*seqs)[i].second - 1)); 00962 else 00963 min_last_element = cmp(*min_last_element, *((*seqs)[i].second - 1)) ? 
min_last_element : &(*((*seqs)[i].second - 1)); 00964 00965 total_size += (*seqs)[i].second - (*seqs)[i].first; 00966 STXXL_VERBOSE1("last " << *((*seqs)[i].second - 1) << " block size " << ((*seqs)[i].second - (*seqs)[i].first)); 00967 } 00968 00969 assert(min_last_element != NULL); //there must be some element 00970 00971 STXXL_VERBOSE1("min_last_element " << min_last_element << " total size " << total_size + (block_type::size - rest)); 00972 00973 diff_type less_equal_than_min_last = 0; 00974 //locate this element in all sequences 00975 for (seqs_size_type i = 0; i < (*seqs).size(); i++) 00976 { 00977 if ((*seqs)[i].first == (*seqs)[i].second) 00978 continue; //empty subsequence 00979 00980 typename block_type::iterator position = std::upper_bound((*seqs)[i].first, (*seqs)[i].second, *min_last_element, cmp); 00981 STXXL_VERBOSE1("greater equal than " << position - (*seqs)[i].first); 00982 less_equal_than_min_last += position - (*seqs)[i].first; 00983 } 00984 00985 STXXL_VERBOSE1("finished loop"); 00986 00987 ptrdiff_t output_size = (std::min)(less_equal_than_min_last, rest); // at most rest elements 00988 00989 STXXL_VERBOSE1("before merge" << output_size); 00990 00991 stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size); 00992 //sequence iterators are progressed appropriately 00993 00994 STXXL_VERBOSE1("after merge"); 00995 00996 rest -= output_size; 00997 00998 STXXL_VERBOSE1("so long"); 00999 01000 for (seqs_size_type i = 0; i < (*seqs).size(); i++) 01001 { 01002 if ((*seqs)[i].first == (*seqs)[i].second) // run empty 01003 { 01004 if (prefetcher->block_consumed((*buffers)[i])) 01005 { 01006 (*seqs)[i].first = (*buffers)[i]->begin(); // reset iterator 01007 (*seqs)[i].second = (*buffers)[i]->end(); 01008 STXXL_VERBOSE1("block ran empty " << i); 01009 } 01010 else 01011 { 01012 (*seqs).erase((*seqs).begin() + i); // remove this sequence 01013 (*buffers).erase((*buffers).begin() + i); 01014 
STXXL_VERBOSE1("seq removed " << i); 01015 } 01016 } 01017 } 01018 } while (rest > 0 && (*seqs).size() > 0); 01019 01020 #ifdef STXXL_CHECK_ORDER_IN_SORTS 01021 if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp)) 01022 { 01023 for (value_type * i = current_block->begin() + 1; i != current_block->end(); i++) 01024 if (cmp(*i, *(i - 1))) 01025 { 01026 STXXL_VERBOSE1("Error at position " << (i - current_block->begin())); 01027 } 01028 assert(false); 01029 } 01030 #endif 01031 01032 // end of STL-style merging 01033 #else 01034 assert(false); 01035 #endif 01036 } 01037 else 01038 { 01039 // begin of native merging procedure 01040 01041 losers->multi_merge(current_block->elem); 01042 01043 // end of native merging procedure 01044 } 01045 } 01046 01047 public: 01049 typedef typename sorted_runs_type::value_type value_type; 01050 01055 runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) : 01056 sruns(r), m_(memory_to_use / block_type::raw_size / sort_memory_usage_factor() /* - 1 */), cmp(c), 01057 elements_remaining(r.elements), 01058 current_block(NULL), 01059 prefetcher(NULL) 01060 #ifdef STXXL_CHECK_ORDER_IN_SORTS 01061 , last_element(cmp.min_value()) 01062 #endif 01063 { 01064 if (empty()) 01065 return; 01066 01067 01068 if (!sruns.small_.empty()) // we have a small input < B, 01069 // that is kept in the main memory 01070 { 01071 STXXL_VERBOSE1("runs_merger: small input optimization, input length: " << elements_remaining); 01072 assert(elements_remaining == size_type(sruns.small_.size())); 01073 current_block = new block_type; 01074 std::copy(sruns.small_.begin(), sruns.small_.end(), current_block->begin()); 01075 current_value = current_block->elem[0]; 01076 buffer_pos = 1; 01077 01078 return; 01079 } 01080 01081 #ifdef STXXL_CHECK_ORDER_IN_SORTS 01082 assert(check_sorted_runs(r, cmp)); 01083 #endif 01084 01085 current_block = new block_type; 01086 01087 
disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 01088 01089 nruns = sruns.runs.size(); 01090 01091 if (m_ < nruns) 01092 { 01093 // can not merge runs in one pass 01094 // merge recursively: 01095 STXXL_ERRMSG("The implementation of sort requires more than one merge pass, therefore for a better"); 01096 STXXL_ERRMSG("efficiency decrease block size of run storage (a parameter of the run_creator)"); 01097 STXXL_ERRMSG("or increase the amount memory dedicated to the merger."); 01098 STXXL_ERRMSG("m = " << m_ << " nruns=" << nruns); 01099 01100 // insufficient memory, can not merge at all 01101 if (m_ < 2) { 01102 STXXL_ERRMSG("The merger requires memory to store at least two blocks internally. Aborting."); 01103 abort(); 01104 } 01105 01106 merge_recursively(); 01107 01108 nruns = sruns.runs.size(); 01109 } 01110 01111 assert(nruns <= m_); 01112 01113 unsigned_type i; 01114 /* 01115 const unsigned_type out_run_size = 01116 div_and_round_up(elements_remaining,size_type(block_type::size)); 01117 */ 01118 unsigned_type prefetch_seq_size = 0; 01119 for (i = 0; i < nruns; i++) 01120 { 01121 prefetch_seq_size += sruns.runs[i].size(); 01122 } 01123 01124 consume_seq.resize(prefetch_seq_size); 01125 01126 prefetch_seq = new int_type[prefetch_seq_size]; 01127 01128 typename run_type::iterator copy_start = consume_seq.begin(); 01129 for (i = 0; i < nruns; i++) 01130 { 01131 copy_start = std::copy( 01132 sruns.runs[i].begin(), 01133 sruns.runs[i].end(), 01134 copy_start); 01135 } 01136 01137 std::stable_sort(consume_seq.begin(), consume_seq.end(), 01138 sort_local::trigger_entry_cmp<bid_type, value_type, value_cmp>(cmp)); 01139 01140 int_type disks_number = config::get_instance()->disks_number(); 01141 01142 const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (int_type(m_) - int_type(nruns))); 01143 01144 01145 #ifdef SORT_OPTIMAL_PREFETCHING 01146 // heuristic 01147 const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers 
- 2 * disks_number)) / 10; 01148 01149 compute_prefetch_schedule( 01150 consume_seq, 01151 prefetch_seq, 01152 n_opt_prefetch_buffers, 01153 disks_number); 01154 #else 01155 for (i = 0; i < prefetch_seq_size; ++i) 01156 prefetch_seq[i] = i; 01157 01158 #endif 01159 01160 01161 prefetcher = new prefetcher_type( 01162 consume_seq.begin(), 01163 consume_seq.end(), 01164 prefetch_seq, 01165 nruns + n_prefetch_buffers); 01166 01167 losers = NULL; 01168 seqs = NULL; 01169 buffers = NULL; 01170 01171 initialize_current_block(); 01172 fill_current_block(); 01173 01174 01175 current_value = current_block->elem[0]; 01176 buffer_pos = 1; 01177 01178 if (elements_remaining <= block_type::size) 01179 deallocate_prefetcher(); 01180 } 01181 01183 bool empty() const 01184 { 01185 return elements_remaining == 0; 01186 } 01188 runs_merger & operator ++ () // preincrement operator 01189 { 01190 assert(!empty()); 01191 01192 --elements_remaining; 01193 01194 if (buffer_pos != block_type::size) 01195 { 01196 current_value = current_block->elem[buffer_pos]; 01197 ++buffer_pos; 01198 } 01199 else 01200 { 01201 if (!empty()) 01202 { 01203 fill_current_block(); 01204 01205 #ifdef STXXL_CHECK_ORDER_IN_SORTS 01206 assert(stxxl::is_sorted(current_block->elem, current_block->elem + current_block->size, cmp)); 01207 assert(!cmp(current_block->elem[0], current_value)); 01208 #endif 01209 current_value = current_block->elem[0]; 01210 buffer_pos = 1; 01211 } 01212 if (elements_remaining <= block_type::size) 01213 deallocate_prefetcher(); 01214 } 01215 01216 01217 #ifdef STXXL_CHECK_ORDER_IN_SORTS 01218 if (!empty()) 01219 { 01220 assert(!cmp(current_value, last_element)); 01221 last_element = current_value; 01222 } 01223 #endif 01224 01225 return *this; 01226 } 01228 const value_type & operator * () const 01229 { 01230 assert(!empty()); 01231 return current_value; 01232 } 01233 01235 const value_type * operator -> () const 01236 { 01237 assert(!empty()); 01238 return ¤t_value; 01239 } 01240 01241 
// --- runs_merger destructor, class close, and merge_recursively(). ---
//
// ~runs_merger(): releases the prefetcher, deletes current_block if one was
// allocated, and frees all on-disk run blocks via sruns.deallocate_blocks()
// (the original comment questions whether the caller should own that step).
//
// merge_recursively(): reduces the number of sorted runs until a single merge
// pass with arity m_ suffices. The per-pass fan-in is chosen as
//   merge_factor = ceil(nruns^(1/ceil(log(nruns)/log(m_))))
// i.e. the number of passes is fixed first and the fan-in balanced across
// them. Each pass:
//  1. sizes the new runs (element counts summed per group of up to
//     merge_factor input runs; block counts via div_and_round_up);
//  2. allocates the new runs' blocks through block_manager::new_blocks with
//     the configured alloc_strategy;
//  3. merges each group with a nested runs_merger writing through a
//     buf_ostream, recording a trigger value at the start of every block and
//     padding the last block with cmp.max_value();
//  4. for a trivial group of a single run it deletes the freshly allocated
//     blocks and reuses the input run's BIDs by copying trigger entries.
// Finally sruns is replaced by the merged run set.
//
// NOTE(review): the runs2merge == 1 branch manipulates new_runs.runs.back()
// and new_runs.runs_sizes.back() rather than index cur_out_run; this is only
// correct if a single-run group can occur solely as the LAST output run of a
// pass — verify against the grouping loop before relying on it.
01244 virtual ~runs_merger() 01245 { 01246 deallocate_prefetcher(); 01247 01248 if (current_block) 01249 delete current_block; 01250 01251 // free blocks in runs , (or the user should do it?) 01252 sruns.deallocate_blocks(); 01253 } 01254 01255 private: 01256 // cache for the current value 01257 value_type current_value; 01258 }; 01259 01260 01261 template <class RunsType_, class Cmp_, class AllocStr_> 01262 void runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively() 01263 { 01264 block_manager * bm = block_manager::get_instance(); 01265 unsigned_type ndisks = config::get_instance()->disks_number(); 01266 unsigned_type nwrite_buffers = 2 * ndisks; 01267 01268 unsigned_type nruns = sruns.runs.size(); 01269 const unsigned_type merge_factor = 01270 static_cast<unsigned_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) / log(double(m_)))))); 01271 assert(merge_factor <= m_); 01272 while (nruns > m_) 01273 { 01274 unsigned_type new_nruns = div_and_round_up(nruns, merge_factor); 01275 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns << 01276 " opt_merge_factor: " << merge_factor << " m:" << m_ << " new_nruns: " << new_nruns); 01277 01278 sorted_runs_type new_runs; 01279 new_runs.runs.resize(new_nruns); 01280 new_runs.runs_sizes.resize(new_nruns); 01281 new_runs.elements = sruns.elements; 01282 01283 unsigned_type runs_left = nruns; 01284 unsigned_type cur_out_run = 0; 01285 unsigned_type elements_in_new_run = 0; 01286 //unsigned_type blocks_in_new_run = 0; 01287 01288 01289 while (runs_left > 0) 01290 { 01291 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 01292 //blocks_in_new_run = 0 ; 01293 elements_in_new_run = 0; 01294 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i) 01295 { 01296 elements_in_new_run += sruns.runs_sizes[i]; 01297 //blocks_in_new_run += sruns.runs[i].size(); 01298 } 01299 const unsigned_type blocks_in_new_run1 = div_and_round_up(elements_in_new_run, block_type::size); 01300
//assert(blocks_in_new_run1 == blocks_in_new_run); 01301 01302 new_runs.runs_sizes[cur_out_run] = elements_in_new_run; 01303 // allocate run 01304 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1); 01305 runs_left -= runs2merge; 01306 } 01307 01308 // allocate blocks for the new runs 01309 for (unsigned_type i = 0; i < new_runs.runs.size(); ++i) 01310 bm->new_blocks(alloc_strategy(), 01311 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].begin()), 01312 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].end())); 01313 01314 01315 // merge all 01316 runs_left = nruns; 01317 cur_out_run = 0; 01318 size_type elements_left = sruns.elements; 01319 01320 while (runs_left > 0) 01321 { 01322 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor); 01323 STXXL_VERBOSE("Merging " << runs2merge << " runs"); 01324 01325 sorted_runs_type cur_runs; 01326 cur_runs.runs.resize(runs2merge); 01327 cur_runs.runs_sizes.resize(runs2merge); 01328 01329 std::copy(sruns.runs.begin() + nruns - runs_left, 01330 sruns.runs.begin() + nruns - runs_left + runs2merge, 01331 cur_runs.runs.begin()); 01332 std::copy(sruns.runs_sizes.begin() + nruns - runs_left, 01333 sruns.runs_sizes.begin() + nruns - runs_left + runs2merge, 01334 cur_runs.runs_sizes.begin()); 01335 01336 runs_left -= runs2merge; 01337 /* 01338 cur_runs.elements = (runs_left)?
01339 (new_runs.runs[cur_out_run].size()*block_type::size): 01340 (elements_left); 01341 */ 01342 cur_runs.elements = new_runs.runs_sizes[cur_out_run]; 01343 elements_left -= cur_runs.elements; 01344 01345 if (runs2merge > 1) 01346 { 01347 runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, m_ * block_type::raw_size * sort_memory_usage_factor()); 01348 01349 { // make sure everything is being destroyed in right time 01350 buf_ostream<block_type, typename run_type::iterator> out( 01351 new_runs.runs[cur_out_run].begin(), 01352 nwrite_buffers); 01353 01354 size_type cnt = 0; 01355 const size_type cnt_max = cur_runs.elements; 01356 01357 while (cnt != cnt_max) 01358 { 01359 *out = *merger; 01360 if ((cnt % block_type::size) == 0) // have to write the trigger value 01361 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger; 01362 01363 01364 ++cnt; 01365 ++out; 01366 ++merger; 01367 } 01368 assert(merger.empty()); 01369 01370 while (cnt % block_type::size) 01371 { 01372 *out = cmp.max_value(); 01373 ++out; 01374 ++cnt; 01375 } 01376 } 01377 } 01378 else 01379 { 01380 bm->delete_blocks( 01381 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>( 01382 new_runs.runs.back().begin()), 01383 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>( 01384 new_runs.runs.back().end())); 01385 01386 assert(cur_runs.runs.size() == 1); 01387 01388 std::copy(cur_runs.runs.front().begin(), 01389 cur_runs.runs.front().end(), 01390 new_runs.runs.back().begin()); 01391 new_runs.runs_sizes.back() = cur_runs.runs_sizes.back(); 01392 } 01393 01394 ++cur_out_run; 01395 } 01396 assert(elements_left == 0); 01397 01398 nruns = new_nruns; 01399 sruns = new_runs; 01400 } 01401 } 01402 01403 01412 template <class Input_, 01413 class Cmp_, 01414 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type), 01415 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 01416 class sort : public noncopyable 01417 {
// --- Interior of class stream::sort (its template header opens on the
// preceding chunk line), the compute_sorted_runs_type metafunction, and the
// iterator-based sort() convenience wrapper. ---
//
// class sort: a stream pipeline that chains a runs_creator (run formation)
// and a runs_merger (run merging) over the same comparator. Two constructors:
// one shared memory budget for both stages, and one with separate budgets
// for run creation (memory_to_use_rc) and merging (memory_to_use_m). The
// stream interface (operator*, operator->, operator++, empty) simply
// delegates to the merger, which consumes creator.result().
//
// compute_sorted_runs_type: metafunction mapping (ValueType_, BlockSize_) to
// the matching sorted_runs<...> instantiation, exposed as ::result.
//
// sort(begin, end, cmp, MemSize, AS): convenience overload that streamifies
// the iterator range, runs stream::sort over it, and materializes the sorted
// stream back into [begin, ...). The AllocStr argument is only used as a
// template parameter (UNUSED(AS)); the input stream type is obtained via the
// GCC __typeof__ extension, or streamify_traits under BOOST_MSVC where
// __typeof__ is unavailable.
01418 typedef runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> runs_creator_type; 01419 typedef typename runs_creator_type::sorted_runs_type sorted_runs_type; 01420 typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type; 01421 01422 runs_creator_type creator; 01423 runs_merger_type merger; 01424 01425 public: 01427 typedef typename Input_::value_type value_type; 01428 01433 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) : 01434 creator(in, c, memory_to_use), 01435 merger(creator.result(), c, memory_to_use) 01436 { } 01437 01443 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) : 01444 creator(in, c, memory_to_use_rc), 01445 merger(creator.result(), c, memory_to_use_m) 01446 { } 01447 01448 01450 const value_type & operator * () const 01451 { 01452 assert(!empty()); 01453 return *merger; 01454 } 01455 01456 const value_type * operator -> () const 01457 { 01458 assert(!empty()); 01459 return merger.operator -> (); 01460 } 01461 01463 sort & operator ++ () 01464 { 01465 ++merger; 01466 return *this; 01467 } 01468 01470 bool empty() const 01471 { 01472 return merger.empty(); 01473 } 01474 }; 01475 01481 template < 01482 class ValueType_, 01483 unsigned BlockSize_> 01484 class compute_sorted_runs_type 01485 { 01486 typedef ValueType_ value_type; 01487 typedef BID<BlockSize_> bid_type; 01488 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type; 01489 01490 public: 01491 typedef sorted_runs<value_type, trigger_entry_type> result; 01492 }; 01493 01495 } 01496 01499 01501 01510 template <unsigned BlockSize, 01511 class RandomAccessIterator, 01512 class CmpType, 01513 class AllocStr> 01514 void sort(RandomAccessIterator begin, 01515 RandomAccessIterator end, 01516 CmpType cmp, 01517 unsigned_type MemSize, 01518 AllocStr AS) 01519 { 01520 UNUSED(AS); 01521 #ifdef BOOST_MSVC 01522 typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType; 01523 #else 01524 typedef
__typeof__(stream::streamify(begin, end)) InputType; 01525 #endif 01526 InputType Input(begin, end); 01527 typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type; 01528 sorter_type Sort(Input, cmp, MemSize); 01529 stream::materialize(Sort, begin); 01530 } 01531 01533 01534 __STXXL_END_NAMESPACE 01535 01536 #endif // !STXXL_SORT_STREAM_HEADER 01537 // vim: et:ts=4:sw=4