00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <fcntl.h>
00022 #include <sys/poll.h>
00023
00024 #include "opensync.h"
00025 #include "opensync_internals.h"
00026
00027 #include <sys/time.h>
00028 #include <signal.h>
00029
00030 typedef struct OSyncPendingMessage {
00031 long long int id1;
00032 int id2;
00034 OSyncMessageHandler callback;
00036 gpointer user_data;
00037 } OSyncPendingMessage;
00038
00046
00047 static
00048 gboolean _incoming_prepare(GSource *source, gint *timeout_)
00049 {
00050 *timeout_ = 1;
00051 return FALSE;
00052 }
00053
00054 static
00055 gboolean _incoming_check(GSource *source)
00056 {
00057 OSyncQueue *queue = *((OSyncQueue **)(source + 1));
00058 if (g_async_queue_length(queue->incoming) > 0)
00059 return TRUE;
00060
00061 return FALSE;
00062 }
00063
00064
00065
00066 static
00067 gboolean _incoming_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
00068 {
00069 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, user_data);
00070 OSyncQueue *queue = user_data;
00071
00072 OSyncMessage *message = NULL;
00073 while ((message = g_async_queue_try_pop(queue->incoming))) {
00074
00075 if (message->cmd == OSYNC_MESSAGE_REPLY || message->cmd == OSYNC_MESSAGE_ERRORREPLY) {
00076
00077
00078
00079 g_mutex_lock(queue->pendingLock);
00080
00081 OSyncPendingMessage *found = NULL;
00082
00083 GList *p = NULL;
00084 for (p = queue->pendingReplies; p; p = p->next) {
00085 OSyncPendingMessage *pending = p->data;
00086
00087 if (pending->id1 == message->id1 && pending->id2 == message->id2) {
00088
00089
00090 queue->pendingReplies = g_list_remove(queue->pendingReplies, pending);
00091 found = pending;
00092 break;
00093 }
00094 }
00095 g_mutex_unlock(queue->pendingLock);
00096
00097 if (found) {
00098
00099 osync_assert(found->callback);
00100 found->callback(message, found->user_data);
00101
00102 g_free(found);
00103 } else
00104 osync_trace(TRACE_INTERNAL, "%s: No pending message for %lld:%d", __func__, message->id1, message->id2);
00105
00106 } else
00107 queue->message_handler(message, queue->user_data);
00108
00109 osync_message_unref(message);
00110 }
00111
00112 osync_trace(TRACE_EXIT, "%s: Done dispatching", __func__);
00113 return TRUE;
00114 }
00115
00116 static void _osync_queue_stop_incoming(OSyncQueue *queue)
00117 {
00118 if (queue->incoming_source) {
00119 g_source_destroy(queue->incoming_source);
00120 queue->incoming_source = NULL;
00121 }
00122
00123 if (queue->incomingContext) {
00124 g_main_context_unref(queue->incomingContext);
00125 queue->incomingContext = NULL;
00126 }
00127
00128 if (queue->incoming_functions) {
00129 g_free(queue->incoming_functions);
00130 queue->incoming_functions = NULL;
00131 }
00132 }
00133
00134 static
00135 gboolean _queue_prepare(GSource *source, gint *timeout_)
00136 {
00137 *timeout_ = 1;
00138 return FALSE;
00139 }
00140
00141 static
00142 gboolean _queue_check(GSource *source)
00143 {
00144 OSyncQueue *queue = *((OSyncQueue **)(source + 1));
00145 if (g_async_queue_length(queue->outgoing) > 0)
00146 return TRUE;
00147 return FALSE;
00148 }
00149
00150 int _osync_queue_write_data(OSyncQueue *queue, const void *vptr, size_t n, OSyncError **error)
00151 {
00152 ssize_t nwritten = 0;
00153
00154 while (n > 0) {
00155 if ((nwritten = write(queue->fd, vptr, n)) <= 0) {
00156 if (errno == EINTR)
00157 nwritten = 0;
00158 else {
00159 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to write IPC data: %i: %s", errno, strerror(errno));
00160 return (-1);
00161 }
00162 }
00163
00164 n -= nwritten;
00165 vptr += nwritten;
00166 }
00167 return (nwritten);
00168 }
00169
00170 osync_bool _osync_queue_write_long_long_int(OSyncQueue *queue, const long long int message, OSyncError **error)
00171 {
00172 if (_osync_queue_write_data(queue, &message, sizeof(long long int), error) < 0)
00173 return FALSE;
00174
00175 return TRUE;
00176 }
00177
00178 osync_bool _osync_queue_write_int(OSyncQueue *queue, const int message, OSyncError **error)
00179 {
00180 if (_osync_queue_write_data(queue, &message, sizeof(int), error) < 0)
00181 return FALSE;
00182
00183 return TRUE;
00184 }
00185
00186
00187
00188 static
00189 gboolean _queue_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
00190 {
00191 OSyncQueue *queue = user_data;
00192 OSyncError *error = NULL;
00193
00194 OSyncMessage *message = NULL;
00195
00196 while ((message = g_async_queue_try_pop(queue->outgoing))) {
00197
00198 if (!queue->connected) {
00199 osync_error_set(&error, OSYNC_ERROR_GENERIC, "Trying to send to a queue thats not connected");
00200 goto error;
00201 }
00202
00203
00204 if (!_osync_queue_write_int(queue, message->buffer->len + osync_marshal_get_size_message(message), &error))
00205 goto error;
00206
00207 if (!_osync_queue_write_int(queue, message->cmd, &error))
00208 goto error;
00209
00210 if (!_osync_queue_write_long_long_int(queue, message->id1, &error))
00211 goto error;
00212
00213 if (!_osync_queue_write_int(queue, message->id2, &error))
00214 goto error;
00215
00216 if (message->buffer->len) {
00217 int sent = 0;
00218 do {
00219 int written = _osync_queue_write_data(queue, message->buffer->data + sent, message->buffer->len - sent, &error);
00220 if (written < 0)
00221 goto error;
00222
00223 sent += written;
00224 } while (sent < message->buffer->len);
00225 }
00226
00227 osync_message_unref(message);
00228 }
00229
00230 return TRUE;
00231
00232 error:
00233 if (message)
00234 osync_message_unref(message);
00235
00236 if (error) {
00237 message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error);
00238 if (message) {
00239 osync_marshal_error(message, error);
00240 g_async_queue_push(queue->incoming, message);
00241 }
00242
00243 osync_error_free(&error);
00244 }
00245 return FALSE;
00246 }
00247
00248 static
00249 gboolean _source_prepare(GSource *source, gint *timeout_)
00250 {
00251 *timeout_ = 1;
00252 return FALSE;
00253 }
00254
00255 static
00256 int _osync_queue_read_data(OSyncQueue *queue, void *vptr, size_t n, OSyncError **error)
00257 {
00258 size_t nleft;
00259 ssize_t nread = 0;
00260
00261 nleft = n;
00262 while (n > 0) {
00263 if ((nread = read(queue->fd, vptr, nleft)) < 0) {
00264 if (errno == EINTR)
00265 nread = 0;
00266 else {
00267 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read IPC data: %i: %s", errno, strerror(errno));
00268 return (-1);
00269 }
00270 } else if (nread == 0)
00271 break;
00272
00273 nleft -= nread;
00274 vptr += nread;
00275 }
00276 return (n - nleft);
00277 }
00278
00279 static
00280 osync_bool _osync_queue_read_int(OSyncQueue *queue, int *message, OSyncError **error)
00281 {
00282 int read = _osync_queue_read_data(queue, message, sizeof(int), error);
00283
00284 if (read < 0)
00285 return FALSE;
00286
00287 if (read != sizeof(int)) {
00288 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF");
00289 return FALSE;
00290 }
00291
00292 return TRUE;
00293 }
00294
00295 static
00296 osync_bool _osync_queue_read_long_long_int(OSyncQueue *queue, long long int *message, OSyncError **error)
00297 {
00298 int read = _osync_queue_read_data(queue, message, sizeof(long long int), error);
00299
00300 if (read < 0)
00301 return FALSE;
00302
00303 if (read != sizeof(long long int)) {
00304 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF");
00305 return FALSE;
00306 }
00307
00308 return TRUE;
00309 }
00310
00311 static
00312 gboolean _source_check(GSource *source)
00313 {
00314 OSyncQueue *queue = *((OSyncQueue **)(source + 1));
00315 OSyncMessage *message = NULL;
00316 OSyncError *error = NULL;
00317
00318 if (queue->connected == FALSE) {
00319
00320
00321
00322 if (queue->pendingReplies) {
00323 g_mutex_lock(queue->pendingLock);
00324 osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Broken Pipe");
00325 GList *p = NULL;
00326 for (p = queue->pendingReplies; p; p = p->next) {
00327 OSyncPendingMessage *pending = p->data;
00328
00329 message = osync_message_new(OSYNC_MESSAGE_ERRORREPLY, 0, NULL);
00330 if (message) {
00331 osync_marshal_error(message, error);
00332
00333 message->id1 = pending->id1;
00334 message->id2 = pending->id2;
00335
00336 g_async_queue_push(queue->incoming, message);
00337 }
00338 }
00339
00340 osync_error_free(&error);
00341 g_mutex_unlock(queue->pendingLock);
00342 }
00343
00344 return FALSE;
00345 }
00346
00347 switch (osync_queue_poll(queue)) {
00348 case OSYNC_QUEUE_EVENT_NONE:
00349 return FALSE;
00350 case OSYNC_QUEUE_EVENT_READ:
00351 return TRUE;
00352 case OSYNC_QUEUE_EVENT_HUP:
00353 case OSYNC_QUEUE_EVENT_ERROR:
00354 queue->connected = FALSE;
00355
00356
00357
00358 message = osync_message_new(OSYNC_MESSAGE_QUEUE_HUP, 0, &error);
00359 if (!message)
00360 goto error;
00361
00362 g_async_queue_push(queue->incoming, message);
00363
00364 if (queue->incomingContext)
00365 g_main_context_wakeup(queue->incomingContext);
00366 return FALSE;
00367 }
00368
00369 return FALSE;
00370
00371 error:
00372 message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error);
00373 if (message) {
00374 osync_marshal_error(message, error);
00375 g_async_queue_push(queue->incoming, message);
00376 }
00377 osync_error_free(&error);
00378 return FALSE;
00379 }
00380
00381
00382
00383 static
00384 gboolean _source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
00385 {
00386 OSyncQueue *queue = user_data;
00387 OSyncMessage *message = NULL;
00388 OSyncError *error = NULL;
00389
00390 do {
00391 int size = 0;
00392 int cmd = 0;
00393 long long int id1 = 0;
00394 int id2 = 0;
00395
00396 if (!_osync_queue_read_int(queue, &size, &error))
00397 goto error;
00398
00399 if (!_osync_queue_read_int(queue, &cmd, &error))
00400 goto error;
00401
00402 if (!_osync_queue_read_long_long_int(queue, &id1, &error))
00403 goto error;
00404
00405 if (!_osync_queue_read_int(queue, &id2, &error))
00406 goto error;
00407
00408 message = osync_message_new(cmd, size, &error);
00409 if (!message)
00410 goto error;
00411
00412 message->id1 = id1;
00413 message->id2 = id2;
00414
00415 if (size) {
00416 int read = 0;
00417 do {
00418 int inc = _osync_queue_read_data(queue, message->buffer->data + read, size - read, &error);
00419
00420 if (inc < 0)
00421 goto error_free_message;
00422
00423 if (inc == 0) {
00424 osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Encountered EOF while data was missing");
00425 goto error_free_message;
00426 }
00427
00428 read += inc;
00429 } while (read < size);
00430 }
00431
00432 g_async_queue_push(queue->incoming, message);
00433
00434 if (queue->incomingContext)
00435 g_main_context_wakeup(queue->incomingContext);
00436 } while (_source_check(queue->read_source));
00437
00438 return TRUE;
00439
00440 error_free_message:
00441 osync_message_unref(message);
00442 error:
00443 if (error) {
00444 message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error);
00445 if (message) {
00446 osync_marshal_error(message, error);
00447 g_async_queue_push(queue->incoming, message);
00448 }
00449
00450 osync_error_free(&error);
00451 }
00452
00453 return FALSE;
00454 }
00455
00461 OSyncQueue *osync_queue_new(const char *name, OSyncError **error)
00462 {
00463 osync_trace(TRACE_ENTRY, "%s(%s, %p)", __func__, name, error);
00464
00465 OSyncQueue *queue = osync_try_malloc0(sizeof(OSyncQueue), error);
00466 if (!queue)
00467 goto error;
00468
00469 if (name)
00470 queue->name = g_strdup(name);
00471 queue->fd = -1;
00472
00473 if (!g_thread_supported ())
00474 g_thread_init (NULL);
00475
00476 queue->pendingLock = g_mutex_new();
00477
00478 queue->context = g_main_context_new();
00479
00480 queue->outgoing = g_async_queue_new();
00481 queue->incoming = g_async_queue_new();
00482
00483 osync_trace(TRACE_EXIT, "%s: %p", __func__, queue);
00484 return queue;
00485
00486 error:
00487 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00488 return NULL;
00489 }
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508 osync_bool osync_queue_new_pipes(OSyncQueue **read_queue, OSyncQueue **write_queue, OSyncError **error)
00509 {
00510 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, read_queue, write_queue, error);
00511
00512 *read_queue = osync_queue_new(NULL, error);
00513 if (!*read_queue)
00514 goto error;
00515
00516 *write_queue = osync_queue_new(NULL, error);
00517 if (!*write_queue)
00518 goto error_free_read_queue;
00519
00520 int filedes[2];
00521
00522 if (pipe(filedes) < 0) {
00523 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create pipes");
00524 goto error_free_write_queue;
00525 }
00526
00527 (*read_queue)->fd = filedes[0];
00528 (*write_queue)->fd = filedes[1];
00529
00530 osync_trace(TRACE_EXIT, "%s", __func__);
00531 return TRUE;
00532
00533 error_free_write_queue:
00534 osync_queue_free(*write_queue);
00535 error_free_read_queue:
00536 osync_queue_free(*read_queue);
00537 error:
00538 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00539 return FALSE;
00540 }
00541
00542 void osync_queue_free(OSyncQueue *queue)
00543 {
00544 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, queue);
00545 OSyncMessage *message = NULL;
00546 OSyncPendingMessage *pending = NULL;
00547
00548 g_mutex_free(queue->pendingLock);
00549
00550 g_main_context_unref(queue->context);
00551
00552 _osync_queue_stop_incoming(queue);
00553
00554 while ((message = g_async_queue_try_pop(queue->incoming))) {
00555 osync_message_unref(message);
00556 }
00557 g_async_queue_unref(queue->incoming);
00558
00559 while ((message = g_async_queue_try_pop(queue->outgoing))) {
00560 osync_message_unref(message);
00561 }
00562 g_async_queue_unref(queue->outgoing);
00563
00564 while (queue->pendingReplies) {
00565 pending = queue->pendingReplies->data;
00566 g_free(pending);
00567 queue->pendingReplies = g_list_remove(queue->pendingReplies, pending);
00568 }
00569
00570 if (queue->name)
00571 g_free(queue->name);
00572
00573 g_free(queue);
00574
00575 osync_trace(TRACE_EXIT, "%s", __func__);
00576 }
00577
00578 osync_bool osync_queue_exists(OSyncQueue *queue)
00579 {
00580 return g_file_test(queue->name, G_FILE_TEST_EXISTS) ? TRUE : FALSE;
00581 }
00582
00583 osync_bool osync_queue_create(OSyncQueue *queue, OSyncError **error)
00584 {
00585 if (mkfifo(queue->name, 0600) != 0) {
00586 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create fifo");
00587 return FALSE;
00588 }
00589
00590 return TRUE;
00591 }
00592
00593 osync_bool osync_queue_remove(OSyncQueue *queue, OSyncError **error)
00594 {
00595 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error);
00596
00597 if (unlink(queue->name) != 0) {
00598 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to remove queue");
00599 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00600 return FALSE;
00601 }
00602
00603 osync_trace(TRACE_EXIT, "%s", __func__);
00604 return TRUE;
00605 }
00606
00607 static osync_bool __osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, osync_bool nonblocking, OSyncError **error)
00608 {
00609 osync_assert(queue);
00610 osync_assert(queue->connected == FALSE);
00611 OSyncQueue **queueptr = NULL;
00612
00613 queue->type = type;
00614
00615 if (queue->fd == -1) {
00616
00617 int fd = open(queue->name, (type == OSYNC_QUEUE_SENDER ? O_WRONLY : O_RDONLY) | (nonblocking ? O_NONBLOCK : 0));
00618 if (fd == -1) {
00619 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to open fifo");
00620 goto error;
00621 }
00622 queue->fd = fd;
00623
00624 int oldflags = fcntl(queue->fd, F_GETFD);
00625 if (oldflags == -1) {
00626 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to get fifo flags");
00627 goto error_close;
00628 }
00629 if (fcntl(queue->fd, F_SETFD, oldflags|FD_CLOEXEC) == -1) {
00630 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to set fifo flags");
00631 goto error_close;
00632 }
00633 }
00634
00635 queue->connected = TRUE;
00636 signal(SIGPIPE, SIG_IGN);
00637
00638
00639 queue->thread = osync_thread_new(queue->context, error);
00640
00641 if (!queue->thread)
00642 goto error;
00643
00644 queue->write_functions = g_malloc0(sizeof(GSourceFuncs));
00645 queue->write_functions->prepare = _queue_prepare;
00646 queue->write_functions->check = _queue_check;
00647 queue->write_functions->dispatch = _queue_dispatch;
00648 queue->write_functions->finalize = NULL;
00649
00650 queue->write_source = g_source_new(queue->write_functions, sizeof(GSource) + sizeof(OSyncQueue *));
00651 queueptr = (OSyncQueue **)(queue->write_source + 1);
00652 *queueptr = queue;
00653 g_source_set_callback(queue->write_source, NULL, queue, NULL);
00654 g_source_attach(queue->write_source, queue->context);
00655 g_main_context_ref(queue->context);
00656
00657 queue->read_functions = g_malloc0(sizeof(GSourceFuncs));
00658 queue->read_functions->prepare = _source_prepare;
00659 queue->read_functions->check = _source_check;
00660 queue->read_functions->dispatch = _source_dispatch;
00661 queue->read_functions->finalize = NULL;
00662
00663 queue->read_source = g_source_new(queue->read_functions, sizeof(GSource) + sizeof(OSyncQueue *));
00664 queueptr = (OSyncQueue **)(queue->read_source + 1);
00665 *queueptr = queue;
00666 g_source_set_callback(queue->read_source, NULL, queue, NULL);
00667 g_source_attach(queue->read_source, queue->context);
00668 g_main_context_ref(queue->context);
00669
00670 osync_thread_start(queue->thread);
00671
00672 return TRUE;
00673
00674 error_close:
00675 close(queue->fd);
00676 error:
00677 return FALSE;
00678 }
00679
00680
00681 osync_bool osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error)
00682 {
00683 return __osync_queue_connect(queue, type, FALSE, error);
00684 }
00685
00686 osync_bool osync_queue_try_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error)
00687 {
00688 return __osync_queue_connect(queue, type, TRUE, error);
00689 }
00690
00691 osync_bool osync_queue_disconnect(OSyncQueue *queue, OSyncError **error)
00692 {
00693 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error);
00694 osync_assert(queue);
00695
00696 if (queue->thread) {
00697 osync_thread_stop(queue->thread);
00698 osync_thread_free(queue->thread);
00699 queue->thread = NULL;
00700 }
00701
00702
00703
00704 if (queue->write_functions)
00705 g_free(queue->write_functions);
00706
00707
00708
00709 _osync_queue_stop_incoming(queue);
00710
00711
00712
00713 OSyncMessage *message = NULL;
00714 while ((message = g_async_queue_try_pop(queue->incoming))) {
00715 osync_message_unref(message);
00716 }
00717
00718 if (close(queue->fd) != 0) {
00719 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to close queue");
00720 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00721 return FALSE;
00722 }
00723
00724 queue->fd = -1;
00725 queue->connected = FALSE;
00726
00727 osync_trace(TRACE_EXIT, "%s", __func__);
00728 return TRUE;
00729 }
00730
00731 osync_bool osync_queue_is_connected(OSyncQueue *queue)
00732 {
00733 osync_assert(queue);
00734 return queue->connected;
00735 }
00736
00746 void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data)
00747 {
00748 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, handler, user_data);
00749
00750 queue->message_handler = handler;
00751 queue->user_data = user_data;
00752
00753 osync_trace(TRACE_EXIT, "%s", __func__);
00754 }
00755
00766 void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context)
00767 {
00768 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, context);
00769
00770 queue->incoming_functions = g_malloc0(sizeof(GSourceFuncs));
00771 queue->incoming_functions->prepare = _incoming_prepare;
00772 queue->incoming_functions->check = _incoming_check;
00773 queue->incoming_functions->dispatch = _incoming_dispatch;
00774 queue->incoming_functions->finalize = NULL;
00775
00776 queue->incoming_source = g_source_new(queue->incoming_functions, sizeof(GSource) + sizeof(OSyncQueue *));
00777 OSyncQueue **queueptr = (OSyncQueue **)(queue->incoming_source + 1);
00778 *queueptr = queue;
00779 g_source_set_callback(queue->incoming_source, NULL, queue, NULL);
00780 g_source_attach(queue->incoming_source, context);
00781 queue->incomingContext = context;
00782
00783 g_main_context_ref(context);
00784
00785
00786 g_main_context_ref(context);
00787
00788 osync_trace(TRACE_EXIT, "%s", __func__);
00789 }
00790
00791 osync_bool osync_queue_dispatch(OSyncQueue *queue, OSyncError **error)
00792 {
00793 _incoming_dispatch(NULL, NULL, queue);
00794 return TRUE;
00795 }
00796
00797 OSyncQueueEvent osync_queue_poll(OSyncQueue *queue)
00798 {
00799 struct pollfd pfd;
00800 pfd.fd = queue->fd;
00801 pfd.events = POLLIN;
00802
00803
00804
00805
00806
00807
00808 int ret = poll(&pfd, 1, queue->type == OSYNC_QUEUE_SENDER ? 0 : 100);
00809
00810 if (ret < 0 && errno == EINTR)
00811 return OSYNC_QUEUE_EVENT_NONE;
00812
00813 if (ret == 0)
00814 return OSYNC_QUEUE_EVENT_NONE;
00815
00816 if (pfd.revents & POLLERR)
00817 return OSYNC_QUEUE_EVENT_ERROR;
00818 else if (pfd.revents & POLLHUP)
00819 return OSYNC_QUEUE_EVENT_HUP;
00820 else if (pfd.revents & POLLIN)
00821 return OSYNC_QUEUE_EVENT_READ;
00822
00823 return OSYNC_QUEUE_EVENT_ERROR;
00824 }
00825
00827 OSyncMessage *osync_queue_get_message(OSyncQueue *queue)
00828 {
00829 return g_async_queue_pop(queue->incoming);
00830 }
00831
00832 void gen_id(long long int *part1, int *part2)
00833 {
00834 struct timeval tv;
00835 struct timezone tz;
00836
00837 gettimeofday(&tv, &tz);
00838
00839 long long int now = tv.tv_sec * 1000000 + tv.tv_usec;
00840
00841 int rnd = (int)random();
00842 rnd = rnd << 16 | getpid();
00843
00844 *part1 = now;
00845 *part2 = rnd;
00846 }
00847
00848 osync_bool osync_queue_send_message(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, OSyncError **error)
00849 {
00850 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, queue, replyqueue, message, error);
00851
00852 if (message->callback) {
00853 osync_assert(replyqueue);
00854 OSyncPendingMessage *pending = osync_try_malloc0(sizeof(OSyncPendingMessage), error);
00855 if (!pending)
00856 goto error;
00857
00858 gen_id(&(message->id1), &(message->id2));
00859 pending->id1 = message->id1;
00860 pending->id2 = message->id2;
00861
00862 pending->callback = message->callback;
00863 pending->user_data = message->user_data;
00864
00865 g_mutex_lock(replyqueue->pendingLock);
00866 replyqueue->pendingReplies = g_list_append(replyqueue->pendingReplies, pending);
00867 g_mutex_unlock(replyqueue->pendingLock);
00868 }
00869
00870 osync_message_ref(message);
00871 g_async_queue_push(queue->outgoing, message);
00872
00873 g_main_context_wakeup(queue->context);
00874
00875 osync_trace(TRACE_EXIT, "%s", __func__);
00876 return TRUE;
00877
00878 error:
00879 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00880 return FALSE;
00881 }
00882
00883 osync_bool osync_queue_send_message_with_timeout(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, int timeout, OSyncError **error)
00884 {
00885 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, message, error);
00886
00887
00888
00889 osync_bool ret = osync_queue_send_message(queue, replyqueue, message, error);
00890
00891 osync_trace(ret ? TRACE_EXIT : TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00892 return ret;
00893 }
00894
00895 osync_bool osync_queue_is_alive(OSyncQueue *queue)
00896 {
00897
00898 if (!osync_queue_try_connect(queue, OSYNC_QUEUE_SENDER, NULL)) {
00899 return FALSE;
00900 }
00901
00902 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_NOOP, 0, NULL);
00903 if (!message) {
00904 return FALSE;
00905 }
00906
00907 if (!osync_queue_send_message(queue, NULL, message, NULL)) {
00908 return FALSE;
00909 }
00910
00911 osync_queue_disconnect(queue, NULL);
00912
00913 return TRUE;
00914 }