牛骨文教育服务平台(让学习变的简单)
博文笔记

libevent异步IO读写操作

创建时间:2016-04-14 投稿人: 浏览次数:323

一、libevent简介

Libevent 是一个用C语言编写的、轻量级的开源高性能网络库,主要有以下几个亮点:事件驱动( event-driven),高性能;轻量级,专注于网络,不如 ACE 那么臃肿庞大;源代码相当精炼、易读;跨平台,支持 Windows、 Linux、 *BSD 和 Mac Os;支持多种 I/O 多路复用技术, epoll、 poll、 dev/poll、 select 和 kqueue 等;支持 I/O,定时器和信号等事件;注册事件优先级。

Chromium(Google)、Memcached、NTP、HTTPSQS等著名的开源程序都使用libevent库,足见libevent的稳定。


二、libevent主要功能列表

* 事件管理包括各种IO(socket)、定时器、信号等事件,也是libevent应用最广的模块;

* 缓存管理是指evbuffer功能;

* DNS是libevent提供的一个异步DNS查询功能;

* HTTP是libevent的一个轻量级http实现,包括服务器和客户端


三、libevent应用实现Client/Server的聊天功能

* 异步socket读写,libevent简单聊天服务,C/S结构 
* 包括流量统计功能


1. libevent-server.c

/*
* 异步socket读写,libevent简单聊天服务,C/S结构 
* 包括流量统计功能
*/

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

/* For inet_ntoa. */
#include <arpa/inet.h>

/* Required by event.h. */
#include <sys/time.h>

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include <err.h>

/* Easy sensible linked lists. */
#include "queue.h"

/* Libevent. */
#include <event2/bufferevent.h>
#include <event2/bufferevent_struct.h>  
#include <event2/bufferevent_compat.h>
#include <event2/event_compat.h>
#include <event2/event.h>
/* Port to listen on. */
#define SERVER_PORT 8080

/* Length of each buffer in the buffer queue. Also becomes the amount
* of data we try to read per call to read(2). */
#define BUFLEN 2048


#define PERTIME 5

/**
* 在以事件为基础的编程时,我们需要一个队列来存储要写入的数据,
* 直到libevent通知我们可以写入,才能够真正的写入数据。这个简单的buffer队列用
* queue.h中的TAILQ来写入。
*/
struct bufferq
{
	/* The buffer. */
	u_char *buf;

	/* The length of buf. */
	int len;

	/* 本次写buffer的开始位置偏移。 */
	int offset;

	/* 链表结构。 */
	TAILQ_ENTRY(bufferq) entries;
};

/**
* 指定的客户数据结构,也包含一个存储一列用户的指针。
*
* 在基于事件的程序中,通常需要为每个客户保存一些状态信息的对象。
*/
struct client
{
	/* 事件对象。我们需要两个事件对象,一个用于读事件一个用于写事件通知。 */
	struct event ev_read;
	struct event ev_write;

	/* 这是数据队列,它将被写入这个客户端。我们只能等到libevent告诉我们这个socket准备好写入时,
	* 才能真正写入。 */
	TAILQ_HEAD(, bufferq) writeq;
};

unsigned int _countSize = 0;

/**
* 设置socket为非阻塞模式。
*/
int setnonblock(int fd)
{
	int flags;

	flags = fcntl(fd, F_GETFL);
	if (flags < 0)
		return flags;
	flags |= O_NONBLOCK;
	if (fcntl(fd, F_SETFL, flags) < 0)
		return -1;

	return 0;
}

/**
* 当客户端socket可读时,libevent调用这个函数。
*/
void on_read(int fd, short ev, void *arg)
{
	struct client *client = (struct client *)arg;
	struct bufferq *bufferq;
	u_char *buf;
	int len;

//	printf("on_read begin.
");
	/* 因为我们需要在可写事接收到通知,所以我们需要分配读取缓冲区,
	* 并且将它放到客户数据对象的写入队列。 */
	buf = (u_char*)malloc(BUFLEN);
	if (buf == NULL)
		err(1, "malloc failed");

	len = read(fd, buf, BUFLEN);
	if (len == 0)
	{
		/* 客户端断开连接,在这里移除读事件并释放客户数据结构。 */
		printf("Client disconnected.
");
		close(fd);
		event_del(&client->ev_read);
		free(client);
		return;
	}
	else if (len < 0)
	{
		/* 一些其它的错误发生,在这里关闭socket、事件对象并释放客户数据结构。 */
		printf("Socket failure, disconnecting client: %s", strerror(errno));
		close(fd);
		event_del(&client->ev_read);
		free(client);
		return;
	}

	//	统计流量 
//	_countSize += sizeof(char)*len;
	_countSize += len;

	/* 我们不能仅仅将buffer写回,因为我们需要在可以写入时响应libevent的可写事件,
	* 将缓冲区放到客户数据结构中的写队列,并安排一个可写事件。 */
	bufferq = (struct bufferq *)calloc(1, sizeof(*bufferq));
	if (bufferq == NULL)
		err(1, "malloc faild");
	bufferq->buf = buf;
	bufferq->len = len;
	bufferq->offset = 0;
	TAILQ_INSERT_TAIL(&client->writeq, bufferq, entries);

	/* 由于现在我们拥有需要写回到客户端的数据,所以添加一个可写事件通知。 */
	event_add(&client->ev_write, NULL);
}

/**
* 当客户端socket准备好写入时,libevent调用这个函数。
*/
void on_write(int fd, short ev, void *arg)
{
	struct client *client = (struct client *)arg;
	struct bufferq *bufferq;
	int len;

//	printf("on_write begin.
");
	/* 将第一个元素移出写队列。我们可能不用再探测写队列是否是空,但是应该确认返回值不是NULL。 */
	bufferq = TAILQ_FIRST(&client->writeq);
	if (bufferq == NULL)
		return;

	/* 写buffer里的数据。buffer中的一部分可能已经在前面写入了,所以只写剩下的字节。 */
	len = bufferq->len - bufferq->offset;
	len = write(fd, bufferq->buf + bufferq->offset, bufferq->len - bufferq->offset);
	if (len == -1)
	{
		if (errno == EINTR || errno == EAGAIN)
		{
			/* 写操作被一个信号打断,或者我们不能写入数据,调整一下并返回。 */
			event_add(&client->ev_write, NULL);
			return;
		}
		else
		{
			/* 一些其它的socket错误发生,退出吧。 */
			err(1, "write");
		}
	}
	else if ((bufferq->offset + len) < bufferq->len)
	{
		/* 不是所有的数据都被写入了,更新写入的偏移并调整写入事件。 */
		bufferq->offset += len;
		event_add(&client->ev_write, NULL);
		return;
	}

	/* 数据已经完整的写入了,从写队列中移除这个buffer。 */
	TAILQ_REMOVE(&client->writeq, bufferq, entries);
	free(bufferq->buf);
	free(bufferq);
}

/**
* 当有一个连接请求准备被接受时,libevent将调用这个函数。
*/
void on_accept(int fd, short ev, void *arg)
{
	int client_fd;
	struct sockaddr_in client_addr;
	socklen_t client_len = sizeof(client_addr);
	struct client *client;


//	printf("on_accept begin.
");

	/* 接受这个新的连接请求。 */
	client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
	if (client_fd == -1)
	{
		warn("accept failed");
		return;
	}

	/* 设置客户端socket为非阻塞模式。 */
	if (setnonblock(client_fd) < 0)
		warn("failed to set client socket non-blocking");

	/* 我们已经接受了一个新的客户,分配一个客户数据结构对象来保存这个客户的状态。 */
	client = (struct client *)calloc(1, sizeof(*client));
	if (client == NULL)
		err(1, "malloc failed");

	/* 设置读事件,当客户端socket可读时,libevent将调用on_read()函数。
	* 我们也不断的构造可读事件,所以不用再每次读取时从新加入读事件了。 */
	event_set(&client->ev_read, client_fd, EV_READ | EV_PERSIST, on_read, client);

	/* 设置的事件没有激活,添加事件以便激活它。 */
	event_add(&client->ev_read, NULL);

	/* 创建写事件,但是在我们有数据可写之前,不要添加它。 */
	event_set(&client->ev_write, client_fd, EV_WRITE, on_write, client);

	/* 初始化客户端写队列。 */
	TAILQ_INIT(&client->writeq);
		
//	printf("Accepted connection from %s
", inet_ntoa(client_addr.sin_addr));
}

void on_timer(int sock, short even, void *arg)
{
	freopen("rate.log", "a", stdout);
	printf("communicate size: %d B/s
", _countSize / PERTIME);
	fclose(stdout);
	_countSize = 0;
	struct timeval tv;
	tv.tv_sec = PERTIME;
	tv.tv_usec = 0;
	// 重新添加定时事件(定时事件触发后默认自动删除)
	event_add((struct event*) arg, &tv);
}

int main(int argc	, char **argv)
{
	int listen_fd;
	struct sockaddr_in listen_addr;
	int reuseaddr_on = 1;

	
	/* 初始化 libevent. */
	event_init();
	printf("Init ok.
");

	/* 创建我们的监听socket。 This is largely boiler plate
	* code that I"ll abstract away in the future. */
	listen_fd = socket(AF_INET, SOCK_STREAM, 0);
	if (listen_fd < 0)
		err(1, "listen failed");
	if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
		sizeof(reuseaddr_on)) == -1)
		err(1, "setsockopt failed");

	memset(&listen_addr, 0, sizeof(listen_addr));
	listen_addr.sin_family = AF_INET;
	listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	listen_addr.sin_port = htons(SERVER_PORT);

	if (bind(listen_fd, (struct sockaddr *)&listen_addr,
		sizeof(listen_addr)) < 0)
		err(1, "bind failed");
	if (listen(listen_fd, 5) < 0)
		err(1, "listen failed");

	/* 设置socket为非阻塞模式,这个在使用libevent编程中时必须的。 */
	if (setnonblock(listen_fd) < 0)
		err(1, "failed to set server socket to non-blocking");

	printf("set libevent ok.
");
	/* socket的接受事件对象。 */
	struct event ev_accept;
	/* 我们现在有了一个监听的socket,我们创建一个读事件,以便有客户连接请求时能够接收到通知。 */
	event_set(&ev_accept, listen_fd, EV_READ | EV_PERSIST, on_accept, (void *)&ev_accept);
	event_add(&ev_accept, NULL);


	/* 定时器 */
	struct event evTime;

	evtimer_set(&evTime, on_timer, &evTime);

	struct timeval tv;
	tv.tv_sec = PERTIME;
	tv.tv_usec = 0;
	event_add(&evTime, &tv);

	/* 启动 libevent 时间循环. */
	event_dispatch();

	printf("set libevent over.
");
	return 0;
}


2. libevent-client.c

/******* 客户端程序 ************/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <time.h>

#include <event2/event.h>
#include <event2/event_compat.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_struct.h>
#include <event2/bufferevent_compat.h>

#define MAXBUFF 2048

int main(int argc, char *argv[])
{
	printf("tip usage: %s [host-ip] [port]]
", argv[0]);
	printf("default: %s 127.0.0.1 8080
", argv[0]);

	char hostname[20] = { "127.0.0.1" };
	int portnum = 8080;
	switch (argc)
	{
	case 3:
		sscanf(argv[2], "%d", &portnum);
	case 2:
		strcpy(hostname, argv[1]);
	default:
		break;
	}

	clock_t startT = clock();

    int sockfd;
		char buffer[MAXBUFF] = { 0 };
        struct sockaddr_in server_addr;
        struct hostent *host;
        int portnumber,nbytes;

        if((host=gethostbyname(hostname))==NULL)
        {
                fprintf(stderr,"Gethostname error
");
                exit(1);
        }

		portnumber = portnum;
//        if((portnumber=atoi("8080"))<0)
//        {
//                fprintf(stderr,"Usage:%s hostname portnumbera
",argv[0]);
//                exit(1);
//        }

        /* 客户程序开始建立 sockfd描述符  */
        if((sockfd=socket(AF_INET,SOCK_STREAM,0))==-1)
        {
                fprintf(stderr,"Socket Error:%sa
",strerror(errno));
                exit(1);
        }

        /* 客户程序填充服务端的资料       */
        bzero(&server_addr,sizeof(server_addr));
        server_addr.sin_family=AF_INET;
        server_addr.sin_port=htons(portnumber);
        server_addr.sin_addr=*((struct in_addr *)host->h_addr);

        /* 客户程序发起连接请求         */
        if(connect(sockfd,(struct sockaddr *)(&server_addr),sizeof(struct sockaddr))==-1)
        {
                fprintf(stderr,"Connect Error:%sa
",strerror(errno));
                exit(1);
        }


		 unsigned int count = 0;
		 //	client request server
		 {
				char msg[64] = "client request.. ";
				sprintf(msg+strlen(msg), "#%d", ++count);
				
				if (send(sockfd, msg, strlen(msg), 0) == -1)
				{
					printf("the net has error occured..");
					exit(0);
				}
				//nbytes = recv(sockfd, buffer, MAXBUFF,0);
				nbytes = read(sockfd, buffer, MAXBUFF);
				if (nbytes == -1)
				{
					fprintf(stderr, "read error, reason: %s", strerror(errno));
					exit(0);
				}

				buffer[nbytes] = 0;
				printf("recv server: %s
", buffer);
				memset(buffer, 0, sizeof(buffer));
		
		}
     /* 结束通讯     */
     close(sockfd);

	 clock_t endT = clock();
	 printf("recv time: %.0lfms
", (double)(endT - startT) / CLOCKS_PER_SEC * 1000);
   exit(0);
}

编译服务端:gcc -o libevent-server libevent-server.c -levent

编译客户端:gcc -o libevent-client.c libevent-client.c -levent


运行:

[~] ./libevent-server

[~] ./libevent-client 127.0.0.1 8080


声明:该文观点仅代表作者本人,牛骨文系教育信息发布平台,牛骨文仅提供信息存储空间服务。