5 approaches to handle I/O operations in any system – Part 2



Introduction

In this series, we will dive deep into the low-level system to understand what input/output (I/O) is.

You can find part 1 here.

This is part 2 of the series. It dissects the multiplexing I/O and signal-driven I/O models.


Multiplexing I/O model

In Linux, to implement a multiplexing I/O model, we begin by using the select or poll system call to check the state of file descriptors (FDs). Next, we invoke the recvfrom system call to retrieve data from the kernel, which is then copied from kernel space to user space.

While the non-blocking I/O model has to issue repeated requests to check the availability of a single file descriptor, the multiplexing I/O model uses the select or poll syscall to monitor a whole list of file descriptors (FDs) such as sockets, files, and pipes. Once data is ready to read from a socket, for example, the kernel notifies the application by returning from the select call with that descriptor marked as ready. The application then uses the recvfrom syscall to read the datagram, which the kernel copies from the socket into the application's buffer.

Compared to the blocking or non-blocking I/O model, multiplexing still blocks the process twice: first while waiting for data to become available in the socket, and then while the data is copied out of the socket. Moreover, it takes two syscalls (select or poll, then recvfrom) to achieve what the blocking I/O model does with a single recvfrom. Its only advantage is the ability to monitor multiple file descriptors at once, which a blocking model cannot do and a non-blocking model can only approximate by looping over every descriptor. I must clarify that I am referring to the use of a single thread only, as it is possible to monitor multiple FDs in a blocking or non-blocking I/O model that uses multiple threads to make these calls.
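The two-step flow described above (wait for readiness, then copy the data) can be sketched with select on a small set of descriptors. This is only a minimal illustration, not the demo from the repository: the helper name wait_and_read is hypothetical, and it uses read on generic descriptors (pipes work fine) instead of recvfrom on UDP sockets.

```c
#include <stdio.h>
#include <string.h>
#include <sys/select.h>
#include <unistd.h>

// Hypothetical helper: blocks until one of `fds` is readable, reads from the
// first ready descriptor into `buf`, and returns its index (or -1 on error).
int wait_and_read(const int *fds, int nfds, char *buf, size_t buf_size) {
    fd_set read_fds;
    int max_fd = -1;

    // Register every descriptor we want the kernel to watch.
    FD_ZERO(&read_fds);
    for (int i = 0; i < nfds; i++) {
        FD_SET(fds[i], &read_fds);
        if (fds[i] > max_fd) max_fd = fds[i];
    }

    // Syscall 1: block until at least one descriptor has data.
    if (select(max_fd + 1, &read_fds, NULL, NULL, NULL) == -1) {
        perror("select");
        return -1;
    }

    for (int i = 0; i < nfds; i++) {
        if (FD_ISSET(fds[i], &read_fds)) {
            // Syscall 2: copy the data from kernel space to user space.
            ssize_t n = read(fds[i], buf, buf_size - 1);
            if (n < 0) {
                perror("read");
                return -1;
            }
            buf[n] = '\0';
            return i;
        }
    }
    return -1;
}
```

If two pipes are passed in and only the second one has been written to, the function should return index 1 with the written bytes in buf, all from a single thread.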

Show me the code

You can find the full source code here: https://github.com/hexknight01/go-io-model-demo/blob/master/multiplexing-io

if (setsockopt(sockets[i], SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) == -1) {
    perror("setsockopt(SO_REUSEPORT) failed");
    close(sockets[i]);
    return 1;
}

Since Linux 3.9, it is possible to bind several sockets to the same port using the SO_REUSEPORT option (you can read more about this option here: SO_REUSEPORT). However, the kernel decides how incoming requests are distributed across those sockets, so there is no guarantee that every socket receives an equal share. For this reason, binding multiple sockets to the same port makes it hard to demonstrate multiplexing I/O. Instead, for simplicity, I will create 5 sockets on ports 8081 to 8085.

Object: Objective
Client: Sends requests to 5 sockets on ports 8081 to 8085.
Server: Listens on these 5 sockets using the Linux poll(2) syscall, copies the data from kernel space to application space, and prints out the message.
Server.c
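#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int read_multiplexing_IO() {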
    int BUFFER_SIZE = 1024;
    int MAX_SOCKETS = 5;
    int sockets[MAX_SOCKETS];
    char applicationBuffer[BUFFER_SIZE];
    struct sockaddr_in server_addr, client_addr;
    socklen_t client_len = sizeof(client_addr);
    // Initialize socket array
    for (int i = 0; i < MAX_SOCKETS; i++) {
        sockets[i] = socket(AF_INET, SOCK_DGRAM, 0);
        if (sockets[i] == -1) {
            perror("socket");
            return 1;
        }
        // Define server address for each socket
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = INADDR_ANY;
        server_addr.sin_port = htons(8081 + i); // Assign a different port for each socket

        // Bind the socket to the specified port and IP address
        if (bind(sockets[i], (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
            perror("bind failed");
            close(sockets[i]);
            return 1;
        }

        printf("Socket %d is running on port %d...\n", i, 8081 + i);
    }
    // Set up the poll structure
    /*
        struct pollfd {
            int   fd;          file descriptor 
            short events;      requested events 
            short revents;     returned events 
        };
    */
   // https://man7.org/linux/man-pages/man2/poll.2.html
    struct pollfd fds[MAX_SOCKETS];
    for (int i = 0; i < MAX_SOCKETS; i++) {
        fds[i].fd = sockets[i];
        fds[i].events = POLLIN; // POLLIN: there is data available to read
    }
    while (1) {
        printf("Waiting for client");
        int poll_count = poll(fds, MAX_SOCKETS, -1); // Wait indefinitely for events
        if (poll_count == -1) {
            perror("poll failed");
            break;
        }

        for (int i = 0; i < MAX_SOCKETS; i++) {
            if (fds[i].revents & POLLIN) {
                // Data is ready to be read on socket i
                client_len = sizeof(client_addr); // value-result argument: reset before every call
                int received_byte = recvfrom(sockets[i], applicationBuffer, sizeof(applicationBuffer) - 1, 0,
                                             (struct sockaddr *)&client_addr, &client_len);
                if (received_byte > 0) {
                    applicationBuffer[received_byte] = '\0';  // Null-terminate the string
                    printf("Received message on socket %d: %s\n", i, applicationBuffer);

                    char client_ip[INET_ADDRSTRLEN];
                    inet_ntop(AF_INET, &client_addr.sin_addr, client_ip, sizeof(client_ip));
                    printf("Message from client on socket %d: %s:%d\n", i, client_ip, ntohs(client_addr.sin_port));
                } else if (received_byte == -1) {
                    perror("recvfrom failed");
                }
            }
        }
    }
    return 0;
}
Client.c
int client_socket;
int message_count = 5000;  // Number of messages to send
int SERVER_PORT =  8081;
char SERVER_IP[] = "127.0.0.1";
int BUFFER_SIZE = 1024;
int PORT_RANGE = 5;
struct sockaddr_in server_addr;
char buffer[BUFFER_SIZE];
 

// Create a UDP socket
client_socket = socket(AF_INET, SOCK_DGRAM, 0);
if (client_socket < 0) {
    perror("Socket creation failed");
    exit(EXIT_FAILURE);
}

// Set up the server address structure
server_addr.sin_family = AF_INET;
if (inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr) <= 0) {
    perror("Invalid server IP address");
    close(client_socket);
    exit(EXIT_FAILURE);
}

// Send multiple messages to the server
for (int i = 0; i < message_count; i++) {
    int random_port = SERVER_PORT + (rand() % PORT_RANGE); // Random port between 8081 and 8085
    server_addr.sin_port = htons(random_port);
    snprintf(buffer, BUFFER_SIZE, "Message %d from client", i + 1);

    ssize_t sent_bytes = sendto(client_socket, buffer, strlen(buffer), 0,
                                (struct sockaddr *)&server_addr, sizeof(server_addr));
    if (sent_bytes < 0) {
        perror("sendto failed");
    } else {
        printf("Sent: %s\n", buffer);
    }
}
Result
Received message on socket 4: Message 4998 from client
Message from client on socket 4: 127.0.0.1:63620
Waiting for clientReceived message on socket 0: Message 4948 from client
Message from client on socket 0: 127.0.0.1:63620
Received message on socket 1: Message 4974 from client
Message from client on socket 1: 127.0.0.1:63620
Received message on socket 2: Message 4886 from client
Message from client on socket 2: 127.0.0.1:63620
Waiting for clientReceived message on socket 0: Message 4951 from client
Message from client on socket 0: 127.0.0.1:63620
Received message on socket 1: Message 4975 from client
Message from client on socket 1: 127.0.0.1:63620
Received message on socket 2: Message 4889 from client
Message from client on socket 2: 127.0.0.1:63620

Signal-driven I/O model

At this point, you’ve learned that blocking, non-blocking, and even multiplexing I/O models still require the main thread to fetch the state of the socket(s) when data is available to read. You might wonder, “Why do I have to poll sockets for their state, which blocks the main thread? Is there a better way to handle this?”

The answer is yes. Instead of polling the state of the sockets, why not have the kernel notify the application when data becomes available on a socket? This approach is called the signal-driven I/O model (the demo code below calls it event-driven I/O): the kernel sends the process a signal indicating that data is ready to be read.

As shown in the figure above, the process can continue performing other tasks without waiting for data to become available in the sockets. This is a significant improvement, as it allows the main thread to handle other tasks concurrently in the main loop.

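To use this I/O model, Linux provides the SIGIO signal, which indicates that I/O is now possible on a file descriptor. Note that SIGIO is a signal, not a system call. It originates from 4.2BSD and is enabled per descriptor by setting the O_ASYNC flag with fcntl, after choosing the receiving process with F_SETOWN.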

Show me the code

I am going to use the sigaction system call to install a handler for the SIGIO signal, which the kernel sends when one of the sockets has data to read. sigaction allows you to specify a custom function (signal handler) that is called when the process receives a specific signal. You can find the sigaction manual here

I put the source code of this demo here

Object: Objective
Client: Sends requests to 5 sockets on ports 8081 to 8085.
Server: Waits for SIGIO to be sent for one of the sockets when data is available to read. Upon receiving the signal, the application uses the recvfrom system call to copy the data from kernel space to application space and prints out the message.
server.c
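#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

#define MAX_SOCKETS 5
#define BUFFER_SIZE 1024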

int sockets[MAX_SOCKETS];
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);

// Signal handler for SIGIO
void handle_sigio(int signo) {
    char applicationBuffer[BUFFER_SIZE];
    for (int i = 0; i < MAX_SOCKETS; i++) {
        client_len = sizeof(client_addr); // value-result argument: reset before every call
        int received_byte = recvfrom(sockets[i], applicationBuffer, sizeof(applicationBuffer) - 1, 0,
                                     (struct sockaddr *)&client_addr, &client_len);
        if (received_byte > 0) {
            applicationBuffer[received_byte] = '\0'; // Null-terminate the string
            printf("Received message on socket %d: %s\n", i, applicationBuffer);

            char client_ip[INET_ADDRSTRLEN];
            inet_ntop(AF_INET, &client_addr.sin_addr, client_ip, sizeof(client_ip));
            printf("Message from client on socket %d: %s:%d\n", i, client_ip, ntohs(client_addr.sin_port));
        } else if (received_byte == -1 && errno != EWOULDBLOCK) {
            perror("recvfrom failed");
        }
    }
}

int read_event_driven_IO() {
    struct sockaddr_in server_addr;

    // Initialize sockets
    for (int i = 0; i < MAX_SOCKETS; i++) {
        sockets[i] = socket(AF_INET, SOCK_DGRAM, 0);
        if (sockets[i] == -1) {
            perror("socket");
            return 1;
        }

        // Define server address for each socket
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = INADDR_ANY;
        server_addr.sin_port = htons(8081 + i);

        // Bind the socket to the specified port and IP address
        if (bind(sockets[i], (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
            perror("bind failed");
            close(sockets[i]);
            return 1;
        }

        printf("Socket %d is running on port %d...\n", i, 8081 + i);

        // Set the process as the owner of the socket so that it receives SIGIO
        fcntl(sockets[i], F_SETOWN, getpid());

        // Enable non-blocking mode and signal-driven I/O in a single call.
        // Two separate F_SETFL calls built from the same `flags` value would
        // make the second call clear the flag set by the first.
        int flags = fcntl(sockets[i], F_GETFL, 0);
        fcntl(sockets[i], F_SETFL, flags | O_NONBLOCK | O_ASYNC);
    }

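    // Install signal handler for SIGIO. In production code, do this before
    // enabling O_ASYNC: the default action for an unhandled SIGIO terminates the process.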
    struct sigaction sa;
    sa.sa_handler = handle_sigio;
    sa.sa_flags = 0;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGIO, &sa, NULL) == -1) {
        perror("sigaction");
        return 1;
    }

    while (1) { 
        printf("Doing something else while waiting for data in sockets\n");
        sleep(1);
    }
    return 0;
}
client.c
    int client_socket;
    int message_count = 100;  // Number of messages to send
    int SERVER_PORT =  8081;
    char SERVER_IP[] = "127.0.0.1";
    int BUFFER_SIZE = 1024;
    int PORT_RANGE = 5;
    struct sockaddr_in server_addr;
    char buffer[BUFFER_SIZE];
   

    // Create a UDP socket
    client_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (client_socket < 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }

    // Set up the server address structure
    server_addr.sin_family = AF_INET;
    if (inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr) <= 0) {
        perror("Invalid server IP address");
        close(client_socket);
        exit(EXIT_FAILURE);
    }

    // Send multiple messages to the server
    for (int i = 0; i < message_count; i++) {
        int random_port = SERVER_PORT + (rand() % PORT_RANGE); // Random port between 8081 and 8085
        server_addr.sin_port = htons(random_port);
        snprintf(buffer, BUFFER_SIZE, "Message %d from client", i + 1);

        ssize_t sent_bytes = sendto(client_socket, buffer, strlen(buffer), 0,
                                    (struct sockaddr *)&server_addr, sizeof(server_addr));
        if (sent_bytes < 0) {
            perror("sendto failed");
        } else {
            printf("Sent: %s\n", buffer);
        }
    }

    printf("All messages sent.\n");

    // Close the socket
    close(client_socket);

    return 0;
Result

In the result snippet below, you can see that the main thread is not blocked while waiting for data to arrive at the sockets. This lets the main thread do other work instead of sitting idle.

➜  server git:(master) ✗ gcc -o program main.c eventdrivenio.c
➜  server git:(master) ✗ ./program
Socket 0 is running on port 8081...
Socket 1 is running on port 8082...
Socket 2 is running on port 8083...
Socket 3 is running on port 8084...
Socket 4 is running on port 8085...
Doing something else while waiting for data in sockets
Doing something else while waiting for data in sockets
Doing something else while waiting for data in sockets
Doing something else while waiting for data in sockets
Received message on socket 0: Message 5 from client
Message from client on socket 0: 127.0.0.1:57347
Received message on socket 1: Message 29 from client
Message from client on socket 1: 127.0.0.1:57347
Received message on socket 2: Message 1 from client
Message from client on socket 2: 127.0.0.1:57347
Received message on socket 3: Message 3 from client
Message from client on socket 3: 127.0.0.1:57347
Received message on socket 4: Message 2 from client
Message from client on socket 4: 127.0.0.1:57347
Received message on socket 0: Message 11 from client

The journey continues

In this part 2, I have illustrated what the signal-driven (event-driven) I/O and multiplexing I/O models are, along with examples for each. These models are essential for handling multiple I/O operations concurrently, which makes them indispensable for building scalable systems. In the next post of this series, I will explore one more I/O model and a game-changing Linux API called epoll, which revolutionized the industry and its applications (truly the most fascinating one!). Stay tuned.

Thank you for reading my article.

References:

https://notes.shichao.io/unp/ch6/#io-models

https://man7.org/linux/man-pages/man7/epoll.7.html

The Linux Programming Interface – Book by Michael Kerrisk

