2004 г
Реализация многопотокового сервера RPC и
асинхронного сервера TCP для ОС LINUX
Б.А. Державец
1. Одним из основных инструментов при разработке приложений,
работающих с протоколом RPC является утилита rpcgen.
В качестве примера рассмотрим файл шаблона square.x , такой
же как и в [3], стр. 416 :
struct square_in {
long arg1;
};
struct square_out {
long res1;
};
program SQUARE_PROG {
version SQUARE_VERS {
square_out SQUAREPROC(square_in) = 1;
} = 2 ;
} = 0x31230000;
На основание файла square.x утилита rpcgen
автоматически генерирует :
- Header файл - square.h
- Заглушку сервера - square_svc.c
- Заглушку клиента - square_clnt.c
- Файл протокола XDR - square_xdr.c
Создание файлов процедур на стороне сервера ServerSideProc.c и
на стороне клиента ClientSideProc.c должно быть проделано разработчиком
в соответствии с требованиями протокола RPC.
В среде ОС LINUX утилита rpcgen поддерживает только ключ -M , позволяющий сгенерировать так называемый "thread safe" код, но не ключ
-A , который позволяет в комбинации с ключом -М получать автоматически
многопотоковый код для заглушки на стороне сервера , например в SunOS
Команда : rpcgen -a -M square.x порождает в среде LINUX файл square_svc.c , код которого является строго последовательным.
Таким образом, возникает необходимость изменить самостоятельно код
заглушки сервера square_svc.c для того , чтобы сделать его многопотоковым
используя при этом Posix Threads API, поддерживаемый ОС LINUX.
С этой целью внесем требуемые изменения в код процедуры square_prog_2
и поместим этот код в новую процедуру с именем serv_request , являющейся
процедурой , выполняемой потоком , запускаемым из новой версии процедуры square_prog_2 , передающей процедуре потока ссылку на
структуру , в которую упаковываются параметры , передаваемые в поток.
Соответственно, распакуем эти параметры в процедуре serv_request перед
вызовом стандартной процедуры стороны сервера.
Ниже следует код serv_request , а затем новый код square_prog_2.
void *
serv_request(void *data)
{
struct thr_data
{
struct svc_req *rqstp;
SVCXPRT *transp;
} *ptr_data;
{
union {
square_in squareproc_2_arg;
} argument;
union {
square_out squareproc_2_res;
} result;
bool_t retval;
xdrproc_t _xdr_argument, _xdr_result;
bool_t (*local)(char *, void *,
struct svc_req *);
/*
Распаковка данных, переданных в процедуру потока
*/
ptr_data = (struct thr_data *)data;
struct svc_req *rqstp = ptr_data->rqstp;
register SVCXPRT *transp = ptr_data->transp;
switch (rqstp->rq_proc) {
case NULLPROC:
(void) svc_sendreply (transp,
(xdrproc_t) xdr_void,
(char *)NULL);
return;
case SQUAREPROC:
_xdr_argument = (xdrproc_t) xdr_square_in;
_xdr_result = (xdrproc_t) xdr_square_out;
local = (bool_t (*) (char *, void *,
struct svc_req *))
squareproc_2_svc;
break;
default:
svcerr_noproc (transp);
return;
}
memset ((char *)&argument, 0,
sizeof (argument));
if (!svc_getargs (transp,
(xdrproc_t) _xdr_argument,
(caddr_t) &argument)) {
svcerr_decode (transp);
return;
}
/*
Стандартный вызов функции сервера.
Данные уже приведены к стандарту.
*/
retval = (bool_t) (*local)((char *)&argument,
(void *)&result,
rqstp);
if (retval > 0 &&
!svc_sendreply(transp,
(xdrproc_t) _xdr_result,
(char *)&result))
{
svcerr_systemerr (transp);
}
if (!svc_freeargs (transp,
(xdrproc_t) _xdr_argument,
(caddr_t) &argument)) {
fprintf (stderr, "%s",
"unable to free arguments");
exit (1);
}
if (!square_prog_2_freeresult
(transp, _xdr_result,
(caddr_t) &result))
fprintf (stderr, "%s",
"unable to free results");
return;
}
}
/*
Принципиально измененный код square_prog_2 ,
стартующей теперь новый поток для каждого
инициированного клиентом вызова процедуры
на удаленном сервере
*/
static void
square_prog_2(struct svc_req *rqstp,
register SVCXPRT *transp)
{
struct data_str
{
struct svc_req *rqstp;
SVCXPRT *transp;
} *data_ptr=(struct data_str*)
malloc(sizeof(struct data_str));
{
/*
Упаковка данных в структуру для передачи
ссылки на нее, как параметра запускаемому
потоку
*/
data_ptr->rqstp = rqstp;
data_ptr->transp = transp;
pthread_attr_setdetachstate
(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&p_thread,&attr,serv_request,
(void *)data_ptr);
}
}
Код блока main( ) заглушки square_svc.c оставим без изменений.
Скомпилируем многопотоковый сервер RPC командой :
$ gcc -o ServerSquare ServerSideProc.c square_svc.c \
square_xdr.c -lpthread -lnsl
Контрольное тестирование можно проделать, следуя [3] , глава "Sun RPC".
2. При разработке многопотоковых серверов TCP в среде ОС UNIX
традиционно используют 2 подхода:
a) Ведущий поток, выполняющий в бесконечном цикле вызов accept( ),
порождает ведомый поток для каждого нового клиента, передавая
ему в качестве параметра дескриптор сокета, возвращаемый соответствующим вызовом accept ( )
b) Ведущий поток получает дескриптор пассивного сокета и затем создает
пул ведомых потоков , передавая каждому из них дескриптор пассивного сокета в качестве параметра. В свою очередь каждый из
ведомых потоков выполняет вызов accept( ) и всю последующую
работу по обмену данными с клиентом.
В случае второго подхода обычно требуется блокировка мьютекса перед
вызовом accept( ) для обеспечения последовательного выполнения потоками
критической части кода ( вызов accept( )) (см.[4], глава 27).
В случае ОС LINUX мы предпочитаем второй подход, поскольку LINUX
обладает свойством эффективно распараллеливать вызов accept( ), даже
без необходимости блокировки мьютекса.
Код процедуры , выполняемой каждым из ведомых потоков сервера и
называемой , в дальнейшем handle_request , использует вызов select ( ),
позволяющий определить множество активных дескрипторов, требующих
обработки в данный момент времени. Исходный код для такого рода задач известен и может быть взят из [1] или [2]. Однопотоковый сервер, работающий на основе вызова select( ) называют асинхронным , поскольку
один и тот же процесс обрабатывает и вызовы accept( ),как ведущий поток,
и обмен данными через дескрипторы сокетов, возвращаемых accept( ), как
ведомые потоки. Последовательность переключений определяется вызовом
select(maxFdNum+1,&readFdSet,&writeFdSet,NULL,NULL)
и макросами :
FD_ISSET((*currentConnPtr)->dataSocket,&readFdSet),
FD_ISSET((*currentConnPtr)->dataSocket,&writeFdSet)
проверяющими принадлежность дескриптора (*currentConnPtr)->dataSocket
множеству readFdSet или writeFdSet .FD_ISSET(listenSocket,&readFdSet) определяет необходимость обработки вызова accept( ).
(см.
http://dlenev.nm.ru)
Определенный интерес представляет код блока main ( ), предлагаемой версии сервера, запускающий ведомые потоки с предварительной установкой
размера стека:
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define NUM_THREADS 512
pthread_mutex_t request_mutex =
PTHREAD_MUTEX_INITIALIZER;
/*
Описание поцедуры ведущего потока , которая
возвращает дескриптор пассивного сокета,
привязанного к адресу сервера.
*/
int getServerSocket(unsigned short int port)
{
int listenSocket;
struct sockaddr_in listenSockaddr;
if((listenSocket=socket(PF_INET,SOCK_STREAM,0))<0)
die("socket()",errno);
memset(&listenSockaddr, 0, sizeof(listenSockaddr));
listenSockaddr.sin_family = PF_INET;
listenSockaddr.sin_port = htons(port);
listenSockaddr.sin_addr.s_addr = INADDR_ANY;
if(bind(listenSocket,
(struct sockaddr*)&listenSockaddr,
sizeof(listenSockaddr)) < 0)
die("bind()",errno);
if(listen(listenSocket,5)<0)
die("listen()",errno);
return listenSocket;
}
int main(int argc,char *argv[])
{
int k;
int descSock;
char *service="1500";
switch(argc) {
case 1:
break;
case 2:
service = argv[1];
break;
default:
printf ("Usage: ./ServerBNTH [port]\n");
exit(1);
}
size_t stacksize;
pthread_t p_thread[NUM_THREADS];
/*
Установка размера стека для ведомых потоков
*/
pthread_attr_t attr;
pthread_attr_init(&attr);
stacksize = 500000;
pthread_attr_setstacksize (&attr, stacksize);
pthread_attr_getstacksize (&attr, &stacksize);
/*
Получение значения дескриптора пассивного сокета
*/
descSock = getServerSocket(atoi(service));
/*
Запуск ведомых потоков
*/
for(k=0; k<NUM_THREADS; k++) {
pthread_create(&p_thread[k],&attr,
handle_request,(void*)descSock);
printf("Thread %d started \n",k);
}
pthread_attr_destroy(&attr);
for(k=0;k<NUM_THREADS;k++) {
pthread_join(p_thread[k], NULL);
printf("Completed join with thread %d\n",k);
}
}
Специфика данной реализации состоит в распараллеливании асинхронного
однопотокового сервера, несмотря на потенциальную возможность кол-
лизий, порождаемых вызовом select( ) в параллельно выполняющихся
ведомых потоках (см [4],глава 27)
Литература
1.Дуглас Э. Крамер,Дэвид Л. Стивенс. Сети TCP/IP .Разработка приложений
типа клиент/сервер для LINUX/POSIX . Том 3 .Издательский дом
"Вильямс",2002.
2. http://dlenev.nm.ru/
3.Стивенс У. UNIX: Взаимодействие процессов. Из-во "Питер",2002.
4.Стивенс У. UNIX Разработка сетевых приложений. Из-во "Питер",2004.