00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <core/threading/thread_list.h>
00025 #include <core/threading/thread.h>
00026 #include <core/threading/mutex.h>
00027 #include <core/threading/barrier.h>
00028 #include <core/threading/interruptible_barrier.h>
00029 #include <core/exceptions/software.h>
00030 #include <core/exceptions/system.h>
00031
00032 #include <string>
00033 #include <cstring>
00034 #include <cstdlib>
00035 #include <cstdio>
00036 #include <unistd.h>
00037
00038 namespace fawkes {
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052 ThreadListSealedException::ThreadListSealedException(const char *operation)
00053 : Exception("ThreadList is sealed")
00054 {
00055 append("Operation '%s' is not allowed on a sealed thread list", operation);
00056 }
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071 ThreadListNotSealedException::ThreadListNotSealedException(const char *format, ...)
00072 : Exception()
00073 {
00074 va_list va;
00075 va_start(va, format);
00076 append_va(format, va);
00077 va_end(va);
00078 }
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093 ThreadList::ThreadList(const char *tlname)
00094 {
00095 __name = strdup(tlname);
00096 __sealed = false;
00097 __finalize_mutex = new Mutex();
00098 __wnw_barrier = NULL;
00099 clear();
00100 }
00101
00102
00103
00104
00105
00106
00107
00108
00109 ThreadList::ThreadList(bool maintain_barrier, const char *tlname)
00110 {
00111 __name = strdup(tlname);
00112 __sealed = false;
00113 __finalize_mutex = new Mutex();
00114 __wnw_barrier = NULL;
00115 clear();
00116 if ( maintain_barrier) update_barrier();
00117 }
00118
00119
00120
00121
00122
00123 ThreadList::ThreadList(const ThreadList &tl)
00124 : LockList<Thread *>(tl)
00125 {
00126 __name = strdup(tl.__name);
00127 __sealed = tl.__sealed;
00128 __finalize_mutex = new Mutex();
00129 __wnw_barrier = NULL;
00130 if ( tl.__wnw_barrier != NULL ) update_barrier();
00131 }
00132
00133
00134
00135 ThreadList::~ThreadList()
00136 {
00137 free(__name);
00138 delete __finalize_mutex;
00139 delete __wnw_barrier;
00140 }
00141
00142
00143
00144
00145
00146
00147 ThreadList &
00148 ThreadList::operator= (const ThreadList &tl)
00149 {
00150 LockList<Thread *>::operator=(tl);
00151 __name = strdup(tl.__name);
00152 __sealed = tl.__sealed;
00153 __finalize_mutex = new Mutex();
00154 __wnw_barrier = NULL;
00155 if ( tl.__wnw_barrier != NULL ) update_barrier();
00156
00157 return *this;
00158 }
00159
00160
00161
00162 void
00163 ThreadList::wakeup()
00164 {
00165 lock();
00166 for (iterator i = begin(); i != end(); ++i) {
00167 (*i)->wakeup();
00168 }
00169 unlock();
00170 }
00171
00172
00173
00174
00175
00176
00177 void
00178 ThreadList::wakeup_unlocked()
00179 {
00180 for (iterator i = begin(); i != end(); ++i) {
00181 (*i)->wakeup();
00182 }
00183 }
00184
00185
00186
00187
00188
00189 void
00190 ThreadList::wakeup(Barrier *barrier)
00191 {
00192 lock();
00193 for (iterator i = begin(); i != end(); ++i) {
00194 (*i)->wakeup(barrier);
00195 }
00196 unlock();
00197 }
00198
00199
00200
00201
00202
00203
00204
00205 void
00206 ThreadList::wakeup_unlocked(Barrier *barrier)
00207 {
00208 unsigned int count = 1;
00209 for (iterator i = begin(); i != end(); ++i) {
00210 if ( ! (*i)->flagged_bad() ) {
00211 (*i)->wakeup(barrier);
00212 ++count;
00213 }
00214 }
00215 if (count != barrier->count()) {
00216 throw Exception("ThreadList(%s)::wakeup(): barrier has count (%u) different "
00217 "from number of unflagged threads (%u)", __name, barrier->count(), count);
00218 }
00219 }
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231 void
00232 ThreadList::wakeup_and_wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
00233 {
00234 if ( ! __wnw_barrier ) {
00235 throw NullPointerException("ThreadList::wakeup_and_wait() can only be called if "
00236 "barrier is maintained");
00237 }
00238 lock();
00239 try {
00240 wakeup_unlocked(__wnw_barrier);
00241 } catch (Exception &e) {
00242 unlock();
00243 throw;
00244 }
00245 if ( ! __wnw_barrier->wait(timeout_sec, timeout_nanosec) ) {
00246
00247 RefPtr<ThreadList> passed_threads = __wnw_barrier->passed_threads();
00248 ThreadList bad_threads;
00249 for (iterator i = begin(); i != end(); ++i) {
00250 bool ok = false;
00251 for (iterator j = passed_threads->begin(); j != passed_threads->end(); ++j) {
00252 if (*j == *i) {
00253 ok = true;
00254 break;
00255 }
00256 }
00257 if (! ok) {
00258 bad_threads.push_back(*i);
00259 (*i)->set_flag(Thread::FLAG_BAD);
00260 }
00261 }
00262
00263 __wnw_bad_barriers.push_back(make_pair(__wnw_barrier, bad_threads));
00264
00265 __wnw_barrier = NULL;
00266 update_barrier();
00267
00268
00269 std::string s;
00270 if ( bad_threads.size() > 1 ) {
00271 s = "Multiple threads did not finish in time, flagging as bad: ";
00272 for (iterator i = bad_threads.begin(); i != bad_threads.end(); ++i) {
00273 s += std::string((*i)->name()) + " ";
00274 }
00275 } else if (bad_threads.size() == 0) {
00276 s = "Timeout happened, but no bad threads recorded.";
00277 } else {
00278 throw Exception("Thread %s did not finish in time (max %f), flagging as bad",
00279 bad_threads.front()->name(),
00280 (float)timeout_sec + (float)timeout_nanosec / 1000000000.);
00281 }
00282 unlock();
00283 throw Exception("%s", s.c_str());
00284 }
00285 unlock();
00286 }
00287
00288
00289
00290
00291
00292
00293 void
00294 ThreadList::set_maintain_barrier(bool maintain_barrier)
00295 {
00296 lock();
00297 delete __wnw_barrier;
00298 __wnw_barrier = NULL;
00299 if ( maintain_barrier ) update_barrier();
00300 unlock();
00301 }
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311 void
00312 ThreadList::try_recover(std::list<std::string> &recovered_threads)
00313 {
00314 lock();
00315 bool changed = false;
00316 __wnw_bbit = __wnw_bad_barriers.begin();
00317 while (__wnw_bbit != __wnw_bad_barriers.end()) {
00318 iterator i = __wnw_bbit->second.begin();
00319 while (i != __wnw_bbit->second.end()) {
00320 if ( (*i)->waiting() ) {
00321
00322 recovered_threads.push_back((*i)->name());
00323
00324 (*i)->unset_flag(Thread::FLAG_BAD);
00325 i = __wnw_bbit->second.erase(i);
00326 changed = true;
00327 } else {
00328 ++i;
00329 }
00330 }
00331 if ( __wnw_bbit->second.empty() ) {
00332 delete __wnw_bbit->first;
00333 __wnw_bbit = __wnw_bad_barriers.erase(__wnw_bbit);
00334 } else {
00335 ++__wnw_bbit;
00336 }
00337 }
00338 if ( changed ) update_barrier();
00339 unlock();
00340 }
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353 void
00354 ThreadList::init(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
00355 {
00356 CannotInitializeThreadException cite;
00357 ThreadList initialized_threads;
00358 bool success = true;
00359 for (ThreadList::iterator i = begin(); i != end(); ++i) {
00360 try {
00361 initializer->init(*i);
00362 (*i)->init();
00363 initialized_threads.push_back(*i);
00364 } catch (CannotInitializeThreadException &e) {
00365 notify_of_failed_init();
00366 cite.append("Initializing thread '%s' in list '%s' failed", (*i)->name(), __name);
00367 cite.append(e);
00368 success = false;
00369 break;
00370 } catch (Exception &e) {
00371 notify_of_failed_init();
00372 cite.append("Could not initialize thread '%s'", (*i)->name());
00373 cite.append(e);
00374 success = false;
00375 break;
00376 } catch (...) {
00377 notify_of_failed_init();
00378 cite.append("Could not initialize thread '%s'", (*i)->name());
00379 cite.append("Unknown exception caught");
00380 success = false;
00381 break;
00382 }
00383 }
00384
00385 if ( ! success ) {
00386 initialized_threads.finalize(finalizer);
00387 throw cite;
00388 }
00389 }
00390
00391
00392
00393
00394
00395
00396
00397
00398 void
00399 ThreadList::start()
00400 {
00401 for (iterator i = begin(); i != end(); ++i) {
00402 (*i)->start();
00403 }
00404 }
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424 void
00425 ThreadList::cancel()
00426 {
00427 for (iterator i = begin(); i != end(); ++i) {
00428 (*i)->cancel();
00429 }
00430 }
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450 void
00451 ThreadList::join()
00452 {
00453 for (iterator i = begin(); i != end(); ++i) {
00454 (*i)->join();
00455 }
00456 }
00457
00458
00459
00460
00461
00462
00463
00464
00465 void
00466 ThreadList::stop()
00467 {
00468 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
00469 (*i)->cancel();
00470 (*i)->join();
00471
00472 usleep(5000);
00473 }
00474 }
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487 bool
00488 ThreadList::prepare_finalize(ThreadFinalizer *finalizer)
00489 {
00490 __finalize_mutex->lock();
00491 bool can_finalize = true;
00492 CannotFinalizeThreadException cfte("Cannot finalize one or more threads");
00493 bool threw_exception = false;
00494 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
00495
00496
00497
00498 try {
00499 if ( ! finalizer->prepare_finalize(*i) ) {
00500 can_finalize = false;
00501 }
00502 if ( ! (*i)->prepare_finalize() ) {
00503 can_finalize = false;
00504 }
00505 } catch (CannotFinalizeThreadException &e) {
00506 cfte.append("Thread '%s' throw an exception while preparing finalization of "
00507 "ThreadList '%s'", (*i)->name(), __name);
00508 threw_exception = true;
00509 }
00510 }
00511 __finalize_mutex->unlock();
00512 if ( threw_exception ) {
00513 throw cfte;
00514 }
00515 return can_finalize;
00516 }
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528 void
00529 ThreadList::finalize(ThreadFinalizer *finalizer)
00530 {
00531 bool error = false;
00532 Exception me("One or more threads failed to finalize");
00533 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
00534 try {
00535 finalizer->finalize(*i);
00536 } catch (CannotFinalizeThreadException &e) {
00537 error = true;
00538 me.append("Could not finalize thread '%s' in list '%s'", (*i)->name(), __name);
00539 me.append(e);
00540 }
00541 try {
00542 (*i)->finalize();
00543 } catch (CannotFinalizeThreadException &e) {
00544 error = true;
00545 me.append("AspectIniFin called Thread[%s]::finalize() which failed", (*i)->name());
00546 me.append(e);
00547 } catch (Exception &e) {
00548 me.append("AspectIniFin called Thread[%s]::finalize() which failed", (*i)->name());
00549 me.append(e);
00550 } catch (...) {
00551 me.append("Thread[%s]::finalize() threw unsupported exception", (*i)->name());
00552 }
00553 }
00554 if ( error ) {
00555 throw me;
00556 }
00557 }
00558
00559
00560
00561
00562 void
00563 ThreadList::cancel_finalize()
00564 {
00565 __finalize_mutex->lock();
00566 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
00567 (*i)->cancel_finalize();
00568 }
00569 __finalize_mutex->unlock();
00570 }
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580 void
00581 ThreadList::set_prepfin_hold(bool hold)
00582 {
00583 iterator i;
00584 try {
00585 for (i = begin(); i != end(); ++i) {
00586 (*i)->set_prepfin_hold(hold);
00587 }
00588 } catch (Exception &e) {
00589
00590
00591 for (iterator j = begin(); j != i; ++j) {
00592 (*j)->set_prepfin_hold(false);
00593 }
00594 throw;
00595 }
00596 }
00597
00598
00599
00600
00601
00602
00603
00604 void
00605 ThreadList::force_stop(ThreadFinalizer *finalizer)
00606 {
00607 try {
00608 prepare_finalize(finalizer);
00609 stop();
00610 finalize(finalizer);
00611 } catch (Exception &e) {
00612
00613 }
00614 }
00615
00616
00617
00618
00619
00620
00621
00622 const char *
00623 ThreadList::name()
00624 {
00625 return __name;
00626 }
00627
00628
00629
00630
00631
00632
00633 void
00634 ThreadList::set_name(const char *format, ...)
00635 {
00636 va_list va;
00637 va_start(va, format);
00638
00639 char *tmpname;
00640 if (vasprintf(&tmpname, format, va) != -1) {
00641 free(__name);
00642 __name = tmpname;
00643 } else {
00644 throw OutOfMemoryException("ThreadList::set_name(): vasprintf() failed");
00645 }
00646 va_end(va);
00647 }
00648
00649
00650
00651
00652
00653
00654
00655 bool
00656 ThreadList::sealed()
00657 {
00658 return __sealed;
00659 }
00660
00661
00662
00663 void
00664 ThreadList::seal()
00665 {
00666 __sealed = true;
00667 }
00668
00669
00670
00671
00672
00673
00674 void
00675 ThreadList::push_front(Thread *thread)
00676 {
00677 if ( __sealed ) throw ThreadListSealedException("push_front");
00678
00679 LockList<Thread *>::push_front(thread);
00680 if ( __wnw_barrier) update_barrier();
00681 }
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692 void
00693 ThreadList::push_front_locked(Thread *thread)
00694 {
00695 if ( __sealed ) throw ThreadListSealedException("push_front_locked");
00696
00697 lock();
00698 LockList<Thread *>::push_front(thread);
00699 if ( __wnw_barrier) update_barrier();
00700 unlock();
00701 }
00702
00703
00704
00705
00706
00707
00708 void
00709 ThreadList::push_back(Thread *thread)
00710 {
00711 if ( __sealed ) throw ThreadListSealedException("push_back");
00712
00713 LockList<Thread *>::push_back(thread);
00714 if ( __wnw_barrier) update_barrier();
00715 }
00716
00717
00718
00719
00720
00721
00722
00723
00724
00725
00726 void
00727 ThreadList::push_back_locked(Thread *thread)
00728 {
00729 if ( __sealed ) throw ThreadListSealedException("push_back_locked");
00730
00731 lock();
00732 LockList<Thread *>::push_back(thread);
00733 if ( __wnw_barrier) update_barrier();
00734 unlock();
00735 }
00736
00737
00738
00739
00740
00741 void
00742 ThreadList::clear()
00743 {
00744 if ( __sealed ) throw ThreadListSealedException("clear");
00745
00746 LockList<Thread *>::clear();
00747 if ( __wnw_barrier) update_barrier();
00748 }
00749
00750
00751
00752
00753
00754 void
00755 ThreadList::remove(Thread *thread)
00756 {
00757 if ( __sealed ) throw ThreadListSealedException("remove_locked");
00758
00759 LockList<Thread *>::remove(thread);
00760 if ( __wnw_barrier) update_barrier();
00761 }
00762
00763
00764
00765
00766
00767 void
00768 ThreadList::remove_locked(Thread *thread)
00769 {
00770 if ( __sealed ) throw ThreadListSealedException("remove_locked");
00771
00772 lock();
00773 LockList<Thread *>::remove(thread);
00774 if ( __wnw_barrier) update_barrier();
00775 unlock();
00776 }
00777
00778
00779
00780 void
00781 ThreadList::pop_front()
00782 {
00783 if ( __sealed ) throw ThreadListSealedException("pop_front");
00784
00785 LockList<Thread *>::pop_front();
00786 if ( __wnw_barrier) update_barrier();
00787 }
00788
00789
00790
00791 void
00792 ThreadList::pop_back()
00793 {
00794 if ( __sealed ) throw ThreadListSealedException("pop_back");
00795
00796 LockList<Thread *>::pop_back();
00797 if ( __wnw_barrier) update_barrier();
00798 }
00799
00800
00801
00802
00803
00804
00805 ThreadList::iterator
00806 ThreadList::erase(iterator pos)
00807 {
00808 if ( __sealed ) throw ThreadListSealedException("erase");
00809
00810 ThreadList::iterator rv = LockList<Thread *>::erase(pos);
00811 if ( __wnw_barrier) update_barrier();
00812 return rv;
00813 }
00814
00815
00816
00817 void
00818 ThreadList::update_barrier()
00819 {
00820 unsigned int num = 1;
00821 for (iterator i = begin(); i != end(); ++i) {
00822 if (! (*i)->flagged_bad() ) ++num;
00823 }
00824 delete __wnw_barrier;
00825 __wnw_barrier = new InterruptibleBarrier(num);
00826 }
00827
00828
00829
00830 void
00831 ThreadList::notify_of_failed_init()
00832 {
00833 for (ThreadList::iterator i = begin(); i != end(); ++i) {
00834 (*i)->notify_of_failed_init();
00835 }
00836 }
00837
00838
00839 }