crossroads

Git mirror of https://crossroads.e-tunity.com/
git clone git://git.finwo.net/app/crossroads
Log | Files | Refs | LICENSE

commit 65b5c15aaf415b2fa3e1b224dd6ab8ee5d8d28d5
parent 106d478c6f296565889a144abf9a18b724ef77de
Author: finwo <finwo@pm.me>
Date:   Sat,  3 Jan 2026 19:37:04 +0100

2.39

Diffstat:
MChangeLog | 21+++++++++++++++++++++
MMakefile | 2+-
Mxr/Checkers/checkupthread/checkupthread | 1+
Mxr/Checkers/checkupthread/execute.cc | 3+++
Mxr/Checkers/wakeupthread/execute.cc | 2++
Mxr/Checkers/wakeupthread/wakeupthread | 1+
Mxr/DispatchAlgorithms/storedip/storedip | 1+
Mxr/DispatchAlgorithms/storedip/target.cc | 24++++++++++++++----------
Axr/ThreadsAndMutexes/thread/run.cc | 22++++++++++++++++++++++
Mxr/ThreadsAndMutexes/thread/start.cc | 24++++++------------------
Mxr/ThreadsAndMutexes/thread/thread | 5+++++
Axr/ThreadsAndMutexes/threadinfo/threadinfo | 34++++++++++++++++++++++++++++++++++
Axr/ThreadsAndMutexes/threadlist/backend.cc | 5+++++
Axr/ThreadsAndMutexes/threadlist/backendfd.cc | 5+++++
Axr/ThreadsAndMutexes/threadlist/clientfd.cc | 5+++++
Axr/ThreadsAndMutexes/threadlist/deregister1.cc | 7+++++++
Axr/ThreadsAndMutexes/threadlist/deregister2.cc | 6++++++
Axr/ThreadsAndMutexes/threadlist/desc.cc | 5+++++
Axr/ThreadsAndMutexes/threadlist/enregister.cc | 10++++++++++
Axr/ThreadsAndMutexes/threadlist/info.cc | 5+++++
Axr/ThreadsAndMutexes/threadlist/map.cc | 5+++++
Axr/ThreadsAndMutexes/threadlist/threadlist | 25+++++++++++++++++++++++++
Mxr/backend/check.cc | 4++--
Mxr/backend/connect.cc | 9++++++++-
Axr/dispatcher/dispatcher | 51+++++++++++++++++++++++++++++++++++++++++++++++++++
Axr/dispatcher/dispatcher1.cc | 40++++++++++++++++++++++++++++++++++++++++
Axr/dispatcher/dispatcher2.cc | 6++++++
Mxr/dnsentry/dnsentry | 5+++--
Mxr/dnsentry/resolve.cc | 4++--
Mxr/error/error | 5+++--
Mxr/error/opplus2.cc | 2+-
Mxr/error/what.cc | 6++++--
Mxr/etc/status.xslt | 124++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
Mxr/fdset/readable.cc | 7++-----
Mxr/fdset/readwriteable.cc | 6++++--
Mxr/fdset/writeable.cc | 2+-
Mxr/httpbuffer/httpbuffer | 2++
Axr/httpbuffer/reset.cc | 7+++++++
Mxr/httpbuffer/setversion.cc | 13+++++++------
Mxr/httpdispatcher/dispatch.cc | 25+++++++++----------------
Mxr/httpdispatcher/handle.cc | 27+++++++++++++++++++--------
Mxr/httpdispatcher/httpdispatcher | 2+-
Mxr/httpdispatcher/senderrorpage.cc | 18++++++++----------
Mxr/mstr/mstr | 6++++++
Mxr/netbuffer/checkspace.cc | 11+++++++++++
Mxr/netbuffer/copy.cc | 7+++++--
Mxr/netbuffer/netbuffer | 17++++++++++++++---
Mxr/netbuffer/netread.cc | 2+-
Mxr/netbuffer/netwrite.cc | 3++-
Mxr/profiler/profiler | 3++-
Mxr/profiler/profiler1.cc | 3+--
Mxr/profiler/profiler2.cc | 15+--------------
Axr/sys/anymsg.cc | 17+++++++++++++++++
Mxr/sys/debugmsg.cc | 9++-------
Mxr/sys/main.cc | 42+++++++++++++++++++++++++++++++++++++++++-
Mxr/sys/msg.cc | 9++-------
Mxr/sys/reportmsg.cc | 8++------
Mxr/sys/sys | 12+++++++++++-
Dxr/sys/timestamp.cc | 21---------------------
Mxr/sys/warnmsg.cc | 8++------
Mxr/tcpdispatcher/dispatch.cc | 12++++++------
Mxr/tcpdispatcher/execute.cc | 53+++++++++++++++++++++++++++++++----------------------
Mxr/tcpdispatcher/handle.cc | 2+-
Mxr/tcpdispatcher/tcpdispatcher | 36++----------------------------------
Mxr/tcpdispatcher/tcpdispatcher1.cc | 34+---------------------------------
Dxr/tcpdispatcher/tcpdispatcher2.cc | 6------
Axr/timestamp/desc.cc | 11+++++++++++
Axr/timestamp/elapsed.cc | 10++++++++++
Axr/timestamp/timestamp | 17+++++++++++++++++
Axr/timestamp/timestamp1.cc | 5+++++
Axr/timestamp/timestamp2.cc | 6++++++
Mxr/webinterface/answer.cc | 57+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mxr/webinterface/answerstatus.cc | 22++++++++++++++++++++++
Mxr/webinterface/execute.cc | 2++
Mxr/webinterface/webinterface | 1+
75 files changed, 772 insertions(+), 278 deletions(-)

diff --git a/ChangeLog b/ChangeLog @@ -1,3 +1,24 @@ +2.39 [KK 2008-12-04] +- Added Httpbuffer::reset() +- The HTTP dispatcher will show back end error returns (when the HTTP + code is in the 400 or 500 range) +- main() will show runtime limits when invoked with -v +- The HTTP dispatcher no longer downgrades to HTTP/1.0 and closed + connections. Webserver back ends can do this. +- Exceptions during the sending of an error page in HTTP mode are + discarded, no longer logged. +- Mutex lock added around thread startups (see Thread::start()), when + requested so in sys/sys. +- Mutex locks around malloc()/realloc(), when requested so in sys/sys. +- Fixed possible socket leak in TcpDispatcher::execute(). The back end + socket might not have been closed when the dispatch phase crashed. +- Dispatchers are now derived from a new base class Dispatcher, in + preparation for UDP handling. +- Timestamp handling centralized in a class Timestamp. +- Messaging (msg(), debugmsg(), reportmsg(), warnmsg()) centralized. +- Started threads are administered in Threadlist. Separate threads are + killable from the web interface, or all threads to a given back end. + 2.38 [KK 2008-11-19] - Bugfix in xrctl: Weights and max connections in back ends are now passed correctly. diff --git a/Makefile b/Makefile @@ -1,7 +1,7 @@ # Top-level Makefile for XR # ------------------------- -VER = 2.38 +VER = 2.39 PREFIX = $(DESTDIR)/usr BINDIR = $(PREFIX)/sbin MANDIR = $(PREFIX)/share/man diff --git a/xr/Checkers/checkupthread/checkupthread b/xr/Checkers/checkupthread/checkupthread @@ -3,6 +3,7 @@ #include "sys/sys" #include "ThreadsAndMutexes/thread/thread" +#include "ThreadsAndMutexes/threadlist/threadlist" #include "balancer/balancer" #include "error/error" diff --git a/xr/Checkers/checkupthread/execute.cc b/xr/Checkers/checkupthread/execute.cc @@ -1,6 +1,9 @@ #include "checkupthread" void Checkupthread::execute() { + + Threadlist::desc("Checkup thread"); + while (1) { if (config.checkupsec()) { for (unsigned i = 0; i < balancer.nbackends(); i++) { diff --git a/xr/Checkers/wakeupthread/execute.cc b/xr/Checkers/wakeupthread/execute.cc @@ -1,6 +1,8 @@ #include "wakeupthread" void Wakeupthread::execute() { + Threadlist::desc("Wakeup thread"); + while (1) { if (config.wakeupsec()) { for (unsigned i = 0; i < balancer.nbackends(); i++) { diff --git a/xr/Checkers/wakeupthread/wakeupthread b/xr/Checkers/wakeupthread/wakeupthread @@ -3,6 +3,7 @@ #include "sys/sys" #include "ThreadsAndMutexes/thread/thread" +#include "ThreadsAndMutexes/threadlist/threadlist" #include "balancer/balancer" #include "error/error" diff --git a/xr/DispatchAlgorithms/storedip/storedip b/xr/DispatchAlgorithms/storedip/storedip @@ -5,6 +5,7 @@ #include "error/error" #include "balancer/balancer" #include "config/config" +#include "timestamp/timestamp" #include "DispatchAlgorithms/algorithm/algorithm" #include "DispatchAlgorithms/leastconn/leastconn" diff --git a/xr/DispatchAlgorithms/storedip/target.cc b/xr/DispatchAlgorithms/storedip/target.cc @@ -25,14 +25,15 @@ unsigned StoredIp::target(struct in_addr clientip, if (store.count(clientip) > 0) { // Client already known, maybe timed out. time_t diff = now - store[clientip].lastaccess; - - msg ("Client IP " + static_cast<string>(inet_ntoa(clientip)) + - (Mstr(" last visited on ") - + timestamp(store[clientip].lastaccess)) + - (Mstr(", ") + diff) + - " sec ago, and went to " + - balancer.backend(store[clientip].targetbackend).description() + - "\n"); + + if (config.verbose()) { + Timestamp tm(store[clientip].lastaccess); + msg(Mstr("Client IP ") + Mstr(inet_ntoa(clientip)) + + " last visited on " + tm.desc() + + Mstr(Mstr(", ") + diff) + " sec ago, and went to " + + balancer.backend(store[clientip].targetbackend).description() + + "\n"); + } if (diff <= config.ipstoretimeout()) { // Recent 'nuff @@ -73,8 +74,11 @@ unsigned StoredIp::target(struct in_addr clientip, // Weed out store. for (StoreMap::iterator iter = store.begin(); iter != store.end(); iter++) { - debugmsg (Mstr(inet_ntoa(iter->first)) + Mstr(" visited on ") + - timestamp((*iter).second.lastaccess) + "\n"); + if (config.debug()) { + Timestamp tm((*iter).second.lastaccess); + debugmsg (Mstr(inet_ntoa(iter->first)) + Mstr(" visited on ") + + tm.desc() + "\n"); + } if (now - ((*iter).second.lastaccess) > config.ipstoretimeout()) { debugmsg (" Erasing stale entry, stale\n"); store.erase(iter); diff --git a/xr/ThreadsAndMutexes/thread/run.cc b/xr/ThreadsAndMutexes/thread/run.cc @@ -0,0 +1,22 @@ +#include "thread" + +void *Thread::_run (void *data) { + Thread *t = (Thread*) data; + + Threadlist::enregister(); + try { + t->execute(); + } catch (Error const &e) { + lock(&cerr); + cerr << e.what() << "\n"; + unlock(&cerr); + } + Threadlist::deregister(); + + // Cleanups + delete (t); + + // To satisfy the prototype + return (0); +} + diff --git a/xr/ThreadsAndMutexes/thread/start.cc b/xr/ThreadsAndMutexes/thread/start.cc @@ -1,24 +1,6 @@ #include "thread" #include "profiler/profiler" -static void *_run (void *data) { - Thread *t = (Thread*) data; - - try { - t->execute(); - } catch (Error const &e) { - Mutex::lock(&cerr); - cerr << e.what() << "\n"; - Mutex::unlock(&cerr); - } - - // Cleanups - delete (t); - - // To satisfy the prototype - return (0); -} - void Thread::start() { PROFILE ("Thread::start"); @@ -34,7 +16,13 @@ void Thread::start() { if (pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED)) throw static_cast<Error>("Cannot set thread state as detached"); for (int i = 0; i < 3; i++) { +# ifdef MISTRUST_THREAD_CREATE_THREADSAFE + lock((void*)_run); +# endif res = pthread_create (&th, &attr, _run, (void*) this); +# ifdef MISTRUST_THREAD_CREATE_THREADSAFE + unlock((void*)_run); +# endif if (!res) { pthread_attr_destroy (&attr); return; diff --git a/xr/ThreadsAndMutexes/thread/thread b/xr/ThreadsAndMutexes/thread/thread @@ -4,7 +4,9 @@ #include "sys/sys" #include "error/error" #include "config/config" +#include "timestamp/timestamp" #include "ThreadsAndMutexes/mutex/mutex" +#include "ThreadsAndMutexes/threadlist/threadlist" using namespace std; @@ -13,6 +15,9 @@ public: virtual ~Thread(); void start(); virtual void execute(); + +private: + static void *_run(void *data); }; #endif diff --git a/xr/ThreadsAndMutexes/threadinfo/threadinfo b/xr/ThreadsAndMutexes/threadinfo/threadinfo @@ -0,0 +1,34 @@ +#ifndef _THREADINFO_ +#define _THREADINFO_ + +#include "sys/sys" +#include "timestamp/timestamp" +#include "ThreadsAndMutexes/mutex/mutex" + +class Threadinfo { +public: + Threadinfo(): + th_desc(), th_tm(), th_backend(-1), th_backendfd(-1), th_clientfd(-1) + {} + + void desc(string s) { th_desc = s; } + string const &desc() const { return th_desc; } + + Timestamp const &timestamp() const { return th_tm; } + + void backend(int b) { th_backend = b; } + int backend() const { return th_backend; } + + void backendfd(int f) { th_backendfd = f; } + int backendfd() const { return th_backendfd; } + + void clientfd(int f) { th_clientfd = f; } + int clientfd() const { return th_clientfd; } + +private: + string th_desc; + Timestamp th_tm; + int th_backend, th_backendfd, th_clientfd; +}; + +#endif diff --git a/xr/ThreadsAndMutexes/threadlist/backend.cc b/xr/ThreadsAndMutexes/threadlist/backend.cc @@ -0,0 +1,5 @@ +#include "threadlist" + +void Threadlist::backend(int b) { + th_map[pthread_self()].backend(b); +} diff --git a/xr/ThreadsAndMutexes/threadlist/backendfd.cc b/xr/ThreadsAndMutexes/threadlist/backendfd.cc @@ -0,0 +1,5 @@ +#include "threadlist" + +void Threadlist::backendfd(int f) { + th_map[pthread_self()].backendfd(f); +} diff --git a/xr/ThreadsAndMutexes/threadlist/clientfd.cc b/xr/ThreadsAndMutexes/threadlist/clientfd.cc @@ -0,0 +1,5 @@ +#include "threadlist" + +void Threadlist::clientfd(int f) { + th_map[pthread_self()].clientfd(f); +} diff --git a/xr/ThreadsAndMutexes/threadlist/deregister1.cc b/xr/ThreadsAndMutexes/threadlist/deregister1.cc @@ -0,0 +1,7 @@ +#include "threadlist" + +void Threadlist::deregister(pthread_t id) { + Mutex::lock(&th_map); + th_map.erase(id); + Mutex::unlock(&th_map); +} diff --git a/xr/ThreadsAndMutexes/threadlist/deregister2.cc b/xr/ThreadsAndMutexes/threadlist/deregister2.cc @@ -0,0 +1,6 @@ +#include "threadlist" + +void Threadlist::deregister() { + deregister(pthread_self()); +} + diff --git a/xr/ThreadsAndMutexes/threadlist/desc.cc b/xr/ThreadsAndMutexes/threadlist/desc.cc @@ -0,0 +1,5 @@ +#include "threadlist" + +void Threadlist::desc(string const &s) { + th_map[pthread_self()].desc(s); +} diff --git a/xr/ThreadsAndMutexes/threadlist/enregister.cc b/xr/ThreadsAndMutexes/threadlist/enregister.cc @@ -0,0 +1,10 @@ +#include "threadlist" + +Threadmap Threadlist::th_map; + +void Threadlist::enregister() { + Threadinfo n; + Mutex::lock(&th_map); + th_map[pthread_self()] = n; + Mutex::unlock(&th_map); +} diff --git a/xr/ThreadsAndMutexes/threadlist/info.cc b/xr/ThreadsAndMutexes/threadlist/info.cc @@ -0,0 +1,5 @@ +#include "threadlist" + +Threadinfo Threadlist::info(pthread_t id) { + return th_map[id]; +} diff --git a/xr/ThreadsAndMutexes/threadlist/map.cc b/xr/ThreadsAndMutexes/threadlist/map.cc @@ -0,0 +1,5 @@ +#include "threadlist" + +Threadmap &Threadlist::map() { + return th_map; +} diff --git a/xr/ThreadsAndMutexes/threadlist/threadlist b/xr/ThreadsAndMutexes/threadlist/threadlist @@ -0,0 +1,25 @@ +#ifndef _THREADLIST_ +#define _THREADLIST_ + +#include "sys/sys" +#include "ThreadsAndMutexes/threadinfo/threadinfo" + +typedef map<pthread_t, Threadinfo> Threadmap; + +class Threadlist { +public: + static void enregister(); + static void deregister(pthread_t id); + static void deregister(); + static Threadmap &map(); + static Threadinfo info(pthread_t id); + static void desc(string const &s); + static void backend(int b); + static void clientfd(int f); + static void backendfd(int f); + +private: + static Threadmap th_map; +}; + +#endif diff --git a/xr/backend/check.cc b/xr/backend/check.cc @@ -13,7 +13,6 @@ void Backend::check() { if (backendcheck().server() == "" && backendcheck().port() == 0) { // Most common: TCP connect to the actual back end connect(); - socketclose (sock()); } else { // TCP connects to an alternative server or port. // We instantiate a dummy backend and let it connect to the "other" @@ -29,6 +28,7 @@ void Backend::check() { msg (Mstr("Alternative back end for testing ") + tester.description() + " is " + livestr() + "\n"); } + socketclose (sock()); break; case BackendCheck::c_get: @@ -58,8 +58,8 @@ void Backend::check() { live(true); else debugmsg("Back end assumed dead.\n"); - socketclose(tester.sock()); } + socketclose(tester.sock()); break; case BackendCheck::c_external: diff --git a/xr/backend/connect.cc b/xr/backend/connect.cc @@ -37,6 +37,8 @@ bool Backend::connect() { int conres = ::connect (clsocket, (struct sockaddr *)&backendaddr, sizeof(backendaddr)); int conerrno = errno; + debugmsg((Mstr("Connect result: ") + conres) + + (Mstr(", errno: ") + conerrno) + "\n"); // Put socket again in blocking mode. if (fcntl (clsocket, F_SETFL, flags) == -1) { @@ -50,7 +52,12 @@ bool Backend::connect() { // Wait for socket to go writable. Fdset fdset (config.backend_timeout()); fdset.add (clsocket); - if (fdset.readwriteable() == -1 && fdset.writeable() == clsocket) + int rwsock = fdset.readwriteable(); + int wsock = fdset.writeable(); + debugmsg (Mstr("Connecting to ") + description() + + Mstr(Mstr(": writesocket=") + wsock) + + Mstr(Mstr(", read/writesocket=") + rwsock) + "\n"); + if (wsock == clsocket && rwsock == -1) islive = true; } diff --git a/xr/dispatcher/dispatcher b/xr/dispatcher/dispatcher @@ -0,0 +1,51 @@ +#ifndef _DISPATCHER_ +#define _DISPATCHER_ + +#include "sys/sys" +#include "balancer/balancer" +#include "config/config" +#include "ThreadsAndMutexes/thread/thread" +#include "ThreadsAndMutexes/threadlist/threadlist" +#include "backendvector/backendvector" +#include "netbuffer/netbuffer" + +// Dispatching algorithm workers +#include "DispatchAlgorithms/algorithm/algorithm" +#include "DispatchAlgorithms/roundrobin/roundrobin" +#include "DispatchAlgorithms/firstactive/firstactive" +#include "DispatchAlgorithms/leastconn/leastconn" +#include "DispatchAlgorithms/external/external" +#include "DispatchAlgorithms/hashedip/hashedip" +#include "DispatchAlgorithms/storedip/storedip" +#include "DispatchAlgorithms/weightedload/weightedload" + +class Dispatcher: public Thread { +public: + + Dispatcher (int fd, struct in_addr ip); + virtual ~Dispatcher(); + + virtual void execute() = 0; + virtual void dispatch() = 0; + virtual void handle() = 0; + + int targetbackend() const { return target_backend; } + void targetbackend(int t) { target_backend = t; } + struct in_addr clientip() const { return client_ip; } + int clientfd() const { return client_fd; } + void clientfd(int c) { client_fd = c; } + int backendfd() const { return backend_fd; } + void backendfd(int b) { backend_fd = b; } + Algorithm *algorithm() const { return algo; } + + BackendVector &targetlist() { return target_list; } + void targetlist (BackendVector t) { target_list = t; } + +private: + struct in_addr client_ip; + int target_backend, client_fd, backend_fd; + Algorithm *algo; + BackendVector target_list; +}; + +#endif diff --git a/xr/dispatcher/dispatcher1.cc b/xr/dispatcher/dispatcher1.cc @@ -0,0 +1,40 @@ +#include "dispatcher" + +Dispatcher::Dispatcher(int cfd, struct in_addr cip): + Thread(), client_ip(cip), target_backend(-1), client_fd(cfd), + backend_fd(-1), target_list() { + + // Instantiate dispatchmode algorithm + switch (config.dispatchmode()) { + case Dispatchmode::m_roundrobin: + algo = new Roundrobin; + break; + case Dispatchmode::m_firstactive: + algo = new Firstactive; + break; + case Dispatchmode::m_external: + algo = new External; + break; + case Dispatchmode::m_strict_hashed_ip: + case Dispatchmode::m_lax_hashed_ip: + algo = new HashedIp; + break; + case Dispatchmode::m_strict_stored_ip: + case Dispatchmode::m_lax_stored_ip: + algo = new StoredIp; + break; + case Dispatchmode::m_weighted_load: + algo = new Weightedload; + break; + case Dispatchmode::m_leastconn: + default: + algo = new Leastconn; + break; + } + + // NOTE: Memory errors for algorithm pointer are not handled here, + // but in dispatch() (don't want to throw up in the constructor) + + debugmsg("Dispatcher instantiated.\n"); +} + diff --git a/xr/dispatcher/dispatcher2.cc b/xr/dispatcher/dispatcher2.cc @@ -0,0 +1,6 @@ +#include "dispatcher" + +Dispatcher::~Dispatcher() { + delete algo; + debugmsg ("Dispatcher finished\n"); +} diff --git a/xr/dnsentry/dnsentry b/xr/dnsentry/dnsentry @@ -3,16 +3,17 @@ #include "config/config" #include "error/error" +#include "timestamp/timestamp" #include "ThreadsAndMutexes/mutex/mutex" class DNSEntry { public: - DNSEntry(): result(0), timestamp(0) {} + DNSEntry(): result(0), timestamp() {} in_addr_t &resolve(string const &str); private: in_addr_t result; - time_t timestamp; + Timestamp timestamp; }; #endif diff --git a/xr/dnsentry/resolve.cc b/xr/dnsentry/resolve.cc @@ -2,8 +2,7 @@ in_addr_t &DNSEntry::resolve (string const &h) { // If the entry is there and if it's up to date, run with it - if (timestamp && - time(0) <= timestamp + (time_t)config.dnscachetimeout()) + if (result && timestamp.elapsed() <= (double)config.dnscachetimeout()) return result; // Resolve now. @@ -16,5 +15,6 @@ in_addr_t &DNSEntry::resolve (string const &h) { if (!hostaddr) throw static_cast<Error>("Failed to resolve host '") + h + "'"; + debugmsg(Mstr("Host ") + h + "resolved\n"); return result; } diff --git a/xr/error/error b/xr/error/error @@ -1,7 +1,8 @@ #ifndef _ERROR_ #define _ERROR_ -#include "../sys/sys" +#include "sys/sys" +#include "timestamp/timestamp" using namespace std; @@ -10,7 +11,7 @@ public: Error (string s); Error (int i); Error &operator+ (Error const &other); - Error &operator+ (string const s); + Error &operator+ (string const &s); Error &operator+ (int i); char const *what() const throw (); diff --git a/xr/error/opplus2.cc b/xr/error/opplus2.cc @@ -1,6 +1,6 @@ #include "error" -Error &Error::operator+ (string s) { +Error &Error::operator+ (string const &s) { desc += s; return (*this); } diff --git a/xr/error/what.cc b/xr/error/what.cc @@ -3,8 +3,10 @@ char const *Error::what() const throw (){ ostringstream o; - if (config.prefixtimestamp()) - o << timestamp() << ' '; + if (config.prefixtimestamp()) { + Timestamp tm; + o << tm.desc() << ' '; + } o << pthread_self() << " ERROR: " << desc; return (o.str().c_str()); } diff --git a/xr/etc/status.xslt b/xr/etc/status.xslt @@ -1,7 +1,9 @@ <?xml version="1.0" encoding="UTF-8"?> <xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform"> -<xsl:output method="html"/> +<xsl:output method="html" + encoding="UTF-8" + doctype-public="-//W3C//DTD HTML 4.01 Transitional//EN"/> <xsl:template match="/"> <html> @@ -58,23 +60,110 @@ <xsl:template match="/status"> <table> - <xsl:apply-templates select="/status/server"/> - <xsl:apply-templates select="/status/backend"/> - - <tr> <td colspan="4"><hr/></td></tr> <tr> - <td class="backend" colspan="2"> - <b>Add back end ip:port</b> + <td valign="top"> + <!-- This is the left hand detailed status view --> + <table> + <tr> + <td colspan="4"><b>Detailed Status</b></td> + </tr> + <tr> + <td colspan="4"><hr/></td> + </tr> + <xsl:apply-templates select="/status/server"/> + <xsl:apply-templates select="/status/backend"/> + + <tr> <td colspan="4"><hr/></td></tr> + <tr> + <td class="backend" colspan="2"> + <b>Add back end ip:port</b> + </td> + <td class="backend" colspan="2" align="right"> + <input type="text" size="30" name="addbackend" id="addbackend" + onchange="goto('/server/addbackend/', 'addbackend');"/> + </td> + </tr> + <tr> <td colspan="4"><hr/></td></tr> + + </table> + <xsl:apply-templates select="/status/id"/> </td> - <td class="backend" colspan="2" align="right"> - <input type="text" size="30" name="addbackend" id="addbackend" - onchange="goto('/server/addbackend/', 'addbackend');"/> + <td valign="top"> + <!-- This is the right-hand overview --> + <table width="100%"> + <tr> + <td colspan="2"><b>Quick Overview</b></td> + </tr> + <tr> + <td colspan="2"><hr/></td> + </tr> + <xsl:for-each select="/status/backend"> + <tr> + <td> + <b>Back end + <a href="#{nr}"><xsl:value-of select="address"/></a> + </b> + </td> + <td> + <xsl:value-of select="up"/>, + <xsl:value-of select="live"/>, + <xsl:value-of select="available"/>, + <xsl:value-of select="connections"/> connections + </td> + </tr> + </xsl:for-each> + </table> + <!-- This is the activity overview --> + <table width="100%"> + <tr> + <td colspan="5"><hr/></td> + </tr> + <tr> + <td colspan="5"><b>Activity</b></td> + </tr> + <tr> + <td colspan="5"><hr/></td> + </tr> + <tr> + <td><b>Thread</b></td> + <td><b>Description</b></td> + <td><b>Back end</b></td> + <td><b>Duration</b></td> + <td></td> + </tr> + <xsl:apply-templates select="/status/activity/thread"> + <xsl:sort select="duration" data-type="number"/> + </xsl:apply-templates> + </table> </td> </tr> - <tr> <td colspan="4"><hr/></td></tr> - </table> - <xsl:apply-templates select="/status/id"/> +</xsl:template> + +<xsl:template match="/status/activity/thread"> + <tr> + <td><xsl:value-of select="id"/></td> + <td><xsl:value-of select="description"/></td> + <xsl:choose> + <xsl:when test="backend = -1"> + <td></td> + </xsl:when> + <xsl:otherwise> + <td><xsl:value-of select="address"/></td> + </xsl:otherwise> + </xsl:choose> + <td><xsl:value-of select="duration"/></td> + <xsl:choose> + <xsl:when test="backend = -1"> + <td></td> + </xsl:when> + <xsl:otherwise> + <td><input type="button" value="Kill" + onclick="goto('/thread/kill/{id}', '');"/> + </td> + </xsl:otherwise> + </xsl:choose> + </tr> </xsl:template> <xsl:template match="/status/id"> @@ -441,6 +530,7 @@ <tr> <td colspan="4"><hr/></td></tr> <tr> <td class="backend" colspan="3"> + <a name="{nr}"/> <b> Back end <xsl:value-of select="address"/> </b> </td> <td class="backend"> @@ -554,6 +644,14 @@ </select> </xsl:otherwise> </xsl:choose> + </td> + </tr> + <tr> + <td></td> + <td colspan="2">Stop all connections</td> + <td> + <input type="button" value="Stop now" + onclick="goto('/backend/{nr}/stopconnections', '');"/> </td> </tr> </xsl:template> diff --git a/xr/fdset/readable.cc b/xr/fdset/readable.cc @@ -21,10 +21,7 @@ int Fdset::readable() const { // Prepare select sets. FD_ZERO (&readset); FD_ZERO (&exceptset); - int max = 0; for (unsigned i = 0; i < set.size(); i++) { - if (set[i] > max) - max = set[i]; FD_SET (set[i], &readset); FD_SET (set[i], &exceptset); } @@ -39,7 +36,7 @@ int Fdset::readable() const { // Run the select. Signal interrupts are returned as -1, so that // the caller can handle them gracefully. - if (select (max + 1, &readset, 0, &exceptset, tvp) < 0) { + if (select (FD_SETSIZE, &readset, 0, &exceptset, tvp) < 0) { debugmsg (Mstr("Select interrupted with errno ") + errno + " while waiting for readable fd\n"); if (errno != EINTR) @@ -53,7 +50,7 @@ int Fdset::readable() const { // Check for exceptions. for (unsigned i = 0; i < set.size(); i++) if (FD_ISSET (set[i], &exceptset)) - throw static_cast<Error>("Exception on fd/socket ") + set[i]; + throw static_cast<Error>("Exception on fd/socket ") + int(set[i]); // Check what's readable. for (unsigned i = 0; i < set.size(); i++) diff --git a/xr/fdset/readwriteable.cc b/xr/fdset/readwriteable.cc @@ -41,12 +41,14 @@ int Fdset::readwriteable() const { // Check for exceptions. for (unsigned i = 0; i < set.size(); i++) if (FD_ISSET (set[i], &exceptset)) - throw static_cast<Error>("Exception on fd/socket ") + set[i]; + throw static_cast<Error>("Exception on fd/socket ") + int(set[i]); // Check what's active. for (unsigned i = 0; i < set.size(); i++) - if (FD_ISSET (set[i], &readset) && FD_ISSET (set[i], &writeset)) + if (FD_ISSET (set[i], &readset) && FD_ISSET (set[i], &writeset)) { + debugmsg(Mstr("Fd " ) + set[i] + " has become read/writeable\n"); return (set[i]); + } // Nothing.. return (-1); diff --git a/xr/fdset/writeable.cc b/xr/fdset/writeable.cc @@ -41,7 +41,7 @@ int Fdset::writeable() const { // Check for exceptions. for (unsigned i = 0; i < set.size(); i++) if (FD_ISSET (set[i], &exceptset)) - throw static_cast<Error>("Exception on fd/socket ") + set[i]; + throw static_cast<Error>("Exception on fd/socket ") + int(set[i]); // Check what's writeable. for (unsigned i = 0; i < set.size(); i++) diff --git a/xr/httpbuffer/httpbuffer b/xr/httpbuffer/httpbuffer @@ -36,6 +36,8 @@ public: string requesturi(); + void reset(); + private: unsigned findheader (string h); unsigned bodystart; diff --git a/xr/httpbuffer/reset.cc b/xr/httpbuffer/reset.cc @@ -0,0 +1,7 @@ +#include "httpbuffer" + +void Httpbuffer::reset() { + first_line = ""; + bodystart = 0; + Netbuffer::reset(); +} diff --git a/xr/httpbuffer/setversion.cc b/xr/httpbuffer/setversion.cc @@ -3,15 +3,16 @@ bool Httpbuffer::setversion (char v) { PROFILE("Httpbuffer::setversion"); - // No headers? Nothing to do. - if (!headersreceived()) + // No first line? Nothing to do yet. + unsigned croff = charfind('\n'); + if (!croff) return false; - // First line must start with HTTP/1.X. - // Poke in the version right after that. - if (bufsz() < 8 || strncmp(bufdata(), "HTTP/1.", 7)) + // Find the HTTP/1.x header + unsigned stroff = strfind("HTTP/1."); + if (!stroff || stroff > croff) return false; // Poke in the new version. - return setchar(7, v); + return setchar(stroff + 7, v); } diff --git a/xr/httpdispatcher/dispatch.cc b/xr/httpdispatcher/dispatch.cc @@ -2,13 +2,12 @@ void HttpDispatcher::dispatch() { PROFILE("HttpDispatcher::dispatch"); - + unsigned stickytarget; string host_header = ""; // Try to dispatch. Since we're in HTTP mode, we must return an // error page when dispatching fails. - try { // Get the client's request. May need for cookie inspection or for the @@ -17,9 +16,7 @@ void HttpDispatcher::dispatch() { if (!buf.netread(clientfd(), config.client_timeout())) throw static_cast<Error>("Didn't receive a valid " "client request."); - if (config.verbose()) - msg ("Received client request: '" + buf.firstline() + - "'\n"); + msg ("Received client request: '" + buf.firstline() + "'\n"); // See if hostmatching is used. This is true when a backend // matches against a non-dot host. @@ -32,9 +29,8 @@ void HttpDispatcher::dispatch() { // Build new target list if host matching applies. if (hostmatchused) { host_header = buf.headerval ("Host"); - if (config.verbose()) - msg ("Will try to dispatch request host '" + - host_header + "'\n"); + msg ("Will try to dispatch request host '" + + host_header + "'\n"); // We need to build tcpdispatcher's target list now! // Construct locally and poke into TcpDispatcher. @@ -67,13 +63,11 @@ void HttpDispatcher::dispatch() { // to non-sticky dispatching. targetbackend(stickytarget); Backend tb = balancer.backend(stickytarget); - if (config.verbose()) - msg ("Sticky HTTP request for " + tb.description() + "\n"); + msg ("Sticky HTTP request for " + tb.description() + "\n"); if (! tb.connect()) { balancer.backend(stickytarget).live(false); - if (config.verbose()) - msg ("Failed to connect to back end " + tb.description() + - ", trying to dispatch to other\n"); + msg ("Failed to connect to back end " + tb.description() + + ", trying to dispatch to other\n"); issticky(false); TcpDispatcher::dispatch(); } else { @@ -81,11 +75,10 @@ void HttpDispatcher::dispatch() { issticky(true); } } - + } catch (Error const &e) { - senderrorpage(); + senderrorpage(e.what()); throw e; } - } diff --git a/xr/httpdispatcher/handle.cc b/xr/httpdispatcher/handle.cc @@ -5,12 +5,6 @@ void HttpDispatcher::handle() { // The client request was already retrieved before starting the // dispatcher. We can continue by applying server-directed headers. - debugmsg("Applying server-directed headers to client request\n"); - buf.setversion('0'); - buf.setheader ("Connection", "close"); - buf.setheader ("Proxy-Connection", "close"); - - // Apply other server-side directed info if (config.addxrversion()) buf.setheader ("XR", VER); if (config.addxforwardedfor()) @@ -22,14 +16,18 @@ void HttpDispatcher::handle() { debugmsg("Sending client request to back end\n"); buf.netwrite(backendfd(), config.backend_timeout()); - // Let's see if we need to modify the server headers. + // Let's see if we will need to modify the server headers. bool modify_serverheaders = false; if (config.addxrversion() || (config.stickyhttp() && !issticky())) modify_serverheaders = true; + // Store the client request. May want to log it later. + string client_request = buf.firstline(); + // Go into copy-thru mode. If required, catch the server headers on // their first appearance and modify them. + bool backend_response_checked = false; while (1) { Fdset readset (config.client_timeout()); readset.add(clientfd()); @@ -45,7 +43,7 @@ void HttpDispatcher::handle() { break; if (sock == backendfd() && modify_serverheaders) { - debugmsg("First back end request seen, applying modifications\n"); + debugmsg("Back end response seen, applying modifications\n"); modify_serverheaders = false; while (! buf.headersreceived()) if (!buf.netread (sock, config.backend_timeout())) @@ -59,6 +57,18 @@ void HttpDispatcher::handle() { buf.setheader("Set-Cookie", o.str()); } } + + // The back end response may now get flushed to the client. + // If the response code is 4** or 5**, log it as a warning. + if (!backend_response_checked && + sock == backendfd() && buf.headersreceived()) { + string respcode = buf.stringat(9, 3); + if (respcode[0] == '4' || respcode[0] == '5') + warnmsg("HTTP back end indicates fault: '" + + buf.firstline() + "' as response to '" + + client_request + "'\n"); + backend_response_checked = true; + } // Flush info to the other connected side. int othersock, timeout; @@ -77,4 +87,5 @@ void HttpDispatcher::handle() { if (sock == backendfd()) balancer.backend(targetbackend()).addbytes(buf.bufsz()); } + } diff --git a/xr/httpdispatcher/httpdispatcher b/xr/httpdispatcher/httpdispatcher @@ -15,7 +15,7 @@ public: void issticky (bool s) { is_sticky = s; } private: - void senderrorpage(); + void senderrorpage(string const &desc); Httpbuffer buf; bool is_sticky; diff --git a/xr/httpdispatcher/senderrorpage.cc b/xr/httpdispatcher/senderrorpage.cc @@ -1,14 +1,9 @@ #include "httpdispatcher" -#define ERRORSTR \ - "HTTP/1.0 502 Internal Server Error\r\n" \ - "Content-Length: 0\r\n" \ - "\r\n" - -void HttpDispatcher::senderrorpage() { +void HttpDispatcher::senderrorpage(string const &reason) { PROFILE("HttpDispatcher::senderrorpage"); - msg ("Sending error page to client.\n"); + msg ("Sending error page to client: '" + reason + "'\n"); try { string txt = "<html>\n" @@ -24,13 +19,16 @@ void HttpDispatcher::senderrorpage() { mess << "HTTP/1.0 502 Internal Server Error\r\n" "Content-Length: " << txt.size() << "\r\n" + "XR-Reason: " << reason << "\r\n" "\r\n" << txt; Netbuffer buf(mess.str()); buf.netwrite(clientfd(), config.client_timeout()); } catch (Error const &e) { - Mutex::lock(&cerr); - cerr << e.what() << " (while sending error page)\n"; - Mutex::unlock(&cerr); + // Silently discard, we are not interested in errors + // that ocur when an error page is being sent + // Mutex::lock(&cerr); + // cerr << e.what() << " (while sending error page)\n"; + // Mutex::unlock(&cerr); } } diff --git a/xr/mstr/mstr b/xr/mstr/mstr @@ -41,6 +41,12 @@ public: *this += o.str(); return *this; } + Mstr const &operator+(rlim_t r) { + ostringstream o; + o << r; + *this += o.str(); + return *this; + } }; #endif diff --git a/xr/netbuffer/checkspace.cc b/xr/netbuffer/checkspace.cc @@ -5,14 +5,25 @@ void Netbuffer::check_space(unsigned extra) { if (!buf_alloced) { buf_alloced = extra; + // When the first network buffer is allocated in HTTP mode, get + // twice as much. Most often that will be enough to fetch the whole + // client request, so that one realloc() will be spared. + if (extra == config.buffersize() && + config.stype() == Servertype::t_http) + buf_alloced <<= 1; + msg (Mstr("Reserving ") + buf_alloced + " bytes for network buffer\n"); + LOCK_MALLOC; buf_data = (char*)malloc(buf_alloced); + UNLOCK_MALLOC; if (! buf_data) throw static_cast<Error>("Memory fault in Netbuffer::check_space"); } else if (buf_sz + extra > buf_alloced) { msg((Mstr("Reallocating net buffer from ") + buf_alloced) + (Mstr(" to ") + (buf_alloced + extra)) + " bytes\n"); buf_alloced += extra; + LOCK_MALLOC; buf_data = (char*)realloc(buf_data, buf_alloced); + UNLOCK_MALLOC; if (! buf_data) throw static_cast<Error>("Memory fault in Netbuffer::check_space"); } diff --git a/xr/netbuffer/copy.cc b/xr/netbuffer/copy.cc @@ -3,8 +3,11 @@ void Netbuffer::copy (Netbuffer const &other) { buf_sz = other.buf_sz; buf_alloced = other.buf_alloced; - if (! (buf_data = (char*)malloc(buf_sz)) ) + LOCK_MALLOC; + buf_data = (char*)malloc(buf_alloced); + UNLOCK_MALLOC; + if (!buf_data) throw static_cast<Error>("Memory fault in Netbuffer::copy"); - memcpy (buf_data, other.buf_data, buf_sz); + memcpy (buf_data, other.buf_data, buf_alloced); } diff --git a/xr/netbuffer/netbuffer b/xr/netbuffer/netbuffer @@ -6,6 +6,17 @@ #include "config/config" #include "profiler/profiler" #include "fdset/fdset" +#include "servertype/servertype" + +/* A few defs when malloc() / realloc() are suspected to be not thread-safe. + * The defines are used in eg. copy() and check_space(). */ +#ifdef MISTRUST_MALLOC_THREADSAFE +#define LOCK_MALLOC Mutex::lock((void*)malloc) +#define UNLOCK_MALLOC Mutex::unlock((void*)malloc) +#else +#define LOCK_MALLOC +#define UNLOCK_MALLOC +#endif class Netbuffer { public: @@ -26,7 +37,7 @@ public: unsigned strfind (char const *s) const; unsigned charfind (char ch, unsigned start = 0) const; - + bool setchar(unsigned offset, char ch); void setstring(string const &s); @@ -36,7 +47,7 @@ public: bool removeat(unsigned index, unsigned len = 1); void reset(); - + private: void copy (Netbuffer const &other); @@ -44,7 +55,7 @@ private: void check_space(unsigned extra); string printable(char c) const; - + char *buf_data; unsigned buf_sz; unsigned buf_alloced; diff --git a/xr/netbuffer/netread.cc b/xr/netbuffer/netread.cc @@ -8,7 +8,7 @@ unsigned Netbuffer::netread (int fd, int timeout) { set.add(fd); if (set.readable() != fd) throw static_cast<Error>("Fd ") + fd + - " failed to become readable within " + timeout + "sec"; + " failed to become readable within " + int(timeout) + " sec"; } check_space(config.buffersize()); diff --git a/xr/netbuffer/netwrite.cc b/xr/netbuffer/netwrite.cc @@ -57,7 +57,8 @@ unsigned Netbuffer::netwrite (int fd, int timeout) const { totwritten += nwritten; else if (errno != EINVAL && errno != EINPROGRESS) throw static_cast<Error>("Write/send failed: errno=") + - errno + ", " + strerror(errno) + ", result=" + nwritten; + int(errno) + ", " + strerror(errno) + + ", result=" + (int)nwritten; } return buf_sz; diff --git a/xr/profiler/profiler b/xr/profiler/profiler @@ -2,6 +2,7 @@ #define _PROFILER_ #include "sys/sys" +#include "timestamp/timestamp" #include "ThreadsAndMutexes/mutex/mutex" class Profiler { @@ -10,7 +11,7 @@ public: ~Profiler(); private: char const *fname; - struct timeval tv_start; + Timestamp timestamp; static FILE *outf; }; diff --git a/xr/profiler/profiler1.cc b/xr/profiler/profiler1.cc @@ -2,8 +2,7 @@ FILE *Profiler::outf; -Profiler::Profiler (char const *f) { +Profiler::Profiler (char const *f): timestamp() { fname = f; - gettimeofday(&tv_start, 0); } diff --git a/xr/profiler/profiler2.cc b/xr/profiler/profiler2.cc @@ -1,22 +1,9 @@ #include "profiler" Profiler::~Profiler() { - struct timeval tv_end; - gettimeofday (&tv_end, 0); - double usec = - ( ((double)tv_end.tv_sec * 1000000 + tv_end.tv_usec) - - ((double)tv_start.tv_sec * 1000000 + tv_start.tv_usec) ); - - /* - cout << fname - << " start " << tv_start.tv_sec << " . " << tv_start.tv_usec - << ", end " << tv_end.tv_sec << '.' << tv_end.tv_usec - << ", usec " << usec << '\n'; - */ - if (!outf) outf = fopen ("/tmp/xr-prof.txt", "w"); if (outf) fprintf (outf, "%s %s %g\n", - timestamp().c_str(), fname, usec); + timestamp.desc().c_str(), fname, timestamp.elapsed()); } diff --git a/xr/sys/anymsg.cc b/xr/sys/anymsg.cc @@ -0,0 +1,17 @@ +#include "sys" +#include "config/config" +#include "ThreadsAndMutexes/mutex/mutex" +#include "profiler/profiler" +#include "mstr/mstr" +#include "timestamp/timestamp" + +void anymsg (Mstr const &s, ostream &o, string const &label) { + Mutex::lock(&o); + if (config.prefixtimestamp()) { + Timestamp tm; + o << tm.desc() << ' '; + } + o << pthread_self() << ' ' << label << ": " << s; + o.flush(); + Mutex::unlock(&o); +} diff --git a/xr/sys/debugmsg.cc b/xr/sys/debugmsg.cc @@ -3,14 +3,9 @@ #include "ThreadsAndMutexes/mutex/mutex" #include "profiler/profiler" #include "mstr/mstr" +#include "timestamp/timestamp" void _debugmsg (Mstr const &s) { PROFILE("debugmsg"); - - Mutex::lock(&cerr); - if (config.prefixtimestamp()) - cerr << timestamp() << ' '; - cerr << pthread_self() << " DEBUG: " << s; - cerr.flush(); - Mutex::unlock(&cerr); + anymsg(s, cerr, "DEBUG"); } diff --git a/xr/sys/main.cc b/xr/sys/main.cc @@ -10,15 +10,52 @@ using namespace std; Config config; Balancer balancer; +static void showlimits() { + typedef struct { + int resource; + string description; + } Limit; + static Limit limit[] = { + { RLIMIT_CORE, "coredump size (bytes)" }, + { RLIMIT_CPU, "cpu time (sec)" }, + { RLIMIT_DATA, "data segment size (bytes)" }, + { RLIMIT_FSIZE, "max file size (bytes)" }, +# ifdef RLIMIT_MEMLOCK + { RLIMIT_MEMLOCK, "locked mem size (bytes)" }, +# endif + { RLIMIT_NOFILE, "max open files" }, +# ifdef RLIMIT_NPROC + { RLIMIT_NPROC, "max processes" }, +# endif +# ifdef RLIMIT_RSS + { RLIMIT_RSS, "max resident set size (bytes)" }, +# endif + { RLIMIT_STACK, "max stack size (bytes)" } + }; + + for (unsigned i = 0; i < sizeof(limit) / sizeof(Limit); i++) { + struct rlimit rl; + if (getrlimit(limit[i].resource, &rl)) + throw static_cast<Error>("Failed to request limit: ") + + strerror(errno); + ostringstream o; + o << "Limits for " << limit[i].description + << ": hard limit " << unsigned(rl.rlim_max) + << ", soft limit " << unsigned(rl.rlim_cur) << '\n'; + msg(o.str()); + } +} + static void sigcatcher (int sig) { debugmsg ("Seen signal " + sig + '\n'); if (sig == SIGHUP) balancer.report(true); - else if (sig != SIGPIPE) + else if (sig != SIGPIPE && sig != SIGSTOP) balancer.terminate(true); // Actually we wouldn't need to test for SIGPIPE, it's ignored (see below). // Leaving the test in place for future versions, better an extra if // than forgetting it later. + // SIGSTOP is used for stopping separare threads. } int main (int argc, char **argv) { @@ -33,6 +70,9 @@ int main (int argc, char **argv) { // Load configuration from the commandline, promote verbosity config.parsecmdline (argc, argv); + if (config.verbose()) + showlimits(); + msg ((Mstr("XR running as PID ") + getpid()) + "\n"); // Load the signal handler. diff --git a/xr/sys/msg.cc b/xr/sys/msg.cc @@ -3,14 +3,9 @@ #include "ThreadsAndMutexes/mutex/mutex" #include "profiler/profiler" #include "mstr/mstr" +#include "timestamp/timestamp" void _msg (Mstr const &s) { PROFILE("msg"); - - Mutex::lock(&cerr); - if (config.prefixtimestamp()) - cerr << timestamp() << ' '; - cerr << pthread_self() << " INFO: " << s; - cerr.flush(); - Mutex::unlock(&cerr); + anymsg(s, cerr, "INFO"); } diff --git a/xr/sys/reportmsg.cc b/xr/sys/reportmsg.cc @@ -2,12 +2,8 @@ #include "config/config" #include "ThreadsAndMutexes/mutex/mutex" #include "mstr/mstr" +#include "timestamp/timestamp" void reportmsg (Mstr const &s) { - Mutex::lock(&cerr); - if (config.prefixtimestamp()) - cerr << timestamp() << ' '; - cerr << pthread_self() << " REPORT: " << s; - cerr.flush(); - Mutex::unlock(&cerr); + anymsg(s, cerr, "REPORT"); } diff --git a/xr/sys/sys b/xr/sys/sys @@ -21,6 +21,7 @@ #include <unistd.h> #include <arpa/inet.h> #include <netinet/in.h> +#include <sys/resource.h> #include <sys/select.h> #include <sys/socket.h> #include <sys/time.h> @@ -49,6 +50,15 @@ # define PROFILE(x) #endif +/* If you fear that your malloc() / realloc() may have threading problems, + * uncomment the following. It will cause mutex locks around the calls. */ +// #define MISTRUST_MALLOC_THREADSAFE + +/* If you fear racing conditions in thread_create() then uncomment this. + * BTW it would be really weird if thread_create() weren't thread-safe., so + * defining this is very likely not necessary. */ +// #define MISTRUST_THREAD_CREATE_THREADSAFE + using namespace std; // This we need locally for msg(), debugmsg() @@ -61,10 +71,10 @@ void _debugmsg (Mstr const &s); #define debugmsg(x) if (config.debug()) _debugmsg(x) void reportmsg (Mstr const &s); void warnmsg (Mstr const &s); +void anymsg(Mstr const &s, ostream &o, string const &label); /* Other */ int serversocket (string addr, int port, string description); -string timestamp(time_t s = 0); bool ipmatch (struct in_addr addr, struct in_addr mask); void socketclose (int fd); vector<string> str2parts (string const &s, char sep); diff --git a/xr/sys/timestamp.cc b/xr/sys/timestamp.cc @@ -1,21 +0,0 @@ -#include "sys" - -string timestamp(time_t s) { - struct timeval tv; - - if (! s) - gettimeofday (&tv, 0); - else { - tv.tv_sec = s; - tv.tv_usec = 0; - } - - struct tm *tmp = localtime(&tv.tv_sec); - - char buf[80]; - sprintf (buf, "%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d,%3.3d", - tmp->tm_year + 1900, tmp->tm_mon + 1, tmp->tm_mday, - tmp->tm_hour, tmp->tm_min, tmp->tm_sec, - int(tv.tv_usec / 1000)); - return (buf); -} diff --git a/xr/sys/warnmsg.cc b/xr/sys/warnmsg.cc @@ -2,12 +2,8 @@ #include "config/config" #include "ThreadsAndMutexes/mutex/mutex" #include "mstr/mstr" +#include "timestamp/timestamp" void warnmsg (Mstr const &s) { - Mutex::lock(&cerr); - if (config.prefixtimestamp()) - cerr << timestamp() << ' '; - cerr << pthread_self() << " WARNING: " << s; - cerr.flush(); - Mutex::unlock(&cerr); + anymsg(s, cerr, "WARNING"); } diff --git a/xr/tcpdispatcher/dispatch.cc b/xr/tcpdispatcher/dispatch.cc @@ -3,7 +3,7 @@ void TcpDispatcher::dispatch() { // Check that a working algorithm is available. May be missing if // constructor's "new" failed. - if (!algorithm) + if (!algorithm()) throw static_cast<Error>("No algorithm in Tcpdispatcher::dispatch"); bool connected = false; @@ -11,11 +11,11 @@ void TcpDispatcher::dispatch() { // Build up the target list, if not yet done so. The HTTP dispatcher // might've created it already for host-based matching (in which case // we won't bother here). - if (! target_list.isdefined()) { + if (! targetlist().isdefined()) { msg ("Creating target list for the TCP dispatcher\n"); for (unsigned i = 0; i < balancer.nbackends(); i++) if (balancer.backend(i).available()) { - target_list.add(i); + targetlist().add(i); if (config.verbose()) msg (" Candidate target: " + balancer.backend(i).description() + "\n"); @@ -25,10 +25,10 @@ void TcpDispatcher::dispatch() { // Call the dispatch algorithm until we can connect, // or until the algorithm is out of back ends (throws exception). while (!connected) { - target_backend = algorithm->target(clientip(), target_list); - Backend tb = balancer.backend(target_backend); + targetbackend(algorithm()->target(clientip(), targetlist())); + Backend tb = balancer.backend(targetbackend()); if (!tb.connect()) { - balancer.backend(target_backend).live(false); + balancer.backend(targetbackend()).live(false); if (config.verbose()) msg ("Failed to connect to back end " + tb.description() + ", trying to dispatch to other\n"); diff --git a/xr/tcpdispatcher/execute.cc b/xr/tcpdispatcher/execute.cc @@ -27,7 +27,10 @@ static void run_excess(string const &prog, char const *ip) { "' terminated abnormally!\n"); } -void TcpDispatcher::execute() { +void TcpDispatcher::execute() { + Threadlist::desc("Verifying"); + Threadlist::clientfd(clientfd()); + msg ((Mstr("Dispatch request for client fd ") + clientfd()) + "\n"); // Check 'softmaxconnrate' and 'hardmaxconnrate' now! @@ -43,9 +46,9 @@ void TcpDispatcher::execute() { unsigned max_conns = max(config.hardmaxconnrate(), config.softmaxconnrate()); - Mutex::lock (&accesslog[client_ip.s_addr]); - accesslog[client_ip.s_addr].push(now); - Mutex::unlock (&accesslog[client_ip.s_addr]); + Mutex::lock (&accesslog[clientip().s_addr]); + accesslog[clientip().s_addr].push(now); + Mutex::unlock (&accesslog[clientip().s_addr]); if (accesslog_lastclean < min_ts) { // Clean the entire access log, it's been a while... @@ -74,61 +77,67 @@ void TcpDispatcher::execute() { } else { // The "big log" doesn't need to be fully cleaned, // but this particular IP should be! - Mutex::lock(&accesslog[client_ip.s_addr]); - while ( accesslog[client_ip.s_addr].front() < min_ts - || accesslog[client_ip.s_addr].size() > max_conns ) { - accesslog[client_ip.s_addr].pop(); + Mutex::lock(&accesslog[clientip().s_addr]); + while ( accesslog[clientip().s_addr].front() < min_ts + || accesslog[clientip().s_addr].size() > max_conns ) { + accesslog[clientip().s_addr].pop(); } - Mutex::unlock(&accesslog[client_ip.s_addr]); + Mutex::unlock(&accesslog[clientip().s_addr]); } if (config.hardmaxconnrate() && - accesslog[client_ip.s_addr].size() >= config.hardmaxconnrate() ) { + accesslog[clientip().s_addr].size() >= config.hardmaxconnrate() ) { // This IP has violated the "HARD" limit! Reject the connection ostringstream o; - o << "Client " << inet_ntoa(client_ip) + o << "Client " << inet_ntoa(clientip()) << " has hit the HARD maximum number of connections (" << config.hardmaxconnrate() << " conections in " << config.connrate_time() << " seconds; " - << accesslog[client_ip.s_addr].size() + << accesslog[clientip().s_addr].size() << " connections recorded). Client is refused.\n"; warnmsg (o.str()); socketclose(clientfd()); - run_excess(config.hardmaxconnexcess(), inet_ntoa(client_ip)); + run_excess(config.hardmaxconnexcess(), inet_ntoa(clientip())); return; } else if (config.softmaxconnrate() && - (accesslog[client_ip.s_addr].size() >= + (accesslog[clientip().s_addr].size() >= config.softmaxconnrate())) { // This IP has violated the "SOFT" Limit. Go to sleep for a while. ostringstream o; - o << "Client " << inet_ntoa(client_ip) + o << "Client " << inet_ntoa(clientip()) << " has hit the SOFT maximum number of connections (" << config.softmaxconnrate() << " connections in " << config.connrate_time() << " sedonds; " - << accesslog[client_ip.s_addr].size() + << accesslog[clientip().s_addr].size() << " connections recorded). Client is deferred for " << config.defertime() << " microseconds.\n"; warnmsg (o.str()); - run_excess(config.softmaxconnexcess(), inet_ntoa(client_ip)); + run_excess(config.softmaxconnexcess(), inet_ntoa(clientip())); usleep(config.defertime()); } } try { + Threadlist::desc("Dispatching"); dispatch(); } catch (Error const &e) { Mutex::lock(&cerr); cerr << e.what() << "\n"; Mutex::unlock(&cerr); - socketclose (clientfd()); + socketclose(clientfd()); + socketclose(backendfd()); return; } msg ((Mstr("Dispatching client fd ") + clientfd()) + - (Mstr(" to ") + balancer.backend(target_backend).description()) + + (Mstr(" to ") + balancer.backend(targetbackend()).description()) + (Mstr(", fd ") + backendfd()) + "\n"); - balancer.backend(target_backend).startconnection(); + Threadlist::desc("Serving"); + Threadlist::backend(targetbackend()); + Threadlist::backendfd(backendfd()); + + balancer.backend(targetbackend()).startconnection(); try { handle(); @@ -138,12 +147,12 @@ void TcpDispatcher::execute() { Mutex::unlock(&cerr); } - balancer.backend(target_backend).endconnection(); + balancer.backend(targetbackend()).endconnection(); socketclose (clientfd()); socketclose (backendfd()); msg ((Mstr("Done dispatching to back end fd ") + backendfd()) + - (Mstr(" at ") + balancer.backend(target_backend).description()) + + (Mstr(" at ") + balancer.backend(targetbackend()).description()) + "\n"); } diff --git a/xr/tcpdispatcher/handle.cc b/xr/tcpdispatcher/handle.cc @@ -31,7 +31,7 @@ void TcpDispatcher::handle() { netbuffer.netwrite (othersock, timeout); if (sock == backendfd()) - balancer.backend(target_backend).addbytes(netbuffer.bufsz()); + balancer.backend(targetbackend()).addbytes(netbuffer.bufsz()); netbuffer.reset(); } diff --git a/xr/tcpdispatcher/tcpdispatcher b/xr/tcpdispatcher/tcpdispatcher @@ -1,53 +1,21 @@ #ifndef _TCPDISPATCHER_ #define _TCPDISPATCHER_ -#include "sys/sys" -#include "balancer/balancer" -#include "config/config" -#include "ThreadsAndMutexes/thread/thread" -#include "backendvector/backendvector" +#include "dispatcher/dispatcher" #include "netbuffer/netbuffer" -// Dispatching algorithm workers -#include "DispatchAlgorithms/algorithm/algorithm" -#include "DispatchAlgorithms/roundrobin/roundrobin" -#include "DispatchAlgorithms/firstactive/firstactive" -#include "DispatchAlgorithms/leastconn/leastconn" -#include "DispatchAlgorithms/external/external" -#include "DispatchAlgorithms/hashedip/hashedip" -#include "DispatchAlgorithms/storedip/storedip" -#include "DispatchAlgorithms/weightedload/weightedload" - -class TcpDispatcher: public Thread { +class TcpDispatcher: public Dispatcher { public: TcpDispatcher (int fd, struct in_addr ip); - virtual ~TcpDispatcher(); virtual void execute(); - virtual void dispatch(); - void dispatch (string host); virtual void handle(); - int targetbackend() const { return target_backend; } - void targetbackend(int t) { target_backend = t; } - struct in_addr clientip() const { return client_ip; } - int clientfd() const { return client_fd; } - void clientfd(int c) { client_fd = c; } - int backendfd() const { return backend_fd; } - void backendfd(int b) { backend_fd = b; } - - BackendVector const &targetlist() const { return target_list; } - void targetlist (BackendVector t) { target_list = t; } - unsigned readchunk (int src); private: - struct in_addr client_ip; - int target_backend, client_fd, backend_fd; - Algorithm *algorithm; - BackendVector target_list; Netbuffer netbuffer; }; diff --git a/xr/tcpdispatcher/tcpdispatcher1.cc b/xr/tcpdispatcher/tcpdispatcher1.cc @@ -1,37 +1,5 @@ #include "tcpdispatcher" TcpDispatcher::TcpDispatcher(int cfd, struct in_addr cip): - Thread(), client_ip(cip), target_backend(-1), client_fd(cfd), - backend_fd(-1), target_list(), netbuffer() { - - // Instantiate dispatchmode algorithm - switch (config.dispatchmode()) { - case Dispatchmode::m_roundrobin: - algorithm = new Roundrobin; - break; - case Dispatchmode::m_firstactive: - algorithm = new Firstactive; - break; - case Dispatchmode::m_external: - algorithm = new External; - break; - case Dispatchmode::m_strict_hashed_ip: - case Dispatchmode::m_lax_hashed_ip: - algorithm = new HashedIp; - break; - case Dispatchmode::m_strict_stored_ip: - case Dispatchmode::m_lax_stored_ip: - algorithm = new StoredIp; - break; - case Dispatchmode::m_weighted_load: - algorithm = new Weightedload; - break; - case Dispatchmode::m_leastconn: - default: - algorithm = new Leastconn; - break; - } - - // NOTE: Memory errors for algorithm pointer are not handled here, - // but in dispatch() (don't want to throw up in the constructor) + Dispatcher(cfd, cip), netbuffer() { } diff --git a/xr/tcpdispatcher/tcpdispatcher2.cc b/xr/tcpdispatcher/tcpdispatcher2.cc @@ -1,6 +0,0 @@ -#include "tcpdispatcher" - -TcpDispatcher::~TcpDispatcher() { - delete algorithm; - debugmsg ("TCP dispatcher finished\n"); -} diff --git a/xr/timestamp/desc.cc b/xr/timestamp/desc.cc @@ -0,0 +1,11 @@ +#include "timestamp" + +string Timestamp::desc() const { + struct tm *tmp = localtime(&tv.tv_sec); + char buf[80]; + sprintf (buf, "%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d,%3.3d", + tmp->tm_year + 1900, tmp->tm_mon + 1, tmp->tm_mday, + tmp->tm_hour, tmp->tm_min, tmp->tm_sec, + int(tv.tv_usec / 1000)); + return buf; +} diff --git a/xr/timestamp/elapsed.cc b/xr/timestamp/elapsed.cc @@ -0,0 +1,10 @@ +#include "timestamp" + +double Timestamp::elapsed() const { + struct timeval end; + gettimeofday(&end, 0); + double usec = + ( ((double)end.tv_sec * 1000000 + end.tv_usec) - + ((double)tv.tv_sec * 1000000 + tv.tv_usec) ); + return usec / 1000000; +} diff --git a/xr/timestamp/timestamp b/xr/timestamp/timestamp @@ -0,0 +1,17 @@ +#ifndef _TIMESTAMP_ +#define _TIMESTAMP_ + +#include "sys/sys" + +class Timestamp { +public: + Timestamp(); + Timestamp(int sec_since_epoch); + double elapsed() const; + string desc() const; + +private: + struct timeval tv; +}; + +#endif diff --git a/xr/timestamp/timestamp1.cc b/xr/timestamp/timestamp1.cc @@ -0,0 +1,5 @@ +#include "timestamp" + +Timestamp::Timestamp() { + gettimeofday(&tv, 0); +} diff --git a/xr/timestamp/timestamp2.cc b/xr/timestamp/timestamp2.cc @@ -0,0 +1,6 @@ +#include "timestamp" + +Timestamp::Timestamp(int sec_since_epoch) { + tv.tv_sec = sec_since_epoch; + tv.tv_usec = 0; +} diff --git a/xr/webinterface/answer.cc b/xr/webinterface/answer.cc @@ -1,5 +1,16 @@ #include "webinterface" +static void stop_backend_thread(pthread_t id) { + Threadinfo info = Threadlist::info(id); + msg((Mstr("Stopping thread ") + id) + + (Mstr(" (backend socket ") + info.backendfd()) + + (Mstr(", client socket ") + info.clientfd()) + + ")\n"); + socketclose(info.backendfd()); + socketclose(info.clientfd()); + Threadlist::deregister(id); +} + static unsigned str2uns (string const &s, string const &desc) { unsigned ret; @@ -16,6 +27,21 @@ static double str2dbl (string const &s, string const &desc) { return (ret); } +static pthread_t str2threadid (string const &s, string const &desc) { + pthread_t ret; + long val; + int convret; + + if (s[0] == '0' && (s[1] == 'x' || s[1] == 'X')) + convret = sscanf(s.c_str() + 2, "%lx", &val); + else + convret = sscanf(s.c_str(), "%ld", &val); + if (convret < 1) + throw static_cast<Error>("Bad ") + desc; + memcpy (&ret, &val, sizeof(ret)); + return (ret); +} + static unsigned backendindex (string const &s) { unsigned ret; @@ -456,5 +482,36 @@ void Webinterface::answer(Httpbuffer req) { return; } + // /backend/NR/stopconnections + if (parts.size() == 3 && + parts[0] == "backend" && parts[2] == "stopconnections") { + unsigned ind = backendindex(parts[1]); + bool done = false; + while (!done) { + done = true; + for (Threadmap::iterator it = Threadlist::map().begin(); + it != Threadlist::map().end(); + it++) { + pthread_t thread_id = (*it).first; + Threadinfo thread_info = (*it).second; + if (thread_info.backend() == (int)ind) { + stop_backend_thread(thread_id); + done = false; + break; + } + } + } + answer_status(); + return; + } + + // /thread/kill/VALUE + if (parts.size() == 3 && parts[0] == "thread" && parts[1] == "kill") { + pthread_t id = str2threadid(parts[2], "thread id"); + stop_backend_thread(id); + answer_status(); + return; + } + throw static_cast<Error>("No action for URI '/") + uri + "'"; } diff --git a/xr/webinterface/answerstatus.cc b/xr/webinterface/answerstatus.cc @@ -103,6 +103,28 @@ void Webinterface::answer_status() { " <backendcheck>" << balancer.backend(i).backendcheck().setting() << "</backendcheck>\n" " </backend>\n" ; + + o << " <activity>\n"; + for (Threadmap::iterator it = Threadlist::map().begin(); + it != Threadlist::map().end(); + it++) { + pthread_t thread_id = (*it).first; + Threadinfo thread_info = (*it).second; + o << + " <thread>\n" + " <id>" << thread_id << "</id>\n" + " <description>" << thread_info.desc() << "</description>\n" + " <backend>" << thread_info.backend() << "</backend>\n" + " <address>"; + if (thread_info.backend() >= 0) + o << balancer.backend(thread_info.backend()).description(); + o << + "</address>\n" + " <duration>" << thread_info.timestamp().elapsed() << "</duration>\n" + " </thread>\n"; + } + o << " </activity>\n"; + o << "</status>\n\n"; diff --git a/xr/webinterface/execute.cc b/xr/webinterface/execute.cc @@ -3,6 +3,8 @@ void Webinterface::execute() { int sfd; + Threadlist::desc("Web interface"); + // Create the server socket, or retry infinitely. while (true) { try { diff --git a/xr/webinterface/webinterface b/xr/webinterface/webinterface @@ -3,6 +3,7 @@ #include "sys/sys" #include "ThreadsAndMutexes/thread/thread" +#include "ThreadsAndMutexes/threadlist/threadlist" #include "fdset/fdset" #include "httpbuffer/httpbuffer" #include "balancer/balancer"