Saturday, December 14, 2013

pthread: an introduction

pthread_create →  unna edhir paarkaavaa?? Vendamaa?? Sollittu po...
                            uses 'clone()' in Linux. Why not fork?
pthread_detach →(PTHREAD_CREATE_DETACHED)
                           Enna paakkaadha.. Naan andha velaya mudichittu appdiye poyiduven. Naan thaniyaa poyiruven.
                            (PTHREAD_CREATE_JOINABLE)
                            Naan vandhuduren ammaa.. Enakkaaha wait pannu... Naan ponaalum ennoda resource'la key vachittu poren. Nee dhaan key eduthuttu poyi ellaathayum oppadaikkanum. neeyum pannala'nna ellaame ambo (Zombie) dhaan...
pthread_join → Nee varra varaikkum, Wait pannuren! Nee erkanave poyirundhaai enraal, key edhuthittu vandhu, unnoda things ellaathayum release panniduren.
pthread_attr_init() → Initializes the attribute object that has to be passed to pthread_create(). Before releasing it or using it in another thread, pthread_attr_destroy() needs to be called.
pthread_exit() → The release of system resources depends on whether the thread is JOINABLE or not.
 NPTL (Native POSIX Threads Library)
pthread_cancel → "ithoda Niruthikko!"nnu Solradha sollitten.. Appuram avan ishtam. Cancel disable aayirunda, nirutha mattaan (pthread_setcancelstate). Cancel enable pannirundhaa, two possibility. (pthread_setcanceltype) Asynchronous'la irundaa, udane niruttha chance undu. Deferred'la irundaaa, avan sonna Cancellation point'la dhaan niruthuvaan. Cancellation point is standard list of functions.
 
 
------------
 
Linux uses a 1-1 threading model, with (to the kernel) no distinction between processes and threads -- everything is simply a runnable task. *
On Linux, the system call clone clones a task, with a configurable level of sharing, among which are:
  • CLONE_FILES: share the same file descriptor table (instead of creating a copy)
  • CLONE_PARENT: don't set up a parent-child relationship between the new task and the old (otherwise, child's getppid() = parent's getpid())
  • CLONE_VM: share the same memory space (instead of creating a COW copy)
fork() calls clone(least sharing) and pthread_create() calls clone(most sharing). **
Forking costs a tiny bit more than calling pthread_create because of copying tables and creating COW mappings for memory, but the Linux kernel developers have tried (and succeeded) at minimizing those costs.
Switching between tasks, if they share the same memory space and various tables, will be a tiny bit cheaper than if they aren't shared, because the data may already be loaded in cache. However, switching tasks is still very fast even if nothing is shared -- this is something else that Linux kernel developers try to ensure (and succeed at ensuring).
In fact, if you are on a multi-processor system, not sharing may actually be beneficial to performance: if each task is running on a different processor, synchronizing shared memory is expensive.

* Simplified. CLONE_THREAD causes signals delivery to be shared (which needs CLONE_SIGHAND, which shares the signal handler table).
** Simplified. There exist both SYS_fork and SYS_clone syscalls, but in the kernel, the sys_fork and sys_clone are both very thin wrappers around the same do_fork function, which itself is a thin wrapper around copy_process. Yes, the terms process, thread, and task are used rather interchangeably in the Linux kernel...
 
Sample pthread program to act as TCP client and to display the transmission speed: Save the file as tcpcli.c and build with 'gcc tcpcli.c -lpthread -o tcpcli' and run with './tcpcli'.
 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Local boolean constants used with the start flag below. */
#define false (0)
#define true (1)
#define BUFLEN          4000    /* default length of Tx/Rx buffer */
#define NBUF            2048    /* default number of transmission */
#define MAXCON          10      /* upper bound on client threads; sizes main's per-thread arrays */
int nPayload = BUFLEN;          /* bytes passed to each tcp_send() call */
int nbuf = NBUF;                /* number of payloads each client thread sends */
int nSocket = 1;                /* number of client threads / connections */
/*
 * Start flag: main sets it to true once every client thread has been
 * created, and each tClient thread busy-waits on it before sending.
 * It is volatile so that the polling loop re-reads it on every pass;
 * with a plain int the compiler is allowed to read it once and spin
 * forever.  NOTE(review): volatile is the minimal fix here; a condition
 * variable or barrier would be the fully portable way to release the
 * workers.
 */
volatile int startThread = false;
struct timeval sTime, eTime;    /* start / end of the measured transfer */
double nbytes;      /* bytes on net */
double cput, realt; /* user, real time (seconds); cput is not used by the code shown */
void *tClient(void *);
/*
 * tvsub - store the difference (*t1 - *t0) in *tdiff.
 *
 * tdiff: receives the result
 * t1:    later time stamp
 * t0:    earlier time stamp
 *
 * When the microsecond difference is negative, one second is borrowed
 * so that tv_usec of the result ends up non-negative.
 */
static void
tvsub(struct timeval *tdiff, const struct timeval *t1, const struct timeval *t0)
{
    tdiff->tv_sec = t1->tv_sec - t0->tv_sec;
    tdiff->tv_usec = t1->tv_usec - t0->tv_usec;
    if (tdiff->tv_usec < 0) {
        /* borrow one second from the seconds field */
        tdiff->tv_sec--;
        tdiff->tv_usec += 1000000;
    }
}
/* ******************************************************************
FUNCTION NAME: main
INPUT PARAMETER: N/A
RETURN VALUE: int
DESCRIPTION: the main thread creates all the required client threads
         mapped to available ports.
********************************************************************* */
int main ( int argc, char *argv[] )
{
    int t, retcode, threadcount=0, joincount=0, pass[MAXCON], *retval[MAXCON];
    pthread_attr_t attr;
    struct timeval td;
    nbytes = 0.0;
    if ( argc != 3 )
    {
        printf( "Enter arguments \n");
        return -1;
    }
    nSocket = atoi(argv[1]);
    if ((nSocket > 10) || (nSocket < 1)) {
        printf( "Invalid No of connections (1 ~ 10)\n");
        return -1;
    }
    nPayload = atoi(argv[2]);
    if ((nPayload > BUFLEN) || (nPayload < 10)) {
        printf( "Invalid transfer unit size (10 ~ 4000)\n");
        return -1;
    }
    if ((nSocket == 1) && (nPayload == 4000)) {
        nbuf = (16777216 * 4) / nPayload;
    } else {
        nbuf = (16777216 * 2) / nPayload / nSocket;
    }
    printf("No of sockets: %d, Tx unit size: %d, No of bufs: %d\n", nSocket, nPayload, nbuf);
    pthread_t sockThread[nSocket];
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_destroy(&attr);
    for(t = 0; t < nSocket; t++)
    {
        pass[t] = t + 1;
        if(0 == (pthread_create(&sockThread[t], &attr, tClient, (void *)(pass + t))))
        {
            threadcount++;
        }
        else
        {
            perror("ERROR: pthread_create failed");
            return -1;
        }
    }
    startThread = true ;
    gettimeofday(&sTime, NULL);
    for(t = 0; t < nSocket; t++)
    {
        retcode = pthread_join(sockThread[t], (void **)(retval + t));
        if (-1 == retcode)
        {
            printf("ERROR: return code from pthread_join() is %d\n", retcode);
            break;
        }
        else
        {
            nbytes += *(retval[t]);
            joincount++;
        }
    }
    if (joincount != threadcount)
    {
        printf("exiting in incomplete state..\n");
    }
    if (nbytes > 0.0) {
        /* Get real time */
        tvsub( &td, &eTime, &sTime );
        realt = td.tv_sec + ((double)td.tv_usec) / 1000000;
        nbytes = nSocket * nPayload * nbuf;
        printf("%.0f bytes in %.2f real seconds = %.2f Mbit/sec \n", nbytes, realt, (nbytes/realt) * 8.0 / 1024.0 / 1024.0);
    }
    pthread_exit(NULL);
    return 0;
}
/*****************************************************************************
* tcp_send - keep calling send() until len bytes of data have gone out.
*
* cepid: connected socket descriptor
* data:  buffer to transmit
* len:   number of bytes to transmit
*
* Returns the number of bytes sent. The value is smaller than len when
* send() returned zero or a negative value before everything was sent.
******************************************************************************/
static int tcp_send(int cepid, char *data, int len)
{
    int sent = 0;

    while (sent < len) {
        int n = send(cepid, data + sent, len - sent, 0);

        if (n <= 0) {
            /* error or nothing accepted: report what went out so far */
            break;
        }
        sent += n;
    }
    return sent;
}

/* ******************************************************************
FUNCTION NAME: tClient
INPUT PARAMETER: port number is passed as an argument
RETURN VALUE: void
DESCRIPTION: this is the thread handler containing the client body,
         the thread spawned from main contains this body forming
         paralel client requesting the server.
********************************************************************* */
void *tClient(void *p)
{
    int len, total;
    int sockfd, blen = 32768;
    int retcode;
    char *sendbuff;
    int port = 5001;
    int bytesSent = 0;
    struct sockaddr_in address;
    printf("thread number %d started\n", *(int *)p);
    *(int *)p = 0;
    sendbuff = (char *)malloc(nPayload + 8);
    if(NULL != sendbuff)
    {
        memset(sendbuff, 0, nPayload + 8);
    }
    else
    {
        printf("Error in allocating buffer\n");
        pthread_exit(p);
    }
    /* Create socket for client */
    sockfd = socket(PF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("Socket create failed") ;
        pthread_exit(p);
    }
    if (nSocket > 1) {
        if (setsockopt (sockfd, SOL_SOCKET, SO_SNDBUF, &blen, sizeof (blen)) < 0)
        {
            printf ("setsockopt SO_SNDBUF failed\n");
            pthread_exit(p);
        }
    }
    /* Name the socket as agreed with server */
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = inet_addr("172.16.21.20");
    address.sin_port = htons(port);
    len = sizeof(address);
    retcode = connect(sockfd, (struct sockaddr *)&address, len);
    if(retcode == -1)
    {
        perror("Connect failed");
        printf("Connect USB and do \'ifconfig usb0 172.16.21.21\'\n");
        pthread_exit(p);
    }
    while (startThread == false);
    total = 0;
    /* send TCP data until nPayload x nbuf bytes */
    for (retcode = 0; retcode < nbuf; retcode++) {
        len = tcp_send(sockfd, (char *)sendbuff, nPayload);
        if (len < nPayload) {
            if (len > 0)
                total += len;
            break;
        }
        total += len;
    }
    gettimeofday(&eTime, NULL);
    *(int *)p = total;
    free(sendbuff);
    close(sockfd);
    pthread_exit(p);
    return NULL;
}
 
 Sample pthread program to act as TCP server and to display the transmission speed: Save the file as tcpser.c and build with 'gcc tcpser.c -lpthread -o tcpser' and run with './tcpser'.
 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/* Local boolean constants. */
#define false (0)
#define true (1)

#define BUFLEN          8192    /* default length of Tx/Rx buffer */
#define MAXCON          10      /* most connections main will accept */
#define TCPAPPPORTNO    5001    /* TCP port the server binds to */

/* Receive buffer handed to tcp_recv() by tServer; BUFLEN bytes plus one spare word. */
unsigned int recvbuff[BUFLEN/4 + 1];

/* Checked by tServer before it records the start time. */
int start_count = 0;

/* Size requested for SO_SNDBUF / SO_RCVBUF in main. */
int blen = 16384;

/* Start and end time stamps of the measured transfer. */
struct timeval sTime;
struct timeval eTime;

double nbytes;      /* bytes on net */
double realt;       /* elapsed real time in seconds */

void *tServer(void *);
/*
 * tvsub - compute the elapsed time between two time stamps.
 *
 * tdiff: output, set to (*t1 - *t0)
 * t1:    end time stamp
 * t0:    start time stamp
 *
 * A negative microsecond difference is normalised by borrowing one
 * second from tv_sec.
 */
static void
tvsub(struct timeval *tdiff, const struct timeval *t1, const struct timeval *t0)
{
    tdiff->tv_sec = t1->tv_sec - t0->tv_sec;
    tdiff->tv_usec = t1->tv_usec - t0->tv_usec;
    if (tdiff->tv_usec < 0) {
        tdiff->tv_sec--;
        tdiff->tv_usec += 1000000;
    }
}
/* ******************************************************************
FUNCTION NAME: main
INPUT PARAMETER: N/A
RETURN VALUE: int
DESCRIPTION: the main thread creates all the required client threads
         mapped to available ports.
********************************************************************* */
int main ( int argc, char *argv[] )
{
    int t, retcode, threadcount=0, joincount=0;
    int sockfd, sd_current = 0;
    unsigned int addrlen;
    pthread_attr_t attr;
    struct timeval td;
    struct sockaddr_in sin, pin;
    int sockarr[MAXCON], ncon, *retval[MAXCON];
    nbytes = 0.0;
    pthread_t sockThread[MAXCON];
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_destroy(&attr);
    if ( argc != 2 )
    {
        printf( "Enter arguments \n");
        return -1;
    }
    ncon = atoi(argv[1]);
    if ((ncon > MAXCON) || (ncon < 1)) {
        printf( "Invalid No of connections (1 ~ %d)\n", MAXCON);
        return -1;
    }
    /* Create socket for client */
    sockfd = socket(PF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("Socket create failed") ;
        pthread_exit(0);
    }
    /* complete the socket structure */
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(TCPAPPPORTNO);
    if (inet_pton(AF_INET, "172.16.21.21", &(sin.sin_addr.s_addr)) <= 0)
    {
        return -1;
    }
    /* bind the socket to the port number */
    if (-1 == (bind(sockfd, (struct sockaddr *) &sin, sizeof(sin))))
    {
        perror("Bind failed") ;
        printf("Connect USB and do \'ifconfig usb0 172.16.21.21\' or wait for the address to be released\n");
        return -1;
    }
    printf("%d connections possible. Please give same number of connections on target too\n", ncon);
    for (t = 0; t < ncon; t++) {
        if ( -1 == listen(sockfd, MAXCON))
        {
            perror("Listen failed..retrying") ;
            continue;
        }
        if (ncon > 1) {
            if (setsockopt (sockfd, SOL_SOCKET, SO_SNDBUF, &blen, sizeof (blen)) < 0)
            {
                printf ("setsockopt SO_SNDBUF failed\n");
                continue;
            }
        }
        if (-1 == (sd_current = accept(sockfd, (struct sockaddr *) &pin, &addrlen)))
        {
            perror("Accept failed..retrying") ;
            continue;
        }
        printf("Connection %d accepted\n", t+1);
        if (ncon > 1) {
            if (setsockopt (sd_current, SOL_SOCKET, SO_RCVBUF, &blen, sizeof (blen)) < 0)
            {
                printf ("setsockopt SO_RCVBUF failed\n");
                continue;
            }
        }
        sockarr[threadcount] = sd_current;
        if(0 == (pthread_create(&sockThread[threadcount], &attr, tServer, (void *)(sockarr + threadcount))))
        {
            threadcount++;
        }
        else
        {
            perror("ERROR: pthread_create failed");
            return -1;
        }
    }
    for(t = 0; t < threadcount; t++)
    {
        retcode = pthread_join(sockThread[t], (void **)(retval + t));
        if (-1 == retcode)
        {
            printf("ERROR: return code from pthread_join() is %d\n", retcode);
            break;
        }
        else
        {
            nbytes += *(retval[t]);
            joincount++;
        }
    }
    if (joincount != threadcount)
    {
        printf("exiting in incomplete state..\n");
    }
    /* Get real time */
    tvsub( &td, &eTime, &sTime );
    realt = td.tv_sec + ((double)td.tv_usec) / 1000000;
    printf("%.0f bytes in %.2f real seconds = %.2f Mbit/sec \n", nbytes, realt, (nbytes/realt) * 8.0 / 1024.0 / 1024.0);
    pthread_exit(NULL);
    return 0;
}
/*****************************************************************************
* tcp_recv - keep calling recv() until size bytes have been stored in buf.
*
* cepid: connected socket descriptor
* buf:   destination buffer, at least size bytes long
* size:  number of bytes wanted
*
* Returns the number of bytes received. The value is smaller than size
* when recv() returned zero or a negative value before the buffer filled.
******************************************************************************/
static int tcp_recv(int cepid, char *buf, int size)
{
    int got = 0;

    while (got < size) {
        int n = recv(cepid, buf + got, size - got, 0);

        if (n <= 0) {
            /* error or nothing more to read: report what arrived so far */
            break;
        }
        got += n;
    }
    return got;
}

/* ******************************************************************
FUNCTION NAME: tClient
INPUT PARAMETER: port number is passed as an argument
RETURN VALUE: void
DESCRIPTION: this is the thread handler containing the client body,
         the thread spawned from main contains this body forming
         paralel client requesting the server.
********************************************************************* */
void *tServer(void *p)
{
    int len, consd, total;
    consd = *(int *)p;
/*    printf("connected socket no: %d\n", *(int *)p); */
    if (start_count == 0) {
        gettimeofday(&sTime, NULL);
    }
    total = 0;
    for (;;)
    {
        len = tcp_recv(consd, (char *)recvbuff, BUFLEN);
        if (len < BUFLEN) {
            if (len > 0)
                total += len;
            break;
        }
        total += len;
    }
    printf("Total data: %d\n", total);
    *(int *)p = total;
    gettimeofday(&eTime, NULL);
    close(consd);
    pthread_exit(p);
    return NULL;
}
 

No comments: