crossroads

Git mirror of https://crossroads.e-tunity.com/
git clone git://git.finwo.net/app/crossroads
Log | Files | Refs | LICENSE

commit e08f730be61b2d2c337b15ef201d6d327759fce8
parent 9dc6c1ffc517c8140ddc36208349121f305f69de
Author: finwo <finwo@pm.me>
Date:   Sat,  3 Jan 2026 19:37:47 +0100

2.55

Diffstat:
MChangeLog | 15+++++++++++++++
MMakefile | 2+-
Mdoc/xr.odt | 0
Mdoc/xr.pdf | 0
Mdoc/xrctl.1 | 7+++++--
Atest/xr-analyze-test | 15+++++++++++++++
Atest/xr-http-test | 36++++++++++++++++++++++++++++++++++++
Atest/xr-mysql-connect | 13+++++++++++++
Atest/xr-smtp-test | 43+++++++++++++++++++++++++++++++++++++++++++
Dtest/xr-test | 36------------------------------------
Mxr/Checkers/checkupthread/execute.cc | 1+
Mxr/Checkers/wakeupthread/execute.cc | 3++-
Mxr/DispatchAlgorithms/leastconn/leastconn | 1+
Mxr/DispatchAlgorithms/leastconn/target.cc | 13++-----------
Mxr/DispatchAlgorithms/storedip/storedip | 1+
Mxr/DispatchAlgorithms/storedip/target.cc | 72+++++++-----------------------------------------------------------------
Mxr/Dispatchers/httpdispatcher/handle.cc | 7++++++-
Mxr/Dispatchers/tcpdispatcher/dispatch.cc | 10+---------
Mxr/Dispatchers/tcpdispatcher/handle.cc | 27++++++++++++++-------------
Mxr/Dispatchers/tcpdispatcher/tcpdispatcher | 1+
Dxr/backend/anticipated.cc | 9---------
Dxr/backend/anticipateless.cc | 9---------
Dxr/backend/anticipatemore.cc | 7-------
Mxr/backend/available.cc | 20++++++++++++--------
Mxr/backend/backend | 46++++++++++++++++++++++++----------------------
Mxr/backend/backend1.cc | 5+++--
Mxr/backend/backend2.cc | 4++--
Mxr/backend/connect.cc | 20++++++++++++++------
Axr/backend/markconnecterror.cc | 8++++++++
Mxr/balancer/addbackend2.cc | 16+++++++++++-----
Mxr/balancer/init.cc | 8+++++---
Mxr/balancer/serve.cc | 9+++++++--
Mxr/etc/Makefile.class | 3++-
Mxr/etc/status-nosavebutton.xslt | 7+++++++
Mxr/etc/usage.txt | 4++--
Mxr/fdset/fdset | 36+++++++++++++++++-------------------
Mxr/fdset/fdset1.cc | 3+++
Dxr/fdset/readable.cc | 67-------------------------------------------------------------------
Dxr/fdset/readwriteable.cc | 58----------------------------------------------------------
Axr/fdset/wait.cc | 65+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Dxr/fdset/writeable.cc | 59-----------------------------------------------------------
Axr/ipstore/activity.cc | 15+++++++++++++++
Axr/ipstore/anticipated.cc | 20++++++++++++++++++++
Axr/ipstore/clear.cc | 9+++++++++
Axr/ipstore/ipstore | 41+++++++++++++++++++++++++++++++++++++++++
Axr/ipstore/target.cc | 45+++++++++++++++++++++++++++++++++++++++++++++
Mxr/netbuffer/netread.cc | 3++-
Mxr/netbuffer/netwrite.cc | 3++-
Mxr/webinterface/answerstatus.cc | 2+-
Mxr/webinterface/execute.cc | 3++-
Mxrctl/xrctl | 56+++++++++++++++++++++++++++++++++++++++++++++++---------
51 files changed, 530 insertions(+), 433 deletions(-)

diff --git a/ChangeLog b/ChangeLog @@ -1,3 +1,18 @@ +2.55 [KK 2009-05-13] +- Implemented connection error counting of back ends. +- Select-handling revised: atomic readability and writeability checks, + see xr/fdset/wait.cc and wait_r(), wait_w(), wait_rw(). +- Checks for connect() success can be configured to verify only + writeable status of a socket (see xr/backend/connect.cc, macro + CONNECTCHECK_ONLY_WRITABLE in xr/etc/Makefile.class). +- Added test/xr-mysql-connect. +- Added commands kill and killstart to xrctl. Action restart renamed + to stopstart. +- Added simple benchmarking under test/: xr-http-test (based on an + older script) and xr-smtp-test. Plus a simple analyzer of the + reported timings, xr-analyze-test. +- Revamped stored-ip dispatcher, added IPStore class to keep track of clients. + 2.54 [KK 2009-04-29] - Stamped new STABLE version. diff --git a/Makefile b/Makefile @@ -1,7 +1,7 @@ # Top-level Makefile for XR # ------------------------- -VER = 2.54 +VER = 2.55 PREFIX = $(DESTDIR)/usr BINDIR = $(PREFIX)/sbin MANDIR = $(PREFIX)/share/man diff --git a/doc/xr.odt b/doc/xr.odt Binary files differ. diff --git a/doc/xr.pdf b/doc/xr.pdf Binary files differ. diff --git a/doc/xrctl.1 b/doc/xrctl.1 @@ -19,8 +19,10 @@ start to start all your services, or xrctl stop to stop them. list [SERVICE] - show configuration of a service, or of all start [SERVICE] - start a service, or all configured services stop [SERVICE] - stop a service, or all configured services +kill [SERVICE] - brutal stop, interrupts connections force [SERVICE] - start a service (or all) if not running -restart [SERVICE] - stop and start a service, or all +stopstart [SERVICE] - stop and start a service, or all +killstart [SERVICE] - kill and start a service, or all status [SERVICE] - show running status of a service, or of all rotate [SERVICE] - rotate logs of a service or of all generateconfig [SERVICE] - queries running XR's for the configuration @@ -35,4 +37,5 @@ xrctl was written by Karel Kubat <karel@kubat.nl>. Web page: http://crossroads.e-tunity.com .P -This man page was written by Frederik Dannemare <frederik@dannemare.net>. +This man page was written by Frederik Dannemare +<frederik@dannemare.net> and updated by Karel Kubat <karel@kubat.nl>. diff --git a/test/xr-analyze-test b/test/xr-analyze-test @@ -0,0 +1,15 @@ +#!/usr/bin/perl + +use strict; + +my $n = 0; +my $tot = 0; +while (my $line = <STDIN>) { + next unless ($line =~ /^\d/); + $tot += $line; + $n++; +} +exit(1) unless ($n); + +print ("Total : $n\n", + "Average : ", $tot / $n, "\n"); diff --git a/test/xr-http-test b/test/xr-http-test @@ -0,0 +1,36 @@ +#!/usr/bin/perl + +use strict; +use LWP::UserAgent; +use Time::HiRes qw(sleep gettimeofday tv_interval); + +$|++; + +die <<"ENDUSAGE" if ($#ARGV != 2); + +Usage: $0 URL THREADS DURATION +Will start THREADS to get URL. The entire test will run for DURATION +seconds. + +ENDUSAGE + +my ($url, $threads, $duration) = @ARGV; +for my $i (1..$threads) { + next if (fork()); + + my $t_start = [gettimeofday()]; + my $runs = 0; + while (tv_interval($t_start) < $duration) { + $runs++; + my $t_run = [gettimeofday()]; + my $ua = LWP::UserAgent->new(); + $ua->timeout(5); + my $resp = $ua->get($url); + print(tv_interval($t_run), "\n") + if ($resp->is_success()); + } + exit(0); +} + +while(wait() != -1) { +} diff --git a/test/xr-mysql-connect b/test/xr-mysql-connect @@ -0,0 +1,13 @@ +#!/bin/sh + +# Where does your mysql live? +mysql=/usr/local/mysql/bin/mysql + +# Get the host and port from the argument (which is the back end, +# as host:port) +host=`echo $1 | sed 's/:.*//'` +port=`echo $1 | sed 's/.*://'` + +# Try to connect to the host and port. Print the result on stdout. +echo quit | $mysql -h $host -P $port --protocol=tcp --connect-timeout=3 +echo $? diff --git a/test/xr-smtp-test b/test/xr-smtp-test @@ -0,0 +1,43 @@ +#!/usr/bin/perl + +use strict; +use Net::SMTP; +use Time::HiRes qw(sleep gettimeofday tv_interval); + +$|++; + +die <<"ENDUSAGE" if ($#ARGV != 4); + +Usage: $0 ENDPOINT FROM-ADDRESS TO-ADDRESS THREADS DURATION +Will start THREADS to send dummy e-mails to TO-ADDRESS. The test will run +for DURATION seconds. ENDPOINT is the IP address to connect to. + +ENDUSAGE + +my ($endpoint, $from, $address, $threads, $duration) = @ARGV; +for my $i (1..$threads) { + next if (fork()); + + my $t_start = [gettimeofday()]; + my $runs = 0; + while (tv_interval($t_start) < $duration) { + $runs++; + my $t_run = [gettimeofday()]; + my $smtp = Net::SMTP->new($endpoint, Timeout => 5) + or die ("Cannot start SMTP\n"); + $smtp->mail($from); + $smtp->to($address); + $smtp->data(); + $smtp->datasend("To: $address\n"); + $smtp->datasend("Subject: Testing mail\n"); + $smtp->datasend("\n"); + $smtp->datasend("This is just a test message.\n"); + $smtp->dataend(); + $smtp->quit(); + print(tv_interval($t_run), "\n"); + } + exit(0); +} + +while(wait() != -1) { +} diff --git a/test/xr-test b/test/xr-test @@ -1,36 +0,0 @@ -#!/usr/bin/perl - -use strict; -use LWP::UserAgent; -use Time::HiRes qw(sleep gettimeofday tv_interval); - -die ("Usage: $0 URL TRIALS\n") if ($#ARGV != 1); -$|++; - -my $n = 0; -my $totaltime = 0; - -my $ua = LWP::UserAgent->new(); -$ua->timeout(5); -while ($n < $ARGV[1]) { - my $success = 0; - for my $i (1..10) { - my $t0 = [ gettimeofday() ]; - my $resp = $ua->get($ARGV[0]); - if ($resp->is_success()) { - $success++; - $n++; - $totaltime += tv_interval ($t0); - last; - } - if ($i == 3) { - die ($resp->status_line(), "\n"); - } - sleep (1); - } - sleep (0.1); - printf ("\rN: %5d Tot:%8.3g Avg:%8.3g", $n, $totaltime, $totaltime / $n) - if (! ($n % 10)); -} - -print ("\n"); diff --git a/xr/Checkers/checkupthread/execute.cc b/xr/Checkers/checkupthread/execute.cc @@ -17,6 +17,7 @@ void Checkupthread::execute() { " is alive\n"); } else { balancer.backend(i).live(false); + balancer.backend(i).markconnecterror(); msg ("Checkup call: backend " + target.description() + " is unavailable\n"); if (config.onfail().length()) { diff --git a/xr/Checkers/wakeupthread/execute.cc b/xr/Checkers/wakeupthread/execute.cc @@ -15,7 +15,8 @@ void Wakeupthread::execute() { target.description() + " has awoken\n"); balancer.backend(i).live(true); - } + } else + balancer.backend(i).markconnecterror(); } catch (...) { socketclose(target.sock()); } diff --git a/xr/DispatchAlgorithms/leastconn/leastconn b/xr/DispatchAlgorithms/leastconn/leastconn @@ -5,6 +5,7 @@ #include "error/error" #include "balancer/balancer" #include "DispatchAlgorithms/algorithm/algorithm" +#include "ipstore/ipstore" class Leastconn: public Algorithm { public: diff --git a/xr/DispatchAlgorithms/leastconn/target.cc b/xr/DispatchAlgorithms/leastconn/target.cc @@ -4,6 +4,7 @@ unsigned Leastconn::target(struct in_addr clientip, BackendVector const &targetlist) { PROFILE("Leastconn::target"); + msg (Mstr("Starting least-connections dispatcher\n")); bool found = false; unsigned best_weighted = 0, t = 0; @@ -13,18 +14,8 @@ unsigned Leastconn::target(struct in_addr clientip, continue; unsigned this_weight = (balancer.backend(targetlist[i]).connections() + - balancer.backend(targetlist[i]).anticipated()) * + IPStore::anticipated(targetlist[i])) * balancer.backend(targetlist[i]).adjustedweight(); - msg ("Back end " + balancer.backend(targetlist[i]).description() + - (Mstr(": connections ") - + balancer.backend(targetlist[i]).connections()) + - (Mstr(", anticipated ") - + balancer.backend(targetlist[i]).anticipated()) + - (Mstr(", adjusted weight ") + - balancer.backend(targetlist[i]).adjustedweight()) + - (Mstr(", weighted ") + this_weight) + - "\n"); - if (!found || this_weight < best_weighted) { t = targetlist[i]; best_weighted = this_weight; diff --git a/xr/DispatchAlgorithms/storedip/storedip b/xr/DispatchAlgorithms/storedip/storedip @@ -8,6 +8,7 @@ #include "timestamp/timestamp" #include "DispatchAlgorithms/algorithm/algorithm" #include "DispatchAlgorithms/leastconn/leastconn" +#include "ipstore/ipstore" class StoredIp: public Algorithm { public: diff --git a/xr/DispatchAlgorithms/storedip/target.cc b/xr/DispatchAlgorithms/storedip/target.cc @@ -1,62 +1,20 @@ #include "storedip" -struct ClientData { - int targetbackend; - time_t lastaccess; -}; - -struct ClientDataCmp { - bool operator() (struct in_addr a, struct in_addr b) const { - long la, lb; - memcpy (&la, &a, sizeof(long)); - memcpy (&lb, &b, sizeof(long)); - return (la - lb) < 0; - } -}; - -typedef map<struct in_addr, ClientData, ClientDataCmp> StoreMap; -static StoreMap store; - unsigned StoredIp::target(struct in_addr clientip, BackendVector const &targetlist) { PROFILE("StoredIP::target"); + IPStore::on(); msg(Mstr("Starting stored-ip dispatcher\n")); - - unsigned target; - time_t now = time(0); - // Weed out store. Done first, because the store should be up to date - // for some decisions below. - for (StoreMap::iterator iter = store.begin(); iter != store.end(); - iter++) { - if (config.debug()) { - Timestamp tm((*iter).second.lastaccess); - debugmsg (Mstr(inet_ntoa(iter->first)) + Mstr(" visited on ") + - tm.desc() + "\n"); - } - if (now - ((*iter).second.lastaccess) > config.ipstoretimeout()) { - debugmsg (" Erasing stale entry\n"); - store.erase(iter); - } - } - - // Let's see if we know the client. - if (store.count(clientip) > 0) { - if (config.verbose()) { - Timestamp tm(store[clientip].lastaccess); - msg(Mstr("Client IP ") + Mstr(inet_ntoa(clientip)) + - " last visited on " + tm.desc() + " and went to " + - balancer.backend(store[clientip].targetbackend).description() + - "\n"); - } - target = store[clientip].targetbackend; + int tb; + if ( (tb = IPStore::target(clientip)) >= 0 ) { + unsigned target = tb; + IPStore::clear(clientip); if (balancer.backend(target).available()) { // Historical target is up, go there! msg(Mstr("Sending ") + Mstr(inet_ntoa(clientip)) + " to " + balancer.backend(target).description() + "\n"); - ClientData entry = {target, now}; - store[clientip] = entry; return target; } msg (Mstr("Historical target ") + @@ -64,28 +22,12 @@ unsigned StoredIp::target(struct in_addr clientip, if (config.dispatchmode() == Dispatchmode::m_strict_stored_ip) throw Error("Stored-IP algorithm: target back end " + balancer.backend(target).description() + - "unavailable"); + " unavailable"); } // Client is seen for the first time, or after the timout period, or // their preferred back end is down (and we're in lax mode ofc). // Treat as new connection. - - // Preload anticipated connections. - for (StoreMap::iterator iter = store.begin(); iter != store.end(); - iter++) { - msg(Mstr("Anticipating connection for back end ") + - balancer.backend((*iter).second.targetbackend).description() + - "\n"); - balancer.backend((*iter).second.targetbackend).anticipate_more(); - } - - // Now get a target and store the dispatch result. Leastconn l; - target = l.target(clientip, targetlist); - ClientData entry = {target, now}; - store[clientip] = entry; - - // Return target to caller - return target; + return l.target(clientip, targetlist); } diff --git a/xr/Dispatchers/httpdispatcher/handle.cc b/xr/Dispatchers/httpdispatcher/handle.cc @@ -38,9 +38,14 @@ void HttpDispatcher::handle() { config.backend_read_timeout())); readset.add(clientfd()); readset.add(backendfd()); + readset.wait_r(); int sock; - if ((sock = readset.readable()) < 0) + if (readset.readable(clientfd())) + sock = clientfd(); + else if (readset.readable(backendfd())) + sock = backendfd(); + else break; buf().reset(); diff --git a/xr/Dispatchers/tcpdispatcher/dispatch.cc b/xr/Dispatchers/tcpdispatcher/dispatch.cc @@ -8,14 +8,6 @@ void TcpDispatcher::dispatch() { bool connected = false; - // Reset the expectancy of back ends. Dispatchers down the line (stored-ip) - // will update that later. - for (unsigned i = 0; i < balancer.nbackends(); i++) { - msg (Mstr("Resetting anticipation for back end ") + - Mstr(i) + "\n"); - balancer.backend(i).anticipated(0); - } - // Build up the target list, if not yet done so. The HTTP dispatcher // might've created it already for host-based matching (in which case // we won't bother here). @@ -45,7 +37,7 @@ void TcpDispatcher::dispatch() { backendfd(tb.sock()); msg ((Mstr("Will dispatch client to back end ") + tb.description()) + - (Mstr(" on fd ") + tb.sock()) + "\n"); + (Mstr(" on fd ") + tb.sock()) + "\n"); break; } } diff --git a/xr/Dispatchers/tcpdispatcher/handle.cc b/xr/Dispatchers/tcpdispatcher/handle.cc @@ -9,30 +9,31 @@ void TcpDispatcher::handle() { while (1) { Fdset readset(maxtimeout(config.client_read_timeout(), config.backend_read_timeout())); - readset.add (clientfd()); - readset.add (backendfd()); + readset.add(clientfd()); + readset.add(backendfd()); + readset.wait_r(); - int sock; - if ((sock = readset.readable()) < 0) - break; - if (!netbuffer.netread(sock)) - break; - - int othersock, timeout; - if (sock == clientfd()) { + int sock, othersock, timeout; + if (readset.readable(clientfd())) { + sock = clientfd(); othersock = backendfd(); timeout = config.backend_write_timeout(); - } else { + } else if (readset.readable(backendfd())) { + sock = backendfd(); othersock = clientfd(); timeout = config.client_write_timeout(); - } + } else + break; + if (!netbuffer.netread(sock)) + break; debugmsg (Mstr("Had data on ") + sock + (Mstr(", sending to ") + othersock) + "\n"); - netbuffer.netwrite (othersock, timeout); if (sock == backendfd()) balancer.backend(targetbackend()).addbytes(netbuffer.bufsz()); + else + IPStore::activity(clientip(), targetbackend()); netbuffer.reset(); } diff --git a/xr/Dispatchers/tcpdispatcher/tcpdispatcher b/xr/Dispatchers/tcpdispatcher/tcpdispatcher @@ -4,6 +4,7 @@ #include "Dispatchers/dispatcher/dispatcher" #include "netbuffer/netbuffer" #include "httpbuffer/httpbuffer" +#include "ipstore/ipstore" class TcpDispatcher: public Dispatcher { public: diff --git a/xr/backend/anticipated.cc b/xr/backend/anticipated.cc @@ -1,9 +0,0 @@ -#include "backend" - -void Backend::anticipated(unsigned a) { - Mutex::lock(&nanticipated); - nanticipated = a; - Mutex::unlock(&nanticipated); - msg((Mstr("Backend ") + description()) + - (Mstr(" now anticipates ") + a) + " connections\n"); -} diff --git a/xr/backend/anticipateless.cc b/xr/backend/anticipateless.cc @@ -1,9 +0,0 @@ -#include "backend" - -void Backend::anticipate_less() { - if (nanticipated) { - Mutex::lock(&nanticipated); - nanticipated--; - Mutex::unlock(&nanticipated); - } -} diff --git a/xr/backend/anticipatemore.cc b/xr/backend/anticipatemore.cc @@ -1,7 +0,0 @@ -#include "backend" - -void Backend::anticipate_more() { - Mutex::lock(&nanticipated); - nanticipated++; - Mutex::unlock(&nanticipated); -} diff --git a/xr/backend/available.cc b/xr/backend/available.cc @@ -1,19 +1,23 @@ #include "backend" #include "profiler/profiler" +#include "ipstore/ipstore" bool Backend::available() const { PROFILE("Backend::available"); - debugmsg((Mstr("Back end ") + description()) + - (Mstr(": ") + livestr()) + - (Mstr(", ") + upstr()) + - (Mstr(", ") + connections()) + - (Mstr(" connections, ") + anticipated()) + - (Mstr(" anticipated, of ") + maxconn()) + - " max\n"); + if (config.debug()) { + ostringstream o; + o << "Back end " << description() << ": " + << livestr() << ", " << upstr() << ", " + << connections() << " connections, " + << IPStore::anticipated(balancerindex()) << " anticipated, " + << maxconn() << " max\n"; + _debugmsg(o.str()); + } if (!maxconn()) return (islive && isup); return (islive && isup && - (connections() + anticipated()) < maxconn()); + (connections() + IPStore::anticipated(balancerindex()) < + maxconn())); } diff --git a/xr/backend/backend b/xr/backend/backend @@ -19,7 +19,9 @@ public: Backend (BackendDef const &b); virtual ~Backend(); bool connect(); - int sock() const { return (clsocket); } + void markconnecterror(); + + int sock() const { return clsocket; } void check(); string description() const; @@ -27,50 +29,46 @@ public: bool available() const; string availablestr() const; - bool live() const { return (islive); }; + bool live() const { return islive; }; void live (bool state); string livestr() const; void up (bool state); - bool up() const { return (isup); } + bool up() const { return isup; } string upstr() const; - string const &server() const { return (bdef.server()); } + string const &server() const { return bdef.server(); } void server(string s) { bdef.server(s); } - int port() const { return (bdef.port()); } + int port() const { return bdef.port(); } void port(int p) { bdef.port(p); } - unsigned maxconn() const { return (bdef.maxconn()); } + unsigned maxconn() const { return bdef.maxconn(); } void maxconn (unsigned m) { bdef.maxconn(m); } - string const &hostmatch() const { return (bdef.hostmatch()); } + string const &hostmatch() const { return bdef.hostmatch(); } void hostmatch(string const &s) { bdef.hostmatch(s); } - regex_t const &hostregex() const { return (bdef.hostregex()); } + regex_t const &hostregex() const { return bdef.hostregex(); } - unsigned weight() const { return (bdef.weight()); } + unsigned weight() const { return bdef.weight(); } void weight (unsigned w) { bdef.weight(w); } - unsigned adjustedweight() const { return (bdef.adjustedweight()); } + unsigned adjustedweight() const { return bdef.adjustedweight(); } - unsigned connections() const { return (nconn); } - - unsigned anticipated() const { return nanticipated; } - void anticipated(unsigned n); - void anticipate_more(); - void anticipate_less(); + unsigned connections() const { return nconn; } + unsigned connecterrors() const { return nconnerr; } - double bytesserved() const { return (bytes_served); } - unsigned clientsserved() const { return (totconn); } + double bytesserved() const { return bytes_served; } + unsigned clientsserved() const { return totconn; } - double loadavg() const { return (loadaverage); } + double loadavg() const { return loadaverage; } void loadavg(double l) { loadaverage = l; } void addbytes (unsigned n); void startconnection(); - void endconnection(); + void endconnection(); BackendDef const &backenddef() const { - return (bdef); + return bdef; } BackendCheck const &backendcheck() { @@ -79,6 +77,9 @@ public: void backendcheck(BackendCheck const &b) { bdef.backendcheck(b); } + + void balancerindex(int i) { index = i; } + int balancerindex() const { return index; } private: @@ -86,10 +87,11 @@ private: bool islive; bool isup; int clsocket; - unsigned nconn, totconn, nanticipated; + unsigned nconn, totconn, nconnerr; double bytes_served; double loadaverage; DNSEntry dnsentry; + int index; }; #endif diff --git a/xr/backend/backend1.cc b/xr/backend/backend1.cc @@ -2,6 +2,7 @@ Backend::Backend () : islive(true), isup(true), clsocket(-1), - nconn(0), totconn(0), nanticipated(0), bytes_served(0), - loadaverage(0.1), dnsentry() { + nconn(0), totconn(0), nconnerr(0), + bytes_served(0), + loadaverage(0.1), dnsentry(), index(-1) { } diff --git a/xr/backend/backend2.cc b/xr/backend/backend2.cc @@ -2,6 +2,6 @@ Backend::Backend (BackendDef const &b) : bdef(b), islive(true), isup(true), clsocket(-1), nconn(0), totconn(0), - nanticipated(0), - bytes_served(0), loadaverage(0.1), dnsentry() { + nconnerr(0), + bytes_served(0), loadaverage(0.1), dnsentry(), index(-1) { } diff --git a/xr/backend/connect.cc b/xr/backend/connect.cc @@ -52,13 +52,21 @@ bool Backend::connect() { // Wait for socket to go writable. Fdset fdset (config.backend_write_timeout()); fdset.add (clsocket); - int rwsock = fdset.readwriteable(); - int wsock = fdset.writeable(); - debugmsg (Mstr("Connecting to ") + description() + - Mstr(Mstr(": writesocket=") + wsock) + - Mstr(Mstr(", read/writesocket=") + rwsock) + "\n"); - if (wsock == clsocket && rwsock == -1) + fdset.wait_rw(); + + debugmsg(Mstr("Connectiong to ") + description() + "\n"); + +# ifdef CONNECTCHECK_ONLY_WRITABLE + if (fdset.writeable(clsocket)) + islive = true; + else + markconnecterror(); +# else + if (fdset.writeable(clsocket) && !fdset.readable(clsocket)) islive = true; + else + markconnecterror(); +# endif } debugmsg ((Mstr("Back end ") + description()) + diff --git a/xr/backend/markconnecterror.cc b/xr/backend/markconnecterror.cc @@ -0,0 +1,8 @@ +#include "backend" + +void Backend::markconnecterror() { + Mutex::lock(&nconnerr); + nconnerr++; + Mutex::unlock(&nconnerr); +} + diff --git a/xr/balancer/addbackend2.cc b/xr/balancer/addbackend2.cc @@ -2,13 +2,19 @@ void Balancer::addbackend (Backend const &b, bool is_up, bool is_live, bool do_check) { - Mutex::lock(&backends); - backends.push_back (b); - Mutex::unlock(&backends); + debugmsg(Mstr("Adding back end ") + b.description() + " to list\n"); + Mutex::lock(&backends); + backends.push_back (b); backends[backends.size() - 1].up(is_up); backends[backends.size() - 1].live(is_live); + backends[backends.size() - 1].balancerindex(backends.size() - 1); + Mutex::unlock(&backends); + + if (do_check) { + debugmsg(Mstr("Verifying configured back end\n")); + backends[backends.size() - 1].check(); + } - if (do_check) - backends[backends.size() - 1].check(); + debugmsg(Mstr("Back end ") + b.description() + " added to list\n"); } diff --git a/xr/balancer/init.cc b/xr/balancer/init.cc @@ -25,7 +25,9 @@ void Balancer::init() { for (int i = 0; i < config.backends(); i++) addbackend (config.backend(i)); - for (unsigned i = 0; i < nbackends(); i++) - msg ("Initial backend state: " + backend(i).description() + " is " + - backend(i).availablestr() + "\n"); + if (config.verbose()) { + for (unsigned i = 0; i < nbackends(); i++) + _msg ("Initial backend state: " + backend(i).description() + + " is " + backend(i).availablestr() + "\n"); + } } diff --git a/xr/balancer/serve.cc b/xr/balancer/serve.cc @@ -42,7 +42,9 @@ void Balancer::serve() { MEM(Memory::mem_display()); Fdset fdset(0); fdset.add (server_fd); - if (fdset.readable() < 0) { + fdset.wait_r(); + + if (! fdset.readable(server_fd)) { // We caught a signal. Either a request to report status, // or to terminate. msg ("Interrupt seen\n"); @@ -103,6 +105,7 @@ void Balancer::serve() { // Show how we look if (config.verbose()) { + ostringstream o; msg ((Mstr("Balancer is serving ") + connections()) + " clients\n"); msg ("Current back end states:\n"); @@ -110,7 +113,9 @@ void Balancer::serve() { msg((Mstr(" Back end ") + backend(i).description()) + (Mstr(": ") + backend(i).connections()) + (Mstr(" connections, max ") + backend(i).maxconn()) + - (Mstr(", status ") + backend(i).availablestr()) + "\n"); + (Mstr(", status ") + backend(i).availablestr()) + + (Mstr(", anticipated ") + IPStore::anticipated(i)) + + "\n"); } Dispatcher *d; diff --git a/xr/etc/Makefile.class b/xr/etc/Makefile.class @@ -3,6 +3,7 @@ OBJ = $(patsubst %.cc, $(BASE)/xr/$(BUILDDIR)/$(DIR)_%.o, $(SRC)) DIR = $(shell pwd | sed 's:.*/::') SYS = $(shell uname) HST = $(shell hostname) +# CCC = -DCONNECTCHECK_ONLY_WRITABLE ifeq ($(HST), Thera.local) ERRFLAG = -Werror @@ -15,7 +16,7 @@ $(BASE)/xr/$(BUILDDIR)/$(DIR)_%.o: %.cc $(CONF_CC) $(PROF) $(PROFILER) $(CONF_OPTFLAGS) \ -DVER='"$(VER)"' -DAUTHOR='"$(AUTHOR)"' -DHST='"$(HST)"' \ -DMAINTAINER='"$(MAINTAINER)"' -DDISTSITE='"$(DISTSITE)"' \ - -DSYS='"$(SYS)"' -D$(SYS) $(MEMDEBUG) \ + -DSYS='"$(SYS)"' -D$(SYS) $(MEMDEBUG) $(CCC) \ -DCONF_CC='"$(CONF_CC)"' -DCONF_LIB='"$(CONF_LIB)"' \ -DCONF_OPTFLAGS='"$(CONF_OPTFLAGS)"' $(CONF_STRNSTR) \ $(CONF_GETOPT) $(CONF_GETOPT_LONG) $(CONF_INET_ATON) \ diff --git a/xr/etc/status-nosavebutton.xslt b/xr/etc/status-nosavebutton.xslt @@ -646,6 +646,13 @@ </tr> <tr> <td></td> + <td>Connect failures</td> + <td colspan="2"> + <xsl:value-of select="connecterrors"/> + </td> + </tr> + <tr> + <td></td> <td>Served</td> <td colspan="2"> <xsl:value-of select="bytesserved"/> bytes, diff --git a/xr/etc/usage.txt b/xr/etc/usage.txt @@ -71,8 +71,8 @@ may not exist on your platform): /URI is not given, then "/" is assumed. external:PROGRAM - The PROGRAM is called with the arguments "IP:PORT", availability as "available" or "unavailable", and - the number of connections. The program must exit(0) to - indicate that the back end is alive. + the number of connections. The program must echo 0 to indicate + that the back end is alive. The default behavior is a TCP connect, to the back end's IP, at the back end's port. Use "-g connect::" to reset previous flags to the default. diff --git a/xr/fdset/fdset b/xr/fdset/fdset @@ -11,29 +11,27 @@ class Fdset { public: Fdset(int t); - int timeout() const { - return (tsec); - } - void timeout (int t) { - tsec = t; - } - void add (int fd) { - set.push_back(fd); - } - unsigned size() const { - return (set.size()); - } - int fd (unsigned index) const { - return (set[index]); - } - - int readable() const; - int writeable() const; - int readwriteable() const; + int timeout() const { return tsec; } + void timeout (int t) { tsec = t; } + + void add (int fd) { set.push_back(fd); } + + unsigned size() const { return set.size(); } + + int fd (unsigned index) { return set[index]; } + + void wait(bool wait_read, bool wait_write); + void wait_rw() { wait(true, true); } + void wait_r() { wait(true, false); } + void wait_w() { wait(false, true); } + + bool readable(int fd) { return FD_ISSET(fd, &readset); } + bool writeable(int fd) { return FD_ISSET(fd, &writeset); } private: int tsec; + fd_set readset, writeset, exceptset; vector<int> set; }; diff --git a/xr/fdset/fdset1.cc b/xr/fdset/fdset1.cc @@ -1,4 +1,7 @@ #include "fdset" Fdset::Fdset (int t) : tsec(t), set() { + FD_ZERO(&readset); + FD_ZERO(&writeset); + FD_ZERO(&exceptset); } diff --git a/xr/fdset/readable.cc b/xr/fdset/readable.cc @@ -1,67 +0,0 @@ -#include "fdset" - -int Fdset::readable() const { - PROFILE("Fdset::readable"); - - fd_set readset, exceptset; - struct timeval tv, *tvp; - - // No fd's? Nothing is readable. - if (set.size() < 1) - return (-1); - - if (config.debug()) { - ostringstream o; - o << "Candidate readable fd's:"; - for (unsigned i = 0; i < set.size(); i++) - o << ' ' << set[i]; - _debugmsg(o.str() + '\n'); - } - - // Prepare select sets. - FD_ZERO (&readset); - FD_ZERO (&exceptset); - for (unsigned i = 0; i < set.size(); i++) { - FD_SET (set[i], &readset); - FD_SET (set[i], &exceptset); - } - - // Prepare timout specifier. - if (tsec) { - tv.tv_sec = tsec; - tv.tv_usec = 0; - tvp = &tv; - } else - tvp = 0; - - // Run the select. Signal interrupts are returned as -1, so that - // the caller can handle them gracefully. - if (select (FD_SETSIZE, &readset, 0, &exceptset, tvp) < 0) { - debugmsg (Mstr("Select interrupted with errno ") + errno + - " while waiting for readable fd\n"); - if (errno != EINTR) - throw Error(string("Select failure: failed to wait for " - "readable state: ") + - strerror(errno)); - return (-1); - } - - // Check for exceptions. - for (unsigned i = 0; i < set.size(); i++) - if (FD_ISSET (set[i], &exceptset)) { - ostringstream o; - o << "Exception on fd/socket " << int(set[i]); - throw Error(o.str()); - } - - // Check what's readable. - for (unsigned i = 0; i < set.size(); i++) - if (FD_ISSET (set[i], &readset)) { - debugmsg (Mstr("Fd ") + set[i] + " has become readable\n"); - return (set[i]); - } - - // Nothing.. - debugmsg ("No readable fd's at this time\n"); - return (-1); -} diff --git a/xr/fdset/readwriteable.cc b/xr/fdset/readwriteable.cc @@ -1,58 +0,0 @@ -#include "fdset" - -int Fdset::readwriteable() const { - PROFILE("Fdset::readwriteable"); - - fd_set readset, writeset, exceptset; - struct timeval tv, *tvp; - - // No fd's? Nothing is readable. - if (set.size() < 1) - return (-1); - - // Prepare select sets. - FD_ZERO (&readset); - FD_ZERO (&writeset); - FD_ZERO (&exceptset); - for (unsigned i = 0; i < set.size(); i++) { - FD_SET (set[i], &readset); - FD_SET (set[i], &writeset); - FD_SET (set[i], &exceptset); - } - - // Prepare timout specifier. - if (tsec) { - tv.tv_sec = tsec; - tv.tv_usec = 0; - tvp = &tv; - } else - tvp = 0; - - // Run the select. - if (select (FD_SETSIZE, &readset, &writeset, &exceptset, tvp) < 0) { - if (errno != EINTR) - throw Error(string("Select failure: failed to wait for " - "read/writeablestate: ") + - strerror(errno)); - return (-1); - } - - // Check for exceptions. - for (unsigned i = 0; i < set.size(); i++) - if (FD_ISSET (set[i], &exceptset)) { - ostringstream o; - o << "Exception on fd/socket " << int(set[i]); - throw Error(o.str()); - } - - // Check what's active. - for (unsigned i = 0; i < set.size(); i++) - if (FD_ISSET (set[i], &readset) && FD_ISSET (set[i], &writeset)) { - debugmsg(Mstr("Fd " ) + set[i] + " has become read/writeable\n"); - return (set[i]); - } - - // Nothing.. - return (-1); -} - diff --git a/xr/fdset/wait.cc b/xr/fdset/wait.cc @@ -0,0 +1,65 @@ +#include "fdset" + +void Fdset::wait(bool wait_read, bool wait_write) { + PROFILE("Fdset::wait"); + + struct timeval tv, *tvp; + + // No fd's? Nothing to wait for. + if (set.size() < 1) + throw Error("Internal jam in Fdset::wait(): no fd's to wait for"); + + // Prepare select sets. + FD_ZERO (&readset); + FD_ZERO (&writeset); + FD_ZERO (&exceptset); + for (unsigned i = 0; i < set.size(); i++) { + FD_SET (set[i], &readset); + FD_SET (set[i], &writeset); + FD_SET (set[i], &exceptset); + debugmsg(Mstr("About to wait for fd ") + Mstr(set[i]) + "\n"); + } + + // Prepare timout specifier. + if (tsec) { + tv.tv_sec = tsec; + tv.tv_usec = 0; + tvp = &tv; + debugmsg(Mstr("Waiting limitation: ") + Mstr(tsec) + "\n"); + } else { + tvp = 0; + debugmsg(Mstr("No waiting limitation\n")); + } + + // Run the select. + if (select (FD_SETSIZE, + wait_read ? &readset : 0, + wait_write ? &writeset : 0, + &exceptset, tvp) < 0) { + if (errno != EINTR) + throw Error(string("Select failure: failed to wait: ") + + strerror(errno)); + FD_ZERO(&readset); + FD_ZERO(&writeset); + FD_ZERO(&exceptset); + return; + } + + // Check for exceptions. + for (unsigned i = 0; i < set.size(); i++) + if (FD_ISSET (set[i], &exceptset)) { + ostringstream o; + o << "Exception on fd/socket " << int(set[i]); + throw Error(o.str()); + } + + // More debugging + if (config.debug()) { + for (unsigned int i = 0; i < FD_SETSIZE; i++) { + if (FD_ISSET(i, &readset)) + _debugmsg(Mstr("Fd ") + Mstr(i) + " is readable\n"); + if (FD_ISSET(i, &writeset)) + _debugmsg(Mstr("Fd ") + Mstr(i) + " is writeable\n"); + } + } +} diff --git a/xr/fdset/writeable.cc b/xr/fdset/writeable.cc @@ -1,59 +0,0 @@ -#include "fdset" - -int Fdset::writeable() const { - PROFILE("Fdset::writeable"); - - fd_set writeset, exceptset; - struct timeval tv, *tvp; - - // No fd's? Nothing is writeable. - if (set.size() < 1) - return (-1); - - // Prepare select sets. - FD_ZERO (&writeset); - FD_ZERO (&exceptset); - for (unsigned i = 0; i < set.size(); i++) { - FD_SET (set[i], &writeset); - FD_SET (set[i], &exceptset); - } - - // Prepare timout specifier. - if (tsec) { - tv.tv_sec = tsec; - tv.tv_usec = 0; - tvp = &tv; - } else - tvp = 0; - - // Run the select. - if (select (FD_SETSIZE, 0, &writeset, &exceptset, tvp) < 0) { - debugmsg (Mstr("Select interrupted with errno ") + errno + - " while waiting for writeable fd\n"); - if (errno != EINTR) - throw Error(string("Select failure: failed to wait for " - "writable state: ") + - strerror(errno)); - return (-1); - } - - // Check for exceptions. - for (unsigned i = 0; i < set.size(); i++) - if (FD_ISSET (set[i], &exceptset)) { - ostringstream o; - o << "Exception on fd/socket " << int(set[i]); - throw Error(o.str()); - } - - // Check what's writeable. - for (unsigned i = 0; i < set.size(); i++) - if (FD_ISSET (set[i], &writeset)) { - debugmsg (Mstr("Fd ") + set[i] + " has become writeable\n"); - return (set[i]); - } - - // Nothing.. - debugmsg ("No writeable fd's at this time\n"); - return (-1); -} - diff --git a/xr/ipstore/activity.cc b/xr/ipstore/activity.cc @@ -0,0 +1,15 @@ +#include "ipstore" + +void IPStore::activity(struct in_addr clientip, unsigned curbackend) { + if (!onoff) + return; + + msg(Mstr("Logging activity for back end ") + Mstr(curbackend) + + Mstr(" from ") + inet_ntoa(clientip) + "\n"); + + Mutex::lock(&store); + store[clientip].targetbackend = (int)curbackend; + store[clientip].lastaccess = time(0); + Mutex::unlock(&store); +} + diff --git a/xr/ipstore/anticipated.cc b/xr/ipstore/anticipated.cc @@ -0,0 +1,20 @@ +#include "ipstore" + +unsigned IPStore::anticipated(unsigned b) { + if (!onoff || b >= balancer.nbackends()) + return 0; + + unsigned ret = 0; + + Mutex::lock(&store); + for (StoreMap::iterator iter = store.begin(); + iter != store.end(); + iter++) + if ((*iter).second.targetbackend == (int)b) + ret++; + Mutex::unlock(&store); + + return ret; +} + + diff --git a/xr/ipstore/clear.cc b/xr/ipstore/clear.cc @@ -0,0 +1,9 @@ +#include "ipstore" + +void IPStore::clear(struct in_addr clientip) { + debugmsg(Mstr("Erasing IP entry of ") + + Mstr(inet_ntoa(clientip)) + "\n"); + Mutex::lock(&store); + store.erase(clientip); + Mutex::unlock(&store); +} diff --git a/xr/ipstore/ipstore b/xr/ipstore/ipstore @@ -0,0 +1,41 @@ +#ifndef _IPSTORE_ +#define _IPSTORE_ + +#include "sys/sys" +#include "config/config" +#include "timestamp/timestamp" +#include "balancer/balancer" +#include "ThreadsAndMutexes/mutex/mutex" + +class IPStore { +public: + struct ClientData { + int targetbackend; + time_t lastaccess; + }; + + struct ClientDataCmp { + bool operator() (struct in_addr a, struct in_addr b) const { + long la, lb; + memcpy (&la, &a, sizeof(long)); + memcpy (&lb, &b, sizeof(long)); + return (la - lb) < 0; + } + }; + + typedef map<struct in_addr, ClientData, ClientDataCmp> StoreMap; + + static int target(struct in_addr clientip); + static void activity(struct in_addr clientip, unsigned curbackend); + static unsigned anticipated(unsigned bckend); + static void clear(struct in_addr clientip); + + static void on() { onoff = true; } + static void off() { onoff = false; } + +private: + static StoreMap store; + static bool onoff; +}; + +#endif diff --git a/xr/ipstore/target.cc b/xr/ipstore/target.cc @@ -0,0 +1,45 @@ +#include "ipstore" + +IPStore::StoreMap IPStore::store; +bool IPStore::onoff = false; + +int IPStore::target(struct in_addr clientip) { + time_t now = time(0); + + // Weed out store. Done first, because the store should be up to date + // for some decisions below. + bool done = false; + Mutex::lock(&store); + while (!done) { + done = true; + for (StoreMap::iterator iter = store.begin(); + iter != store.end(); + iter++) { + if (now - ((*iter).second.lastaccess) > config.ipstoretimeout()) { + if (config.debug()) { + done = false; + Timestamp tm((*iter).second.lastaccess); + debugmsg (Mstr(inet_ntoa(iter->first)) + + Mstr(" visited on ") + tm.desc() + ", erasing\n"); + } + store.erase(iter); + break; + } + } + } + Mutex::unlock(&store); + + // Let's see if we know the client. + if (store.count(clientip) > 0) { + if (config.verbose()) { + Timestamp tm(store[clientip].lastaccess); + msg(Mstr("Client IP ") + Mstr(inet_ntoa(clientip)) + + " last visited on " + tm.desc() + " and went to " + + balancer.backend(store[clientip].targetbackend).description() + + "\n"); + } + return store[clientip].targetbackend; + } + return -1; +} + diff --git a/xr/netbuffer/netread.cc b/xr/netbuffer/netread.cc @@ -6,7 +6,8 @@ unsigned Netbuffer::netread (int fd, int timeout) { if (timeout) { Fdset set(timeout); set.add(fd); - if (set.readable() != fd) { + set.wait_r(); + if (! set.readable(fd)) { ostringstream o; o << "Fd " << fd << " failed to become readable within " << int(timeout) << " sec"; diff --git a/xr/netbuffer/netwrite.cc b/xr/netbuffer/netwrite.cc @@ -33,7 +33,8 @@ unsigned Netbuffer::netwrite (int fd, int timeout) const { if (timeout) { Fdset set (timeout); set.add (fd); - if (set.writeable() != fd) { + set.wait_w(); + if (! set.writeable(fd)) { ostringstream o; o << "Fd " << fd << " failed to become writable within " << timeout << " sec"; diff --git a/xr/webinterface/answerstatus.cc b/xr/webinterface/answerstatus.cc @@ -103,7 +103,7 @@ void Webinterface::answer_status() { " <live>" << balancer.backend(i).livestr() << "</live>\n" " <available>" << balancer.backend(i).availablestr() << "</available>\n" " <connections>" << balancer.backend(i).connections() << "</connections>\n" - " <anticipated>" << balancer.backend(i).anticipated() << "</anticipated>\n" + " <connecterrors>" << balancer.backend(i).connecterrors() << "</connecterrors>\n" " <bytesserved>" << balancer.backend(i).bytesserved() << "</bytesserved>\n" " <clientsserved>" << balancer.backend(i).clientsserved() << "</clientsserved>\n" " <hostmatch>" << balancer.backend(i).hostmatch() << "</hostmatch>\n" diff --git a/xr/webinterface/execute.cc b/xr/webinterface/execute.cc @@ -25,7 +25,8 @@ void Webinterface::execute() { try { Fdset fdset(0); fdset.add (sfd); - if (fdset.readable() == sfd) { + fdset.wait_r(); + if (fdset.readable(sfd)) { int size; struct sockaddr_in clname; if ( (cfd = accept (sfd, (struct sockaddr *) &clname, diff --git a/xrctl/xrctl b/xrctl/xrctl @@ -104,10 +104,14 @@ if ($cmd eq 'list') { cmd_start(@ARGV); } elsif ($cmd eq 'stop') { cmd_stop(@ARGV); +} elsif ($cmd eq 'kill') { + cmd_kill(@ARGV); } elsif ($cmd eq 'force') { cmd_force(@ARGV); -} elsif ($cmd eq 'restart') { - cmd_restart(@ARGV); +} elsif ($cmd eq 'stopstart') { + cmd_stopstart(@ARGV); +} elsif ($cmd eq 'killstart') { + cmd_killstart(@ARGV); } elsif ($cmd eq 'status') { cmd_status(@ARGV); } elsif ($cmd eq 'rotate') { @@ -153,12 +157,27 @@ sub cmd_stop { push (@pids, @p); } for my $p (@pids) { - msg ("About to kill PID: '$p'\n"); + msg ("About to stop PID: '$p'\n"); } kill (15, @pids) if ($#pids > -1); print ("Services @_: stopped\n"); } +sub cmd_kill { + my @pids; + for my $s (@_) { + my @p = is_running($s) + or die ("Cannot stop service $s, not running\n"); + print ("Service $s: running at @p\n"); + push (@pids, @p); + } + for my $p (@pids) { + msg ("About to kill PID: '$p'\n"); + } + kill (9, @pids) if ($#pids > -1); + print ("Services @_: killed\n"); +} + sub cmd_force { for my $s (@_) { print ("Service $s: "); @@ -171,11 +190,11 @@ sub cmd_force { } } -sub cmd_restart { +sub cmd_stopstart { my @pids; for my $s (@_) { my @p = is_running($s) - or die ("Cannot restart service $s, not running\n"); + or die ("Cannot stopstart service $s, not running\n"); push (@pids, @p); } print ("Service(s) @_: "); @@ -188,6 +207,23 @@ sub cmd_restart { } } +sub cmd_killstart { + my @pids; + for my $s (@_) { + my @p = is_running($s) + or die ("Cannot killstart service $s, not running\n"); + push (@pids, @p); + } + print ("Service(s) @_: "); + kill (9, @pids) if ($#pids > -1); + print ("killed\n"); + for my $s (@_) { + print ("Service $s: "); + start_service($s); + print ("started\n"); + } +} + sub cmd_status { for my $s (@_) { print ("Service $s: "); @@ -292,15 +328,17 @@ Actions are: configtest builds invocations from the configuration file and validates them list shows the xr command line start starts the service(s) if they are not yet running - stop stops the service(s) if they are running + stop gracefully stops the service(s) if they are running + kill brutally kills the service(s), interrupting all connections force forces the service(s) up: starts if not running - restart restarts the service(s) if they are running + stopstart gracefully restarts the service(s) if they are running + killstart brutally restarts status shows which services are running rotate rotates logs of the service(s) generateconfig queries running XR's for the current configuration and shows it in the format of $default_conf -Services are the services stated in the configuration. When absent, all -named services are handled. +The SERVICES following an action are the services stated in the configuration. +When absent, all configured services are handled. ENDUSAGE }