commit 70ba40d536cb7cd72c03893e8837a9d2b9810c0a parent 7123fc4787fd87f5c3c5b971e2def5e0b8555e4b Author: finwo <finwo@pm.me> Date: Sat, 3 Jan 2026 19:36:10 +0100 2.21 Diffstat:
100 files changed, 825 insertions(+), 531 deletions(-)
diff --git a/ChangeLog b/ChangeLog @@ -1,13 +1,27 @@ +2.21 [KK 2008-10-14] +- Fixed round-robin dispatching with only 1 configured back end. The + bug was that on the next-time around, no "other" back end would be + found. +- Centralized reading/writing from fd's and buffer handling into class + Netbuffer. This removes superfluous buffer copying in the HTTP + dispatcher. +- Rewrote HTTP dispatcher & (hopefully) optimized it. +- Added Profiler class and xr/etc/xrprof tool. +- Many other small changes. +- Added docs on maxconn flags and weighted-load dispatching. +- THIS VERSION IS A REWRITE OF THE HTTP DISPATCHER. IT IS ALPHA-QUALITY. + 2.20 [KK 2008-10-13] - Fixed -C flag recognition, --close-sockets-fast worked, but I'd omitted -C from the flag set. - Ongoing optimization work. 2.19 [KK 2008-10-13] -- Changed the TCP Dispatcher to allow for setting a maximum # connection - attempts per client, with options to either defer the connection or drop it. -- Added a dispatch mdoe "weighted-load" for randomly picking a back end based - on the inverse of their respective load averages. (gem) +- Changed the TCP Dispatcher to allow for setting a maximum # + connection attempts per client, with options to either defer the + connection or drop it. +- Added a dispatch mdoe "weighted-load" for randomly picking a back + end based on the inverse of their respective load averages. (gem) - Verbose/debug options in the web interface are now rendered as select boxes. - Web interface errors are rendered on the client as an HTML error diff --git a/Makefile b/Makefile @@ -1,7 +1,7 @@ # Top-level Makefile for XR # ------------------------- -VER = 2.20 +VER = 2.21 BINDIR = /usr/sbin TAR = /tmp/crossroads-$(VER).tar.gz AUTHOR = Karel Kubat <karel@kubat.nl> @@ -24,10 +24,10 @@ local: xr/etc/gettools /usr/local/bin xr/etc c-conf e-ver xr/etc/e-ver ChangeLog $(VER) BASE=$(BASE) AUTHOR='$(AUTHOR)' MAINTAINER='$(MAINTAINER)' \ - VER='$(VER)' PROF=$(PROF) $(MAKE) -C xr + VER='$(VER)' PROF=$(PROF) PROFILER=$(PROFILER) $(MAKE) -C xr localprof: - PROF=-pg make local + PROF=-pg PROFILER=-DPROFILER make local install: local mkdir -p $(BINDIR) @@ -60,4 +60,3 @@ commit: local clean (echo 'SVN not fully up to date: run "svn status"' && exit 1) perl -c xrctl/xrctl svn commit - -\ No newline at end of file diff --git a/doc/xr.odt b/doc/xr.odt Binary files differ. diff --git a/doc/xr.pdf b/doc/xr.pdf Binary files differ. diff --git a/xr/DispatchAlgorithms/algorithm/algorithm b/xr/DispatchAlgorithms/algorithm/algorithm @@ -3,6 +3,7 @@ #include "ThreadsAndMutexes/thread/thread" #include "backendvector/backendvector" +#include "profiler/profiler" class Algorithm: public Thread { public: diff --git a/xr/DispatchAlgorithms/leastconn/target.cc b/xr/DispatchAlgorithms/leastconn/target.cc @@ -2,6 +2,9 @@ unsigned Leastconn::target(struct in_addr clientip, BackendVector const &targetlist) { + + PROFILE("Leastconn::target"); + bool found = false; unsigned nconn = 0, t = 0; diff --git a/xr/DispatchAlgorithms/roundrobin/target.cc b/xr/DispatchAlgorithms/roundrobin/target.cc @@ -2,9 +2,11 @@ unsigned Roundrobin::target(struct in_addr clientip, BackendVector const &targetlist) { - // No back ends? Don't even try. + // No back ends? Don't even try. One back end? Always that one. if (targetlist.size() == 0) throw static_cast<Error>("Round robin dispatcher: no backends\n"); + if (targetlist.size() == 1) + return (0); static int prev_run_index = -1; diff --git a/xr/Makefile b/xr/Makefile @@ -37,7 +37,7 @@ subdirs: $(BUILDDIR)/usage.h $(BUILDDIR)/status.xslt.h AUTHOR='$(AUTHOR)' MAINTAINER='$(MAINTAINER)' \ CONF_CC='$(CONF_CC)' CONF_LIB='$(CONF_LIB)' \ CONF_GETOPT=$(CONF_GETOPT) CONF_GETOPT_LONG=$(CONF_GETOPT_LONG) \ - CONF_INET_ATON=$(CONF_INET_ATON) CONF_OPTFLAGS=$(CONF_OPTFLAGS) \ + CONF_INET_ATON=$(CONF_INET_ATON) CONF_OPTFLAGS='$(CONF_OPTFLAGS)' \ CONF_STRNSTR=$(CONF_STRNSTR) \ $(MAKE) -C $$f -f $(BASE)/xr/etc/Makefile.class \ || exit 1; \ diff --git a/xr/ThreadsAndMutexes/mutex/lock.cc b/xr/ThreadsAndMutexes/mutex/lock.cc @@ -1,8 +1,11 @@ #include "mutex" +#include "profiler/profiler" std::map<void *, pthread_mutex_t> Mutex::s_lock; void Mutex::lock (void *target) { + PROFILE("Mutex::lock"); + plock(&s_lock); plock(target); unlock(&s_lock); diff --git a/xr/ThreadsAndMutexes/mutex/unlock.cc b/xr/ThreadsAndMutexes/mutex/unlock.cc @@ -1,7 +1,10 @@ #include "mutex" #include "error/error" +#include "profiler/profiler" void Mutex::unlock(void *target) { + PROFILE("Mutex::unlock"); + if (int res = pthread_mutex_unlock(&s_lock[target])) throw static_cast<Error>("Failed to release mutex lock: ") + strerror(res); diff --git a/xr/ThreadsAndMutexes/thread/start.cc b/xr/ThreadsAndMutexes/thread/start.cc @@ -1,4 +1,5 @@ #include "thread" +#include "profiler/profiler" static void *_run (void *data) { Thread *t = (Thread*) data; @@ -17,7 +18,8 @@ static void *_run (void *data) { } void Thread::start() { - + PROFILE ("Thread::start"); + if (config.foregroundmode()) _run ((void*)this); else { @@ -35,8 +37,9 @@ void Thread::start() { pthread_attr_destroy (&attr); return; } else if (res == EAGAIN) { - msg ("Failed to start thread: " + (string)strerror(res) + - ", retrying\n"); + if (config.verbose()) + msg ("Failed to start thread: " + (string)strerror(res) + + ", retrying\n"); sleep (1); continue; } else { diff --git a/xr/backend/available.cc b/xr/backend/available.cc @@ -1,6 +1,9 @@ #include "backend" +#include "profiler/profiler" bool Backend::available() const { + PROFILE("Backend::available"); + if (config.debug()) { ostringstream o; o << (islive ? "alive" : "dead") << ", " diff --git a/xr/backend/backend b/xr/backend/backend @@ -6,6 +6,7 @@ #include "fdset/fdset" #include "error/error" #include "ThreadsAndMutexes/mutex/mutex" +#include "profiler/profiler" using namespace std; diff --git a/xr/backend/connect.cc b/xr/backend/connect.cc @@ -1,6 +1,8 @@ #include "backend" bool Backend::connect() { + PROFILE("Backend::connect"); + // Assume the backend is dead islive = false; diff --git a/xr/backend/live.cc b/xr/backend/live.cc @@ -1,6 +1,8 @@ #include "backend" void Backend::live (bool state) { + PROFILE("Backend::live"); + Mutex::lock (&islive); bool oldstate = islive; islive = state; diff --git a/xr/backenddef/backenddef b/xr/backenddef/backenddef @@ -3,6 +3,7 @@ #include "../sys/sys" #include "../error/error" +#include "profiler/profiler" using namespace std; diff --git a/xr/backenddef/hostmatch.cc b/xr/backenddef/hostmatch.cc @@ -1,6 +1,8 @@ #include "backenddef" void BackendDef::hostmatch (string const &s) { + PROFILE("BackendDef::hostmatch"); + host_match = s; if (host_match == "") host_match = "."; diff --git a/xr/balancer/serve.cc b/xr/balancer/serve.cc @@ -45,17 +45,22 @@ void Balancer::serve() { msg ("Report requested\n"); reportmsg ("*** XR STATUS REPORT STARTS ***\n"); for (unsigned i = 0; i < nbackends(); i++) { - reportmsg ("Back end " + backend(i).description() + ":\n"); - reportmsg (" Status: " + backend(i).availablestr() + - ", " + backend(i).livestr() + "\n"); ostringstream o; - o << backend(i).connections() << " (max " - << backend(i).maxconn() << ")"; - reportmsg (" Connections: " + o.str() + "\n"); - ostringstream b; - b << backend(i).bytesserved() << " bytes, " - << backend(i).clientsserved() << " clients"; - reportmsg (" Served: " + b.str() + "\n"); + o << "Back end " << backend(i).description() << "\n"; + reportmsg(o.str()); + o.str(""); + o << " Status: " << backend(i).availablestr() + << ", " << backend(i).livestr() << "\n"; + reportmsg(o.str()); + o.str(""); + o << " Connections: " << backend(i).connections() + << " (max " << backend(i).maxconn() << ")\n"; + reportmsg(o.str()); + o.str(""); + o << " Served: " << backend(i).bytesserved() + << " bytes, " << backend(i).clientsserved() + << " clients\n"; + reportmsg(o.str()); } report (false); reportmsg ("*** XR STATUS REPORT ENDS ***\n"); @@ -83,7 +88,8 @@ void Balancer::serve() { // If there is an allow list, the client must match it. if (config.nallow()) { - debugmsg ("Matching " + clientip + " against allow list\n"); + if (config.debug()) + debugmsg ("Matching " + clientip + " against allow list\n"); bool allowed = false; for (unsigned n = 0; n < config.nallow(); n++) { if (ipmatch (clname.sin_addr, config.allow(n))) { @@ -99,7 +105,8 @@ void Balancer::serve() { } } // If the client is in the deny list, deny it. - debugmsg ("Matching " + clientip + " against deny list\n"); + if (config.debug()) + debugmsg ("Matching " + clientip + " against deny list\n"); bool denied = false; for (unsigned n = 0; n < config.ndeny(); n++) if (ipmatch (clname.sin_addr, config.deny(n))) { diff --git a/xr/buffer/add.cc b/xr/buffer/add.cc @@ -1,12 +0,0 @@ -#include "buffer" - -void Buffer::add (char const *b, unsigned len) { - if (!buf_data) - set (b, len); - else { - if (! (buf_data = (char*)realloc (buf_data, buf_len + len)) ) - throw static_cast<Error>("Memory fault in Buffer::add"); - memcpy (buf_data + buf_len, b, len); - buf_len += len; - } -} diff --git a/xr/buffer/buffer1.cc b/xr/buffer/buffer1.cc @@ -1,4 +0,0 @@ -#include "buffer" - -Buffer::Buffer(): buf_data(0), buf_len(0) { -} diff --git a/xr/buffer/buffer2.cc b/xr/buffer/buffer2.cc @@ -1,5 +0,0 @@ -#include "buffer" - -Buffer::Buffer (Buffer const &other): buf_data(0), buf_len(0){ - copy(other); -} diff --git a/xr/buffer/buffer3.cc b/xr/buffer/buffer3.cc @@ -1,5 +0,0 @@ -#include "buffer" - -Buffer::~Buffer() { - destroy(); -} diff --git a/xr/buffer/copy.cc b/xr/buffer/copy.cc @@ -1,13 +0,0 @@ -#include "buffer" - -void Buffer::copy (Buffer const &other) { - if (! other.buf_len) { - buf_data = 0; - buf_len = 0; - } else { - if (! (buf_data = (char*)malloc (other.buf_len)) ) - throw static_cast<Error>("Memory fault in Buffer::copy"); - memcpy (buf_data, other.buf_data, other.buf_len); - buf_len = other.buf_len; - } -} diff --git a/xr/buffer/data.cc b/xr/buffer/data.cc @@ -1,5 +0,0 @@ -#include "buffer" - -char const *Buffer::data() const { - return (buf_data); -} diff --git a/xr/buffer/destroy.cc b/xr/buffer/destroy.cc @@ -1,7 +0,0 @@ -#include "buffer" - -void Buffer::destroy() { - free (buf_data); - buf_data = 0; - buf_len = 0; -} diff --git a/xr/buffer/insertat1.cc b/xr/buffer/insertat1.cc @@ -1,5 +0,0 @@ -#include "buffer" - -void Buffer::insertat (unsigned index, string s) { - insertat (index, s.c_str(), s.size()); -} diff --git a/xr/buffer/insertat2.cc b/xr/buffer/insertat2.cc @@ -1,13 +0,0 @@ -#include "buffer" - -void Buffer::insertat (unsigned index, char const *s, unsigned len) { - if (index >= buf_len) - return; - if (! (buf_data = (char*)realloc (buf_data, buf_len + len)) ) - throw static_cast<Error>("Memory fault in Buffer::insertat"); - memmove (buf_data + index + len, - buf_data + index, - buf_len - index); - memcpy (buf_data + index, s, len); - buf_len += len; -} diff --git a/xr/buffer/oparray.cc b/xr/buffer/oparray.cc @@ -1,9 +0,0 @@ -#include "buffer" - -char &Buffer::operator[] (unsigned index) { - static char dummy = 0; - - if (index >= buf_len) - return (dummy); - return (buf_data[index]); -} diff --git a/xr/buffer/opequal.cc b/xr/buffer/opequal.cc @@ -1,9 +0,0 @@ -#include "buffer" - -Buffer &Buffer::operator= (Buffer const &other) { - if (this != &other) { - destroy(); - copy (other); - } - return (*this); -} diff --git a/xr/buffer/removeat.cc b/xr/buffer/removeat.cc @@ -1,14 +0,0 @@ -#include "buffer" - -void Buffer::removeat (unsigned index, unsigned len) { - if (!buf_data || index >= buf_len) - return; - if (index + len >= buf_len) { - buf_len = index; - return; - } - memcpy (buf_data + index, buf_data + index + len, - buf_len - index - len); - buf_len -= len; -} - diff --git a/xr/buffer/set.cc b/xr/buffer/set.cc @@ -1,9 +0,0 @@ -#include "buffer" - -void Buffer::set (char const *b, unsigned len) { - destroy(); - if (! (buf_data = (char*)malloc (len)) ) - throw static_cast<Error>("Memory fault in Buffer::set"); - memcpy (buf_data, b, len); - buf_len = len; -} diff --git a/xr/buffer/strfind.cc b/xr/buffer/strfind.cc @@ -1,9 +0,0 @@ -#include "buffer" - -int Buffer::strfind(char const *s) const { - char *cp; - - if ( (cp = strnstr(buf_data, s, buf_len)) ) - return (cp - buf_data); - return (-1); -} diff --git a/xr/buffer/stringat.cc b/xr/buffer/stringat.cc @@ -1,14 +0,0 @@ -#include "buffer" - -string Buffer::stringat (unsigned index, unsigned len) const { - string ret; - - for (unsigned int i = index; i < index + len; i++) { - char ch = charat(i); - if (!ch) - break; - ret += ch; - } - - return (ret); -} diff --git a/xr/etc/Makefile.class b/xr/etc/Makefile.class @@ -7,7 +7,7 @@ class-compile: $(OBJ) $(BASE)/xr/$(BUILDDIR)/$(DIR)_%.o: %.cc @echo "Compiling: " `pwd` $< - @$(CONF_CC) $(PROF) $(CONF_OPTFLAGS) \ + @$(CONF_CC) $(PROF) $(PROFILER) $(CONF_OPTFLAGS) \ -DVER='"$(VER)"' -DAUTHOR='"$(AUTHOR)"' \ -DMAINTAINER='"$(MAINTAINER)"' -DSYS='"$(SYS)"' -D$(SYS) \ -DCONF_CC='"$(CONF_CC)"' -DCONF_LIB='"$(CONF_LIB)"' \ diff --git a/xr/etc/usage.txt b/xr/etc/usage.txt @@ -51,7 +51,7 @@ may not exist on your platform): must reply with a back end number (0..max) on stdout. -f, --foreground Suppresses forking/threading, only for debugging. Also suppresses - wakeup (-w) and checkup (-c) threads. + wakeups (-w), checkups (-c) and the webinterface (-W). -h, -?, --help This text. -H HDR, --add-server-header HDR @@ -75,16 +75,16 @@ may not exist on your platform): Stops the balancer after REQUESTS hits. For debugging / loadtesting. -r, --soft-maxconnrate MAXCONS Sets the "SOFT" maximum average number of connections per IP allowed - within a given time period (see --time-interval). If a particular IP - exceeds this number, then their connection is deferred (see - --defer-time). Defualt is 0 (disabled). + within a given time period (see -U, --time-interval). If a + particular IP exceeds this number, then their connection is + deferred (see -u, --defer-time). Default is 0 (disabled). -R, --hard-maxconnrate MAXCONS Sets the "HARD" maximum average number of connections per IP allowed - within a given time period (see --time-interval). If a particular IP - exceeds this number, then their connection is immediately closed. - Default is 0 (disabled). If both the "soft" and "hard" rates are set, - and the "hard" rate is lower than the "soft" rate, then only the "hard" - rate is obeyed. + within a given time period (see -U, --time-interval). If a + particular IP exceeds this number, then their connection is + immediately closed. Default is 0 (disabled). If both the + "soft" and "hard" rates are set, and the "hard" rate is lower + than the "soft" rate, then only the "hard" rate is obeyed. -S, --sticky-http Enables sticky HTTP sessions by injecting XRTarget cookies into HTTP streams. Only effective with -s http:.... @@ -108,7 +108,7 @@ may not exist on your platform): option allows you to specify the time period to which those numbers of connections apply. For example, "-r 200 -U 60" would trigger the "soft" limit on any IP attempting more than 200 connections in any 60 second - period. Default is 1. + period. Default is 1 (second). -v, --verbose Increases verbosity, default is silent operation. -V, --version diff --git a/xr/etc/xrprof b/xr/etc/xrprof @@ -0,0 +1,34 @@ +#!/usr/bin/perl + +use strict; + +open (my $if, "/tmp/xr-prof.txt") + or die ("Cannot read /tmp/xr-prof.txt: $!\n"); + +my %info; + +while (my $line = <$if>) { + chomp($line); + my (undef, undef, $name, $duration) = split (/\s+/, $line); + # print ("$name: $duration\n"); + if ($info{$name}) { + my ($calls, $total) = @{ $info{$name} }; + $calls++; + $total += $duration; + $info{$name} = [ $calls, $total ]; + } else { + $info{$name} = [ 1, $duration ]; + } +} + +for my $k (sort bytotal keys(%info)) { + my ($calls, $total) = @{ $info{$k} }; + print ("$k: $total usec\n"); +} + +sub bytotal { + my @aa = @{ $info{$a} }; + my @bb = @{ $info{$b} }; + return ($bb[1] <=> $aa[1]); +} + diff --git a/xr/fdset/readable.cc b/xr/fdset/readable.cc @@ -1,6 +1,8 @@ #include "fdset" int Fdset::readable() const { + PROFILE("Fdset::readable"); + fd_set readset, exceptset; struct timeval tv, *tvp; diff --git a/xr/fdset/readwriteable.cc b/xr/fdset/readwriteable.cc @@ -1,6 +1,8 @@ #include "fdset" int Fdset::readwriteable() const { + PROFILE("Fdset::readwriteable"); + fd_set readset, writeset, exceptset; struct timeval tv, *tvp; diff --git a/xr/fdset/writeable.cc b/xr/fdset/writeable.cc @@ -1,6 +1,8 @@ #include "fdset" int Fdset::writeable() const { + PROFILE("Fdset::writeable"); + fd_set writeset, exceptset; struct timeval tv, *tvp; diff --git a/xr/httpbuffer/addheader.cc b/xr/httpbuffer/addheader.cc @@ -1,6 +1,8 @@ #include "httpbuffer" void Httpbuffer::addheader (string var, string val) { + PROFILE("Httpbuffer::addheader(string,string)"); + string old = headerval(var); if (old.size()) { old += ", " + val; diff --git a/xr/httpbuffer/addheader1.cc b/xr/httpbuffer/addheader1.cc @@ -1,6 +1,8 @@ #include "httpbuffer" void Httpbuffer::addheader (string h) { + PROFILE("Httpbuffer::addheader(string)"); + unsigned i; for (i = 0; i < h.size(); i++) if (h[i] == ':') { diff --git a/xr/httpbuffer/bodyreceived.cc b/xr/httpbuffer/bodyreceived.cc @@ -1,46 +0,0 @@ -#include "httpbuffer" - -static bool ishex (char ch) { - return ( (ch >= '0' && ch <= '9') || - (ch >= 'a' && ch <= 'f') || - (ch >= 'A' && ch <= 'F') ); -} - -Httpbuffer::Bodystatus Httpbuffer::bodyreceived() { - // No headers? Not done. - if (!headersdone()) - return (b_is_not_received); - - // If we have a content length, the body must match it. - string str = headerval("Content-Length"); - unsigned clen; - if (sscanf (str.c_str(), "%d", &clen) > 0) { - if (size() - bodystart >= clen) - return (b_is_received); - else - return (b_is_not_received); - } - - // If we have chunks, then the last chunk must be zero. - str = headerval ("Transfer-Encoding"); - if (!strcasecmp (str.c_str(), "chunked")) { - for (unsigned i = bodystart; i < size(); /* increment below */ ) { - unsigned chunk; - if (sscanf (data() + i, "%x", &chunk) < 1) - return (b_is_not_received); - if (!chunk) - return (b_is_received); - while (ishex(charat(i)) && i < size()) - i++; - if (charat(i) == '\r') - i++; - if (charat(i) == '\n') - i++; - i += chunk; - } - } - - // No content length, no chunks... We just dunno. - return (b_unknown); -} - diff --git a/xr/httpbuffer/cookievalue.cc b/xr/httpbuffer/cookievalue.cc @@ -1,6 +1,8 @@ #include "httpbuffer" string Httpbuffer::cookievalue (string c) { + PROFILE("Httpheader::cookievalue"); + string cval = headerval ("Cookie"); size_t pos; string ret = ""; diff --git a/xr/httpbuffer/findheader.cc b/xr/httpbuffer/findheader.cc @@ -1,25 +1,22 @@ #include "httpbuffer" unsigned Httpbuffer::findheader(string h) { - if (!headersdone()) - return (0); + PROFILE("Httpbuffer::findheader"); - for (unsigned int i = 0; i < bodystart - h.size(); ) { + if (!headersreceived()) + return (0); + + for (unsigned int i = 1; i < bufsz(); /* incremented below */) { // At header? - if (stringat(i, h.size()) == h) - return (i); - // No, advance beyond next \n - while (1) { - char ch = charat(i); - if (!ch) - return (0); - if (ch == '\n') { - i++; - break; - } - i++; - } + if (!strncmp(bufdata() + i, h.c_str(), h.size())) + return i; + + // No, advance beyond \n or fail + if (! (i = charfind('\n', i)) ) + return 0; + i++; } - return (0); + // Avoiding warnings: + return 0; } diff --git a/xr/httpbuffer/firstline.cc b/xr/httpbuffer/firstline.cc @@ -1,10 +1,12 @@ #include "httpbuffer" string &Httpbuffer::firstline() { + PROFILE("Httpbuffer::firstline"); + if (first_line.size()) return (first_line); - for (unsigned i = 0; i < size(); i++) { + for (unsigned i = 0; i < bufsz(); i++) { char ch = charat(i); if (ch == '\n' || ch == '\r') break; diff --git a/xr/httpbuffer/headersdone.cc b/xr/httpbuffer/headersdone.cc @@ -1,41 +0,0 @@ -#include "httpbuffer" - -bool Httpbuffer::headersdone() { - if (bodystart) - return (true); - -#ifdef OLDCODE - // This attempts to find \r\n\r\n in the received blob which - // marks the end of headers. It can also be \n\n for clients - // that aren't too strict. - // [KK 2008-10-13] This hoses charat() all the time - attempt - // to optimize, see below - bool prevnl = false; - for (unsigned i = 0; i < size(); i++) { - if (charat(i) == '\n') { - if (prevnl) { - bodystart = i + 1; - return (true); - } - prevnl = true; - } else { - if (charat(i) != '\r') - prevnl = false; - } - } - - return (false); -#endif - - // [KK 2008-10-13] Optimization of the above (?) - int off; - if ( (off = strfind("\r\n\r\n")) >= 0 ) { - bodystart = off + 4; - return (true); - } - if ( (off = strfind("\n\n")) >= 0 ) { - bodystart = off + 2; - return (true); - } - return (false); -} diff --git a/xr/httpbuffer/headersreceived.cc b/xr/httpbuffer/headersreceived.cc @@ -0,0 +1,19 @@ +#include "httpbuffer" + +bool Httpbuffer::headersreceived() { + PROFILE("Httpbuffer::headersreceived"); + + if (bodystart) + return (true); + + unsigned off; + if ( (off = strfind("\r\n\r\n")) > 0 ) { + bodystart = off + 4; + return (true); + } + if ( (off = strfind("\n\n")) > 0 ) { + bodystart = off + 2; + return (true); + } + return (false); +} diff --git a/xr/httpbuffer/headerval.cc b/xr/httpbuffer/headerval.cc @@ -1,25 +1,27 @@ #include "httpbuffer" string Httpbuffer::headerval (string var) { + PROFILE("Httpbuffer::headerval"); + string ret; - if (!headersdone()) + if (!headersreceived()) return (""); if (var[var.size() - 1] != ':') var += ":"; unsigned int start; - if (! (start = findheader(var)) ) - ret = ""; - else { - start += var.size(); - for (char ch = charat(start); ch && isspace(ch); ch = charat(++start)) - ; - for (char ch = charat(start); ch && ch != '\r' && ch != '\n'; - ch = charat(++start)) - ret += ch; - } + if ( (!(start = strfind(var.c_str()))) || + (start >= bodystart) ) + return (""); + + start += var.size(); + for (char ch = charat(start); ch && isspace(ch); ch = charat(++start)) + ; + for (char ch = charat(start); ch && ch != '\r' && ch != '\n'; + ch = charat(++start)) + ret += ch; debugmsg ("Header " + var + " '" + ret + "'\n"); return (ret); diff --git a/xr/httpbuffer/httpbuffer b/xr/httpbuffer/httpbuffer @@ -1,29 +1,27 @@ #ifndef _HTTPBUFFER_ #define _HTTPBUFFER_ -#include "../sys/sys" -#include "../buffer/buffer" -#include "../config/config" +#include "sys/sys" +#include "netbuffer/netbuffer" +#include "config/config" +#include "profiler/profiler" -class Httpbuffer: public Buffer { +class Httpbuffer: public Netbuffer { public: - enum Bodystatus { - b_is_received, - b_is_not_received, - b_unknown, - }; + + // Recognized request methods. Modify requestmethod.cc to add more. enum RequestMethod { m_get, - // More can be added ad lib, modify requestmethod.cc if you do m_other, }; Httpbuffer(); - bool headersdone(); - string headerval (string var); - Bodystatus bodyreceived(); + + bool headersreceived(); + + string headerval (string var); string &firstline(); - bool setversion(string v); + bool setversion(char v); void setheader (string var, string val); void addheader (string var, string val); void addheader (string h); @@ -31,7 +29,7 @@ public: RequestMethod requestmethod(); string requesturi(); -private: +private: unsigned findheader (string h); unsigned bodystart; string first_line; diff --git a/xr/httpbuffer/httpbuffer1.cc b/xr/httpbuffer/httpbuffer1.cc @@ -1,4 +1,5 @@ #include "httpbuffer" Httpbuffer::Httpbuffer(): bodystart(0), first_line("") { + PROFILE("Httpbuffer::Httpbuffer"); } diff --git a/xr/httpbuffer/requestmethod.cc b/xr/httpbuffer/requestmethod.cc @@ -1,12 +1,9 @@ #include "httpbuffer" Httpbuffer::RequestMethod Httpbuffer::requestmethod() { - string first = firstline(); - debugmsg ("First line of http buffer: '" + first + "'\n"); + PROFILE("Httpheader::requestmethod"); - // GCC 3.x doesn't support string.compare(start,len,otherstring) - // so let's just get a substring and compare to the target - if (first.substr(0, 3) == static_cast<string>("GET")) + if (bufsz() >= 3 && !strncmp(bufdata(), "GET", 3)) return (m_get); return (m_other); diff --git a/xr/httpbuffer/requesturi.cc b/xr/httpbuffer/requesturi.cc @@ -1,6 +1,8 @@ #include "httpbuffer" string Httpbuffer::requesturi() { + PROFILE("Httpbuffer:requesturi"); + vector<string> parts = str2parts (firstline(), ' '); return (parts.size() >= 2 ? parts[1] : ""); } diff --git a/xr/httpbuffer/setheader.cc b/xr/httpbuffer/setheader.cc @@ -1,26 +1,20 @@ #include "httpbuffer" void Httpbuffer::setheader (string var, string val) { + PROFILE("Httpbuffer::setheader"); + if (!bodystart) return; if (var[var.size() - 1] != ':') var += ':'; - + + // Find position beyond first \n unsigned i; - if (! (i = findheader (var)) ) { - while (charat(i) && charat(i) != '\n') - i++; - i++; - insertat (i, var + " " + val + "\r\n"); - } else { - i += var.size(); - while (isspace(charat(i))) - i++; - unsigned len = i; - while (charat(len) && charat(len) != '\r' && charat(len) != '\n') - len++; - len -= i; - removeat (i, len); - insertat (i, val); - } + if (! (i = charfind('\n')) ) + return; + + // Poke in the header. + string h; + h = var + ' ' + val + "\r\n"; + insertat(i + 1, h.c_str()); } diff --git a/xr/httpbuffer/setversion.cc b/xr/httpbuffer/setversion.cc @@ -1,20 +1,17 @@ #include "httpbuffer" -bool Httpbuffer::setversion (string v) { - if (!headersdone()) - return (false); +bool Httpbuffer::setversion (char v) { + PROFILE("Httpbuffer::setversion"); - // Find "HTTP/" in the first line. Add the version there. - for (unsigned int i = 0; ; i++) { - if (charat(i) == '\r' || charat(i) == '\n') - return (false); - if (stringat(i, 5) == "HTTP/") { - i ++; - while (charat(i) != '\n' && charat(i) != '\r') - removeat(i); - insertat(i, v); - } - return (true); - } - return (false); + // No headers? Nothing to do. + if (!headersreceived()) + return false; + + // First line must start with HTTP/1.X. + // Poke in the version right after that. + if (bufsz() < 8 || strncmp(bufdata(), "HTTP/1.", 7)) + return false; + + // Poke in the new version. + return setchar(7, v); } diff --git a/xr/httpdispatcher/dispatch.cc b/xr/httpdispatcher/dispatch.cc @@ -1,6 +1,8 @@ #include "httpdispatcher" void HttpDispatcher::dispatch() { + PROFILE("HttpDispatcher::dispatch"); + unsigned stickytarget; string host_header = ""; @@ -11,9 +13,13 @@ void HttpDispatcher::dispatch() { // Get the client's request. May need for cookie inspection or for the // host header. - if (!getclientrequest()) - throw static_cast<Error>("Didn't receive a valid " - "client request.\n"); + while (!buf.headersreceived()) + if (!buf.netread(clientfd(), config.client_timeout())) + throw static_cast<Error>("Didn't receive a valid " + "client request.\n"); + if (config.verbose()) + msg ("Received client request: '" + buf.firstline() + + "'\n"); // See if hostmatching is used. This is true when a backend // matches against a non-dot host. @@ -25,8 +31,10 @@ void HttpDispatcher::dispatch() { } // Build new target list if host matching applies. if (hostmatchused) { - host_header = clientrequest.headerval ("Host"); - msg ("Will try to dispatch request host '" + host_header + "'\n"); + host_header = buf.headerval ("Host"); + if (config.verbose()) + msg ("Will try to dispatch request host '" + + host_header + "'\n"); // We need to build tcpdispatcher's target list now! // Construct locally and poke into TcpDispatcher. @@ -38,8 +46,9 @@ void HttpDispatcher::dispatch() { (!regexec (&(balancer.backend(i).hostregex()), host_header.c_str(), 0, 0, 0)) ) { v.add(i); - msg (" Candidate target: " + - balancer.backend(i).description() + "\n"); + if (config.verbose()) + msg (" Candidate target: " + + balancer.backend(i).description() + "\n"); } } targetlist(v); @@ -48,7 +57,7 @@ void HttpDispatcher::dispatch() { // Dispatch as a normal backend if sticky HTTP is off, or if the // sticky target is badly specified. if (!config.stickyhttp() || - sscanf (clientrequest.cookievalue ("XRTarget").c_str(), + sscanf (buf.cookievalue ("XRTarget").c_str(), "%d", &stickytarget) < 1 || stickytarget >= balancer.nbackends()) { issticky(false); @@ -58,11 +67,13 @@ void HttpDispatcher::dispatch() { // to non-sticky dispatching. targetbackend(stickytarget); Backend tb = balancer.backend(stickytarget); - msg ("Sticky HTTP request for " + tb.description() + "\n"); + if (config.verbose()) + msg ("Sticky HTTP request for " + tb.description() + "\n"); if (! tb.connect()) { balancer.backend(stickytarget).live(false); - msg ("Failed to connect to back end " + tb.description() + - ", trying to dispatch to other\n"); + if (config.verbose()) + msg ("Failed to connect to back end " + tb.description() + + ", trying to dispatch to other\n"); issticky(false); TcpDispatcher::dispatch(); } else { diff --git a/xr/httpdispatcher/getclientrequest.cc b/xr/httpdispatcher/getclientrequest.cc @@ -1,16 +0,0 @@ -#include "httpdispatcher" - -bool HttpDispatcher::getclientrequest() { - do { - Fdset set (config.client_timeout()); - set.add (clientfd()); - int s; - if ( (s = set.readable()) < 0 || !readchunk(clientfd()) ) - return (false); - clientrequest.add (databuf(), databufsize()); - } while (clientrequest.bodyreceived() == Httpbuffer::b_is_not_received); - - msg ("Received client request: '" + clientrequest.firstline() + "'\n"); - return (true); -} - diff --git a/xr/httpdispatcher/getserverresponse.cc b/xr/httpdispatcher/getserverresponse.cc @@ -1,17 +0,0 @@ -#include "httpdispatcher" - -bool HttpDispatcher::getserverresponse() { - msg ("Reading back end response.\n"); - - do { - Fdset set (config.backend_timeout()); - set.add (backendfd()); - int s; - if ( (s = set.readable() < 0) || !readchunk(backendfd()) ) - return (false); - serverresponse.add (databuf(), databufsize()); - } while (! serverresponse.headersdone()); - - msg ("Received back end response: '" + serverresponse.firstline() + "'\n"); - return (true); -} diff --git a/xr/httpdispatcher/handle.cc b/xr/httpdispatcher/handle.cc @@ -1,52 +1,90 @@ #include "httpdispatcher" void HttpDispatcher::handle() { + PROFILE("HttpDispatcher::handle"); // The client request was already retrieved before starting the // dispatcher. We can continue by applying server-directed headers. - clientrequest.setversion("1.0"); - clientrequest.setheader ("Connection", "close"); - clientrequest.setheader ("Proxy-Connection", "close"); + debugmsg("Applying server-directed headers to client request\n"); + buf.setversion('0'); + buf.setheader ("Connection", "close"); + buf.setheader ("Proxy-Connection", "close"); // Apply other server-side directed info if (config.addxrversion()) - clientrequest.setheader ("XR", VER); + buf.setheader ("XR", VER); if (config.addxforwardedfor()) - clientrequest.addheader ("X-Forwarded-For", - string(inet_ntoa(clientip()))); + buf.addheader ("X-Forwarded-For", string(inet_ntoa(clientip()))); for (unsigned n = 0; n < config.nserverheaders(); n++) - clientrequest.addheader (config.serverheader(n)); - - // Send to server - try { - sendclientrequest(); - } catch (Error const &e) { - msg (static_cast<string>(e.what()) + "\n"); - senderrorpage(); - return; - } + buf.addheader (config.serverheader(n)); - // Get server's response - if (!getserverresponse()) { - msg ("Failed to get server response\n"); - senderrorpage(); - return; - } + // Flush client info received so far to the back end. + debugmsg("Sending client request to back end\n"); + buf.netwrite(backendfd(), config.backend_timeout()); - // Apply client-side directed info. Smuggle in stickiness if required. - if (config.addxrversion()) - serverresponse.setheader ("XR", VER); - if (config.stickyhttp() && !issticky()) { - ostringstream o; - o << targetbackend(); - serverresponse.setheader ("Set-Cookie", "XRTarget=" + o.str()); - } + // Let's see if we need to modify the server headers. + bool modify_serverheaders = false; + if (config.addxrversion() || + (config.stickyhttp() && !issticky())) + modify_serverheaders = true; - // Flush server buffer to the client - fdwrite (clientfd(), config.client_timeout(), - serverresponse.data(), serverresponse.size()); - balancer.backend(targetbackend()).addbytes(serverresponse.size()); + /* + ostringstream o; + o << "stickyhttp:" << config.stickyhttp() << " issticky:" + << issticky() << " modifyheaders:" << modify_serverheaders << "\n"; + debugmsg(o.str()); + */ - // Then switch to the TCP copy/thru protocol. - TcpDispatcher::handle(); + // Go into copy-thru mode. If required, catch the server headers on + // their first appearance and modify them. + while (1) { + Fdset readset (config.client_timeout()); + readset.add(clientfd()); + readset.add(backendfd()); + + int sock; + if ((sock = readset.readable()) < 0) + break; + + buf.reset(); + + if (!buf.netread(sock)) + break; + + if (sock == backendfd() && modify_serverheaders) { + debugmsg("First back end request seen, applying modifications\n"); + modify_serverheaders = false; + while (! buf.headersreceived()) + if (!buf.netread (sock, config.backend_timeout())) + throw static_cast<Error> + ("Failed to get headers from back end"); + if (config.addxrversion()) + buf.setheader("XR", VER); + if (config.stickyhttp() && !issticky()) { + ostringstream o; + o << "XRTarget=" << targetbackend(); + buf.setheader("Set-Cookie", o.str()); + } + } + + // Flush info to the other connected side. + int othersock, timeout; + if (sock == clientfd()) { + othersock = backendfd(); + timeout = config.backend_timeout(); + } else { + othersock = clientfd(); + timeout = config.client_timeout(); + } + + if (config.debug()) { + ostringstream o; + o << "Had data on " << sock << ", sending to " << othersock << "\n"; + debugmsg (o.str()); + } + + buf.netwrite(othersock, timeout); + if (sock == backendfd()) + balancer.backend(targetbackend()).addbytes(buf.bufsz()); + } } diff --git a/xr/httpdispatcher/httpdispatcher b/xr/httpdispatcher/httpdispatcher @@ -15,14 +15,9 @@ public: void issticky (bool s) { is_sticky = s; } private: -private: - bool getclientrequest(); - void sendclientrequest(); - bool getserverresponse(); - void sendserverresponse(); void senderrorpage(); - Httpbuffer clientrequest, serverresponse; + Httpbuffer buf; bool is_sticky; }; diff --git a/xr/httpdispatcher/httpdispatcher1.cc b/xr/httpdispatcher/httpdispatcher1.cc @@ -1,5 +1,5 @@ #include "httpdispatcher" HttpDispatcher::HttpDispatcher (int fd, struct in_addr ip) : - TcpDispatcher (fd, ip), clientrequest(), serverresponse() { + TcpDispatcher (fd, ip) { } diff --git a/xr/httpdispatcher/sendclientrequest.cc b/xr/httpdispatcher/sendclientrequest.cc @@ -1,9 +0,0 @@ -#include "httpdispatcher" - -void HttpDispatcher::sendclientrequest() { - msg ("Sending client's request '" + clientrequest.firstline() + - "' to back end\n"); - fdwrite (backendfd(), config.backend_timeout(), - clientrequest.data(), clientrequest.size()); - balancer.backend(targetbackend()).addbytes(clientrequest.size()); -} diff --git a/xr/httpdispatcher/senderrorpage.cc b/xr/httpdispatcher/senderrorpage.cc @@ -6,10 +6,28 @@ "\r\n" void HttpDispatcher::senderrorpage() { + PROFILE("HttpDispatcher::senderrorpage"); + msg ("Sending error page to client.\n"); try { + string txt = + "<html>\n" + " <head>\n" + " <title>Internal Server Error</title>\n" + " </head>\n" + " <body>\n" + " <h1>Internal Server Error</h1>\n" + " You request could not be completed. Please retry later.\n" + " </body>\n" + "</html>\n"; + ostringstream msg; + msg << + "HTTP/1.0 502 Internal Server Error\r\n" + "Content-Length: " << txt.size() << "\r\n" + "\r\n" << + txt; fdwrite (clientfd(), config.client_timeout(), - ERRORSTR, sizeof(ERRORSTR)); + msg.str().c_str(), msg.str().size()); } catch (Error const &e) { cerr << e.what() << " (while sending error page)\n"; } diff --git a/xr/httpdispatcher/sendserverresponse.cc b/xr/httpdispatcher/sendserverresponse.cc @@ -1,9 +0,0 @@ -#include "httpdispatcher" - -void HttpDispatcher::sendserverresponse() { - msg ("Sending server response '" + serverresponse.firstline() + - "' to client.\n"); - fdwrite (clientfd(), config.client_timeout(), - serverresponse.data(), serverresponse.size()); - balancer.backend(targetbackend()).addbytes(clientrequest.size()); -} diff --git a/xr/netbuffer/charat.cc b/xr/netbuffer/charat.cc @@ -0,0 +1,9 @@ +#include "netbuffer" + +char Netbuffer::charat(unsigned index) const { + PROFILE("Netbuffer::charat"); + + if (index >= buf_sz) + return (0); + return buf_data[index]; +} diff --git a/xr/netbuffer/charfind.cc b/xr/netbuffer/charfind.cc @@ -0,0 +1,13 @@ +#include "netbuffer" + +unsigned Netbuffer::charfind(char ch, unsigned start) const { + PROFILE("Netbuffer::charfind"); + + if (buf_sz <= start) + return 0; + + char *cp = (char*)memchr(buf_data + start, ch, buf_sz); + if (!cp) + return (0); + return (cp - buf_data); +} diff --git a/xr/netbuffer/checkspace.cc b/xr/netbuffer/checkspace.cc @@ -0,0 +1,23 @@ +#include "netbuffer" + +void Netbuffer::check_space(unsigned extra) { + PROFILE("Netbuffer::check_space"); + + if (!buf_alloced) { + buf_alloced = extra; + buf_data = (char*)malloc(buf_alloced); + if (! buf_data) + throw static_cast<Error>("Memory fault in Netbuffer::check_space"); + } else if (buf_sz + extra > buf_alloced) { + if (config.verbose()) { + ostringstream o; + o << "Reallocating net buffer from " << buf_alloced + << " to " << buf_alloced + extra << " bytes\n"; + msg (o.str()); + } + buf_alloced += extra; + buf_data = (char*)realloc(buf_data, buf_alloced); + if (! buf_data) + throw static_cast<Error>("Memory fault in Netbuffer::check_space"); + } +} diff --git a/xr/netbuffer/copy.cc b/xr/netbuffer/copy.cc @@ -0,0 +1,9 @@ +#include "netbuffer" + +void Netbuffer::copy (Netbuffer const &other) { + buf_sz = other.buf_sz; + buf_alloced = other.buf_alloced; + buf_data = new char[buf_sz]; + + memcpy (buf_data, other.buf_data, buf_sz); +} diff --git a/xr/netbuffer/destroy.cc b/xr/netbuffer/destroy.cc @@ -0,0 +1,8 @@ +#include "netbuffer" + +void Netbuffer::destroy() { + delete buf_data; + buf_data = 0; + buf_sz = 0; + buf_alloced = 0; +} diff --git a/xr/netbuffer/insertat.cc b/xr/netbuffer/insertat.cc @@ -0,0 +1,18 @@ +#include "netbuffer" + +bool Netbuffer::insertat(unsigned index, char const *s, unsigned len) { + PROFILE("Netbuffer::insertat"); + + if (!len) + len = strlen(s); + + if (index >= buf_sz) + return false; + check_space(len); + memmove (buf_data + index + len, buf_data + index, buf_sz - index); + memcpy (buf_data + index, s, len); + buf_sz += len; + + return true; +} + diff --git a/xr/netbuffer/netbuffer b/xr/netbuffer/netbuffer @@ -0,0 +1,51 @@ +#ifndef _NETBUFFER_ +#define _NETBUFFER_ + +#include "sys/sys" +#include "error/error" +#include "config/config" +#include "profiler/profiler" +#include "fdset/fdset" + +class Netbuffer { +public: + Netbuffer(); + Netbuffer (Netbuffer const &other); + virtual ~Netbuffer(); + Netbuffer const &operator= (Netbuffer const &other); + + char charat(unsigned index) const; + char operator[] (unsigned index) { return charat(index); } + + char const *bufdata() const { return buf_data; } + unsigned bufsz() const { return buf_sz; } + + unsigned netread (int fd, int timeout = 0); + unsigned netwrite (int fd, int timeout) const; + + unsigned strfind (char const *s) const; + unsigned charfind (char ch, unsigned start = 0) const; + + bool setchar(unsigned offset, char ch); + + string stringat(unsigned index, unsigned len); + + bool insertat(unsigned index, char const *s, unsigned len = 0); + bool removeat(unsigned index, unsigned len = 1); + + void reset(); + + +private: + void copy (Netbuffer const &other); + void destroy(); + + void check_space(unsigned extra); + string printable(char c) const; + + char *buf_data; + unsigned buf_sz; + unsigned buf_alloced; +}; + +#endif diff --git a/xr/netbuffer/netbuffer1.cc b/xr/netbuffer/netbuffer1.cc @@ -0,0 +1,4 @@ +#include "netbuffer" + +Netbuffer::Netbuffer(): buf_data(0), buf_sz(0), buf_alloced(0) { +} diff --git a/xr/netbuffer/netbuffer2.cc b/xr/netbuffer/netbuffer2.cc @@ -0,0 +1,5 @@ +#include "netbuffer" + +Netbuffer::Netbuffer (Netbuffer const &other) { + copy (other); +} diff --git a/xr/netbuffer/netbuffer3.cc b/xr/netbuffer/netbuffer3.cc @@ -0,0 +1,5 @@ +#include "netbuffer" + +Netbuffer::~Netbuffer() { + destroy(); +} diff --git a/xr/netbuffer/netread.cc b/xr/netbuffer/netread.cc @@ -0,0 +1,31 @@ +#include "netbuffer" + +unsigned Netbuffer::netread (int fd, int timeout) { + PROFILE("Netbuffer::netread"); + + if (timeout) { + Fdset set(timeout); + set.add(fd); + if (set.readable() != fd) + throw static_cast<Error>("Fd ") + fd + + " failed to become readable within " + timeout + "sec"; + } + + check_space(config.buffersize()); + + ssize_t nread = read (fd, buf_data + buf_sz, config.buffersize()); + if (nread < 0) + throw static_cast<Error>("Read failed on fd ") + fd; + buf_sz += nread; + + if (config.debug() && nread) { + ostringstream o; + o << "Got " << nread << " bytes from fd " << fd << ": "; + for (unsigned i = 0; i < (unsigned)nread; i++) + o << printable(buf_data[i]); + o << "\n"; + debugmsg (o.str()); + } + + return nread; +} diff --git a/xr/netbuffer/netwrite.cc b/xr/netbuffer/netwrite.cc @@ -0,0 +1,68 @@ +#include "netbuffer" +#include "balancer/balancer" + +unsigned Netbuffer::netwrite (int fd, int timeout) const { + PROFILE("Netbuffer::netwrite"); + + if (config.debug()) { + ostringstream o; + o << "About to write " << buf_sz << " bytes to fd " << fd + << ", timeout " << timeout << "\n"; + debugmsg (o.str()); + } + + if (!buf_sz) + return (0); + + // Log to dump directory if requested + if (config.dumpdir().length()) { + ostringstream of; + of << config.dumpdir() << "/" << balancer.requestnr() << "." << fd; + FILE *f; + if ( (!(f = fopen (of.str().c_str(), "a"))) && + (!(f = fopen (of.str().c_str(), "w"))) ) + warnmsg ("Cannot write traffic log " + of.str() + ": " + + strerror(errno) + "\n"); + else { + fwrite (buf_data, 1, buf_sz, f); + fclose (f); + } + } + + // Send to the socket + unsigned totwritten = 0; + while (totwritten < buf_sz) { + // Wait for the socket to become writeable. + if (timeout) { + Fdset set (timeout); + set.add (fd); + if (set.writeable() != fd) + throw static_cast<Error>("Fd ") + fd + + " failed to become writable within " + timeout + " sec"; + } + + // Push bytes + ssize_t nwritten; + nwritten = write (fd, buf_data + totwritten, buf_sz - totwritten); + + if (config.debug()) { + ostringstream o; + o << "Sent " << nwritten << " bytes to fd " << fd << ": "; + for (unsigned i = totwritten; i < totwritten + nwritten; i++) + o << printable(buf_data[i]); + o << "\n"; + debugmsg (o.str()); + } + + // EINVAL / EINPROGRESS errors are handled as: retry + // If any bytes were written, we're ok + if (nwritten >= 1) + totwritten += nwritten; + else if (errno != EINVAL && errno != EINPROGRESS) + throw static_cast<Error>("Write/send failed: errno=") + + errno + ", " + strerror(errno) + ", result=" + nwritten; + } + + return buf_sz; +} + diff --git a/xr/netbuffer/opassign.cc b/xr/netbuffer/opassign.cc @@ -0,0 +1,9 @@ +#include "netbuffer" + +Netbuffer const &Netbuffer::operator= (Netbuffer const &other) { + if (this != &other) { + destroy(); + copy (other); + } + return (*this); +} diff --git a/xr/netbuffer/printable.cc b/xr/netbuffer/printable.cc @@ -0,0 +1,24 @@ +#include "netbuffer" + +string Netbuffer::printable (char ch) const { + ostringstream o; + + if (isprint(ch) && ch != '\\') { + o << ch; + return (o.str()); + } else if (ch == '\n') + return ("\\n"); + else if (ch == '\r') + return ("\\r"); + else if (ch == '\t') + return ("\\t"); + else { + char buf[10]; + sprintf (buf, "%3.3o", ch & 0xff); + o << "\\" << buf; + return (o.str()); + } + + // Avoid warnings + return ("."); +} diff --git a/xr/netbuffer/removeat.cc b/xr/netbuffer/removeat.cc @@ -0,0 +1,15 @@ +#include "netbuffer" + +bool Netbuffer::removeat(unsigned index, unsigned len) { + if (index >= buf_sz) + return false; + + if (index + len >= buf_sz) + buf_sz = index; + else { + memmove (buf_data + index + len, buf_data + index, + buf_sz - index - len); + buf_sz -= len; + } + return true; +} diff --git a/xr/netbuffer/reset.cc b/xr/netbuffer/reset.cc @@ -0,0 +1,5 @@ +#include "netbuffer" + +void Netbuffer::reset() { + buf_sz = 0; +} diff --git a/xr/netbuffer/setchar.cc b/xr/netbuffer/setchar.cc @@ -0,0 +1,9 @@ +#include "netbuffer" + +bool Netbuffer::setchar(unsigned offset, char ch) { + PROFILE("Netbuffer::setchar"); + if (offset >= buf_sz) + return false; + buf_data[offset] = ch; + return true; +} diff --git a/xr/netbuffer/strfind.cc b/xr/netbuffer/strfind.cc @@ -0,0 +1,15 @@ +#include "netbuffer" + +unsigned Netbuffer::strfind(char const *s) const { + PROFILE("Netbuffer::strfind"); + + if (!buf_sz) + return (0); + + char *cp = strnstr(buf_data, s, buf_sz); + if (cp) + return (cp - buf_data); + + return (0); +} + diff --git a/xr/profiler/profiler b/xr/profiler/profiler @@ -0,0 +1,18 @@ +#ifndef _PROFILER_ +#define _PROFILER_ + +#include "sys/sys" +#include "ThreadsAndMutexes/mutex/mutex" + +class Profiler { +public: + Profiler (char const *f); + ~Profiler(); +private: + char const *fname; + struct timeval tv_start; + static FILE *outf; +}; + +#endif + diff --git a/xr/profiler/profiler1.cc b/xr/profiler/profiler1.cc @@ -0,0 +1,9 @@ +#include "profiler" + +FILE *Profiler::outf; + +Profiler::Profiler (char const *f) { + fname = f; + gettimeofday(&tv_start, 0); +} + diff --git a/xr/profiler/profiler2.cc b/xr/profiler/profiler2.cc @@ -0,0 +1,22 @@ +#include "profiler" + +Profiler::~Profiler() { + struct timeval tv_end; + gettimeofday (&tv_end, 0); + double usec = + ( ((double)tv_end.tv_sec * 1000000 + tv_end.tv_usec) - + ((double)tv_start.tv_sec * 1000000 + tv_start.tv_usec) ); + + /* + cout << fname + << " start " << tv_start.tv_sec << " . " << tv_start.tv_usec + << ", end " << tv_end.tv_sec << '.' << tv_end.tv_usec + << ", usec " << usec << '\n'; + */ + + if (!outf) + outf = fopen ("/tmp/xr-prof.txt", "w"); + if (outf) + fprintf (outf, "%s %s %g\n", + timestamp().c_str(), fname, usec); +} diff --git a/xr/sys/debugmsg.cc b/xr/sys/debugmsg.cc @@ -1,8 +1,11 @@ #include "sys" #include "config/config" #include "ThreadsAndMutexes/mutex/mutex" +#include "profiler/profiler" void debugmsg (string const &s) { + PROFILE("debugmsg"); + if (config.debug()) { Mutex::lock(&cerr); if (config.prefixtimestamp()) diff --git a/xr/sys/fdwrite.cc b/xr/sys/fdwrite.cc @@ -2,7 +2,11 @@ #include "fdset/fdset" #include "balancer/balancer" +// obsolete -> replaced by netbuffer + void fdwrite (int fd, int timeout, char const *buf, unsigned buflen) { + PROFILE("fdwrite"); + if (config.debug()) { ostringstream o; o << "About to write " << buflen << " bytes to fd " << fd diff --git a/xr/sys/ipmatch.cc b/xr/sys/ipmatch.cc @@ -2,6 +2,8 @@ #include "../config/config" bool ipmatch (struct in_addr adr, struct in_addr mask) { + PROFILE("ipmatch"); + long laddr, lmask; memcpy (&laddr, &adr, sizeof(long)); memcpy (&lmask, &mask, sizeof(long)); diff --git a/xr/sys/main.cc b/xr/sys/main.cc @@ -3,6 +3,7 @@ #include "config/config" #include "balancer/balancer" #include "error/error" +#include "profiler/profiler" using namespace std; @@ -17,6 +18,9 @@ static void sigcatcher (int sig) { } int main (int argc, char **argv) { + + PROFILE("main"); + static int relevant_sig[] = { SIGHUP, SIGINT, SIGQUIT, SIGABRT, SIGTERM, SIGSTOP, SIGPIPE }; diff --git a/xr/sys/msg.cc b/xr/sys/msg.cc @@ -1,8 +1,11 @@ #include "sys" #include "config/config" #include "ThreadsAndMutexes/mutex/mutex" +#include "profiler/profiler" void msg (string const &s) { + PROFILE("msg"); + if (config.verbose()) { Mutex::lock(&cerr); if (config.prefixtimestamp()) diff --git a/xr/sys/serversocket.cc b/xr/sys/serversocket.cc @@ -1,7 +1,10 @@ #include "sys" #include "../error/error" +#include "profiler/profiler" int serversocket (string addr, int port, string desc) { + PROFILE("serversocket"); + int sock; // Create the server socket, set options diff --git a/xr/sys/socketclose.cc b/xr/sys/socketclose.cc @@ -2,6 +2,8 @@ #include "../config/config" void socketclose (int fd) { + PROFILE("socketclose"); + if (config.debug()) { ostringstream o; o << fd; diff --git a/xr/sys/str2parts.cc b/xr/sys/str2parts.cc @@ -2,6 +2,8 @@ #include "config/config" vector<string> str2parts (string const &s, char sep) { + PROFILE("str2parts"); + string str = s; int pos; vector<string> parts; diff --git a/xr/sys/sys b/xr/sys/sys @@ -41,6 +41,13 @@ #include <vector> #include <queue> +/* Profiling support on/off */ +#ifdef PROFILER +# define PROFILE(x) Profiler local_prof(x) +#else +# define PROFILE(x) +#endif + /* Generic functions */ using namespace std; diff --git a/xr/tcpdispatcher/dispatch.cc b/xr/tcpdispatcher/dispatch.cc @@ -11,8 +11,9 @@ void TcpDispatcher::dispatch() { for (unsigned i = 0; i < balancer.nbackends(); i++) if (balancer.backend(i).available()) { target_list.add(i); - msg (" Candidate target: " + - balancer.backend(i).description() + "\n"); + if (config.verbose()) + msg (" Candidate target: " + + balancer.backend(i).description() + "\n"); } } @@ -23,16 +24,19 @@ void TcpDispatcher::dispatch() { Backend tb = balancer.backend(target_backend); if (!tb.connect()) { balancer.backend(target_backend).live(false); - msg ("Failed to connect to back end " + tb.description() + - ", trying to dispatch to other\n"); + if (config.verbose()) + msg ("Failed to connect to back end " + tb.description() + + ", trying to dispatch to other\n"); } else { connected = true; backendfd(tb.sock()); - ostringstream o; - o << tb.sock(); - msg ("Will dispatch client to back end " + tb.description() + - " on fd " + o.str() + "\n"); + if (config.verbose()) { + ostringstream o; + o << tb.sock(); + msg ("Will dispatch client to back end " + tb.description() + + " on fd " + o.str() + "\n"); + } break; } diff --git a/xr/tcpdispatcher/execute.cc b/xr/tcpdispatcher/execute.cc @@ -6,14 +6,17 @@ static time_t accesslog_lastclean = 0; void TcpDispatcher::execute() { ostringstream o; o << clientfd(); - msg ("Dispatch request for client fd " + o.str() + "\n"); - /* Check 'softmaxconnrate' and 'hardmaxconnrate' now! */ + if (config.verbose()) + msg ("Dispatch request for client fd " + o.str() + "\n"); + + // Check 'softmaxconnrate' and 'hardmaxconnrate' now! // Descend into this block if connrate_time() is set, AND // either hardmaxconnrate() is set, // or both softmaxconnrate() and defertime() are set. - if (config.connrate_time() && (config.hardmaxconnrate() - || (config.softmaxconnrate() && config.defertime()))) { + if (config.connrate_time() && + (config.hardmaxconnrate() || + (config.softmaxconnrate() && config.defertime()))) { time_t now, min_ts; now = time(0); min_ts = now - config.connrate_time(); @@ -23,7 +26,7 @@ void TcpDispatcher::execute() { Mutex::lock (&accesslog[client_ip.s_addr]); accesslog[client_ip.s_addr].push(now); Mutex::unlock (&accesslog[client_ip.s_addr]); - + if (accesslog_lastclean < min_ts) { // Clean the entire access log, it's been a while... @@ -31,10 +34,10 @@ void TcpDispatcher::execute() { accesslog_lastclean = now; Mutex::unlock(&accesslog_lastclean); - for ( map<unsigned long, queue <time_t> >::iterator i=accesslog.begin(); - i != accesslog.end(); - i++ ) { - + for (map<unsigned long, queue <time_t> >::iterator i = + accesslog.begin(); + i != accesslog.end(); + i++ ) { if (accesslog[i->first].back() < min_ts) { // This IP hasn't made ANY connections in a while -- erase! accesslog.erase(i); @@ -43,60 +46,53 @@ void TcpDispatcher::execute() { // have only "recent" connections left. Mutex::lock(&accesslog[i->first]); while ( accesslog[i->first].front() < min_ts - || accesslog[i->first].size() > max_conns ) { + || accesslog[i->first].size() > max_conns ) { accesslog[i->first].pop(); } Mutex::unlock(&accesslog[i->first]); - } + } } - } else { - // The "big log" doesn't need to be fully cleaned, but this particular - // IP should be! + // The "big log" doesn't need to be fully cleaned, + // but this particular IP should be! Mutex::lock(&accesslog[client_ip.s_addr]); while ( accesslog[client_ip.s_addr].front() < min_ts - || accesslog[client_ip.s_addr].size() > max_conns ) { + || accesslog[client_ip.s_addr].size() > max_conns ) { accesslog[client_ip.s_addr].pop(); } Mutex::unlock(&accesslog[client_ip.s_addr]); } - - - if ( config.hardmaxconnrate() - && accesslog[client_ip.s_addr].size() >= config.hardmaxconnrate() ) { + if (config.hardmaxconnrate() && + accesslog[client_ip.s_addr].size() >= config.hardmaxconnrate() ) { // This IP has violated the "HARD" limit! Reject the connection - ostringstream oa, om, ot; - oa << accesslog[client_ip.s_addr].size(); - om << config.hardmaxconnrate(); - ot << config.connrate_time(); - cerr << "WARNING: Client " << inet_ntoa(client_ip) << " has hit " << - "the HARD maximum number of connections (" << om.str() << - " connections in " << ot.str() << " seconds; " << oa.str() << - " connections recorded). This client's connection is being " << - "refused.\n"; + ostringstream o; + o << "Client " << inet_ntoa(client_ip) + << " has hit the HARD maximum number of connections (" + << config.hardmaxconnrate() << " conections in " + << config.connrate_time() << " seconds; " + << accesslog[client_ip.s_addr].size() + << " connections recorded). Client is refused.\n"; + warnmsg (o.str()); socketclose(clientfd()); return; - } else if (config.softmaxconnrate() - && accesslog[client_ip.s_addr].size() >= config.softmaxconnrate() ) { - // This IP has violated the "SOFT" Limit. Go to sleep for a while. - ostringstream oa, od, om, ot; - oa << accesslog[client_ip.s_addr].size(); - od << config.defertime(); - om << config.softmaxconnrate(); - ot << config.connrate_time(); - cerr << "WARNING: Client " << inet_ntoa(client_ip) << " has hit " << - "the SOFT maximum number of connections (" << om.str() << - " connections in " << ot.str() << " seconds; " << oa.str() << - " connections recorded). This client's connection is being " << - "deferred for " << od.str() << - " microseconds.\n"; + } else if (config.softmaxconnrate() && + (accesslog[client_ip.s_addr].size() >= + config.softmaxconnrate())) { + // This IP has violated the "SOFT" Limit. Go to sleep for a while. + ostringstream o; + o << "Client " << inet_ntoa(client_ip) + << " has hit the SOFT maximum number of connections (" + << config.softmaxconnrate() << " connections in " + << config.connrate_time() << " sedonds; " + << accesslog[client_ip.s_addr].size() + << " connections recorded). Client is deferred for " + << config.defertime() << " microseconds.\n"; + warnmsg (o.str()); usleep(config.defertime()); } } - - try { dispatch(); } catch (Error const &e) { @@ -105,13 +101,13 @@ void TcpDispatcher::execute() { return; } - ostringstream co; - co << clientfd(); - ostringstream bo; - bo << backendfd(); - msg ("Dispatching client fd " + co.str() + " to " + - balancer.backend(target_backend).description() + - ", fd " + bo.str() + "\n"); + if (config.verbose()) { + ostringstream o; + o << "Dispatching client fd " << clientfd() << " to " + << balancer.backend(target_backend).description() + << ", fd " << backendfd() << "\n"; + msg (o.str()); + } balancer.backend(target_backend).startconnection(); @@ -125,7 +121,11 @@ void TcpDispatcher::execute() { socketclose (clientfd()); socketclose (backendfd()); - msg ("Done dispatching client fd " + co.str() + " at " + - balancer.backend(target_backend).description() + "\n"); + if (config.verbose()) { + ostringstream o; + o << "Done dispatching client fd " << backendfd() << " at " + << balancer.backend(target_backend).description() << "\n"; + msg (o.str()); + } } diff --git a/xr/tcpdispatcher/handle.cc b/xr/tcpdispatcher/handle.cc @@ -16,14 +16,7 @@ void TcpDispatcher::handle() { int sock; if ((sock = readset.readable()) < 0) break; - - if (config.debug()) { - ostringstream o; - o << sock; - debugmsg ("Data waiting on fd " + o.str() + "\n"); - } - - if (!readchunk (sock)) + if (!netbuffer.netread(sock)) break; int othersock, timeout; @@ -35,13 +28,16 @@ void TcpDispatcher::handle() { timeout = config.client_timeout(); } - ostringstream o; - o << "Had data on " << sock << ", sending to " << othersock << "\n"; - debugmsg (o.str()); - - fdwrite (othersock, timeout, databuf(), databufsize()); + if (config.debug()) { + ostringstream o; + o << "Had data on " << sock << ", sending to " << othersock << "\n"; + debugmsg (o.str()); + } + netbuffer.netwrite (othersock, timeout); if (sock == backendfd()) - balancer.backend(target_backend).addbytes(databufsize()); + balancer.backend(target_backend).addbytes(netbuffer.bufsz()); + + netbuffer.reset(); } } diff --git a/xr/tcpdispatcher/readchunk.cc b/xr/tcpdispatcher/readchunk.cc @@ -1,5 +1,7 @@ #include "tcpdispatcher" +// obsolete -> replaced by netbuffer + unsigned TcpDispatcher::readchunk (int src) { ssize_t nread = read (src, data_buf, config.buffersize()); if (nread < 0) diff --git a/xr/tcpdispatcher/tcpdispatcher b/xr/tcpdispatcher/tcpdispatcher @@ -6,6 +6,7 @@ #include "config/config" #include "ThreadsAndMutexes/thread/thread" #include "backendvector/backendvector" +#include "netbuffer/netbuffer" // Dispatching algorithm workers #include "DispatchAlgorithms/algorithm/algorithm" @@ -36,21 +37,30 @@ public: void clientfd(int c) { client_fd = c; } int backendfd() const { return backend_fd; } void backendfd(int b) { backend_fd = b; } + + // obsolete 2 char const *databuf() const { return data_buf; } unsigned databufsize() const { return data_bufsz; } + BackendVector const &targetlist() const { return target_list; } void targetlist (BackendVector t) { target_list = t; } unsigned readchunk (int src); private: + // obsolete string printable (char ch) const; + struct in_addr client_ip; int target_backend, client_fd, backend_fd; + + // obsolete 2 char *data_buf; unsigned data_bufsz; + Algorithm *algorithm; BackendVector target_list; + Netbuffer netbuffer; }; #endif diff --git a/xr/tcpdispatcher/tcpdispatcher1.cc b/xr/tcpdispatcher/tcpdispatcher1.cc @@ -2,7 +2,7 @@ TcpDispatcher::TcpDispatcher(int cfd, struct in_addr cip): Thread(), client_ip(cip), target_backend(-1), client_fd(cfd), - backend_fd(-1), data_bufsz(0), target_list() { + backend_fd(-1), data_bufsz(0), target_list(), netbuffer() { // Set up a data buffer for network transfers data_buf = new char[config.buffersize()]; diff --git a/xr/webinterface/serve.cc b/xr/webinterface/serve.cc @@ -5,17 +5,8 @@ void Webinterface::serve () { o << cfd; msg ("Webinterface serving request on client fd " + o.str() + "\n"); - char databuf[config.buffersize()]; Httpbuffer clientrequest; - do { - Fdset set (config.client_timeout()); - set.add(cfd); - int nread; - if (set.readable() != cfd || - ( nread = read(cfd, databuf, config.buffersize()) ) < 0) - throw static_cast<Error>("Read failure on fd ") + cfd; - clientrequest.add(databuf, nread); - } while (clientrequest.bodyreceived() == Httpbuffer::b_is_not_received); + clientrequest.netread(cfd, config.client_timeout()); msg ("Webinterface request: " + clientrequest.firstline() + "\n"); answer(clientrequest);