00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "sync_thread.h"
00024
00025 #include <blackboard/remote.h>
00026 #include <core/threading/mutex_locker.h>
00027 #include <utils/time/wait.h>
00028
00029 #include <cstring>
00030
00031 using namespace std;
00032 using namespace fawkes;
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 BlackBoardSynchronizationThread::BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix,
00046 std::string &peer_cfg_prefix,
00047 std::string &peer)
00048 : Thread("", Thread::OPMODE_CONTINUOUS)
00049 {
00050 set_name("BBSyncThread[%s]", peer.c_str());
00051 set_prepfin_conc_loop(true);
00052
00053 __bbsync_cfg_prefix = bbsync_cfg_prefix;
00054 __peer_cfg_prefix = peer_cfg_prefix;
00055 __peer = peer;
00056
00057 __remote_bb = NULL;
00058 }
00059
00060
00061
00062 BlackBoardSynchronizationThread::~BlackBoardSynchronizationThread()
00063 {
00064 }
00065
00066 void
00067 BlackBoardSynchronizationThread::init()
00068 {
00069 logger->log_debug(name(), "Initializing");
00070 unsigned int check_interval = 0;
00071 try {
00072 __host = config->get_string((__peer_cfg_prefix + "host").c_str());
00073 __port = config->get_uint((__peer_cfg_prefix + "port").c_str());
00074
00075 check_interval = config->get_uint((__bbsync_cfg_prefix + "check_interval").c_str());
00076 } catch (Exception &e) {
00077 e.append("Host or port not specified for peer");
00078 throw;
00079 }
00080
00081 try {
00082 check_interval = config->get_uint((__peer_cfg_prefix + "check_interval").c_str());
00083 logger->log_debug(name(), "Peer check interval set, overriding default.");
00084 } catch (Exception &e) {
00085 logger->log_debug(name(), "No per-peer check interval set, using default");
00086 }
00087
00088 read_config_combos(__peer_cfg_prefix + "reading/", false);
00089 read_config_combos(__peer_cfg_prefix + "writing/", true);
00090
00091 for (ComboMap::iterator i = __combos.begin(); i != __combos.end(); ++i) {
00092 logger->log_debug(name(), "Combo: %s, %s (%s, R) -> %s (%s, W)", i->second.type.c_str(),
00093 i->second.reader_id.c_str(), i->second.remote_writer ? "local" : "remote",
00094 i->second.writer_id.c_str(), i->second.remote_writer ? "remote" : "local");
00095 }
00096
00097 __wsl_local = new SyncWriterInterfaceListener(this, logger, (__peer + "/local").c_str());
00098 __wsl_remote = new SyncWriterInterfaceListener(this, logger, (__peer + "/remote").c_str());
00099
00100 if (! check_connection()) {
00101 logger->log_warn(name(), "Remote peer not reachable, will keep trying");
00102 }
00103
00104 logger->log_debug(name(), "Checking for remote aliveness every %u ms", check_interval);
00105 __timewait = new TimeWait(clock, check_interval * 1000);
00106 }
00107
00108
00109 void
00110 BlackBoardSynchronizationThread::finalize()
00111 {
00112
00113 delete __timewait;
00114
00115 close_interfaces();
00116
00117 delete __wsl_local;
00118 delete __wsl_remote;
00119 delete __remote_bb;
00120 __remote_bb = NULL;
00121 }
00122
00123
00124 void
00125 BlackBoardSynchronizationThread::loop()
00126 {
00127 __timewait->mark_start();
00128 check_connection();
00129 __timewait->wait_systime();
00130 }
00131
00132
00133 bool
00134 BlackBoardSynchronizationThread::check_connection()
00135 {
00136 if (! __remote_bb || ! __remote_bb->is_alive()) {
00137 if (__remote_bb) {
00138 logger->log_warn(name(), "Lost connection via remote BB to %s (%s:%u), will try to re-establish",
00139 __peer.c_str(), __host.c_str(), __port);
00140 blackboard->unregister_listener(__wsl_local);
00141 __remote_bb->unregister_listener(__wsl_remote);
00142 close_interfaces();
00143 delete __remote_bb;
00144 __remote_bb = NULL;
00145 }
00146
00147 try {
00148 __remote_bb = new RemoteBlackBoard(__host.c_str(), __port);
00149 logger->log_info(name(), "Successfully connected via remote BB to %s (%s:%u)",
00150 __peer.c_str(), __host.c_str(), __port);
00151
00152 open_interfaces();
00153 blackboard->register_listener(__wsl_local, BlackBoard::BBIL_FLAG_WRITER);
00154 __remote_bb->register_listener(__wsl_remote, BlackBoard::BBIL_FLAG_WRITER);
00155 } catch (Exception &e) {
00156 e.print_trace();
00157 return false;
00158 }
00159 }
00160 return true;
00161 }
00162
00163 void
00164 BlackBoardSynchronizationThread::read_config_combos(std::string prefix, bool writing)
00165 {
00166 Configuration::ValueIterator *i = config->search(prefix.c_str());
00167 while (i->next()) {
00168 if (strcmp(i->type(), "string") != 0) {
00169 TypeMismatchException e("Only values of type string may occur in %s, "
00170 "but found value of type %s",
00171 prefix.c_str(), i->type());
00172 delete i;
00173 throw e;
00174 }
00175
00176 std::string varname = std::string(i->path()).substr(prefix.length());
00177 std::string uid = i->get_string();
00178 size_t sf;
00179
00180 if ((sf = uid.find("::")) == std::string::npos) {
00181 delete i;
00182 throw Exception("Interface UID '%s' at %s is not valid, missing double colon",
00183 uid.c_str(), i->path());
00184 }
00185
00186 std::string type = uid.substr(0, sf);
00187 std::string id = uid.substr(sf + 2);
00188 combo_t combo = { type, id, id, writing };
00189
00190 if ( (sf = id.find("=")) != std::string::npos) {
00191
00192 if ( writing ) {
00193 combo.writer_id = id.substr(0, sf);
00194 combo.reader_id = id.substr(sf + 1);
00195 } else {
00196 combo.reader_id = id.substr(0, sf);
00197 combo.writer_id = id.substr(sf + 1);
00198 }
00199 }
00200
00201 __combos[varname] = combo;
00202 }
00203 delete i;
00204 }
00205
00206
00207 void
00208 BlackBoardSynchronizationThread::open_interfaces()
00209 {
00210 logger->log_debug(name(), "Opening interfaces");
00211 MutexLocker lock(__interfaces.mutex());
00212
00213 ComboMap::iterator i;
00214 for (i = __combos.begin(); i != __combos.end(); ++i) {
00215 Interface *iface_reader = NULL, *iface_writer = NULL;
00216
00217 BlackBoard *writer_bb = i->second.remote_writer ? __remote_bb : blackboard;
00218 BlackBoard *reader_bb = i->second.remote_writer ? blackboard : __remote_bb;
00219
00220 try {
00221 logger->log_debug(name(), "Opening reading %s (%s:%s)",
00222 i->second.remote_writer ? "locally" : "remotely",
00223 i->second.type.c_str(), i->second.reader_id.c_str());
00224 iface_reader = reader_bb->open_for_reading(i->second.type.c_str(),
00225 i->second.reader_id.c_str());
00226
00227 if (iface_reader->has_writer()) {
00228 logger->log_debug(name(), "Opening writing on %s (%s:%s)",
00229 i->second.remote_writer ? "remotely" : "locally",
00230 i->second.type.c_str(), i->second.writer_id.c_str());
00231 iface_writer = writer_bb->open_for_writing(i->second.type.c_str(),
00232 i->second.writer_id.c_str());
00233 }
00234
00235 InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
00236 __interfaces[iface_reader] = ii;
00237
00238 } catch (Exception &e) {
00239 reader_bb->close(iface_reader);
00240 writer_bb->close(iface_writer);
00241 throw;
00242 }
00243
00244 SyncInterfaceListener *sync_listener = NULL;
00245 if (iface_writer) {
00246 logger->log_debug(name(), "Creating sync listener");
00247 sync_listener = new SyncInterfaceListener(logger, iface_reader, iface_writer,
00248 reader_bb, writer_bb);
00249 }
00250 __sync_listeners[iface_reader] = sync_listener;
00251
00252 if (i->second.remote_writer) {
00253 __wsl_local->add_interface(iface_reader);
00254 } else {
00255 __wsl_remote->add_interface(iface_reader);
00256 }
00257 }
00258 }
00259
00260
00261 void
00262 BlackBoardSynchronizationThread::close_interfaces()
00263 {
00264 SyncListenerMap::iterator s;
00265 for (s = __sync_listeners.begin(); s != __sync_listeners.end(); ++s) {
00266 if (s->second) {
00267 logger->log_debug(name(), "Closing sync listener %s", s->second->bbil_name());
00268 delete s->second;
00269 }
00270 }
00271 MutexLocker lock(__interfaces.mutex());
00272 InterfaceMap::iterator i;
00273 for (i = __interfaces.begin(); i != __interfaces.end(); ++i) {
00274 logger->log_debug(name(), "Closing %s reading interface %s",
00275 i->second.combo->remote_writer ? "local" : "remote",
00276 i->first->uid());
00277 if (i->second.combo->remote_writer) {
00278 __wsl_local->remove_interface(i->first);
00279 blackboard->close(i->first);
00280 } else {
00281 __wsl_remote->remove_interface(i->first);
00282 __remote_bb->close(i->first);
00283 }
00284 if (i->second.writer) {
00285 logger->log_debug(name(), "Closing %s writing interface %s",
00286 i->second.combo->remote_writer ? "remote" : "local",
00287 i->second.writer->uid());
00288 if (i->second.combo->remote_writer) {
00289 __remote_bb->close(i->second.writer);
00290 } else {
00291 blackboard->close(i->second.writer);
00292 }
00293 }
00294 }
00295 __interfaces.clear();
00296 __sync_listeners.clear();
00297 }
00298
00299
00300
00301
00302
00303
00304 void
00305 BlackBoardSynchronizationThread::writer_added(fawkes::Interface *interface) throw()
00306 {
00307 MutexLocker lock(__interfaces.mutex());
00308
00309 if (__interfaces[interface].writer) {
00310
00311 logger->log_warn(name(), "Writer added for %s, but relay exists already. Bug?", interface->uid());
00312 } else {
00313 logger->log_warn(name(), "Writer added for %s, opening relay writer", interface->uid());
00314
00315 Interface *iface = NULL;
00316 SyncInterfaceListener *sync_listener = NULL;
00317 InterfaceInfo &ii = __interfaces[interface];
00318 try {
00319 iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(),
00320 ii.combo->writer_id.c_str());
00321
00322 logger->log_debug(name(), "Creating sync listener for %s:%s-%s",
00323 ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
00324 ii.combo->writer_id.c_str());
00325
00326 sync_listener = new SyncInterfaceListener(logger, interface, iface,
00327 ii.reader_bb, ii.writer_bb);
00328
00329 __sync_listeners[interface] = sync_listener;
00330 ii.writer = iface;
00331
00332 } catch (Exception &e) {
00333 delete sync_listener;
00334 ii.writer_bb->close(iface);
00335 logger->log_error(name(), "Failed to open writer for %s:%s-%s, sync broken",
00336 ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
00337 ii.combo->writer_id.c_str());
00338 logger->log_error(name(), e);
00339 }
00340 }
00341 }
00342
00343
00344
00345
00346
00347
00348 void
00349 BlackBoardSynchronizationThread::writer_removed(fawkes::Interface *interface) throw()
00350 {
00351 MutexLocker lock(__interfaces.mutex());
00352
00353 if (! __interfaces[interface].writer) {
00354
00355 logger->log_warn(name(), "Writer removed for %s, but no relay exists. Bug?", interface->uid());
00356 } else {
00357 logger->log_warn(name(), "Writer removed for %s, closing relay writer", interface->uid());
00358
00359 InterfaceInfo &ii = __interfaces[interface];
00360 try {
00361 delete __sync_listeners[interface];
00362 __sync_listeners[interface] = NULL;
00363
00364 ii.writer_bb->close(ii.writer);
00365 ii.writer = NULL;
00366
00367 } catch (Exception &e) {
00368 logger->log_error(name(), "Failed to close writer for %s:%s-%s, sync broken",
00369 ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
00370 ii.combo->writer_id.c_str());
00371 logger->log_error(name(), e);
00372 }
00373 }
00374 }