最新的版本ACE-5.6.8,刚从外文网上搬下,与大家分享.

源代码在线查看: ex2.cpp

软件大小: 24894 K
上传用户: guozhongjiesg02
关键词: ACE 版本
下载地址: 免注册下载 普通下载 VIP

相关代码

				// $Id: ex2.cpp 80826 2008-03-04 14:51:23Z wotte $
				
				// ============================================================================
				//
				// = LIBRARY
				//   examples
				//
				// = FILENAME
				//   ex2.cpp
				//
				// = DESCRIPTION
				//    Example for using  and  for
				//    intra-process communication.
				//
				// = AUTHOR
				//    Gerhard Lenzer and Douglas C. Schmidt
				//
				// ============================================================================
				
				#include "ace/OS_main.h"
				#include "ace/UPIPE_Connector.h"
				#include "ace/UPIPE_Acceptor.h"
				#include "ace/Auto_Ptr.h"
				#include "ace/OS_NS_time.h"
				
				ACE_RCSID(UPIPE_SAP, ex2, "$Id: ex2.cpp 80826 2008-03-04 14:51:23Z wotte $")
				
				#if defined (ACE_HAS_THREADS)
				
				// Data for testsuite.
				static int size = 0;
				static int iterations = 0;
				
				static void *
				supplier (void *)
				{
				  ACE_UPIPE_Stream s_stream;
				
				  ACE_UPIPE_Addr c_addr (ACE_TEXT("pattern"));
				
				  ACE_Auto_Basic_Array_Ptr mybuf (new char[size]);
				
				  for (int i = 0; i < size; i++)
				    mybuf[i] = 'a';
				
				  ACE_DEBUG ((LM_DEBUG,
				              "(%t) supplier starting connect thread\n"));
				
				  ACE_UPIPE_Connector con;
				
				  if (con.connect (s_stream, c_addr) == -1)
				    ACE_ERROR ((LM_ERROR,
				                "(%t) %p\n",
				                "ACE_UPIPE_Acceptor.connect failed"));
				
				  // Test asynchronicity (the "acausal principle" ;-)).
				  s_stream.enable (ACE_SIGIO);
				
				  ACE_Message_Block *mb_p;
				
				  for (int j = 0; j < iterations; j++)
				    {
				      ACE_NEW_RETURN (mb_p,
				                      ACE_Message_Block (size,
				                                         ACE_Message_Block::MB_DATA,
				                                         (ACE_Message_Block *) 0,
				                                         mybuf.get ()),
				                      0);
				      if (s_stream.send (mb_p) == -1)
				        ACE_ERROR_RETURN ((LM_ERROR,
				                           "(%t) %p\n",
				                           "send failed"),
				                          0);
				    }
				
				  ACE_NEW_RETURN (mb_p,
				                  ACE_Message_Block ((size_t) 0),
				                  0);
				
				  // Insert a 0-sized message block to signal the other side to shut
				  // down.
				  if (s_stream.send (mb_p) == -1)
				    ACE_ERROR_RETURN ((LM_ERROR,
				                       "(%t) %p\n",
				                       "send failed"),
				                          0);
				  s_stream.close ();
				  return 0;
				}
				
				static void *
				consumer (void *)
				{
				  ACE_UPIPE_Stream c_stream;
				
				  // Set the high water mark to size to achieve optimum performance.
				
				  int wm = size * iterations;
				
				  if (c_stream.control (ACE_IO_Cntl_Msg::SET_HWM,
				                        &wm) == -1)
				    ACE_DEBUG ((LM_DEBUG,
				                "set HWM failed\n"));
				
				  ACE_UPIPE_Addr serv_addr (ACE_TEXT("pattern"));
				
				  // accept will wait up to 4 seconds
				  ACE_UPIPE_Acceptor acc (serv_addr);
				
				  ACE_DEBUG ((LM_DEBUG,
				              "(%t) consumer spawning the supplier thread\n"));
				
				  // Spawn the supplier thread.
				  if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier),
				                                              (void *) 0,
				                                              THR_NEW_LWP | THR_DETACHED) == -1)
				    ACE_ERROR_RETURN ((LM_ERROR,
				                       "%p\n",
				                       "spawn"),
				                      0);
				
				  ACE_DEBUG ((LM_DEBUG,
				              "(%t) consumer starting accept\n"));
				
				  if (acc.accept (c_stream) == -1)
				    ACE_ERROR ((LM_ERROR,
				                "(%t) %p\n",
				                "ACE_UPIPE_Acceptor.accept failed"));
				
				  // Time measurement.
				  time_t currsec;
				  ACE_OS::time (&currsec);
				  time_t start = (time_t) currsec;
				
				  int received_messages = 0;
				
				  for (ACE_Message_Block *mb = 0;
				       c_stream.recv (mb) != -1 && mb->size () != 0;
				       mb->release ())
				    received_messages++;
				
				  ACE_OS::time (&currsec);
				  time_t secs = (time_t) currsec - start;
				
				  ACE_DEBUG ((LM_DEBUG,
				              "(%t) Transferred %d blocks of size %d\n"
				              "The program ran %d seconds\n",
				              received_messages, size, secs));
				  c_stream.close ();
				  return 0;
				}
				
				int
				ACE_TMAIN (int argc, ACE_TCHAR *argv[])
				{
				  size = argc > 1 ? ACE_OS::atoi (argv[1]) : 32;
				  iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : 16;
				
				  // Spawn the two threads.
				  if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer),
				                                              (void *) 0,
				                                              THR_NEW_LWP | THR_DETACHED) == -1)
				    ACE_ERROR_RETURN ((LM_ERROR,
				                       "%p\n",
				                       "spawn"),
				                      1);
				  // Wait for producer and consumer threads to exit.
				  ACE_Thread_Manager::instance ()->wait ();
				  return 0;
				}
				#else
				int
				ACE_TMAIN (int, ACE_TCHAR *[])
				{
				  ACE_ERROR_RETURN ((LM_ERROR,
				                     "threads not supported on this platform\n"),
				                     0);
				}
				#endif /* ACE_HAS_THREADS */
							

相关资源