
Chapter 3 - Synchronizing Pthreads

Pthreads Programming
Bradford Nichols, Dick Buttlar and Jacqueline Proulx Farrell
 Copyright © 1996 O'Reilly & Associates, Inc.

Synchronization in the ATM Server
To wrap up our discussion of mutex and condition variables, we'll return to our ATM example. In our discussion of mutexes earlier in this chapter, we added a single mutex to the example to protect the bank account database. As we noted at the time, this isn't the best way to impose synchronization inasmuch as it allows only one thread to access the database at a time.
In this section, we'll provide a better solution to our ATM server's synchronization problems. We'll focus on the following three areas:
 Synchronizing access to the bank account database
 Limiting the number of concurrent worker threads
 Controlling the shutdown of the server
We'll continue to use mutex variables to synchronize access to account data. Imposing a limit on the number of simultaneously active worker threads and controlling server shutdown are event-driven tasks; we'll use both mutexes and condition variables when implementing them.
We first encountered the ATM server example in Chapter 2. We designed it according to the classic boss/worker model for a multithreaded program. In our server, the boss creates a new thread for each request it receives (be it a deposit, withdrawal, or balance inquiry), and the worker thread processes the request independently of the boss or any other worker thread. We've done only half the job by creating threads and adding concurrency to the server. Now we'll finish up by adding robust and efficient synchronization mechanisms.
Synchronizing Access to Account Data
Our multithreaded ATM server must contend with many potential race conditions between worker threads accessing account data. We expect its deposit and withdraw operations to be atomic. Because of this, in Example 3-1 we added a single mutex to the server to protect the integrity of the accounts database.
Although simple, this approach has major performance limitations. Because every worker thread accesses the database and only one thread at a time can hold the mutex, only one thread can be executing whenever an account is being read or changed. When the server is heavily loaded, the net result is that it behaves very much like a single-threaded program. Because different requests frequently access different accounts in the database, multiple requests could often execute at the same time without interfering with each other. In this light, the single-mutex approach is overly conservative.
A much more appropriate solution would be to use finer-grained locking on our data. Thus, we'll associate a mutex with each database account.
The cleanest way to proceed with this decision would be to redesign the ATM server's database module to include mutexes in the account structures themselves. For our purposes, let's assume that the database module is legacy code and we can't—or don't want to—modify it. Instead, we'll place the mutex in a separate structure outside the database module.
In the code fragment in Example 3-15, we'll implement our locking scheme. We'll globally define an array of mutex variables, called account_mutex, that has an element for each account. Because valid account IDs run from zero up to, but not including, MAX_NUM_ACCOUNTS, we'll use the account ID as an index into the mutex array. The server's main routine will initialize the mutex array by calling atm_server_init.
Example 3-15: Initializing per-account locks for the ATM database (atm_svr.c)
pthread_mutex_t account_mutex[MAX_NUM_ACCOUNTS];
.
.
.
void atm_server_init(int argc, char **argv)
{
  .
  .
  .
  for (i = 0; i < MAX_NUM_ACCOUNTS; i++)
    pthread_mutex_init(&account_mutex[i], NULL);
  .
  .
  .
}
Now, with this set of mutexes, a worker thread need lock only the mutex for the specific account it is accessing. It no longer needs to lock up the entire database; other threads can concurrently lock other mutexes and access other accounts, as shown in Example 3-16.
Example 3-16: Using Per-Account Locks for the ATM Database (atm_svr.c)
void deposit(char *req_buf, char *resp_buf)
{
  int rtn;
  int temp, id, password, amount;
  account_t *accountp;
  /* Parse input string */
  sscanf(req_buf, "%d %d %d %d ", &temp, &id, &password, &amount);
  /* Check inputs */
  if ((id < 0) || (id >= MAX_NUM_ACCOUNTS)) {
    sprintf(resp_buf, "%d %s", TRANS_FAILURE, ERR_MSG_BAD_ACCOUNT);
    return;
  }
  pthread_mutex_lock(&account_mutex[id]);
  /* Retrieve account from database */
  if ((rtn = retrieve_account( id, &accountp)) < 0) {
    sprintf(resp_buf, "%d %s", TRANS_FAILURE, atm_err_tbl[-rtn]);
    .
    .
    .
    /* Code to update and access account balance. */
  }
  pthread_mutex_unlock(&account_mutex[id]);
}
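To see per-account locking in isolation, here is a minimal sketch that is independent of the ATM server's database module. The balance array, the depositor routine, the run_deposit_demo helper, and the small MAX_NUM_ACCOUNTS value are all hypothetical names and values introduced only for this illustration; the sketch assumes a POSIX threads implementation.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_NUM_ACCOUNTS    4      /* small value chosen for the demo */
#define DEPOSITS_PER_THREAD 10000

/* Hypothetical stand-in for the account database: one balance per ID. */
static int balance[MAX_NUM_ACCOUNTS];
static pthread_mutex_t account_mutex[MAX_NUM_ACCOUNTS];

/* Repeatedly deposit one unit into the account whose ID is passed in. */
static void *depositor(void *arg)
{
  int id = *(int *)arg;
  int i;

  for (i = 0; i < DEPOSITS_PER_THREAD; i++) {
    pthread_mutex_lock(&account_mutex[id]);    /* lock only this account */
    balance[id] += 1;
    pthread_mutex_unlock(&account_mutex[id]);
  }
  return NULL;
}

/* Run two depositor threads per account; return the sum of all balances. */
int run_deposit_demo(void)
{
  pthread_t tid[2 * MAX_NUM_ACCOUNTS];
  int ids[2 * MAX_NUM_ACCOUNTS];
  int i, rc, total = 0;

  for (i = 0; i < MAX_NUM_ACCOUNTS; i++) {
    balance[i] = 0;
    rc = pthread_mutex_init(&account_mutex[i], NULL);
    assert(rc == 0);
  }
  for (i = 0; i < 2 * MAX_NUM_ACCOUNTS; i++) {
    ids[i] = i % MAX_NUM_ACCOUNTS;             /* two threads share each ID */
    rc = pthread_create(&tid[i], NULL, depositor, &ids[i]);
    assert(rc == 0);
  }
  for (i = 0; i < 2 * MAX_NUM_ACCOUNTS; i++)
    pthread_join(tid[i], NULL);
  for (i = 0; i < MAX_NUM_ACCOUNTS; i++) {
    total += balance[i];
    pthread_mutex_destroy(&account_mutex[i]);
  }
  return total;
}
```

Threads working on the same account serialize on that account's mutex, while threads working on different accounts never contend with each other. Because every update happens under the matching mutex, no deposit is lost and the total is always 2 * MAX_NUM_ACCOUNTS * DEPOSITS_PER_THREAD. On most systems the sketch needs the compiler's thread option (for example, -pthread) to build.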
The thread that runs our create_account routine to create a new account poses a special problem. Which mutex should it lock? The account doesn't exist yet, and the worker thread has no account ID to use!
Let's look at how the database layer of our ATM server actually creates a new account.
The database contains a list of potential accounts, each with a flag indicating whether or not it's in use. The new_account routine looks for the first account whose in-use flag is clear, sets the flag, and plugs in the information about the new account.
Here is fertile ground for a classic race condition. If two threads execute new_account concurrently, they could interleave their flag-reading and flag-setting. Both could return with the same account ID for two different customer accounts—not a good idea. To remove this hazard, we'll need an additional mutex.
The revision of the create_account routine in Example 3-17 shows the new mutex. Any thread wishing to add an account must hold this mutex (which we've globally defined) before proceeding.
Example 3-17: A Special Mutex for Opening New Accounts (atm_svr.c)
pthread_mutex_t create_account_mutex = PTHREAD_MUTEX_INITIALIZER;
.
.
.
void create_account(char *resp_buf)
{
  int id;
  int rtn;
  account_t *accountp;
  pthread_mutex_lock(&create_account_mutex);
  /* Get a new account */
  if ((rtn = new_account(&id, &accountp)) < 0) {
    sprintf(resp_buf, "%d %d %d %s", TRANS_FAILURE, -1, -1, atm_err_tbl[-rtn]);
    .
    .
    .
  }
  pthread_mutex_unlock(&create_account_mutex);
}
Note that deleting an account will work the same way. There is symmetry in creating an object and destroying an object: both require the same kind of protection.
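The following sketch shows the idea of one mutex guarding both the claiming and the releasing of account slots. It does not reproduce the database module's new_account routine; the in_use array and the claim_account, release_account, opener, and run_claim_demo names are hypothetical stand-ins, and the slot-scanning logic is only our assumption about what such a routine might look like.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_NUM_ACCOUNTS 16   /* small value chosen for the demo */
#define NUM_OPENERS      8

/* Hypothetical stand-in for the database's per-account in-use flags. */
static int in_use[MAX_NUM_ACCOUNTS];
static pthread_mutex_t create_account_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Claim the first free slot; return its ID, or -1 if none is free. */
int claim_account(void)
{
  int id, found = -1;

  pthread_mutex_lock(&create_account_mutex);
  for (id = 0; id < MAX_NUM_ACCOUNTS; id++) {
    if (!in_use[id]) {        /* the flag test and the flag set happen */
      in_use[id] = 1;         /* under one lock, so they can't interleave */
      found = id;
      break;
    }
  }
  pthread_mutex_unlock(&create_account_mutex);
  return found;
}

/* Release a slot under the same mutex that protects claiming. */
void release_account(int id)
{
  pthread_mutex_lock(&create_account_mutex);
  in_use[id] = 0;
  pthread_mutex_unlock(&create_account_mutex);
}

/* Thread start routine: store the claimed ID where the caller can see it. */
static void *opener(void *arg)
{
  *(int *)arg = claim_account();
  return NULL;
}

/* Return 1 if NUM_OPENERS concurrent claims all received distinct IDs. */
int run_claim_demo(void)
{
  pthread_t tid[NUM_OPENERS];
  int ids[NUM_OPENERS];
  int seen[MAX_NUM_ACCOUNTS] = {0};
  int i, rc, distinct = 1;

  for (i = 0; i < NUM_OPENERS; i++) {
    rc = pthread_create(&tid[i], NULL, opener, &ids[i]);
    assert(rc == 0);
  }
  for (i = 0; i < NUM_OPENERS; i++)
    pthread_join(tid[i], NULL);
  for (i = 0; i < NUM_OPENERS; i++) {
    if (ids[i] < 0 || seen[ids[i]]++)   /* failed claim or duplicate ID */
      distinct = 0;
  }
  return distinct;
}
```

With the mutex in place, no two openers can observe the same clear flag, so each receives its own ID. Removing the lock and unlock calls from claim_account reintroduces the race described above, although the duplicate IDs may show up only occasionally.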
Limiting the Number of Worker Threads
Our next synchronization task will be to limit the number of worker threads that can exist at a single time. There are some good reasons for doing so. On some operating systems, the kernel manages threads as separate contenders for the CPU, just as it manages processes. These systems must limit the number of threads each user may run at a time. Even if your system imposes no limit or an extremely high one, you reach a practical limit at the point you find that you're getting diminishing returns by creating more and more threads.* We'll examine this phenomenon further in our performance measurements in Chapter 6.
 *Thread pools don't have this problem. The number of worker threads is determined and fixed at initialization. At this point, the worker threads are created, and they live for the duration of the program.
To limit the number of worker threads, we'll need to keep a count of them. Both boss and worker threads must access this counter. The boss thread increments it when it creates a new worker, and each worker decrements it when it exits. We'll synchronize access to the counter using a mutex.
In Example 3-18, we'll modify our ATM server to add a worker_info structure of type thread_info_t. It'll include a counter (num_active), a mutex (mutex), and a condition variable (thread_exit_cv). The server's main routine will set the counter to zero and initialize the mutex and the condition variable.
Example 3-18: Limiting the Number of Worker Threads—Boss (atm_svr.c)
#define MAX_NUM_THREADS 10
.
.
.
typedef struct {
  int             num_active;
  pthread_cond_t  thread_exit_cv;
  pthread_mutex_t mutex;
}thread_info_t;
thread_info_t worker_info;
.
.
int
main(int argc, char **argv)
{
  workorder_t *workorderp;
  pthread_t   *worker_threadp;
  int  conn;
  int  trans_id;
  atm_server_init(argc, argv);
  for(;;) {
    /*** Wait for a request ***/
    workorderp = (workorder_t *)malloc(sizeof(workorder_t));
    server_comm_get_request(&workorderp->conn, workorderp->req_buf);
    .
    .
    .
    /*** Have we exceeded our limit of active threads ? ***/
    pthread_mutex_lock(&worker_info.mutex);
    while (worker_info.num_active == MAX_NUM_THREADS) {
      pthread_cond_wait(&worker_info.thread_exit_cv,
                &worker_info.mutex);
    }
    worker_info.num_active++;
    pthread_mutex_unlock(&worker_info.mutex);
    /*** Spawn a thread to process this request ***/
    pthread_create(worker_threadp, ...
    .
    .
    .
  }
  server_comm_shutdown();
  return 0;
}
Now, when the boss thread receives a request, it locks the worker_info mutex and checks the count of active workers before creating a new worker thread. If the number of active workers has not yet reached its limit, the boss increments the counter, unlocks the mutex, and continues. If the limit has been reached, the boss waits on the thread_exit_cv condition variable. When the condition is signaled, the boss wakes up and rechecks the counter. If the count of active workers is now below the limit, the boss increments the counter, unlocks the mutex, and continues; otherwise, it waits again.
In Example 3-19, we'll adjust the process_request code our worker threads execute.
Example 3-19: Limiting the Number of Worker Threads—Workers (atm_svr.c)
void process_request(workorder_t *workorderp)
{
  char resp_buf[COMM_BUF_SIZE];
  int  trans_id;
  sscanf(workorderp->req_buf, "%d", &trans_id);
  switch(trans_id) {
      case OPEN_ACCT_TRANS:
           open_account(resp_buf);
           break;
           .
           .
           .
      }
  server_comm_send_response(workorderp->conn, resp_buf);
  free(workorderp);
  pthread_mutex_lock(&worker_info.mutex);
  worker_info.num_active--;
  if (worker_info.num_active == (MAX_NUM_THREADS - 1))
     pthread_cond_signal(&worker_info.thread_exit_cv);
  pthread_mutex_unlock(&worker_info.mutex);
}
Each worker thread must decrement the active worker count when it exits. It does this in the process_request routine. If it finds that it has decremented the counter to one less than the limit, it calls pthread_cond_signal on the thread_exit_cv condition variable to wake the waiting boss thread.
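The boss and worker halves of this scheme can be exercised together in a small stand-alone sketch. It is not the ATM server: the worker routine does no real work, and the peak_active field, the run_limit_demo helper, and the NUM_REQUESTS value are hypothetical additions that exist only so the demo can report how many workers were ever counted as active at once.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>

#define MAX_NUM_THREADS 3     /* small limit chosen for the demo */
#define NUM_REQUESTS    12

typedef struct {
  int             num_active;
  int             peak_active;   /* hypothetical field, for the demo only */
  pthread_cond_t  thread_exit_cv;
  pthread_mutex_t mutex;
} thread_info_t;

static thread_info_t worker_info;

/* Worker: pretend to handle a request, then give up its slot. */
static void *worker(void *arg)
{
  (void)arg;
  sched_yield();                 /* stand-in for processing a request */

  pthread_mutex_lock(&worker_info.mutex);
  worker_info.num_active--;
  if (worker_info.num_active == (MAX_NUM_THREADS - 1))
    pthread_cond_signal(&worker_info.thread_exit_cv);
  pthread_mutex_unlock(&worker_info.mutex);
  return NULL;
}

/* Boss: start NUM_REQUESTS workers, never counting more than the limit
   as active. Returns the highest value num_active ever reached. */
int run_limit_demo(void)
{
  pthread_t tid[NUM_REQUESTS];
  int i, rc;

  worker_info.num_active = 0;
  worker_info.peak_active = 0;
  pthread_mutex_init(&worker_info.mutex, NULL);
  pthread_cond_init(&worker_info.thread_exit_cv, NULL);

  for (i = 0; i < NUM_REQUESTS; i++) {
    pthread_mutex_lock(&worker_info.mutex);
    while (worker_info.num_active == MAX_NUM_THREADS)   /* recheck on wakeup */
      pthread_cond_wait(&worker_info.thread_exit_cv, &worker_info.mutex);
    worker_info.num_active++;
    if (worker_info.num_active > worker_info.peak_active)
      worker_info.peak_active = worker_info.num_active;
    pthread_mutex_unlock(&worker_info.mutex);

    rc = pthread_create(&tid[i], NULL, worker, NULL);
    assert(rc == 0);
  }
  for (i = 0; i < NUM_REQUESTS; i++)
    pthread_join(tid[i], NULL);

  pthread_cond_destroy(&worker_info.thread_exit_cv);
  pthread_mutex_destroy(&worker_info.mutex);
  return worker_info.peak_active;
}
```

The while loop around pthread_cond_wait matters: a waiting thread may be awakened without the condition it is waiting for being true, so the boss must test the counter again each time it wakes. Unlike the server, the demo keeps its workers joinable so that it can wait for all of them before destroying the mutex and condition variable.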
Synchronizing a Server Shutdown
In the current version of our ATM server, the boss thread runs our program's main routine. When the boss thread finishes main, the system terminates the process and all its threads, including those worker threads that are still processing active requests. We can't allow this to happen, so our final synchronization task will be to handle server shutdown more gracefully.
To make sure that all worker threads get to complete active tasks before the boss thread exits main, we'll reuse the active worker counter and the thread_exit_cv condition variable. When we used them to control the number of concurrent workers, the boss thread requested a signal when the active worker count was one less than the active worker limit. This time, the boss will request the signal when the active worker count reaches zero. (Of course, at some time, the boss will stop creating new threads so that this can eventually happen.) We'll modify the main routine in the boss thread, as shown in Example 3-20.
Example 3-20: Processing a Shutdown in the Boss Thread (atm_svr.c)
int
main(int argc, char **argv)
{
  workorder_t *workorderp;
  pthread_t   *worker_threadp;
  int  conn;
  int  trans_id;
  atm_server_init(argc, argv);
  for(;;) {
    /*** Wait for a request ***/
    workorderp = (workorder_t *)malloc(sizeof(workorder_t));
    server_comm_get_request(&workorderp->conn, workorderp->req_buf);
    /*** Is it a shutdown request? ***/
    sscanf(workorderp->req_buf, "%d", &trans_id);
    if (trans_id == SHUTDOWN) {
      char resp_buf[COMM_BUF_SIZE];
      pthread_mutex_lock(&worker_info.mutex);
      /* Wait for in-progress request threads to finish */
      while (worker_info.num_active > 0) {
           pthread_cond_wait(&worker_info.thread_exit_cv, &worker_info.mutex);
      }
      pthread_mutex_unlock(&worker_info.mutex);
      /* process it here with main() thread */
      if (shutdown_req(workorderp->req_buf, resp_buf)) {
            server_comm_send_response(workorderp->conn, resp_buf);
            free(workorderp);
            break;
      }
    }
    /*** Have we exceeded our limit of active threads ? ***/
    pthread_mutex_lock(&worker_info.mutex);
    .
    .
    .
  }
  server_comm_shutdown();
  return 0;
}
When the boss thread receives a shutdown request, it locks the worker_info mutex and checks the active worker counter. If the active worker counter is zero, the boss unlocks the mutex, runs a cleanup function, and leaves the main loop, thus terminating the program. If the counter is greater than zero, the boss must wait for the thread_exit_cv condition variable to be signaled. When it's awakened, the boss rechecks the active worker count. If the final worker has exited, the count is zero, and the boss proceeds to shut down the program. If not, the boss must wait on the condition variable again.
We'll modify our process_request routine in Example 3-21 so that each worker thread signals the thread_exit_cv condition variable every time it exits, not only when it decrements the worker count to one below the limit.
Example 3-21: Processing a Shutdown in the Worker Thread (atm_svr.c)
process_request(...)
{
  .
  .
  .
  server_comm_send_response(workorderp->conn, resp_buf);
  free(workorderp);
  pthread_mutex_lock(&worker_info.mutex);
  worker_info.num_active--;
  pthread_cond_signal(&worker_info.thread_exit_cv);
  pthread_mutex_unlock(&worker_info.mutex);
}
This works fine but is a bit inefficient. Although the boss can proceed with program shutdown only when the last worker has exited, each exiting worker thread will wake it up (and it will go right back to sleep) until the last worker decrements the active worker counter to 0. If ten worker threads are active when their boss receives the shutdown request, the boss will wake up and reenter its wait nine times before it can finally do something useful! We could fix this. Instead of using the thread_exit_cv condition variable for shutdown handling, we could define a new condition variable to indicate when the active worker count reaches zero. As it exits, each worker would call pthread_cond_signal on our new condition variable if it notices that the count has become zero. If the boss thread is waiting on the condition, it will wake up and shut down the program.
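A minimal sketch of that alternative follows. The all_done_cv field, the NUM_WORKERS value, and the run_shutdown_demo helper are hypothetical names of our own, the workers do no real work, and the thread-limit logic is left out so that only the shutdown handshake remains.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>

#define NUM_WORKERS 10

typedef struct {
  int             num_active;
  pthread_cond_t  all_done_cv;   /* hypothetical: signaled only at zero */
  pthread_mutex_t mutex;
} thread_info_t;

static thread_info_t worker_info;

/* Worker: pretend to handle a request; signal only if it is the last one. */
static void *worker(void *arg)
{
  (void)arg;
  sched_yield();                 /* stand-in for processing a request */

  pthread_mutex_lock(&worker_info.mutex);
  worker_info.num_active--;
  if (worker_info.num_active == 0)
    pthread_cond_signal(&worker_info.all_done_cv);
  pthread_mutex_unlock(&worker_info.mutex);
  return NULL;
}

/* Boss: start detached workers, then wait until none remain active.
   Returns the active count seen at shutdown. */
int run_shutdown_demo(void)
{
  pthread_t tid;
  pthread_attr_t attr;
  int i, rc, remaining;

  worker_info.num_active = 0;
  pthread_mutex_init(&worker_info.mutex, NULL);
  pthread_cond_init(&worker_info.all_done_cv, NULL);
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

  for (i = 0; i < NUM_WORKERS; i++) {
    pthread_mutex_lock(&worker_info.mutex);
    worker_info.num_active++;    /* count the worker before it starts */
    pthread_mutex_unlock(&worker_info.mutex);
    rc = pthread_create(&tid, &attr, worker, NULL);
    assert(rc == 0);
  }
  pthread_attr_destroy(&attr);

  /* Shutdown: sleep until the last worker reports in. */
  pthread_mutex_lock(&worker_info.mutex);
  while (worker_info.num_active > 0)
    pthread_cond_wait(&worker_info.all_done_cv, &worker_info.mutex);
  remaining = worker_info.num_active;
  pthread_mutex_unlock(&worker_info.mutex);
  return remaining;
}
```

A worker may drive the count to zero, and signal, while the boss is still creating threads and nobody is waiting. That signal is simply lost, which is harmless because the boss always tests the counter before it decides to wait. In a full server, the worker would presumably keep signaling thread_exit_cv at one below the limit as well, so that the two conditions stay independent.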
