00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "engine.h"
00022
00023 #include <errno.h>
00024 #include <sys/stat.h>
00025 #include <sys/types.h>
00026
00027 #include <glib.h>
00028
00029 #include <opensync/opensync_support.h>
00030 #include "opensync/opensync_message_internals.h"
00031 #include "opensync/opensync_queue_internals.h"
00032 #include "opensync/opensync_format_internals.h"
00033
00034 #include "engine_internals.h"
00035 #include <opensync/opensync_user_internals.h>
00036
00037 OSyncMappingEntry *osengine_mappingtable_find_entry(OSyncMappingTable *table, const char *uid, const char *objtype, long long int memberid);
00056
00057 void _new_change_receiver(OSyncEngine *engine, OSyncClient *client, OSyncChange *change)
00058 {
00059 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, engine, client, change);
00060
00061 OSyncError *error = NULL;
00062 OSyncChangeType change_type = osync_change_get_changetype(change);
00063 OSyncFormatEnv *format_env = osync_group_get_format_env(engine->group);
00064 OSyncObjType *objtype = osync_change_get_objtype(change);
00065 const char* uid = osync_change_get_uid(change);
00066 OSyncObjFormat *objformat = osync_change_get_objformat(change);
00067
00068 osync_change_set_member(change, client->member);
00069
00070 osync_trace(TRACE_INTERNAL, "Handling new change with uid %s, changetype %i, objtype %s and format %s from member %lli", uid, change_type,
00071 objtype ? osync_objtype_get_name(objtype) : "None", osync_change_get_objformat(change) ? osync_objformat_get_name(osync_change_get_objformat(change)) : "None",
00072 osync_member_get_id(client->member));
00073
00074
00081 if ( (change_type != CHANGE_DELETED) &&
00082 (osync_change_has_data(change))) {
00083 osync_bool is_file_objformat = FALSE;
00084 if(objformat)
00085 is_file_objformat =
00086 ((!strcmp(objformat->name, "file"))?(TRUE):(FALSE));
00087 if ( (!objtype) || (!objformat) ||
00088 (!strcmp(osync_objtype_get_name(objtype), "data")) ||
00089 (!strcmp(objformat->name, "plain"))) {
00090 OSyncObjType *objtype_test = osync_change_detect_objtype_full(format_env, change, &error);
00091 objtype = (objtype_test)?(objtype_test):(objtype);
00092 }
00093 if (objtype) {
00094 osync_trace(TRACE_INTERNAL, "Detected the object to be of type %s", osync_objtype_get_name(objtype));
00095
00096 osync_change_set_objtype(change, objtype);
00097
00102 if ( ( (osync_group_get_slow_sync(engine->group,
00103 osync_objtype_get_name(objtype))) ||
00104 ( (!is_file_objformat) &&
00105 (!osengine_mappingtable_find_entry(
00106 engine->maptable, uid,
00107 osync_objtype_get_name(objtype),
00108 osync_member_get_id(client->member))) )
00109 ) && (change_type == CHANGE_MODIFIED) ){
00110 osync_change_set_changetype(change, CHANGE_ADDED);
00111 change_type = osync_change_get_changetype(change);
00112 }
00113 }
00114 } else
00115 if (change_type == CHANGE_DELETED){
00121 if ( !objtype ||
00122 (( !strcmp(osync_objtype_get_name(objtype), "data") ) &&
00123 ( !osengine_mappingtable_find_entry(
00124 engine->maptable, uid,
00125 osync_objtype_get_name(objtype),
00126 osync_member_get_id(client->member)) )) ){
00127
00128 OSyncMappingEntry *entry =
00129 osengine_mappingtable_find_entry(
00130 engine->maptable, uid, NULL,
00131 osync_member_get_id(client->member)
00132 );
00133 if (entry) {
00134 osync_change_set_objtype(change,
00135 osync_change_get_objtype(
00136 entry->change));
00137 objtype=osync_change_get_objtype(change);
00138 } else {
00139 osync_error_set(&error, OSYNC_ERROR_GENERIC,
00140 "Could not find one entry with UID=%s to delete.", uid);
00141 goto error;
00142 }
00143 }
00144 } else {
00145 osync_trace(TRACE_INTERNAL, "Change has no data!");
00146 }
00147
00148 osync_trace(TRACE_INTERNAL, "Handling new change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli", uid, change_type, osync_change_get_data(change), osync_change_get_datasize(change), objtype ? osync_objtype_get_name(objtype) : "None", osync_change_get_objformat(change) ? osync_objformat_get_name(osync_change_get_objformat(change)) : "None", osync_member_get_id(client->member));
00149
00150 if (!objtype){
00151 osync_error_set(&error, OSYNC_ERROR_GENERIC,
00152 "ObjType not set for uid %s.", uid);
00153 goto error;
00154 }
00155
00156
00157 OSyncMappingEntry *entry = osengine_mappingtable_store_change(engine->maptable, change);
00158 change = entry->change;
00159 if (!osync_change_save(change, TRUE, &error)) {
00160 osync_error_duplicate(&engine->error, &error);
00161 osync_status_update_change(engine, change, CHANGE_RECV_ERROR, &error);
00162 osync_error_update(&engine->error, "Unable to receive one or more objects");
00163 osync_flag_unset(entry->fl_has_data);
00164 goto error;
00165 }
00166
00167 osync_group_remove_changelog(engine->group, change, &error);
00168
00169
00170 osync_change_convert_to_common(change, NULL);
00171
00172 if (!entry->mapping) {
00173 osync_flag_attach(entry->fl_mapped, engine->cmb_entries_mapped);
00174 osync_flag_unset(entry->fl_mapped);
00175 osync_debug("ENG", 3, "+It has no mapping");
00176 } else {
00177 osync_debug("ENG", 3, "+It has mapping");
00178 osync_flag_set(entry->fl_mapped);
00179 osync_flag_unset(entry->mapping->fl_solved);
00180 osync_flag_unset(entry->mapping->fl_chkconflict);
00181 osync_flag_unset(entry->mapping->fl_multiplied);
00182 }
00183
00184 if (osync_change_has_data(change)) {
00185 osync_debug("ENG", 3, "+It has data");
00186 osync_flag_set(entry->fl_has_data);
00187 osync_status_update_change(engine, change, CHANGE_RECEIVED, NULL);
00188 } else {
00189 osync_debug("ENG", 3, "+It has no data");
00190 osync_flag_unset(entry->fl_has_data);
00191 osync_status_update_change(engine, change, CHANGE_RECEIVED_INFO, NULL);
00192 }
00193
00194 if (osync_change_get_changetype(change) == CHANGE_DELETED)
00195 osync_flag_set(entry->fl_deleted);
00196
00197 osync_flag_set(entry->fl_has_info);
00198 osync_flag_unset(entry->fl_synced);
00199
00200 osengine_mappingentry_decider(engine, entry);
00201
00202 osync_trace(TRACE_EXIT, "%s", __func__);
00203 return;
00204
00205 error:
00206 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
00207 osync_error_free(&error);
00208 return;
00209 }
00210
00211 OSyncClient *osengine_get_client(OSyncEngine *engine, long long int memberId)
00212 {
00213 GList *c = NULL;
00214 for (c = engine->clients; c; c = c->next) {
00215 OSyncClient *client = c->data;
00216 if (osync_member_get_id(client->member) == memberId)
00217 return client;
00218 }
00219 return NULL;
00220 }
00221
00222
00223 void send_engine_changed(OSyncEngine *engine)
00224 {
00225 if (!engine->is_initialized)
00226 return;
00227
00228 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_ENGINE_CHANGED, 0, NULL);
00229
00230
00231 osync_debug("ENG", 4, "Sending message %p:\"ENGINE_CHANGED\"", message);
00232 osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
00233 }
00234
00235 void send_mapping_changed(OSyncEngine *engine, OSyncMapping *mapping)
00236 {
00237 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_MAPPING_CHANGED, sizeof(long long), NULL);
00238 osync_message_write_long_long_int(message, mapping->id);
00239
00240
00241 osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
00242
00243 }
00244
00245 void send_mappingentry_changed(OSyncEngine *engine, OSyncMappingEntry *entry)
00246 {
00247 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_MAPPINGENTRY_CHANGED, sizeof(long long)*2, NULL);
00248
00249
00250 long long ptr = (long long)(long)entry;
00251 osync_message_write_long_long_int(message, ptr);
00252
00253
00254 osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
00255
00256 }
00257
00265 static void engine_message_handler(OSyncMessage *message, OSyncEngine *engine)
00266 {
00267 osync_trace(TRACE_ENTRY, "engine_message_handler(%p:%lli-%i, %p)", message, message->id1, message->id2, engine);
00268
00269 OSyncChange *change = NULL;
00270
00271 osync_trace(TRACE_INTERNAL, "engine received command %i", osync_message_get_command(message));
00272
00273 switch (osync_message_get_command(message)) {
00274 case OSYNC_MESSAGE_SYNCHRONIZE:
00275 osync_trace(TRACE_INTERNAL, "all deciders");
00276 osengine_client_all_deciders(engine);
00277 break;
00278 case OSYNC_MESSAGE_NEW_CHANGE:
00279 osync_demarshal_change(message, osync_group_get_format_env(engine->group), &change);
00280
00281 long long int member_id = 0;
00282 osync_message_read_long_long_int(message, &member_id);
00283 OSyncClient *sender = osengine_get_client(engine, member_id);
00284
00285 _new_change_receiver(engine, sender, change);
00286 break;
00287 case OSYNC_MESSAGE_ENGINE_CHANGED:
00288 osengine_client_all_deciders(engine);
00289 osengine_mapping_all_deciders(engine);
00290 GList *u;
00291 for (u = engine->maptable->unmapped; u; u = u->next) {
00292 OSyncMappingEntry *unmapped = u->data;
00293 send_mappingentry_changed(engine, unmapped);
00294 }
00295 break;
00296 case OSYNC_MESSAGE_MAPPING_CHANGED:
00297 {
00298 long long id;
00299 osync_message_read_long_long_int(message, &id);
00300
00301 OSyncMapping *mapping = osengine_mappingtable_mapping_from_id(engine->maptable, id);
00302
00303 if (!g_list_find(engine->maptable->mappings, mapping)) {
00304 osync_trace(TRACE_EXIT, "%s: Mapping %p is dead", __func__, mapping);
00305 return;
00306 }
00307
00308 osengine_mapping_decider(engine, mapping);
00309 }
00310 break;
00311 case OSYNC_MESSAGE_MAPPINGENTRY_CHANGED:
00312 {
00313 long long ptr;
00314 osync_message_read_long_long_int(message, &ptr);
00315 OSyncMappingEntry *entry = (OSyncMappingEntry*)(long)ptr;
00316
00317 if (!g_list_find(engine->maptable->entries, entry) && !g_list_find(engine->maptable->unmapped, entry)) {
00318 osync_trace(TRACE_EXIT, "%s: Entry %p is dead", __func__, entry);
00319 return;
00320 }
00321
00322 osengine_mappingentry_decider(engine, entry);
00323 }
00324 break;
00325 case OSYNC_MESSAGE_SYNC_ALERT:
00326 if (engine->allow_sync_alert)
00327 osync_flag_set(engine->fl_running);
00328 else
00329 osync_trace(TRACE_INTERNAL, "Sync Alert not allowed");
00330 break;
00331
00332 default:
00333 break;
00334 }
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363 osync_trace(TRACE_EXIT, "%s", __func__);
00364 }
00365
00366 static void trigger_clients_sent_changes(OSyncEngine *engine)
00367 {
00368 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00369 osync_status_update_engine(engine, ENG_ENDPHASE_READ, NULL);
00370
00371 g_mutex_lock(engine->info_received_mutex);
00372 g_cond_signal(engine->info_received);
00373 g_mutex_unlock(engine->info_received_mutex);
00374
00375
00376 osengine_mappingtable_inject_changes(engine->maptable);
00377
00378 send_engine_changed(engine);
00379 osync_trace(TRACE_EXIT, "%s", __func__);
00380 }
00381
00382 static void trigger_clients_read_all(OSyncEngine *engine)
00383 {
00384 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00385
00386 send_engine_changed(engine);
00387 osync_trace(TRACE_EXIT, "%s", __func__);
00388 }
00389
00390 static void trigger_status_end_conflicts(OSyncEngine *engine)
00391 {
00392 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00393 osync_status_update_engine(engine, ENG_END_CONFLICTS, NULL);
00394
00395 osync_trace(TRACE_EXIT, "%s", __func__);
00396 }
00397
00398 static void trigger_clients_connected(OSyncEngine *engine)
00399 {
00400 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00401 osync_status_update_engine(engine, ENG_ENDPHASE_CON, NULL);
00402 osengine_client_all_deciders(engine);
00403
00404 osync_trace(TRACE_EXIT, "%s", __func__);
00405 }
00406
00407 static void trigger_clients_comitted_all(OSyncEngine *engine)
00408 {
00409 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00410 osync_status_update_engine(engine, ENG_ENDPHASE_WRITE, NULL);
00411
00412 osync_trace(TRACE_EXIT, "%s", __func__);
00413 }
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448 static gboolean startupfunc(gpointer data)
00449 {
00450 OSyncEngine *engine = data;
00451 osync_trace(TRACE_INTERNAL, "+++++++++ This is the engine of group \"%s\" +++++++++", osync_group_get_name(engine->group));
00452
00453 OSyncError *error = NULL;
00454 if (!osengine_mappingtable_load(engine->maptable, &error)) {
00455 osync_error_duplicate(&engine->error, &error);
00456 osync_status_update_engine(engine, ENG_ERROR, &error);
00457 osync_error_update(&engine->error, "Unable to connect one of the members");
00458 osync_flag_set(engine->fl_stop);
00459 }
00460
00461 g_mutex_lock(engine->started_mutex);
00462 g_cond_signal(engine->started);
00463 g_mutex_unlock(engine->started_mutex);
00464 return FALSE;
00465 }
00466
00478
00489 osync_bool osengine_reset(OSyncEngine *engine, OSyncError **error)
00490 {
00491
00492 osync_trace(TRACE_ENTRY, "osengine_reset(%p, %p)", engine, error);
00493 GList *c = NULL;
00494 for (c = engine->clients; c; c = c->next) {
00495 OSyncClient *client = c->data;
00496 osync_client_reset(client);
00497 }
00498
00499 osync_flag_set_state(engine->fl_running, FALSE);
00500 osync_flag_set_state(engine->fl_stop, FALSE);
00501 osync_flag_set_state(engine->cmb_sent_changes, FALSE);
00502 osync_flag_set_state(engine->cmb_entries_mapped, TRUE);
00503 osync_flag_set_state(engine->cmb_synced, TRUE);
00504 osync_flag_set_state(engine->cmb_chkconflict, TRUE);
00505 osync_flag_set_state(engine->cmb_finished, FALSE);
00506 osync_flag_set_state(engine->cmb_connected, FALSE);
00507 osync_flag_set_state(engine->cmb_read_all, TRUE);
00508 osync_flag_set_state(engine->cmb_committed_all, TRUE);
00509 osync_flag_set_state(engine->cmb_committed_all_sent, FALSE);
00510
00511 osync_status_update_engine(engine, ENG_ENDPHASE_DISCON, NULL);
00512
00513 engine->committed_all_sent = FALSE;
00514
00515 osengine_mappingtable_reset(engine->maptable);
00516
00517 if (engine->error) {
00518
00519 OSyncError *newerror = NULL;
00520 osync_error_duplicate(&newerror, &engine->error);
00521 osync_status_update_engine(engine, ENG_ERROR, &newerror);
00522 osync_group_set_slow_sync(engine->group, "data", TRUE);
00523 } else {
00524 osync_status_update_engine(engine, ENG_SYNC_SUCCESSFULL, NULL);
00525 osync_group_reset_slow_sync(engine->group, "data");
00526 }
00527
00528 osync_trace(TRACE_INTERNAL, "engine error is %p", engine->error);
00529
00530 g_mutex_lock(engine->syncing_mutex);
00531 g_cond_signal(engine->syncing);
00532 g_mutex_unlock(engine->syncing_mutex);
00533
00534 osync_trace(TRACE_EXIT, "osengine_reset");
00535 return TRUE;
00536 }
00537
00538
00539
00540
00541
00542 static int __mkdir_with_parents(char *dir, int mode)
00543 {
00544 if (g_file_test(dir, G_FILE_TEST_IS_DIR))
00545 return 0;
00546
00547 char *slash = strrchr(dir, '/');
00548 if (slash && slash != dir) {
00549
00550
00551
00552
00553
00554
00555
00556 *slash = '\0';
00557 if (__mkdir_with_parents(dir, mode) < 0)
00558 return -1;
00559 *slash = '/';
00560 }
00561
00562 if (mkdir(dir, mode) < 0)
00563 return -1;
00564
00565 return 0;
00566 }
00567
00568 static int mkdir_with_parents(const char *dir, int mode)
00569 {
00570 int r;
00571 char *mydir = strdup(dir);
00572 if (!mydir)
00573 return -1;
00574
00575 r = __mkdir_with_parents(mydir, mode);
00576 free(mydir);
00577 return r;
00578 }
00579
00589 OSyncEngine *osengine_new(OSyncGroup *group, OSyncError **error)
00590 {
00591 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, group, error);
00592
00593 g_assert(group);
00594 OSyncEngine *engine = g_malloc0(sizeof(OSyncEngine));
00595 osync_group_set_data(group, engine);
00596
00597 if (!g_thread_supported ())
00598 g_thread_init (NULL);
00599
00600 engine->context = g_main_context_new();
00601 engine->syncloop = g_main_loop_new(engine->context, FALSE);
00602 engine->group = group;
00603
00604 OSyncUserInfo *user = osync_user_new(error);
00605 if (!user)
00606 goto error;
00607
00608 char *enginesdir = g_strdup_printf("%s/engines", osync_user_get_confdir(user));
00609 char *path = g_strdup_printf("%s/enginepipe", enginesdir);
00610
00611 if (mkdir_with_parents(enginesdir, 0755) < 0) {
00612 osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't create engines directory: %s", strerror(errno));
00613 goto error_free_paths;
00614 }
00615
00616 engine->syncing_mutex = g_mutex_new();
00617 engine->info_received_mutex = g_mutex_new();
00618 engine->syncing = g_cond_new();
00619 engine->info_received = g_cond_new();
00620 engine->started_mutex = g_mutex_new();
00621 engine->started = g_cond_new();
00622
00623
00624 engine->fl_running = osync_flag_new(NULL);
00625 osync_flag_set_pos_trigger(engine->fl_running, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL);
00626
00627 engine->fl_sync = osync_flag_new(NULL);
00628 engine->fl_stop = osync_flag_new(NULL);
00629 osync_flag_set_pos_trigger(engine->fl_stop, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL);
00630
00631
00632 engine->cmb_sent_changes = osync_comb_flag_new(FALSE, FALSE);
00633 osync_flag_set_pos_trigger(engine->cmb_sent_changes, (OSyncFlagTriggerFunc)trigger_clients_sent_changes, engine, NULL);
00634
00635 engine->cmb_read_all = osync_comb_flag_new(FALSE, TRUE);
00636 osync_flag_set_pos_trigger(engine->cmb_read_all, (OSyncFlagTriggerFunc)trigger_clients_read_all, engine, NULL);
00637
00638 engine->cmb_entries_mapped = osync_comb_flag_new(FALSE, FALSE);
00639 osync_flag_set_pos_trigger(engine->cmb_entries_mapped, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
00640
00641
00642 engine->cmb_synced = osync_comb_flag_new(FALSE, TRUE);
00643 osync_flag_set_pos_trigger(engine->cmb_synced, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
00644
00645
00646 engine->cmb_finished = osync_comb_flag_new(FALSE, TRUE);
00647 osync_flag_set_pos_trigger(engine->cmb_finished, (OSyncFlagTriggerFunc)osengine_reset, engine, NULL);
00648
00649 engine->cmb_connected = osync_comb_flag_new(FALSE, FALSE);
00650 osync_flag_set_pos_trigger(engine->cmb_connected, (OSyncFlagTriggerFunc)trigger_clients_connected, engine, NULL);
00651
00652 engine->cmb_chkconflict = osync_comb_flag_new(FALSE, TRUE);
00653 osync_flag_set_pos_trigger(engine->cmb_chkconflict, (OSyncFlagTriggerFunc)trigger_status_end_conflicts, engine, NULL);
00654
00655 engine->cmb_multiplied = osync_comb_flag_new(FALSE, TRUE);
00656
00657 engine->cmb_committed_all = osync_comb_flag_new(FALSE, TRUE);
00658 osync_flag_set_pos_trigger(engine->cmb_committed_all, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
00659
00660
00661 engine->cmb_committed_all_sent = osync_comb_flag_new(FALSE, TRUE);
00662 osync_flag_set_pos_trigger(engine->cmb_committed_all_sent, (OSyncFlagTriggerFunc)trigger_clients_comitted_all, engine, NULL);
00663
00664 osync_flag_set(engine->fl_sync);
00665
00666 int i;
00667 for (i = 0; i < osync_group_num_members(group); i++) {
00668 OSyncMember *member = osync_group_nth_member(group, i);
00669 if (!osync_client_new(engine, member, error))
00670 goto error_free_paths;
00671 }
00672
00673 engine->maptable = osengine_mappingtable_new(engine);
00674
00675 osync_trace(TRACE_EXIT, "osengine_new: %p", engine);
00676 return engine;
00677
00678 error_free_paths:
00679 g_free(path);
00680 g_free(enginesdir);
00681 error:
00682 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00683 return NULL;
00684 }
00685
00693 void osengine_free(OSyncEngine *engine)
00694 {
00695 osync_trace(TRACE_ENTRY, "osengine_free(%p)", engine);
00696
00697 GList *c = NULL;
00698 for (c = engine->clients; c; c = c->next) {
00699 OSyncClient *client = c->data;
00700 osync_client_free(client);
00701 }
00702
00703 osengine_mappingtable_free(engine->maptable);
00704 engine->maptable = NULL;
00705
00706 osync_flag_free(engine->fl_running);
00707 osync_flag_free(engine->fl_sync);
00708 osync_flag_free(engine->fl_stop);
00709 osync_flag_free(engine->cmb_sent_changes);
00710 osync_flag_free(engine->cmb_entries_mapped);
00711 osync_flag_free(engine->cmb_synced);
00712 osync_flag_free(engine->cmb_chkconflict);
00713 osync_flag_free(engine->cmb_finished);
00714 osync_flag_free(engine->cmb_connected);
00715 osync_flag_free(engine->cmb_read_all);
00716 osync_flag_free(engine->cmb_multiplied);
00717 osync_flag_free(engine->cmb_committed_all);
00718 osync_flag_free(engine->cmb_committed_all_sent);
00719
00720 g_list_free(engine->clients);
00721 g_main_loop_unref(engine->syncloop);
00722
00723 g_main_context_unref(engine->context);
00724
00725 g_mutex_free(engine->syncing_mutex);
00726 g_mutex_free(engine->info_received_mutex);
00727 g_cond_free(engine->syncing);
00728 g_cond_free(engine->info_received);
00729 g_mutex_free(engine->started_mutex);
00730 g_cond_free(engine->started);
00731
00732 g_free(engine);
00733 osync_trace(TRACE_EXIT, "osengine_free");
00734 }
00735
00745 void osengine_set_conflict_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncMapping *, void *), void *user_data)
00746 {
00747 engine->conflict_callback = function;
00748 engine->conflict_userdata = user_data;
00749 }
00750
00760 void osengine_set_changestatus_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncChangeUpdate *, void *), void *user_data)
00761 {
00762 engine->changestat_callback = function;
00763 engine->changestat_userdata = user_data;
00764 }
00765
00775 void osengine_set_mappingstatus_callback(OSyncEngine *engine, void (* function) (OSyncMappingUpdate *, void *), void *user_data)
00776 {
00777 engine->mapstat_callback = function;
00778 engine->mapstat_userdata = user_data;
00779 }
00780
00790 void osengine_set_enginestatus_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncEngineUpdate *, void *), void *user_data)
00791 {
00792 engine->engstat_callback = function;
00793 engine->engstat_userdata = user_data;
00794 }
00795
00805 void osengine_set_memberstatus_callback(OSyncEngine *engine, void (* function) (OSyncMemberUpdate *, void *), void *user_data)
00806 {
00807 engine->mebstat_callback = function;
00808 engine->mebstat_userdata = user_data;
00809 }
00810
00820 void osengine_set_message_callback(OSyncEngine *engine, void *(* function) (OSyncEngine *, OSyncClient *, const char *, void *, void *), void *user_data)
00821 {
00822 engine->plgmsg_callback = function;
00823 engine->plgmsg_userdata = user_data;
00824 }
00825
00837 osync_bool osengine_init(OSyncEngine *engine, OSyncError **error)
00838 {
00839 osync_trace(TRACE_ENTRY, "osengine_init(%p, %p)", engine, error);
00840
00841 if (engine->is_initialized) {
00842 osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION, "This engine was already initialized");
00843 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00844 return FALSE;
00845 }
00846
00847 switch (osync_group_lock(engine->group)) {
00848 case OSYNC_LOCKED:
00849 osync_error_set(error, OSYNC_ERROR_LOCKED, "Group is locked");
00850 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00851 return FALSE;
00852 case OSYNC_LOCK_STALE:
00853 osync_debug("ENG", 1, "Detected stale lock file. Slow-syncing");
00854 osync_status_update_engine(engine, ENG_PREV_UNCLEAN, NULL);
00855 osync_group_set_slow_sync(engine->group, "data", TRUE);
00856 break;
00857 default:
00858 break;
00859 }
00860
00861 osync_flag_set(engine->cmb_entries_mapped);
00862 osync_flag_set(engine->cmb_synced);
00863 engine->allow_sync_alert = TRUE;
00864
00865
00866 OSyncGroup *group = engine->group;
00867
00868 if (osync_group_num_members(group) < 2) {
00869
00870 osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION, "You only configured %i members, but at least 2 are needed", osync_group_num_members(group));
00871 osync_group_unlock(engine->group, TRUE);
00872 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00873 return FALSE;
00874 }
00875
00876 engine->is_initialized = TRUE;
00877
00878 osync_trace(TRACE_INTERNAL, "Spawning clients");
00879 GList *c = NULL;
00880 for (c = engine->clients; c; c = c->next) {
00881 OSyncClient *client = c->data;
00882 osync_queue_create(client->commands_from_osplugin, NULL);
00883
00884 if (!osync_client_spawn(client, engine, error)) {
00885 osync_group_unlock(engine->group, TRUE);
00886 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00887 return FALSE;
00888 }
00889
00890 osync_queue_set_message_handler(client->commands_from_osplugin, (OSyncMessageHandler)engine_message_handler, engine);
00891 if (!(engine->man_dispatch))
00892 osync_queue_setup_with_gmainloop(client->commands_from_osplugin, engine->context);
00893 osync_trace(TRACE_INTERNAL, "opening client queue");
00894 if (!osync_queue_connect(client->commands_from_osplugin, OSYNC_QUEUE_RECEIVER, 0 )) {
00895 osync_group_unlock(engine->group, TRUE);
00896 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00897 return FALSE;
00898 }
00899 }
00900
00901 osync_trace(TRACE_INTERNAL, "opening engine queue");
00902 if (!osync_queue_new_pipes(&engine->commands_from_self, &engine->commands_to_self, error)) {
00903 osync_group_unlock(engine->group, TRUE);
00904 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00905 return FALSE;
00906 }
00907
00908 if (!osync_queue_connect(engine->commands_from_self, OSYNC_QUEUE_RECEIVER, 0 )) {
00909 osync_group_unlock(engine->group, TRUE);
00910 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00911 return FALSE;
00912 }
00913
00914 if (!osync_queue_connect(engine->commands_to_self, OSYNC_QUEUE_SENDER, 0 )) {
00915 osync_group_unlock(engine->group, TRUE);
00916 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00917 return FALSE;
00918 }
00919
00920 osync_queue_set_message_handler(engine->commands_from_self, (OSyncMessageHandler)engine_message_handler, engine);
00921 if (!(engine->man_dispatch))
00922 osync_queue_setup_with_gmainloop(engine->commands_from_self, engine->context);
00923
00924 osync_trace(TRACE_INTERNAL, "initializing clients");
00925 for (c = engine->clients; c; c = c->next) {
00926 OSyncClient *client = c->data;
00927 if (!osync_client_init(client, engine, error)) {
00928 osengine_finalize(engine);
00929 osync_group_unlock(engine->group, TRUE);
00930 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00931 return FALSE;
00932 }
00933 }
00934
00935 osync_debug("ENG", 3, "Running the main loop");
00936
00937
00938
00939 g_mutex_lock(engine->started_mutex);
00940 GSource *idle = g_idle_source_new();
00941 g_source_set_priority(idle, G_PRIORITY_HIGH);
00942 g_source_set_callback(idle, startupfunc, engine, NULL);
00943 g_source_attach(idle, engine->context);
00944 engine->thread = g_thread_create ((GThreadFunc)g_main_loop_run, engine->syncloop, TRUE, NULL);
00945 g_cond_wait(engine->started, engine->started_mutex);
00946 g_mutex_unlock(engine->started_mutex);
00947
00948 osync_trace(TRACE_EXIT, "osengine_init");
00949 return TRUE;
00950 }
00951
00960 void osengine_finalize(OSyncEngine *engine)
00961 {
00962
00963 osync_trace(TRACE_ENTRY, "osengine_finalize(%p)", engine);
00964
00965 if (!engine->is_initialized) {
00966 osync_trace(TRACE_EXIT_ERROR, "osengine_finalize: Not initialized");
00967 return;
00968 }
00969
00970 g_assert(engine);
00971 osync_debug("ENG", 3, "finalizing engine %p", engine);
00972
00973 if (engine->thread) {
00974 g_main_loop_quit(engine->syncloop);
00975 g_thread_join(engine->thread);
00976 }
00977
00978 GList *c = NULL;
00979 for (c = engine->clients; c; c = c->next) {
00980 OSyncClient *client = c->data;
00981 osync_queue_disconnect(client->commands_from_osplugin, NULL);
00982 osync_client_finalize(client, NULL);
00983 }
00984
00985 osync_queue_disconnect(engine->commands_from_self, NULL);
00986 osync_queue_disconnect(engine->commands_to_self, NULL);
00987
00988 osync_queue_free(engine->commands_from_self);
00989 engine->commands_from_self = NULL;
00990 osync_queue_free(engine->commands_to_self);
00991 engine->commands_to_self = NULL;
00992
00993 osengine_mappingtable_close(engine->maptable);
00994
00995 if (engine->error) {
00996
00997
00998
00999
01000
01001
01002 if (!osync_flag_is_set(engine->cmb_connected) && !engine->slowsync)
01003 osync_group_unlock(engine->group, TRUE);
01004 else
01005 osync_group_unlock(engine->group, FALSE);
01006 } else
01007 osync_group_unlock(engine->group, TRUE);
01008
01009 engine->is_initialized = FALSE;
01010 osync_trace(TRACE_EXIT, "osengine_finalize");
01011 }
01012
01023 osync_bool osengine_synchronize(OSyncEngine *engine, OSyncError **error)
01024 {
01025 osync_trace(TRACE_INTERNAL, "synchronize now");
01026 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
01027 g_assert(engine);
01028
01029 if (!engine->is_initialized) {
01030 osync_error_set(error, OSYNC_ERROR_GENERIC, "osengine_synchronize: Not initialized");
01031 goto error;
01032 }
01033
01034
01035
01036
01037 if (osync_group_get_slow_sync(engine->group, "data")) {
01038 engine->slowsync = TRUE;
01039 } else {
01040 engine->slowsync = FALSE;
01041 }
01042
01043 engine->wasted = 0;
01044 engine->alldeciders = 0;
01045
01046 osync_flag_set(engine->fl_running);
01047
01048 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_SYNCHRONIZE, 0, error);
01049 if (!message)
01050 goto error;
01051
01052 if (!osync_queue_send_message(engine->commands_to_self, NULL, message, error))
01053 goto error_free_message;
01054
01055 osync_message_unref(message);
01056
01057 osync_trace(TRACE_EXIT, "%s", __func__);
01058 return TRUE;
01059
01060 error_free_message:
01061 osync_message_unref(message);
01062 error:
01063 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01064 return FALSE;
01065 }
01066
01074 void osengine_flag_only_info(OSyncEngine *engine)
01075 {
01076 osync_flag_unset(engine->fl_sync);
01077 }
01078
01086 void osengine_flag_manual(OSyncEngine *engine)
01087 {
01088 if (engine->syncloop) {
01089 g_warning("Unable to flag manual since engine is already initialized\n");
01090 }
01091 engine->man_dispatch = TRUE;
01092 }
01093
01100 void osengine_pause(OSyncEngine *engine)
01101 {
01102 osync_flag_unset(engine->fl_running);
01103 }
01104
01112 void osengine_abort(OSyncEngine *engine)
01113 {
01114 osync_flag_set(engine->fl_stop);
01115 }
01116
01123 void osengine_allow_sync_alert(OSyncEngine *engine)
01124 {
01125 engine->allow_sync_alert = TRUE;
01126 }
01127
01134 void osengine_deny_sync_alert(OSyncEngine *engine)
01135 {
01136 engine->allow_sync_alert = FALSE;
01137 }
01138
01149 osync_bool osengine_sync_and_block(OSyncEngine *engine, OSyncError **error)
01150 {
01151 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error);
01152
01153 g_mutex_lock(engine->syncing_mutex);
01154
01155 if (!osengine_synchronize(engine, error)) {
01156 g_mutex_unlock(engine->syncing_mutex);
01157 goto error;
01158 }
01159
01160 g_cond_wait(engine->syncing, engine->syncing_mutex);
01161 g_mutex_unlock(engine->syncing_mutex);
01162
01163 if (engine->error) {
01164 osync_error_duplicate(error, &(engine->error));
01165 goto error;
01166 }
01167
01168 osync_trace(TRACE_EXIT, "%s", __func__);
01169 return TRUE;
01170
01171 error:
01172 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01173 return FALSE;
01174 }
01175
01186 osync_bool osengine_wait_sync_end(OSyncEngine *engine, OSyncError **error)
01187 {
01188 g_mutex_lock(engine->syncing_mutex);
01189 g_cond_wait(engine->syncing, engine->syncing_mutex);
01190 g_mutex_unlock(engine->syncing_mutex);
01191
01192 if (engine->error) {
01193 osync_error_duplicate(error, &(engine->error));
01194 return FALSE;
01195 }
01196 return TRUE;
01197 }
01198
01205 void osengine_wait_info_end(OSyncEngine *engine)
01206 {
01207 g_mutex_lock(engine->info_received_mutex);
01208 g_cond_wait(engine->info_received, engine->info_received_mutex);
01209 g_mutex_unlock(engine->info_received_mutex);
01210 }
01211
01216 void osengine_one_iteration(OSyncEngine *engine)
01217 {
01218
01219 abort();
01220 }
01221
01228 OSyncMapping *osengine_mapping_from_id(OSyncEngine *engine, long long int id)
01229 {
01230 return osengine_mappingtable_mapping_from_id(engine->maptable, id);
01231 }
01232