00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "resourcescheduler_p.h"
00021
00022 #include <kdebug.h>
00023 #include <klocale.h>
00024
00025 #include <QtCore/QTimer>
00026 #include <QtDBus/QDBusInterface>
00027 #include <QtDBus/QDBusConnectionInterface>
00028 #include <boost/graph/graph_concepts.hpp>
00029
00030 using namespace Akonadi;
00031
00032 qint64 ResourceScheduler::Task::latestSerial = 0;
00033 static QDBusAbstractInterface *s_resourcetracker = 0;
00034
00035
00036
00037 ResourceScheduler::ResourceScheduler( QObject *parent ) :
00038 QObject( parent ),
00039 mCurrentTasksQueue( -1 ),
00040 mOnline( false )
00041 {
00042 }
00043
00044 void ResourceScheduler::scheduleFullSync()
00045 {
00046 Task t;
00047 t.type = SyncAll;
00048 TaskList& queue = queueForTaskType( t.type );
00049 if ( queue.contains( t ) || mCurrentTask == t )
00050 return;
00051 queue << t;
00052 signalTaskToTracker( t, "SyncAll" );
00053 scheduleNext();
00054 }
00055
00056 void ResourceScheduler::scheduleCollectionTreeSync()
00057 {
00058 Task t;
00059 t.type = SyncCollectionTree;
00060 TaskList& queue = queueForTaskType( t.type );
00061 if ( queue.contains( t ) || mCurrentTask == t )
00062 return;
00063 queue << t;
00064 signalTaskToTracker( t, "SyncCollectionTree" );
00065 scheduleNext();
00066 }
00067
00068 void ResourceScheduler::scheduleSync(const Collection & col)
00069 {
00070 Task t;
00071 t.type = SyncCollection;
00072 t.collection = col;
00073 TaskList& queue = queueForTaskType( t.type );
00074 if ( queue.contains( t ) || mCurrentTask == t )
00075 return;
00076 queue << t;
00077 signalTaskToTracker( t, "SyncCollection" );
00078 scheduleNext();
00079 }
00080
00081 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg)
00082 {
00083 Task t;
00084 t.type = FetchItem;
00085 t.item = item;
00086 t.itemParts = parts;
00087
00088
00089
00090 if ( mCurrentTask == t ) {
00091 mCurrentTask.dbusMsgs << msg;
00092 return;
00093 }
00094
00095
00096 TaskList& queue = queueForTaskType( t.type );
00097 const int idx = queue.indexOf( t );
00098 if ( idx != -1 ) {
00099 queue[ idx ].dbusMsgs << msg;
00100 return;
00101 }
00102
00103 t.dbusMsgs << msg;
00104 queue << t;
00105 signalTaskToTracker( t, "FetchItem" );
00106 scheduleNext();
00107 }
00108
00109 void ResourceScheduler::scheduleResourceCollectionDeletion()
00110 {
00111 Task t;
00112 t.type = DeleteResourceCollection;
00113 TaskList& queue = queueForTaskType( t.type );
00114 if ( queue.contains( t ) || mCurrentTask == t )
00115 return;
00116 queue << t;
00117 signalTaskToTracker( t, "DeleteResourceCollection" );
00118 scheduleNext();
00119 }
00120
00121 void ResourceScheduler::scheduleChangeReplay()
00122 {
00123 Task t;
00124 t.type = ChangeReplay;
00125 TaskList& queue = queueForTaskType( t.type );
00126
00127 if ( queue.contains( t ) )
00128 return;
00129 queue << t;
00130 signalTaskToTracker( t, "ChangeReplay" );
00131 scheduleNext();
00132 }
00133
00134 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
00135 {
00136 Task t;
00137 t.type = SyncAllDone;
00138 TaskList& queue = queueForTaskType( t.type );
00139
00140 queue << t;
00141 signalTaskToTracker( t, "SyncAllDone" );
00142 scheduleNext();
00143 }
00144
00145 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority )
00146 {
00147 Task t;
00148 t.type = Custom;
00149 t.receiver = receiver;
00150 t.methodName = methodName;
00151 t.argument = argument;
00152 QueueType queueType = GenericTaskQueue;
00153 if ( priority == ResourceBase::AfterChangeReplay )
00154 queueType = AfterChangeReplayQueue;
00155 else if ( priority == ResourceBase::Prepend )
00156 queueType = PrependTaskQueue;
00157 TaskList& queue = mTaskList[ queueType ];
00158
00159 if ( queue.contains( t ) )
00160 return;
00161
00162 switch (priority) {
00163 case ResourceBase::Prepend:
00164 queue.prepend( t );
00165 break;
00166 default:
00167 queue.append(t);
00168 break;
00169 }
00170
00171 signalTaskToTracker( t, "Custom-" + t.methodName );
00172 scheduleNext();
00173 }
00174
00175 void ResourceScheduler::taskDone()
00176 {
00177 if ( isEmpty() )
00178 emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) );
00179
00180 if ( s_resourcetracker ) {
00181 QList<QVariant> argumentList;
00182 argumentList << QString::number( mCurrentTask.serial )
00183 << QString();
00184 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00185 }
00186
00187 mCurrentTask = Task();
00188 mCurrentTasksQueue = -1;
00189 scheduleNext();
00190 }
00191
00192 void ResourceScheduler::deferTask()
00193 {
00194 if ( mCurrentTask.type == Invalid )
00195 return;
00196
00197 if ( s_resourcetracker ) {
00198 QList<QVariant> argumentList;
00199 argumentList << QString::number( mCurrentTask.serial )
00200 << QString();
00201 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00202 }
00203
00204 Task t = mCurrentTask;
00205 mCurrentTask = Task();
00206
00207 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
00208 mTaskList[mCurrentTasksQueue].prepend( t );
00209 mCurrentTasksQueue = -1;
00210
00211 signalTaskToTracker( t, "DeferedTask" );
00212
00213 scheduleNext();
00214 }
00215
00216 bool ResourceScheduler::isEmpty()
00217 {
00218 for ( int i = 0; i < NQueueCount; ++i ) {
00219 if ( !mTaskList[i].isEmpty() )
00220 return false;
00221 }
00222 return true;
00223 }
00224
00225 void ResourceScheduler::scheduleNext()
00226 {
00227 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
00228 return;
00229 QTimer::singleShot( 0, this, SLOT( executeNext() ) );
00230 }
00231
00232 void ResourceScheduler::executeNext()
00233 {
00234 if ( mCurrentTask.type != Invalid || isEmpty() )
00235 return;
00236
00237 for ( int i = 0; i < NQueueCount; ++i ) {
00238 if ( !mTaskList[ i ].isEmpty() ) {
00239 mCurrentTask = mTaskList[ i ].takeFirst();
00240 mCurrentTasksQueue = i;
00241 break;
00242 }
00243 }
00244
00245 if ( s_resourcetracker ) {
00246 QList<QVariant> argumentList;
00247 argumentList << QString::number( mCurrentTask.serial );
00248 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList);
00249 }
00250
00251 switch ( mCurrentTask.type ) {
00252 case SyncAll:
00253 emit executeFullSync();
00254 break;
00255 case SyncCollectionTree:
00256 emit executeCollectionTreeSync();
00257 break;
00258 case SyncCollection:
00259 emit executeCollectionSync( mCurrentTask.collection );
00260 break;
00261 case FetchItem:
00262 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
00263 break;
00264 case DeleteResourceCollection:
00265 emit executeResourceCollectionDeletion();
00266 break;
00267 case ChangeReplay:
00268 emit executeChangeReplay();
00269 break;
00270 case SyncAllDone:
00271 emit fullSyncComplete();
00272 break;
00273 case Custom:
00274 {
00275 bool success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
00276 if ( !success )
00277 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
00278
00279 if ( !success )
00280 kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument;
00281 break;
00282 }
00283 default: {
00284 kError() << "Unhandled task type" << mCurrentTask.type;
00285 dump();
00286 Q_ASSERT( false );
00287 }
00288 }
00289 }
00290
00291 ResourceScheduler::Task ResourceScheduler::currentTask() const
00292 {
00293 return mCurrentTask;
00294 }
00295
00296 void ResourceScheduler::setOnline(bool state)
00297 {
00298 if ( mOnline == state )
00299 return;
00300 mOnline = state;
00301 if ( mOnline ) {
00302 scheduleNext();
00303 } else {
00304 if ( mCurrentTask.type != Invalid ) {
00305
00306 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
00307 mCurrentTask = Task();
00308 mCurrentTasksQueue = -1;
00309 }
00310
00311 TaskList& itemFetchQueue = queueForTaskType( FetchItem );
00312 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
00313 if ( (*it).type == FetchItem ) {
00314 (*it).sendDBusReplies( false );
00315 it = itemFetchQueue.erase( it );
00316 if ( s_resourcetracker ) {
00317 QList<QVariant> argumentList;
00318 argumentList << QString::number( mCurrentTask.serial )
00319 << QLatin1String( "Job canceled." );
00320 s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList );
00321 }
00322 } else {
00323 ++it;
00324 }
00325 }
00326 }
00327 }
00328
00329 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType )
00330 {
00331
00332 if ( !s_resourcetracker && QDBusConnection::sessionBus().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
00333 s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
00334 QLatin1String( "/resourcesJobtracker" ),
00335 QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
00336 QDBusConnection::sessionBus(), 0 );
00337 }
00338
00339 if ( s_resourcetracker ) {
00340 QList<QVariant> argumentList;
00341 argumentList << static_cast<AgentBase*>( parent() )->identifier()
00342 << QString::number( task.serial )
00343 << QString()
00344 << QString::fromLatin1( taskType );
00345 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList);
00346 }
00347 }
00348
00349 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection )
00350 {
00351 if ( !collection.isValid() )
00352 return;
00353 TaskList& queue = queueForTaskType( SyncCollection );
00354 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
00355 if ( (*it).type == SyncCollection && (*it).collection == collection ) {
00356 it = queue.erase( it );
00357 kDebug() << " erasing";
00358 } else
00359 ++it;
00360 }
00361 }
00362
00363 void ResourceScheduler::Task::sendDBusReplies( bool success )
00364 {
00365 Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) {
00366 QDBusMessage reply( msg );
00367 reply << success;
00368 QDBusConnection::sessionBus().send( reply );
00369 }
00370 }
00371
00372 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
00373 {
00374 switch( type ) {
00375 case ChangeReplay:
00376 return ChangeReplayQueue;
00377 case FetchItem:
00378 return ItemFetchQueue;
00379 default:
00380 return GenericTaskQueue;
00381 }
00382 }
00383
00384 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
00385 {
00386 const QueueType qt = queueTypeForTaskType( type );
00387 return mTaskList[ qt ];
00388 }
00389
00390 void ResourceScheduler::dump()
00391 {
00392 kDebug() << "ResourceScheduler: Online:" << mOnline;
00393 kDebug() << " current task:" << mCurrentTask;
00394 for ( int i = 0; i < NQueueCount; ++i ) {
00395 const TaskList& queue = mTaskList[i];
00396 kDebug() << " queue" << i << queue.size() << "tasks:";
00397 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
00398 kDebug() << " " << (*it);
00399 }
00400 }
00401 }
00402
00403 void ResourceScheduler::clear()
00404 {
00405 kDebug() << "Clearing ResourceScheduler queues:";
00406 for ( int i = 0; i < NQueueCount; ++i ) {
00407 TaskList& queue = mTaskList[i];
00408 queue.clear();
00409 }
00410 mCurrentTask = Task();
00411 mCurrentTasksQueue = -1;
00412 }
00413
00414 static const char s_taskTypes[][25] = {
00415 "Invalid",
00416 "SyncAll",
00417 "SyncCollectionTree",
00418 "SyncCollection",
00419 "FetchItem",
00420 "ChangeReplay",
00421 "DeleteResourceCollection",
00422 "SyncAllDone",
00423 "Custom"
00424 };
00425
00426 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task )
00427 {
00428 d << task.serial << s_taskTypes[task.type];
00429 if ( task.type != ResourceScheduler::Invalid ) {
00430 if ( task.collection.id() != -1 )
00431 d << "collection" << task.collection.id();
00432 if ( task.item.id() != -1 )
00433 d << "item" << task.item.id();
00434 if ( !task.methodName.isEmpty() )
00435 d << task.methodName << task.argument;
00436 }
00437 return d;
00438 }
00439
00440
00441
00442 #include "resourcescheduler_p.moc"