Adventures with \Device\Afd - a simple client

Now that I have a reasonably easy to use event-driven socket class I can start to use it. The first thing to build is a simple client. This will connect to a server and expect that server to echo anything that is sent to it. The client will send data and read it back and compare it with what it sent.

A simple, single-threaded, client

Last time, I put together a tcp_socket object that made it easier to work with the \Device\Afd interface that I’ve been playing with. The underlying code can only handle a single connection at present, but before we begin to support multiple, concurrent connections it’s a good idea to see if the design I’ve come up with is actually usable.

Code

Full source can be found here on GitHub.

This article refers to the echo_client code and, specifically, commit 12d70f4.

This isn’t production code; error handling is simply “panic and run away”.

This code is licensed with the MIT license.

The aim here is to write code that uses the tcp_socket to make sure that we can use it…

The following code uses an echo_client object which, itself, uses the tcp_socket that I put together last time. We use this client to connect to a server that will echo our input back to us; we can use the simple example server from here. All we need to do is create the client object, give it an address to connect to and tell it to connect.

int main(int argc, char **argv)
{
   InitialiseWinsock();

   try
   {
      const auto handles = CreateAfdAndIOCP();

      afd_system afd(handles.afd);

      afd_handle handle(afd, 0);

      const int number_of_messages = 10;

      echo_client client(handle, number_of_messages);

      sockaddr_in address{};

      address.sin_family = AF_INET;
      address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
      address.sin_port = htons(5050);

      client.connect(reinterpret_cast<const sockaddr &>(address), sizeof address);

We then process events for it and wait until it is finished. As with the unit tests that we used to explore the API, we avoid any complex threading by doing everything on the same thread. This doesn’t scale, but it works and is easier to debug and reason about than code that uses a thread pool to service the I/O completion port. We simply loop, calling handle_events() as events become available, until the client is done.


      while (!client.done())
      {
         // process events

         afd_system *pAfd = GetCompletionAs<afd_system>(handles.iocp, INFINITE);

         if (pAfd)
         {
            pAfd->handle_events();
         }
         else
         {
            throw std::exception("failed to process events");
         }
      }
   }
   catch (std::exception &e)
   {
      std::cout << "exception: " << e.what() << std::endl;
   }

   std::cout << "all done" << std::endl;

   return 0;
}
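
The GetCompletionAs() helper isn’t shown here. A minimal sketch of the kind of thing it needs to do might look like this; I’m assuming that the afd_system object’s address was used as the completion key when the \Device\Afd handle was associated with the I/O completion port:

template <typename T>
T *GetCompletionAs(
   HANDLE iocp,
   DWORD timeout)
{
   DWORD numberOfBytes = 0;

   ULONG_PTR completionKey = 0;

   OVERLAPPED *pOverlapped = nullptr;

   // GetQueuedCompletionStatus() gives us back the completion key that
   // was supplied when the handle was associated with the port...

   if (::GetQueuedCompletionStatus(iocp, &numberOfBytes, &completionKey, &pOverlapped, timeout))
   {
      return reinterpret_cast<T *>(completionKey);
   }

   return nullptr;
}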

So this is how we want the client to work and, ideally, scaling out would simply involve creating more client objects…

Using the tcp_socket class to create a client

So what does this client object look like? Well, to start with, it inherits from and implements the tcp_socket_callbacks interface that the tcp_socket object uses to report events to users of the socket object. It also has a very small public interface, just connect() and done(), which kick off the connection and then allow you to wait for the client to complete its work.

class echo_client : private tcp_socket_callbacks
{
   public :

      echo_client(
         afd_handle afd,
         int number_of_messages);

      ~echo_client() override;

      void connect(
         const sockaddr &address,
         const int address_length);

      bool done() const;

   private :

      void on_connected(
         tcp_socket &s) override;

      void on_connection_failed(
         tcp_socket &s,
         DWORD error) override;

      void on_readable(
         tcp_socket &s) override;

      void on_readable_oob(
         tcp_socket &s) override;

      void on_writable(
         tcp_socket &s) override;

      void on_client_close(
         tcp_socket &s) override;

      void on_connection_reset(
         tcp_socket &s) override;

      void on_disconnected(
         tcp_socket &s) override;

      tcp_socket s;

      bool is_done;

      BYTE send_buffer[100];

      BYTE recv_buffer[sizeof send_buffer];

      int bytes_read;

      const int number_of_messages;

      int number_of_messages_sent;
};

When we construct the client we initialise a send_buffer with a known pattern of data and then use this to send messages to the server. If we connect successfully we begin a process whereby we send a message and then wait for the server to send it back, storing it in the recv_buffer as it arrives. Once we have all of it we compare what the server sent to what we sent and, if all is well, we send the message again.
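
The constructor might look something like this sketch (I’m assuming that the tcp_socket is constructed from the afd_handle and the callbacks interface; the real code is on GitHub):

echo_client::echo_client(
   afd_handle afd,
   int number_of_messages)
   :  s(afd, *this),
      is_done(false),
      bytes_read(0),
      number_of_messages(number_of_messages),
      number_of_messages_sent(0)
{
   // fill the send buffer with a known pattern so that we can validate
   // what the server echoes back to us...

   for (size_t i = 0; i < sizeof send_buffer; ++i)
   {
      send_buffer[i] = static_cast<BYTE>(i);
   }

   memset(recv_buffer, 0, sizeof recv_buffer);
}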

For the event callbacks, we end up with code like this:

void on_connected(
   tcp_socket &s) override
{
   write_data(s);
}

void on_readable(
   tcp_socket &s) override
{
   read_data(s);
}

Where write_data() writes the send_buffer to the socket and then tries to read some data back. At present we don’t deal with partial sends; if we did, we would need to track how much we had written and then write the rest when we get an on_writable() event. We don’t deal with this here simply because we’re going to address it a little later, once we have a simple server using the same code and can manipulate the TCP data flow to cause a flow control situation where we can’t send all the data that we want to send. That said, a sketch of what partial send handling might look like follows the code below.

void write_data(
   tcp_socket &s)
{
   if (number_of_messages_sent < number_of_messages)
   {
      if (sizeof send_buffer != s.write(send_buffer, sizeof send_buffer))
      {
         // todo, handle partial sends
         throw std::exception("failed to send all data");
      }

      ++number_of_messages_sent;

      read_data(s);
   }
   else
   {
      s.close();

      is_done = true;
   }
}
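
As promised, here’s a sketch of what partial send handling might look like; note that bytes_sent is a hypothetical member variable that isn’t in the current code:

void write_pending_data(
   tcp_socket &s)
{
   // bytes_sent is a hypothetical member that tracks how much of
   // send_buffer the socket has accepted so far...

   bytes_sent += s.write(&send_buffer[bytes_sent], sizeof send_buffer - bytes_sent);

   // if the socket couldn't take it all then it will be polled for
   // writability and we continue when on_writable() is called...
}

void on_writable(
   tcp_socket &s) override
{
   if (bytes_sent < sizeof send_buffer)
   {
      write_pending_data(s);
   }
}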

Reading is equally straightforward. We allow for reading any amount from the socket and track how much we have accumulated. We then try to read as much as we need until we either have a complete message or there’s no data available to read. When there’s no data, the socket is polled for readability and we return; when we get another on_readable() event we resume from where we were…

void read_data(
   tcp_socket &s)
{
   int bytes_read_this_time = 0;

   do
   {
      int bytes_needed = sizeof recv_buffer - bytes_read;

      bytes_read_this_time = s.read(&recv_buffer[bytes_read], bytes_needed);

      bytes_read += bytes_read_this_time;
   }
   while (bytes_read_this_time && bytes_read < sizeof recv_buffer);

   if (bytes_read == sizeof recv_buffer)
   {
      // validate

      if (0 != memcmp(send_buffer, recv_buffer, bytes_read))
      {
         throw std::exception("validation failed");
      }

      bytes_read = 0;
      memset(recv_buffer, 0, sizeof recv_buffer);

      write_data(s);
   }
}
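
The remaining callbacks aren’t needed on the happy path here and, in keeping with the “panic and run away” approach to error handling, sketches of them might simply throw:

void on_connection_failed(
   tcp_socket &s,
   DWORD error) override
{
   throw std::exception("connection failed");
}

void on_connection_reset(
   tcp_socket &s) override
{
   throw std::exception("connection reset");
}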

Wrapping up

Building a simple client over our tcp_socket was fairly painless. There are still some issues to deal with, such as partial writes, but what we have is a good starting point. Keeping everything single-threaded makes it easier to debug and reason about, and the “readiness” events work well. Next we’ll write a server, still only supporting a single connection, and then we’ll look at scaling things for multiple connections.
