Stxxl  1.2.1
sort_stream.h
1 /***************************************************************************
2  * include/stxxl/bits/stream/sort_stream.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  * Copyright (C) 2006 Johannes Singler <singler@ira.uka.de>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 #ifndef STXXL_SORT_STREAM_HEADER
15 #define STXXL_SORT_STREAM_HEADER
16 
17 #ifdef STXXL_BOOST_CONFIG
18  #include <boost/config.hpp>
19 #endif
20 
21 #include <stxxl/bits/stream/stream.h>
22 #include <stxxl/sort>
23 
24 
25 __STXXL_BEGIN_NAMESPACE
26 
27 namespace stream
28 {
31 
32  template <class ValueType, class TriggerEntryType>
33  struct sorted_runs
34  {
35  typedef TriggerEntryType trigger_entry_type;
36  typedef ValueType value_type;
37  typedef typename trigger_entry_type::bid_type bid_type;
38  typedef stxxl::int64 size_type;
39  typedef std::vector<trigger_entry_type> run_type;
40  typedef typed_block<bid_type::size, value_type> block_type;
41  size_type elements;
42  std::vector<run_type> runs;
43  std::vector<unsigned_type> runs_sizes;
44 
45  // Optimization:
46  // if the input is small such that its total size is
47  // less than B (block_type::size)
48  // then input is sorted internally
49  // and kept in the array "small"
50  std::vector<ValueType> small_;
51 
52  sorted_runs() : elements(0) { }
53 
58  void deallocate_blocks()
59  {
60  block_manager * bm = block_manager::get_instance();
61  for (unsigned_type i = 0; i < runs.size(); ++i)
62  bm->delete_blocks(
63  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i].begin()),
64  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i].end()));
65 
66 
67  runs.clear();
68  }
69  };
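 // Illustrative summary (not part of the original header) of the invariants the
 // run creators below maintain: elements equals the sum of runs_sizes;
 // runs[i].size() == div_and_round_up(runs_sizes[i], block_type::size); and if
 // the whole input fits into a single block, runs stays empty and the sorted
 // data is kept in small_ instead.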
70 
78  template <
79  class Input_,
80  class Cmp_,
81  unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
82  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
83  class runs_creator : private noncopyable
84  {
85  Input_ & input;
86  Cmp_ cmp;
87 
88  public:
89  typedef Cmp_ cmp_type;
90  typedef typename Input_::value_type value_type;
91  typedef BID<BlockSize_> bid_type;
 92  typedef typed_block<BlockSize_, value_type> block_type;
 93  typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
94  typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
95 
96  private:
97  typedef typename sorted_runs_type::run_type run_type;
98  sorted_runs_type result_; // stores the result (sorted runs)
99  unsigned_type m_; // memory for internal use in blocks
 100  bool result_computed; // true if the result has already been computed (used in the 'result' method)
101 
102  void compute_result();
103  void sort_run(block_type * run, unsigned_type elements)
104  {
105  if (block_type::has_filler)
106  std::sort(
107 #if 1
108  ArrayOfSequencesIterator<
109  block_type, typename block_type::value_type, block_type::size
110  >(run, 0),
111  ArrayOfSequencesIterator<
112  block_type, typename block_type::value_type, block_type::size
113  >(run, elements),
114 #else
115  TwoToOneDimArrayRowAdaptor<
116  block_type, value_type, block_type::size
117  >(run, 0),
118  TwoToOneDimArrayRowAdaptor<
119  block_type, value_type, block_type::size
120  >(run, elements),
121 #endif
122  cmp);
123 
124  else
125  std::sort(run[0].elem, run[0].elem + elements, cmp);
126  }
127 
128  public:
133  runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) :
134  input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
135  {
136  assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
137  }
138 
142  const sorted_runs_type & result()
143  {
144  if (!result_computed)
145  {
146  compute_result();
147  result_computed = true;
148 #ifdef STXXL_PRINT_STAT_AFTER_RF
149  STXXL_MSG(*stats::get_instance());
150 #endif
151  }
152  return result_;
153  }
154  };
155 
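 // Usage sketch (illustrative, not part of the original header): driving the
 // run-formation phase on its own.  InputStream and CompareWithSentinels are
 // assumed placeholder types: the stream must offer value_type, operator*,
 // operator++ and empty(); the comparator must also provide the max_value()
 // sentinel that compute_result() uses to pad the last block of a run.
 template <class InputStream, class CompareWithSentinels>
 typename runs_creator<InputStream, CompareWithSentinels>::sorted_runs_type
 create_runs_example(InputStream & in, CompareWithSentinels cmp, unsigned_type mem)
 {
     runs_creator<InputStream, CompareWithSentinels> creator(in, cmp, mem);
     return creator.result();    // the first call triggers compute_result()
 }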
156 
157  template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_>
158  void runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
159  {
160  unsigned_type i = 0;
161  unsigned_type m2 = m_ / 2;
162  const unsigned_type el_in_run = m2 * block_type::size; // # el in a run
163  STXXL_VERBOSE1("runs_creator::compute_result m2=" << m2);
164  unsigned_type pos = 0;
165 
166 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
167  block_type * Blocks1 = new block_type[m2 * 2];
168 #else
169 #if 0
170  block_type * Blocks1 = new block_type[1]; // allocate only one block first
171  // if needed reallocate
172  while (!input.empty() && pos != block_type::size)
173  {
174  Blocks1[pos / block_type::size][pos % block_type::size] = *input;
175  ++input;
176  ++pos;
177  }
178 #endif
179 
180  while (!input.empty() && pos != block_type::size)
181  {
182  result_.small_.push_back(*input);
183  ++input;
184  ++pos;
185  }
186 
187  block_type * Blocks1;
188 
189  if (pos == block_type::size)
190  { // enlarge/reallocate Blocks1 array
191  block_type * NewBlocks = new block_type[m2 * 2];
192  std::copy(result_.small_.begin(), result_.small_.end(), NewBlocks[0].begin());
193  result_.small_.clear();
194  //delete [] Blocks1;
195  Blocks1 = NewBlocks;
196  }
197  else
198  {
199  STXXL_VERBOSE1("runs_creator: Small input optimization, input length: " << pos);
200  result_.elements = pos;
201  std::sort(result_.small_.begin(), result_.small_.end(), cmp);
202  return;
203  }
204 #endif
205 
206  while (!input.empty() && pos != el_in_run)
207  {
208  Blocks1[pos / block_type::size][pos % block_type::size] = *input;
209  ++input;
210  ++pos;
211  }
212 
213  // sort first run
214  sort_run(Blocks1, pos);
215  result_.elements = pos;
216  if (pos < block_type::size && input.empty()) // small input, do not flush it on the disk(s)
217  {
218  STXXL_VERBOSE1("runs_creator: Small input optimization, input length: " << pos);
219  result_.small_.resize(pos);
220  std::copy(Blocks1[0].begin(), Blocks1[0].begin() + pos, result_.small_.begin());
221  delete[] Blocks1;
222  return;
223  }
224 
225 
226  block_type * Blocks2 = Blocks1 + m2;
227  block_manager * bm = block_manager::get_instance();
228  request_ptr * write_reqs = new request_ptr[m2];
229  run_type run;
230 
231 
232  unsigned_type cur_run_size = div_and_round_up(pos, block_type::size); // in blocks
233  run.resize(cur_run_size);
234  bm->new_blocks(AllocStr_(),
235  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
236  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
237  );
238 
239  disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
240 
241  result_.runs_sizes.push_back(pos);
242 
243  // fill the rest of the last block with max values
244  for ( ; pos != el_in_run; ++pos)
245  Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
246 
247 
248  for (i = 0; i < cur_run_size; ++i)
249  {
250  run[i].value = Blocks1[i][0];
251  write_reqs[i] = Blocks1[i].write(run[i].bid);
252  //STXXL_MSG("BID: "<<run[i].bid<<" val: "<<run[i].value);
253  }
254  result_.runs.push_back(run); // #
255 
256  if (input.empty())
257  {
258  // return
259  wait_all(write_reqs, write_reqs + cur_run_size);
260  delete[] write_reqs;
261  delete[] Blocks1;
262  return;
263  }
264 
265  STXXL_VERBOSE1("Filling the second part of the allocated blocks");
266  pos = 0;
267  while (!input.empty() && pos != el_in_run)
268  {
269  Blocks2[pos / block_type::size][pos % block_type::size] = *input;
270  ++input;
271  ++pos;
272  }
273  result_.elements += pos;
274 
275  if (input.empty())
276  {
277  // (re)sort internally and return
278  pos += el_in_run;
 279  sort_run(Blocks1, pos); // sort the first and second run together
280  wait_all(write_reqs, write_reqs + cur_run_size);
281  bm->delete_blocks(trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
282  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()));
283 
284  cur_run_size = div_and_round_up(pos, block_type::size);
285  run.resize(cur_run_size);
286  bm->new_blocks(AllocStr_(),
287  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
288  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
289  );
290 
291  result_.runs_sizes[0] = pos;
292  // fill the rest of the last block with max values
293  for ( ; pos != 2 * el_in_run; ++pos)
294  Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
295 
296 
297  assert(cur_run_size > m2);
298 
299  for (i = 0; i < m2; ++i)
300  {
301  run[i].value = Blocks1[i][0];
302  write_reqs[i]->wait();
303  write_reqs[i] = Blocks1[i].write(run[i].bid);
304  }
305 
306  request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];
307 
308  for ( ; i < cur_run_size; ++i)
309  {
310  run[i].value = Blocks1[i][0];
311  write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
312  }
313 
314  result_.runs[0] = run;
315 
316  wait_all(write_reqs, write_reqs + m2);
317  delete[] write_reqs;
318  wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
319  delete[] write_reqs1;
320 
321  delete[] Blocks1;
322 
323  return;
324  }
325 
326  sort_run(Blocks2, pos);
327 
328  cur_run_size = div_and_round_up(pos, block_type::size); // in blocks
329  run.resize(cur_run_size);
330  bm->new_blocks(AllocStr_(),
331  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
332  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
333  );
334 
335  for (i = 0; i < cur_run_size; ++i)
336  {
337  run[i].value = Blocks2[i][0];
338  write_reqs[i]->wait();
339  write_reqs[i] = Blocks2[i].write(run[i].bid);
340  }
341  assert((pos % el_in_run) == 0);
342 
343  result_.runs.push_back(run);
344  result_.runs_sizes.push_back(pos);
345 
346  while (!input.empty())
347  {
348  pos = 0;
349  while (!input.empty() && pos != el_in_run)
350  {
351  Blocks1[pos / block_type::size][pos % block_type::size] = *input;
352  ++input;
353  ++pos;
354  }
355  result_.elements += pos;
356  sort_run(Blocks1, pos);
357  cur_run_size = div_and_round_up(pos, block_type::size); // in blocks
358  run.resize(cur_run_size);
359  bm->new_blocks(AllocStr_(),
360  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
361  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
362  );
363 
364  result_.runs_sizes.push_back(pos);
365 
366  // fill the rest of the last block with max values (occurs only on the last run)
367  for ( ; pos != el_in_run; ++pos)
368  Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
369 
370 
371  for (i = 0; i < cur_run_size; ++i)
372  {
373  run[i].value = Blocks1[i][0];
374  write_reqs[i]->wait();
375  write_reqs[i] = Blocks1[i].write(run[i].bid);
376  }
377  result_.runs.push_back(run); // #
378 
379  std::swap(Blocks1, Blocks2);
380  }
381 
382  wait_all(write_reqs, write_reqs + m2);
383  delete[] write_reqs;
384  delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
385  }
386 
387 
395  template <class ValueType_>
396  struct use_push
397  {
398  typedef ValueType_ value_type;
399  };
400 
412  template <
413  class ValueType_,
414  class Cmp_,
415  unsigned BlockSize_,
416  class AllocStr_>
 417  class runs_creator<
 418  use_push<ValueType_>,
419  Cmp_,
420  BlockSize_,
421  AllocStr_>
422  {
423  Cmp_ cmp;
424 
425  public:
426  typedef Cmp_ cmp_type;
427  typedef ValueType_ value_type;
428  typedef BID<BlockSize_> bid_type;
 429  typedef typed_block<BlockSize_, value_type> block_type;
 430  typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
431  typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
432 
433  private:
434  typedef typename sorted_runs_type::run_type run_type;
435  sorted_runs_type result_; // stores the result (sorted runs)
436  unsigned_type m_; // memory for internal use in blocks
437 
438  bool output_requested; // true after the result() method was called for the first time
439 
440  const unsigned_type m2;
441  const unsigned_type el_in_run;
442  unsigned_type cur_el;
443  block_type * Blocks1;
444  block_type * Blocks2;
445  request_ptr * write_reqs;
446  run_type run;
447 
448  void sort_run(block_type * run, unsigned_type elements)
449  {
450  if (block_type::has_filler)
451  std::sort(
452 #if 1
453  ArrayOfSequencesIterator<
454  block_type, typename block_type::value_type, block_type::size
455  >(run, 0),
456  ArrayOfSequencesIterator<
457  block_type, typename block_type::value_type, block_type::size
458  >(run, elements),
459 #else
460  TwoToOneDimArrayRowAdaptor<
461  block_type, value_type, block_type::size
462  >(run, 0),
463  TwoToOneDimArrayRowAdaptor<
464  block_type, value_type, block_type::size
465  >(run, elements),
466 #endif
467  cmp);
468 
469  else
470  std::sort(run[0].elem, run[0].elem + elements, cmp);
471  }
472  void finish_result()
473  {
474  if (cur_el == 0)
475  return;
476 
477 
478  unsigned_type cur_el_reg = cur_el;
479  sort_run(Blocks1, cur_el_reg);
480  result_.elements += cur_el_reg;
481  if (cur_el_reg < unsigned_type(block_type::size) &&
482  unsigned_type(result_.elements) == cur_el_reg) // small input, do not flush it on the disk(s)
483  {
484  STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
485  result_.small_.resize(cur_el_reg);
486  std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
487  return;
488  }
489 
490  const unsigned_type cur_run_size = div_and_round_up(cur_el_reg, block_type::size); // in blocks
491  run.resize(cur_run_size);
492  block_manager * bm = block_manager::get_instance();
493  bm->new_blocks(AllocStr_(),
494  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
495  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
496  );
497 
498  disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
499 
500  result_.runs_sizes.push_back(cur_el_reg);
501 
502  // fill the rest of the last block with max values
503  for ( ; cur_el_reg != el_in_run; ++cur_el_reg)
504  Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = cmp.max_value();
505 
506 
507  unsigned_type i = 0;
508  for ( ; i < cur_run_size; ++i)
509  {
510  run[i].value = Blocks1[i][0];
511  if (write_reqs[i].get())
512  write_reqs[i]->wait();
513 
514  write_reqs[i] = Blocks1[i].write(run[i].bid);
515  }
516  result_.runs.push_back(run);
517 
518  for (i = 0; i < m2; ++i)
519  if (write_reqs[i].get())
520  write_reqs[i]->wait();
521  }
522  void cleanup()
523  {
524  delete[] write_reqs;
525  delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
526  write_reqs = NULL;
527  Blocks1 = Blocks2 = NULL;
528  }
529 
530  public:
534  runs_creator(Cmp_ c, unsigned_type memory_to_use) :
535  cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), output_requested(false),
536  m2(m_ / 2),
537  el_in_run(m2 * block_type::size),
538  cur_el(0),
539  Blocks1(new block_type[m2 * 2]),
540  Blocks2(Blocks1 + m2),
541  write_reqs(new request_ptr[m2])
542  {
543  assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
544  }
545 
546  ~runs_creator()
547  {
548  if (!output_requested)
549  cleanup();
550  }
551 
554  void push(const value_type & val)
555  {
556  assert(output_requested == false);
557  unsigned_type cur_el_reg = cur_el;
558  if (cur_el_reg < el_in_run)
559  {
560  Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val;
561  ++cur_el;
562  return;
563  }
564 
565  assert(el_in_run == cur_el);
566  cur_el = 0;
567 
568  //sort and store Blocks1
569  sort_run(Blocks1, el_in_run);
570  result_.elements += el_in_run;
571 
572  const unsigned_type cur_run_size = div_and_round_up(el_in_run, block_type::size); // in blocks
573  run.resize(cur_run_size);
574  block_manager * bm = block_manager::get_instance();
575  bm->new_blocks(AllocStr_(),
576  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
577  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
578  );
579 
580  disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
581 
582  result_.runs_sizes.push_back(el_in_run);
583 
584  for (unsigned_type i = 0; i < cur_run_size; ++i)
585  {
586  run[i].value = Blocks1[i][0];
587  if (write_reqs[i].get())
588  write_reqs[i]->wait();
589 
590  write_reqs[i] = Blocks1[i].write(run[i].bid);
591  }
592 
593  result_.runs.push_back(run);
594 
595  std::swap(Blocks1, Blocks2);
596 
597  push(val);
598  }
599 
603  const sorted_runs_type & result()
604  {
605  if (!output_requested)
606  {
607  finish_result();
608  output_requested = true;
609  cleanup();
610 #ifdef STXXL_PRINT_STAT_AFTER_RF
611  STXXL_MSG(*stats::get_instance());
612 #endif
613  }
614  return result_;
615  }
616  };
617 
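 // Usage sketch (illustrative, not part of the original header): the push()
 // interface of this specialization is convenient when the elements are produced
 // by the application itself rather than read from an input stream.  ValueT,
 // Iterator and CompareWithSentinels are assumed placeholders.
 template <class ValueT, class Iterator, class CompareWithSentinels>
 typename runs_creator<use_push<ValueT>, CompareWithSentinels>::sorted_runs_type
 push_runs_example(Iterator first, Iterator last, CompareWithSentinels cmp, unsigned_type mem)
 {
     runs_creator<use_push<ValueT>, CompareWithSentinels> creator(cmp, mem);
     for ( ; first != last; ++first)
         creator.push(*first);   // elements may arrive in any order
     return creator.result();    // finishes the last run and returns all runs
 }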
618 
625  template <class ValueType_>
 626  struct from_sorted_sequences
 627  {
628  typedef ValueType_ value_type;
629  };
630 
642  template <
643  class ValueType_,
644  class Cmp_,
645  unsigned BlockSize_,
646  class AllocStr_>
 647  class runs_creator<
 648  from_sorted_sequences<ValueType_>,
649  Cmp_,
650  BlockSize_,
651  AllocStr_>
652  {
653  typedef ValueType_ value_type;
654  typedef BID<BlockSize_> bid_type;
 655  typedef typed_block<BlockSize_, value_type> block_type;
 656  typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
657  typedef AllocStr_ alloc_strategy_type;
658  Cmp_ cmp;
659 
660  public:
661  typedef Cmp_ cmp_type;
662  typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
663 
664  private:
665  typedef typename sorted_runs_type::run_type run_type;
666  sorted_runs_type result_; // stores the result (sorted runs)
667  unsigned_type m_; // memory for internal use in blocks
 668  buffered_writer<block_type> writer;
 669  block_type * cur_block;
670  unsigned_type offset;
671  unsigned_type iblock;
672  unsigned_type irun;
673  alloc_strategy_type alloc_strategy;
674 
675  public:
680  runs_creator(Cmp_ c, unsigned_type memory_to_use) :
681  cmp(c),
682  m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
683  writer(m_, m_ / 2),
684  cur_block(writer.get_free_block()),
685  offset(0),
686  iblock(0),
687  irun(0)
688  {
689  assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
690  }
691 
694  void push(const value_type & val)
695  {
696  assert(offset < block_type::size);
697 
698  (*cur_block)[offset] = val;
699  ++offset;
700 
701  if (offset == block_type::size)
702  {
703  // write current block
704 
705  block_manager * bm = block_manager::get_instance();
706  // allocate space for the block
707  result_.runs.resize(irun + 1);
708  result_.runs[irun].resize(iblock + 1);
709  bm->new_blocks(
710  offset_allocator<alloc_strategy_type>(iblock, alloc_strategy),
711  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
712  result_.runs[irun].begin() + iblock),
713  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
714  result_.runs[irun].end())
715  );
716 
717  result_.runs[irun][iblock].value = (*cur_block)[0]; // init trigger
718  cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
719  ++iblock;
720 
721  offset = 0;
722  }
723 
724  ++result_.elements;
725  }
726 
728  void finish()
729  {
730  if (offset == 0 && iblock == 0) // current run is empty
731  return;
732 
733 
734  result_.runs_sizes.resize(irun + 1);
735  result_.runs_sizes.back() = iblock * block_type::size + offset;
736 
737  if (offset) // if current block is partially filled
738  {
739  while (offset != block_type::size)
740  {
741  (*cur_block)[offset] = cmp.max_value();
742  ++offset;
743  }
744  offset = 0;
745 
746  block_manager * bm = block_manager::get_instance();
747  // allocate space for the block
748  result_.runs.resize(irun + 1);
749  result_.runs[irun].resize(iblock + 1);
750  bm->new_blocks(
751  offset_allocator<alloc_strategy_type>(iblock, alloc_strategy),
752  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
753  result_.runs[irun].begin() + iblock),
754  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
755  result_.runs[irun].end())
756  );
757 
758  result_.runs[irun][iblock].value = (*cur_block)[0]; // init trigger
759  cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
760  }
761  else
762  { }
763 
764  iblock = 0;
765  ++irun;
766  }
767 
771  const sorted_runs_type & result()
772  {
773  finish();
774  writer.flush();
775 
776  return result_;
777  }
778  };
779 
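 // Usage sketch (illustrative, not part of the original header): with
 // from_sorted_sequences the caller delivers elements that are already sorted
 // within each run and marks the run boundaries via finish().  ValueT, Iterator
 // and CompareWithSentinels are assumed placeholders; each input range is
 // expected to be sorted w.r.t. cmp.
 template <class ValueT, class Iterator, class CompareWithSentinels>
 void sorted_sequences_example(Iterator first1, Iterator last1,
                               Iterator first2, Iterator last2,
                               CompareWithSentinels cmp, unsigned_type mem)
 {
     runs_creator<from_sorted_sequences<ValueT>, CompareWithSentinels> creator(cmp, mem);
     for ( ; first1 != last1; ++first1)
         creator.push(*first1);  // first run, pre-sorted by the caller
     creator.finish();           // close the first run explicitly
     for ( ; first2 != last2; ++first2)
         creator.push(*first2);  // second run, also pre-sorted
     creator.finish();
     // creator.result() now describes both runs; no internal sorting was done
 }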
780 
785  template <class RunsType_, class Cmp_>
786  bool check_sorted_runs(RunsType_ & sruns, Cmp_ cmp)
787  {
788  typedef typename RunsType_::block_type block_type;
789  typedef typename block_type::value_type value_type;
790  STXXL_VERBOSE2("Elements: " << sruns.elements);
791  unsigned_type nruns = sruns.runs.size();
792  STXXL_VERBOSE2("Runs: " << nruns);
793  unsigned_type irun = 0;
794  for (irun = 0; irun < nruns; ++irun)
795  {
796  const unsigned_type nblocks = sruns.runs[irun].size();
797  block_type * blocks = new block_type[nblocks];
798  request_ptr * reqs = new request_ptr[nblocks];
799  for (unsigned_type j = 0; j < nblocks; ++j)
800  {
801  reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
802  }
803  wait_all(reqs, reqs + nblocks);
804  for (unsigned_type j = 0; j < nblocks; ++j)
805  {
806  if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
807  cmp(sruns.runs[irun][j].value, blocks[j][0]))
808  {
809  STXXL_ERRMSG("check_sorted_runs wrong trigger in the run");
810  return false;
811  }
812  }
813  if (!stxxl::is_sorted(
814 #if 1
815  ArrayOfSequencesIterator<
816  block_type, typename block_type::value_type, block_type::size
817  >(blocks, 0),
818  ArrayOfSequencesIterator<
819  block_type, typename block_type::value_type, block_type::size
820  >(blocks, sruns.runs_sizes[irun]),
821 #else
822  TwoToOneDimArrayRowAdaptor<
823  block_type, value_type, block_type::size
824  >(blocks, 0),
825  TwoToOneDimArrayRowAdaptor<
826  block_type, value_type, block_type::size
827  >(blocks,
828  //nblocks*block_type::size
829  //(irun<nruns-1)?(nblocks*block_type::size): (sruns.elements%(nblocks*block_type::size))
830  sruns.runs_sizes[irun]
831  ),
832 #endif
833  cmp))
834  {
835  STXXL_ERRMSG("check_sorted_runs wrong order in the run");
836  return false;
837  }
838 
839  delete[] reqs;
840  delete[] blocks;
841  }
842 
843  STXXL_MSG("Checking runs finished successfully");
844 
845  return true;
846  }
847 
848 
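 // Usage sketch (illustrative, not part of the original header): verifying the
 // output of a runs_creator before merging.  Because check_sorted_runs() takes
 // the runs by non-const reference, the result is first copied into a local
 // object; `my_creator_type`, `creator` and `cmp` are assumed placeholders.
 //
 //   my_creator_type::sorted_runs_type runs = creator.result();
 //   assert(check_sorted_runs(runs, cmp));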
856  template <class RunsType_,
857  class Cmp_,
858  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
859  class runs_merger : private noncopyable
860  {
861  typedef RunsType_ sorted_runs_type;
862  typedef AllocStr_ alloc_strategy;
863  typedef typename sorted_runs_type::size_type size_type;
864  typedef Cmp_ value_cmp;
865  typedef typename sorted_runs_type::run_type run_type;
866  typedef typename sorted_runs_type::block_type block_type;
867  typedef typename block_type::bid_type bid_type;
 868  typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
 869  typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
870  typedef sort_local::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
871  typedef loser_tree<run_cursor_type, run_cursor2_cmp_type, block_type::size> loser_tree_type;
872 
873  typedef stxxl::int64 diff_type;
874  typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
875  typedef typename std::vector<sequence>::size_type seqs_size_type;
876  std::vector<sequence> * seqs;
877  std::vector<block_type *> * buffers;
878 
879 
880  sorted_runs_type sruns;
881  unsigned_type m_; // blocks to use - 1
882  value_cmp cmp;
883  size_type elements_remaining;
884  unsigned_type buffer_pos;
885  block_type * current_block;
886  run_type consume_seq;
887  prefetcher_type * prefetcher;
888  loser_tree_type * losers;
889  int_type * prefetch_seq;
890  unsigned_type nruns;
891 #ifdef STXXL_CHECK_ORDER_IN_SORTS
892  typename block_type::value_type last_element;
893 #endif
894 
895  void merge_recursively();
896 
897  void deallocate_prefetcher()
898  {
899  if (prefetcher)
900  {
901  delete losers;
902  delete seqs;
903  delete buffers;
904  delete prefetcher;
905  delete[] prefetch_seq;
906  prefetcher = NULL;
907  }
 908  // free the blocks occupied by the runs (or should the user do it?)
909  sruns.deallocate_blocks();
910  }
911 
912  void initialize_current_block()
913  {
914  if (do_parallel_merge())
915  {
916 #if STXXL_PARALLEL_MULTIWAY_MERGE
917 // begin of STL-style merging
918  seqs = new std::vector<sequence>(nruns);
919  buffers = new std::vector<block_type *>(nruns);
920 
921  for (unsigned_type i = 0; i < nruns; i++) //initialize sequences
922  {
923  (*buffers)[i] = prefetcher->pull_block(); //get first block of each run
924  (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end()); //this memory location stays the same, only the data is exchanged
925  }
926 
927 // end of STL-style merging
928 #else
929  assert(false);
930 #endif
931  }
932  else
933  {
934 // begin of native merging procedure
935 
936  losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));
937 
938 // end of native merging procedure
939  }
940  }
941 
942  void fill_current_block()
943  {
944  if (do_parallel_merge())
945  {
946 #if STXXL_PARALLEL_MULTIWAY_MERGE
947 // begin of STL-style merging
948  diff_type rest = block_type::size; // elements still to merge for this output block
949 
950  do // while rest > 0 and still elements available
951  {
952  value_type * min_last_element = NULL; // no element found yet
953  diff_type total_size = 0;
954 
955  for (seqs_size_type i = 0; i < (*seqs).size(); i++)
956  {
957  if ((*seqs)[i].first == (*seqs)[i].second)
958  continue; //run empty
959 
960  if (min_last_element == NULL)
961  min_last_element = &(*((*seqs)[i].second - 1));
962  else
963  min_last_element = cmp(*min_last_element, *((*seqs)[i].second - 1)) ? min_last_element : &(*((*seqs)[i].second - 1));
964 
965  total_size += (*seqs)[i].second - (*seqs)[i].first;
966  STXXL_VERBOSE1("last " << *((*seqs)[i].second - 1) << " block size " << ((*seqs)[i].second - (*seqs)[i].first));
967  }
968 
969  assert(min_last_element != NULL); //there must be some element
970 
971  STXXL_VERBOSE1("min_last_element " << min_last_element << " total size " << total_size + (block_type::size - rest));
972 
973  diff_type less_equal_than_min_last = 0;
974  //locate this element in all sequences
975  for (seqs_size_type i = 0; i < (*seqs).size(); i++)
976  {
977  if ((*seqs)[i].first == (*seqs)[i].second)
978  continue; //empty subsequence
979 
980  typename block_type::iterator position = std::upper_bound((*seqs)[i].first, (*seqs)[i].second, *min_last_element, cmp);
 981  STXXL_VERBOSE1("less than or equal to min_last_element: " << position - (*seqs)[i].first);
982  less_equal_than_min_last += position - (*seqs)[i].first;
983  }
984 
985  STXXL_VERBOSE1("finished loop");
986 
987  ptrdiff_t output_size = (std::min)(less_equal_than_min_last, rest); // at most rest elements
988 
989  STXXL_VERBOSE1("before merge" << output_size);
990 
991  stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);
992  //sequence iterators are progressed appropriately
993 
994  STXXL_VERBOSE1("after merge");
995 
996  rest -= output_size;
997 
998  STXXL_VERBOSE1("so long");
999 
1000  for (seqs_size_type i = 0; i < (*seqs).size(); i++)
1001  {
1002  if ((*seqs)[i].first == (*seqs)[i].second) // run empty
1003  {
1004  if (prefetcher->block_consumed((*buffers)[i]))
1005  {
1006  (*seqs)[i].first = (*buffers)[i]->begin(); // reset iterator
1007  (*seqs)[i].second = (*buffers)[i]->end();
1008  STXXL_VERBOSE1("block ran empty " << i);
1009  }
1010  else
1011  {
1012  (*seqs).erase((*seqs).begin() + i); // remove this sequence
1013  (*buffers).erase((*buffers).begin() + i);
1014  STXXL_VERBOSE1("seq removed " << i);
1015  }
1016  }
1017  }
1018  } while (rest > 0 && (*seqs).size() > 0);
1019 
1020  #ifdef STXXL_CHECK_ORDER_IN_SORTS
1021  if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
1022  {
1023  for (value_type * i = current_block->begin() + 1; i != current_block->end(); i++)
1024  if (cmp(*i, *(i - 1)))
1025  {
1026  STXXL_VERBOSE1("Error at position " << (i - current_block->begin()));
1027  }
1028  assert(false);
1029  }
1030  #endif
1031 
1032 // end of STL-style merging
1033 #else
1034  assert(false);
1035 #endif
1036  }
1037  else
1038  {
1039 // begin of native merging procedure
1040 
1041  losers->multi_merge(current_block->elem);
1042 
1043 // end of native merging procedure
1044  }
1045  }
1046 
1047  public:
1049  typedef typename sorted_runs_type::value_type value_type;
1050 
1055  runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
1056  sruns(r), m_(memory_to_use / block_type::raw_size / sort_memory_usage_factor() /* - 1 */), cmp(c),
1057  elements_remaining(r.elements),
1058  current_block(NULL),
1059  prefetcher(NULL)
1060 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1061  , last_element(cmp.min_value())
1062 #endif
1063  {
1064  if (empty())
1065  return;
1066 
1067 
1068  if (!sruns.small_.empty()) // we have a small input < B,
1069  // that is kept in the main memory
1070  {
1071  STXXL_VERBOSE1("runs_merger: small input optimization, input length: " << elements_remaining);
1072  assert(elements_remaining == size_type(sruns.small_.size()));
1073  current_block = new block_type;
1074  std::copy(sruns.small_.begin(), sruns.small_.end(), current_block->begin());
1075  current_value = current_block->elem[0];
1076  buffer_pos = 1;
1077 
1078  return;
1079  }
1080 
1081 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1082  assert(check_sorted_runs(r, cmp));
1083 #endif
1084 
1085  current_block = new block_type;
1086 
1087  disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
1088 
1089  nruns = sruns.runs.size();
1090 
1091  if (m_ < nruns)
1092  {
1093  // can not merge runs in one pass
1094  // merge recursively:
 1095  STXXL_ERRMSG("The implementation of sort requires more than one merge pass. For better efficiency,");
 1096  STXXL_ERRMSG("decrease the block size of the run storage (a parameter of the runs_creator)");
 1097  STXXL_ERRMSG("or increase the amount of memory dedicated to the merger.");
1098  STXXL_ERRMSG("m = " << m_ << " nruns=" << nruns);
1099 
1100  // insufficient memory, can not merge at all
1101  if (m_ < 2) {
1102  STXXL_ERRMSG("The merger requires memory to store at least two blocks internally. Aborting.");
1103  abort();
1104  }
1105 
1106  merge_recursively();
1107 
1108  nruns = sruns.runs.size();
1109  }
1110 
1111  assert(nruns <= m_);
1112 
1113  unsigned_type i;
1114  /*
1115  const unsigned_type out_run_size =
1116  div_and_round_up(elements_remaining,size_type(block_type::size));
1117  */
1118  unsigned_type prefetch_seq_size = 0;
1119  for (i = 0; i < nruns; i++)
1120  {
1121  prefetch_seq_size += sruns.runs[i].size();
1122  }
1123 
1124  consume_seq.resize(prefetch_seq_size);
1125 
1126  prefetch_seq = new int_type[prefetch_seq_size];
1127 
1128  typename run_type::iterator copy_start = consume_seq.begin();
1129  for (i = 0; i < nruns; i++)
1130  {
1131  copy_start = std::copy(
1132  sruns.runs[i].begin(),
1133  sruns.runs[i].end(),
1134  copy_start);
1135  }
1136 
1137  std::stable_sort(consume_seq.begin(), consume_seq.end(),
1138  sort_local::trigger_entry_cmp<bid_type, value_type, value_cmp>(cmp));
1139 
1140  int_type disks_number = config::get_instance()->disks_number();
1141 
1142  const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (int_type(m_) - int_type(nruns)));
1143 
1144 
1145 #ifdef SORT_OPTIMAL_PREFETCHING
1146  // heuristic
1147  const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
1148 
1149  compute_prefetch_schedule(
1150  consume_seq,
1151  prefetch_seq,
1152  n_opt_prefetch_buffers,
1153  disks_number);
1154 #else
1155  for (i = 0; i < prefetch_seq_size; ++i)
1156  prefetch_seq[i] = i;
1157 
1158 #endif
1159 
1160 
1161  prefetcher = new prefetcher_type(
1162  consume_seq.begin(),
1163  consume_seq.end(),
1164  prefetch_seq,
1165  nruns + n_prefetch_buffers);
1166 
1167  losers = NULL;
1168  seqs = NULL;
1169  buffers = NULL;
1170 
1171  initialize_current_block();
1172  fill_current_block();
1173 
1174 
1175  current_value = current_block->elem[0];
1176  buffer_pos = 1;
1177 
1178  if (elements_remaining <= block_type::size)
1179  deallocate_prefetcher();
1180  }
1181 
1183  bool empty() const
1184  {
1185  return elements_remaining == 0;
1186  }
1188  runs_merger & operator ++ () // preincrement operator
1189  {
1190  assert(!empty());
1191 
1192  --elements_remaining;
1193 
1194  if (buffer_pos != block_type::size)
1195  {
1196  current_value = current_block->elem[buffer_pos];
1197  ++buffer_pos;
1198  }
1199  else
1200  {
1201  if (!empty())
1202  {
1203  fill_current_block();
1204 
1205 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1206  assert(stxxl::is_sorted(current_block->elem, current_block->elem + current_block->size, cmp));
1207  assert(!cmp(current_block->elem[0], current_value));
1208 #endif
1209  current_value = current_block->elem[0];
1210  buffer_pos = 1;
1211  }
1212  if (elements_remaining <= block_type::size)
1213  deallocate_prefetcher();
1214  }
1215 
1216 
1217 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1218  if (!empty())
1219  {
1220  assert(!cmp(current_value, last_element));
1221  last_element = current_value;
1222  }
1223 #endif
1224 
1225  return *this;
1226  }
1228  const value_type & operator * () const
1229  {
1230  assert(!empty());
1231  return current_value;
1232  }
1233 
1235  const value_type * operator -> () const
1236  {
1237  assert(!empty());
1238  return &current_value;
1239  }
1240 
1241 
1244  virtual ~runs_merger()
1245  {
1246  deallocate_prefetcher();
1247 
1248  if (current_block)
1249  delete current_block;
1250 
 1251  // free the blocks occupied by the runs (or should the user do it?)
1252  sruns.deallocate_blocks();
1253  }
1254 
1255  private:
1256  // cache for the current value
1257  value_type current_value;
1258  };
1259 
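 // Usage sketch (illustrative, not part of the original header): the complete
 // two-phase external sort expressed directly with runs_creator and runs_merger.
 // InputStream, CompareWithSentinels and OutputIterator are assumed placeholder
 // types; the comparator must provide the min_value()/max_value() sentinels.
 template <class InputStream, class CompareWithSentinels, class OutputIterator>
 OutputIterator manual_sort_example(InputStream & in, CompareWithSentinels cmp,
                                    unsigned_type mem, OutputIterator out)
 {
     typedef runs_creator<InputStream, CompareWithSentinels> creator_type;
     creator_type creator(in, cmp, mem);                      // phase 1: run formation

     runs_merger<typename creator_type::sorted_runs_type, CompareWithSentinels>
     merger(creator.result(), cmp, mem);                      // phase 2: merging
     for ( ; !merger.empty(); ++merger, ++out)
         *out = *merger;                                      // emit in sorted order
     return out;
 }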
1260 
1261  template <class RunsType_, class Cmp_, class AllocStr_>
1262  void runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively()
1263  {
1264  block_manager * bm = block_manager::get_instance();
1265  unsigned_type ndisks = config::get_instance()->disks_number();
1266  unsigned_type nwrite_buffers = 2 * ndisks;
1267 
1268  unsigned_type nruns = sruns.runs.size();
1269  const unsigned_type merge_factor =
1270  static_cast<unsigned_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) / log(double(m_))))));
1271  assert(merge_factor <= m_);
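 // Worked example (illustrative) for the merge factor chosen above:
 // with nruns = 50 input runs and m_ = 8 mergeable blocks,
 //   number of passes = ceil(log(50) / log(8))       = 2
 //   merge_factor     = ceil(50^(1/2)) = ceil(7.07)  = 8
 // so the first pass performs ceil(50 / 8) = 7 eight-way merges, after which
 // the remaining 7 runs fit into a single final merge.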
1272  while (nruns > m_)
1273  {
1274  unsigned_type new_nruns = div_and_round_up(nruns, merge_factor);
1275  STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
1276  " opt_merge_factor: " << merge_factor << " m:" << m_ << " new_nruns: " << new_nruns);
1277 
1278  sorted_runs_type new_runs;
1279  new_runs.runs.resize(new_nruns);
1280  new_runs.runs_sizes.resize(new_nruns);
1281  new_runs.elements = sruns.elements;
1282 
1283  unsigned_type runs_left = nruns;
1284  unsigned_type cur_out_run = 0;
1285  unsigned_type elements_in_new_run = 0;
1286  //unsigned_type blocks_in_new_run = 0;
1287 
1288 
1289  while (runs_left > 0)
1290  {
1291  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1292  //blocks_in_new_run = 0 ;
1293  elements_in_new_run = 0;
1294  for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
1295  {
1296  elements_in_new_run += sruns.runs_sizes[i];
1297  //blocks_in_new_run += sruns.runs[i].size();
1298  }
1299  const unsigned_type blocks_in_new_run1 = div_and_round_up(elements_in_new_run, block_type::size);
1300  //assert(blocks_in_new_run1 == blocks_in_new_run);
1301 
1302  new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
1303  // allocate run
1304  new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
1305  runs_left -= runs2merge;
1306  }
1307 
1308  // allocate blocks for the new runs
1309  for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
1310  bm->new_blocks(alloc_strategy(),
1311  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].begin()),
1312  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].end()));
1313 
1314 
1315  // merge all
1316  runs_left = nruns;
1317  cur_out_run = 0;
1318  size_type elements_left = sruns.elements;
1319 
1320  while (runs_left > 0)
1321  {
1322  unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1323  STXXL_VERBOSE("Merging " << runs2merge << " runs");
1324 
1325  sorted_runs_type cur_runs;
1326  cur_runs.runs.resize(runs2merge);
1327  cur_runs.runs_sizes.resize(runs2merge);
1328 
1329  std::copy(sruns.runs.begin() + nruns - runs_left,
1330  sruns.runs.begin() + nruns - runs_left + runs2merge,
1331  cur_runs.runs.begin());
1332  std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
1333  sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
1334  cur_runs.runs_sizes.begin());
1335 
1336  runs_left -= runs2merge;
1337  /*
1338  cur_runs.elements = (runs_left)?
1339  (new_runs.runs[cur_out_run].size()*block_type::size):
1340  (elements_left);
1341  */
1342  cur_runs.elements = new_runs.runs_sizes[cur_out_run];
1343  elements_left -= cur_runs.elements;
1344 
1345  if (runs2merge > 1)
1346  {
1347  runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, m_ * block_type::raw_size * sort_memory_usage_factor());
1348 
1349  { // make sure everything is being destroyed in right time
 1350  buf_ostream<block_type, typename run_type::iterator> out(
 1351  new_runs.runs[cur_out_run].begin(),
1352  nwrite_buffers);
1353 
1354  size_type cnt = 0;
1355  const size_type cnt_max = cur_runs.elements;
1356 
1357  while (cnt != cnt_max)
1358  {
1359  *out = *merger;
1360  if ((cnt % block_type::size) == 0) // have to write the trigger value
1361  new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
1362 
1363 
1364  ++cnt;
1365  ++out;
1366  ++merger;
1367  }
1368  assert(merger.empty());
1369 
1370  while (cnt % block_type::size)
1371  {
1372  *out = cmp.max_value();
1373  ++out;
1374  ++cnt;
1375  }
1376  }
1377  }
1378  else
1379  {
1380  bm->delete_blocks(
1381  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
1382  new_runs.runs.back().begin()),
1383  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
1384  new_runs.runs.back().end()));
1385 
1386  assert(cur_runs.runs.size() == 1);
1387 
1388  std::copy(cur_runs.runs.front().begin(),
1389  cur_runs.runs.front().end(),
1390  new_runs.runs.back().begin());
1391  new_runs.runs_sizes.back() = cur_runs.runs_sizes.back();
1392  }
1393 
1394  ++cur_out_run;
1395  }
1396  assert(elements_left == 0);
1397 
1398  nruns = new_nruns;
1399  sruns = new_runs;
1400  }
1401  }
1402 
1403 
1412  template <class Input_,
1413  class Cmp_,
1414  unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
1415  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
1416  class sort : public noncopyable
1417  {
 1418  typedef runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> runs_creator_type;
 1419  typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
 1420  typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type;
1421 
1422  runs_creator_type creator;
1423  runs_merger_type merger;
1424 
1425  public:
1427  typedef typename Input_::value_type value_type;
1428 
1433  sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
1434  creator(in, c, memory_to_use),
1435  merger(creator.result(), c, memory_to_use)
1436  { }
1437 
1443  sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
1444  creator(in, c, memory_to_use_rc),
1445  merger(creator.result(), c, memory_to_use_m)
1446  { }
1447 
1448 
1450  const value_type & operator * () const
1451  {
1452  assert(!empty());
1453  return *merger;
1454  }
1455 
1456  const value_type * operator -> () const
1457  {
1458  assert(!empty());
1459  return merger.operator -> ();
1460  }
1461 
 1463  sort & operator ++ () // preincrement operator
 1464  {
1465  ++merger;
1466  return *this;
1467  }
1468 
1470  bool empty() const
1471  {
1472  return merger.empty();
1473  }
1474  };
1475 
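 // Usage sketch (illustrative, not part of the original header): sort used as a
 // node of a stream pipeline.  InputStream and CompareWithSentinels are assumed
 // placeholder types; the sorted elements are pulled lazily from the node.
 template <class InputStream, class CompareWithSentinels>
 stxxl::int64 pipeline_example(InputStream & in, CompareWithSentinels cmp, unsigned_type mem)
 {
     sort<InputStream, CompareWithSentinels> sorted(in, cmp, mem);
     stxxl::int64 count = 0;
     while (!sorted.empty())
     {
         // *sorted yields the next element of the sorted sequence
         ++count;
         ++sorted;
     }
     return count;
 }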
1481  template <
1482  class ValueType_,
1483  unsigned BlockSize_>
 1484  class compute_sorted_runs_type
 1485  {
1486  typedef ValueType_ value_type;
1487  typedef BID<BlockSize_> bid_type;
1488  typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
1489 
1490  public:
1491  typedef sorted_runs<value_type, trigger_entry_type> result;
1492  };
1493 
1495 }
1496 
1499 
1501 
1510 template <unsigned BlockSize,
1511  class RandomAccessIterator,
1512  class CmpType,
1513  class AllocStr>
1514 void sort(RandomAccessIterator begin,
1515  RandomAccessIterator end,
1516  CmpType cmp,
1517  unsigned_type MemSize,
1518  AllocStr AS)
1519 {
1520  UNUSED(AS);
1521 #ifdef BOOST_MSVC
1522  typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
1523 #else
1524  typedef __typeof__(stream::streamify(begin, end)) InputType;
1525 #endif
1526  InputType Input(begin, end);
 1527  typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type;
 1528  sorter_type Sort(Input, cmp, MemSize);
1529  stream::materialize(Sort, begin);
1530 }
1531 
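// Usage sketch (illustrative, not part of the original header): sorting a
// random-access range (e.g. an stxxl::vector) through the stream-based sort
// above.  `my_vector_type` and `my_cmp` are assumed placeholder names, and
// my_cmp must provide the min_value()/max_value() sentinels:
//
//   my_vector_type v;
//   // ... fill v ...
//   stxxl::sort<my_vector_type::block_type::raw_size>(
//       v.begin(), v.end(), my_cmp(), 512 * 1024 * 1024, STXXL_DEFAULT_ALLOC_STRATEGY());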
1533 
1534 __STXXL_END_NAMESPACE
1535 
1536 #endif // !STXXL_SORT_STREAM_HEADER
1537 // vim: et:ts=4:sw=4