00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "log_thread.h"
00024 #include "file.h"
00025
00026 #include <blackboard/blackboard.h>
00027 #include <utils/logging/logger.h>
00028 #include <core/exceptions/system.h>
00029 #include <interfaces/SwitchInterface.h>
00030
00031 #include <memory>
00032 #include <cstring>
00033 #include <cstdlib>
00034 #include <cstdio>
00035 #include <cerrno>
00036 #include <fcntl.h>
00037 #ifdef __FreeBSD__
00038 # include <sys/endian.h>
00039 #else
00040 # include <endian.h>
00041 #endif
00042 #include <arpa/inet.h>
00043 #include <sys/stat.h>
00044 #include <sys/mman.h>
00045
00046 using namespace fawkes;
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075 BBLoggerThread::BBLoggerThread(const char *iface_uid,
00076 const char *logdir, bool buffering, bool flushing,
00077 const char *scenario, fawkes::Time *start_time)
00078 : Thread("BBLoggerThread", Thread::OPMODE_WAITFORWAKEUP),
00079 BlackBoardInterfaceListener("BBLoggerThread(%s)", iface_uid)
00080 {
00081 set_coalesce_wakeups(true);
00082 set_name("BBLoggerThread(%s)", iface_uid);
00083
00084 __buffering = buffering;
00085 __flushing = flushing;
00086 __uid = strdup(iface_uid);
00087 __logdir = strdup(logdir);
00088 __scenario = strdup(scenario);
00089 __start = new Time(start_time);
00090 __filename = NULL;
00091 __queue_mutex = new Mutex();
00092 __data_size = 0;
00093 __is_master = false;
00094 __enabled = true;
00095
00096 __now = NULL;
00097
00098
00099 Interface::parse_uid(__uid, &__type, &__id);
00100
00101 char date[21];
00102 Time now;
00103 struct tm *tmp = localtime(&(now.get_timeval()->tv_sec));
00104 strftime(date, 21, "%F-%H-%M-%S", tmp);
00105
00106 if (asprintf(&__filename, "%s/%s-%s-%s-%s.log", LOGDIR, __scenario,
00107 __type, __id, date) == -1) {
00108 throw OutOfMemoryException("Cannot generate log name");
00109 }
00110 }
00111
00112
00113
00114 BBLoggerThread::~BBLoggerThread()
00115 {
00116 free(__uid);
00117 free(__type);
00118 free(__id);
00119 free(__logdir);
00120 free(__scenario);
00121 free(__filename);
00122 delete __queue_mutex;
00123 delete __start;
00124 }
00125
00126
00127 void
00128 BBLoggerThread::init()
00129 {
00130 __queues[0].clear();
00131 __queues[1].clear();
00132 __act_queue = 0;
00133
00134 __queue_mutex = new Mutex();
00135 __data_size = 0;
00136
00137 __now = NULL;
00138 __num_data_items = 0;
00139 __session_start = 0;
00140
00141
00142
00143 mode_t m = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
00144 int fd = open(__filename, O_RDWR | O_CREAT | O_EXCL, m);
00145 if ( ! fd ) {
00146 throw CouldNotOpenFileException(__filename, errno, "Failed to open log 1");
00147 } else {
00148 __f_data = fdopen(fd, "w+");
00149 if ( ! __f_data ) {
00150 throw CouldNotOpenFileException(__filename, errno, "Failed to open log 2");
00151 }
00152 }
00153
00154 try {
00155 __iface = blackboard->open_for_reading(__type, __id);
00156 __data_size = __iface->datasize();
00157 } catch (Exception &e) {
00158 fclose(__f_data);
00159 throw;
00160 }
00161
00162 try {
00163 write_header();
00164 } catch (FileWriteException &e) {
00165 blackboard->close(__iface);
00166 fclose(__f_data);
00167 throw;
00168 }
00169
00170 __now = new Time(clock);
00171
00172 if (__is_master) {
00173 try {
00174 __switch_if = blackboard->open_for_writing<SwitchInterface>("BBLogger");
00175 __switch_if->set_enabled(__enabled);
00176 __switch_if->write();
00177 bbil_add_message_interface(__switch_if);
00178 } catch (Exception &e) {
00179 fclose(__f_data);
00180 throw;
00181 }
00182 }
00183
00184 bbil_add_data_interface(__iface);
00185 bbil_add_writer_interface(__iface);
00186
00187 blackboard->register_listener(this, BlackBoard::BBIL_FLAG_DATA |
00188 BlackBoard::BBIL_FLAG_WRITER |
00189 BlackBoard::BBIL_FLAG_MESSAGES);
00190
00191 logger->log_info(name(), "Logging %s to %s%s", __iface->uid(), __filename,
00192 __is_master ? " as master" : "");
00193 }
00194
00195
00196 void
00197 BBLoggerThread::finalize()
00198 {
00199 blackboard->unregister_listener(this);
00200 if (__is_master) {
00201 blackboard->close(__switch_if);
00202 }
00203 update_header();
00204 fclose(__f_data);
00205 for (unsigned int q = 0; q < 2; ++q) {
00206 while (!__queues[q].empty()) {
00207 void *t = __queues[q].front();
00208 free(t);
00209 __queues[q].pop();
00210 }
00211 }
00212 delete __now;
00213 __now = NULL;
00214 }
00215
00216
00217
00218
00219
00220
00221 const char *
00222 BBLoggerThread::get_filename() const
00223 {
00224 return __filename;
00225 }
00226
00227
00228
00229
00230
00231 void
00232 BBLoggerThread::set_enabled(bool enabled)
00233 {
00234 if (enabled && !__enabled) {
00235 logger->log_info(name(), "Logging enabled",
00236 (__num_data_items - __session_start));
00237 __session_start = __num_data_items;
00238 } else if (!enabled && __enabled) {
00239 logger->log_info(name(), "Logging disabled (wrote %u entries), flushing",
00240 (__num_data_items - __session_start));
00241 update_header();
00242 fflush(__f_data);
00243 }
00244
00245 __enabled = enabled;
00246 }
00247
00248
00249
00250
00251
00252
00253
00254
00255 void
00256 BBLoggerThread::set_threadlist(fawkes::ThreadList &thread_list)
00257 {
00258 __is_master = true;
00259 __threads = thread_list;
00260 }
00261
00262 void
00263 BBLoggerThread::write_header()
00264 {
00265 bblog_file_header header;
00266 memset(&header, 0, sizeof(header));
00267 header.file_magic = htonl(BBLOGGER_FILE_MAGIC);
00268 header.file_version = htonl(BBLOGGER_FILE_VERSION);
00269 #if __BYTE_ORDER == __BIG_ENDIAN
00270 header.endianess = BBLOG_BIG_ENDIAN;
00271 #else
00272 header.endianess = BBLOG_LITTLE_ENDIAN;
00273 #endif
00274 header.num_data_items = __num_data_items;
00275 strncpy(header.scenario, (const char *)__scenario, BBLOG_SCENARIO_SIZE);
00276 strncpy(header.interface_type, __iface->type(), BBLOG_INTERFACE_TYPE_SIZE);
00277 strncpy(header.interface_id, __iface->id(), BBLOG_INTERFACE_ID_SIZE);
00278 memcpy(header.interface_hash, __iface->hash(), BBLOG_INTERFACE_HASH_SIZE);
00279 header.data_size = __iface->datasize();
00280 long start_time_sec, start_time_usec;
00281 __start->get_timestamp(start_time_sec, start_time_usec);
00282 header.start_time_sec = start_time_sec;
00283 header.start_time_usec = start_time_usec;
00284 if (fwrite(&header, sizeof(header), 1, __f_data) != 1) {
00285 throw FileWriteException(__filename, "Failed to write header");
00286 }
00287 fflush(__f_data);
00288 }
00289
00290
00291 void
00292 BBLoggerThread::update_header()
00293 {
00294
00295 #if _POSIX_MAPPED_FILES
00296 void *h = mmap(NULL, sizeof(bblog_file_header), PROT_WRITE, MAP_SHARED,
00297 fileno(__f_data), 0);
00298 if (h == MAP_FAILED) {
00299 logger->log_warn(name(), "Failed to mmap log (%s), "
00300 "not updating number of data items",
00301 strerror(errno));
00302 } else {
00303 bblog_file_header *header = (bblog_file_header *)h;
00304 header->num_data_items = __num_data_items;
00305 munmap(h, sizeof(bblog_file_header));
00306 }
00307 #else
00308 logger->log_warn(name(), "Memory mapped files not available, "
00309 "not updating number of data items on close");
00310 #endif
00311 }
00312
00313 void
00314 BBLoggerThread::write_chunk(const void *chunk)
00315 {
00316 bblog_entry_header ehead;
00317 __now->stamp();
00318 Time d = *__now - *__start;
00319 long rel_time_sec, rel_time_usec;
00320 d.get_timestamp(rel_time_sec, rel_time_usec);
00321 ehead.rel_time_sec = rel_time_sec;
00322 ehead.rel_time_usec = rel_time_usec;
00323 if ( (fwrite(&ehead, sizeof(ehead), 1, __f_data) == 1) &&
00324 (fwrite(chunk, __data_size, 1, __f_data) == 1) ) {
00325 if (__flushing) fflush(__f_data);
00326 __num_data_items += 1;
00327 } else {
00328 logger->log_warn(name(), "Failed to write chunk");
00329 }
00330 }
00331
00332
00333 void
00334 BBLoggerThread::loop()
00335 {
00336 unsigned int write_queue = __act_queue;
00337 __queue_mutex->lock();
00338 __act_queue = 1 - __act_queue;
00339 __queue_mutex->unlock();
00340 LockQueue<void *> &queue = __queues[write_queue];
00341
00342 while (! queue.empty() ) {
00343 void *c = queue.front();
00344 write_chunk(c);
00345 free(c);
00346 queue.pop();
00347 }
00348 }
00349
00350 bool
00351 BBLoggerThread::bb_interface_message_received(Interface *interface,
00352 Message *message) throw()
00353 {
00354 SwitchInterface::EnableSwitchMessage *enm;
00355 SwitchInterface::DisableSwitchMessage *dism;
00356
00357 bool enabled = true;
00358 if ((enm = dynamic_cast<SwitchInterface::EnableSwitchMessage *>(message)) != NULL) {
00359 enabled = true;
00360 } else if ((dism = dynamic_cast<SwitchInterface::DisableSwitchMessage *>(message)) != NULL) {
00361 enabled = false;
00362 } else {
00363 logger->log_debug(name(), "Unhandled message type: %s via %s",
00364 message->type(), interface->uid());
00365 }
00366
00367 for (ThreadList::iterator i = __threads.begin(); i != __threads.end(); ++i) {
00368 BBLoggerThread *bblt = dynamic_cast<BBLoggerThread *>(*i);
00369 bblt->set_enabled(enabled);
00370 }
00371
00372 __switch_if->set_enabled(__enabled);
00373 __switch_if->write();
00374
00375 return false;
00376 }
00377
00378
00379 void
00380 BBLoggerThread::bb_interface_data_changed(Interface *interface) throw()
00381 {
00382 if (!__enabled) return;
00383
00384 try {
00385 __iface->read();
00386
00387 if ( __buffering ) {
00388 void *c = malloc(__iface->datasize());
00389 memcpy(c, __iface->datachunk(), __iface->datasize());
00390 __queue_mutex->lock();
00391 __queues[__act_queue].push_locked(c);
00392 __queue_mutex->unlock();
00393 wakeup();
00394 } else {
00395 __queue_mutex->lock();
00396 write_chunk(__iface->datachunk());
00397 __queue_mutex->unlock();
00398 }
00399
00400 } catch (Exception &e) {
00401 logger->log_error(name(), "Exception when data changed");
00402 logger->log_error(name(), e);
00403 }
00404 }
00405
00406 void
00407 BBLoggerThread::bb_interface_writer_added(Interface *interface,
00408 unsigned int instance_serial) throw()
00409 {
00410 __session_start = __num_data_items;
00411 }
00412
00413 void
00414 BBLoggerThread::bb_interface_writer_removed(Interface *interface,
00415 unsigned int instance_serial) throw()
00416 {
00417 logger->log_info(name(), "Writer removed (wrote %u entries), flushing",
00418 (__num_data_items - __session_start));
00419 update_header();
00420 fflush(__f_data);
00421 }