STXXL 1.2.1 — source listing: include/stxxl/bits/algo/stable_ksort.h
00001 /*************************************************************************** 00002 * include/stxxl/bits/algo/stable_ksort.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2003 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * 00008 * Distributed under the Boost Software License, Version 1.0. 00009 * (See accompanying file LICENSE_1_0.txt or copy at 00010 * http://www.boost.org/LICENSE_1_0.txt) 00011 **************************************************************************/ 00012 00013 #ifndef STXXL_STABLE_KSORT_HEADER 00014 #define STXXL_STABLE_KSORT_HEADER 00015 00016 // it is a first try: distribution sort without sampling 00017 // I rework the stable_ksort when I would have a time 00018 00019 00020 #include <cmath> 00021 00022 #include <stxxl/bits/mng/mng.h> 00023 #include <stxxl/bits/mng/buf_istream.h> 00024 #include <stxxl/bits/mng/buf_ostream.h> 00025 #include <stxxl/bits/common/simple_vector.h> 00026 #include <stxxl/bits/algo/intksort.h> 00027 00028 #ifndef STXXL_VERBOSE_STABLE_KSORT 00029 #define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1 00030 #endif 00031 00032 00033 __STXXL_BEGIN_NAMESPACE 00034 00037 00040 namespace stable_ksort_local 00041 { 00042 template <class type_, class type_key> 00043 void classify_block(type_ * begin, type_ * end, type_key * & out, int_type * bucket, unsigned_type offset, unsigned shift) 00044 { 00045 for (type_ * p = begin; p < end; p++, out++) // count & create references 00046 { 00047 out->ptr = p; 00048 typename type_::key_type key = p->key(); 00049 int_type ibucket = (key - offset) >> shift; 00050 out->key = key; 00051 bucket[ibucket]++; 00052 } 00053 } 00054 00055 template <typename type> 00056 struct type_key 00057 { 00058 typedef typename type::key_type key_type; 00059 key_type key; 00060 type * ptr; 00061 00062 type_key() { } 00063 type_key(key_type k, type * p) : key(k), ptr(p) 00064 { } 00065 }; 00066 00067 template <typename type> 00068 bool operator < (const 
type_key<type> & a, const type_key<type> & b) 00069 { 00070 return a.key < b.key; 00071 } 00072 00073 template <typename type> 00074 bool operator > (const type_key<type> & a, const type_key<type> & b) 00075 { 00076 return a.key > b.key; 00077 } 00078 00079 00080 template <typename BIDType_, typename AllocStrategy_> 00081 class bid_sequence 00082 { 00083 public: 00084 typedef BIDType_ bid_type; 00085 typedef bid_type & reference; 00086 typedef AllocStrategy_ alloc_strategy; 00087 typedef typename simple_vector<bid_type>::size_type size_type; 00088 typedef typename simple_vector<bid_type>::iterator iterator; 00089 00090 protected: 00091 simple_vector<bid_type> * bids; 00092 alloc_strategy alloc_strategy_; 00093 00094 public: 00095 bid_sequence() { } 00096 bid_sequence(size_type size_) 00097 { 00098 bids = new simple_vector<bid_type>(size_); 00099 block_manager * mng = block_manager::get_instance(); 00100 mng->new_blocks(alloc_strategy_, bids->begin(), bids->end()); 00101 } 00102 void init(size_type size_) 00103 { 00104 bids = new simple_vector<bid_type>(size_); 00105 block_manager * mng = block_manager::get_instance(); 00106 mng->new_blocks(alloc_strategy_, bids->begin(), bids->end()); 00107 } 00108 reference operator [] (size_type i) 00109 { 00110 size_type size_ = size(); // cache size in a register 00111 if (i < size_) 00112 return *(bids->begin() + i); 00113 00114 block_manager * mng = block_manager::get_instance(); 00115 simple_vector<bid_type> * larger_bids = new simple_vector<bid_type>((i + 1) * 2); 00116 std::copy(bids->begin(), bids->end(), larger_bids->begin()); 00117 mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end()); 00118 delete bids; 00119 bids = larger_bids; 00120 return *(larger_bids->begin() + i); 00121 } 00122 size_type size() { return bids->size(); } 00123 iterator begin() { return bids->begin(); } 00124 ~bid_sequence() 00125 { 00126 block_manager::get_instance()->delete_blocks(bids->begin(), bids->end()); 00127 
delete bids; 00128 } 00129 }; 00130 00131 template <typename ExtIterator_> 00132 void distribute( 00133 bid_sequence<typename ExtIterator_::vector_type::block_type::bid_type, 00134 typename ExtIterator_::vector_type::alloc_strategy> * bucket_bids, 00135 int64 * bucket_sizes, 00136 const int_type nbuckets, 00137 const int_type lognbuckets, 00138 ExtIterator_ first, 00139 ExtIterator_ last, 00140 const int_type nread_buffers, 00141 const int_type nwrite_buffers) 00142 { 00143 typedef typename ExtIterator_::vector_type::value_type value_type; 00144 typedef typename value_type::key_type key_type; 00145 typedef typename ExtIterator_::block_type block_type; 00146 typedef typename block_type::bid_type bid_type; 00147 typedef buf_istream<typename ExtIterator_::block_type, 00148 typename ExtIterator_::bids_container_iterator> buf_istream_type; 00149 00150 int_type i = 0; 00151 00152 buf_istream_type in(first.bid(), last.bid() + ((first.block_offset()) ? 1 : 0), 00153 nread_buffers); 00154 00155 buffered_writer<block_type> out( 00156 nbuckets + nwrite_buffers, 00157 nwrite_buffers); 00158 00159 unsigned_type * bucket_block_offsets = new unsigned_type[nbuckets]; 00160 unsigned_type * bucket_iblock = new unsigned_type[nbuckets]; 00161 block_type ** bucket_blocks = new block_type *[nbuckets]; 00162 00163 std::fill(bucket_sizes, bucket_sizes + nbuckets, 0); 00164 std::fill(bucket_iblock, bucket_iblock + nbuckets, 0); 00165 std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0); 00166 00167 for (i = 0; i < nbuckets; i++) 00168 bucket_blocks[i] = out.get_free_block(); 00169 00170 00171 ExtIterator_ cur = first - first.block_offset(); 00172 00173 // skip part of the block before first untouched 00174 for ( ; cur != first; cur++) 00175 ++in; 00176 00177 00178 const int_type shift = sizeof(key_type) * 8 - lognbuckets; 00179 // search in the the range [_begin,_end) 00180 STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets); 00181 for 
( ; cur != last; cur++) 00182 { 00183 key_type cur_key = in.current().key(); 00184 int_type ibucket = cur_key >> shift; 00185 00186 int_type block_offset = bucket_block_offsets[ibucket]; 00187 in >> (bucket_blocks[ibucket]->elem[block_offset++]); 00188 if (block_offset == block_type::size) 00189 { 00190 block_offset = 0; 00191 int_type iblock = bucket_iblock[ibucket]++; 00192 bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]); 00193 } 00194 bucket_block_offsets[ibucket] = block_offset; 00195 } 00196 for (i = 0; i < nbuckets; i++) 00197 { 00198 if (bucket_block_offsets[i]) 00199 { 00200 out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]); 00201 } 00202 bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] + 00203 bucket_block_offsets[i]; 00204 STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] << 00205 ", estimated size: " << ((last - first) / int64(nbuckets))); 00206 } 00207 00208 delete[] bucket_blocks; 00209 delete[] bucket_block_offsets; 00210 delete[] bucket_iblock; 00211 } 00212 } 00213 00214 template <typename ExtIterator_> 00215 void stable_ksort(ExtIterator_ first, ExtIterator_ last, unsigned_type M) 00216 { 00217 STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,(std::numeric_limits<key_type>::max)()]"); 00218 typedef typename ExtIterator_::vector_type::value_type value_type; 00219 typedef typename value_type::key_type key_type; 00220 typedef typename ExtIterator_::block_type block_type; 00221 typedef typename block_type::bid_type bid_type; 00222 typedef typename ExtIterator_::vector_type::alloc_strategy alloc_strategy; 00223 typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type; 00224 typedef stable_ksort_local::type_key<value_type> type_key_; 00225 00226 first.flush(); // flush container 00227 00228 double begin = timestamp(); 00229 00230 unsigned_type i = 0; 00231 config 
* cfg = config::get_instance(); 00232 const unsigned_type m = M / block_type::raw_size; 00233 assert(2 * block_type::raw_size <= M); 00234 const unsigned_type write_buffers_multiple = 2; 00235 const unsigned_type read_buffers_multiple = 2; 00236 const unsigned_type ndisks = cfg->disks_number(); 00237 const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks; 00238 const unsigned_type nmaxbuckets = m - min_num_read_write_buffers; 00239 const unsigned_type lognbuckets = static_cast<unsigned_type>(log2(double(nmaxbuckets))); 00240 const unsigned_type nbuckets = 1 << lognbuckets; 00241 const unsigned_type est_bucket_size = div_and_round_up((last - first) / int64(nbuckets), 00242 int64(block_type::size)); //in blocks 00243 00244 if (m < min_num_read_write_buffers + 2 || nbuckets < 2) { 00245 STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m << 00246 ", required for r/w buffers: " << min_num_read_write_buffers << 00247 ", required for buckets: 2, nbuckets: " << nbuckets); 00248 abort(); 00249 } 00250 STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first)); 00251 STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets); 00252 const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple); 00253 const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple); 00254 00255 STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers); 00256 STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers); 00257 00258 bucket_bids_type * bucket_bids = new bucket_bids_type[nbuckets]; 00259 for (i = 0; i < nbuckets; ++i) 00260 bucket_bids[i].init(est_bucket_size); 00261 00262 int64 * bucket_sizes = new int64[nbuckets]; 00263 00264 
disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 00265 00266 stable_ksort_local::distribute( 00267 bucket_bids, 00268 bucket_sizes, 00269 nbuckets, 00270 lognbuckets, 00271 first, 00272 last, 00273 nread_buffers, 00274 nwrite_buffers); 00275 00276 double dist_end = timestamp(), end; 00277 double io_wait_after_d = stats::get_instance()->get_io_wait_time(); 00278 00279 { 00280 // sort buckets 00281 unsigned_type write_buffers_multiple_bs = 2; 00282 unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2; // in number of blocks 00283 int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size; // in number of records 00284 int64 max_bucket_size_act = 0; // actual max bucket size 00285 // establish output stream 00286 00287 for (i = 0; i < nbuckets; i++) 00288 { 00289 max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act); 00290 if (bucket_sizes[i] > max_bucket_size_rec) 00291 { 00292 STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] << 00293 " records, maximum: " << max_bucket_size_rec); 00294 STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting."); 00295 abort(); 00296 } 00297 } 00298 // here we can increase write_buffers_multiple_b knowing max(bucket_sizes[i]) 00299 // ... 
and decrease max_bucket_size_bl 00300 const int_type max_bucket_size_act_bl = div_and_round_up(max_bucket_size_act, block_type::size); 00301 STXXL_VERBOSE_STABLE_KSORT("Reducing required number of required blocks per bucket from " << 00302 max_bucket_size_bl << " to " << max_bucket_size_act_bl); 00303 max_bucket_size_rec = max_bucket_size_act; 00304 max_bucket_size_bl = max_bucket_size_act_bl; 00305 const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl; 00306 STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs); 00307 00308 typedef buf_ostream<block_type, typename ExtIterator_::bids_container_iterator> buf_ostream_type; 00309 buf_ostream_type out(first.bid(), nwrite_buffers_bs); 00310 00311 disk_queues::get_instance()->set_priority_op(disk_queue::READ); 00312 00313 if (first.block_offset()) 00314 { 00315 // has to skip part of the first block 00316 block_type * block = new block_type; 00317 request_ptr req; 00318 req = block->read(*first.bid()); 00319 req->wait(); 00320 00321 for (i = 0; i < first.block_offset(); i++) 00322 { 00323 out << block->elem[i]; 00324 } 00325 delete block; 00326 } 00327 block_type * blocks1 = new block_type[max_bucket_size_bl]; 00328 block_type * blocks2 = new block_type[max_bucket_size_bl]; 00329 request_ptr * reqs1 = new request_ptr[max_bucket_size_bl]; 00330 request_ptr * reqs2 = new request_ptr[max_bucket_size_bl]; 00331 type_key_ * refs1 = new type_key_[max_bucket_size_rec]; 00332 type_key_ * refs2 = new type_key_[max_bucket_size_rec]; 00333 00334 // submit reading first 2 buckets (Peter's scheme) 00335 unsigned_type nbucket_blocks = div_and_round_up(bucket_sizes[0], block_type::size); 00336 for (i = 0; i < nbucket_blocks; i++) 00337 reqs1[i] = blocks1[i].read(bucket_bids[0][i]); 00338 00339 00340 nbucket_blocks = div_and_round_up(bucket_sizes[1], block_type::size); 00341 for (i = 0; i < nbucket_blocks; i++) 00342 reqs2[i] = blocks2[i].read(bucket_bids[1][i]); 00343 00344 00345 
key_type offset = 0; 00346 const unsigned log_k1 = 00347 (std::max)(static_cast<unsigned>(ceil(log2(double( 00348 max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U); 00349 unsigned_type k1 = 1 << log_k1; 00350 int_type * bucket1 = new int_type[k1]; 00351 00352 const unsigned shift = sizeof(key_type) * 8 - lognbuckets; 00353 const unsigned shift1 = shift - log_k1; 00354 00355 STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec << 00356 " block size:" << block_type::size << " log_k1:" << log_k1); 00357 00358 for (unsigned_type k = 0; k < nbuckets; k++) 00359 { 00360 nbucket_blocks = div_and_round_up(bucket_sizes[k], block_type::size); 00361 const unsigned log_k1_k = 00362 (std::max)(static_cast<unsigned>(ceil(log2( 00363 double(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U); 00364 assert(log_k1_k <= log_k1); 00365 k1 = 1 << log_k1_k; 00366 std::fill(bucket1, bucket1 + k1, 0); 00367 00368 STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] << 00369 " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k); 00370 // classify first nbucket_blocks-1 blocks, they are full 00371 type_key_ * ref_ptr = refs1; 00372 key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k); 00373 for (i = 0; i < nbucket_blocks - 1; i++) 00374 { 00375 reqs1[i]->wait(); 00376 stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(), ref_ptr, bucket1, offset1, shift1 /*,k1*/); 00377 } 00378 // last block might be non-full 00379 const unsigned_type last_block_size = bucket_sizes[k] - int64(nbucket_blocks - 1) * block_type::size; 00380 reqs1[i]->wait(); 00381 00382 //STXXL_MSG("block_type::size: "<<block_type::size<<" last_block_size:"<<last_block_size); 00383 00384 classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size, ref_ptr, bucket1, offset1, shift1); 00385 00386 exclusive_prefix_sum(bucket1, k1); 00387 classify(refs1, refs1 + 
bucket_sizes[k], refs2, bucket1, offset1, shift1); 00388 00389 type_key_ * c = refs2; 00390 type_key_ * d = refs1; 00391 for (i = 0; i < k1; i++) 00392 { 00393 type_key_ * cEnd = refs2 + bucket1[i]; 00394 type_key_ * dEnd = refs1 + bucket1[i]; 00395 00396 const unsigned log_k2 = static_cast<unsigned>(log2(double(bucket1[i]))) - 1; // adaptive bucket size 00397 const unsigned_type k2 = 1 << log_k2; 00398 int_type * bucket2 = new int_type[k2]; 00399 const unsigned shift2 = shift1 - log_k2; 00400 00401 // STXXL_MSG("Sorting bucket "<<k<<":"<<i); 00402 l1sort(c, cEnd, d, bucket2, k2, 00403 offset1 + (key_type(1) << key_type(shift1)) * key_type(i), 00404 shift2); 00405 00406 // write out all 00407 for (type_key_ * p = d; p < dEnd; p++) 00408 out << (*(p->ptr)); 00409 00410 00411 delete[] bucket2; 00412 c = cEnd; 00413 d = dEnd; 00414 } 00415 // submit next read 00416 const unsigned_type bucket2submit = k + 2; 00417 if (bucket2submit < nbuckets) 00418 { 00419 nbucket_blocks = div_and_round_up(bucket_sizes[bucket2submit], block_type::size); 00420 for (i = 0; i < nbucket_blocks; i++) 00421 reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]); 00422 } 00423 00424 std::swap(blocks1, blocks2); 00425 std::swap(reqs1, reqs2); 00426 } 00427 00428 delete[] bucket1; 00429 delete[] refs1; 00430 delete[] refs2; 00431 delete[] blocks1; 00432 delete[] blocks2; 00433 delete[] reqs1; 00434 delete[] reqs2; 00435 delete[] bucket_bids; 00436 delete[] bucket_sizes; 00437 00438 if (last.block_offset()) 00439 { 00440 // has to skip part of the first block 00441 block_type * block = new block_type; 00442 request_ptr req = block->read(*last.bid()); 00443 req->wait(); 00444 00445 for (i = last.block_offset(); i < block_type::size; i++) 00446 { 00447 out << block->elem[i]; 00448 } 00449 delete block; 00450 } 00451 00452 end = timestamp(); 00453 } 00454 00455 STXXL_VERBOSE("Elapsed time : " << end - begin << " s. 
Distribution time: " << 00456 dist_end - begin << " s"); 00457 STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s"); 00458 STXXL_VERBOSE(*stats::get_instance()); 00459 UNUSED(begin + dist_end + io_wait_after_d); 00460 } 00461 00463 00464 __STXXL_END_NAMESPACE 00465 00466 #endif // !STXXL_STABLE_KSORT_HEADER