00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "config.h"
00022 #include "engine.h"
00023 #include <glib.h>
00024 #include <opensync/opensync_support.h>
00025 #include "opensync/opensync_format_internals.h"
00026 #include "opensync/opensync_member_internals.h"
00027 #include "opensync/opensync_message_internals.h"
00028 #include "opensync/opensync_queue_internals.h"
00029
00030 #include "engine_internals.h"
00031 #include <unistd.h>
00032
00033 #include <sys/types.h>
00034 #include <sys/wait.h>
00035 #include <errno.h>
00036 #include <signal.h>
00037
00043 void _get_changes_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00044 {
00045 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, sender);
00046 OSyncEngine *engine = sender->engine;
00047
00048 if (osync_message_is_error(message)) {
00049 OSyncError *error = NULL;
00050 osync_demarshal_error(message, &error);
00051 osync_error_duplicate(&engine->error, &error);
00052 osync_debug("ENG", 1, "Get changes command reply was a error: %s", osync_error_print(&error));
00053 osync_status_update_member(engine, sender, MEMBER_GET_CHANGES_ERROR, &error);
00054 osync_error_update(&engine->error, "Unable to read from one of the members");
00055 osync_flag_unset(sender->fl_sent_changes);
00056
00057 osync_flag_set(sender->fl_done);
00058
00059
00060
00061
00062
00063 osync_flag_set(engine->fl_stop);
00064
00065 } else {
00066 osync_status_update_member(engine, sender, MEMBER_SENT_CHANGES, NULL);
00067 osync_flag_set(sender->fl_sent_changes);
00068 }
00069
00070 osengine_client_decider(engine, sender);
00071 osync_trace(TRACE_EXIT, "_get_changes_reply_receiver");
00072 }
00073
00079 void _connect_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00080 {
00081 osync_trace(TRACE_ENTRY, "_connect_reply_receiver(%p, %p)", message, sender);
00082
00083 osync_trace(TRACE_INTERNAL, "connect reply %i", osync_message_is_error(message));
00084 OSyncEngine *engine = sender->engine;
00085
00086 if (osync_message_is_error(message)) {
00087 OSyncError *error = NULL;
00088 osync_demarshal_error(message, &error);
00089 osync_error_duplicate(&engine->error, &error);
00090 osync_debug("ENG", 1, "Connect command reply was a error: %s", osync_error_print(&error));
00091 osync_status_update_member(engine, sender, MEMBER_CONNECT_ERROR, &error);
00092 osync_error_update(&engine->error, "Unable to connect one of the members");
00093 osync_flag_unset(sender->fl_connected);
00094 osync_flag_set(sender->fl_finished);
00095 osync_flag_set(sender->fl_sent_changes);
00096 osync_flag_set(sender->fl_done);
00097
00098
00099
00100
00101
00102 osync_flag_set(engine->fl_stop);
00103
00104 } else {
00105 osync_member_read_sink_info(sender->member, message);
00106
00107 osync_status_update_member(engine, sender, MEMBER_CONNECTED, NULL);
00108 osync_flag_set(sender->fl_connected);
00109 }
00110
00111 osengine_client_decider(engine, sender);
00112 osync_trace(TRACE_EXIT, "_connect_reply_receiver");
00113 }
00114
00115 void _sync_done_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00116 {
00117 osync_trace(TRACE_ENTRY, "_sync_done_reply_receiver(%p, %p)", message, sender);
00118
00119 OSyncEngine *engine = sender->engine;
00120
00121 if (osync_message_is_error(message)) {
00122 OSyncError *error = NULL;
00123 osync_demarshal_error(message, &error);
00124 osync_error_duplicate(&engine->error, &error);
00125 osync_debug("ENG", 1, "Sync done command reply was a error: %s", osync_error_print(&error));
00126 osync_status_update_member(engine, sender, MEMBER_SYNC_DONE_ERROR, &error);
00127 osync_error_update(&engine->error, "Unable to finish the sync for one of the members");
00128 }
00129
00130 osync_flag_set(sender->fl_done);
00131 osengine_client_decider(engine, sender);
00132 osync_trace(TRACE_EXIT, "_sync_done_reply_receiver");
00133 }
00134
00135 void _committed_all_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00136 {
00137 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, sender);
00138
00139 OSyncEngine *engine = sender->engine;
00140
00141 if (osync_message_is_error(message)) {
00142 OSyncError *error = NULL;
00143 osync_demarshal_error(message, &error);
00144 osync_error_duplicate(&engine->error, &error);
00145 osync_debug("ENG", 1, "Committed all command reply was a error: %s", osync_error_print(&error));
00146 osync_status_update_member(engine, sender, MEMBER_COMMITTED_ALL_ERROR, &error);
00147 osync_error_update(&engine->error, "Unable to write changes to one of the members");
00148 } else
00149 osync_status_update_member(engine, sender, MEMBER_COMMITTED_ALL, NULL);
00150
00151 osync_flag_set(sender->fl_committed_all);
00152 osengine_client_decider(engine, sender);
00153 osync_trace(TRACE_EXIT, "%s", __func__);
00154 }
00155
00156 void _disconnect_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00157 {
00158 osync_trace(TRACE_ENTRY, "_disconnect_reply_receiver(%p, %p)", message, sender);
00159
00160 OSyncEngine *engine = sender->engine;
00161
00162 if (osync_message_is_error(message)) {
00163 OSyncError *error = NULL;
00164 osync_demarshal_error(message, &error);
00165 osync_debug("ENG", 1, "Sync done command reply was a error: %s", osync_error_print(&error));
00166 osync_status_update_member(engine, sender, MEMBER_DISCONNECT_ERROR, &error);
00167 } else
00168 osync_status_update_member(engine, sender, MEMBER_DISCONNECTED, NULL);
00169
00170 osync_flag_unset(sender->fl_connected);
00171 osync_flag_set(sender->fl_finished);
00172 osengine_client_decider(engine, sender);
00173 osync_trace(TRACE_EXIT, "_disconnect_reply_receiver");
00174 }
00175
00176 void _get_change_data_reply_receiver(OSyncMessage *message, OSyncMappingEntry *entry)
00177 {
00178 osync_trace(TRACE_ENTRY, "_get_change_data_reply_receiver(%p, %p, %p)", message, entry);
00179 OSyncEngine *engine = entry->client->engine;
00180
00181 if (osync_message_is_error(message)) {
00182 OSyncError *error = NULL;
00183 osync_demarshal_error(message, &error);
00184 osync_error_duplicate(&engine->error, &error);
00185 osync_debug("MAP", 1, "Commit change command reply was a error: %s", osync_error_print(&error));
00186 osync_status_update_change(engine, entry->change, CHANGE_RECV_ERROR, &error);
00187 osync_error_update(&engine->error, "Unable to read one or more objects");
00188
00189
00190
00191 } else {
00192
00193 osync_demarshal_changedata(message, entry->change);
00194
00195 osync_flag_set(entry->fl_has_data);
00196 osync_status_update_change(engine, entry->change, CHANGE_RECEIVED, NULL);
00197 }
00198
00199 osync_change_save(entry->change, TRUE, NULL);
00200 osengine_mappingentry_decider(engine, entry);
00201 osync_trace(TRACE_EXIT, "_get_change_data_reply_receiver");
00202 }
00203
00204 void _read_change_reply_receiver(OSyncClient *sender, OSyncMessage *message, OSyncEngine *engine)
00205 {
00206 osync_trace(TRACE_ENTRY, "_read_change_reply_receiver(%p, %p, %p)", sender, message, engine);
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227 osync_trace(TRACE_EXIT, "_read_change_reply_receiver");
00228 }
00229
00230 void _commit_change_reply_receiver(OSyncMessage *message, OSyncMappingEntry *entry)
00231 {
00232 osync_trace(TRACE_ENTRY, "_commit_change_reply_receiver(%p, %p)", message, entry);
00233 OSyncEngine *engine = entry->client->engine;
00234
00235 if (osync_message_is_error(message)) {
00236 OSyncError *error = NULL;
00237 osync_demarshal_error(message, &error);
00238 osync_error_duplicate(&engine->error, &error);
00239 osync_debug("MAP", 1, "Commit change command reply was a error: %s", osync_error_print(&error));
00240 osync_status_update_change(engine, entry->change, CHANGE_WRITE_ERROR, &error);
00241 OSyncError *maperror = NULL;
00242 osync_error_duplicate(&maperror, &error);
00243 osync_status_update_mapping(engine, entry->mapping, MAPPING_WRITE_ERROR, &maperror);
00244 osync_error_update(&engine->error, "Unable to write one or more objects");
00245
00246
00247 osync_flag_unset(entry->fl_dirty);
00248 osync_flag_set(entry->fl_synced);
00249 } else {
00250
00251
00252
00253
00254 char *newuid;
00255 osync_message_read_string(message, &newuid);
00256 osync_change_set_uid(entry->change, newuid);
00257
00258 osync_status_update_change(engine, entry->change, CHANGE_SENT, NULL);
00259 osync_flag_unset(entry->fl_dirty);
00260 osync_flag_set(entry->fl_synced);
00261 }
00262
00263 if (osync_change_get_changetype(entry->change) == CHANGE_DELETED)
00264 osync_flag_set(entry->fl_deleted);
00265
00266 osync_change_reset(entry->change);
00267
00268 OSyncError *error = NULL;
00269 osync_change_save(entry->change, TRUE, &error);
00270
00271 osengine_mappingentry_decider(engine, entry);
00272 osync_trace(TRACE_EXIT, "_commit_change_reply_receiver");
00273 }
00274
00275 OSyncClient *osync_client_new(OSyncEngine *engine, OSyncMember *member, OSyncError **error)
00276 {
00277 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, engine, member, error);
00278 OSyncClient *client = osync_try_malloc0(sizeof(OSyncClient), error);
00279 if (!client)
00280 goto error;
00281
00282 client->member = member;
00283 osync_member_set_data(member, client);
00284 client->engine = engine;
00285 engine->clients = g_list_append(engine->clients, client);
00286
00287 char *name = g_strdup_printf("%s/pluginpipe", osync_member_get_configdir(member));
00288 client->commands_to_osplugin = osync_queue_new(name, error);
00289 g_free(name);
00290
00291 name = g_strdup_printf("%s/enginepipe", osync_member_get_configdir(member));
00292 client->commands_from_osplugin = osync_queue_new(name, error);
00293 g_free(name);
00294
00295 if (!client->commands_to_osplugin || !client->commands_from_osplugin)
00296 goto error_free_client;
00297
00298 client->fl_connected = osync_flag_new(engine->cmb_connected);
00299 client->fl_sent_changes = osync_flag_new(engine->cmb_sent_changes);
00300 client->fl_done = osync_flag_new(NULL);
00301 client->fl_committed_all = osync_flag_new(engine->cmb_committed_all_sent);
00302 client->fl_finished = osync_flag_new(engine->cmb_finished);
00303
00304 osync_trace(TRACE_EXIT, "%s: %p", __func__, client);
00305 return client;
00306
00307 error_free_client:
00308 g_free(client);
00309 error:
00310 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00311 return NULL;
00312 }
00313
00314 void osync_client_reset(OSyncClient *client)
00315 {
00316 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
00317 osync_flag_set_state(client->fl_connected, FALSE);
00318 osync_flag_set_state(client->fl_sent_changes, FALSE);
00319 osync_flag_set_state(client->fl_done, FALSE);
00320 osync_flag_set_state(client->fl_finished, FALSE);
00321 osync_flag_set_state(client->fl_committed_all, FALSE);
00322 osync_trace(TRACE_EXIT, "%s", __func__);
00323 }
00324
00325 void osync_client_free(OSyncClient *client)
00326 {
00327 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
00328 osync_queue_free(client->commands_to_osplugin);
00329 osync_queue_free(client->commands_from_osplugin);
00330
00331 osync_flag_free(client->fl_connected);
00332 osync_flag_free(client->fl_sent_changes);
00333 osync_flag_free(client->fl_done);
00334 osync_flag_free(client->fl_finished);
00335 osync_flag_free(client->fl_committed_all);
00336
00337 g_free(client);
00338 osync_trace(TRACE_EXIT, "%s", __func__);
00339 }
00340
00341 void *osync_client_message_sink(OSyncMember *member, const char *name, void *data, osync_bool synchronous)
00342 {
00343 OSyncClient *client = osync_member_get_data(member);
00344 OSyncEngine *engine = client->engine;
00345 if (!synchronous) {
00346
00347
00348
00349
00350
00351 return NULL;
00352 } else {
00353 return engine->plgmsg_callback(engine, client, name, data, engine->plgmsg_userdata);
00354 }
00355 }
00356
00357 OSyncPluginTimeouts osync_client_get_timeouts(OSyncClient *client)
00358 {
00359 return osync_plugin_get_timeouts(osync_member_get_plugin(client->member));
00360 }
00361
00362 void osync_client_call_plugin(OSyncClient *client, char *function, void *data, OSyncPluginReplyHandler replyhandler, void *userdata)
00363 {
00364 osync_trace(TRACE_ENTRY, "%s(%p, %s, %p, %p, %p)", __func__, client, function, data, replyhandler, userdata);
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383 osync_trace(TRACE_EXIT, "%s", __func__);
00384 }
00385
00386 osync_bool osync_client_get_changes(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00387 {
00388 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error);
00389
00390 osync_flag_changing(target->fl_sent_changes);
00391
00392 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_GET_CHANGES, 0, error);
00393 if (!message)
00394 goto error;
00395
00396 osync_message_set_handler(message, (OSyncMessageHandler)_get_changes_reply_receiver, target);
00397
00398 osync_member_write_sink_info(target->member, message);
00399
00400 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00401 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.get_changeinfo_timeout, error))
00402 goto error_free_message;
00403
00404 osync_message_unref(message);
00405
00406 osync_trace(TRACE_EXIT, "%s", __func__);
00407 return TRUE;
00408
00409 error_free_message:
00410 osync_message_unref(message);
00411 error:
00412 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00413 return FALSE;
00414 }
00415
00416 osync_bool osync_client_get_change_data(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry, OSyncError **error)
00417 {
00418 osync_flag_changing(entry->fl_has_data);
00419
00420 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_GET_CHANGEDATA, 0, error);
00421 if (!message)
00422 goto error;
00423
00424 osync_message_set_handler(message, (OSyncMessageHandler)_get_change_data_reply_receiver, entry);
00425
00426 osync_marshal_change(message, entry->change);
00427
00428 osync_debug("ENG", 3, "Sending get_changedata message %p to client %p", message, entry->client);
00429
00430 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00431 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.get_data_timeout, error))
00432 goto error_free_message;
00433
00434 osync_message_unref(message);
00435
00436 osync_trace(TRACE_EXIT, "%s", __func__);
00437 return TRUE;
00438
00439 error_free_message:
00440 osync_message_unref(message);
00441 error:
00442 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00443 return FALSE;
00444 }
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459 osync_bool osync_client_connect(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00460 {
00461 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error);
00462
00463 osync_flag_changing(target->fl_connected);
00464
00465 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_CONNECT, 0, error);
00466 if (!message)
00467 goto error;
00468
00469 osync_member_write_sink_info(target->member, message);
00470
00471 osync_message_set_handler(message, (OSyncMessageHandler)_connect_reply_receiver, target);
00472
00473 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00474 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.connect_timeout, error))
00475 goto error_free_message;
00476
00477 osync_message_unref(message);
00478
00479 osync_trace(TRACE_EXIT, "%s", __func__);
00480 return TRUE;
00481
00482 error_free_message:
00483 osync_message_unref(message);
00484 error:
00485 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00486 return FALSE;
00487 }
00488
00489 osync_bool osync_client_commit_change(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry, OSyncError **error)
00490 {
00491 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, entry);
00492 osync_trace(TRACE_INTERNAL, "Committing change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli", osync_change_get_uid(entry->change), osync_change_get_changetype(entry->change), osync_change_get_data(entry->change), osync_change_get_datasize(entry->change), osync_change_get_objtype(entry->change) ? osync_objtype_get_name(osync_change_get_objtype(entry->change)) : "None", osync_change_get_objformat(entry->change) ? osync_objformat_get_name(osync_change_get_objformat(entry->change)) : "None", osync_member_get_id(entry->client->member));
00493
00494 osync_flag_changing(entry->fl_dirty);
00495
00496
00497 if (!osync_change_convert_member_sink(osync_group_get_format_env(sender->group), entry->change, target->member, error))
00498 goto error;
00499
00500 if (osync_change_get_changetype(entry->change) == CHANGE_ADDED) {
00501 int elevated = 0;
00502
00503 OSyncMappingView *view = osengine_mappingtable_find_view(sender->maptable, target->member);
00504 while (!osengine_mappingview_uid_is_unique(view, entry, TRUE)) {
00505 if (!osync_change_elevate(sender, entry->change, 1))
00506 break;
00507 elevated++;
00508 }
00509
00510 if (elevated) {
00511
00512 if (!osync_change_save(entry->change, TRUE, error))
00513 goto error;
00514 }
00515 }
00516
00517 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_COMMIT_CHANGE, 0, error);
00518 if (!message)
00519 goto error;
00520
00521 osync_marshal_change(message, entry->change);
00522
00523 osync_message_set_handler(message, (OSyncMessageHandler)_commit_change_reply_receiver, entry);
00524 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(entry->client);
00525
00526 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.commit_timeout, error))
00527 goto error_free_message;
00528
00529 osync_message_unref(message);
00530
00531 g_assert(osync_flag_is_attached(entry->fl_committed) == TRUE);
00532 osync_flag_detach(entry->fl_committed);
00533
00534 osync_trace(TRACE_EXIT, "%s", __func__);
00535 return TRUE;
00536
00537 error_free_message:
00538 osync_message_unref(message);
00539 error:
00540 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00541 return FALSE;
00542 }
00543
00544 osync_bool osync_client_sync_done(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00545 {
00546 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error);
00547
00548 osync_flag_changing(target->fl_done);
00549 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_SYNC_DONE, 0, error);
00550 if (!message)
00551 goto error;
00552
00553 osync_message_set_handler(message, (OSyncMessageHandler)_sync_done_reply_receiver, target);
00554
00555 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00556 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.sync_done_timeout, error))
00557 goto error_free_message;
00558
00559 osync_message_unref(message);
00560
00561 osync_trace(TRACE_EXIT, "%s", __func__);
00562 return TRUE;
00563
00564 error_free_message:
00565 osync_message_unref(message);
00566 error:
00567 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00568 return FALSE;
00569 }
00570
00571 osync_bool osync_client_committed_all(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00572 {
00573 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, target, sender);
00574
00575 osync_flag_changing(target->fl_committed_all);
00576
00577 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_COMMITTED_ALL, 0, error);
00578 if (!message)
00579 goto error;
00580
00581 osync_message_set_handler(message, (OSyncMessageHandler)_committed_all_reply_receiver, target);
00582
00583
00584
00585 if (!osync_queue_send_message(target->commands_to_osplugin, target->commands_from_osplugin, message, error))
00586 goto error_free_message;
00587
00588 osync_message_unref(message);
00589
00590 osync_trace(TRACE_EXIT, "%s", __func__);
00591 return TRUE;
00592
00593 error_free_message:
00594 osync_message_unref(message);
00595 error:
00596 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00597 return FALSE;
00598 }
00599
00600 osync_bool osync_client_disconnect(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00601 {
00602 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, target, sender);
00603
00604 osync_flag_changing(target->fl_connected);
00605
00606 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_DISCONNECT, 0, error);
00607 if (!message)
00608 goto error;
00609
00610 osync_message_set_handler(message, (OSyncMessageHandler)_disconnect_reply_receiver, target);
00611
00612 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00613 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.disconnect_timeout, error))
00614 goto error_free_message;
00615
00616 osync_message_unref(message);
00617
00618 osync_trace(TRACE_EXIT, "%s", __func__);
00619 return TRUE;
00620
00621 error_free_message:
00622 osync_message_unref(message);
00623 error:
00624 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00625 return FALSE;
00626 }
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640 char *osync_client_pid_filename(OSyncClient *client)
00641 {
00642 return g_strdup_printf("%s/osplugin.pid", client->member->configdir);
00643 }
00644
00645 osync_bool osync_client_remove_pidfile(OSyncClient *client, OSyncError **error)
00646 {
00647 osync_bool ret = FALSE;
00648 char *pidpath = osync_client_pid_filename(client);
00649
00650 if (unlink(pidpath) < 0) {
00651 osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't remove pid file: %s", strerror(errno));
00652 goto out_free_path;
00653 }
00654
00655
00656 ret = TRUE;
00657
00658 out_free_path:
00659 g_free(pidpath);
00660
00661 return ret;
00662 }
00663
00664 osync_bool osync_client_create_pidfile(OSyncClient *client, OSyncError **error)
00665 {
00666 osync_bool ret = FALSE;
00667 char *pidpath = osync_client_pid_filename(client);
00668 char *pidstr = g_strdup_printf("%ld", (long)client->child_pid);
00669
00670 if (!osync_file_write(pidpath, pidstr, strlen(pidstr), 0644, error))
00671 goto out_free_pidstr;
00672
00673
00674 ret = TRUE;
00675
00676 out_free_pidstr:
00677 g_free(pidstr);
00678
00679 g_free(pidpath);
00680
00681 return ret;
00682 }
00683
00684 osync_bool osync_client_kill_old_osplugin(OSyncClient *client, OSyncError **error)
00685 {
00686 osync_bool ret = FALSE;
00687
00688 char *pidstr;
00689 int pidlen;
00690 pid_t pid;
00691
00692 char *pidpath = osync_client_pid_filename(client);
00693
00694
00695 if (!g_file_test(pidpath, G_FILE_TEST_EXISTS)) {
00696 ret = TRUE;
00697 goto out_free_path;
00698 }
00699
00700 if (!osync_file_read(pidpath, &pidstr, &pidlen, error))
00701 goto out_free_path;
00702
00703 pid = atol(pidstr);
00704 if (!pid)
00705 goto out_free_str;
00706
00707 osync_trace(TRACE_INTERNAL, "Killing old osplugin process. PID: %ld", (long)pid);
00708
00709 if (kill(pid, SIGTERM) < 0) {
00710 osync_trace(TRACE_INTERNAL, "Error killing old osplugin: %s. Stale pid file?", strerror(errno));
00711
00712 }
00713
00714 int count = 0;
00715 while (osync_queue_is_alive(client->commands_to_osplugin)) {
00716 if (count++ > 10) {
00717 osync_trace(TRACE_INTERNAL, "Killing old osplugin process with SIGKILL");
00718 kill(pid, SIGKILL);
00719 break;
00720 }
00721 osync_trace(TRACE_INTERNAL, "Waiting for other side to terminate");
00722
00723 usleep(500000);
00724 }
00725
00726 if (unlink(pidpath) < 0) {
00727 osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't erase PID file: %s", strerror(errno));
00728 goto out_free_str;
00729 }
00730
00731
00732 ret = TRUE;
00733
00734 out_free_str:
00735 g_free(pidstr);
00736 out_free_path:
00737 g_free(pidpath);
00738
00739 return ret;
00740 }
00741
00742
00743 osync_bool osync_client_spawn(OSyncClient *client, OSyncEngine *engine, OSyncError **error)
00744 {
00745 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, engine, error);
00746
00747 int waiting = 0;
00748
00749 if (!osync_client_kill_old_osplugin(client, error))
00750 goto error;
00751
00752 if (!osync_queue_exists(client->commands_to_osplugin) || !osync_queue_is_alive(client->commands_to_osplugin)) {
00753 pid_t cpid = fork();
00754 if (cpid == 0) {
00755 osync_trace_reset_indent();
00756
00757
00758 osync_env_export_all_options(osync_group_get_env(engine->group));
00759
00760 OSyncMember *member = client->member;
00761 OSyncPlugin *plugin = osync_member_get_plugin(member);
00762 const char *path = osync_plugin_get_path(plugin);
00763 setenv("OSYNC_MODULE_LIST", path, 1);
00764
00765 osync_env_export_loaded_modules(osync_group_get_env(engine->group));
00766
00767 char *memberstring = g_strdup_printf("%lli", osync_member_get_id(client->member));
00768 execlp(OSPLUGIN, OSPLUGIN, osync_group_get_configdir(engine->group), memberstring, NULL);
00769
00770 if (errno == ENOENT) {
00771 execlp("./osplugin", "osplugin", osync_group_get_configdir(engine->group), memberstring, NULL);
00772 }
00773
00774 osync_trace(TRACE_INTERNAL, "unable to exec");
00775 exit(1);
00776 }
00777
00778 client->child_pid = cpid;
00779
00780
00781 while (!osync_queue_exists(client->commands_to_osplugin) && waiting <= 5) {
00782 osync_trace(TRACE_INTERNAL, "Waiting for other side to create fifo");
00783
00784 sleep(1);
00785 waiting++;
00786 }
00787
00788 osync_trace(TRACE_INTERNAL, "Queue was created");
00789 }
00790
00791 if (client->child_pid) {
00792 if (!osync_client_create_pidfile(client, error))
00793 goto error;
00794 }
00795
00796 if (!osync_queue_connect(client->commands_to_osplugin, OSYNC_QUEUE_SENDER, error))
00797 goto error;
00798
00799 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, error);
00800 if (!message)
00801 goto error_disconnect;
00802
00803 osync_message_write_string(message, client->commands_from_osplugin->name);
00804
00805 if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error))
00806 goto error_free_message;
00807
00808 osync_message_unref(message);
00809
00810 osync_trace(TRACE_EXIT, "%s", __func__);
00811 return TRUE;
00812
00813 error_free_message:
00814 osync_message_unref(message);
00815 error_disconnect:
00816 osync_queue_disconnect(client->commands_to_osplugin, NULL);
00817 error:
00818 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00819 return FALSE;
00820 }
00821
00822 osync_bool osync_client_init(OSyncClient *client, OSyncEngine *engine, OSyncError **error)
00823 {
00824 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, engine, error);
00825
00826 OSyncMessage *reply = osync_queue_get_message(client->commands_from_osplugin);
00827
00828 osync_trace(TRACE_INTERNAL, "reply received %i", reply->cmd);
00829 if (reply->cmd == OSYNC_MESSAGE_ERRORREPLY) {
00830 if (error)
00831 osync_demarshal_error(reply, error);
00832 goto error_free_reply;
00833 }
00834
00835 if (reply->cmd != OSYNC_MESSAGE_REPLY) {
00836 osync_error_set(error, OSYNC_ERROR_GENERIC, "Invalid answer from plugin process");
00837 goto error_free_reply;
00838 }
00839
00840 osync_message_unref(reply);
00841
00842 osync_trace(TRACE_EXIT, "%s", __func__);
00843 return TRUE;
00844
00845 error_free_reply:
00846 osync_message_unref(reply);
00847 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00848 return FALSE;
00849 }
00850
00851 osync_bool osync_client_finalize(OSyncClient *client, OSyncError **error)
00852 {
00853 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, client, error);
00854
00855 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_FINALIZE, 0, error);
00856 if (!message)
00857 goto error;
00858
00859 if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error))
00860 goto error_free_message;
00861
00862 osync_message_unref(message);
00863
00864 if (client->child_pid) {
00865 int status;
00866 if (waitpid(client->child_pid, &status, 0) == -1) {
00867 osync_error_set(error, OSYNC_ERROR_GENERIC, "Error waiting for osplugin process: %s", strerror(errno));
00868 goto error;
00869 }
00870
00871 if (!WIFEXITED(status))
00872 osync_trace(TRACE_INTERNAL, "Child has exited abnormally");
00873 else if (WEXITSTATUS(status) != 0)
00874 osync_trace(TRACE_INTERNAL, "Child has returned non-zero exit status (%d)", WEXITSTATUS(status));
00875
00876 if (!osync_client_remove_pidfile(client, error))
00877 goto error;
00878 }
00879
00880 osync_queue_disconnect(client->commands_to_osplugin, NULL);
00881
00882
00883 osync_trace(TRACE_EXIT, "%s", __func__);
00884 return TRUE;
00885
00886 error_free_message:
00887 osync_message_unref(message);
00888 error:
00889 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00890 return FALSE;
00891 }