// task.cpp,v 1.5 1999/09/22 03:13:45 jcej Exp
#include "task.h"
#include "block.h"
/* Set our housekeeping pointer to NULL and tell the user we exist. */
Task::Task (size_t n_threads)
: barrier_ (n_threads),
n_threads_ (n_threads)
{
ACE_DEBUG ((LM_DEBUG,
"(%P|%t) Task ctor 0x%x\n",
(void *) this));
}
/* Take care of cleanup & tell the user we're going away. */
Task::~Task (void)
{
ACE_DEBUG ((LM_DEBUG,
"(%P|%t) Task dtor 0x%x\n",
(void *) this));
/* Get our shutdown notification out of the queue and release it. */
ACE_Message_Block *message;
/* Like the getq() in svc() below, this will block until a message
arrives. By blocking, we know that the destruction will be paused
until the last thread is done with the message block. */
this->getq (message);
message->release ();
}
/* Open the object to do work. Next, we activate the Task into the
number of requested threads. */
int
Task::open (void *unused)
{
ACE_UNUSED_ARG (unused);
return this->activate (THR_NEW_LWP,
n_threads_);
}
/* Tell the user we're closing and invoke the baseclass' close() to
take care of things. */
int
Task::close (u_long flags)
{
ACE_DEBUG ((LM_DEBUG,
"(%P|%t) Task close 0x%x\n",
(void *) this));
return inherited::close (flags);
}
/* Our svc() method waits for work on the queue and then processes
that work. */
int
Task::svc (void)
{
/* This will cause all of the threads to wait on this line until all
have invoked this method. The net result is that no thread in the
Task will get a shot at the queue until all of the threads are
active. There's no real need to do this but it's an easy intro
into the use of ACE_Barrier. */
this->barrier_.wait ();
ACE_DEBUG ((LM_DEBUG,
"(%P|%t) Task 0x%x starts in thread %d\n",
(void *) this,
ACE_Thread::self ()));
/* Remember that get() needs a reference to a pointer. To save
stack thrashing we'll go ahead and create a pointer outside of the
almost- infinite loop. */
ACE_Message_Block *message;
for (;;)
{
/* Get a message from the queue. Note that getq() will block
until a message shows up. That makes us very
processor-friendly. */
if (this->getq (message) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"getq"),
-1);
/* If we got the shutdown request, we need to go away. */
if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
{
/* Forward the request to any peer threads. */
this->putq (message);
/* Leave the infinite loop so that the thread exits. */
break;
}
/* The message queue stores char* data. We use rd_ptr() to get
to the beginning of the data. */
const char *cp = message->rd_ptr ();
/* Move the rd_ptr() past the data we read. This isn't real
useful here since we won't be reading any more from the block
but it's a good habit to get into. */
message->rd_ptr (ACE_OS::strlen (cp));
/* Display the block's address and data to the user. */
ACE_DEBUG ((LM_DEBUG,
"(%P|%t) Block 0x%x contains (%s)\n",
(void *) message,
cp));
/* Pretend that it takes a while to process the data. */
ACE_OS::sleep (ACE_Time_Value (0, 5000));
/* Release the message block. Notice that we never delete a
message block. Blocks are reference counted & the release()
method will take care of the delete when there are no more
references to the data. */
message->release ();
}
return 0;
}