libc/winsup/cygserver/threaded_queue.cc
Corinna Vinschen 282113ba89 Don't use safe_new but new throughout. Fix copyright dates
throughout.
	* Makefile.in: Accommodate all new files and name changes.
	Add a *.d dependency.
	(sbindir): Add.
	(etcdir): Drop in favor of more appropriate sysconfdir definition.
	(sysconfdir): Add.
	(CXXFLAGS): Add -MMD flag. Add SYSCONFDIR definition.
	(.SUFFIXES): Add.
	(install): Add action items.
	(libclean): New target.
	(fullclean): Ditto.
	* bsd_helper.cc: New file.
	* bsd_helper.h: Ditto.
	* bsd_log.cc: Ditto.
	* bsd_log.h: Ditto.
	* bsd_mutex.cc: Ditto.
	* bsd_mutex.h: Ditto.
	* client.cc: Rearrange to build as less as possible if
	__INSIDE_CYGWIN__.
	(client_request::handle_request): Add Message Queue and Semaphore
	handling.
	* cygserver.cc: Rearrange to build as less as possible if
	__INSIDE_CYGWIN__. Use new debug/log/panic logging functions.
	(DEF_CONFIG_FILE): New definition for configuration file.  Use
	throughout.
	(getfunc): Remove.
	(__cygserver__printf): Remove.
	(client_request_attach_tty::serve): Return error if impersonation
	fails.
	(print_usage): Pump up help message.
	(print_version): Add output of default configuration file.
	(main): Accommodate new options.  Allow overwrite of threading options
	from config file.  Call several new initialization functions.  Drop
	printing dots.  Don't define SIGHANDLE inline.
	* cygserver.conf: New file.
	* cygserver_process.h: Rename to process.h.
	* cygserver_transport.h: Rename to transport.h.
	* cygserver_transport_pipes.h: Rename to transport_pipes.h.
	* cygserver_transport_sockets.h: Rename to transport_sockets.h.
	* msg.cc: Rewrite.
	* sem.cc: Rewrite.
	* shm.cc: Rewrite.
	* sysv_msg.cc: New file, derived from FreeBSD version 1.52.
	* sysv_sem.cc: New file, derived from FreeBSD version 1.66.
	* sysv_shm.cc: New file, derived from FreeBSD version 1.89.
	* threaded_queue.cc: Rearrange to build as little as possible if
	__INSIDE_CYGWIN__.
	* transport.cc (transport_layer_base::impersonate_client): Define bool.
	(transport_layer_base::revert_to_self): Ditto.
	* transport.h (transport_layer_base::impersonate_client): Declare bool.
	(transport_layer_base::revert_to_self): Ditto.
	* transport_pipes.cc (transport_layer_pipes::transport_layer_pipes):
	Don't call init_security.
	(init_security): Remove.
	(transport_layer_pipes::accept): Use global sec_all_nih.
	(transport_layer_pipes::connect): Ditto.
	(transport_layer_pipes::impersonate_client): Define bool.
	(transport_layer_pipes::revert_to_self): Ditto.
	* transport_pipes.h (transport_layer_pipes::impersonate_client): Declare
	bool.
	(transport_layer_pipes::revert_to_self): Ditto.
	* woutsup.h: Include bsd compatibility headers.
	(SIGHANDLE): Add definition.
	(__cygserver__printf): Remove definition.
	(__noop_printf): Ditto.
	(debug_printf): Define using debug.
	(syscall_printf): Define using log.
	(system_printf): Ditto.
	Drop all other _printf definitions.
2003-11-19 18:49:41 +00:00

411 lines
9.1 KiB
C++

/* threaded_queue.cc
Copyright 2001, 2002, 2003 Red Hat Inc.
Written by Robert Collins <rbtcollins@hotmail.com>
This file is part of Cygwin.
This software is a copyrighted work licensed under the terms of the
Cygwin license. Please consult the file "CYGWIN_LICENSE" for
details. */
#ifdef __OUTSIDE_CYGWIN__
#include "woutsup.h"
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include "threaded_queue.h"
/*****************************************************************************/
/* queue_request */
/* Out-of-line definition of the queue_request destructor.  The body is
   empty: this class releases nothing itself.  Requests are deleted
   through a `queue_request *' elsewhere in this file, so the destructor
   is presumably declared virtual in threaded_queue.h -- TODO confirm. */
queue_request::~queue_request ()
{}
/*****************************************************************************/
/* threaded_queue */
/* Construct a stopped queue: initialise the lock, create the request
 * semaphore and spawn `initial_workers' worker threads.  Aborts the
 * process if the semaphore cannot be created.
 */
threaded_queue::threaded_queue (const size_t initial_workers)
  : _workers_count (0),
    _running (false),
    _submitters_head (NULL),
    _requests_count (0),
    _requests_head (NULL),
    _requests_sem (NULL)
{
  InitializeCriticalSection (&_queue_lock);

  // The semaphore's count mirrors the number of requests on the queue,
  // so it starts out at zero.
  const long no_requests_yet = 0;

  // Upper bound for the semaphore count: 129792, i.e.
  // MAXIMUM_WAIT_OBJECTS multiplied by the max. number of threads per
  // process (2028?).  That is (a few) more requests than could ever
  // be pending with the current design.
  const long max_pending_requests = 129792;

  _requests_sem = CreateSemaphore (NULL,                 // SECURITY_ATTRIBUTES
                                   no_requests_yet,      // Initial count
                                   max_pending_requests, // Maximum count
                                   NULL);                // Anonymous

  if (_requests_sem == NULL)
    {
      // Without the semaphore the workers can never be woken up.
      system_printf (("failed to create the request queue semaphore, "
                      "error = %lu"),
                     GetLastError ());
      abort ();
    }

  create_workers (initial_workers);
}
/* Tear the queue down: stop it if it is still running, delete every
 * request still waiting on the queue, then release the lock and the
 * request semaphore.
 */
threaded_queue::~threaded_queue ()
{
  if (_running)
    stop ();

  debug_printf ("deleting all pending queue requests");

  // Fetch the successor before deleting each node.
  for (queue_request *pending = _requests_head; pending; )
    {
      queue_request *const successor = pending->_next;
      delete pending;
      pending = successor;
    }

  DeleteCriticalSection (&_queue_lock);

  if (_requests_sem != NULL)
    (void) CloseHandle (_requests_sem);
}
/* Register `submitter' with this queue by pushing it onto the front of
 * the submitters list, and start it straight away if the queue is
 * already running.
 *
 * The push and the sampling of `_running' are done under `_queue_lock',
 * the same lock start() and stop() hold while they flip `_running' and
 * snapshot `_submitters_head'.  This gives two guarantees:
 *
 *  - a concurrent start()/stop() never observes a list head whose
 *    `_next' link has not been filled in yet (which would make it
 *    skip every previously registered submitter);
 *
 *  - a submitter is started exactly once: either start() sees it in
 *    its snapshot (and we see `_running' still false), or we see
 *    `_running' already true (and start()'s snapshot predates us).
 *
 * FIXME: return success or failure rather than quitting
 */
void
threaded_queue::add_submission_loop (queue_submission_loop *const submitter)
{
  assert (this);
  assert (submitter);
  assert (submitter->_queue == this);
  assert (!submitter->_next);

  EnterCriticalSection (&_queue_lock);
  submitter->_next = _submitters_head;
  _submitters_head = submitter;
  const bool queue_running = _running;
  LeaveCriticalSection (&_queue_lock);

  // Start outside the lock: thread creation need not be serialised
  // with the request queue.
  if (queue_running)
    submitter->start ();
}
bool
threaded_queue::start ()
{
EnterCriticalSection (&_queue_lock);
const bool was_running = _running;
_running = true;
queue_submission_loop *loopptr = _submitters_head;
LeaveCriticalSection (&_queue_lock);
if (!was_running)
{
debug_printf ("starting all queue submission loops");
while (loopptr)
{
queue_submission_loop *const ptr = loopptr;
loopptr = loopptr->_next;
ptr->start ();
}
}
return was_running;
}
bool
threaded_queue::stop ()
{
EnterCriticalSection (&_queue_lock);
const bool was_running = _running;
_running = false;
queue_submission_loop *loopptr = _submitters_head;
LeaveCriticalSection (&_queue_lock);
if (was_running)
{
debug_printf ("stopping all queue submission loops");
while (loopptr)
{
queue_submission_loop *const ptr = loopptr;
loopptr = loopptr->_next;
ptr->stop ();
}
ReleaseSemaphore (_requests_sem, _workers_count, NULL);
while (_workers_count)
{
debug_printf (("waiting for worker threads to terminate: "
"%lu still running"),
_workers_count);
Sleep (1000);
}
debug_printf ("all worker threads have terminated");
}
return was_running;
}
/* Append `therequest' to the tail of the request queue and signal the
 * request semaphore so that one worker thread wakes up to process it.
 *
 * `therequest' must not already be linked into a list (its `_next'
 * must be NULL).  The worker that dequeues the request deletes it
 * after processing.
 *
 * A failure to signal the semaphore is logged: the request stays on
 * the queue but no worker is woken for it.
 *
 * FIXME: return success or failure
 */
void
threaded_queue::add (queue_request *const therequest)
{
  assert (this);
  assert (therequest);
  assert (!therequest->_next);

  if (!_workers_count)
    {
      system_printf ("warning: no worker threads to handle request!");
      // FIXME: And then what?
    }

  EnterCriticalSection (&_queue_lock);
  if (!_requests_head)
    _requests_head = therequest;
  else
    {
      /* Add to the queue end.  There is no tail pointer, so walk the
         list to find the last element. */
      queue_request *reqptr = _requests_head;
      for (; reqptr->_next; reqptr = reqptr->_next)
        {}
      assert (reqptr);
      assert (!reqptr->_next);
      reqptr->_next = therequest;
    }

  _requests_count += 1;
  assert (_requests_count > 0);
  LeaveCriticalSection (&_queue_lock);

  // Signal after leaving the lock so the woken worker does not
  // immediately block on `_queue_lock'.
  if (!ReleaseSemaphore (_requests_sem, 1, NULL))
    system_printf (("failed to signal the request queue semaphore, "
                    "error = %lu"),
                   GetLastError ());
}
/* Thread entry point for a worker thread.  `lpParam' is the owning
 * threaded_queue (passed as `this' by create_workers).  Runs the
 * worker loop and, once it returns, removes this thread from the
 * queue's worker count.  Always returns 0.
 */
/*static*/ DWORD WINAPI
threaded_queue::start_routine (const LPVOID lpParam)
{
  class threaded_queue *const queue =
    static_cast<class threaded_queue *> (lpParam);
  assert (queue);

  queue->worker_loop ();

  // Sample `_running' *before* giving up our slot in the worker count.
  // stop() (and through it the destructor) only waits for
  // `_workers_count' to reach zero, so once the decrement below has
  // happened the queue object may be destroyed at any moment and must
  // not be touched again.
  const bool still_running = queue->_running;

  const long count = InterlockedDecrement (&queue->_workers_count);
  assert (count >= 0);
  (void) count;			// Only used by the assert.

  if (still_running)
    debug_printf ("worker loop has exited; thread about to terminate");

  return 0;
}
/* Called from the constructor: so no need to be thread-safe until the
* worker threads start to be created; thus the interlocked increment
* of the `_workers_count' field.
*/
void
threaded_queue::create_workers (const size_t initial_workers)
{
assert (initial_workers > 0);
for (unsigned int i = 0; i != initial_workers; i++)
{
const long count = InterlockedIncrement (&_workers_count);
assert (count > 0);
DWORD tid;
const HANDLE hThread =
CreateThread (NULL, 0, start_routine, this, 0, &tid);
if (!hThread)
{
system_printf ("failed to create thread, error = %lu",
GetLastError ());
abort ();
}
(void) CloseHandle (hThread);
}
}
void
threaded_queue::worker_loop ()
{
while (true)
{
const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE);
if (rc == WAIT_FAILED)
{
system_printf ("wait for request semaphore failed, error = %lu",
GetLastError ());
return;
}
assert (rc == WAIT_OBJECT_0);
EnterCriticalSection (&_queue_lock);
if (!_running)
{
LeaveCriticalSection (&_queue_lock);
return;
}
assert (_requests_head);
queue_request *const reqptr = _requests_head;
_requests_head = reqptr->_next;
_requests_count -= 1;
assert (_requests_count >= 0);
LeaveCriticalSection (&_queue_lock);
assert (reqptr);
reqptr->process ();
delete reqptr;
}
}
/*****************************************************************************/
/* queue_submission_loop */
/* Construct a stopped submission loop feeding `queue'.  When
 * `ninterruptible' is true an auto-reset, initially non-signalled
 * event is created, which stop() later signals to ask the loop to
 * finish; the process is aborted if that event cannot be created.
 */
queue_submission_loop::queue_submission_loop (threaded_queue *const queue,
                                              const bool ninterruptible)
  : _running (false),
    _interrupt_event (NULL),
    _queue (queue),
    _interruptible (ninterruptible),
    _hThread (NULL),
    _tid (0),
    _next (NULL)
{
  // Non-interruptible loops need no event.
  if (!_interruptible)
    return;

  // verbose: debug_printf ("creating an interruptible processing thread");

  _interrupt_event = CreateEvent (NULL,   // SECURITY_ATTRIBUTES
                                  FALSE,  // Auto-reset
                                  FALSE,  // Initially non-signalled
                                  NULL);  // Anonymous

  if (_interrupt_event == NULL)
    {
      system_printf ("failed to create interrupt event, error = %lu",
                     GetLastError ());
      abort ();
    }
}
/* Stop the loop if it is still running, then close whichever of the
 * interrupt event and thread handles were actually created.
 */
queue_submission_loop::~queue_submission_loop ()
{
  if (_running)
    stop ();

  if (_interrupt_event != NULL)
    (void) CloseHandle (_interrupt_event);

  if (_hThread != NULL)
    (void) CloseHandle (_hThread);
}
/* Start the submission loop thread unless the loop is already marked
 * as running.  Aborts the process if the thread cannot be created.
 *
 * Returns the previous running state: true if the loop was already
 * running (nothing is done), false if a new thread was started.
 */
bool
queue_submission_loop::start ()
{
  assert (this);
  assert (!_hThread);

  const bool was_running = _running;
  if (was_running)
    return was_running;

  // Set the flag before the thread exists.
  _running = true;

  _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid);
  if (_hThread == NULL)
    {
      system_printf ("failed to create thread, error = %lu",
                     GetLastError ());
      abort ();
    }

  return was_running;
}
/* Stop the submission loop thread if the loop is marked as running.
 *
 * An interruptible loop is asked to stop by signalling its interrupt
 * event and is given one second to exit; if it has not exited by
 * then it is terminated, and the process aborts if even that fails.
 * A non-interruptible loop is terminated immediately; a failure to
 * do so is only logged.
 *
 * Returns the previous running state.
 *
 * NOTE(review): start_routine calls this from the loop's own thread
 * once request_loop returns.  If `_running' were still set at that
 * point, the thread would wait on (and then terminate) itself --
 * confirm that request_loop only returns after `_running' is cleared.
 */
bool
queue_submission_loop::stop ()
{
  assert (this);
  assert (_hThread && _hThread != INVALID_HANDLE_VALUE);

  const bool was_running = _running;

  if (!_running)
    return was_running;

  _running = false;

  if (!_interruptible)
    {
      // FIXME: could wait to see if the request loop notices that
      // the submission loop is no longer running and shuts down
      // voluntarily.
      debug_printf ("killing request loop thread %lu", _tid);
      if (!TerminateThread (_hThread, 0))
        system_printf (("failed to kill request loop thread %lu"
                        ", error = %lu"),
                       _tid, GetLastError ());
      return was_running;
    }

  // Interruptible: signal the event, then allow a second to exit.
  assert (_interrupt_event
          && _interrupt_event != INVALID_HANDLE_VALUE);

  SetEvent (_interrupt_event);

  if (WaitForSingleObject (_hThread, 1000) != WAIT_TIMEOUT)
    return was_running;

  system_printf (("request loop thread %lu failed to shutdown "
                  "when asked politely: about to get heavy"),
                 _tid);

  if (!TerminateThread (_hThread, 0))
    {
      system_printf (("failed to kill request loop thread %lu"
                      ", error = %lu"),
                     _tid, GetLastError ());
      abort ();
    }

  return was_running;
}
/* Thread entry point for a submission loop thread.  `lpParam' is the
 * owning queue_submission_loop (passed as `this' by start).  Runs the
 * request loop and, once it returns, calls stop() on the loop object.
 * Always returns 0.
 */
/*static*/ DWORD WINAPI
queue_submission_loop::start_routine (const LPVOID lpParam)
{
  class queue_submission_loop *const self =
    static_cast<class queue_submission_loop *> (lpParam);
  assert (self);

  self->request_loop ();

  debug_printf ("submission loop has exited; thread about to terminate");

  // Does nothing beyond its asserts when `_running' is already clear.
  self->stop ();

  return 0;
}
/*****************************************************************************/
#endif /* __OUTSIDE_CYGWIN__ */