• Skip to content
  • Skip to link menu
KDE 4.5 API Reference
  • KDE API Reference
  • KDE-PIM Libraries
  • Sitemap
  • Contact Us
 

akonadi

resourcescheduler.cpp

00001 /*
00002     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00003 
00004     This library is free software; you can redistribute it and/or modify it
00005     under the terms of the GNU Library General Public License as published by
00006     the Free Software Foundation; either version 2 of the License, or (at your
00007     option) any later version.
00008 
00009     This library is distributed in the hope that it will be useful, but WITHOUT
00010     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00011     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00012     License for more details.
00013 
00014     You should have received a copy of the GNU Library General Public License
00015     along with this library; see the file COPYING.LIB.  If not, write to the
00016     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00017     02110-1301, USA.
00018 */
00019 
00020 #include "resourcescheduler_p.h"
00021 
00022 #include <kdebug.h>
00023 #include <klocale.h>
00024 
00025 #include <QtCore/QTimer>
00026 #include <QtDBus/QDBusInterface>
00027 #include <QtDBus/QDBusConnectionInterface>
00028 #include <boost/graph/graph_concepts.hpp>
00029 
00030 using namespace Akonadi;
00031 
00032 qint64 ResourceScheduler::Task::latestSerial = 0;
00033 static QDBusAbstractInterface *s_resourcetracker = 0;
00034 
00035 //@cond PRIVATE
00036 
00037 ResourceScheduler::ResourceScheduler( QObject *parent ) :
00038     QObject( parent ),
00039     mCurrentTasksQueue( -1 ),
00040     mOnline( false )
00041 {
00042 }
00043 
00044 void ResourceScheduler::scheduleFullSync()
00045 {
00046   Task t;
00047   t.type = SyncAll;
00048   TaskList& queue = queueForTaskType( t.type );
00049   if ( queue.contains( t ) || mCurrentTask == t )
00050     return;
00051   queue << t;
00052   signalTaskToTracker( t, "SyncAll" );
00053   scheduleNext();
00054 }
00055 
00056 void ResourceScheduler::scheduleCollectionTreeSync()
00057 {
00058   Task t;
00059   t.type = SyncCollectionTree;
00060   TaskList& queue = queueForTaskType( t.type );
00061   if ( queue.contains( t ) || mCurrentTask == t )
00062     return;
00063   queue << t;
00064   signalTaskToTracker( t, "SyncCollectionTree" );
00065   scheduleNext();
00066 }
00067 
00068 void ResourceScheduler::scheduleSync(const Collection & col)
00069 {
00070   Task t;
00071   t.type = SyncCollection;
00072   t.collection = col;
00073   TaskList& queue = queueForTaskType( t.type );
00074   if ( queue.contains( t ) || mCurrentTask == t )
00075     return;
00076   queue << t;
00077   signalTaskToTracker( t, "SyncCollection" );
00078   scheduleNext();
00079 }
00080 
00081 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg)
00082 {
00083   Task t;
00084   t.type = FetchItem;
00085   t.item = item;
00086   t.itemParts = parts;
00087 
00088   // if the current task does already fetch the requested item, break here but
00089   // keep the dbus message, so we can send the reply later on
00090   if ( mCurrentTask == t ) {
00091     mCurrentTask.dbusMsgs << msg;
00092     return;
00093   }
00094 
00095   // If this task is already in the queue, merge with it.
00096   TaskList& queue = queueForTaskType( t.type );
00097   const int idx = queue.indexOf( t );
00098   if ( idx != -1 ) {
00099     queue[ idx ].dbusMsgs << msg;
00100     return;
00101   }
00102 
00103   t.dbusMsgs << msg;
00104   queue << t;
00105   signalTaskToTracker( t, "FetchItem" );
00106   scheduleNext();
00107 }
00108 
00109 void ResourceScheduler::scheduleResourceCollectionDeletion()
00110 {
00111   Task t;
00112   t.type = DeleteResourceCollection;
00113   TaskList& queue = queueForTaskType( t.type );
00114   if ( queue.contains( t ) || mCurrentTask == t )
00115     return;
00116   queue << t;
00117   signalTaskToTracker( t, "DeleteResourceCollection" );
00118   scheduleNext();
00119 }
00120 
00121 void ResourceScheduler::scheduleChangeReplay()
00122 {
00123   Task t;
00124   t.type = ChangeReplay;
00125   TaskList& queue = queueForTaskType( t.type );
00126   // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks
00127   if ( queue.contains( t ) )
00128     return;
00129   queue << t;
00130   signalTaskToTracker( t, "ChangeReplay" );
00131   scheduleNext();
00132 }
00133 
00134 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
00135 {
00136   Task t;
00137   t.type = SyncAllDone;
00138   TaskList& queue = queueForTaskType( t.type );
00139   // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
00140   queue << t;
00141   signalTaskToTracker( t, "SyncAllDone" );
00142   scheduleNext();
00143 }
00144 
00145 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority )
00146 {
00147   Task t;
00148   t.type = Custom;
00149   t.receiver = receiver;
00150   t.methodName = methodName;
00151   t.argument = argument;
00152   QueueType queueType = GenericTaskQueue;
00153   if ( priority == ResourceBase::AfterChangeReplay )
00154     queueType = AfterChangeReplayQueue;
00155   else if ( priority == ResourceBase::Prepend )
00156     queueType = PrependTaskQueue;
00157   TaskList& queue = mTaskList[ queueType ];
00158 
00159   if ( queue.contains( t ) )
00160     return;
00161 
00162   switch (priority) {
00163   case ResourceBase::Prepend:
00164     queue.prepend( t );
00165     break;
00166   default:
00167     queue.append(t);
00168     break;
00169   }
00170 
00171   signalTaskToTracker( t, "Custom-" + t.methodName );
00172   scheduleNext();
00173 }
00174 
00175 void ResourceScheduler::taskDone()
00176 {
00177   if ( isEmpty() )
00178     emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) );
00179 
00180   if ( s_resourcetracker ) {
00181     QList<QVariant> argumentList;
00182     argumentList << QString::number( mCurrentTask.serial )
00183                  << QString();
00184     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00185   }
00186 
00187   mCurrentTask = Task();
00188   mCurrentTasksQueue = -1;
00189   scheduleNext();
00190 }
00191 
00192 void ResourceScheduler::deferTask()
00193 {
00194   if ( mCurrentTask.type == Invalid )
00195       return;
00196 
00197   if ( s_resourcetracker ) {
00198     QList<QVariant> argumentList;
00199     argumentList << QString::number( mCurrentTask.serial )
00200                  << QString();
00201     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00202   }
00203 
00204   Task t = mCurrentTask;
00205   mCurrentTask = Task();
00206 
00207   Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
00208   mTaskList[mCurrentTasksQueue].prepend( t );
00209   mCurrentTasksQueue = -1;
00210 
00211   signalTaskToTracker( t, "DeferedTask" );
00212 
00213   scheduleNext();
00214 }
00215 
00216 bool ResourceScheduler::isEmpty()
00217 {
00218   for ( int i = 0; i < NQueueCount; ++i ) {
00219     if ( !mTaskList[i].isEmpty() )
00220       return false;
00221   }
00222   return true;
00223 }
00224 
00225 void ResourceScheduler::scheduleNext()
00226 {
00227   if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
00228     return;
00229   QTimer::singleShot( 0, this, SLOT( executeNext() ) );
00230 }
00231 
00232 void ResourceScheduler::executeNext()
00233 {
00234   if ( mCurrentTask.type != Invalid || isEmpty() )
00235     return;
00236 
00237   for ( int i = 0; i < NQueueCount; ++i ) {
00238     if ( !mTaskList[ i ].isEmpty() ) {
00239       mCurrentTask = mTaskList[ i ].takeFirst();
00240       mCurrentTasksQueue = i;
00241       break;
00242     }
00243   }
00244 
00245   if ( s_resourcetracker ) {
00246     QList<QVariant> argumentList;
00247     argumentList << QString::number( mCurrentTask.serial );
00248     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList);
00249   }
00250 
00251   switch ( mCurrentTask.type ) {
00252     case SyncAll:
00253       emit executeFullSync();
00254       break;
00255     case SyncCollectionTree:
00256       emit executeCollectionTreeSync();
00257       break;
00258     case SyncCollection:
00259       emit executeCollectionSync( mCurrentTask.collection );
00260       break;
00261     case FetchItem:
00262       emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
00263       break;
00264     case DeleteResourceCollection:
00265       emit executeResourceCollectionDeletion();
00266       break;
00267     case ChangeReplay:
00268       emit executeChangeReplay();
00269       break;
00270     case SyncAllDone:
00271       emit fullSyncComplete();
00272       break;
00273     case Custom:
00274     {
00275       bool success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
00276       if ( !success )
00277         success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
00278 
00279       if ( !success )
00280         kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument;
00281       break;
00282     }
00283     default: {
00284       kError() << "Unhandled task type" << mCurrentTask.type;
00285       dump();
00286       Q_ASSERT( false );
00287     }
00288   }
00289 }
00290 
00291 ResourceScheduler::Task ResourceScheduler::currentTask() const
00292 {
00293   return mCurrentTask;
00294 }
00295 
00296 void ResourceScheduler::setOnline(bool state)
00297 {
00298   if ( mOnline == state )
00299     return;
00300   mOnline = state;
00301   if ( mOnline ) {
00302     scheduleNext();
00303   } else {
00304     if ( mCurrentTask.type != Invalid ) {
00305       // abort running task
00306       queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
00307       mCurrentTask = Task();
00308       mCurrentTasksQueue = -1;
00309     }
00310     // abort pending synchronous tasks, might take longer until the resource goes online again
00311     TaskList& itemFetchQueue = queueForTaskType( FetchItem );
00312     for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
00313       if ( (*it).type == FetchItem ) {
00314         (*it).sendDBusReplies( false );
00315         it = itemFetchQueue.erase( it );
00316         if ( s_resourcetracker ) {
00317           QList<QVariant> argumentList;
00318           argumentList << QString::number( mCurrentTask.serial )
00319                        << QLatin1String( "Job canceled." );
00320           s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList );
00321         }
00322       } else {
00323         ++it;
00324       }
00325     }
00326   }
00327 }
00328 
00329 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType )
00330 {
00331   // if there's a job tracer running, tell it about the new job
00332   if ( !s_resourcetracker && QDBusConnection::sessionBus().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
00333     s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
00334                                        QLatin1String( "/resourcesJobtracker" ),
00335                                        QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
00336                                        QDBusConnection::sessionBus(), 0 );
00337   }
00338 
00339   if ( s_resourcetracker ) {
00340     QList<QVariant> argumentList;
00341     argumentList << static_cast<AgentBase*>(  parent() )->identifier()
00342                  << QString::number( task.serial )
00343                  << QString()
00344                  << QString::fromLatin1( taskType );
00345     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList);
00346   }
00347 }
00348 
00349 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection )
00350 {
00351   if ( !collection.isValid() ) // should not happen, but you never know...
00352     return;
00353   TaskList& queue = queueForTaskType( SyncCollection );
00354   for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
00355     if ( (*it).type == SyncCollection && (*it).collection == collection ) {
00356       it = queue.erase( it );
00357       kDebug() << " erasing";
00358     } else
00359       ++it;
00360   }
00361 }
00362 
00363 void ResourceScheduler::Task::sendDBusReplies( bool success )
00364 {
00365   Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) {
00366     QDBusMessage reply( msg );
00367     reply << success;
00368     QDBusConnection::sessionBus().send( reply );
00369   }
00370 }
00371 
00372 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
00373 {
00374   switch( type ) {
00375   case ChangeReplay:
00376     return ChangeReplayQueue;
00377   case FetchItem:
00378     return ItemFetchQueue;
00379   default:
00380     return GenericTaskQueue;
00381   }
00382 }
00383 
00384 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
00385 {
00386   const QueueType qt = queueTypeForTaskType( type );
00387   return mTaskList[ qt ];
00388 }
00389 
00390 void ResourceScheduler::dump()
00391 {
00392   kDebug() << "ResourceScheduler: Online:" << mOnline;
00393   kDebug() << " current task:" << mCurrentTask;
00394   for ( int i = 0; i < NQueueCount; ++i ) {
00395     const TaskList& queue = mTaskList[i];
00396     kDebug() << " queue" << i << queue.size() << "tasks:";
00397     for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
00398       kDebug() << "  " << (*it);
00399     }
00400   }
00401 }
00402 
00403 void ResourceScheduler::clear()
00404 {
00405   kDebug() << "Clearing ResourceScheduler queues:";
00406   for ( int i = 0; i < NQueueCount; ++i ) {
00407     TaskList& queue = mTaskList[i];
00408     queue.clear();
00409   }
00410   mCurrentTask = Task();
00411   mCurrentTasksQueue = -1;
00412 }
00413 
00414 static const char s_taskTypes[][25] = {
00415       "Invalid",
00416       "SyncAll",
00417       "SyncCollectionTree",
00418       "SyncCollection",
00419       "FetchItem",
00420       "ChangeReplay",
00421       "DeleteResourceCollection",
00422       "SyncAllDone",
00423       "Custom"
00424 };
00425 
00426 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task )
00427 {
00428   d << task.serial << s_taskTypes[task.type];
00429   if ( task.type != ResourceScheduler::Invalid ) {
00430     if ( task.collection.id() != -1 )
00431       d << "collection" << task.collection.id();
00432     if ( task.item.id() != -1 )
00433       d << "item" << task.item.id();
00434     if ( !task.methodName.isEmpty() )
00435       d << task.methodName << task.argument;
00436   }
00437   return d;
00438 }
00439 
00440 //@endcond
00441 
00442 #include "resourcescheduler_p.moc"

akonadi

Skip menu "akonadi"
  • Main Page
  • Modules
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members
  • Related Pages

KDE-PIM Libraries

Skip menu "KDE-PIM Libraries"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kblog
  • kcal
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Generated for KDE-PIM Libraries by doxygen 1.7.1
This website is maintained by Adriaan de Groot and Allen Winter.
KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal