libc/winsup/cygserver/threaded_queue.cc

251 lines
6.1 KiB
C++

/* threaded_queue.cc
Copyright 2001, 2002 Red Hat Inc.
Written by Robert Collins <rbtcollins@hotmail.com>
This file is part of Cygwin.
This software is a copyrighted work licensed under the terms of the
Cygwin license. Please consult the file "CYGWIN_LICENSE" for
details. */
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <windows.h>
#include <sys/types.h>
#include <stdlib.h>
#include "wincap.h"
#include "threaded_queue.h"
#define DEBUG 1
#define debug_printf if (DEBUG) printf
/* threaded_queue */
DWORD WINAPI
worker_function (LPVOID LpParam)
{
class threaded_queue *queue = (class threaded_queue *) LpParam;
class queue_request *request;
/* FIXME use a threadsafe pop instead for speed? */
while (queue->active)
{
EnterCriticalSection (&queue->queuelock);
while (!queue->request && queue->active)
{
LeaveCriticalSection (&queue->queuelock);
DWORD rc = WaitForSingleObject (queue->event, INFINITE);
if (rc == WAIT_FAILED)
{
printf ("Wait for event failed\n");
queue->running--;
ExitThread (0);
}
EnterCriticalSection (&queue->queuelock);
}
if (!queue->active)
{
queue->running--;
LeaveCriticalSection (&queue->queuelock);
ExitThread (0);
}
/* not needed, but it is efficient */
request =
(class queue_request *) InterlockedExchangePointer (&queue->request,
queue->request->
next);
LeaveCriticalSection (&queue->queuelock);
request->process ();
delete request;
}
queue->running--;
ExitThread (0);
}
void
threaded_queue::create_workers ()
{
InitializeCriticalSection (&queuelock);
if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
{
printf ("Failed to create event queue (%lu), terminating\n",
GetLastError ());
exit (1);
}
active = true;
/* FIXME: Use a stack pair and create threads on the fly whenever
* we have to to service a request.
*/
for (unsigned int i = 0; i < initial_workers; i++)
{
HANDLE hThread;
DWORD tid;
hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid);
if (hThread == NULL)
{
printf ("Failed to create thread (%lu), terminating\n",
GetLastError ());
exit (1);
}
CloseHandle (hThread);
running++;
}
}
void
threaded_queue::cleanup ()
{
/* harvest the threads */
active = false;
/* kill the request processing loops */
queue_process_param *reqloop;
/* make sure we don't race with a incoming request creation */
EnterCriticalSection (&queuelock);
reqloop =
(queue_process_param *) InterlockedExchangePointer (&process_head, NULL);
while (reqloop)
{
queue_process_param *t = reqloop;
reqloop = reqloop->next;
delete t;
}
LeaveCriticalSection (&queuelock);
if (!running)
return;
printf ("Waiting for current queue threads to terminate\n");
for (int n = running; n; n--)
PulseEvent (event);
while (running)
sleep (1);
DeleteCriticalSection (&queuelock);
CloseHandle (event);
}
/* FIXME: return success or failure */
void
threaded_queue::add (queue_request * therequest)
{
/* safe to not "Try" because workers don't hog this, they wait on the event
*/
EnterCriticalSection (&queuelock);
if (!running)
{
printf ("No worker threads to handle request!\n");
}
if (!request)
request = therequest;
else
{
/* add to the queue end. */
queue_request *listrequest = request;
while (listrequest->next)
listrequest = listrequest->next;
listrequest->next = therequest;
}
PulseEvent (event);
LeaveCriticalSection (&queuelock);
}
/* FIXME: return success or failure rather than quitting */
void
threaded_queue::process_requests (queue_process_param * params,
threaded_queue_thread_function *
request_loop)
{
if (params->start (request_loop, this) == false)
exit (1);
params->next =
(queue_process_param *) InterlockedExchangePointer (&process_head,
params);
}
/* queue_process_param */
/* How does a constructor return an error? */
queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false),
interruptible
(ninterruptible)
{
if (!interruptible)
return;
debug_printf ("creating an interruptible processing thread\n");
if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
{
printf ("Failed to create interrupt event (%lu), terminating\n",
GetLastError ());
exit (1);
}
}
queue_process_param::~queue_process_param ()
{
if (running)
stop ();
if (!interruptible)
return;
CloseHandle (interrupt);
}
bool
queue_process_param::start (threaded_queue_thread_function * request_loop,
threaded_queue * thequeue)
{
queue = thequeue;
hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid);
if (hThread)
{
running = true;
return true;
}
printf ("Failed to create thread (%lu), terminating\n", GetLastError ());
return false;
}
void
queue_process_param::stop ()
{
if (interruptible)
{
InterlockedExchange (&shutdown, true);
PulseEvent (interrupt);
/* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get
* scheduled again, we print an error and exit. We _should_ loop or
* try resignalling. We don't want to hand here though...
*/
int n = 5;
while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT);
if (!n)
{
printf ("Process thread didn't shutdown cleanly after 200ms!\n");
exit (1);
}
else
running = false;
}
else
{
printf ("killing request loop thread %ld\n", tid);
int rc;
if (!(rc = TerminateThread (hThread, 0)))
{
printf ("error shutting down request loop worker thread\n");
}
running = false;
}
CloseHandle (hThread);
}
/* queue_request */
queue_request::queue_request ():next (NULL)
{
}
void
queue_request::process (void)
{
printf ("\n**********************************************\n"
"Oh no! we've hit the base queue_request process() function, and this indicates a coding\n"
"fault !!!\n" "***********************************************\n");
}