ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)

源代码在线查看: ex2.cpp

软件大小: 10984 K
上传用户: ranzige
关键词: Communication Environment Framework ADAPTIVE
下载地址: 免注册下载 普通下载 VIP

相关代码

				// ex2.cpp,v 4.20 2003/11/01 11:15:23 dhinton Exp								// ============================================================================				//				// = LIBRARY				//   examples				//				// = FILENAME				//   ex2.cpp				//				// = DESCRIPTION				//    Example for using  and  for				//    intra-process communication.				//				// = AUTHOR				//    Gerhard Lenzer and Douglas C. Schmidt				//				// ============================================================================								#include "ace/OS_main.h"				#include "ace/UPIPE_Connector.h"				#include "ace/UPIPE_Acceptor.h"				#include "ace/Auto_Ptr.h"				#include "ace/OS_NS_time.h"								ACE_RCSID(UPIPE_SAP, ex2, "ex2.cpp,v 4.20 2003/11/01 11:15:23 dhinton Exp")								#if defined (ACE_HAS_THREADS)								// Data for testsuite.				static int size = 0;				static int iterations = 0;								static void *				supplier (void *)				{				  ACE_UPIPE_Stream s_stream;								  ACE_UPIPE_Addr c_addr (ACE_TEXT("pattern"));								  ACE_Auto_Basic_Array_Ptr mybuf (new char[size]);								  for (int i = 0; i < size; i++)				    mybuf[i] = 'a';								  ACE_DEBUG ((LM_DEBUG,				              "(%t) supplier starting connect thread\n"));								  ACE_UPIPE_Connector con;								  if (con.connect (s_stream, c_addr) == -1)				    ACE_ERROR ((LM_ERROR,				                "(%t) %p\n",				                "ACE_UPIPE_Acceptor.connect failed"));								  // Test asynchronicity (the "acausal principle" ;-)).				  s_stream.enable (ACE_SIGIO);								  ACE_Message_Block *mb_p;								  for (int j = 0; j < iterations; j++)				    {				      ACE_NEW_RETURN (mb_p,				                      ACE_Message_Block (size,				                                         ACE_Message_Block::MB_DATA,				                                         (ACE_Message_Block *) 0,				                                         mybuf.get ()),				                      0);				      if (s_stream.send (mb_p) == -1)				        ACE_ERROR_RETURN ((LM_ERROR,				                           "(%t) %p\n",				                           "send failed"),				                          0);				    }								  ACE_NEW_RETURN (mb_p,				                  ACE_Message_Block ((size_t) 0),				                  0);								  // Insert a 0-sized message block to signal the other side to shut				  // down.				  if (s_stream.send (mb_p) == -1)				    ACE_ERROR_RETURN ((LM_ERROR,				                       "(%t) %p\n",				                       "send failed"),				                          0);				  s_stream.close ();				  return 0;				}								static void *				consumer (void *)				{				  ACE_UPIPE_Stream c_stream;								  // Set the high water mark to size to achieve optimum performance.								  int wm = size * iterations;								  if (c_stream.control (ACE_IO_Cntl_Msg::SET_HWM,				                        &wm) == -1)				    ACE_DEBUG ((LM_DEBUG,				                "set HWM failed\n"));								  ACE_UPIPE_Addr serv_addr (ACE_TEXT("pattern"));								  // accept will wait up to 4 seconds				  ACE_UPIPE_Acceptor acc (serv_addr);								  ACE_DEBUG ((LM_DEBUG,				              "(%t) consumer spawning the supplier thread\n"));								  // Spawn the supplier thread.				  if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier),				                                              (void *) 0,				                                              THR_NEW_LWP | THR_DETACHED) == -1)				    ACE_ERROR_RETURN ((LM_ERROR,				                       "%p\n",				                       "spawn"),				                      0);								  ACE_DEBUG ((LM_DEBUG,				              "(%t) consumer starting accept\n"));								  if (acc.accept (c_stream) == -1)				    ACE_ERROR ((LM_ERROR,				                "(%t) %p\n",				                "ACE_UPIPE_Acceptor.accept failed"));								  // Time measurement.				  time_t currsec;				  ACE_OS::time (&currsec);				  time_t start = (time_t) currsec;								  int received_messages = 0;								  for (ACE_Message_Block *mb = 0;				       c_stream.recv (mb) != -1 && mb->size () != 0;				       mb->release ())				    received_messages++;								  ACE_OS::time (&currsec);				  time_t secs = (time_t) currsec - start;								  ACE_DEBUG ((LM_DEBUG,				              "(%t) Transferred %d blocks of size %d\n"				              "The program ran %d seconds\n",				              received_messages, size, secs));				  c_stream.close ();				  return 0;				}								int				ACE_TMAIN (int argc, ACE_TCHAR *argv[])				{				  size = argc > 1 ? ACE_OS::atoi (argv[1]) : 32;				  iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : 16;								  // Spawn the two threads.				  if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer),				                                              (void *) 0,				                                              THR_NEW_LWP | THR_DETACHED) == -1)				    ACE_ERROR_RETURN ((LM_ERROR,				                       "%p\n",				                       "spawn"),				                      1);				  // Wait for producer and consumer threads to exit.				  ACE_Thread_Manager::instance ()->wait ();				  return 0;				}				#else				int				ACE_TMAIN (int, ACE_TCHAR *[])				{				  ACE_ERROR_RETURN ((LM_ERROR,				                     "threads not supported on this platform\n"),				                     0);				}				#endif /* ACE_HAS_THREADS */							

相关资源