crossroads

Git mirror of https://crossroads.e-tunity.com/
git clone git://git.finwo.net/app/crossroads
Log | Files | Refs | LICENSE

serve.cc (5735B)


      1 #include "balancer"
      2 #include "Dispatchers/tcpdispatcher/tcpdispatcher"
      3 #include "Dispatchers/httpdispatcher/httpdispatcher"
      4 
      5 // #define SHOWDEBUG
      6 
      7 void Balancer::serve() {
      8     // Start up wakeup/checkup handlers. These are always started - even
      9     // when config.wakeupsec() and config.checkupsec() are not defined
     10     // and have value 0. Via the web interface, the values can be later
     11     // changed, but we want to have the checkers running always.
     12     if (!config.foregroundmode() && config.sport()) {
     13 	msg("Starting wakeup thread.\n");
     14 	Wakeupthread *wt = new Wakeupthread();
     15 	if (!wt)
     16 	    throw Error("Memory fault in Balancer::serve");
     17 	wt->start();
     18 	
     19 	msg("Starting checkup thread.\n");
     20 	Checkupthread *ct = new Checkupthread();
     21 	if (!ct)
     22 	    throw Error("Memory fault in Balancer::serve");
     23 	ct->start();
     24     }
     25 
     26     // Write the PID file.
     27     if (config.pidfile() != "") {
     28 	FILE *f;
     29 	if (! (f = fopen(config.pidfile().c_str(), "w")) )
     30 	    throw Error(string("Cannot write pid file ") +
     31 			config.pidfile() + ": " + strerror(errno));
     32 	fprintf(f, "%u\n", getpid());
     33 	fclose(f);
     34     }
     35 
     36     // Wait for activity, serve it.
     37     msg("Awaiting activity on fd " << server_fd.fd() << '\n');
     38     MEM(Memory::mem_mark("Balancer start"));
     39     MEM(Memory::mem_follow(true));
     40     while (true) {
     41 	MEM(Memory::mem_display());
     42 	Fdset fdset(0);
     43 	fdset.add(server_fd);
     44 	fdset.wait_r();
     45 
     46 	if (! fdset.readable(server_fd)) {
     47 	    // We caught a signal. Either a request to report status,
     48 	    // or to terminate.
     49 	    msg("Interrupt seen\n");
     50 	    if (terminate()) {
     51 		msg("Termination requested, XR will stop.\n");
     52 		break;
     53 	    } else if (report()) {
     54 		msg("Report requested\n");
     55 		reportmsg("*** XR STATUS REPORT STARTS ***\n");
     56 		for (unsigned i = 0; i < nbackends(); i++) {
     57 		    reportmsg("Back end " << i << ": " <<
     58 			      backend(i).description() << ", weight " <<
     59 			      backend(i).weight() << '\n');
     60 		    reportmsg("  Status: " << 
     61 			      backend(i).availablestr() << ", " <<
     62 			      backend(i).livestr() << '\n');
     63 		    reportmsg("  Connections: " << 
     64 			      backend(i).connections() <<
     65 			      " (max " << backend(i).maxconn() << ")\n");
     66 		    reportmsg("  Served:" << backend(i).bytesserved() <<
     67 			      " bytes, " << backend(i).clientsserved() <<
     68 			      " clients\n");
     69 		}
     70 		report(false);
     71 		reportmsg("*** XR STATUS REPORT ENDS ***\n");
     72 		continue;
     73 	    } else if (restart()) {
     74 		msg("Restart requested\n");
     75 		config.restart();
     76 	    } else {
     77 		msg("Non-meaningful interrupt or select timeout, "
     78 		     "resuming\n");
     79 		continue;
     80 	    }
     81 	}
     82 
     83 	// Got activity! Check total # of connections.
     84 	msg("Got activity on fd " << server_fd.fd() << '\n');
     85 	request_nr++;
     86 	if (config.maxconn() && connections() >= config.maxconn()) {
     87 	    msg("Not serving connection: already " <<  connections() <<
     88 		 " connection(s) (max " << config.maxconn() << ")\n");
     89 	    continue;
     90 	}
     91 
     92 	if (server_fd.fd()) {
     93 	    // In daemon mode (server_fd > 0): Accept, serve and loop again
     94 	    Socket clsock;
     95 	    try {
     96 		clsock = server_fd.accept();
     97 	    } catch (Error const &e) {
     98 		warnmsg(e.what() << '\n');
     99 		clsock.close();
    100 		continue;
    101 	    }
    102 	    
    103 	    msg("Accepted connection from " <<
    104 		inet2string(clsock.clientaddr().sin_addr) <<
    105 		" as client fd " << clsock.fd() << '\n');	
    106 
    107 	    // Show how we look
    108 	    if (config.verbose()) {
    109 		ostringstream o;
    110 		msg("Balancer is serving " << connections() << " clients\n");
    111 		msg("Current back end states:\n");
    112 		for (unsigned i = 0; i < nbackends(); i++) {
    113 		    msg("  Back end " << backend(i).description() << ": " <<
    114 			backend(i).connections()  << " connections, max " <<
    115 			backend(i).maxconn() << ", status " <<
    116 			backend(i).availablestr()  << ", anticipated " <<
    117 			IPStore::anticipated(i) << '\n');
    118 		}
    119 	    }
    120 
    121 	    Dispatcher *d;
    122 	    switch (config.stype()) {
    123 	    case Servertype::t_tcp:
    124 		d = new TcpDispatcher(clsock);
    125 		break;
    126 	    case Servertype::t_http:
    127 		d = new HttpDispatcher(clsock);
    128 		break;
    129 	    default:
    130 		throw Error("Internal error, can't choose dispatcher");
    131 		break;
    132 	    }
    133 
    134 	    if (!d)
    135 		throw Error("Memory fault: cannot instantiate dispatcher\n");
    136 
    137 	    // Allocation boundary printout
    138 	    if (config.debug()) {
    139 		void *mem = malloc(16);
    140 		free(mem);
    141 		debugmsg("Allocation boundary at dispatcher start: " <<
    142 			 mem << '\n');
    143 	    }
    144 	    #ifdef SHOWDEBUG
    145 	    void *mem = malloc(16);
    146 	    free(mem);
    147 	    cout << "XR allocation at dispatcher start: " << mem << '\n';
    148 	    #endif
    149 
    150 	    d->start();
    151 	} else {
    152 	    // If fd-serving, serve and close. Don't thread it up.
    153 	    TcpDispatcher *d;
    154 	    
    155 	    switch (config.stype()) {
    156 	    case Servertype::t_tcp:
    157 		d = new TcpDispatcher(server_fd);
    158 		break;
    159 	    case Servertype::t_http:
    160 		d = new HttpDispatcher(server_fd);
    161 		break;
    162 	    default:
    163 		throw Error("Internal error, can't choose dispatcher");
    164 		break;
    165 	    }
    166 	    if (!d)
    167 		throw Error("Memory fault in Balancer::serve");
    168 	    d->execute();
    169 	    break;
    170 	}
    171 
    172 	// If we exceed the max # of requests, stop..
    173 	if (config.quitafter()) {
    174 	    msg("Request " << requestnr() << " underway of max " <<
    175 		 config.quitafter() << '\n');
    176 	    if (requestnr() >=  (long)config.quitafter()) {
    177 		msg("Max requests served, will stop.\n");
    178 		break;
    179 	    }
    180 	}
    181     }
    182 
    183     // We're stopping now. If a PID stamp was created, remove it.
    184     if (config.pidfile() != "")
    185 	unlink(config.pidfile().c_str());
    186 
    187     // Wait for running threads to die off.
    188     delete webinterface;
    189     unsigned prev_conn = 0x19081962;
    190     while (1) {
    191 	unsigned curr_conn = balancer.connections();
    192 	if (!curr_conn)
    193 	    break;
    194 	if (curr_conn != prev_conn) {
    195 	    msg("There are still " << curr_conn << " connections\n");
    196 	    prev_conn = curr_conn;
    197 	}
    198 	sleep(1);
    199     }
    200     msg("XR is idle, stopping.\n");
    201 }