Программирование на языке Си++

Google
  Главная   Новости   Статьи   Книги   Ссылки  

Сервер TCP/IP:

много серверов хороших и разных.
Олег И. Цилюрик: olej@front.ru
Редакция 1.04 от 23.06.2003г.

Чаще всего, если это не приходится делать очень часть (т.е. не является основной спецификой работы), при необходимости написания TCP/IP сервера используются одна из двух <классических> технологий: последовательный сервер, или параллельный сервер на основе fork() (Windows-программисты в этом случае пишут сервер на основе thread). Хотя реально можно предложить гораздо больше принципиально различных серверов, которые будут существенно отличаться своей сложностью, временем реакции на запрос клиента и т.д. Ниже описано несколько из таких способов с результатами их тестирования. Программы делались и испытывались в OS QNX 6.2.1, но могут (за исключением специально оговоренного случая) практически без изменений использоваться в любой UNIX-like OS, а за некоторым исключением - и в Windows.

1. Постановка задачи: мы напишем специальный тестовый TCP/IP клиент, который посылает требуемое число раз запрос к серверу (ретранслятору), принимает от него ответ, и тут же разрывает соединение. Серия запросов от клиента делается для усреднения результата и для того (как будет видно далее), чтобы исключить (или учесть) эффекты кэширования памяти. Клиент измеряет время (точнее - число циклов процессора) между отправкой запроса серверу и приходом ответа от него. Сервера в этом анализе являются простыми ретрансляторами. Все показанные программы - предложены в упрощённых вариантах: не везде сделана полная обработка ошибочных ситуаций (что, вообще-то говоря, крайне необходимо), и сознательно не включена обработка сигнала SIGCHLD, которая должна препятствовать появлению <зомби> процессов. Все приводимые коды программ - работающие и апробированные: весь результирующий вывод скопирован непосредственно с консоли задачи. Весь приводимый программный код транслировался компилятором gcc-2-95 в нотации языка C++ (хотя специфические особенности С++, за исключением потокового ввода-вывода С++ и не использованы).

2. Клиент. Собственно клиент размещён в файле cli.cpp, но он, совместно с сервером, использует общие файлы common.h & common.cpp, все эти файлы с краткими комментариями приведены ниже:

common.h - в этом файле определены различные порты TCP, по которым клиент будет связываться с различными модификациями серверов. Кроме того, здесь определены:

- функция завершения по критической ошибке;

- единая процедура ретрансляции через сокет, которую используют все сервера(для единообразия и корректности сравнений);

- функция подготовки прослушивающего сокета TCP/IP (для того, чтобы устранить этот, достаточно объёмный, код из кода серверов, рассматриваемых ниже).

 
#if !defined( __COMMON_H )
#define __COMMON_H
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <iostream.h>
#include <netdb.h>
const int PORT = 9000,                      /* программа: */
              SINGLE_PORT = PORT,              /* ech0  */
              FORK_PORT = PORT + 1,            /* ech1  */
              FORK_LARGE_PORT = PORT + 2,      /* ech10 */
              PREFORK_PORT = PORT + 3,         /* ech11 */
              INET_PORT = PORT + 4,            /* ech3  */
              THREAD_PORT = PORT + 5,          /* ech2  */
              THREAD_POOL_PORT = PORT + 6,     /* ech21 */
              PRETHREAD_PORT = PORT + 7;       /* ech22 */
const int MAXLINE = 40;
// критическая ошибка ...
void errx( const char *msg, int err = EOK );

// ретранслятор тестовых пакетов TCP
void retrans( int sc );

// создание и подготовка прослушивающего сокета
int getsocket( in_port_t );
#endif

common.cpp - реализационная часть:

#include "common.h"

// ошибка ...
void errx( const char *msg, int err = EOK ) {
    perror( msg );
    if( err != EOK ) errno = err;
    exit( EXIT_FAILURE );
};

// ретранслятор тестовых пакетов TCP
static char data[ MAXLINE ];
void retrans( int sc ) {
   int rc = read( sc, data, MAXLINE );
   if( rc > 0 ) {
      rc = write( sc, data, strlen( data ) + 1 );
      if ( rc < 0 ) perror( "write data failed" );
   }
   else if( rc < 0 ) { perror( "read data failed" ); return; }
   else if( rc == 0 ) { cout << "client closed connection" << endl; return; };
   return;
};
// создание и подготовка прослушивающего сокета
struct sockaddr_in addr;
int getsocket(  in_port_t p ) {
   int rc = 1, ls;
   if( ( ls = socket( AF_INET, SOCK_STREAM, 0 ) ) = -1 )
      errx( "create stream socket failed" );
   if( setsockopt( ls, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof( rc ) ) != 0 )
      errx( "set socket option failed" );
   memset( &addr, 0, sizeof( addr ) );
   addr.sin_len = sizeof( addr );
   addr.sin_family = AF_INET;
   addr.sin_port = htons( p );
   addr.sin_addr.s_addr = htonl( INADDR_ANY );
   if( bind( ls, (struct sockaddr*)&addr, sizeof( sockaddr ) ) != 0 )
      errx( "bind socket address failed" );
   if( listen( ls, 25 ) != 0 ) errx( "put socket in listen state failed" );
   return ls;
};

cli.cpp - код клиента:

 #include <inttypes.h>
#include <sys/neutrino.h>
#include <sys/syspage.h>
#include <sys/procfs.h>
#include "common.h"
// установка параметров клиентов: порт и число повторений
static void setkey( int argc, char *argv[], in_port_t* port, int* num ) {
    int opt, val;
    while ( ( opt = getopt( argc, argv, "p:n:") ) != -1 ) {
        switch( opt ) {
            case 'p' :
                if( sscanf( optarg, "%i", &val ) != 1 )
                   errx( "parse command line failed", EINVAL );
                *port = (in_port_t)val;
                break;
            case 'n' :
                if( ( sscanf( optarg, "%i", &val ) != 1 ) || ( val <= 0 ) )
                   errx( "parse command line failed", EINVAL );
                *num = val;
                break;
            default :
                errx( "parse command line failed", EINVAL );
                break;
        };
    };
};

// клиент - источник потока тестовых пакетов TCP
int main( int argc, char *argv[] ) {
   in_port_t listen_port = SINGLE_PORT;
   int num = 10;
   setkey( argc, argv, &listen_port, &num );
   char data[ MAXLINE ], echo[ MAXLINE ];   
   uint64_t cps = cps = SYSPAGE_ENTRY( qtime )->cycles_per_sec;
   cout << "TCP port = " << listen_port << ", number of echoes = " << num << endl
           << "time of reply - Cycles [usec.] :" << endl;
   for( int i = 0; i < num; i++ ) {
      int rc, ls;
      if( ( ls = socket( AF_INET, SOCK_STREAM, 0 ) ) < 0 )
         errx( "create stream socket failed" );
      struct sockaddr_in addr;
      memset( &addr, 0, sizeof( addr ) );
      addr.sin_len = sizeof( addr );
      addr.sin_family = AF_INET;
      addr.sin_port = htons( listen_port );
      inet_aton( "localhost", &addr.sin_addr );
      if( ( rc = connect( ls, (struct sockaddr*)&addr, sizeof( sockaddr ) ) ) < 0 )
         errx( "connect failed" );
      sprintf( data, "%d", rand() );
      uint64_t cycle = ClockCycles();
      if( ( rc = write( ls, data, strlen( data ) + 1 ) ) <= 0 )
         errx( "write data failed" );
      rc = read( ls, echo, MAXLINE );
      cycle = ClockCycles() - cycle;     
      if( rc < 0 ) errx( "read data failed" );
      if( rc == 0 ) errx( "server closed connection" );
      if( strcmp( data, echo ) != 0 ) { cout << "wrong data" << endl; break; };
      cout << cycle << "[" << cycle * 1000000 / cps << "]";
      if( i % 5 == 4 ) cout << endl; else cout << '\t'; cout << flush;
      close( ls );
      delay( 100 );       
   };
   if( num % 5 != 0 ) cout << endl;
   exit( EXIT_SUCCESS );
};

После запуска клиент анализирует ключи запуска. Предусмотрены значения: <-p> значение порта подключения (по умолчанию - последовательный сервер, порт 9000), и <-n> - число запросов к серверу в серии (по умолчанию - 10). Каждый запрос представляет собой случайное число, генерируемое клиентом, в символьной форме. Ретранслированный сервером ответ сверяется с запросом для дополнительного контроля. Клиент подключается к серверу по петлевому интерфейсу 127.0.0.1, что вполне достаточно для сравнительного анализа. Далее мы рассмотрим его работу с различными серверами.

3. Последовательный сервер ретранслятор. Такой сервер нас интересует только как эталон для сравнения: он имеет минимальное время реакции, т.к. не затрачивается время на порождение каких-либо механизмов параллелизма. С другой стороны, такой сервер, зачастую, просто неинтересен, т.к. не позволяет обслуживать других клиентов до завершения текущего обслуживания.

Все сервера имеют крайне простой код, потому что большая часть <рутины> снесена в файлы common (h & cpp). Вот код 1-го используемого нами - последовательного сервера (файл ech0.cpp):

 #include "common.h"
int main( int argc, char *argv[] ) {
   int ls = getsocket( SINGLE_PORT ), rs;
   while( true ) {
      if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
      retrans( rs );
      close( rs );
      cout << "*" << flush;
   };
   exit( EXIT_SUCCESS );
};

Вот результаты выполнения клиента с этим сервером (указано число машинных циклов ожидания, а в скобках - для справки - время в микросекундах для процессора Celeron 533Mhz):

 
/root/ForkThread # cli -p9000 -n20
TCP port = 9000, number of echoes = 20
time of reply - Cycles [usec.] :
868325[1624]    135364[253]     135287[253]     133438[249]     133057[248]
136061[254]     133554[249]     133887[250]     138776[259]     131237[245]
134748[252]     133823[250]     135650[253]     130583[244]     134562[251]
132601[248]     134622[251]     134516[251]     132055[246]     134139[250]

Отчётливо виден (1-й запрос) эффект, который мы отнесли к эффектам кэширования памяти программ - различие времени выполнения первого и последующих запросов. В каталоге проекта есть ещё один (тестовый) вариант последовательного сервера, код которого выглядит несколько иначе:

 #include <sys/neutrino.h>
#include "common.h"
int main( int argc, char *argv[] ) {
   int ls = getsocket( SINGLE_PORT ), rs, i = 0;
   while( true ) {
      if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
      uint64_t cycle = ClockCycles();
      retrans( rs );
      cycle = ClockCycles() - cycle; 
      close( rs );
      cout << cycle;
      if( i++ % 5 == 4 ) cout << endl; else cout << '\t'; cout << flush;
   };
   exit( EXIT_SUCCESS );
};

Он отличается тем, что <хронометрирует> (для справки) оценочно число циклов на ретрансляцию (затрачиваемые внутри сервера). Все типы серверов используют общую процедуру retrans() и единые затраты <чистого времени>. Приведём для справки эти оценки (только машинные циклы):

 /root/ForkThread # ech0_
757808  60862   60085   60444   60197
61111   60565   60154   59121   59984

Видно, что это время составляет около 50% времени, наблюдаемого со стороны клиента, которое включает в себя время реакции на accept() (со стороны сервера), 2-кратные затраты write() + read() (со стороны как клиента, так и сервера), время передачи буферов по петлевому интерфейсу и т.п.

4. <Классический> параллельный сервер. Ниже приведен код такого <классического> сервера (в отличающейся части), в котором обслуживающий процесс порождается fork() после разблокирования на accept()(файл ech1.cpp):

 #include "common.h"
int main( int argc, char *argv[] ) {
   int ls = getsocket( FORK_PORT ), rs;  
   while( true ) {
      if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
      pid_t pid = fork();
      if( pid < 0 ) errx( "fork error" );
      if( pid == 0 ) {
         close( ls );
         retrans( rs );
         close( rs );
         cout << "*" << flush;                 
         exit( EXIT_SUCCESS );
      }
      else close( rs );
   };
   exit( EXIT_SUCCESS );
};
 

После выхода из accept() (получение запроса connect() от клиента) - порождается отдельный обслуживающий процесс, который тут же закрывает свою копию прослушивающего сокета, производит ретрансляцию через соединённый сокет, завершает соединение и завершается сам. Родительский же процесс закрывает свою копию соединённого сокета и продолжает прослушивание канала. Вот результаты выполнения такого сервера:

 
/root/ForkThread # cli -p9001 -n20
TCP port = 9001, number of echoes = 20
time of reply - Cycles [usec.] :
2219652[4151]   1467470[2744]   1470056[2749]   1466860[2743]   1469294[2748]
1466875[2743]   1467612[2745]   1489083[2785]   1475620[2759]   1665398[3114]
1472091[2753]   1471635[2752]   1481768[2771]   1462214[2734]   1467229[2744]
1468731[2747]   1466483[2742]   1465499[2741]   1461780[2734]   1649821[3085]
 

Да . . . время реакции больше чем на порядок превышает простой последовательный сервер. Видно заметно меньше (относительно) выраженный эффект кэширования - вновь создаваемое адресное пространство процесса повторно не используется, однако некоторое влияние кэширования сказывается (в программе на стороне клиента?). Добавим в код сервера 1 строчку - перед точкой main (файл ech10.cpp - и изменён порт):

 
static long MEM[ 2500000 ];
 
/root/ForkThread # cli -p9002
TCP port = 9002, number of echoes = 10
time of reply - Cycles [usec.] :
67061908[125432]   64674322[120966]   64126835[119942]   63071907[117969]   64185096[120051]
65478368[122470]   64495464[120632]   64533852[120703]   63831652[119390]   64407915[120468]
 

Строки вывода перенесены мною, потому, что он уже не помещаются в формат страницы: время реакции увеличилось почти в 50 раз, превышает время реакции простейшего последовательного сервера уже почти на 3 порядка (500 раз, или 1000 раз по <чистому> времени обслуживания), и составляет уже 0.12 секунды на каждый запрос. Что произошло? При порождении нового процесса по fork() (можно считать, что здесь затраты не столь большие - из предыдущей таблицы: порядка 1.5 млн. циклов) - OS обязана перекопировать образ задачи (к которой мы добавили ~20Mb) из адресного пространства одного процесса, а пространство другого. И не посредством memcpy(), а запросами к ядру системы, потому как копирование идёт между различными защищёнными образами!

Какие предварительные итоги можно сделать из рассматриваемых результатов? Во-первых, то, что OS QNX определённо не использует технику (COW) для копирования образов порождаемых по fork копий процессов, а, во-вторых, : меняет ли что-то принципиально применение COW в других OS, например в Linux? Думаю, что <скорее нет>, т.к. радикальное снижение начального времени реакции (времени латентности) при использовании COW оборачивается только скрытием тех же затрат, но <распределённых> по интервалу обслуживанию. Т.е., использование COW эффективно только как <рекламный>, <рыночный> трюк, рассчитанный на гипнотическое воздействие на конечного потребителя некоторых <магических> тестовых цифр: и уж категорически неприменимо для realtime OS, поведение которых во времени должно быть строго детерминировано.

5. Параллельный сервер с предварительным созданием копий. Так что же получается: для серверов, работающих на высоко интенсивных потоках запросов, с традиционным fork-методом всё так плохо? Отнюдь! Нужно только поменять fork & accept местами - создать заранее некоторый пул обслуживающих процессов, каждый из которых до прихода клиентского запроса будет заблокирован на accept (кстати - accept на одном и том же прослушиваемом сокете). А после отработки клиентского запроса заблаговременно создать новый обслуживающий процесс. Эта техника известна как <предварительный fork> или pre-fork. Меняем текст сервера (файл ech11.cpp):

 
#include "common.h"
const int NUMPROC = 3;
int main( int argc, char *argv[] ) {
   int ls = getsocket( PREFORK_PORT ), rs;        
   for( int i = 0; i < NUMPROC; i++ ) {  
      if( fork() == 0 ) {
         int rs;        
         while( true ) {
            if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
            retrans( rs );
            close( rs );
            cout << i << flush;
            delay( 250 );
         };
      };
   };
   for( int i = 0; i < NUMPROC; i++ ) waitpid( 0, NULL, 0 );
   exit( EXIT_SUCCESS );
};
 

При написании этого текста я несколько <схитрил> и упростил в сравнении с предложенной абзацем выше моделью. Здесь 3 обслуживающих процесса сделаны циклическими и не завершаются по окончанию обслуживания, а снова блокируются на accept, но для наблюдения эффектов этого вполне достаточно (последняя строка нужна вообще только для блокировки родительского процесса, и <сохранения> управляющего терминала - для возможности прекращения всей группы по ^C):

 
# pidin 
...
6901868   1 ./ech11             10r REPLY       94228
6901869   1 ./ech11             10r REPLY       94228
6901870   1 ./ech11             10r REPLY       94228
 
/root/ForkThread # cli -p9003
TCP port = 9003, number of echoes = 10
time of reply - Cycles [usec.] :
854276[1597]    138356[258]     135665[253]     131656[246]     136653[255]
132532[247]     133583[249]     134639[251]     136363[255]     131482[245]
 

Время реакции - практически равно последовательному серверу, чего мы и добивались. В этой программе добавлен вывод идентификатора (i) обрабатывающего процесса (предыдущие сервера выводили только символ <*> для идентификации факта обработки запроса). Для этого добавлена и задержка <пере-активизации> процесса delay(250) - больше 2-х периодов запросов клиентов, чтоб заставить обрабатывающие процессы чередоваться. Вот возможный вид протокола сервера:

 
/root/ForkThread # 2012012012201201201220120120122012012012
 

Хорошо видно нарушение периодичности последовательности идентификационных номеров процессов: после периода простоя всегда обслуживание осуществляется процессом с индексом 2 (максимальным) - при множественном блокировании на acept() первым разблокируется процесс, заблокировавшийся последним (!?).

В принципе, не так и сложно в такой схеме сделать и динамический пул процессов, как будет показано ниже для потоков - с той лишь некоторой сложностью, что здесь каждый процесс выполняется в своём закрытом адресном пространстве, и для их взаимной синхронизации придётся использовать что-то из механизмов IPC.

6. Прежде, чем переходить к потоковым (thread) реализациям, рассмотрим ещё один fork-вариант: использование суперсервера inetd. При этом весь сервис по запуску процессов-копий нашего приложения, и перенаправлению его стандартных потоков ввода-вывода в сокет - возьмёт на себя inetd. Вот полный текст ретранслирующего сервера для этого случая (файл ech3.cpp):

 
#include <stdio.h>
#include "common.h"
static char data[ MAXLINE ];
void main( void ) { write( STDOUT_FILENO, data, read( STDIN_FILENO, data, MAXLINE )  ); };
 

Просто? Мне кажется, что - очень. Теперь настроим на наше приложение и запустим inetd

- Дописываем в конфигурационный файл /etc/services строку, определяющую порт, через который будет вызываться приложение:

ech3            9004/tcp
 

- В конфигурационный файл файл /etc/inetd.conf добавляем строку, которая определяет режим обслуживания и конкретные параметры вызываемого приложения:

ech3    stream  tcp nowait root /root/ForkThread/ech3 ech3
 

- Запускаем inetd:

/root/ForkThread # inetd &
 

При заполнении строк концигурационных файлов нужна особая тщательность, если заголовки сервисов (ech3) в файлах не будут совпадать, то вы получите просто ошибку связи:

/root/ForkThread # cli -p9004
TCP port = 9004, number of echoes = 10
time of reply - Cycles [usec.] :
connect failed: Connection refused
 

Проверить, что inetd настроен на прослушивание нашего порта можно так:

/etc # netstat -a
Active Internet connections (including servers)
Proto Recv-Q Send-Q  Local Address          Foreign Address        State
...
tcp        0      0  *.ech3                 *.*                    LISTEN
 

Заставить inetd перечитать свои конфигурационные файлы после каждой правки /etc/services или /etc/inetd.conf вы можете, послав ему сигнал SGHUP, например:

/etc # pidin
...
10231922   1 usr/sbin/inetd      10r SIGWAITINFO
/etc # kill -SIGHUP 10231922
 

Если ошибка допущена в полном имени программы сервера (поля 6-7 строки inetd.conf), то мы тоже получим не сразу объяснимый результат:

/root/ForkThread # cli -p9004
TCP port = 9004, number of echoes = 10
time of reply - Cycles [usec.] :
server closed connection
 

... и, наконец, если всё в настройке inetd правильно, то получим нечто похожее:

 
/root/ForkThread # cli -p9004
TCP port = 9004, number of echoes = 10
time of reply - Cycles [usec.] :
16442468[30753] 14169659[26502] 14354292[26848] 14160723[26486] 14187182[26535]
14145131[26457] 14411884[26955] 14761467[27609] 14207573[26573] 14491483[27104]
 

Отметим, что время реакции в несколько раз (до 10-ти) выше прямой реализации с fork (inetd ведь также <скрыто> делает fork), но зато какая простота и трудоёмкость! Характерно почти полное отсутствие эффектов кэширования. Для серверов, обслуживающих <неплотный> поток запросов - это, пожалуй, оптимальное решение (кстати, большинство <штатных> сетевых сервисов UNIX выполняется именно по такой схеме).

 

7. Сервер, использующий pthread_create по запросу обслуживания клиента (файл ech2.cpp):

 
#include <pthread.h>
#include "common.h"
void* echo( void* ps ) {
    int sc = *(int*)ps;
    sched_yield();
    retrans( sc );
    close( sc );
    cout << "*" << flush;                     
    return NULL;
}
int main( int argc, char *argv[] ) {
   int ls = getsocket( THREAD_PORT ), rs;           
   while( true ) {
      if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
      if( pthread_create( NULL, NULL, &echo, &rs ) != EOK ) errx( "thread create error" );
      sched_yield();      
   };
   exit( EXIT_SUCCESS );
};
 

Минимальные комментарии: 2 вызова sched_yield() (в вызывающем потоке, и, позже, в функции обслуживания созданного потока) - предназначены для гарантии копирования созданным потоком переданного дескриптора сокета до его повторного переопределения в цикле вызывающего потока. Результаты выполнения программы:

 
/root/ForkThread # cli -p9005
TCP port = 9005, number of echoes = 10
time of reply - Cycles [usec.] :
2493948[4664]   266123[497]     269490[504]     279049[521]     267775[500]
266880[499]     288175[539]     268589[502]     267990[501]     267003[499]
 

Это только в 2 раза (в 3, если оценивать по <чистому> времени) хуже простого последовательного сервера. Чрезвычайно сильно выражен эффект кэширования - вся обработка последовательности запросов производится на едином (многократно используемом) пространстве адресов.

8. Сервер с предварительным созданием потоков. Поступим по аналогии с pre-fork, и создадим фиксированный пул потоков предварительно (pre-thread, файл ech22.cpp):

 
#include <pthread.h>
#include "common.h"
static int ntr = 3;     /*число thread в пуле*/
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;
void* echo( void* ps ) {
    int sc = *(int*)ps, rs;
    sched_yield();
    if( ( rs = accept( sc, NULL, NULL ) ) < 0 ) errx( "accept error" );
    retrans( rs );
    close( rs );
    pthread_mutex_lock( &mutex );
    ntr++;
    pthread_cond_signal( &condvar );
    pthread_mutex_unlock( &mutex );
    cout << pthread_self() << flush;
    delay( 250 );
    return NULL;
}
int main( int argc, char *argv[] ) {
   int ls = getsocket( PRETHREAD_PORT ), rs;              
   while( true ) {
      if( pthread_create( NULL, NULL, &echo, &ls ) != EOK ) errx( "thread create error" );
      sched_yield();      
      pthread_mutex_lock( &mutex );
      ntr--;
      while( ntr <= 0 ) pthread_cond_wait( &condvar, &mutex );
      pthread_mutex_unlock( &mutex );
   };
   exit( EXIT_SUCCESS );
};
 

Здесь accept (как и раньше в случае prefork) перенесен в обрабатывающий поток (все thread блокированы в accept на единственном прослушивающем сокете). Для синхронизации я использую условную переменную, но могут применятся любые из синхронизирующих примитивов. Испытываем полученную программу:

 
/root/ForkThread # cli -p9007
TCP port = 9007, number of echoes = 10
time of reply - Cycles [usec.] :
879988[1645]    134687[251]     137152[256]     136303[254]     693676[1297]
138605[259]     140320[262]     138937[259]     136886[256]     342027[639]
 

Время реакции очень близко к последовательному серверу (к минимально достижимому потенциально!). Потоки обработчики на сервере идентифицируют себя своим tid:

 
/root/ForkThread # ech22
4567891011121314151617181920212223
 

Хорошо видно последовательное порождение нового потока для обработки каждого запроса клиента. Так же сильно, как и в предыдущем случае, выражены эффекты кэширования.

 

9. Можно создать сколь угодно сложный диспетчер, поддерживающий оптимальное число потоков (или процессов) в сервере, но в OS QNX от уже предоставлен как стандартное средство системы: потоковый пул (thread_pool_*). Сервер с использованием динамического пула потоков (файл ech21.cpp):

 
#include <pthread.h>
#include <sys/dispatch.h>
#include "common.h"
static int ls;
THREAD_POOL_PARAM_T *alloc( THREAD_POOL_HANDLE_T *h ) { return (THREAD_POOL_PARAM_T*)h; };
THREAD_POOL_PARAM_T *block( THREAD_POOL_PARAM_T *p ) {
    int rs = accept( ls, NULL, NULL );
    if( rs < 0 ) errx( "accept error" );
    return (THREAD_POOL_PARAM_T*)rs; 
};
int handler( THREAD_POOL_PARAM_T *p ) {
    retrans( (int)p );
    close( (int)p );
    delay( 250 );
    cout << pthread_self() << flush;                     
    return 0;
};
int main( int argc, char *argv[] ) {
   ls = getsocket( THREAD_POOL_PORT );                 
   thread_pool_attr_t attr;
   memset( &attr, 0, sizeof( thread_pool_attr_t ) );
   attr.lo_water = 3;     /* заполнение блока атрибутов пула */
   attr.hi_water = 7;
   attr.increment = 2;
   attr.maximum = 9;
   attr.handle = dispatch_create();
   attr.context_alloc = alloc;
   attr.block_func = block;
   attr.handler_func = handler;
   void *tpp = thread_pool_create( &attr, POOL_FLAG_USE_SELF ) ;
   if(  tpp == NULL ) errx( "create pool" );
   thread_pool_start( tpp );
   exit( EXIT_SUCCESS );
};
 

Всё, сервер готов - почти всё необходимое за нас сделала библиотека OS. Грубо, логика работы пула потоков QNX следующая:

- начально создаётся attr.lo_water (<нижняя ватерлиния>) потоков;

- для каждого потока при создании вызывается функция *attr.context_alloc;

- эта функция по завершению вызовет (сама) блокирующую функцию потока *attr.block_func;

- эта функция, после разблокирования (accept) вызовет функцию обработчика *attr.handler_func, которой в качестве параметра (в нашем тексте) передаст дескриптор присоединённого сокета;

- как только число заблокированных потоков станет ниже attr.lo_water - механизм пула создаст дополнительно attr.increment потоков;

- если число блокированных потоков в какой-то момент превысит attr.hi_water (<верхняя ватерлиния>) - <лишние> потоки будут уничтожены;

- . . . и всё это так, чтобы общее число потоков (выполняющиеся + блокированные) не превышало attr.maximum.

Это - уникально мощный механизм, с очень широкой функциональностью, но за более детальной информацией я отсылаю всех заинтересованных к технической документации OS QNX. Смотрим это в действии:

 
/root/ForkThread # cli -p9006 -n20
TCP port = 9006, number of echoes = 20
time of reply - Cycles [usec.] :
828384[1549]    139615[261]     142050[265]     144799[270]     143895[269]
146760[274]     142760[267]     145951[272]     142816[267]     144384[270]
144657[270]     159474[298]     147504[275]     147113[275]     145257[271]
146866[274]     153215[286]     145461[272]     145013[271]     145311[271]
 

Результаты очень близкие к максимально возможным! Так же, как и в предыдущих случаях - очень ярко выражен эффект кэширования: вся обработка ведётся на одном и том же, многократно используемом, адресном пространстве.

 

Посмотрим <чередование> tid обрабатывающих потоков:

 
/root/ForkThread # ech21
15315315311731731731731731731776176176176176176176
 

Хорошо видно, что через некоторое время работы число потоков в пуле стабилизируется на уровне 7-ми (<верхней ватерлинии>). Через какое-то время выполнения состояние пула будет примерно таким:

 
/ # pidin 
...
10059820   1 ./ech21             10r REPLY       94228
10059820   2 ./ech21             10r REPLY       94228
10059820   3 ./ech21             10r REPLY       94228
10059820   4 ./ech21             10r REPLY       94228
10059820   5 ./ech21             10r REPLY       94228
10059820   6 ./ech21             10r REPLY       94228
10059820   7 ./ech21             10r REPLY       94228
...
 

Как и предсказывает документация - мы имеем 7 блокированных на accept потоков - по <верхнюю ватерлинию>.

 

Достаточно интересно посмотреть состояния ожидающих сокетов при запущенных всех (или почти всех) видах описанных выше серверов - вот возможное начало таблицы netstat:

 
/root/ForkThread # netstat -a
Active Internet connections (including servers)
Proto Recv-Q Send-Q  Local Address          Foreign Address        State
tcp        0      0  *.9005                 *.*                    LISTEN
tcp        0      0  *.9003                 *.*                    LISTEN
tcp        0      0  *.9006                 *.*                    LISTEN
tcp        0      0  *.9002                 *.*                    LISTEN
tcp        0      0  *.9001                 *.*                    LISTEN
tcp        0      0  *.9000                 *.*                    LISTEN
tcp        0      0  *.ech3                 *.*                    LISTEN
 

10. Итоги. Выше рассмотрено 7 различных альтернативных технологий построения сервера TCP/IP. Сравним средние характеристики вариантов по критерию <время задержки реакции> (представляют интерес только порядки величин, сами значения могут радикально <гулять> в зависимости от конкретного вида серверной функции):

Тип сервера

Среднее время обслуживания

Время латентности

Последовательный - п.3

 135 000
 0
 
fork - п.4
 >>1 470 000
 >>1 000 000
 
pre-fork - п.5 
 133 000
 0
 
inetd - п.6
 14 100 000
 14 000 000
 
thread - п.7
 267 000
 130 000
 
pre-thread - п.8
 140 000
 5 000 (~0)
 
thread pool - п.9
 144 000
 9 000 (~0)
 
 

Тем не менее, не следует категорически руководствоваться выбором той или иной технологии построения сервера только исходя из содержимого показанной выше таблицы. В каждом конкретном случае при выборе решения должно учитываться существенно больше факторов: трудоёмкость реализации, потребление ресурсов, в частности RAM (которое мы никак не затрагиваем в нашем рассмотрении), простота отладки и сопровождения etc.

P.S. 1. Все упоминаемые в тексте элементы программного кода, или необходимые для их сборки элементы (Makefile) содержаться в составе прилагаемого проекта echsrv.tgz.

2. Материал данного рассмотрения непосредственно произошёл от обсуждения подобных вопросов на форуме http://qnx.org.ru/forum тема <fork или thread> : т.е., в первую очередь, автор приносит свои благодарности всем, принявшим участие в обсуждении. Во-вторых - все обсуждавшие данную тему в форуме, являются соавторами предлагаемого материала в той же мере, как и автор, указанный в титуле статьи, а их полное поимённое перечисление : я опускаю только в силу их многочисленности.

Содержание

 

Используются технологии uCoz