View Javadoc

1   //========================================================================
2   //Copyright 2004-2008 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at 
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.io.nio;
16  
17  import java.io.IOException;
18  import java.nio.channels.CancelledKeyException;
19  import java.nio.channels.SelectableChannel;
20  import java.nio.channels.SelectionKey;
21  import java.nio.channels.Selector;
22  import java.nio.channels.ServerSocketChannel;
23  import java.nio.channels.SocketChannel;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  
28  import org.mortbay.component.AbstractLifeCycle;
29  import org.mortbay.io.Connection;
30  import org.mortbay.io.EndPoint;
31  import org.mortbay.log.Log;
32  import org.mortbay.thread.Timeout;
33  
34  
35  /* ------------------------------------------------------------ */
36  /**
37   * The Selector Manager manages and number of SelectSets to allow
38   * NIO scheduling to scale to large numbers of connections.
39   * 
40   * @author gregw
41   *
42   */
43  public abstract class SelectorManager extends AbstractLifeCycle
44  {
45      // TODO Tune these by approx system speed.
46      private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
47      private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
48      private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
49      private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
50      private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
51      
52      private boolean _delaySelectKeyUpdate=true;
53      private long _maxIdleTime;
54      private long _lowResourcesConnections;
55      private long _lowResourcesMaxIdleTime;
56      private transient SelectSet[] _selectSet;
57      private int _selectSets=1;
58      private volatile int _set;
59      
60      /* ------------------------------------------------------------ */
61      /**
62       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
63       * @see {@link #setLowResourcesMaxIdleTime(long)}
64       */
65      public void setMaxIdleTime(long maxIdleTime)
66      {
67          _maxIdleTime=maxIdleTime;
68      }
69      
70      /* ------------------------------------------------------------ */
71      /**
72       * @param selectSets
73       */
74      public void setSelectSets(int selectSets)
75      {
76          long lrc = _lowResourcesConnections * _selectSets; 
77          _selectSets=selectSets;
78          _lowResourcesConnections=lrc/_selectSets;
79      }
80      
81      /* ------------------------------------------------------------ */
82      /**
83       * @return
84       */
85      public long getMaxIdleTime()
86      {
87          return _maxIdleTime;
88      }
89      
90      /* ------------------------------------------------------------ */
91      /**
92       * @return
93       */
94      public int getSelectSets()
95      {
96          return _selectSets;
97      }
98      
99      /* ------------------------------------------------------------ */
100     /**
101      * @return
102      */
103     public boolean isDelaySelectKeyUpdate()
104     {
105         return _delaySelectKeyUpdate;
106     }
107 
108     /* ------------------------------------------------------------ */
109     /** Register a channel
110      * @param channel
111      * @param att Attached Object
112      * @throws IOException
113      */
114     public void register(SocketChannel channel, Object att) throws IOException
115     {
116         int s=_set++; 
117         s=s%_selectSets;
118         SelectSet[] sets=_selectSet;
119         if (sets!=null)
120         {
121             SelectSet set=sets[s];
122             set.addChange(channel,att);
123             set.wakeup();
124         }
125     }
126     
127     /* ------------------------------------------------------------ */
128     /** Register a serverchannel
129      * @param acceptChannel
130      * @return
131      * @throws IOException
132      */
133     public void register(ServerSocketChannel acceptChannel) throws IOException
134     {
135         int s=_set++; 
136         s=s%_selectSets;
137         SelectSet set=_selectSet[s];
138         set.addChange(acceptChannel);
139         set.wakeup();
140     }
141 
142     /* ------------------------------------------------------------ */
143     /**
144      * @return the lowResourcesConnections
145      */
146     public long getLowResourcesConnections()
147     {
148         return _lowResourcesConnections*_selectSets;
149     }
150 
151     /* ------------------------------------------------------------ */
152     /**
153      * Set the number of connections, which if exceeded places this manager in low resources state.
154      * This is not an exact measure as the connection count is averaged over the select sets.
155      * @param lowResourcesConnections the number of connections
156      * @see {@link #setLowResourcesMaxIdleTime(long)}
157      */
158     public void setLowResourcesConnections(long lowResourcesConnections)
159     {
160         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
161     }
162 
163     /* ------------------------------------------------------------ */
164     /**
165      * @return the lowResourcesMaxIdleTime
166      */
167     public long getLowResourcesMaxIdleTime()
168     {
169         return _lowResourcesMaxIdleTime;
170     }
171 
172     /* ------------------------------------------------------------ */
173     /**
174      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
175      * @see {@link #setMaxIdleTime(long)}
176      */
177     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
178     {
179         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
180     }
181     
182     /* ------------------------------------------------------------ */
183     /**
184      * @param acceptorID
185      * @throws IOException
186      */
187     public void doSelect(int acceptorID) throws IOException
188     {
189         SelectSet[] sets= _selectSet;
190         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
191             sets[acceptorID].doSelect();
192     }
193 
194 
195     /* ------------------------------------------------------------ */
196     /**
197      * @param delaySelectKeyUpdate
198      */
199     public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
200     {
201         _delaySelectKeyUpdate=delaySelectKeyUpdate;
202     }
203 
204     /* ------------------------------------------------------------ */
205     /**
206      * @param key
207      * @return
208      * @throws IOException 
209      */
210     protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
211 
212     /* ------------------------------------------------------------------------------- */
213     public abstract boolean dispatch(Runnable task) throws IOException;
214 
215     /* ------------------------------------------------------------ */
216     /* (non-Javadoc)
217      * @see org.mortbay.component.AbstractLifeCycle#doStart()
218      */
219     protected void doStart() throws Exception
220     {
221         _selectSet = new SelectSet[_selectSets];
222         for (int i=0;i<_selectSet.length;i++)
223             _selectSet[i]= new SelectSet(i);
224 
225         super.doStart();
226     }
227 
228 
229     /* ------------------------------------------------------------------------------- */
230     protected void doStop() throws Exception
231     {
232         SelectSet[] sets= _selectSet;
233         _selectSet=null;
234         if (sets!=null)
235             for (int i=0;i<sets.length;i++)
236             {
237                 SelectSet set = sets[i];
238                 if (set!=null)
239                     set.stop();
240             }
241         super.doStop();
242     }
243 
244     /* ------------------------------------------------------------ */
245     /**
246      * @param endpoint
247      */
248     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
249 
250     /* ------------------------------------------------------------ */
251     /**
252      * @param endpoint
253      */
254     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
255 
256     /* ------------------------------------------------------------------------------- */
257     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
258 
259     /* ------------------------------------------------------------ */
260     /**
261      * @param channel
262      * @param selectSet
263      * @param sKey
264      * @return
265      * @throws IOException
266      */
267     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
268 
269     /* ------------------------------------------------------------------------------- */
270     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
271     {
272         Log.warn(ex);
273     }
274     
275     /* ------------------------------------------------------------------------------- */
276     /* ------------------------------------------------------------------------------- */
277     /* ------------------------------------------------------------------------------- */
278     public class SelectSet 
279     {
280         private transient int _change;
281         private transient List[] _changes;
282         private transient Timeout _idleTimeout;
283         private transient int _nextSet;
284         private transient Timeout _retryTimeout;
285         private transient Selector _selector;
286         private transient int _setID;
287         private volatile boolean _selecting;
288         private transient int _jvmBug;
289         private int _selects;
290         private long _monitorStart;
291         private long _monitorNext;
292         private boolean _pausing;
293         private SelectionKey _busyKey;
294         private int _busyKeyCount;
295         private long _log;
296         private int _paused;
297         private int _jvmFix0;
298         private int _jvmFix1;
299         private int _jvmFix2;
300         
301         /* ------------------------------------------------------------ */
302         SelectSet(int acceptorID) throws Exception
303         {
304             _setID=acceptorID;
305 
306             _idleTimeout = new Timeout(this);
307             _idleTimeout.setDuration(getMaxIdleTime());
308             _retryTimeout = new Timeout(this);
309             _retryTimeout.setDuration(0L);
310 
311             // create a selector;
312             _selector = Selector.open();
313             _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
314             _change=0;
315             _monitorStart=System.currentTimeMillis();
316             _monitorNext=_monitorStart+__MONITOR_PERIOD;
317             _log=_monitorStart+60000;
318         }
319         
320         /* ------------------------------------------------------------ */
321         public void addChange(Object point)
322         {
323             synchronized (_changes)
324             {
325                 _changes[_change].add(point);
326             }
327         }
328         
329         /* ------------------------------------------------------------ */
330         public void addChange(SelectableChannel channel, Object att)
331         {   
332             if (att==null)
333                 addChange(channel);
334             else if (att instanceof EndPoint)
335                 addChange(att);
336             else
337                 addChange(new ChangeSelectableChannel(channel,att));
338         }
339         
340         /* ------------------------------------------------------------ */
341         public void cancelIdle(Timeout.Task task)
342         {
343             synchronized (this)
344             {
345                 task.cancel();
346             }
347         }
348 
349         /* ------------------------------------------------------------ */
350         /**
351          * Select and dispatch tasks found from changes and the selector.
352          * 
353          * @throws IOException
354          */
355         public void doSelect() throws IOException
356         {
357             SelectionKey key=null;
358             
359             try
360             {
361                 List changes;
362                 final Selector selector;
363                 synchronized (_changes)
364                 {
365                     changes=_changes[_change];
366                     _change=_change==0?1:0;
367                     _selecting=true;
368                     selector=_selector;
369                 }
370 
371                 // Make any key changes required
372                 for (int i = 0; i < changes.size(); i++)
373                 {
374                     try
375                     {
376                         Object o = changes.get(i);
377                         
378                         if (o instanceof EndPoint)
379                         {
380                             // Update the operations for a key.
381                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
382                             endpoint.doUpdateKey();
383                         }
384                         else if (o instanceof Runnable)
385                         {
386                             dispatch((Runnable)o);
387                         }
388                         else if (o instanceof ChangeSelectableChannel)
389                         {
390                             // finish accepting/connecting this connection
391                             final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
392                             final SelectableChannel channel=asc._channel;
393                             final Object att = asc._attachment;
394 
395                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
396                             {
397                                 key = channel.register(selector,SelectionKey.OP_READ,att);
398                                 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
399                                 key.attach(endpoint);
400                                 endpoint.dispatch();
401                             }
402                             else if (channel.isOpen())
403                             {
404                                 channel.register(selector,SelectionKey.OP_CONNECT,att);
405                             }
406                         }
407                         else if (o instanceof SocketChannel)
408                         {
409                             final SocketChannel channel=(SocketChannel)o;
410 
411                             if (channel.isConnected())
412                             {
413                                 key = channel.register(selector,SelectionKey.OP_READ,null);
414                                 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
415                                 key.attach(endpoint);
416                                 endpoint.dispatch();
417                             }
418                             else if (channel.isOpen())
419                             {
420                                 channel.register(selector,SelectionKey.OP_CONNECT,null);
421                             }
422                         }
423                         else if (o instanceof ServerSocketChannel)
424                         {
425                             ServerSocketChannel channel = (ServerSocketChannel)o;
426                             channel.register(getSelector(),SelectionKey.OP_ACCEPT);
427                         }
428                         else if (o instanceof ChangeTask)
429                         {
430                             ((ChangeTask)o).run();
431                         }
432                         else
433                             throw new IllegalArgumentException(o.toString());
434                     }
435                     catch (Exception e)
436                     {
437                         if (isRunning())
438                             Log.warn(e);
439                         else
440                             Log.debug(e);
441                     }
442                 }
443                 changes.clear();
444 
445                 long idle_next = 0;
446                 long retry_next = 0;
447                 long now=System.currentTimeMillis();
448                 synchronized (this)
449                 {
450                     _idleTimeout.setNow(now);
451                     _retryTimeout.setNow(now);
452                     if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
453                         _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
454                     else 
455                         _idleTimeout.setDuration(_maxIdleTime);
456                     idle_next=_idleTimeout.getTimeToNext();
457                     retry_next=_retryTimeout.getTimeToNext();
458                 }
459 
460                 // workout how low to wait in select
461                 long wait = 1000L;  // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
462                 if (idle_next >= 0 && wait > idle_next)
463                     wait = idle_next;
464                 if (wait > 0 && retry_next >= 0 && wait > retry_next)
465                     wait = retry_next;
466     
467                 // Do the select.
468                 if (wait > 2) // TODO tune or configure this
469                 {
470                     // If we are in pausing mode
471                     if (_pausing)
472                     {
473                         try
474                         {
475                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
476                         }
477                         catch(InterruptedException e)
478                         {
479                             Log.ignore(e);
480                         }
481                     }
482                         
483                     long before=now;
484                     int selected=selector.select(wait);
485                     now = System.currentTimeMillis();
486                     _idleTimeout.setNow(now);
487                     _retryTimeout.setNow(now);
488                     _selects++;
489 
490                     // Look for JVM bugs over a monitor period.
491                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
492                     // http://bugs.sun.com/view_bug.do?bug_id=6693490
493                     if (now>_monitorNext)
494                     {
495                         _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
496                         _pausing=_selects>__MAX_SELECTS;
497                         if (_pausing)
498                             _paused++;
499                             
500                         _selects=0;
501                         _jvmBug=0;
502                         _monitorStart=now;
503                         _monitorNext=now+__MONITOR_PERIOD;
504                     }
505                     
506                     if (now>_log)
507                     {
508                         if (_paused>0)  
509                             Log.info(this+" Busy selector - injecting delay "+_paused+" times");
510 
511                         if (_jvmFix2>0)
512                             Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
513 
514                         if (_jvmFix1>0)
515                             Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, canceled keys "+_jvmFix0+" times");
516 
517                         else if(Log.isDebugEnabled() && _jvmFix0>0)
518                             Log.info(this+" JVM BUG(s) - canceled keys "+_jvmFix0+" times");
519                         _paused=0;
520                         _jvmFix2=0;
521                         _jvmFix1=0;
522                         _jvmFix0=0;
523                         _log=now+60000;
524                     }
525                     
526                     // If we see signature of possible JVM bug, increment count.
527                     if (selected==0 && wait>10 && (now-before)<(wait/2))
528                     {
529                         // Increment bug count and try a work around
530                         _jvmBug++;
531                         if (_jvmBug>(__JVMBUG_THRESHHOLD))
532                         {
533                             try
534                             {
535                                 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
536                                     _jvmFix2++;
537                                     
538                                 Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
539                             }
540                             catch(InterruptedException e)
541                             {
542                                 Log.ignore(e);
543                             }
544                         }
545                         else if (_jvmBug==__JVMBUG_THRESHHOLD)
546                         {
547                             synchronized (this)
548                             {
549                                 // BLOODY SUN BUG !!!  Try refreshing the entire selector.
550                                 _jvmFix1++;
551                                 
552                                 final Selector new_selector = Selector.open();
553                                 Iterator iterator = _selector.keys().iterator();
554                                 while (iterator.hasNext())
555                                 {
556                                     SelectionKey k = (SelectionKey)iterator.next();
557                                     if (!k.isValid() || k.interestOps()==0)
558                                         continue;
559                                     
560                                     final SelectableChannel channel = k.channel();
561                                     final Object attachment = k.attachment();
562                                     
563                                     if (attachment==null)
564                                         addChange(channel);
565                                     else
566                                         addChange(channel,attachment);
567                                 }
568                                 _selector.close();
569                                 _selector=new_selector;
570                                 return;
571                             }
572                         }
573                         else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
574                         {
575                             // Cancel keys with 0 interested ops
576                             int cancelled=0;
577                             Iterator iter = selector.keys().iterator();
578                             while(iter.hasNext())
579                             {
580                                 SelectionKey k = (SelectionKey) iter.next();
581                                 if (k.isValid()&&k.interestOps()==0)
582                                 {
583                                     k.cancel();
584                                     cancelled++;
585                                 }
586                             }
587                             if (cancelled>0)
588                                 _jvmFix0++;
589                             
590                             return;
591                         }
592                     }
593                     else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
594                     {
595                         // Look for busy key
596                         SelectionKey busy = (SelectionKey)selector.selectedKeys().iterator().next();
597                         if (busy==_busyKey)
598                         {
599                             if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
600                             {
601                                 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
602                                 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
603                                 busy.cancel();
604                                 if (endpoint!=null)
605                                 {
606                                     dispatch(new Runnable()
607                                     {
608                                         public void run()
609                                         {
610                                             try
611                                             {
612                                                 endpoint.close();
613                                             }
614                                             catch (IOException e)
615                                             {
616                                                 Log.ignore(e);
617                                             }
618                                         }
619                                     });
620                                 }
621                             }
622                         }
623                         else
624                             _busyKeyCount=0;
625                         _busyKey=busy;
626                     }
627                 }
628                 else 
629                 {
630                     selector.selectNow();
631                     _selects++;
632                 }
633 
634                 // have we been destroyed while sleeping
635                 if (_selector==null || !selector.isOpen())
636                     return;
637 
638                 // Look for things to do
639                 Iterator iter = selector.selectedKeys().iterator();
640                 while (iter.hasNext())
641                 {
642                     key = (SelectionKey) iter.next();
643                                         
644                     try
645                     {
646                         if (!key.isValid())
647                         {
648                             key.cancel();
649                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
650                             if (endpoint != null)
651                                 endpoint.doUpdateKey();
652                             continue;
653                         }
654                         
655                         Object att = key.attachment();
656                         
657                         if (att instanceof SelectChannelEndPoint)
658                         {
659                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
660                             endpoint.dispatch();
661                         }
662                         else if (key.isAcceptable())
663                         {
664                             SocketChannel channel = acceptChannel(key);
665                             if (channel==null)
666                                 continue;
667 
668                             channel.configureBlocking(false);
669 
670                             // TODO make it reluctant to leave 0
671                             _nextSet=++_nextSet%_selectSet.length;
672 
673                             // Is this for this selectset
674                             if (_nextSet==_setID)
675                             {
676                                 // bind connections to this select set.
677                                 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
678                                 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
679                                 cKey.attach(endpoint);
680                                 if (endpoint != null)
681                                     endpoint.dispatch();
682                             }
683                             else
684                             {
685                                 // nope - give it to another.
686                                 _selectSet[_nextSet].addChange(channel);
687                                 _selectSet[_nextSet].wakeup();
688                             }
689                         }
690                         else if (key.isConnectable())
691                         {
692                             // Complete a connection of a registered channel
693                             SocketChannel channel = (SocketChannel)key.channel();
694                             boolean connected=false;
695                             try
696                             {
697                                 connected=channel.finishConnect();
698                             }
699                             catch(Exception e)
700                             {
701                                 connectionFailed(channel,e,att);
702                             }
703                             finally
704                             {
705                                 if (connected)
706                                 {
707                                     key.interestOps(SelectionKey.OP_READ);
708                                     SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
709                                     key.attach(endpoint);
710                                     endpoint.dispatch();
711                                 }
712                                 else
713                                 {
714                                     key.cancel();
715                                 }
716                             }
717                         }
718                         else
719                         {
720                             // Wrap readable registered channel in an endpoint
721                             SocketChannel channel = (SocketChannel)key.channel();
722                             SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
723                             key.attach(endpoint);
724                             if (key.isReadable())
725                                 endpoint.dispatch();                           
726                         }
727                         key = null;
728                     }
729                     catch (CancelledKeyException e)
730                     {
731                         Log.ignore(e);
732                     }
733                     catch (Exception e)
734                     {
735                         if (isRunning())
736                             Log.warn(e);
737                         else
738                             Log.ignore(e);
739 
740                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
741                         {
742                             key.interestOps(0);
743 
744                             key.cancel();
745                         } 
746                     }
747                 }
748                 
749                 // Everything always handled
750                 selector.selectedKeys().clear();
751 
752                 // tick over the timers
753                 _idleTimeout.tick(now);
754                 _retryTimeout.tick(now);
755                 
756             }
757             catch (CancelledKeyException e)
758             {
759                 Log.ignore(e);
760             }
761             finally
762             {
763                 _selecting=false;
764             }
765         }
766 
767         /* ------------------------------------------------------------ */
768         public SelectorManager getManager()
769         {
770             return SelectorManager.this;
771         }
772 
773         /* ------------------------------------------------------------ */
774         public long getNow()
775         {
776             return _idleTimeout.getNow();
777         }
778         
779         /* ------------------------------------------------------------ */
780         public void scheduleIdle(Timeout.Task task)
781         {
782             synchronized (this)
783             {
784                 if (_idleTimeout.getDuration() <= 0)
785                     return;
786                 
787                 task.schedule(_idleTimeout);
788             }
789         }
790 
791         /* ------------------------------------------------------------ */
792         public void scheduleTimeout(Timeout.Task task, long timeout)
793         {
794             synchronized (this)
795             {
796                 _retryTimeout.schedule(task, timeout);
797             }
798         }
799 
800         /* ------------------------------------------------------------ */
801         public void wakeup()
802         {
803             Selector selector = _selector;
804             if (selector!=null)
805                 selector.wakeup();
806         }
807 
808         /* ------------------------------------------------------------ */
809         Selector getSelector()
810         {
811             return _selector;
812         }
813         
814         /* ------------------------------------------------------------ */
815         void stop() throws Exception
816         {
817             boolean selecting=true;
818             while(selecting)
819             {
820                 wakeup();
821                 selecting=_selecting;
822             }
823             
824             ArrayList keys=new ArrayList(_selector.keys());
825             Iterator iter =keys.iterator();
826 
827             while (iter.hasNext())
828             {
829                 SelectionKey key = (SelectionKey)iter.next();
830                 if (key==null)
831                     continue;
832                 Object att=key.attachment();
833                 if (att instanceof EndPoint)
834                 {
835                     EndPoint endpoint = (EndPoint)att;
836                     try
837                     {
838                         endpoint.close();
839                     }
840                     catch(IOException e)
841                     {
842                         Log.ignore(e);
843                     }
844                 }
845             }
846             
847             synchronized (this)
848             {
849                 selecting=_selecting;
850                 while(selecting)
851                 {
852                     wakeup();
853                     selecting=_selecting;
854                 }
855                 
856                 _idleTimeout.cancelAll();
857                 _retryTimeout.cancelAll();
858                 try
859                 {
860                     if (_selector != null)
861                         _selector.close();
862                 }
863                 catch (IOException e)
864                 {
865                     Log.ignore(e);
866                 } 
867                 _selector=null;
868             }
869         }
870     }
871 
872     /* ------------------------------------------------------------ */
873     private static class ChangeSelectableChannel
874     {
875         final SelectableChannel _channel;
876         final Object _attachment;
877         
878         public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
879         {
880             super();
881             _channel = channel;
882             _attachment = attachment;
883         }
884     }
885 
886     /* ------------------------------------------------------------ */
887     private interface ChangeTask
888     {
889         public void run();
890     }
891 }