用消息队列实现Client和Server间的通信
本文主要介绍UNIX操作系统中消息队列机制、调用方法以及多路复用消息技术,通过应用实例,给出了其在Client和Server间的通信实现。
网络应用的标准模型是客户一服务员模型。在金融系统应用软件开发中,这种模型被广泛的采用,并且实现这种模型的方法多种多样。因此,根据不同的业务类型,选择较好的实现方法至关重要。
在UNIX系统的内部结构中,含有消息机构,即所有的消息都放在内核中,并且它们都有一个相应的消息队列号。消息机构允许进程发送一个消息到任意其它进程,从而实现系统中进程间的通信。
一、UNIX系统中的消息机构
消息是一个格式化的可变长度的信息单元。它有如下属性:(1)长整数类型 (2)消息的数据长度 (3)数据。由于消息的长度是可变的,故将消息分为消息首部和消息数据两部分。在消息首部中,记录着消息的类型和大小,指向消息数据区的指针,消息队列的链接指针等。每个消息队列有一个称为 key的名称,如同用户文件描述符一样,每个消息队列还有一个消息队列描述符。此外,在一个系统中,可能有若干个消息队列,所有消息队列的头标组成一个数组。图1 示出了消息和消息队列的数据结构。
图1 消息和消息队列的数据结构
1. 建立或返回消息队列描述符
进程可用系统调用megget来建立或返回消息队列的描述符。该系统调用的语法格式为:
int megget(key,msgflg)
key_t key;
int megflg;
其中,key是消息队列的名字;msgflg是用户设置的标志。如果IPC_CREAT表示系统无以key命名的消息队列,则建立消息队列标识符;若已存在,则返回消息队列描述符msgid。
对于系统调用,核心将搜索消息队列头标数组,确定是否有指定关键字的消息队列。若无,核心将分配一新的队列结构,并返回给用户一个消息队列描述符;否则,它只是检查消息队列的许可权之后便返回。
2.消息的发送
进程可用megsnd( )系统调用来发送一个消息,并将它链入消息队列的尾部。该系统调用的语法格式如下:
int msgsnd(msgid,msgp,msgsz,msgflg)
int msgid;
struct msgbuf * msgp;
int msgsz,msgflg;
其中,msgid是由msgget返回的消息队列描述符;msgp指向包含这条消息的结构,该结构由如下两个成员组成:
struct msgbuf
{
long mtype; /* 消息类型 */
char mtext[ ]; /* 消息的文本 */
}
msgsz是mtext的字节长度;msgflg规定了当无内存空间来存储消息时,进程等待还是立即返回。
对于msgsnd( )系统调用,核心检查消息队列描述符和许可权是否合法;消息长度是否超过系统规定的长度,若过长,进程睡眠等待出现足够大的空间,通过检查后,核心为消息分配消息数据区,并将消息从用户空间拷贝到消息数据区,分配消息首部,将它链入该消息队列的尾部,在消息首部填写消息类型,大小以及指向消息数据区的指针,还有修改消息队列的头标中的数据。然后唤醒在等待消息到来的队列中睡眠的进程。
3. 消息的接收
进程可用msgrcv( )系统调用,从消息队列中读一条消息,语法格式为:
int msgrcv(msgid,msgp,msgsz,msgtyp,msgflg)
int msgid,msgsz,msgflg;
struct msgbuf * msgp;
long msgtyp;
其中,msgid,msgp,msgsz,msgflg与msgsnd相似,msgtype是规定用户想读的消息类型。
对于msgrcv( )系统调用是先由核心检查消息队列标识符和许可权,接着根据msgtyp分三种情况处理。
(1) msgtyp=0,核心寻找消息队列中的第一个消息,并将它返回给调用进程;
(2)msgtyp为正整数,核心返回给类型的第一个消息;
(3)msgtyp为负整数,核心应在其类型值小于或等于msgtyp绝对值的所有消息中,选择类型最低的第一消息返回。
如果所返回的消息的大小等于或小于用户请求,核心便将消息正文拷贝到用户区,再从队列中删除该消息,并唤醒睡眠的发送进程;如果消息比用户要求的大,则系统返回错误信息。
4. 消息队列的操纵
可利用msgctl( )系统调用,来改变消息队列的属性,即拥有者,许可权等。其语法格式如下:
int msgctl(msgid,cmd,buf)
int msgid,cmd;
struct msgid_ds * buf;
其中,cmd是规定的命令;buf是用户缓冲区地址,用户用它来存放控制参数和查询结果。命令可分为三类:(1)用于查询有关消息队列的情况。(2)用于改变有关消息队列的属性。(3)消除消息队列的标识符。
二、多路复用消息
在客户-服务员模型中,一个服务员往往对应多个客户。这时我们可以利用消息的类型参量,让多个进程把消息放入同一个队列中,以便消息队列能够多路复用。如图2 所示,我们只要把type置为1,以表示消息是从客户流向服务员的。如果客户把它的进程号作为消息的一部分传递,那么服务员只要把客户进程号作为其消息类型,把它的消息发送给客户进程。每个客户进程都把msgrcv的参数msgtyp置为其进程号。
图2 消息队列的多路复用
三、应用实例
由上面介绍,我们了解了UNIX 操作系统中消息队列机构及调用方法,为了进一步说明其在Client 和Server 间的通信,我们给出如下应用实例:
1.实例说明
本例主要是Client 端向Server 端提出查询申请,Server 端通过访问系统上INFORMIX 数据库,将所得信息反馈给Client 端。其中,把消息类型设为1表明消息是从Client 端传到Server 端,消息类型设为2表明消息是从Server 端传到Client 端。
INFORMIX 中的数据库dxddb 里的表 dxdtable 数据结构如下:
(1) name char( /* 姓名 */
(2) acct_no char(16) /* 帐号 */
(3) ph_code char( /* 电话号码 */
2.程序说明
(1)头文件(文件名:msgg.h)
#include
#include
#include
#define MKEY1 1234L
#define MKEY2 2345L
#define RERMS 0666
(2)公用函数
mesg_recv(id,mesgptr)
int id;
Mesg *mesgptr;
{
int n;
n=msgrcv(id,(char *) & (mesgptr->mesg_type),MAXMESGDATA,
mesgptr->mesg_type,0);
if((mesgptr->mesg_len=n) printf("msgrcv error.\n");
return(n);
}
mesg_send(id,mesgptr)
int id;
Mesg * mesgptr;
{
if(msgsnd(id,(char*)&(mesgptr->mesg_type),
mesgptr->mesg_len,0)!=0)
printf("msgsnd error\n");
}
(3)Client端程序(文件名:client.c)
#include "msgg.h"
#include
#define MAXMESGDATA 1024
typedef struct
{
int mesg_len;
long mesg_type;
char mesg_data[1024];
} Mesg;
Mesg mesg;
int i;
main()
{
int id;
int sum;
sum=0;
for(; {
if((id=msgget(MKEY1,0)) printf("client: can not msgget message queue 1.\n");
else
printf("CLIENT(%d)-> please send a message: ",sum++);
client(id);
}
if(msgctl(id,IPC_RMID,(struct msqid_ds*)0) printf("client: can't PMIN message queue 1.\n");
exit(0);
}
client(id)
int id;
{
int n;
if (fgets(mesg.mesg_data,MAXMESGDATA,stdin)==NULL)
printf("filename read error.\n");
n=strlen(mesg.mesg_data);
if(mesg.mesg_data[n-1]=='\n')
n--;
mesg.mesg_data[n]='\0';
mesg.mesg_len=n;
mesg.mesg_type=1L;
mesg_send(id,&mesg);
printf("client: this message has been sent to server. \nthe message from server is: \n\n");
mesg.mesg_type=2L;
while((n=mesg_recv(id,&mesg))>0)
if(write(1,mesg.mesg_data,n)!=n)
printf("data write error.\n");
if (n>0)
printf("data read error.\n");
}
(4)Server端程序(文件名:server.ec)
#include "msgg.h"
#include
#define MESGHDRSIZE 1024
$typedef struct
{
char name[9];
char acct_no[17];
char ph_code[8];
} st;
$typedef struct
{
int mesg_len;
long mesg_type;
char mesg_data[8];
st p_code;
} Mesg;
$typedef struct
{
int mesg_len;
long mesg_type;
char mesg_error[20];
} Mesg1;
$include sqltypes.h;
$include sqlca;
$Mesg mesg;
$Mesg1 mesg1;
$char mm[9];
main()
{
int id;
int sum;
sum=0;
for(;
{
if((id=msgget(MKEY1,RERMS|IPC_CREAT)) printf("server: can not get message queue 1.\n");
else
{
printf("server: waitting for server.... ");
printf("%d \n",sum++);
server(id);
}
}
}
server(id)
int id;
{
int n,filefd;
$database dxddb;
mesg.mesg_type=1L;
if((n=mesg_recv(id,&mesg)) printf("serve:filename read error\n");
mesg.mesg_data[n]='\0';
strcpy(mm,mesg.mesg_data);
printf("this message is :%s \n",mm);
mesg.mesg_type=2L;
$select * into $mesg.p_code
from dxdtable
where ph_code = $mm;
if(sqlca.sqlcode==0)
{
printf("this name is :%s\n",mesg.p_code.name);
printf("begin to send this message to client.\n");
mesg.mesg_len=sizeof(mesg);
mesg_send(id,&mesg);
mesg.mesg_len=0;
mesg_send(id,&mesg);
}
else
{
printf("this phone number is not exist ! \n");
strcpy(mesg1.mesg_error,"there are not this number !");
mesg1.mesg_type=2L;
mesg1.mesg_len=strlen(mesg1.mesg_error);
mesg_send(id,&mesg1);
mesg1.mesg_len=0;
mesg_send(id,&mesg1);
}
}