短信发送

源代码在线查看: consumer.java

软件大小: 9758 K
上传用户: KMPlayer33
关键词: 短信 发送
下载地址: 免注册下载 普通下载 VIP

相关代码

				/*
				 * Consumer.java
				 *
				 * Copyright (c) 2001 GlobaLoop LTD., Joseph Hartal, Ze'ev Bubis.
				 * All Rights Reserved.
				 *
				 * You may study, use, modify, and distribute this software for any
				 * purpose provided that this copyright notice appears in all copies.
				 *
				 * This software is provided WITHOUT WARRANTY either expressed or
				 * implied.
				 *
				 */
				
				package com.jdev.util;
				
				
				/**
				 * Abstract base class for the consumer side of a producer-consumer pair.
				 *
				 * Usage:
				 * Subclass Consumer and implement onConsume(Object o). That method is
				 * invoked on the consumer's own thread, once for every queued item.
				 * Producers hand items over through add(Object o). Nothing is consumed
				 * until init() has started the thread; terminate() or terminateWait()
				 * stops it.
				 *
				 * NOTE(review): the worker thread object is created once and never
				 * replaced, and a java.lang.Thread cannot be started twice, so init() is
				 * not expected to work again after the consumer has been terminated.
				 *
				 * @author Joseph Hartal & Ze'ev Bubis
				 */
				abstract public class Consumer
				{

					private static final ILogger logger = SimpleLogger.getLogger();

					// Both flags are written by one thread and read by another without a
					// common lock, hence volatile.
					private volatile boolean _isTerminated = true;
					private volatile boolean _isSleeping = false;
					private boolean isSleeping() {return _isSleeping;}

					// The default queue is a VectorQueue. Volatile because setQueue() may
					// replace it while other threads are reading the field.
					private volatile IQueue _queue = new VectorQueue();

					// True while a terminateWait() caller still has to wait for the thread.
					// Guarded by _terminationMonitor.
					private boolean _waitForTermination = true;
					private String _name;

					// Priority that init() applies to the thread; updated by setPriority().
					private int _priority = Thread.NORM_PRIORITY;

					// Number of "queue drained" notifications sent so far.
					// Guarded by _waitForFinishedMonitor.
					private long _finishedBatches = 0;

					// Monitors - for wait\synchronize producer\consumer\termination\other threads
					private final Object _waitForJobsMonitor = new Object();
					private final Object _waitForFinishedMonitor = new Object();
					private final Object _terminationMonitor = new Object();

					// The consumer's thread
					private Thread _thread = null;

					/**
					 * Replaces the buffer used as the consumer's queue. Items still held by
					 * the previous queue are not transferred.
					 *
					 * @param  buf  a queue that implements the IQueue interface; must not be null
					 * @return  a reference to this Consumer object.
					 * @throws IllegalArgumentException if buf is null
					 */
					public Consumer setQueue(IQueue buf)
					{
						if (buf == null)
						{
							// A null queue could never work: every later queue access would
							// fail with a NullPointerException. Fail here instead.
							throw new IllegalArgumentException("buf must not be null");
						}

						// Lock the queue that is being replaced, so the swap cannot happen
						// while the consumer thread is polling that queue.
						IQueue current = _queue;
						synchronized(current)
						{
							_queue = buf;
						}
						return this;
					}

					/**
					 * Returns the buffer used as this Consumer's queue
					 *
					 * @return  the buffer used as this Consumer's queue
					 */
					protected IQueue getQueue()
					{
						return(_queue);
					}

					/**
					 * Returns whether the consumer's queue is empty.
					 *
					 * @return  true if there is nothing left to consume,
					 * false if items are still queued
					 */
					public boolean hasNothingToDo()
					{
						return _queue.isEmpty();
					}

					/**
					 * Lazy creation of the Consumer's thread. The thread simply delegates
					 * to this consumer's private run() method.
					 *
					 * @return  the Consumer's thread
					 */
					private Thread getThread()
					{
						if (_thread==null)
						{
							_thread = new Thread()
							{
								public void run()
								{
									Consumer.this.run();
								}
							};
						}
						return _thread;
					}

					/**
					 *  Add an object to consume without waking the thread
					 *
					 *  @param o  the object to consume
					 */
					public void addNoNotify(Object o)
					{
						 _queue.add(o);
					}


					/**
					 *  Wake up the thread (without adding new stuff to consume).
					 *  Nothing is done when the consumer thread's interrupt flag is set.
					 */
					public void kickThread()
					{
						if (!this._thread.isInterrupted())
						{
							synchronized(_waitForJobsMonitor)
							{
								_waitForJobsMonitor.notifyAll();
							}
						}
					}

					/**
					* Add an object to the Consumer.
					* This is the entry point for the producer.
					* After the item is added, the consumer's thread will be notified.
					*
					* @param  o  the object to be 'consumed' by this consumer
					*/
					public void add(Object o)
					{
						_queue.add(o);
						kickThread();
					}

					/**
					 *  Notification of termination. Called on the consumer's thread as the
					 *  last step of its run loop.
					 */
					public void onThreadTerminate()
					{
						System.out.println("Consumer "+_name+" : Thread terminated." );
					}

					/**
					 * This method is called before notifying that the current batch is done
					 * and the queue is empty.
					 */
					synchronized public void goingToNotifyThatFinished()
					{
					}

					/**
					 * This method is called after notifying that the current batch is done
					 * and the queue is empty, just before the consumer's thread is
					 * put to sleep (wait).
					 */
					synchronized public void goingToRest()
					{
					}


					/**
					 * This method is called at the end of every pass of the consumer's main
					 * loop, i.e. after the thread has left the wait section and before it
					 * looks at the queue again.
					 */
					synchronized public void goingToWakeup()
					{
					}


					/**
					 *  This method will cause the calling thread to wait until the current
					 *  consumer batch is finished (i.e. the queue has been drained).
					 *  If the calling thread is interrupted while waiting, its interrupt flag
					 *  is restored and the method returns.
					 */
					public void waitTillCurrentBatchFinished()
					{
						// make sure the thread is in a steady state of sleeping or working
						synchronized(_waitForJobsMonitor)
						{
						}

						// going to wait on _waitForFinishedMonitor
						synchronized(_waitForFinishedMonitor)
						{
							// Remember how many batches were finished on entry. Any later
							// increment means a "finished" notification was really sent, which
							// tells a genuine notification apart from a spurious wakeup.
							final long batchesOnEntry = _finishedBatches;

							// No need to wait when the thread is already sleeping with
							// nothing to do; otherwise wait for one finished notification.
							while (!(isSleeping() && hasNothingToDo())
									&& batchesOnEntry == _finishedBatches)
							{
								try
								{
									// wait for finished notification
									_waitForFinishedMonitor.wait();
								}
								catch (java.lang.InterruptedException ex)
								{
									// Stop waiting, but keep the interrupt visible to the caller.
									Thread.currentThread().interrupt();
									return;
								}
							}
						}
					}

					/**
					 *  Terminate Consumer. Un-consumed objects will be lost.
					 *
					 */
					public void terminate()
					{
						_isTerminated = true;

						// don't work anymore
						_queue.clear();

						// wake thread if sleeping so it would terminate.
						synchronized(_waitForJobsMonitor)
						{
							// release the waiting thread
							_waitForJobsMonitor.notifyAll();
						}
					}

					/**
					 *  Terminate Consumer and wait until the consumer's thread
					 *  has left its run loop. Returns immediately when the consumer is
					 *  already terminated. If the calling thread is interrupted while
					 *  waiting, its interrupt flag is restored and the method returns.
					 */
					public void terminateWait()
					{
						if (_isTerminated)
						{
							return;
						}

						// terminate now
						terminate();

						// The consumer thread can never observe its own exit, so it must
						// not wait for it.
						if (Thread.currentThread() == _thread)
						{
							return;
						}

						// Wait without holding the queue's lock: the consumer thread needs
						// that lock to poll the queue, so holding it here would keep the
						// thread from ever reaching its termination notice.
						synchronized(_terminationMonitor)
						{
							// Loop on the condition so that a spurious wakeup does not end
							// the wait early.
							while (_waitForTermination)
							{
								try
								{
									_terminationMonitor.wait();
								}
								catch (java.lang.InterruptedException e)
								{
									// Stop waiting, but keep the interrupt visible to the caller.
									Thread.currentThread().interrupt();
									return;
								}
							}
						}
					}//terminateWait()


					/**
					 * Clears the consumer's queue.
					 */
					public void clear()
					{
						_queue.clear();
					}


					/**
					 *  Main consumer's thread method: drains the queue through onConsume(),
					 *  then sleeps until new work arrives or the consumer is terminated.
					 */
					private void run()
					{
						while (!_isTerminated)
						{
							// job handling loop
							while (true)
							{
								Object o;

								// Use one queue instance for the lock, the check and the
								// removal, even if setQueue() swaps the field meanwhile.
								IQueue queue = _queue;
								synchronized (queue)
								{
									if (queue.isEmpty())
										break;
									o = queue.remove();
								}
								if (o == null)
									break;

								// A failing item must not kill the consumer thread: log it
								// and carry on with the next one.
								try
								{
									onConsume(o);
								}
								catch(Exception ex)
								{
									logger.critical("Consumer '"+_name+"' : Exception while calling onConsume()!",ex);
								}
								catch(Error err)
								{
									logger.critical("Consumer '"+_name+"' : Error while calling onConsume()!!",err);
								}
								catch(Throwable th)
								{
									logger.critical("Consumer '"+_name+"' : Throwable while calling onConsume()!!!",th);
								}
							}

							// if we are not terminated and queue is still empty  - wait until
							// new jobs arrive.

							synchronized(_waitForJobsMonitor)
							{
								if (_isTerminated)
									break;
								if(_queue.isEmpty())
								{
									_isSleeping = true;
									try
									{
										// queue is now empty - notify that finished
										goingToNotifyThatFinished();

										// tell everybody waiting that all is done
										synchronized(_waitForFinishedMonitor)
										{
											_finishedBatches++;
											_waitForFinishedMonitor.notifyAll();
										}

										goingToRest();

										// sleep until kicked; the outer loop re-checks the
										// queue and the termination flag after any wakeup
										_waitForJobsMonitor.wait();
									}
									catch (InterruptedException ex)
									{
										// An interrupt is treated as a plain wake-up. The flag
										// is deliberately not restored on this thread, because
										// every following wait() would then throw at once.
									}
									finally
									{
										// Reset on every exit path, so an interrupted wait
										// cannot leave the consumer marked as sleeping.
										_isSleeping = false;
									}
								}
							}
							goingToWakeup();
						}// while (!_isTerminated)

						// out of while loop - about to terminate

						// Notify final termination
						synchronized(_terminationMonitor)
						{
							_waitForTermination = false;
							_terminationMonitor.notifyAll();
						}
						onThreadTerminate();
					}// run()


					/**
					 *  This is where the actual Consumer work is done. Called on the
					 *  consumer's thread for each item taken from the queue.
					 *
					 *  @param o  the item to consume
					 */
					abstract protected void onConsume(Object o);


					/**
					 * Returns the consumer's name.
					 *
					 * @return  the name given through setName(String)
					 */
					public String toString()
					{
						return _name;
					}

					/**
					 * Sets the consumer's name (and the thread's name).
					 * @param   name   the consumer's name.
					 * @return  this consumer (for chaining).
					 */
					public Consumer setName(String name)
					{
						_name = name;
						getThread().setName(_name + " Consumer");
						return this;
					}

					/**
					 * Changes the priority of the consumer's thread. The value is
					 * remembered, so it is still in effect after init().
					 *
					 * @param   prio   the new priority of the consumer's thread.
					 * @see     java.lang.Thread#MAX_PRIORITY
					 * @see     java.lang.Thread#MIN_PRIORITY
					 * @return  this consumer (for chaining).
					 */
					public Consumer setPriority(int prio)
					{
						// Apply first: Thread.setPriority rejects out-of-range values, and a
						// rejected value must not be remembered.
						getThread().setPriority(prio);
						_priority = prio;
						return this;
					}

					/**
					 * Start the Consumer. Logs a warning and does nothing when the consumer
					 * is already running.
					 * @return  this consumer (for chaining).
					 */
					public Consumer init()
					{
						if (!_isTerminated)
						{
							logger.warning("Consumner " + this.toString() + ": init() called on a alread running consumer.");
							return this;
						}

						synchronized(_terminationMonitor)
						{
							_waitForTermination = true;
						}
						_isTerminated = false;

						// NORM_PRIORITY unless setPriority() requested something else.
						getThread().setPriority(_priority);
						getThread().start();
						return this;
					}

					/**
					 * Allocates a new Consumer object and sets its name to
					 * its class name.
					 */
					public Consumer()
					{
						setName(this.getClass().toString());
					}



					/**
					 * A simple consumer test: feeds one Integer per second to a consumer
					 * that prints it, until the main thread is interrupted.
					 *
					 * @param args  not used
					 */
					public static void main(String[] args)
					{
						// Configure the consumer completely before its thread is started.
						Consumer consumer = new Consumer()
						{
							public void onConsume(Object o)
							{
								Integer i=(Integer)o;
								System.out.println(">>> "+i);
							}
						}.setName("tester").setQueue(new VectorQueue()).init();

						consumer.add(new Integer(0));

						int i=0;
						while(true)
						{
							Integer next = new Integer(i++);
							consumer.add(next);
							try
							{
								Thread.sleep(1000);
							}
							catch (InterruptedException e)
							{
								// Asked to stop: keep the flag set and leave the loop.
								Thread.currentThread().interrupt();
								break;
							}
						}

						// Let the consumer thread finish so the JVM can exit.
						consumer.terminate();
					}
				}
				
				
				
				
				
							

相关资源