• Main Page
  • Related Pages
  • Modules
  • Data Structures
  • Files
  • File List

osengine/osengine_client.c

00001 /*
00002  * libosengine - A synchronization engine for the opensync framework
00003  * Copyright (C) 2004-2005  Armin Bauer <armin.bauer@opensync.org>
00004  * 
00005  * This library is free software; you can redistribute it and/or
00006  * modify it under the terms of the GNU Lesser General Public
00007  * License as published by the Free Software Foundation; either
00008  * version 2.1 of the License, or (at your option) any later version.
00009  * 
00010  * This library is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013  * Lesser General Public License for more details.
00014  * 
00015  * You should have received a copy of the GNU Lesser General Public
00016  * License along with this library; if not, write to the Free Software
00017  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00018  * 
00019  */
00020 
00021 #include "config.h"
00022 #include "engine.h"
00023 #include <glib.h>
00024 #include <opensync/opensync_support.h>
00025 #include "opensync/opensync_format_internals.h"
00026 #include "opensync/opensync_member_internals.h"
00027 #include "opensync/opensync_message_internals.h"
00028 #include "opensync/opensync_queue_internals.h"
00029 
00030 #include "engine_internals.h"
00031 #include <unistd.h>
00032 
00033 #include <sys/types.h>
00034 #include <sys/wait.h>
00035 #include <errno.h>
00036 #include <signal.h>
00037  
00043 void _get_changes_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00044 {
00045         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, sender);
00046         OSyncEngine *engine = sender->engine;
00047         
00048         if (osync_message_is_error(message)) {
00049                 OSyncError *error = NULL;
00050                 osync_demarshal_error(message, &error);
00051                 osync_error_duplicate(&engine->error, &error);
00052                 osync_debug("ENG", 1, "Get changes command reply was a error: %s", osync_error_print(&error));
00053                 osync_status_update_member(engine, sender, MEMBER_GET_CHANGES_ERROR, &error);
00054                 osync_error_update(&engine->error, "Unable to read from one of the members");
00055                 osync_flag_unset(sender->fl_sent_changes);
00056                 //osync_flag_set(sender->fl_finished);
00057                 osync_flag_set(sender->fl_done);
00058                 /*
00059                  * FIXME: For now we want to stop the engine if
00060                  * one of the member didnt connect yet. Later it should
00061                  * be that if >= 2 members connect, the sync should continue
00062                  */
00063                 osync_flag_set(engine->fl_stop);
00064                 
00065         } else {
00066                 osync_status_update_member(engine, sender, MEMBER_SENT_CHANGES, NULL);
00067                 osync_flag_set(sender->fl_sent_changes);
00068         }
00069         
00070         osengine_client_decider(engine, sender);
00071         osync_trace(TRACE_EXIT, "_get_changes_reply_receiver");
00072 }
00073 
00079 void _connect_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00080 {
00081         osync_trace(TRACE_ENTRY, "_connect_reply_receiver(%p, %p)", message, sender);
00082         
00083         osync_trace(TRACE_INTERNAL, "connect reply %i", osync_message_is_error(message));
00084         OSyncEngine *engine = sender->engine;
00085         
00086         if (osync_message_is_error(message)) {
00087                 OSyncError *error = NULL;
00088                 osync_demarshal_error(message, &error);
00089                 osync_error_duplicate(&engine->error, &error);
00090                 osync_debug("ENG", 1, "Connect command reply was a error: %s", osync_error_print(&error));
00091                 osync_status_update_member(engine, sender, MEMBER_CONNECT_ERROR, &error);
00092                 osync_error_update(&engine->error, "Unable to connect one of the members");
00093                 osync_flag_unset(sender->fl_connected);
00094                 osync_flag_set(sender->fl_finished);
00095                 osync_flag_set(sender->fl_sent_changes);
00096                 osync_flag_set(sender->fl_done);
00097                 /*
00098                  * FIXME: For now we want to stop the engine if
00099                  * one of the member didnt connect yet. Later it should
00100                  * be that if >= 2 members connect, the sync should continue
00101                  */
00102                 osync_flag_set(engine->fl_stop);
00103                 
00104         } else {
00105                 osync_member_read_sink_info(sender->member, message);
00106 
00107                 osync_status_update_member(engine, sender, MEMBER_CONNECTED, NULL);
00108                 osync_flag_set(sender->fl_connected);   
00109         }
00110 
00111         osengine_client_decider(engine, sender);
00112         osync_trace(TRACE_EXIT, "_connect_reply_receiver");
00113 }
00114 
00115 void _sync_done_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00116 {
00117         osync_trace(TRACE_ENTRY, "_sync_done_reply_receiver(%p, %p)", message, sender);
00118 
00119         OSyncEngine *engine = sender->engine;
00120         
00121         if (osync_message_is_error(message)) {
00122                 OSyncError *error = NULL;
00123                 osync_demarshal_error(message, &error);
00124                 osync_error_duplicate(&engine->error, &error);
00125                 osync_debug("ENG", 1, "Sync done command reply was a error: %s", osync_error_print(&error));
00126                 osync_status_update_member(engine, sender, MEMBER_SYNC_DONE_ERROR, &error);
00127                 osync_error_update(&engine->error, "Unable to finish the sync for one of the members");
00128         }
00129         
00130         osync_flag_set(sender->fl_done);
00131         osengine_client_decider(engine, sender);
00132         osync_trace(TRACE_EXIT, "_sync_done_reply_receiver");
00133 }
00134 
00135 void _committed_all_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00136 {
00137         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, sender);
00138 
00139         OSyncEngine *engine = sender->engine;
00140         
00141         if (osync_message_is_error(message)) {
00142                 OSyncError *error = NULL;
00143                 osync_demarshal_error(message, &error);
00144                 osync_error_duplicate(&engine->error, &error);
00145                 osync_debug("ENG", 1, "Committed all command reply was a error: %s", osync_error_print(&error));
00146                 osync_status_update_member(engine, sender, MEMBER_COMMITTED_ALL_ERROR, &error);
00147                 osync_error_update(&engine->error, "Unable to write changes to one of the members");
00148         } else
00149                 osync_status_update_member(engine, sender, MEMBER_COMMITTED_ALL, NULL);
00150         
00151         osync_flag_set(sender->fl_committed_all);
00152         osengine_client_decider(engine, sender);
00153         osync_trace(TRACE_EXIT, "%s", __func__);
00154 }
00155 
00156 void _disconnect_reply_receiver(OSyncMessage *message, OSyncClient *sender)
00157 {
00158         osync_trace(TRACE_ENTRY, "_disconnect_reply_receiver(%p, %p)", message, sender);
00159         
00160         OSyncEngine *engine = sender->engine;
00161 
00162         if (osync_message_is_error(message)) {
00163                 OSyncError *error = NULL;
00164                 osync_demarshal_error(message, &error);
00165                 osync_debug("ENG", 1, "Sync done command reply was a error: %s", osync_error_print(&error));
00166                 osync_status_update_member(engine, sender, MEMBER_DISCONNECT_ERROR, &error);
00167         } else
00168                 osync_status_update_member(engine, sender, MEMBER_DISCONNECTED, NULL);
00169                         
00170         osync_flag_unset(sender->fl_connected);
00171         osync_flag_set(sender->fl_finished);
00172         osengine_client_decider(engine, sender);
00173         osync_trace(TRACE_EXIT, "_disconnect_reply_receiver");
00174 }
00175 
00176 void _get_change_data_reply_receiver(OSyncMessage *message, OSyncMappingEntry *entry)
00177 {
00178         osync_trace(TRACE_ENTRY, "_get_change_data_reply_receiver(%p, %p, %p)", message, entry);
00179         OSyncEngine *engine = entry->client->engine;
00180         
00181         if (osync_message_is_error(message)) {
00182                 OSyncError *error = NULL;
00183                 osync_demarshal_error(message, &error);
00184                 osync_error_duplicate(&engine->error, &error);
00185                 osync_debug("MAP", 1, "Commit change command reply was a error: %s", osync_error_print(&error));
00186                 osync_status_update_change(engine, entry->change, CHANGE_RECV_ERROR, &error);
00187                 osync_error_update(&engine->error, "Unable to read one or more objects");
00188                 
00189                 //FIXME Do we need to do anything here?
00190                 //osync_flag_unset(entry->fl_has_data);
00191         } else {
00192 
00193                 osync_demarshal_changedata(message, entry->change);
00194 
00195                 osync_flag_set(entry->fl_has_data);
00196                 osync_status_update_change(engine, entry->change, CHANGE_RECEIVED, NULL);
00197         }
00198         
00199         osync_change_save(entry->change, TRUE, NULL);
00200         osengine_mappingentry_decider(engine, entry);
00201         osync_trace(TRACE_EXIT, "_get_change_data_reply_receiver");
00202 }
00203 
00204 void _read_change_reply_receiver(OSyncClient *sender, OSyncMessage *message, OSyncEngine *engine)
00205 {
00206         osync_trace(TRACE_ENTRY, "_read_change_reply_receiver(%p, %p, %p)", sender, message, engine);
00207         
00208         /*OSyncMappingEntry *entry = osync_message_get_data(message, "entry");
00209         
00210         osync_flag_detach(entry->fl_read);
00211         
00212         osync_flag_unset(entry->mapping->fl_solved);
00213         osync_flag_unset(entry->mapping->fl_chkconflict);
00214         osync_flag_unset(entry->mapping->fl_multiplied);
00215         
00216         if (osync_change_get_changetype(entry->change) == CHANGE_DELETED)
00217                 osync_flag_set(entry->fl_deleted);
00218         
00219         osync_flag_set(entry->fl_has_info);
00220         osync_flag_unset(entry->fl_synced);
00221         
00222         osync_change_save(entry->change, TRUE, NULL);
00223         
00224         osync_status_update_change(engine, entry->change, CHANGE_RECEIVED, NULL);
00225         
00226         osengine_mappingentry_decider(engine, entry);*/
00227         osync_trace(TRACE_EXIT, "_read_change_reply_receiver");
00228 }
00229 
00230 void _commit_change_reply_receiver(OSyncMessage *message, OSyncMappingEntry *entry)
00231 {
00232         osync_trace(TRACE_ENTRY, "_commit_change_reply_receiver(%p, %p)", message, entry);
00233         OSyncEngine *engine = entry->client->engine;
00234 
00235         if (osync_message_is_error(message)) {
00236                 OSyncError *error = NULL;
00237                 osync_demarshal_error(message, &error);
00238                 osync_error_duplicate(&engine->error, &error);
00239                 osync_debug("MAP", 1, "Commit change command reply was a error: %s", osync_error_print(&error));
00240                 osync_status_update_change(engine, entry->change, CHANGE_WRITE_ERROR, &error);
00241                 OSyncError *maperror = NULL;
00242                 osync_error_duplicate(&maperror, &error);
00243                 osync_status_update_mapping(engine, entry->mapping, MAPPING_WRITE_ERROR, &maperror);
00244                 osync_error_update(&engine->error, "Unable to write one or more objects");
00245                 
00246                 //FIXME Do we need to do anything here?
00247                 osync_flag_unset(entry->fl_dirty);
00248                 osync_flag_set(entry->fl_synced);
00249         } else {
00250                 /* The plugin may have generated a new UID after committing the change. The commit
00251                  * change reply will return the new UID of the change
00252                  */
00253 
00254                 char *newuid;
00255                 osync_message_read_string(message, &newuid);
00256                 osync_change_set_uid(entry->change, newuid);
00257 
00258                 osync_status_update_change(engine, entry->change, CHANGE_SENT, NULL);
00259                 osync_flag_unset(entry->fl_dirty);
00260                 osync_flag_set(entry->fl_synced);
00261         }
00262         
00263         if (osync_change_get_changetype(entry->change) == CHANGE_DELETED)
00264                 osync_flag_set(entry->fl_deleted);
00265         
00266         osync_change_reset(entry->change);
00267         
00268         OSyncError *error = NULL;
00269         osync_change_save(entry->change, TRUE, &error);
00270         
00271         osengine_mappingentry_decider(engine, entry);
00272         osync_trace(TRACE_EXIT, "_commit_change_reply_receiver");
00273 }
00274 
00275 OSyncClient *osync_client_new(OSyncEngine *engine, OSyncMember *member, OSyncError **error)
00276 {
00277         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, engine, member, error);
00278         OSyncClient *client = osync_try_malloc0(sizeof(OSyncClient), error);
00279         if (!client)
00280                 goto error;
00281                 
00282         client->member = member;
00283         osync_member_set_data(member, client);
00284         client->engine = engine;
00285         engine->clients = g_list_append(engine->clients, client);
00286         
00287         char *name = g_strdup_printf("%s/pluginpipe", osync_member_get_configdir(member));
00288         client->commands_to_osplugin = osync_queue_new(name, error);
00289         g_free(name);
00290 
00291         name = g_strdup_printf("%s/enginepipe", osync_member_get_configdir(member));
00292         client->commands_from_osplugin = osync_queue_new(name, error);
00293         g_free(name);
00294 
00295         if (!client->commands_to_osplugin || !client->commands_from_osplugin)
00296                 goto error_free_client;
00297                 
00298         client->fl_connected = osync_flag_new(engine->cmb_connected);
00299         client->fl_sent_changes = osync_flag_new(engine->cmb_sent_changes);
00300         client->fl_done = osync_flag_new(NULL);
00301         client->fl_committed_all = osync_flag_new(engine->cmb_committed_all_sent);
00302         client->fl_finished = osync_flag_new(engine->cmb_finished);
00303         
00304         osync_trace(TRACE_EXIT, "%s: %p", __func__, client);
00305     return client;
00306 
00307 error_free_client:
00308         g_free(client);
00309 error:
00310         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00311         return NULL;
00312 }
00313 
00314 void osync_client_reset(OSyncClient *client)
00315 {
00316         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
00317         osync_flag_set_state(client->fl_connected, FALSE);
00318         osync_flag_set_state(client->fl_sent_changes, FALSE);
00319         osync_flag_set_state(client->fl_done, FALSE);
00320         osync_flag_set_state(client->fl_finished, FALSE);
00321         osync_flag_set_state(client->fl_committed_all, FALSE);
00322         osync_trace(TRACE_EXIT, "%s", __func__);
00323 }
00324 
00325 void osync_client_free(OSyncClient *client)
00326 {
00327         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
00328         osync_queue_free(client->commands_to_osplugin);
00329         osync_queue_free(client->commands_from_osplugin);
00330         
00331         osync_flag_free(client->fl_connected);
00332         osync_flag_free(client->fl_sent_changes);
00333         osync_flag_free(client->fl_done);
00334         osync_flag_free(client->fl_finished);
00335         osync_flag_free(client->fl_committed_all);
00336 
00337         g_free(client);
00338         osync_trace(TRACE_EXIT, "%s", __func__);
00339 }
00340 
00341 void *osync_client_message_sink(OSyncMember *member, const char *name, void *data, osync_bool synchronous)
00342 {
00343         OSyncClient *client = osync_member_get_data(member);
00344         OSyncEngine *engine = client->engine;
00345         if (!synchronous) {
00346                 /*OSyncMessage *message = itm_message_new_signal(client, "PLUGIN_MESSAGE");
00347                 osync_debug("CLI", 3, "Sending message %p PLUGIN_MESSAGE for message %s", message, name);
00348                 itm_message_set_data(message, "data", data);
00349                 itm_message_set_data(message, "name", g_strdup(name));
00350                 itm_queue_send(engine->incoming, message);*/
00351                 return NULL;
00352         } else {
00353                 return engine->plgmsg_callback(engine, client, name, data, engine->plgmsg_userdata);
00354         }       
00355 }
00356 
00357 OSyncPluginTimeouts osync_client_get_timeouts(OSyncClient *client)
00358 {
00359         return osync_plugin_get_timeouts(osync_member_get_plugin(client->member));
00360 }
00361 
00362 void osync_client_call_plugin(OSyncClient *client, char *function, void *data, OSyncPluginReplyHandler replyhandler, void *userdata)
00363 {
00364         osync_trace(TRACE_ENTRY, "%s(%p, %s, %p, %p, %p)", __func__, client, function, data, replyhandler, userdata);
00365         
00366         /*OSyncEngine *engine = client->engine;
00367         ITMessage *message = itm_message_new_methodcall(engine, "CALL_PLUGIN");
00368         itm_message_set_data(message, "data", data);
00369         itm_message_set_data(message, "function", g_strdup(function));
00370         
00371         if (replyhandler) {
00372                 OSyncPluginCallContext *ctx = g_malloc0(sizeof(OSyncPluginCallContext));
00373                 ctx->handler = replyhandler;
00374                 ctx->userdata = userdata;
00375                 itm_message_set_handler(message, engine->incoming, (ITMessageHandler)_recv_plugin_answer, ctx);
00376 
00377                 itm_message_set_data(message, "want_reply", GINT_TO_POINTER(1));
00378         } else
00379                 itm_message_set_data(message, "want_reply", GINT_TO_POINTER(0));
00380         
00381         itm_queue_send(client->incoming, message);*/
00382         
00383         osync_trace(TRACE_EXIT, "%s", __func__);
00384 }
00385 
00386 osync_bool osync_client_get_changes(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00387 {
00388         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error);
00389         
00390         osync_flag_changing(target->fl_sent_changes);
00391         
00392         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_GET_CHANGES, 0, error);
00393         if (!message)
00394                 goto error;
00395                 
00396         osync_message_set_handler(message, (OSyncMessageHandler)_get_changes_reply_receiver, target);
00397 
00398         osync_member_write_sink_info(target->member, message);
00399         
00400         OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00401         if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.get_changeinfo_timeout, error))
00402                 goto error_free_message;
00403         
00404         osync_message_unref(message);
00405         
00406         osync_trace(TRACE_EXIT, "%s", __func__);
00407         return TRUE;
00408 
00409 error_free_message:
00410         osync_message_unref(message);
00411 error:
00412         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00413         return FALSE;
00414 }
00415 
00416 osync_bool osync_client_get_change_data(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry, OSyncError **error)
00417 {
00418         osync_flag_changing(entry->fl_has_data);
00419 
00420         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_GET_CHANGEDATA, 0, error);
00421         if (!message)
00422                 goto error;
00423                 
00424         osync_message_set_handler(message, (OSyncMessageHandler)_get_change_data_reply_receiver, entry);
00425 
00426         osync_marshal_change(message, entry->change);
00427 
00428         osync_debug("ENG", 3, "Sending get_changedata message %p to client %p", message, entry->client);
00429         
00430         OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00431         if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.get_data_timeout, error))
00432                 goto error_free_message;
00433         
00434         osync_message_unref(message);
00435         
00436         osync_trace(TRACE_EXIT, "%s", __func__);
00437         return TRUE;
00438 
00439 error_free_message:
00440         osync_message_unref(message);
00441 error:
00442         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00443         return FALSE;
00444 }
00445 
00446 /*void osync_client_read_change(OSyncEngine *sender, OSyncMappingEntry *entry)
00447 {
00448         //osync_flag_changing(entry->fl_has_data);
00449         OSyncMessage *message = osync_message_new_methodcall(sender, "READ_CHANGE");
00450         osync_message_set_handler(message, sender->incoming, (OSyncMessageHandler)_read_change_reply_receiver, sender);
00451         osync_message_set_data(message, "change", entry->change);
00452         osync_message_set_data(message, "entry", entry);
00453         osync_debug("ENG", 3, "Sending read_change message %p to client %p", message, entry->client);
00454         
00455         OSyncPluginTimeouts timeouts = osync_client_get_timeouts(entry->client);
00456         osync_queue_send_with_timeout(entry->client->incoming, message, timeouts.read_change_timeout, sender);
00457 }*/
00458 
00459 osync_bool osync_client_connect(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00460 {
00461         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error);
00462         
00463         osync_flag_changing(target->fl_connected);
00464         
00465         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_CONNECT, 0, error);
00466         if (!message)
00467                 goto error;
00468                 
00469         osync_member_write_sink_info(target->member, message);
00470 
00471         osync_message_set_handler(message, (OSyncMessageHandler)_connect_reply_receiver, target);
00472         
00473         OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00474         if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.connect_timeout, error))
00475                 goto error_free_message;
00476         
00477         osync_message_unref(message);
00478         
00479         osync_trace(TRACE_EXIT, "%s", __func__);
00480         return TRUE;
00481 
00482 error_free_message:
00483         osync_message_unref(message);
00484 error:
00485         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00486         return FALSE;
00487 }
00488 
00489 osync_bool osync_client_commit_change(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry, OSyncError **error)
00490 {
00491         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, entry);
00492         osync_trace(TRACE_INTERNAL, "Committing change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli", osync_change_get_uid(entry->change), osync_change_get_changetype(entry->change), osync_change_get_data(entry->change), osync_change_get_datasize(entry->change), osync_change_get_objtype(entry->change) ? osync_objtype_get_name(osync_change_get_objtype(entry->change)) : "None", osync_change_get_objformat(entry->change) ? osync_objformat_get_name(osync_change_get_objformat(entry->change)) : "None", osync_member_get_id(entry->client->member));
00493         
00494         osync_flag_changing(entry->fl_dirty);
00495 
00496         // convert the data to the format accepted by the member
00497         if (!osync_change_convert_member_sink(osync_group_get_format_env(sender->group), entry->change, target->member, error))
00498                 goto error;
00499 
00500         if (osync_change_get_changetype(entry->change) == CHANGE_ADDED) {
00501                 int elevated = 0;
00502                 // Generate a new UID, if necessary
00503                 OSyncMappingView *view = osengine_mappingtable_find_view(sender->maptable, target->member);
00504                 while (!osengine_mappingview_uid_is_unique(view, entry, TRUE)) {
00505                         if (!osync_change_elevate(sender, entry->change, 1))
00506                                 break;
00507                         elevated++;
00508                 }
00509 
00510                 if (elevated) {
00511                         // Save the newly generated UID
00512                         if (!osync_change_save(entry->change, TRUE, error))
00513                                 goto error;
00514                 }
00515         }
00516 
00517         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_COMMIT_CHANGE, 0, error);
00518         if (!message)
00519                 goto error;
00520 
00521         osync_marshal_change(message, entry->change);
00522 
00523         osync_message_set_handler(message, (OSyncMessageHandler)_commit_change_reply_receiver, entry);
00524         OSyncPluginTimeouts timeouts = osync_client_get_timeouts(entry->client);
00525         
00526         if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.commit_timeout, error))
00527                 goto error_free_message;
00528 
00529         osync_message_unref(message);
00530 
00531         g_assert(osync_flag_is_attached(entry->fl_committed) == TRUE);
00532         osync_flag_detach(entry->fl_committed);
00533         
00534         osync_trace(TRACE_EXIT, "%s", __func__);
00535         return TRUE;
00536 
00537 error_free_message:
00538         osync_message_unref(message);
00539 error:
00540         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00541         return FALSE;
00542 }
00543 
00544 osync_bool osync_client_sync_done(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00545 {
00546         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error);
00547 
00548         osync_flag_changing(target->fl_done);
00549         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_SYNC_DONE, 0, error);
00550         if (!message)
00551                 goto error;
00552 
00553         osync_message_set_handler(message, (OSyncMessageHandler)_sync_done_reply_receiver, target);
00554         
00555         OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00556         if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.sync_done_timeout, error))
00557                 goto error_free_message;
00558         
00559         osync_message_unref(message);
00560         
00561         osync_trace(TRACE_EXIT, "%s", __func__);
00562         return TRUE;
00563 
00564 error_free_message:
00565         osync_message_unref(message);
00566 error:
00567         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00568         return FALSE;
00569 }
00570 
00571 osync_bool osync_client_committed_all(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00572 {
00573         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, target, sender);
00574 
00575         osync_flag_changing(target->fl_committed_all);
00576 
00577         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_COMMITTED_ALL, 0, error);
00578         if (!message)
00579                 goto error;
00580                 
00581         osync_message_set_handler(message, (OSyncMessageHandler)_committed_all_reply_receiver, target);
00582         
00583         //OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00584         /*FIXME: Add timeout to committed_all message */
00585         if (!osync_queue_send_message(target->commands_to_osplugin, target->commands_from_osplugin, message, error))
00586                 goto error_free_message;
00587         
00588         osync_message_unref(message);
00589         
00590         osync_trace(TRACE_EXIT, "%s", __func__);
00591         return TRUE;
00592 
00593 error_free_message:
00594         osync_message_unref(message);
00595 error:
00596         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00597         return FALSE;
00598 }
00599 
00600 osync_bool osync_client_disconnect(OSyncClient *target, OSyncEngine *sender, OSyncError **error)
00601 {
00602         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, target, sender);
00603 
00604         osync_flag_changing(target->fl_connected);
00605 
00606         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_DISCONNECT, 0, error);
00607         if (!message)
00608                 goto error;
00609                 
00610         osync_message_set_handler(message, (OSyncMessageHandler)_disconnect_reply_receiver, target);
00611         
00612         OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target);
00613         if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.disconnect_timeout, error))
00614                 goto error_free_message;
00615         
00616         osync_message_unref(message);
00617         
00618         osync_trace(TRACE_EXIT, "%s", __func__);
00619         return TRUE;
00620 
00621 error_free_message:
00622         osync_message_unref(message);
00623 error:
00624         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00625         return FALSE;
00626 }
00627 
00628 
00629 /*
00630 void osync_client_call_plugin_with_reply(OSyncClient *client, char *function, void *data, void ( *replyhandler)(OSyncEngine *, OSyncClient *, void *, OSyncError *), int timeout)
00631 {
00632         OSyncEngine *engine = client->engine;
00633         ITMessage *message = itm_message_new_signal(engine, "CALL_PLUGIN");
00634         osync_debug("CLI", 3, "Sending message %p CALL_PLUGIN for function %s", message, function);
00635         itm_message_set_data(message, "data", data);
00636         itm_message_set_data(message, "function", g_strdup(function));
00637         itm_queue_send_with_reply(client->incoming, message);
00638 }*/
00639 
00640 char *osync_client_pid_filename(OSyncClient *client)
00641 {
00642         return g_strdup_printf("%s/osplugin.pid", client->member->configdir);
00643 }
00644 
00645 osync_bool osync_client_remove_pidfile(OSyncClient *client, OSyncError **error)
00646 {
00647         osync_bool ret = FALSE;
00648         char *pidpath = osync_client_pid_filename(client);
00649 
00650         if (unlink(pidpath) < 0) {
00651                 osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't remove pid file: %s", strerror(errno));
00652                 goto out_free_path;
00653         }
00654 
00655         /* Success */
00656         ret = TRUE;
00657 
00658 out_free_path:
00659         g_free(pidpath);
00660 //out:
00661         return ret;
00662 }
00663 
00664 osync_bool osync_client_create_pidfile(OSyncClient *client, OSyncError **error)
00665 {
00666         osync_bool ret = FALSE;
00667         char *pidpath = osync_client_pid_filename(client);
00668         char *pidstr = g_strdup_printf("%ld", (long)client->child_pid);
00669 
00670         if (!osync_file_write(pidpath, pidstr, strlen(pidstr), 0644, error))
00671                 goto out_free_pidstr;
00672 
00673         /* Success */
00674         ret = TRUE;
00675 
00676 out_free_pidstr:
00677         g_free(pidstr);
00678 //out_free_path:
00679         g_free(pidpath);
00680 //out:
00681         return ret;
00682 }
00683 
00684 osync_bool osync_client_kill_old_osplugin(OSyncClient *client, OSyncError **error)
00685 {
00686         osync_bool ret = FALSE;
00687 
00688         char *pidstr;
00689         int pidlen;
00690         pid_t pid;
00691 
00692         char *pidpath = osync_client_pid_filename(client);
00693 
00694         /* Simply returns if there is no PID file */
00695         if (!g_file_test(pidpath, G_FILE_TEST_EXISTS)) {
00696                 ret = TRUE;
00697                 goto out_free_path;
00698         }
00699 
00700         if (!osync_file_read(pidpath, &pidstr, &pidlen, error))
00701                 goto out_free_path;
00702 
00703         pid = atol(pidstr);
00704         if (!pid)
00705                 goto out_free_str;
00706 
00707         osync_trace(TRACE_INTERNAL, "Killing old osplugin process. PID: %ld", (long)pid);
00708 
00709         if (kill(pid, SIGTERM) < 0) {
00710                 osync_trace(TRACE_INTERNAL, "Error killing old osplugin: %s. Stale pid file?", strerror(errno));
00711                 /* Don't return failure if kill() failed, because it may be a stale pid file */
00712         }
00713 
00714         int count = 0;
00715         while (osync_queue_is_alive(client->commands_to_osplugin)) {
00716                 if (count++ > 10) {
00717                         osync_trace(TRACE_INTERNAL, "Killing old osplugin process with SIGKILL");
00718                         kill(pid, SIGKILL);
00719                         break;
00720                 }
00721                 osync_trace(TRACE_INTERNAL, "Waiting for other side to terminate");
00722                 /*FIXME: Magic numbers are evil */
00723                 usleep(500000);
00724         }
00725 
00726         if (unlink(pidpath) < 0) {
00727                 osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't erase PID file: %s", strerror(errno));
00728                 goto out_free_str;
00729         }
00730 
00731         /* Success */
00732         ret = TRUE;
00733 
00734 out_free_str:
00735         g_free(pidstr);
00736 out_free_path:
00737         g_free(pidpath);
00738 //out:
00739         return ret;
00740 }
00741 
00742 
00743 osync_bool osync_client_spawn(OSyncClient *client, OSyncEngine *engine, OSyncError **error)
00744 {
00745         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, engine, error);
00746 
00747         int waiting = 0;
00748         
00749         if (!osync_client_kill_old_osplugin(client, error))
00750                 goto error;
00751 
00752         if (!osync_queue_exists(client->commands_to_osplugin) || !osync_queue_is_alive(client->commands_to_osplugin)) {
00753                 pid_t cpid = fork();
00754                 if (cpid == 0) {
00755                         osync_trace_reset_indent();
00756                         
00757                         /* Export all options to osplugin through environment variables */
00758                         osync_env_export_all_options(osync_group_get_env(engine->group));
00759 
00760                         OSyncMember *member = client->member;
00761                         OSyncPlugin *plugin = osync_member_get_plugin(member);
00762                         const char *path = osync_plugin_get_path(plugin);
00763                         setenv("OSYNC_MODULE_LIST", path, 1);
00764 
00765                         osync_env_export_loaded_modules(osync_group_get_env(engine->group));
00766 
00767                         char *memberstring = g_strdup_printf("%lli", osync_member_get_id(client->member));
00768                         execlp(OSPLUGIN, OSPLUGIN, osync_group_get_configdir(engine->group), memberstring, NULL);
00769                         
00770                         if (errno == ENOENT) {
00771                                 execlp("./osplugin", "osplugin", osync_group_get_configdir(engine->group), memberstring, NULL);
00772                         }
00773                         
00774                         osync_trace(TRACE_INTERNAL, "unable to exec");
00775                         exit(1);
00776                 }
00777 
00778                 client->child_pid = cpid;
00779                 
00780                 /* We are going to wait 5 seconds for plugin */
00781                 while (!osync_queue_exists(client->commands_to_osplugin) && waiting <= 5) {
00782                         osync_trace(TRACE_INTERNAL, "Waiting for other side to create fifo");
00783 
00784                         sleep(1);
00785                         waiting++;
00786                 }
00787                 
00788                 osync_trace(TRACE_INTERNAL, "Queue was created");
00789         }
00790 
00791         if (client->child_pid) {
00792                 if (!osync_client_create_pidfile(client, error))
00793                         goto error;
00794         }
00795                 
00796         if (!osync_queue_connect(client->commands_to_osplugin, OSYNC_QUEUE_SENDER, error))
00797                 goto error;
00798         
00799         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, error);
00800         if (!message)
00801                 goto error_disconnect;
00802         
00803         osync_message_write_string(message, client->commands_from_osplugin->name);
00804         
00805         if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error))
00806                 goto error_free_message;
00807         
00808         osync_message_unref(message);
00809         
00810         osync_trace(TRACE_EXIT, "%s", __func__);
00811         return TRUE;
00812         
00813 error_free_message:
00814         osync_message_unref(message);
00815 error_disconnect:
00816         osync_queue_disconnect(client->commands_to_osplugin, NULL);
00817 error:
00818         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00819         return FALSE;
00820 }
00821 
00822 osync_bool osync_client_init(OSyncClient *client, OSyncEngine *engine, OSyncError **error)
00823 {
00824         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, engine, error);
00825         
00826         OSyncMessage *reply = osync_queue_get_message(client->commands_from_osplugin);
00827         
00828         osync_trace(TRACE_INTERNAL, "reply received %i", reply->cmd);
00829         if (reply->cmd == OSYNC_MESSAGE_ERRORREPLY) {
00830                 if (error)
00831                         osync_demarshal_error(reply, error);
00832                 goto error_free_reply;
00833         }
00834 
00835         if (reply->cmd != OSYNC_MESSAGE_REPLY) {
00836                 osync_error_set(error, OSYNC_ERROR_GENERIC, "Invalid answer from plugin process");
00837                 goto error_free_reply;
00838         }
00839         
00840         osync_message_unref(reply);
00841         
00842         osync_trace(TRACE_EXIT, "%s", __func__);
00843     return TRUE;
00844 
00845 error_free_reply:
00846         osync_message_unref(reply);
00847         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00848         return FALSE;
00849 }
00850 
00851 osync_bool osync_client_finalize(OSyncClient *client, OSyncError **error)
00852 {
00853         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, client, error);
00854 
00855         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_FINALIZE, 0, error);
00856         if (!message)
00857                 goto error;
00858 
00859         if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error))
00860                 goto error_free_message;
00861         
00862         osync_message_unref(message);
00863 
00864         if (client->child_pid) {
00865                 int status;
00866                 if (waitpid(client->child_pid, &status, 0) == -1) {
00867                         osync_error_set(error, OSYNC_ERROR_GENERIC, "Error waiting for osplugin process: %s", strerror(errno));
00868                         goto error;
00869                 }
00870 
00871                 if (!WIFEXITED(status))
00872                         osync_trace(TRACE_INTERNAL, "Child has exited abnormally");
00873                 else if (WEXITSTATUS(status) != 0)
00874                         osync_trace(TRACE_INTERNAL, "Child has returned non-zero exit status (%d)", WEXITSTATUS(status));
00875 
00876                 if (!osync_client_remove_pidfile(client, error))
00877                         goto error;
00878         }
00879 
00880         osync_queue_disconnect(client->commands_to_osplugin, NULL);
00881 
00882 
00883         osync_trace(TRACE_EXIT, "%s", __func__);
00884     return TRUE;
00885 
00886 error_free_message:
00887         osync_message_unref(message);
00888 error:
00889         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00890         return FALSE;
00891 }

Generated on Sat Aug 13 2011 for OpenSync by  doxygen 1.7.1