00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <blackboard/net/handler.h>
00025 #include <blackboard/net/messages.h>
00026 #include <blackboard/net/ilist_content.h>
00027 #include <blackboard/blackboard.h>
00028 #include <blackboard/exceptions.h>
00029 #include <blackboard/net/interface_listener.h>
00030
00031 #include <interface/interface.h>
00032 #include <interface/interface_info.h>
00033
00034 #include <utils/logging/liblogger.h>
00035 #include <netcomm/fawkes/component_ids.h>
00036 #include <netcomm/fawkes/hub.h>
00037
00038 #include <cstdlib>
00039 #include <cstring>
00040 #include <arpa/inet.h>
00041
00042 namespace fawkes {
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056 BlackBoardNetworkHandler::BlackBoardNetworkHandler(BlackBoard *blackboard,
00057 FawkesNetworkHub *hub)
00058 : Thread("BlackBoardNetworkHandler", Thread::OPMODE_WAITFORWAKEUP),
00059 FawkesNetworkHandler(FAWKES_CID_BLACKBOARD)
00060 {
00061 __bb = blackboard;
00062 __nhub = hub;
00063 __nhub->add_handler(this);
00064 }
00065
00066
00067
00068 BlackBoardNetworkHandler::~BlackBoardNetworkHandler()
00069 {
00070 __nhub->remove_handler(this);
00071 __inbound_queue.clear();
00072
00073 for (__lit = __listeners.begin(); __lit != __listeners.end(); ++__lit) {
00074 delete __lit->second;
00075 }
00076 for (__iit = __interfaces.begin(); __iit != __interfaces.end(); ++__iit) {
00077 __bb->close(__iit->second);
00078 }
00079 }
00080
00081
00082
00083 void
00084 BlackBoardNetworkHandler::loop()
00085 {
00086 while ( ! __inbound_queue.empty() ) {
00087 FawkesNetworkMessage *msg = __inbound_queue.front();
00088
00089
00090 unsigned int clid = msg->clid();
00091
00092 switch (msg->msgid()) {
00093 case MSG_BB_LIST_ALL:
00094 {
00095 BlackBoardInterfaceListContent *ilist = new BlackBoardInterfaceListContent();
00096 InterfaceInfoList *infl = __bb->list_all();
00097
00098 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00099 ilist->append_interface(*i);
00100 }
00101
00102 try {
00103 __nhub->send(clid, FAWKES_CID_BLACKBOARD, MSG_BB_INTERFACE_LIST, ilist);
00104 } catch (Exception &e) {
00105 LibLogger::log_error("BlackBoardNetworkHandler", "Failed to sent interface "
00106 "list to %u, exception follows", clid);
00107 LibLogger::log_error("BlackBoardNetworkHandler", e);
00108 }
00109 delete infl;
00110 }
00111 break;
00112
00113 case MSG_BB_OPEN_FOR_READING:
00114 case MSG_BB_OPEN_FOR_WRITING:
00115 {
00116 bb_iopen_msg_t *om = msg->msg<bb_iopen_msg_t>();
00117 LibLogger::log_debug("BlackBoardNetworkHandler", "Remote opens interface %s::%s",
00118 om->type, om->id);
00119 try {
00120 Interface *iface;
00121 if ( msg->msgid() == MSG_BB_OPEN_FOR_READING ) {
00122 iface = __bb->open_for_reading(om->type, om->id);
00123 } else {
00124 iface = __bb->open_for_writing(om->type, om->id);
00125 }
00126 if ( memcmp(iface->hash(), om->hash, __INTERFACE_HASH_SIZE) != 0 ) {
00127 LibLogger::log_warn("BlackBoardNetworkHandler", "Opening interface %s::%s failed, "
00128 "hash mismatch", om->type, om->id);
00129 send_openfailure(clid, BB_ERR_HASH_MISMATCH);
00130 } else {
00131 __interfaces[iface->serial()] = iface;
00132 __client_interfaces[clid].push_back(iface);
00133 __serial_to_clid[iface->serial()] = clid;
00134 __listeners[iface->serial()] = new BlackBoardNetHandlerInterfaceListener(__bb,
00135 iface,
00136 __nhub,
00137 clid);
00138 send_opensuccess(clid, iface);
00139 }
00140 } catch (BlackBoardInterfaceNotFoundException &nfe) {
00141 LibLogger::log_warn("BlackBoardNetworkHandler", "Opening interface %s::%s failed, "
00142 "interface class not found", om->type, om->id);
00143 send_openfailure(clid, BB_ERR_UNKNOWN_TYPE);
00144 } catch (BlackBoardWriterActiveException &wae) {
00145 LibLogger::log_warn("BlackBoardNetworkHandler", "Opening interface %s::%s failed, "
00146 "writer already exists", om->type, om->id);
00147 send_openfailure(clid, BB_ERR_WRITER_EXISTS);
00148 } catch (Exception &e) {
00149 LibLogger::log_warn("BlackBoardNetworkHandler", "Opening interface %s::%s failed",
00150 om->type, om->id);
00151 LibLogger::log_warn("BlackBoardNetworkHandler", e);
00152 send_openfailure(clid, BB_ERR_UNKNOWN_ERR);
00153 }
00154
00155
00156
00157
00158
00159 }
00160 break;
00161
00162 case MSG_BB_CLOSE:
00163 {
00164 bb_iserial_msg_t *sm = msg->msg<bb_iserial_msg_t>();
00165 unsigned int sm_serial = ntohl(sm->serial);
00166 if ( __interfaces.find(sm_serial) != __interfaces.end() ) {
00167 bool close = false;
00168 __client_interfaces.lock();
00169 if ( __client_interfaces.find(clid) != __client_interfaces.end()) {
00170
00171 for ( __ciit = __client_interfaces[clid].begin(); __ciit != __client_interfaces[clid].end(); ++__ciit) {
00172 if ( (*__ciit)->serial() == sm_serial ) {
00173 close = true;
00174 __serial_to_clid.erase(sm_serial);
00175 __client_interfaces[clid].erase(__ciit);
00176 if ( __client_interfaces[clid].empty() ) {
00177 __client_interfaces.erase(clid);
00178 }
00179 break;
00180 }
00181 }
00182 }
00183 __client_interfaces.unlock();
00184
00185 if ( close ) {
00186 __interfaces.lock();
00187 LibLogger::log_debug("BlackBoardNetworkHandler", "Remote %u closing interface %s",
00188 clid, __interfaces[sm_serial]->uid());
00189 delete __listeners[sm_serial];
00190 __listeners.erase(sm_serial);
00191 __bb->close(__interfaces[sm_serial]);
00192 __interfaces.erase(sm_serial);
00193 __interfaces.unlock();
00194 } else {
00195 LibLogger::log_warn("BlackBoardNetworkHandler", "Client %u tried to close "
00196 "interface with serial %u, but opened by other client",
00197 clid, sm_serial);
00198 }
00199 } else {
00200 LibLogger::log_warn("BlackBoardNetworkHandler", "Client %u tried to close "
00201 "interface with serial %u which has not been opened",
00202 clid, sm_serial);
00203 }
00204
00205
00206
00207
00208 }
00209 break;
00210
00211 case MSG_BB_DATA_CHANGED:
00212 {
00213 void *payload = msg->payload();
00214 bb_idata_msg_t *dm = (bb_idata_msg_t *)payload;
00215 unsigned int dm_serial = ntohl(dm->serial);
00216 if ( __interfaces.find(dm_serial) != __interfaces.end() ) {
00217
00218 if ( ntohl(dm->data_size) != __interfaces[dm_serial]->datasize() ) {
00219 LibLogger::log_error("BlackBoardNetworkHandler", "DATA_CHANGED: Data size mismatch, "
00220 "expected %zu, but got %zu, ignoring.",
00221 __interfaces[dm_serial]->datasize(), ntohl(dm->data_size));
00222 } else {
00223 __interfaces[dm_serial]->set_from_chunk((char *)payload + sizeof(bb_idata_msg_t));
00224 __interfaces[dm_serial]->write();
00225 }
00226 } else {
00227 LibLogger::log_error("BlackBoardNetworkHandler", "DATA_CHANGED: Interface with "
00228 "serial %u not found, ignoring.", dm_serial);
00229 }
00230 }
00231 break;
00232
00233 case MSG_BB_INTERFACE_MESSAGE:
00234 {
00235 void *payload = msg->payload();
00236 bb_imessage_msg_t *mm = (bb_imessage_msg_t *)payload;
00237 unsigned int mm_serial = ntohl(mm->serial);
00238 if ( __interfaces.find(mm_serial) != __interfaces.end() ) {
00239
00240 if ( ! __interfaces[mm_serial]->is_writer() ) {
00241 try {
00242 Message *ifm = __interfaces[mm_serial]->create_message(mm->msg_type);
00243 ifm->set_id(ntohl(mm->msgid));
00244 ifm->set_hops(ntohl(mm->hops));
00245
00246 if ( ntohl(mm->data_size) != ifm->datasize() ) {
00247 LibLogger::log_error("BlackBoardNetworkHandler", "MESSAGE: Data size mismatch, "
00248 "expected %zu, but got %zu, ignoring.",
00249 ifm->datasize(), ntohl(mm->data_size));
00250 } else {
00251 ifm->set_from_chunk((char *)payload + sizeof(bb_imessage_msg_t));
00252
00253 __interfaces[mm_serial]->msgq_enqueue(ifm);
00254
00255 }
00256 } catch (Exception &e) {
00257 LibLogger::log_error("BlackBoardNetworkHandler", "MESSAGE: Could not create "
00258 "interface message, ignoring.");
00259 LibLogger::log_error("BlackBoardNetworkHandler", e);
00260 }
00261 } else {
00262 LibLogger::log_error("BlackBoardNetworkHandler", "MESSAGE: Received message "
00263 "notification, but for a writing instance, ignoring.");
00264 }
00265 } else {
00266 LibLogger::log_error("BlackBoardNetworkHandler", "DATA_CHANGED: Interface with "
00267 "serial %u not found, ignoring.", mm_serial);
00268 }
00269 }
00270 break;
00271
00272 default:
00273 LibLogger::log_warn("BlackBoardNetworkHandler", "Unknown message of type %u "
00274 "received", msg->msgid());
00275 break;
00276 }
00277
00278 msg->unref();
00279 __inbound_queue.pop_locked();
00280 }
00281 }
00282
00283
00284 void
00285 BlackBoardNetworkHandler::send_opensuccess(unsigned int clid, Interface *interface)
00286 {
00287 void *payload = calloc(1, sizeof(bb_iopensucc_msg_t) + interface->datasize());
00288 bb_iopensucc_msg_t *osm = (bb_iopensucc_msg_t *)payload;
00289 osm->serial = htonl(interface->serial());
00290 osm->has_writer = interface->has_writer() ? 1 : 0;
00291 osm->num_readers = htonl(interface->num_readers());
00292 osm->data_size = htonl(interface->datasize());
00293
00294 if ( ! interface->is_writer() ) {
00295 interface->read();
00296 }
00297
00298 memcpy((char *)payload + sizeof(bb_iopensucc_msg_t),
00299 interface->datachunk(), interface->datasize());
00300
00301 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(clid, FAWKES_CID_BLACKBOARD,
00302 MSG_BB_OPEN_SUCCESS, payload,
00303 sizeof(bb_iopensucc_msg_t) +
00304 interface->datasize());
00305 try {
00306 __nhub->send(omsg);
00307 } catch (Exception &e) {
00308 LibLogger::log_error("BlackBoardNetworkHandler", "Failed to sent interface "
00309 "open success to %u, exception follows", clid);
00310 LibLogger::log_error("BlackBoardNetworkHandler", e);
00311 }
00312 }
00313
00314
00315 void
00316 BlackBoardNetworkHandler::send_openfailure(unsigned int clid, unsigned int errno)
00317 {
00318 bb_iopenfail_msg_t *ofm = (bb_iopenfail_msg_t *)malloc(sizeof(bb_iopenfail_msg_t));
00319 ofm->errno = htonl(errno);
00320
00321 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(clid, FAWKES_CID_BLACKBOARD,
00322 MSG_BB_OPEN_FAILURE, ofm,
00323 sizeof(bb_iopenfail_msg_t));
00324 try {
00325 __nhub->send(omsg);
00326 } catch (Exception &e) {
00327 LibLogger::log_error("BlackBoardNetworkHandler", "Failed to sent interface "
00328 "open failure to %u, exception follows", clid);
00329 LibLogger::log_error("BlackBoardNetworkHandler", e);
00330 }
00331 }
00332
00333
00334
00335
00336
00337
00338 void
00339 BlackBoardNetworkHandler::handle_network_message(FawkesNetworkMessage *msg)
00340 {
00341 msg->ref();
00342 __inbound_queue.push_locked(msg);
00343 wakeup();
00344 }
00345
00346
00347
00348
00349
00350 void
00351 BlackBoardNetworkHandler::client_connected(unsigned int clid)
00352 {
00353 }
00354
00355
00356
00357
00358
00359
00360 void
00361 BlackBoardNetworkHandler::client_disconnected(unsigned int clid)
00362 {
00363
00364 __client_interfaces.lock();
00365 if ( __client_interfaces.find(clid) != __client_interfaces.end() ) {
00366
00367 for ( __ciit = __client_interfaces[clid].begin(); __ciit != __client_interfaces[clid].end(); ++__ciit) {
00368 LibLogger::log_debug("BlackBoardNetworkHandler", "Closing interface %s::%s of remote "
00369 "%u (client disconnected)",
00370 (*__ciit)->type(), (*__ciit)->id(), clid);
00371
00372 unsigned int serial = (*__ciit)->serial();
00373 __serial_to_clid.erase(serial);
00374 __interfaces.erase_locked(serial);
00375 delete __listeners[serial];
00376 __listeners.erase(serial);
00377 __bb->close(*__ciit);
00378 }
00379 __client_interfaces.erase(clid);
00380 }
00381 __client_interfaces.unlock();
00382 }
00383
00384 }