Главная | Новости | Статьи | Книги | Ссылки |
много серверов хороших и разных.
Олег И. Цилюрик: olej@front.ru
Редакция 1.04 от 23.06.2003г.
Чаще всего, если это не приходится делать очень часть (т.е. не является основной спецификой работы), при необходимости написания TCP/IP сервера используются одна из двух <классических> технологий: последовательный сервер, или параллельный сервер на основе fork() (Windows-программисты в этом случае пишут сервер на основе thread). Хотя реально можно предложить гораздо больше принципиально различных серверов, которые будут существенно отличаться своей сложностью, временем реакции на запрос клиента и т.д. Ниже описано несколько из таких способов с результатами их тестирования. Программы делались и испытывались в OS QNX 6.2.1, но могут (за исключением специально оговоренного случая) практически без изменений использоваться в любой UNIX-like OS, а за некоторым исключением - и в Windows.
1. Постановка задачи: мы напишем специальный тестовый TCP/IP клиент, который посылает требуемое число раз запрос к серверу (ретранслятору), принимает от него ответ, и тут же разрывает соединение. Серия запросов от клиента делается для усреднения результата и для того (как будет видно далее), чтобы исключить (или учесть) эффекты кэширования памяти. Клиент измеряет время (точнее - число циклов процессора) между отправкой запроса серверу и приходом ответа от него. Сервера в этом анализе являются простыми ретрансляторами. Все показанные программы - предложены в упрощённых вариантах: не везде сделана полная обработка ошибочных ситуаций (что, вообще-то говоря, крайне необходимо), и сознательно не включена обработка сигнала SIGCHLD, которая должна препятствовать появлению <зомби> процессов. Все приводимые коды программ - работающие и апробированные: весь результирующий вывод скопирован непосредственно с консоли задачи. Весь приводимый программный код транслировался компилятором gcc-2-95 в нотации языка C++ (хотя специфические особенности С++, за исключением потокового ввода-вывода С++ и не использованы).
2. Клиент. Собственно клиент размещён в файле cli.cpp, но он, совместно с сервером, использует общие файлы common.h & common.cpp, все эти файлы с краткими комментариями приведены ниже:
common.h - в этом файле определены различные порты TCP, по которым клиент будет связываться с различными модификациями серверов. Кроме того, здесь определены:
- функция завершения по критической ошибке;
- единая процедура ретрансляции через сокет, которую используют все сервера(для единообразия и корректности сравнений);
- функция подготовки прослушивающего сокета TCP/IP (для того, чтобы устранить этот, достаточно объёмный, код из кода серверов, рассматриваемых ниже).
#if !defined( __COMMON_H ) #define __COMMON_H #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <unistd.h> #include <string.h> #include <iostream.h> #include <netdb.h> const int PORT = 9000, /* программа: */ SINGLE_PORT = PORT, /* ech0 */ FORK_PORT = PORT + 1, /* ech1 */ FORK_LARGE_PORT = PORT + 2, /* ech10 */ PREFORK_PORT = PORT + 3, /* ech11 */ INET_PORT = PORT + 4, /* ech3 */ THREAD_PORT = PORT + 5, /* ech2 */ THREAD_POOL_PORT = PORT + 6, /* ech21 */ PRETHREAD_PORT = PORT + 7; /* ech22 */ const int MAXLINE = 40; // критическая ошибка ... void errx( const char *msg, int err = EOK ); // ретранслятор тестовых пакетов TCP void retrans( int sc ); // создание и подготовка прослушивающего сокета int getsocket( in_port_t ); #endif
common.cpp - реализационная часть:
#include "common.h" // ошибка ... void errx( const char *msg, int err = EOK ) { perror( msg ); if( err != EOK ) errno = err; exit( EXIT_FAILURE ); }; // ретранслятор тестовых пакетов TCP static char data[ MAXLINE ]; void retrans( int sc ) { int rc = read( sc, data, MAXLINE ); if( rc > 0 ) { rc = write( sc, data, strlen( data ) + 1 ); if ( rc < 0 ) perror( "write data failed" ); } else if( rc < 0 ) { perror( "read data failed" ); return; } else if( rc == 0 ) { cout << "client closed connection" << endl; return; }; return; }; // создание и подготовка прослушивающего сокета struct sockaddr_in addr; int getsocket( in_port_t p ) { int rc = 1, ls; if( ( ls = socket( AF_INET, SOCK_STREAM, 0 ) ) = -1 ) errx( "create stream socket failed" ); if( setsockopt( ls, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof( rc ) ) != 0 ) errx( "set socket option failed" ); memset( &addr, 0, sizeof( addr ) ); addr.sin_len = sizeof( addr ); addr.sin_family = AF_INET; addr.sin_port = htons( p ); addr.sin_addr.s_addr = htonl( INADDR_ANY ); if( bind( ls, (struct sockaddr*)&addr, sizeof( sockaddr ) ) != 0 ) errx( "bind socket address failed" ); if( listen( ls, 25 ) != 0 ) errx( "put socket in listen state failed" ); return ls; };
cli.cpp - код клиента:
#include <inttypes.h> #include <sys/neutrino.h> #include <sys/syspage.h> #include <sys/procfs.h> #include "common.h" // установка параметров клиентов: порт и число повторений static void setkey( int argc, char *argv[], in_port_t* port, int* num ) { int opt, val; while ( ( opt = getopt( argc, argv, "p:n:") ) != -1 ) { switch( opt ) { case 'p' : if( sscanf( optarg, "%i", &val ) != 1 ) errx( "parse command line failed", EINVAL ); *port = (in_port_t)val; break; case 'n' : if( ( sscanf( optarg, "%i", &val ) != 1 ) || ( val <= 0 ) ) errx( "parse command line failed", EINVAL ); *num = val; break; default : errx( "parse command line failed", EINVAL ); break; }; }; }; // клиент - источник потока тестовых пакетов TCP int main( int argc, char *argv[] ) { in_port_t listen_port = SINGLE_PORT; int num = 10; setkey( argc, argv, &listen_port, &num ); char data[ MAXLINE ], echo[ MAXLINE ]; uint64_t cps = cps = SYSPAGE_ENTRY( qtime )->cycles_per_sec; cout << "TCP port = " << listen_port << ", number of echoes = " << num << endl << "time of reply - Cycles [usec.] :" << endl; for( int i = 0; i < num; i++ ) { int rc, ls; if( ( ls = socket( AF_INET, SOCK_STREAM, 0 ) ) < 0 ) errx( "create stream socket failed" ); struct sockaddr_in addr; memset( &addr, 0, sizeof( addr ) ); addr.sin_len = sizeof( addr ); addr.sin_family = AF_INET; addr.sin_port = htons( listen_port ); inet_aton( "localhost", &addr.sin_addr ); if( ( rc = connect( ls, (struct sockaddr*)&addr, sizeof( sockaddr ) ) ) < 0 ) errx( "connect failed" ); sprintf( data, "%d", rand() ); uint64_t cycle = ClockCycles(); if( ( rc = write( ls, data, strlen( data ) + 1 ) ) <= 0 ) errx( "write data failed" ); rc = read( ls, echo, MAXLINE ); cycle = ClockCycles() - cycle; if( rc < 0 ) errx( "read data failed" ); if( rc == 0 ) errx( "server closed connection" ); if( strcmp( data, echo ) != 0 ) { cout << "wrong data" << endl; break; }; cout << cycle << "[" << cycle * 1000000 / cps << "]"; if( i % 5 == 4 ) cout << endl; else cout << '\t'; cout << flush; close( ls ); delay( 100 ); }; if( num % 5 != 0 ) cout << endl; exit( EXIT_SUCCESS ); };
После запуска клиент анализирует ключи запуска. Предусмотрены значения: <-p> значение порта подключения (по умолчанию - последовательный сервер, порт 9000), и <-n> - число запросов к серверу в серии (по умолчанию - 10). Каждый запрос представляет собой случайное число, генерируемое клиентом, в символьной форме. Ретранслированный сервером ответ сверяется с запросом для дополнительного контроля. Клиент подключается к серверу по петлевому интерфейсу 127.0.0.1, что вполне достаточно для сравнительного анализа. Далее мы рассмотрим его работу с различными серверами.
3. Последовательный сервер ретранслятор. Такой сервер нас интересует только как эталон для сравнения: он имеет минимальное время реакции, т.к. не затрачивается время на порождение каких-либо механизмов параллелизма. С другой стороны, такой сервер, зачастую, просто неинтересен, т.к. не позволяет обслуживать других клиентов до завершения текущего обслуживания.
Все сервера имеют крайне простой код, потому что большая часть <рутины> снесена в файлы common (h & cpp). Вот код 1-го используемого нами - последовательного сервера (файл ech0.cpp):
#include "common.h" int main( int argc, char *argv[] ) { int ls = getsocket( SINGLE_PORT ), rs; while( true ) { if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" ); retrans( rs ); close( rs ); cout << "*" << flush; }; exit( EXIT_SUCCESS ); };
Вот результаты выполнения клиента с этим сервером (указано число машинных циклов ожидания, а в скобках - для справки - время в микросекундах для процессора Celeron 533Mhz):
/root/ForkThread # cli -p9000 -n20 TCP port = 9000, number of echoes = 20 time of reply - Cycles [usec.] : 868325[1624] 135364[253] 135287[253] 133438[249] 133057[248] 136061[254] 133554[249] 133887[250] 138776[259] 131237[245] 134748[252] 133823[250] 135650[253] 130583[244] 134562[251] 132601[248] 134622[251] 134516[251] 132055[246] 134139[250]
Отчётливо виден (1-й запрос) эффект, который мы отнесли к эффектам кэширования памяти программ - различие времени выполнения первого и последующих запросов. В каталоге проекта есть ещё один (тестовый) вариант последовательного сервера, код которого выглядит несколько иначе:
#include <sys/neutrino.h> #include "common.h" int main( int argc, char *argv[] ) { int ls = getsocket( SINGLE_PORT ), rs, i = 0; while( true ) { if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" ); uint64_t cycle = ClockCycles(); retrans( rs ); cycle = ClockCycles() - cycle; close( rs ); cout << cycle; if( i++ % 5 == 4 ) cout << endl; else cout << '\t'; cout << flush; }; exit( EXIT_SUCCESS ); };
Он отличается тем, что <хронометрирует> (для справки) оценочно число циклов на ретрансляцию (затрачиваемые внутри сервера). Все типы серверов используют общую процедуру retrans() и единые затраты <чистого времени>. Приведём для справки эти оценки (только машинные циклы):
/root/ForkThread # ech0_ 757808 60862 60085 60444 60197 61111 60565 60154 59121 59984
Видно, что это время составляет около 50% времени, наблюдаемого со стороны клиента, которое включает в себя время реакции на accept() (со стороны сервера), 2-кратные затраты write() + read() (со стороны как клиента, так и сервера), время передачи буферов по петлевому интерфейсу и т.п.
4. <Классический> параллельный сервер. Ниже приведен код такого <классического> сервера (в отличающейся части), в котором обслуживающий процесс порождается fork() после разблокирования на accept()(файл ech1.cpp):
#include "common.h" int main( int argc, char *argv[] ) { int ls = getsocket( FORK_PORT ), rs; while( true ) { if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" ); pid_t pid = fork(); if( pid < 0 ) errx( "fork error" ); if( pid == 0 ) { close( ls ); retrans( rs ); close( rs ); cout << "*" << flush; exit( EXIT_SUCCESS ); } else close( rs ); }; exit( EXIT_SUCCESS ); };
После выхода из accept() (получение запроса connect() от клиента) - порождается отдельный обслуживающий процесс, который тут же закрывает свою копию прослушивающего сокета, производит ретрансляцию через соединённый сокет, завершает соединение и завершается сам. Родительский же процесс закрывает свою копию соединённого сокета и продолжает прослушивание канала. Вот результаты выполнения такого сервера:
/root/ForkThread # cli -p9001 -n20 TCP port = 9001, number of echoes = 20 time of reply - Cycles [usec.] : 2219652[4151] 1467470[2744] 1470056[2749] 1466860[2743] 1469294[2748] 1466875[2743] 1467612[2745] 1489083[2785] 1475620[2759] 1665398[3114] 1472091[2753] 1471635[2752] 1481768[2771] 1462214[2734] 1467229[2744] 1468731[2747] 1466483[2742] 1465499[2741] 1461780[2734] 1649821[3085]
Да . . . время реакции больше чем на порядок превышает простой последовательный сервер. Видно заметно меньше (относительно) выраженный эффект кэширования - вновь создаваемое адресное пространство процесса повторно не используется, однако некоторое влияние кэширования сказывается (в программе на стороне клиента?). Добавим в код сервера 1 строчку - перед точкой main (файл ech10.cpp - и изменён порт):
static long MEM[ 2500000 ]; /root/ForkThread # cli -p9002 TCP port = 9002, number of echoes = 10 time of reply - Cycles [usec.] : 67061908[125432] 64674322[120966] 64126835[119942] 63071907[117969] 64185096[120051] 65478368[122470] 64495464[120632] 64533852[120703] 63831652[119390] 64407915[120468]
Строки вывода перенесены мною, потому, что он уже не помещаются в формат страницы: время реакции увеличилось почти в 50 раз, превышает время реакции простейшего последовательного сервера уже почти на 3 порядка (500 раз, или 1000 раз по <чистому> времени обслуживания), и составляет уже 0.12 секунды на каждый запрос. Что произошло? При порождении нового процесса по fork() (можно считать, что здесь затраты не столь большие - из предыдущей таблицы: порядка 1.5 млн. циклов) - OS обязана перекопировать образ задачи (к которой мы добавили ~20Mb) из адресного пространства одного процесса, а пространство другого. И не посредством memcpy(), а запросами к ядру системы, потому как копирование идёт между различными защищёнными образами!
Какие предварительные итоги можно сделать из рассматриваемых результатов? Во-первых, то, что OS QNX определённо не использует технику
5. Параллельный сервер с предварительным созданием копий. Так что же получается: для серверов, работающих на высоко интенсивных потоках запросов, с традиционным fork-методом всё так плохо? Отнюдь! Нужно только поменять fork & accept местами - создать заранее некоторый пул обслуживающих процессов, каждый из которых до прихода клиентского запроса будет заблокирован на accept (кстати - accept на одном и том же прослушиваемом сокете). А после отработки клиентского запроса заблаговременно создать новый обслуживающий процесс. Эта техника известна как <предварительный fork> или pre-fork. Меняем текст сервера (файл ech11.cpp):
#include "common.h" const int NUMPROC = 3; int main( int argc, char *argv[] ) { int ls = getsocket( PREFORK_PORT ), rs; for( int i = 0; i < NUMPROC; i++ ) { if( fork() == 0 ) { int rs; while( true ) { if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" ); retrans( rs ); close( rs ); cout << i << flush; delay( 250 ); }; }; }; for( int i = 0; i < NUMPROC; i++ ) waitpid( 0, NULL, 0 ); exit( EXIT_SUCCESS ); };
При написании этого текста я несколько <схитрил> и упростил в сравнении с предложенной абзацем выше моделью. Здесь 3 обслуживающих процесса сделаны циклическими и не завершаются по окончанию обслуживания, а снова блокируются на accept, но для наблюдения эффектов этого вполне достаточно (последняя строка нужна вообще только для блокировки родительского процесса, и <сохранения> управляющего терминала - для возможности прекращения всей группы по ^C):
# pidin ... 6901868 1 ./ech11 10r REPLY 94228 6901869 1 ./ech11 10r REPLY 94228 6901870 1 ./ech11 10r REPLY 94228 /root/ForkThread # cli -p9003 TCP port = 9003, number of echoes = 10 time of reply - Cycles [usec.] : 854276[1597] 138356[258] 135665[253] 131656[246] 136653[255] 132532[247] 133583[249] 134639[251] 136363[255] 131482[245]
Время реакции - практически равно последовательному серверу, чего мы и добивались. В этой программе добавлен вывод идентификатора (i) обрабатывающего процесса (предыдущие сервера выводили только символ <*> для идентификации факта обработки запроса). Для этого добавлена и задержка <пере-активизации> процесса delay(250) - больше 2-х периодов запросов клиентов, чтоб заставить обрабатывающие процессы чередоваться. Вот возможный вид протокола сервера:
/root/ForkThread # 2012012012201201201220120120122012012012
Хорошо видно нарушение периодичности последовательности идентификационных номеров процессов: после периода простоя всегда обслуживание осуществляется процессом с индексом 2 (максимальным) - при множественном блокировании на acept() первым разблокируется процесс, заблокировавшийся последним (!?).
В принципе, не так и сложно в такой схеме сделать и динамический пул процессов, как будет показано ниже для потоков - с той лишь некоторой сложностью, что здесь каждый процесс выполняется в своём закрытом адресном пространстве, и для их взаимной синхронизации придётся использовать что-то из механизмов IPC.
6. Прежде, чем переходить к потоковым (thread) реализациям, рассмотрим ещё один fork-вариант: использование суперсервера inetd. При этом весь сервис по запуску процессов-копий нашего приложения, и перенаправлению его стандартных потоков ввода-вывода в сокет - возьмёт на себя inetd. Вот полный текст ретранслирующего сервера для этого случая (файл ech3.cpp):
#include <stdio.h> #include "common.h" static char data[ MAXLINE ]; void main( void ) { write( STDOUT_FILENO, data, read( STDIN_FILENO, data, MAXLINE ) ); };
Просто? Мне кажется, что - очень. Теперь настроим на наше приложение и запустим inetd
- Дописываем в конфигурационный файл /etc/services строку, определяющую порт, через который будет вызываться приложение:
ech3 9004/tcp
- В конфигурационный файл файл /etc/inetd.conf добавляем строку, которая определяет режим обслуживания и конкретные параметры вызываемого приложения:
ech3 stream tcp nowait root /root/ForkThread/ech3 ech3
- Запускаем inetd:
/root/ForkThread # inetd &
При заполнении строк концигурационных файлов нужна особая тщательность, если заголовки сервисов (ech3) в файлах не будут совпадать, то вы получите просто ошибку связи:
/root/ForkThread # cli -p9004 TCP port = 9004, number of echoes = 10 time of reply - Cycles [usec.] : connect failed: Connection refused
Проверить, что inetd настроен на прослушивание нашего порта можно так:
/etc # netstat -a Active Internet connections (including servers) Proto Recv-Q Send-Q Local Address Foreign Address State ... tcp 0 0 *.ech3 *.* LISTEN
Заставить inetd перечитать свои конфигурационные файлы после каждой правки /etc/services или /etc/inetd.conf вы можете, послав ему сигнал SGHUP, например:
/etc # pidin ... 10231922 1 usr/sbin/inetd 10r SIGWAITINFO /etc # kill -SIGHUP 10231922
Если ошибка допущена в полном имени программы сервера (поля 6-7 строки inetd.conf), то мы тоже получим не сразу объяснимый результат:
/root/ForkThread # cli -p9004 TCP port = 9004, number of echoes = 10 time of reply - Cycles [usec.] : server closed connection
... и, наконец, если всё в настройке inetd правильно, то получим нечто похожее:
/root/ForkThread # cli -p9004 TCP port = 9004, number of echoes = 10 time of reply - Cycles [usec.] : 16442468[30753] 14169659[26502] 14354292[26848] 14160723[26486] 14187182[26535] 14145131[26457] 14411884[26955] 14761467[27609] 14207573[26573] 14491483[27104]
Отметим, что время реакции в несколько раз (до 10-ти) выше прямой реализации с fork (inetd ведь также <скрыто> делает fork), но зато какая простота и трудоёмкость! Характерно почти полное отсутствие эффектов кэширования. Для серверов, обслуживающих <неплотный> поток запросов - это, пожалуй, оптимальное решение (кстати, большинство <штатных> сетевых сервисов UNIX выполняется именно по такой схеме).
7. Сервер, использующий pthread_create по запросу обслуживания клиента (файл ech2.cpp):
#include <pthread.h> #include "common.h" void* echo( void* ps ) { int sc = *(int*)ps; sched_yield(); retrans( sc ); close( sc ); cout << "*" << flush; return NULL; } int main( int argc, char *argv[] ) { int ls = getsocket( THREAD_PORT ), rs; while( true ) { if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" ); if( pthread_create( NULL, NULL, &echo, &rs ) != EOK ) errx( "thread create error" ); sched_yield(); }; exit( EXIT_SUCCESS ); };
Минимальные комментарии: 2 вызова sched_yield() (в вызывающем потоке, и, позже, в функции обслуживания созданного потока) - предназначены для гарантии копирования созданным потоком переданного дескриптора сокета до его повторного переопределения в цикле вызывающего потока. Результаты выполнения программы:
/root/ForkThread # cli -p9005 TCP port = 9005, number of echoes = 10 time of reply - Cycles [usec.] : 2493948[4664] 266123[497] 269490[504] 279049[521] 267775[500] 266880[499] 288175[539] 268589[502] 267990[501] 267003[499]
Это только в 2 раза (в 3, если оценивать по <чистому> времени) хуже простого последовательного сервера. Чрезвычайно сильно выражен эффект кэширования - вся обработка последовательности запросов производится на едином (многократно используемом) пространстве адресов.
8. Сервер с предварительным созданием потоков. Поступим по аналогии с pre-fork, и создадим фиксированный пул потоков предварительно (pre-thread, файл ech22.cpp):
#include <pthread.h> #include "common.h" static int ntr = 3; /*число thread в пуле*/ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t condvar = PTHREAD_COND_INITIALIZER; void* echo( void* ps ) { int sc = *(int*)ps, rs; sched_yield(); if( ( rs = accept( sc, NULL, NULL ) ) < 0 ) errx( "accept error" ); retrans( rs ); close( rs ); pthread_mutex_lock( &mutex ); ntr++; pthread_cond_signal( &condvar ); pthread_mutex_unlock( &mutex ); cout << pthread_self() << flush; delay( 250 ); return NULL; } int main( int argc, char *argv[] ) { int ls = getsocket( PRETHREAD_PORT ), rs; while( true ) { if( pthread_create( NULL, NULL, &echo, &ls ) != EOK ) errx( "thread create error" ); sched_yield(); pthread_mutex_lock( &mutex ); ntr--; while( ntr <= 0 ) pthread_cond_wait( &condvar, &mutex ); pthread_mutex_unlock( &mutex ); }; exit( EXIT_SUCCESS ); };
Здесь accept (как и раньше в случае prefork) перенесен в обрабатывающий поток (все thread блокированы в accept на единственном прослушивающем сокете). Для синхронизации я использую условную переменную, но могут применятся любые из синхронизирующих примитивов. Испытываем полученную программу:
/root/ForkThread # cli -p9007 TCP port = 9007, number of echoes = 10 time of reply - Cycles [usec.] : 879988[1645] 134687[251] 137152[256] 136303[254] 693676[1297] 138605[259] 140320[262] 138937[259] 136886[256] 342027[639]
Время реакции очень близко к последовательному серверу (к минимально достижимому потенциально!). Потоки обработчики на сервере идентифицируют себя своим tid:
/root/ForkThread # ech22 4567891011121314151617181920212223
Хорошо видно последовательное порождение нового потока для обработки каждого запроса клиента. Так же сильно, как и в предыдущем случае, выражены эффекты кэширования.
9. Можно создать сколь угодно сложный диспетчер, поддерживающий оптимальное число потоков (или процессов) в сервере, но в OS QNX от уже предоставлен как стандартное средство системы: потоковый пул (thread_pool_*). Сервер с использованием динамического пула потоков (файл ech21.cpp):
#include <pthread.h> #include <sys/dispatch.h> #include "common.h" static int ls; THREAD_POOL_PARAM_T *alloc( THREAD_POOL_HANDLE_T *h ) { return (THREAD_POOL_PARAM_T*)h; }; THREAD_POOL_PARAM_T *block( THREAD_POOL_PARAM_T *p ) { int rs = accept( ls, NULL, NULL ); if( rs < 0 ) errx( "accept error" ); return (THREAD_POOL_PARAM_T*)rs; }; int handler( THREAD_POOL_PARAM_T *p ) { retrans( (int)p ); close( (int)p ); delay( 250 ); cout << pthread_self() << flush; return 0; }; int main( int argc, char *argv[] ) { ls = getsocket( THREAD_POOL_PORT ); thread_pool_attr_t attr; memset( &attr, 0, sizeof( thread_pool_attr_t ) ); attr.lo_water = 3; /* заполнение блока атрибутов пула */ attr.hi_water = 7; attr.increment = 2; attr.maximum = 9; attr.handle = dispatch_create(); attr.context_alloc = alloc; attr.block_func = block; attr.handler_func = handler; void *tpp = thread_pool_create( &attr, POOL_FLAG_USE_SELF ) ; if( tpp == NULL ) errx( "create pool" ); thread_pool_start( tpp ); exit( EXIT_SUCCESS ); };
Всё, сервер готов - почти всё необходимое за нас сделала библиотека OS. Грубо, логика работы пула потоков QNX следующая:
- начально создаётся attr.lo_water (<нижняя ватерлиния>) потоков;
- для каждого потока при создании вызывается функция *attr.context_alloc;
- эта функция по завершению вызовет (сама) блокирующую функцию потока *attr.block_func;
- эта функция, после разблокирования (accept) вызовет функцию обработчика *attr.handler_func, которой в качестве параметра (в нашем тексте) передаст дескриптор присоединённого сокета;
- как только число заблокированных потоков станет ниже attr.lo_water - механизм пула создаст дополнительно attr.increment потоков;
- если число блокированных потоков в какой-то момент превысит attr.hi_water (<верхняя ватерлиния>) - <лишние> потоки будут уничтожены;
- . . . и всё это так, чтобы общее число потоков (выполняющиеся + блокированные) не превышало attr.maximum.
Это - уникально мощный механизм, с очень широкой функциональностью, но за более детальной информацией я отсылаю всех заинтересованных к технической документации OS QNX. Смотрим это в действии:
/root/ForkThread # cli -p9006 -n20 TCP port = 9006, number of echoes = 20 time of reply - Cycles [usec.] : 828384[1549] 139615[261] 142050[265] 144799[270] 143895[269] 146760[274] 142760[267] 145951[272] 142816[267] 144384[270] 144657[270] 159474[298] 147504[275] 147113[275] 145257[271] 146866[274] 153215[286] 145461[272] 145013[271] 145311[271]
Результаты очень близкие к максимально возможным! Так же, как и в предыдущих случаях - очень ярко выражен эффект кэширования: вся обработка ведётся на одном и том же, многократно используемом, адресном пространстве.
Посмотрим <чередование> tid обрабатывающих потоков:
/root/ForkThread # ech21 15315315311731731731731731731776176176176176176176
Хорошо видно, что через некоторое время работы число потоков в пуле стабилизируется на уровне 7-ми (<верхней ватерлинии>). Через какое-то время выполнения состояние пула будет примерно таким:
/ # pidin ... 10059820 1 ./ech21 10r REPLY 94228 10059820 2 ./ech21 10r REPLY 94228 10059820 3 ./ech21 10r REPLY 94228 10059820 4 ./ech21 10r REPLY 94228 10059820 5 ./ech21 10r REPLY 94228 10059820 6 ./ech21 10r REPLY 94228 10059820 7 ./ech21 10r REPLY 94228 ...
Как и предсказывает документация - мы имеем 7 блокированных на accept потоков - по <верхнюю ватерлинию>.
Достаточно интересно посмотреть состояния ожидающих сокетов при запущенных всех (или почти всех) видах описанных выше серверов - вот возможное начало таблицы netstat:
/root/ForkThread # netstat -a Active Internet connections (including servers) Proto Recv-Q Send-Q Local Address Foreign Address State tcp 0 0 *.9005 *.* LISTEN tcp 0 0 *.9003 *.* LISTEN tcp 0 0 *.9006 *.* LISTEN tcp 0 0 *.9002 *.* LISTEN tcp 0 0 *.9001 *.* LISTEN tcp 0 0 *.9000 *.* LISTEN tcp 0 0 *.ech3 *.* LISTEN
10. Итоги. Выше рассмотрено 7 различных альтернативных технологий построения сервера TCP/IP. Сравним средние характеристики вариантов по критерию <время задержки реакции> (представляют интерес только порядки величин, сами значения могут радикально <гулять> в зависимости от конкретного вида серверной функции):
Тип сервера
Среднее время обслуживания
Время латентности
Последовательный - п.3
135 000 0 fork - п.4 >>1 470 000 >>1 000 000 pre-fork - п.5 133 000 0 inetd - п.6 14 100 000 14 000 000 thread - п.7 267 000 130 000 pre-thread - п.8 140 000 5 000 (~0) thread pool - п.9 144 000 9 000 (~0)
Тем не менее, не следует категорически руководствоваться выбором той или иной технологии построения сервера только исходя из содержимого показанной выше таблицы. В каждом конкретном случае при выборе решения должно учитываться существенно больше факторов: трудоёмкость реализации, потребление ресурсов, в частности RAM (которое мы никак не затрагиваем в нашем рассмотрении), простота отладки и сопровождения etc.
P.S. 1. Все упоминаемые в тексте элементы программного кода, или необходимые для их сборки элементы (Makefile) содержаться в составе прилагаемого проекта echsrv.tgz.
2. Материал данного рассмотрения непосредственно произошёл от обсуждения подобных вопросов на форуме http://qnx.org.ru/forum тема <fork или thread> : т.е., в первую очередь, автор приносит свои благодарности всем, принявшим участие в обсуждении. Во-вторых - все обсуждавшие данную тему в форуме, являются соавторами предлагаемого материала в той же мере, как и автор, указанный в титуле статьи, а их полное поимённое перечисление : я опускаю только в силу их многочисленности.