commit c726b6b6ec94874d005b2b0ee4fdc045a6dde3d6 parent 0b4c13b7b3b4bc8c081de630921a213b45b407ae Author: finwo <finwo@pm.me> Date: Sat, 3 Jan 2026 19:37:22 +0100 2.46 Diffstat:
60 files changed, 830 insertions(+), 536 deletions(-)
diff --git a/ChangeLog b/ChangeLog @@ -1,3 +1,9 @@ +2.46 [KK 2009-02-18] +- Dispatcher-related classes moved under directory xr/Dispatchers/. +- UDP balancer implementation started (code stubs in place). +- xrctl bugfixed: a backend-scope maxconnections statement would + confuse the global server block. Thanks Rajeev for reporting. + 2.45 [KK 2009-02-18] - Usage information formatted. - Rubberstamped "stable"! diff --git a/Makefile b/Makefile @@ -1,7 +1,7 @@ # Top-level Makefile for XR # ------------------------- -VER = 2.45 +VER = 2.46 PREFIX = $(DESTDIR)/usr BINDIR = $(PREFIX)/sbin MANDIR = $(PREFIX)/share/man @@ -16,6 +16,7 @@ foo: @echo 'Choose:' @echo ' make local - local program construction' @echo ' make localprof - local, with profiling info' + @echo ' make localmem - local, with memory debugging' @echo ' make install - installation to $(BINDIR)' @echo ' make uninstall - removes installed programs' @echo ' make clean - removal after local/install' @@ -28,12 +29,15 @@ local: xr/etc/gettools /usr/local/bin xr/etc c-conf e-ver xr/etc/e-ver ChangeLog $(VER) BASE=$(BASE) AUTHOR='$(AUTHOR)' MAINTAINER='$(MAINTAINER)' \ - DISTSITE='$(DISTSITE)' \ + DISTSITE='$(DISTSITE)' MEMDEBUG=$(MEMDEBUG)\ VER='$(VER)' PROF=$(PROF) PROFILER=$(PROFILER) $(MAKE) -C xr localprof: PROF=-pg PROFILER=-DPROFILER make local +localmem: + MEMDEBUG=-DMEMDEBUG make local + install: local $(BINDIR)/xrctl install-manpages mkdir -p $(BINDIR) BASE=$(BASE) AUTHOR='$(AUTHOR)' MAINTAINER='$(MAINTAINER)' \ diff --git a/xr/Dispatchers/dispatcher/checkacl.cc b/xr/Dispatchers/dispatcher/checkacl.cc @@ -0,0 +1,26 @@ +#include "dispatcher" + +bool Dispatcher::check_acl() { + if (config.nallow()) { + bool allowed = false; + for (unsigned n = 0; n < config.nallow(); n++) + if (ipmatch(clientip(), config.allow(n))) { + allowed = true; + break; + } + if (!allowed) { + msg("Not serving client IP " + clientipstr() + + ": no match in allow list\n"); + return false; + } + } + if (config.ndeny()) { + for (unsigned n = 0; n < config.ndeny(); n++) + if (ipmatch(clientip(), config.deny(n))) { + msg("Not serving client IP " + clientipstr() + + ": match in deny list\n"); + return false; + } + } + return true; +} diff --git a/xr/Dispatchers/dispatcher/checkdos.cc b/xr/Dispatchers/dispatcher/checkdos.cc @@ -0,0 +1,119 @@ +#include "dispatcher" + +typedef map < unsigned long, std::queue<time_t> > AccessMap; +static AccessMap accesslog; +static time_t accesslog_lastclean = 0; + +// Execute an external program upon excess of hard/soft rates +static void run_excess(string const &prog, char const *ip) { + ostringstream o; + o << prog << ' ' << ip; + msg ((Mstr("Max connection rate exceeded, invoking '") + o.str()) + + "'\n"); + int ret = system(o.str().c_str()); + if (ret == -1) + throw Error(string("Failed to start system call: ") + + strerror(errno)); + else if (WIFEXITED(ret)) { + int exitstat = WEXITSTATUS(ret); + if (exitstat) + warnmsg((Mstr("Program '") + o.str()) + + (Mstr("' exited with exit status ") + exitstat) + + "\n"); + else + msg ("Program terminated normally.\n"); + } else + warnmsg((Mstr("Program '") + o.str()) + + "' terminated abnormally!\n"); +} + +bool Dispatcher::check_dos() { + msg ("Verifying DOS protection\n"); + Threadlist::desc("Verifying"); + + // Check 'softmaxconnrate' and 'hardmaxconnrate' now! + // Descend into this block if connrate_time() is set, AND + // either hardmaxconnrate() is set, + // or both softmaxconnrate() and defertime() are set. + if (config.connrate_time() && + (config.hardmaxconnrate() || + (config.softmaxconnrate() && config.defertime()))) { + time_t now, min_ts; + now = time(0); + min_ts = now - config.connrate_time(); + unsigned max_conns = max(config.hardmaxconnrate(), + config.softmaxconnrate()); + + Mutex::lock (&accesslog[clientip().s_addr]); + accesslog[clientip().s_addr].push(now); + Mutex::unlock (&accesslog[clientip().s_addr]); + + if (accesslog_lastclean < min_ts) { + // Clean the entire access log, it's been a while... + + Mutex::lock(&accesslog_lastclean); + accesslog_lastclean = now; + Mutex::unlock(&accesslog_lastclean); + + for (AccessMap::iterator i = accesslog.begin(); + i != accesslog.end(); + i++ ) { + if (accesslog[i->first].back() < min_ts) { + // This IP hasn't made ANY connections in a while -- erase! + accesslog.erase(i); + } else { + // Keep popping off this IP's oldest connection until we + // have only "recent" connections left. + Mutex::lock(&accesslog[i->first]); + while ( accesslog[i->first].front() < min_ts + || accesslog[i->first].size() > max_conns ) { + accesslog[i->first].pop(); + } + Mutex::unlock(&accesslog[i->first]); + } + } + } else { + // The "big log" doesn't need to be fully cleaned, + // but this particular IP should be! + Mutex::lock(&accesslog[clientip().s_addr]); + while ( accesslog[clientip().s_addr].front() < min_ts + || accesslog[clientip().s_addr].size() > max_conns ) { + accesslog[clientip().s_addr].pop(); + } + Mutex::unlock(&accesslog[clientip().s_addr]); + } + + if (config.hardmaxconnrate() && + accesslog[clientip().s_addr].size() >= config.hardmaxconnrate() ) { + // This IP has violated the "HARD" limit! Reject the connection + ostringstream o; + o << "Client " << clientipstr() + << " has hit the HARD maximum number of connections (" + << config.hardmaxconnrate() << " conections in " + << config.connrate_time() << " seconds; " + << accesslog[clientip().s_addr].size() + << " connections recorded). Client is refused.\n"; + warnmsg (o.str()); + socketclose(clientfd()); + run_excess(config.hardmaxconnexcess(), clientipstr().c_str()); + return false; + } else if (config.softmaxconnrate() && + (accesslog[clientip().s_addr].size() >= + config.softmaxconnrate())) { + // This IP has violated the "SOFT" Limit. Go to sleep for a while. + ostringstream o; + o << "Client " << clientipstr() + << " has hit the SOFT maximum number of connections (" + << config.softmaxconnrate() << " connections in " + << config.connrate_time() << " sedonds; " + << accesslog[clientip().s_addr].size() + << " connections recorded). Client is deferred for " + << config.defertime() << " microseconds.\n"; + warnmsg (o.str()); + run_excess(config.softmaxconnexcess(), clientipstr().c_str()); + usleep(config.defertime()); + } + } + + return true; +} diff --git a/xr/Dispatchers/dispatcher/clientipstr.cc b/xr/Dispatchers/dispatcher/clientipstr.cc @@ -0,0 +1,10 @@ +#include "dispatcher" + +string const &Dispatcher::clientipstr() { + if (clientip_str == "") { + Mutex::lock( (void*)inet_ntoa ); + clientip_str = inet_ntoa(clientip()); + Mutex::unlock( (void*)inet_ntoa ); + } + return clientip_str; +} diff --git a/xr/Dispatchers/dispatcher/dispatcher b/xr/Dispatchers/dispatcher/dispatcher @@ -0,0 +1,67 @@ +#ifndef _DISPATCHER_ +#define _DISPATCHER_ + +#include "sys/sys" +#include "memory/memory" + +#include "balancer/balancer" +#include "config/config" +#include "ThreadsAndMutexes/thread/thread" +#include "ThreadsAndMutexes/threadlist/threadlist" +#include "backendvector/backendvector" +#include "netbuffer/netbuffer" + +// Dispatching algorithm workers +#include "DispatchAlgorithms/algorithm/algorithm" +#include "DispatchAlgorithms/roundrobin/roundrobin" +#include "DispatchAlgorithms/firstactive/firstactive" +#include "DispatchAlgorithms/leastconn/leastconn" +#include "DispatchAlgorithms/external/external" +#include "DispatchAlgorithms/hashedip/hashedip" +#include "DispatchAlgorithms/storedip/storedip" +#include "DispatchAlgorithms/weightedload/weightedload" + +#ifdef MEMDEBUG +class Dispatcher: public Thread, public Memory +#else +class Dispatcher: public Thread +#endif +{ +public: + + Dispatcher(int fd, struct in_addr ip); + Dispatcher(int fd); + virtual ~Dispatcher(); + + virtual void execute() = 0; + virtual void dispatch() = 0; + virtual void handle() = 0; + + bool check_dos(); + bool check_acl(); + + int targetbackend() const { return target_backend; } + void targetbackend(int t) { target_backend = t; } + struct in_addr clientip() const { return client_ip; } + void clientip(struct in_addr i) { client_ip = i; + clientip_str = ""; } + string const &clientipstr(); + int clientfd() const { return client_fd; } + void clientfd(int c) { client_fd = c; } + int backendfd() const { return backend_fd; } + void backendfd(int b) { backend_fd = b; } + Algorithm *algorithm() const { return algo; } + + BackendVector &targetlist() { return target_list; } + void targetlist (BackendVector t) { target_list = t; } + +private: + void start_dispatcher(); + struct in_addr client_ip; + int target_backend, client_fd, backend_fd; + Algorithm *algo; + BackendVector target_list; + string clientip_str; +}; + +#endif diff --git a/xr/Dispatchers/dispatcher/dispatcher1.cc b/xr/Dispatchers/dispatcher/dispatcher1.cc @@ -0,0 +1,8 @@ +#include "dispatcher" + +Dispatcher::Dispatcher(int cfd, struct in_addr cip): + Thread(), client_ip(cip), target_backend(-1), client_fd(cfd), + backend_fd(-1), target_list(), clientip_str() { + + start_dispatcher(); +} diff --git a/xr/dispatcher/dispatcher2.cc b/xr/Dispatchers/dispatcher/dispatcher2.cc diff --git a/xr/Dispatchers/dispatcher/dispatcher3.cc b/xr/Dispatchers/dispatcher/dispatcher3.cc @@ -0,0 +1,8 @@ +#include "dispatcher" + +Dispatcher::Dispatcher(int fd): + Thread(), target_backend(-1), client_fd(fd), + backend_fd(-1), target_list(), clientip_str() { + + start_dispatcher(); +} diff --git a/xr/Dispatchers/dispatcher/startdispatcher.cc b/xr/Dispatchers/dispatcher/startdispatcher.cc @@ -0,0 +1,37 @@ +#include "dispatcher" + +void Dispatcher::start_dispatcher() { + // Instantiate dispatchmode algorithm + switch (config.dispatchmode()) { + case Dispatchmode::m_roundrobin: + algo = new Roundrobin; + break; + case Dispatchmode::m_firstactive: + algo = new Firstactive; + break; + case Dispatchmode::m_external: + algo = new External; + break; + case Dispatchmode::m_strict_hashed_ip: + case Dispatchmode::m_lax_hashed_ip: + algo = new HashedIp; + break; + case Dispatchmode::m_strict_stored_ip: + case Dispatchmode::m_lax_stored_ip: + algo = new StoredIp; + break; + case Dispatchmode::m_weighted_load: + algo = new Weightedload; + break; + case Dispatchmode::m_leastconn: + default: + algo = new Leastconn; + break; + } + + // NOTE: Memory errors for algorithm pointer are not handled here, + // but in dispatch() (don't want to throw up in the constructor) + + debugmsg("Dispatcher instantiated.\n"); +} + diff --git a/xr/httpdispatcher/dispatch.cc b/xr/Dispatchers/httpdispatcher/dispatch.cc diff --git a/xr/Dispatchers/httpdispatcher/handle.cc b/xr/Dispatchers/httpdispatcher/handle.cc @@ -0,0 +1,99 @@ +#include "httpdispatcher" + +void HttpDispatcher::handle() { + PROFILE("HttpDispatcher::handle"); + + // The client request was already retrieved before starting the + // dispatcher. We can continue by applying server-directed headers. + if (config.addxrversion()) + buf.setheader ("XR", VER); + if (config.addxforwardedfor()) + buf.addheader ("X-Forwarded-For", clientipstr()); + for (unsigned n = 0; n < config.nserverheaders(); n++) + buf.setheader (config.serverheader(n)); + + // Patch up the Host: header if requested so. + if (config.replacehostheader()) + buf.replaceheader("Host:", + balancer.backend(targetbackend()).server()); + + // Flush client info received so far to the back end. + debugmsg("Sending client request to back end\n"); + buf.netwrite(backendfd(), config.backend_timeout()); + + // Let's see if we will need to modify the server headers. + bool modify_serverheaders = false; + if (config.addxrversion() || + (config.stickyhttp() && !issticky())) + modify_serverheaders = true; + + // Store the client request. May want to log it later. + string client_request = buf.firstline(); + + // Go into copy-thru mode. If required, catch the server headers on + // their first appearance and modify them. + bool backend_response_checked = false; + while (1) { + Fdset readset (config.client_timeout()); + readset.add(clientfd()); + readset.add(backendfd()); + + int sock; + if ((sock = readset.readable()) < 0) + break; + + buf.reset(); + + if (!buf.netread(sock)) + break; + + if (sock == backendfd() && modify_serverheaders) { + debugmsg("Back end response seen, applying modifications\n"); + modify_serverheaders = false; + while (! buf.headersreceived()) + if (!buf.netread (sock, config.backend_timeout())) + throw Error("Failed to get headers from back end"); + if (config.addxrversion()) + buf.setheader("XR", VER); + if (config.stickyhttp() && !issticky()) { + ostringstream o; + o << "XRTarget=" << targetbackend(); + buf.setheader("Set-Cookie", o.str()); + } + } + + // The back end response may now get flushed to the client. + // If the response code is 4** or 5**, log it as a warning. + if (!backend_response_checked && + sock == backendfd() && buf.headersreceived()) { + string respcode = buf.stringat(9, 3); + if (respcode[0] == '4' || respcode[0] == '5') + warnmsg("HTTP back end indicates fault: '" + + buf.firstline() + "' as response to '" + + client_request + "'\n"); + backend_response_checked = true; + } + + // Flush info to the other connected side. + int othersock, timeout; + if (sock == clientfd()) { + othersock = backendfd(); + timeout = config.backend_timeout(); + // Re-patch Host header if requested + if (config.replacehostheader()) + buf.replaceheader("Host:", + balancer.backend(targetbackend()).server()); + } else { + othersock = clientfd(); + timeout = config.client_timeout(); + } + + debugmsg (Mstr("Had data on ") + sock + + (Mstr(", sending to ") + othersock) + "\n"); + + buf.netwrite(othersock, timeout); + if (sock == backendfd()) + balancer.backend(targetbackend()).addbytes(buf.bufsz()); + } + +} diff --git a/xr/Dispatchers/httpdispatcher/httpdispatcher b/xr/Dispatchers/httpdispatcher/httpdispatcher @@ -0,0 +1,24 @@ +#ifndef _HTTPDISPATCHER_ +#define _HTTPDISPATCHER_ + +#include "sys/sys" +#include "Dispatchers/tcpdispatcher/tcpdispatcher" +#include "httpbuffer/httpbuffer" + +class HttpDispatcher: public TcpDispatcher { +public: + HttpDispatcher (int fd, struct in_addr ip); + + void dispatch(); + void handle(); + bool issticky() const { return (is_sticky); } + void issticky (bool s) { is_sticky = s; } + +private: + void senderrorpage(string const &desc); + + Httpbuffer buf; + bool is_sticky; +}; + +#endif diff --git a/xr/httpdispatcher/httpdispatcher1.cc b/xr/Dispatchers/httpdispatcher/httpdispatcher1.cc diff --git a/xr/httpdispatcher/senderrorpage.cc b/xr/Dispatchers/httpdispatcher/senderrorpage.cc diff --git a/xr/tcpdispatcher/dispatch.cc b/xr/Dispatchers/tcpdispatcher/dispatch.cc diff --git a/xr/Dispatchers/tcpdispatcher/execute.cc b/xr/Dispatchers/tcpdispatcher/execute.cc @@ -0,0 +1,50 @@ +#include "tcpdispatcher" + +void TcpDispatcher::execute() { + Threadlist::clientfd(clientfd()); + + if (!check_dos() || + !check_acl()) + return; + + msg ((Mstr("Dispatch request for client fd ") + clientfd()) + "\n"); + + try { + Threadlist::desc("Dispatching"); + dispatch(); + } catch (Error const &e) { + Mutex::lock(&cerr); + cerr << e.what() << "\n"; + Mutex::unlock(&cerr); + socketclose(clientfd()); + socketclose(backendfd()); + return; + } + + msg ((Mstr("Dispatching client fd ") + clientfd()) + + (Mstr(" to ") + balancer.backend(targetbackend()).description()) + + (Mstr(", fd ") + backendfd()) + "\n"); + + Threadlist::desc("Serving"); + Threadlist::backend(targetbackend()); + Threadlist::backendfd(backendfd()); + + balancer.backend(targetbackend()).startconnection(); + + try { + handle(); + } catch (Error const &e) { + Mutex::lock(&cerr); + cerr << e.what() << "\n"; + Mutex::unlock(&cerr); + } + + balancer.backend(targetbackend()).endconnection(); + + socketclose (clientfd()); + socketclose (backendfd()); + + msg ((Mstr("Done dispatching to back end fd ") + backendfd()) + + (Mstr(" at ") + balancer.backend(targetbackend()).description()) + + "\n"); +} diff --git a/xr/tcpdispatcher/handle.cc b/xr/Dispatchers/tcpdispatcher/handle.cc diff --git a/xr/Dispatchers/tcpdispatcher/tcpdispatcher b/xr/Dispatchers/tcpdispatcher/tcpdispatcher @@ -0,0 +1,22 @@ +#ifndef _TCPDISPATCHER_ +#define _TCPDISPATCHER_ + +#include "Dispatchers/dispatcher/dispatcher" +#include "netbuffer/netbuffer" + +class TcpDispatcher: public Dispatcher { +public: + + TcpDispatcher (int fd, struct in_addr ip); + + virtual void execute(); + virtual void dispatch(); + virtual void handle(); + + unsigned readchunk (int src); + +private: + Netbuffer netbuffer; +}; + +#endif diff --git a/xr/tcpdispatcher/tcpdispatcher1.cc b/xr/Dispatchers/tcpdispatcher/tcpdispatcher1.cc diff --git a/xr/Dispatchers/udpdispatcher/dispatch.cc b/xr/Dispatchers/udpdispatcher/dispatch.cc @@ -0,0 +1,5 @@ +#include "udpdispatcher" + +void UdpDispatcher::dispatch() { + throw Error("UDP dispatcher: dispatch not yet implemented"); +} diff --git a/xr/Dispatchers/udpdispatcher/execute.cc b/xr/Dispatchers/udpdispatcher/execute.cc @@ -0,0 +1,5 @@ +#include "udpdispatcher" + +void UdpDispatcher::execute() { + throw Error("UDP dispatcher: execute not yet implemented"); +} diff --git a/xr/Dispatchers/udpdispatcher/handle.cc b/xr/Dispatchers/udpdispatcher/handle.cc @@ -0,0 +1,5 @@ +#include "udpdispatcher" + +void UdpDispatcher::handle() { + throw Error("UDP dispatcher: handle not yet implemented"); +} diff --git a/xr/Dispatchers/udpdispatcher/udpdispatcher b/xr/Dispatchers/udpdispatcher/udpdispatcher @@ -0,0 +1,16 @@ +#ifndef _UDPDISPATCHER_ +#define _UDPDISPATCHER_ + +#include "Dispatchers/dispatcher/dispatcher" + +class UdpDispatcher: public Dispatcher { +public: + UdpDispatcher(int fd); + virtual void execute(); + virtual void dispatch(); + virtual void handle(); +private: + Netbuffer netbuffer; +}; + +#endif diff --git a/xr/Dispatchers/udpdispatcher/udpdispatcher1.cc b/xr/Dispatchers/udpdispatcher/udpdispatcher1.cc @@ -0,0 +1,4 @@ +#include "udpdispatcher" + +UdpDispatcher::UdpDispatcher(int fd) : Dispatcher(fd), netbuffer() { +} diff --git a/xr/Makefile b/xr/Makefile @@ -35,7 +35,7 @@ subdirs: $(BUILDDIR)/usage.h $(BUILDDIR)/status.xslt.h echo "Making: $$f"; \ BASE=$(BASE) CC=$(CONF_CC) BUILDDIR=$(BUILDDIR) VER='$(VER)' \ AUTHOR='$(AUTHOR)' MAINTAINER='$(MAINTAINER)' \ - DISTSITE='$(DISTSITE)' \ + DISTSITE='$(DISTSITE)' MEMDEBUG=$(MEMDEBUG)\ CONF_CC='$(CONF_CC)' CONF_LIB='$(CONF_LIB)' \ CONF_GETOPT=$(CONF_GETOPT) CONF_GETOPT_LONG=$(CONF_GETOPT_LONG) \ CONF_INET_ATON=$(CONF_INET_ATON) CONF_OPTFLAGS='$(CONF_OPTFLAGS)' \ diff --git a/xr/ThreadsAndMutexes/thread/thread b/xr/ThreadsAndMutexes/thread/thread @@ -2,6 +2,7 @@ #define _THREAD_ #include "sys/sys" +#include "memory/memory" #include "error/error" #include "config/config" #include "timestamp/timestamp" @@ -10,7 +11,12 @@ using namespace std; -class Thread: public Mutex { +#ifdef MEMDEBUG +class Thread: public Mutex, public Memory +#else +class Thread: public Mutex +#endif +{ public: virtual ~Thread(); void start(); diff --git a/xr/balancer/balancer b/xr/balancer/balancer @@ -2,6 +2,7 @@ #define _BALANCER_ #include "sys/sys" +#include "memory/memory" #include "backend/backend" #include "backenddef/backenddef" diff --git a/xr/balancer/init.cc b/xr/balancer/init.cc @@ -2,14 +2,19 @@ void Balancer::init() { // Set the listening socket. - if (config.sport()) - server_fd = serversocket (config.sipaddr(), config.sport(), - "balancer"); - else - server_fd = 0; - + if (config.stype() != Servertype::t_udp) { + if (config.sport()) + server_fd = serversocket(config.sipaddr(), config.sport(), + "balancer", Servertype::t_tcp); + else + server_fd = 0; + } else { + server_fd = serversocket(config.sipaddr(), config.sport(), + "balancer", Servertype::t_udp); + } + // Start the web interface if requested. - if (config.usewebinterface()) { + if (config.usewebinterface() && !config.foregroundmode()) { Webinterface *w = new Webinterface(); if (! w) throw Error("Memory fault in Balancer::init"); diff --git a/xr/balancer/serve.cc b/xr/balancer/serve.cc @@ -1,9 +1,10 @@ #include "balancer" -#include "../tcpdispatcher/tcpdispatcher" -#include "../httpdispatcher/httpdispatcher" +#include "Dispatchers/tcpdispatcher/tcpdispatcher" +#include "Dispatchers/httpdispatcher/httpdispatcher" +#include "Dispatchers/udpdispatcher/udpdispatcher" void Balancer::serve() { - int clsock; + int clsock = -1; // Start up wakeup/checkup handlers. These are always started - even // when config.wakeupsec() and config.checkupsec() are not defined @@ -35,7 +36,10 @@ void Balancer::serve() { // Wait for activity, serve it. msg ((Mstr("Awaiting activity on fd ") + server_fd) + "\n"); + MEM(Memory::mem_mark("Balancer start")); + MEM(Memory::mem_follow(true)); while (true) { + MEM(Memory::mem_display()); Fdset fdset(0); fdset.add (server_fd); if (fdset.readable() < 0) { @@ -70,88 +74,55 @@ void Balancer::serve() { } } - // Got activity! + // Got activity! Check total # of connections. + msg ((Mstr("Got activity on fd ") + server_fd) + "\n"); request_nr++; + if (config.maxconn() && connections() >= config.maxconn()) { + msg ((Mstr("Not serving connection: already ") + connections()) + + (Mstr(" connection(s) (max ") + config.maxconn()) + ")\n"); + continue; + } if (server_fd) { - // If tcp-serving: server_fd > 0; serve and loop again - int size; + // In daemon mode (server_fd > 0): serve and loop again struct sockaddr_in clname; - - size = sizeof(clname); - if ( (clsock = accept (server_fd, (struct sockaddr *) &clname, - (socklen_t*) &size)) < 0 ) { - warnmsg(Mstr("Failed to accept network connection: ") + - Mstr(strerror(errno)) + "\n"); - continue; - } - - string clientip = inet_ntoa(clname.sin_addr); - - // If there is an allow list, the client must match it. - if (config.nallow()) { - if (config.debug()) - debugmsg ("Matching " + clientip + " against allow list\n"); - bool allowed = false; - for (unsigned n = 0; n < config.nallow(); n++) { - if (ipmatch (clname.sin_addr, config.allow(n))) { - allowed = true; - break; - } - } - if (!allowed) { - msg ("Not serving client " + clientip + - ": no match in allow list\n"); - socketclose (clsock); + int size = sizeof(clname); + + // Accept the client if this is a TCP connection. + if (config.stype() != Servertype::t_udp) { + if ( (clsock = accept (server_fd, (struct sockaddr *) &clname, + (socklen_t*) &size)) < 0 ) { + warnmsg(Mstr("Failed to accept network connection: ") + + Mstr(strerror(errno)) + "\n"); continue; } - } - // If the client is in the deny list, deny it. - if (config.debug()) - debugmsg ("Matching " + clientip + " against deny list\n"); - bool denied = false; - for (unsigned n = 0; n < config.ndeny(); n++) - if (ipmatch (clname.sin_addr, config.deny(n))) { - denied = true; - break; - } - if (denied) { - msg ("Not serving client " + clientip + - ": match in deny list\n"); - socketclose (clsock); - continue; + string clientip = inet_ntoa(clname.sin_addr); + msg ((Mstr("Accepted connection from ") + clientip) + + (Mstr(" as client fd ") + clsock) + "\n"); } - // Show whom we've accepted + // Show how we look if (config.verbose()) { - msg ((Mstr("Accepted connection from ") + clientip) + - (Mstr(" as client fd ") + clsock) + "\n"); msg ((Mstr("Balancer is serving ") + connections()) + " clients\n"); msg ("Current back end states:\n"); for (unsigned i = 0; i < nbackends(); i++) msg((Mstr(" Back end ") + backend(i).description()) + (Mstr(": ") + backend(i).connections()) + - (Mstr(" connections , max ") + backend(i).maxconn()) + + (Mstr(" connections, max ") + backend(i).maxconn()) + (Mstr(", status ") + backend(i).availablestr()) + "\n"); } - // We got action! Check if the total connections to the - // balancer doesn't exceed the max. - if (config.maxconn() && connections() >= config.maxconn()) { - msg ((Mstr("Not serving client: already ") + connections()) + - (Mstr(" (max ") + config.maxconn()) + "\n"); - socketclose (clsock); - continue; - } - - TcpDispatcher *d; + Dispatcher *d; switch (config.stype()) { case Servertype::t_tcp: - d = new TcpDispatcher (clsock, clname.sin_addr); + d = new TcpDispatcher(clsock, clname.sin_addr); break; case Servertype::t_http: - d = new HttpDispatcher (clsock, clname.sin_addr); + d = new HttpDispatcher(clsock, clname.sin_addr); + break; + case Servertype::t_udp: + d = new UdpDispatcher(server_fd); break; default: throw Error("Internal error, can't choose dispatcher"); @@ -182,6 +153,9 @@ void Balancer::serve() { case Servertype::t_http: d = new HttpDispatcher (server_fd, dummy); break; + case Servertype::t_udp: + throw Error("UDP dispatching not allowed in inetd-mode"); + break; default: throw Error("Internal error, can't choose dispatcher"); break; diff --git a/xr/dispatcher/dispatcher b/xr/dispatcher/dispatcher @@ -1,51 +0,0 @@ -#ifndef _DISPATCHER_ -#define _DISPATCHER_ - -#include "sys/sys" -#include "balancer/balancer" -#include "config/config" -#include "ThreadsAndMutexes/thread/thread" -#include "ThreadsAndMutexes/threadlist/threadlist" -#include "backendvector/backendvector" -#include "netbuffer/netbuffer" - -// Dispatching algorithm workers -#include "DispatchAlgorithms/algorithm/algorithm" -#include "DispatchAlgorithms/roundrobin/roundrobin" -#include "DispatchAlgorithms/firstactive/firstactive" -#include "DispatchAlgorithms/leastconn/leastconn" -#include "DispatchAlgorithms/external/external" -#include "DispatchAlgorithms/hashedip/hashedip" -#include "DispatchAlgorithms/storedip/storedip" -#include "DispatchAlgorithms/weightedload/weightedload" - -class Dispatcher: public Thread { -public: - - Dispatcher (int fd, struct in_addr ip); - virtual ~Dispatcher(); - - virtual void execute() = 0; - virtual void dispatch() = 0; - virtual void handle() = 0; - - int targetbackend() const { return target_backend; } - void targetbackend(int t) { target_backend = t; } - struct in_addr clientip() const { return client_ip; } - int clientfd() const { return client_fd; } - void clientfd(int c) { client_fd = c; } - int backendfd() const { return backend_fd; } - void backendfd(int b) { backend_fd = b; } - Algorithm *algorithm() const { return algo; } - - BackendVector &targetlist() { return target_list; } - void targetlist (BackendVector t) { target_list = t; } - -private: - struct in_addr client_ip; - int target_backend, client_fd, backend_fd; - Algorithm *algo; - BackendVector target_list; -}; - -#endif diff --git a/xr/dispatcher/dispatcher1.cc b/xr/dispatcher/dispatcher1.cc @@ -1,40 +0,0 @@ -#include "dispatcher" - -Dispatcher::Dispatcher(int cfd, struct in_addr cip): - Thread(), client_ip(cip), target_backend(-1), client_fd(cfd), - backend_fd(-1), target_list() { - - // Instantiate dispatchmode algorithm - switch (config.dispatchmode()) { - case Dispatchmode::m_roundrobin: - algo = new Roundrobin; - break; - case Dispatchmode::m_firstactive: - algo = new Firstactive; - break; - case Dispatchmode::m_external: - algo = new External; - break; - case Dispatchmode::m_strict_hashed_ip: - case Dispatchmode::m_lax_hashed_ip: - algo = new HashedIp; - break; - case Dispatchmode::m_strict_stored_ip: - case Dispatchmode::m_lax_stored_ip: - algo = new StoredIp; - break; - case Dispatchmode::m_weighted_load: - algo = new Weightedload; - break; - case Dispatchmode::m_leastconn: - default: - algo = new Leastconn; - break; - } - - // NOTE: Memory errors for algorithm pointer are not handled here, - // but in dispatch() (don't want to throw up in the constructor) - - debugmsg("Dispatcher instantiated.\n"); -} - diff --git a/xr/etc/Makefile.class b/xr/etc/Makefile.class @@ -7,10 +7,10 @@ class-compile: $(OBJ) $(BASE)/xr/$(BUILDDIR)/$(DIR)_%.o: %.cc @echo "Compiling: " `pwd` $< - @$(CONF_CC) $(PROF) $(PROFILER) $(CONF_OPTFLAGS) \ + $(CONF_CC) $(PROF) $(PROFILER) $(CONF_OPTFLAGS) \ -DVER='"$(VER)"' -DAUTHOR='"$(AUTHOR)"' \ -DMAINTAINER='"$(MAINTAINER)"' -DDISTSITE='"$(DISTSITE)"' \ - -DSYS='"$(SYS)"' -D$(SYS) \ + -DSYS='"$(SYS)"' -D$(SYS) $(MEMDEBUG) \ -DCONF_CC='"$(CONF_CC)"' -DCONF_LIB='"$(CONF_LIB)"' \ -DCONF_OPTFLAGS='"$(CONF_OPTFLAGS)"' $(CONF_STRNSTR) \ $(CONF_GETOPT) $(CONF_GETOPT_LONG) $(CONF_INET_ATON) \ diff --git a/xr/etc/usage.txt b/xr/etc/usage.txt @@ -118,9 +118,9 @@ may not exist on your platform): Enables sticky HTTP sessions by injecting XRTarget cookies into HTTP streams. Only effective with -s http:.... -s TYPE:IPADDRESS:PORT, --server TYPE:IPADDRESS:PORT - Specifies the server. TYPE is tcp or http. IPADDRESS is the IP address - to listen to. PORT defines the TCP port to listen (for type tcp or - http); when port is 0, XR will listen to stdin (inetd-mode). + Specifies the server. TYPE is tcp or http or udp. IPADDRESS is the IP + address to listen to. PORT defines the TCP port to listen; when port + is 0, XR will listen to stdin (inetd-mode, not available for udp). Default: tcp:0:10000 (TCP balancing, on all interfaces, via port 10000). -t SEC, --backend-timeout SEC Defines network timeouts for back ends, default 3 sec. Use 0 to diff --git a/xr/httpdispatcher/handle.cc b/xr/httpdispatcher/handle.cc @@ -1,104 +0,0 @@ -#include "httpdispatcher" - -void HttpDispatcher::handle() { - PROFILE("HttpDispatcher::handle"); - - // The client request was already retrieved before starting the - // dispatcher. We can continue by applying server-directed headers. - if (config.addxrversion()) - buf.setheader ("XR", VER); - if (config.addxforwardedfor()) - buf.addheader ("X-Forwarded-For", string(inet_ntoa(clientip()))); - for (unsigned n = 0; n < config.nserverheaders(); n++) - buf.setheader (config.serverheader(n)); - - // Patch up the Host: header if requested so. - if (config.replacehostheader()) - buf.replaceheader("Host:", - balancer.backend(targetbackend()).server()); - /* - * Httpbuffer::replaceheader() is built but not used, - * e.g.: - * buf.replaceheader("MyHeader:", "MyValue"); - */ - - // Flush client info received so far to the back end. - debugmsg("Sending client request to back end\n"); - buf.netwrite(backendfd(), config.backend_timeout()); - - // Let's see if we will need to modify the server headers. - bool modify_serverheaders = false; - if (config.addxrversion() || - (config.stickyhttp() && !issticky())) - modify_serverheaders = true; - - // Store the client request. May want to log it later. - string client_request = buf.firstline(); - - // Go into copy-thru mode. If required, catch the server headers on - // their first appearance and modify them. - bool backend_response_checked = false; - while (1) { - Fdset readset (config.client_timeout()); - readset.add(clientfd()); - readset.add(backendfd()); - - int sock; - if ((sock = readset.readable()) < 0) - break; - - buf.reset(); - - if (!buf.netread(sock)) - break; - - if (sock == backendfd() && modify_serverheaders) { - debugmsg("Back end response seen, applying modifications\n"); - modify_serverheaders = false; - while (! buf.headersreceived()) - if (!buf.netread (sock, config.backend_timeout())) - throw Error("Failed to get headers from back end"); - if (config.addxrversion()) - buf.setheader("XR", VER); - if (config.stickyhttp() && !issticky()) { - ostringstream o; - o << "XRTarget=" << targetbackend(); - buf.setheader("Set-Cookie", o.str()); - } - } - - // The back end response may now get flushed to the client. - // If the response code is 4** or 5**, log it as a warning. - if (!backend_response_checked && - sock == backendfd() && buf.headersreceived()) { - string respcode = buf.stringat(9, 3); - if (respcode[0] == '4' || respcode[0] == '5') - warnmsg("HTTP back end indicates fault: '" + - buf.firstline() + "' as response to '" + - client_request + "'\n"); - backend_response_checked = true; - } - - // Flush info to the other connected side. - int othersock, timeout; - if (sock == clientfd()) { - othersock = backendfd(); - timeout = config.backend_timeout(); - // Re-patch Host header if requested - if (config.replacehostheader()) - buf.replaceheader("Host:", - balancer.backend(targetbackend()).server()); - } else { - othersock = clientfd(); - timeout = config.client_timeout(); - } - - debugmsg (Mstr("Had data on ") + sock + - (Mstr(", sending to ") + othersock) + "\n"); - - buf.netwrite(othersock, timeout); - if (sock == backendfd()) - balancer.backend(targetbackend()).addbytes(buf.bufsz()); - } - -} diff --git a/xr/httpdispatcher/httpdispatcher b/xr/httpdispatcher/httpdispatcher @@ -1,24 +0,0 @@ -#ifndef _HTTPDISPATCHER_ -#define _HTTPDISPATCHER_ - -#include "../sys/sys" -#include "../tcpdispatcher/tcpdispatcher" -#include "../httpbuffer/httpbuffer" - -class HttpDispatcher: public TcpDispatcher { -public: - HttpDispatcher (int fd, struct in_addr ip); - - void dispatch(); - void handle(); - bool issticky() const { return (is_sticky); } - void issticky (bool s) { is_sticky = s; } - -private: - void senderrorpage(string const &desc); - - Httpbuffer buf; - bool is_sticky; -}; - -#endif diff --git a/xr/memory/memdata.cc b/xr/memory/memdata.cc @@ -0,0 +1,4 @@ +#include "memory" + +Memory::MemoryLog Memory::s_memlog; +bool Memory::s_follow; diff --git a/xr/memory/memdelete.cc b/xr/memory/memdelete.cc @@ -0,0 +1,5 @@ +#include "memory" + +void Memory::operator delete(void *ptr) { + free(ptr); +} diff --git a/xr/memory/memdelete1.cc b/xr/memory/memdelete1.cc @@ -0,0 +1,5 @@ +#include "memory" + +void Memory::operator delete[](void *ptr) { + free(ptr); +} diff --git a/xr/memory/memdisplay.cc b/xr/memory/memdisplay.cc @@ -0,0 +1,13 @@ +#include "memory" + +void Memory::mem_display() { + Mutex::lock(&s_memlog); + for (unsigned i = 0; i < s_memlog.size(); i++) { + MemoryEntry ent = s_memlog[i]; + Mutex::lock(&cout); + cout << "Memory::mem_display " << ent.ptr << ' ' << ent.sz + << ' ' << ent.desc << '\n'; + Mutex::unlock(&cout); + } + Mutex::unlock(&s_memlog); +} diff --git a/xr/memory/memfollow.cc b/xr/memory/memfollow.cc @@ -0,0 +1,5 @@ +#include "memory" + +void Memory::mem_follow(bool b) { + s_follow = b; +} diff --git a/xr/memory/memfollow1.cc b/xr/memory/memfollow1.cc @@ -0,0 +1,5 @@ +#include "memory" + +bool Memory::mem_follow() { + return s_follow; +} diff --git a/xr/memory/memfree.cc b/xr/memory/memfree.cc @@ -0,0 +1,25 @@ +#include "memory" + +void Memory::free(void *ptr) { + if (!ptr) + return; + + for (unsigned i = 0; i < s_memlog.size(); i++) { + Mutex::lock(&s_memlog); + MemoryEntry ent = s_memlog[i]; + Mutex::unlock(&s_memlog); + if (ent.ptr == ptr) { + if (s_follow) { + Mutex::lock(&cout); + cout << "Memory::free(" << ptr << ")\n"; + Mutex::unlock(&cout); + } + ::free (ent.ptr); + Mutex::lock(&s_memlog); + s_memlog.erase(s_memlog.begin() + i); + Mutex::unlock(&s_memlog); + return; + } + } + cerr << "FREE Request for non previously allocated memory\n"; +} diff --git a/xr/memory/memmalloc.cc b/xr/memory/memmalloc.cc @@ -0,0 +1,16 @@ +#include "memory" + +void *Memory::malloc(size_t sz, string const &desc) { + void *ptr = ::malloc(sz); + MemoryEntry ent = { ptr, sz, desc }; + Mutex::lock(&s_memlog); + s_memlog.push_back(ent); + Mutex::unlock(&s_memlog); + if (s_follow) { + Mutex::lock(&cout); + cout << "Memory::malloc(" << sz << ") -> " << ptr << ' ' + << desc << '\n'; + Mutex::unlock(&cout); + } + return ptr; +} diff --git a/xr/memory/memmark.cc b/xr/memory/memmark.cc @@ -0,0 +1,8 @@ +#include "memory" + +void Memory::mem_mark(string const &desc) { + MemoryEntry ent = { 0, 0, "MARK " + desc }; + Mutex::lock(&s_memlog); + s_memlog.push_back(ent); + Mutex::unlock(&s_memlog); +} diff --git a/xr/memory/memnew.cc b/xr/memory/memnew.cc @@ -0,0 +1,5 @@ +#include "memory" + +void *Memory::operator new(size_t sz) { + return malloc(sz); +} diff --git a/xr/memory/memnew1.cc b/xr/memory/memnew1.cc @@ -0,0 +1,5 @@ +#include "memory" + +void *Memory::operator new[](size_t sz) { + return malloc(sz); +} diff --git a/xr/memory/memory b/xr/memory/memory @@ -0,0 +1,40 @@ +#ifndef _MEMORY_ +#define _MEMORY_ + +#include "sys/sys" +#include "ThreadsAndMutexes/mutex/mutex" + +class Memory { +public: + // Memory accessing functions + static void *malloc(size_t sz, string const &desc = ""); + static void *realloc(void *ptr, size_t newsz, string const &desc = ""); + static void free(void *ptr); + static void *operator new(size_t sz); + static void operator delete(void *ptr); + static void *operator new[](size_t sz); + static void operator delete[](void *ptr); + + // Follow actions immediately? + static void mem_follow(bool b); + static bool mem_follow(); + + // Dump what we have + static void mem_display(); + + // Marker in the overview + static void mem_mark(string const &desc = ""); + + // Internal storage types + struct MemoryEntry { + void *ptr; + size_t sz; + string desc; + }; + typedef vector<MemoryEntry> MemoryLog; +private: + static MemoryLog s_memlog; + static bool s_follow; +}; + +#endif diff --git a/xr/memory/memrealloc.cc b/xr/memory/memrealloc.cc @@ -0,0 +1,29 @@ +#include "memory" + +void *Memory::realloc(void *ptr, size_t newsz, string const &desc) { + if (!newsz) { + free(ptr); + return 0; + } else { + for (unsigned i = 0; i < s_memlog.size(); i++) { + MemoryEntry ent = s_memlog[i]; + if (ent.ptr == ptr) { + ent.ptr = ::realloc(ptr, newsz); + ent.sz = newsz; + ent.desc = desc; + Mutex::lock(&s_memlog); + s_memlog[i] = ent; + Mutex::unlock(&s_memlog); + if (s_follow) { + Mutex::lock(&cout); + cout << "Memory::realloc(" << ptr << ", " << newsz + << ") -> " << ent.ptr << ' ' << desc << '\n'; + Mutex::unlock(&cout); + } + return ent.ptr; + } + } + cerr << "REALLOC Request for non previously allocated memory\n"; + } + return 0; +} diff --git a/xr/netbuffer/destroy.cc b/xr/netbuffer/destroy.cc @@ -1,7 +1,7 @@ #include "netbuffer" void Netbuffer::destroy() { - delete buf_data; + free(buf_data); buf_data = 0; buf_sz = 0; buf_alloced = 0; diff --git a/xr/netbuffer/netbuffer b/xr/netbuffer/netbuffer @@ -2,6 +2,8 @@ #define _NETBUFFER_ #include "sys/sys" +#include "memory/memory" + #include "error/error" #include "config/config" #include "profiler/profiler" @@ -18,7 +20,7 @@ #define UNLOCK_MALLOC #endif -class Netbuffer { +class Netbuffer MEM(: public Memory) { public: Netbuffer(); Netbuffer (Netbuffer const &other); diff --git a/xr/servertype/servertype b/xr/servertype/servertype @@ -1,9 +1,7 @@ #ifndef _SERVERTYPE_ #define _SERVERTYPE_ -#include "../sys/sys" -#include "../error/error" - +#include <string> using namespace std; class Servertype { @@ -11,19 +9,15 @@ public: enum Type { t_tcp, t_http, + t_udp, }; - Servertype() : t(t_tcp) { - } - Servertype (string id) { - type (id); - } + Servertype(): t(t_tcp) { } + Servertype(string id) { type(id); } - void type (string id); + void type(string id); string typestr() const; - Type type() const { - return (t); - } + Type type() const { return t; } private: Type t; diff --git a/xr/servertype/type1.cc b/xr/servertype/type1.cc @@ -1,11 +1,15 @@ #include "servertype" +#include "error/error" + void Servertype::type (string id) { if (id == "tcp") t = t_tcp; else if (id == "http") t = t_http; + else if (id == "udp") + t = t_udp; else throw Error("Bad server type '" + id + - "', supported are 'tcp' or 'http'"); + "', supported are 'tcp' or 'http' or 'udp'"); } diff --git a/xr/servertype/typestr.cc b/xr/servertype/typestr.cc @@ -1,10 +1,14 @@ #include "servertype" +#include "error/error" + string Servertype::typestr() const { if (t == t_tcp) return ("tcp"); else if (t == t_http) return ("http"); + else if (t == t_udp) + return ("udp"); else - return ("server type unknown"); + throw Error("Server type unknown in Servertype::typestr"); } diff --git a/xr/sys/serversocket.cc b/xr/sys/serversocket.cc @@ -3,13 +3,16 @@ #include "profiler/profiler" #include "config/config" -int serversocket (string addr, int port, string desc) { +int serversocket (string addr, int port, string desc, Servertype::Type type) { PROFILE("serversocket"); - int sock; - // Create the server socket, set options - if ( (sock = socket (PF_INET, SOCK_STREAM, 0)) < 0 ) + int sock; + if (type != Servertype::t_udp) + sock = socket(PF_INET, SOCK_STREAM, 0); + else + sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (sock < 0) throw Error("Failed to create " + desc + " socket: " + strerror(errno)); int val = 1; @@ -38,11 +41,12 @@ int serversocket (string addr, int port, string desc) { if (bind (sock, (sockaddr*) &saddr, sizeof(saddr)) < 0) throw Error("Failed to bind " + desc + " to IP/port: " + strerror(errno)); - if (listen (sock, 5) < 0) - throw Error("Failed to listen to " + desc + - " IP/port: " + strerror(errno)); + if (type != Servertype::t_udp) + if (listen (sock, 5) < 0) + throw Error("Failed to listen to " + desc + + " IP/port: " + strerror(errno)); - msg ("TCP server for " + desc + " listening\n"); + msg ("Server for " + desc + " listening\n"); return (sock); } diff --git a/xr/sys/sys b/xr/sys/sys @@ -50,6 +50,13 @@ # define PROFILE(x) #endif +/* Memory debugging on/off */ +#ifdef MEMDEBUG +# define MEM(x) x +#else +# define MEM(x) +#endif + /* If you fear that your malloc() / realloc() may have threading problems, * uncomment the following. It will cause mutex locks around the calls. */ // #define MISTRUST_MALLOC_THREADSAFE @@ -63,6 +70,8 @@ using namespace std; // This we need locally for msg(), debugmsg() #include "mstr/mstr" +// This is for the Servertype of serversocket() +#include "servertype/servertype" /* Messaging. Conditionals are defined as a macro to speed things up. */ void _msg (Mstr const &s); @@ -74,12 +83,14 @@ void warnmsg (Mstr const &s); void anymsg(Mstr const &s, ostream &o, string const &label); /* Other */ -int serversocket (string addr, int port, string description); +int serversocket (string addr, int port, string description, + Servertype::Type t); bool ipmatch (struct in_addr addr, struct in_addr mask); void socketclose (int fd); vector<string> str2parts (string const &s, char sep); void mt_srand(unsigned long s); unsigned long mt_rand(void); +bool check_acl(string const &ipstr, struct in_addr ipaddr); #ifndef HAVE_INET_ATON int inet_aton (char const *name, struct in_addr *addr); diff --git a/xr/tcpdispatcher/execute.cc b/xr/tcpdispatcher/execute.cc @@ -1,158 +0,0 @@ -#include "tcpdispatcher" - -typedef map < unsigned long, std::queue<time_t> > AccessMap; -static AccessMap accesslog; -static time_t accesslog_lastclean = 0; - -// Execute an external program upon excess of hard/soft rates -static void run_excess(string const &prog, char const *ip) { - ostringstream o; - o << prog << ' ' << ip; - msg ((Mstr("Max connection rate exceeded, invoking '") + o.str()) + - "'\n"); - int ret = system(o.str().c_str()); - if (ret == -1) - throw Error(string("Failed to start system call: ") + - strerror(errno)); - else if (WIFEXITED(ret)) { - int exitstat = WEXITSTATUS(ret); - if (exitstat) - warnmsg((Mstr("Program '") + o.str()) + - (Mstr("' exited with exit status ") + exitstat) + - "\n"); - else - msg ("Program terminated normally.\n"); - } else - warnmsg((Mstr("Program '") + o.str()) + - "' terminated abnormally!\n"); -} - -void TcpDispatcher::execute() { - Threadlist::desc("Verifying"); - Threadlist::clientfd(clientfd()); - - msg ((Mstr("Dispatch request for client fd ") + clientfd()) + "\n"); - - // Check 'softmaxconnrate' and 'hardmaxconnrate' now! - // Descend into this block if connrate_time() is set, AND - // either hardmaxconnrate() is set, - // or both softmaxconnrate() and defertime() are set. - if (config.connrate_time() && - (config.hardmaxconnrate() || - (config.softmaxconnrate() && config.defertime()))) { - time_t now, min_ts; - now = time(0); - min_ts = now - config.connrate_time(); - unsigned max_conns = max(config.hardmaxconnrate(), - config.softmaxconnrate()); - - Mutex::lock (&accesslog[clientip().s_addr]); - accesslog[clientip().s_addr].push(now); - Mutex::unlock (&accesslog[clientip().s_addr]); - - if (accesslog_lastclean < min_ts) { - // Clean the entire access log, it's been a while... - - Mutex::lock(&accesslog_lastclean); - accesslog_lastclean = now; - Mutex::unlock(&accesslog_lastclean); - - for (AccessMap::iterator i = accesslog.begin(); - i != accesslog.end(); - i++ ) { - if (accesslog[i->first].back() < min_ts) { - // This IP hasn't made ANY connections in a while -- erase! - accesslog.erase(i); - } else { - // Keep popping off this IP's oldest connection until we - // have only "recent" connections left. - Mutex::lock(&accesslog[i->first]); - while ( accesslog[i->first].front() < min_ts - || accesslog[i->first].size() > max_conns ) { - accesslog[i->first].pop(); - } - Mutex::unlock(&accesslog[i->first]); - } - } - } else { - // The "big log" doesn't need to be fully cleaned, - // but this particular IP should be! - Mutex::lock(&accesslog[clientip().s_addr]); - while ( accesslog[clientip().s_addr].front() < min_ts - || accesslog[clientip().s_addr].size() > max_conns ) { - accesslog[clientip().s_addr].pop(); - } - Mutex::unlock(&accesslog[clientip().s_addr]); - } - - if (config.hardmaxconnrate() && - accesslog[clientip().s_addr].size() >= config.hardmaxconnrate() ) { - // This IP has violated the "HARD" limit! Reject the connection - ostringstream o; - o << "Client " << inet_ntoa(clientip()) - << " has hit the HARD maximum number of connections (" - << config.hardmaxconnrate() << " conections in " - << config.connrate_time() << " seconds; " - << accesslog[clientip().s_addr].size() - << " connections recorded). Client is refused.\n"; - warnmsg (o.str()); - socketclose(clientfd()); - run_excess(config.hardmaxconnexcess(), inet_ntoa(clientip())); - return; - } else if (config.softmaxconnrate() && - (accesslog[clientip().s_addr].size() >= - config.softmaxconnrate())) { - // This IP has violated the "SOFT" Limit. Go to sleep for a while. - ostringstream o; - o << "Client " << inet_ntoa(clientip()) - << " has hit the SOFT maximum number of connections (" - << config.softmaxconnrate() << " connections in " - << config.connrate_time() << " sedonds; " - << accesslog[clientip().s_addr].size() - << " connections recorded). Client is deferred for " - << config.defertime() << " microseconds.\n"; - warnmsg (o.str()); - run_excess(config.softmaxconnexcess(), inet_ntoa(clientip())); - usleep(config.defertime()); - } - } - - try { - Threadlist::desc("Dispatching"); - dispatch(); - } catch (Error const &e) { - Mutex::lock(&cerr); - cerr << e.what() << "\n"; - Mutex::unlock(&cerr); - socketclose(clientfd()); - socketclose(backendfd()); - return; - } - - msg ((Mstr("Dispatching client fd ") + clientfd()) + - (Mstr(" to ") + balancer.backend(targetbackend()).description()) + - (Mstr(", fd ") + backendfd()) + "\n"); - - Threadlist::desc("Serving"); - Threadlist::backend(targetbackend()); - Threadlist::backendfd(backendfd()); - - balancer.backend(targetbackend()).startconnection(); - - try { - handle(); - } catch (Error const &e) { - Mutex::lock(&cerr); - cerr << e.what() << "\n"; - Mutex::unlock(&cerr); - } - - balancer.backend(targetbackend()).endconnection(); - - socketclose (clientfd()); - socketclose (backendfd()); - - msg ((Mstr("Done dispatching to back end fd ") + backendfd()) + - (Mstr(" at ") + balancer.backend(targetbackend()).description()) + - "\n"); -} diff --git a/xr/tcpdispatcher/tcpdispatcher b/xr/tcpdispatcher/tcpdispatcher @@ -1,22 +0,0 @@ -#ifndef _TCPDISPATCHER_ -#define _TCPDISPATCHER_ - -#include "dispatcher/dispatcher" -#include "netbuffer/netbuffer" - -class TcpDispatcher: public Dispatcher { -public: - - TcpDispatcher (int fd, struct in_addr ip); - - virtual void execute(); - virtual void dispatch(); - virtual void handle(); - - unsigned readchunk (int src); - -private: - Netbuffer netbuffer; -}; - -#endif diff --git a/xr/webinterface/execute.cc b/xr/webinterface/execute.cc @@ -11,7 +11,7 @@ void Webinterface::execute() { msg ("Starting web interface\n"); sfd = serversocket (config.webinterfaceip(), config.webinterfaceport(), - "web interface"); + "web interface", Servertype::t_tcp); } catch (Error const &e) { cerr << e.what() << "(webinterface, retrying in a sec)\n"; sleep (1); diff --git a/xrctl/xrctl b/xrctl/xrctl @@ -429,11 +429,12 @@ sub xr_cmdarr { my $sp = xml_serviceparser($service) or die ("Failed to locate <service> block for service '$service'\n"); - # Service descriptions + # Service descriptions inside the <server> block + my $ss = xml_serverparser($sp); my $type = 'tcp'; - $type = $sp->data('type') if ($sp->data('type')); + $type = $ss->data('type') if ($ss->data('type')); my $addr = '0:10000'; - $addr = $sp->data('address') if ($sp->data('address')); + $addr = $ss->data('address') if ($ss->data('address')); my $full = "$type:$addr"; push (@cmd, '--server', $full) if ($full ne 'tcp:0:10000'); @@ -444,65 +445,65 @@ sub xr_cmdarr { # Handle general flags and boolflags push (@cmd, - flag($sp, '--web-interface', 'webinterface', ''), - flag($sp, '--dispatch-mode', 'dispatchmode', + flag($ss, '--web-interface', 'webinterface', ''), + flag($ss, '--dispatch-mode', 'dispatchmode', $default_dispatchmode), - flag($sp, '--max-connections', 'maxconnections', + flag($ss, '--max-connections', 'maxconnections', $default_maxconnections), - flag($sp, '--client-timeout', 'clienttimeout', + flag($ss, '--client-timeout', 'clienttimeout', $default_clienttimeout), - flag($sp, '--backend-timeout', 'backendtimeout', + flag($ss, '--backend-timeout', 'backendtimeout', $default_backendtimeout), - flag($sp, '--buffer-size', 'buffersize', + flag($ss, '--buffer-size', 'buffersize', $default_buffersize), - flag($sp, '--wakeup-interval', 'wakeupinterval', + flag($ss, '--wakeup-interval', 'wakeupinterval', $default_wakeupinterval), - flag($sp, '--checkup-interval', 'checkupinterval', + flag($ss, '--checkup-interval', 'checkupinterval', $default_checkupinterval), - flag($sp, '--time-interval', 'timeinterval', + flag($ss, '--time-interval', 'timeinterval', $default_timeinterval), - flag($sp, '--hard-maxconnrate', 'hardmaxconnrate', + flag($ss, '--hard-maxconnrate', 'hardmaxconnrate', $default_hardmaxconnrate), - flag($sp, '--soft-maxconnrate', 'softmaxconnrate', + flag($ss, '--soft-maxconnrate', 'softmaxconnrate', $default_softmaxconnrate), - flag($sp, '--defer-time', 'defertime', + flag($ss, '--defer-time', 'defertime', $default_defertime), - flag($sp, '--hard-maxconn-excess', 'hardmaxconnexcess', + flag($ss, '--hard-maxconn-excess', 'hardmaxconnexcess', $default_hardmaxconnexcess), - flag($sp, '--soft-maxconn-excess', 'softmaxconnexcess', + flag($ss, '--soft-maxconn-excess', 'softmaxconnexcess', $default_softmaxconnexcess), - flag($sp, '--dns-cache-timeout', 'dnscachetimeout', + flag($ss, '--dns-cache-timeout', 'dnscachetimeout', $default_dnscachetimeout), - flag($sp, '--log-traffic-dir', 'logtrafficdir', '')); + flag($ss, '--log-traffic-dir', 'logtrafficdir', '')); for my $k (sort (keys (%boolflags))) { - push (@cmd, $boolflags{$k}) if (istrue($sp->data($k))); + push (@cmd, $boolflags{$k}) if (istrue($ss->data($k))); } # ACL's for (my $i = 0; ; $i++) { - my $mask = $sp->data('allowfrom', $i) or last; + my $mask = $ss->data('allowfrom', $i) or last; push (@cmd, '--allow-from', $mask); } for (my $i = 0; ; $i++) { - my $mask = $sp->data('denyfrom', $i) or last; + my $mask = $ss->data('denyfrom', $i) or last; push (@cmd, '--deny-from', $mask); } # HTTP goodies push (@cmd, '--add-xr-version') - if ($sp->data('addxrversion') and - istrue($sp->data('addxrversion'))); + if ($ss->data('addxrversion') and + istrue($ss->data('addxrversion'))); push (@cmd, '--add-x-forwarded-for') - if ($sp->data('addxforwardedfor') and - istrue($sp->data('addxforwardedfor'))); + if ($ss->data('addxforwardedfor') and + istrue($ss->data('addxforwardedfor'))); push (@cmd, '--sticky-http') - if ($sp->data('stickyhttp') and - istrue($sp->data('stickyhttp'))); + if ($ss->data('stickyhttp') and + istrue($ss->data('stickyhttp'))); push (@cmd, '--replace-host-header') - if ($sp->data('replacehostheader') and - istrue($sp->data('replacehostheader'))); + if ($ss->data('replacehostheader') and + istrue($ss->data('replacehostheader'))); for (my $i = 0; ; $i++) { - my $h = $sp->data('header', $i) or last; + my $h = $ss->data('header', $i) or last; push (@cmd, '--add-server-header', $h); } @@ -578,7 +579,7 @@ sub xml_serviceparser { my $service = shift; for (my $i = 0; ; $i++) { - $xml = $xp->data('service', $i) or return (undef); + my $xml = $xp->data('service', $i) or return (undef); msg ("XML service block: $xml\n"); my $sub = new XMLParser($xml); return ($sub) if ($sub->data('name') eq $service); @@ -586,6 +587,13 @@ sub xml_serviceparser { return (undef); } +# Fetch an XMLParser for a <server> block given a service parser +sub xml_serverparser { + my $serviceparser = shift; + my $xml = $serviceparser->data('server') or return undef; + return new XMLParser($xml); +} + # Fetch an XMLParser for a <backend> block given a service parser and # an order number sub xml_backendparser {