kk Blog —— 通用基础


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp
sar -n DEV 1

进程通信--消息队列

https://blog.csdn.net/lh2016rocky/article/details/70256844

消息队列提供了一种由一个进程向另一个进程发送块数据的方法。另外,每一个数据块被看作有一个类型,而接收进程可以独立接收具有不同类型的数据块。消息队列的好处在于我们几乎可以完全避免同步问题,并且可以通过发送消息屏蔽有名管道的问题。更好的是,我们可以使用某些紧急方式发送消息。坏处在于,与管道类似,在每一个数据块上有一个最大尺寸限制,同时在系统中所有消息队列上的块尺寸上也有一个最大尺寸限制。

尽管有这些限制,但是X/Open规范并没有定义这些限制的具体值,除了指出超过这些尺寸是某些消息队列功能失败的原因。Linux系统有两个定义,MSGMAX与MSGMNB,这分别定义单个消息与一个队列的最大尺寸。这些宏定义在其他系统上也许并不相同,甚至也许就不存在。

消息队列函数定义如下:

1
2
3
4
5
6
#include <sys/msg.h>

int msgget(key_t key, int msgflg);
int msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

与信息号和共享内存一样,头文件sys/types.h与sys/ipc.h通常也是需要的。

msgget

我们可以使用msgget函数创建与访问一个消息队列:

1
int msgget(key_t key, int msgflg);

与其他IPC工具类似,程序必须提供一个指定一个特定消息队列的key值。特殊值IPC_PRIVATE创建一个私有队列,这在理论上只可以为当前进程所访问。与信息量和共享内存一样,在某些Linux系统上,消息队列并不是私有的。因为私有队列用处较少,因而这并不是一个严重问题。与前面一样,第二个参数,msgflg,由9个权限标记组成。要创建一个新的消息队列,由IPC_CREAT特殊位必须与其他的权限位进行或操作。设置IPC_CREAT标记与指定一个已存在的消息队列并不是错误。如果消息队列已经存在,IPC_CREAT标记只是简单的被忽略。

如果成功,msgget函数会返回一个正数作为队列标识符,如果失败则会返回-1。

msgsnd

msgsnd函数允许我们将消息添加到消息队列:

1
int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg);

消息结构由两种方式来限定。第一,他必须小于系统限制,第二,必须以long int开始,这在接收函数中会用作一个消息类型。当我们在使用消息时,最好是以如下形式来定义我们的消息结构:

1
2
3
4
struct my_message {
	long int message_type;
	/* The data you wish to transfer */
}

因为message_type用于消息接收,所以我们不能简单的忽略他。我们必须定义我们自己的数据结构来包含并对其进行初始化,从而他可以包含一个可知的值。

第一个参数,msgid,是由msgget函数所返回的消息队列标识符。

第二个参数,msg_ptr,是一个指向要发送消息的指针,正如前面所描述的,这个消息必须以long int类型开始。

第三个参数,msg_sz,是由msg_ptr所指向的消息的尺寸。这个尺寸必须不包含long int消息类型。

第四个参数,msgflg,控制如果当前消息队列已满或是达到了队列消息的系统限制时如何处理。如果msgflg标记设置了IPC_NOWAIT,函数就会立即返回而不发送消息,并且返回值为-1。如果msgflg标记清除了IPC_NOWAIT标记,发送进程就会被挂起,等待队列中有可用的空间。

如果成功,函数会返回0,如果失败,则会返回-1。如果调用成功,系统就会复制一份消息数据并将其放入消息队列中。

msgrcv

msgrcv函数由一个消息队列中收取消息:

1
int msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);

第一个参数,msqid,是由msgget函数所返回的消息队列标记符。

第二个参数,msg_ptr,是一个指向将要接收消息的指针,正如在msgsnd函数中所描述的,这个消息必须以long int类型开始。

第三个参数,msg_sz,是由msg_ptr所指向的消息的尺寸,并不包含long int消息类型。

第四个参数,msgtype,是一个long int类型,允许一个接收优先级形式的实现。如果msgtype的值为0,队列中第一个可用的消息就会被接收。如果其值大于0,具有相同消息类型的第一个消息就会被接收。如果其值小于0,第一个具有相同类型或是小于msgtype绝对值的消息就会被接收。

这听起来要比实际操作复杂得多。如果我们只是简单的希望以其发送的顺序来接收消息,我们可以将msgtype设置为0。如果我们希望接收特殊消息类型的消息,我们可以将msgtype设置为等于这个值。如果我们希望接收消息类型为n或是小于n的值,我们可以将msgtype设置为-n。

第五个参数,msgflg,控制当没有合适类型的消息正在等待被接收时如何处理。如果在msgflg中设置了IPC_NOWAIT位,调用就会立即返回,而返回值为-1。如果msgflg标记中消除了IPC_NOWAIT位,进程就会被挂起,等待一个合适类型的消息到来。

如果成功,msgrcv会返回放入接收缓冲区中的字节数,消息会被拷贝到由msg_ptr所指向的用户分配缓冲区中,而数据就会由消息队列中删除。如果失败则会返回-1。

msgctl

最后一个消息队列函数是msgctl,这与共享内存中的控制函数十分类似。

1
int msgctl(int msqid, int command, struct msqid_ds *buf);

msqid_ds结构至少包含下列成员:

1
2
3
4
5
struct msqid_ds {
	uid_t msg_perm.uid;
	uid_t msg_perm.gid
	mode_t msg_perm.mode;
}

第一个参数,msqid,是由msgget函数所返回的标记符。

第二个参数,command,是要执行的动作。他可以取下面三个值:

命令 描述
IPC_STAT 设置msqid_ds结构中的数据来反射与消息队列相关联的值。
IPC_SET 如果进程有权限这样做,这个命令会设置与msqid_ds数据结构中所提供的消息队列相关联的值。
IPC_RMID 删除消息队列。

msgrecv.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MSG_KEY   3

struct my_msg_st
{
	long int my_msg_type;
	char some_text[BUFSIZ];
};

int main()
{
	int running = 1;
	int msgid;
	struct my_msg_st some_data;
	long int msg_to_receive = 0;
	msgid = msgget(MSG_KEY, 0666|IPC_CREAT);
	if (msgid == -1) {
		fprintf(stderr,"msgget failed with error: %d\n", errno);
		return -1;
	}
	while (running) {
		if (msgrcv(msgid, (void *)&some_data, BUFSIZ, msg_to_receive, 0) == -1) {
			fprintf(stderr, "msgrcv failed with errno: %d\n", errno);
			return -2;
		}
		printf("recvmsg: %s", some_data.some_text);
		if (strncmp(some_data.some_text, "end", 3) == 0)
			running = 0;
	}
	if (msgctl(msgid, IPC_RMID, 0) == -1) {
		fprintf(stderr, "msgctl(IPC_RMID) failed\n");
		return -3;
	}
	return 0;
}

msgsend.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MAX_TEXT 512
#define MSG_KEY   3

struct my_msg_st
{
	long int my_msg_type;
	char some_text[MAX_TEXT];
};

int main()
{
	int running = 1;
	struct my_msg_st some_data;
	int msgid;
	char buffer[BUFSIZ];
	msgid = msgget(MSG_KEY, 0666|IPC_CREAT);
	if (msgid == -1) {
		fprintf(stderr,"msgget failed with errno: %d\n", errno);
		return -1;
	}
	while (running) {
		printf("Enter some text: ");
		fgets(buffer, BUFSIZ, stdin);
		printf("You wrote: %s", buffer);

		some_data.my_msg_type = 1;
		strcpy(some_data.some_text, buffer);
		if (msgsnd(msgid, (void *)&some_data, MAX_TEXT, 0) == -1) {
			fprintf(stderr, "msgsnd failed\n");
			return -2;
		}
		if (strncmp(some_data.some_text, "end", 3) == 0)
			running = 0;
	}
	if (msgctl(msgid, IPC_RMID, 0) == -1) {
		fprintf(stderr, "msgctl(IPC_RMID) failed\n");
		return -3;
	}
	return 0;
}

language, c

« 信号量内核源码 进程通信--pipe管道 »