serve.cc (5735B)
1 #include "balancer" 2 #include "Dispatchers/tcpdispatcher/tcpdispatcher" 3 #include "Dispatchers/httpdispatcher/httpdispatcher" 4 5 // #define SHOWDEBUG 6 7 void Balancer::serve() { 8 // Start up wakeup/checkup handlers. These are always started - even 9 // when config.wakeupsec() and config.checkupsec() are not defined 10 // and have value 0. Via the web interface, the values can be later 11 // changed, but we want to have the checkers running always. 12 if (!config.foregroundmode() && config.sport()) { 13 msg("Starting wakeup thread.\n"); 14 Wakeupthread *wt = new Wakeupthread(); 15 if (!wt) 16 throw Error("Memory fault in Balancer::serve"); 17 wt->start(); 18 19 msg("Starting checkup thread.\n"); 20 Checkupthread *ct = new Checkupthread(); 21 if (!ct) 22 throw Error("Memory fault in Balancer::serve"); 23 ct->start(); 24 } 25 26 // Write the PID file. 27 if (config.pidfile() != "") { 28 FILE *f; 29 if (! (f = fopen(config.pidfile().c_str(), "w")) ) 30 throw Error(string("Cannot write pid file ") + 31 config.pidfile() + ": " + strerror(errno)); 32 fprintf(f, "%u\n", getpid()); 33 fclose(f); 34 } 35 36 // Wait for activity, serve it. 37 msg("Awaiting activity on fd " << server_fd.fd() << '\n'); 38 MEM(Memory::mem_mark("Balancer start")); 39 MEM(Memory::mem_follow(true)); 40 while (true) { 41 MEM(Memory::mem_display()); 42 Fdset fdset(0); 43 fdset.add(server_fd); 44 fdset.wait_r(); 45 46 if (! fdset.readable(server_fd)) { 47 // We caught a signal. Either a request to report status, 48 // or to terminate. 49 msg("Interrupt seen\n"); 50 if (terminate()) { 51 msg("Termination requested, XR will stop.\n"); 52 break; 53 } else if (report()) { 54 msg("Report requested\n"); 55 reportmsg("*** XR STATUS REPORT STARTS ***\n"); 56 for (unsigned i = 0; i < nbackends(); i++) { 57 reportmsg("Back end " << i << ": " << 58 backend(i).description() << ", weight " << 59 backend(i).weight() << '\n'); 60 reportmsg(" Status: " << 61 backend(i).availablestr() << ", " << 62 backend(i).livestr() << '\n'); 63 reportmsg(" Connections: " << 64 backend(i).connections() << 65 " (max " << backend(i).maxconn() << ")\n"); 66 reportmsg(" Served:" << backend(i).bytesserved() << 67 " bytes, " << backend(i).clientsserved() << 68 " clients\n"); 69 } 70 report(false); 71 reportmsg("*** XR STATUS REPORT ENDS ***\n"); 72 continue; 73 } else if (restart()) { 74 msg("Restart requested\n"); 75 config.restart(); 76 } else { 77 msg("Non-meaningful interrupt or select timeout, " 78 "resuming\n"); 79 continue; 80 } 81 } 82 83 // Got activity! Check total # of connections. 84 msg("Got activity on fd " << server_fd.fd() << '\n'); 85 request_nr++; 86 if (config.maxconn() && connections() >= config.maxconn()) { 87 msg("Not serving connection: already " << connections() << 88 " connection(s) (max " << config.maxconn() << ")\n"); 89 continue; 90 } 91 92 if (server_fd.fd()) { 93 // In daemon mode (server_fd > 0): Accept, serve and loop again 94 Socket clsock; 95 try { 96 clsock = server_fd.accept(); 97 } catch (Error const &e) { 98 warnmsg(e.what() << '\n'); 99 clsock.close(); 100 continue; 101 } 102 103 msg("Accepted connection from " << 104 inet2string(clsock.clientaddr().sin_addr) << 105 " as client fd " << clsock.fd() << '\n'); 106 107 // Show how we look 108 if (config.verbose()) { 109 ostringstream o; 110 msg("Balancer is serving " << connections() << " clients\n"); 111 msg("Current back end states:\n"); 112 for (unsigned i = 0; i < nbackends(); i++) { 113 msg(" Back end " << backend(i).description() << ": " << 114 backend(i).connections() << " connections, max " << 115 backend(i).maxconn() << ", status " << 116 backend(i).availablestr() << ", anticipated " << 117 IPStore::anticipated(i) << '\n'); 118 } 119 } 120 121 Dispatcher *d; 122 switch (config.stype()) { 123 case Servertype::t_tcp: 124 d = new TcpDispatcher(clsock); 125 break; 126 case Servertype::t_http: 127 d = new HttpDispatcher(clsock); 128 break; 129 default: 130 throw Error("Internal error, can't choose dispatcher"); 131 break; 132 } 133 134 if (!d) 135 throw Error("Memory fault: cannot instantiate dispatcher\n"); 136 137 // Allocation boundary printout 138 if (config.debug()) { 139 void *mem = malloc(16); 140 free(mem); 141 debugmsg("Allocation boundary at dispatcher start: " << 142 mem << '\n'); 143 } 144 #ifdef SHOWDEBUG 145 void *mem = malloc(16); 146 free(mem); 147 cout << "XR allocation at dispatcher start: " << mem << '\n'; 148 #endif 149 150 d->start(); 151 } else { 152 // If fd-serving, serve and close. Don't thread it up. 153 TcpDispatcher *d; 154 155 switch (config.stype()) { 156 case Servertype::t_tcp: 157 d = new TcpDispatcher(server_fd); 158 break; 159 case Servertype::t_http: 160 d = new HttpDispatcher(server_fd); 161 break; 162 default: 163 throw Error("Internal error, can't choose dispatcher"); 164 break; 165 } 166 if (!d) 167 throw Error("Memory fault in Balancer::serve"); 168 d->execute(); 169 break; 170 } 171 172 // If we exceed the max # of requests, stop.. 173 if (config.quitafter()) { 174 msg("Request " << requestnr() << " underway of max " << 175 config.quitafter() << '\n'); 176 if (requestnr() >= (long)config.quitafter()) { 177 msg("Max requests served, will stop.\n"); 178 break; 179 } 180 } 181 } 182 183 // We're stopping now. If a PID stamp was created, remove it. 184 if (config.pidfile() != "") 185 unlink(config.pidfile().c_str()); 186 187 // Wait for running threads to die off. 188 delete webinterface; 189 unsigned prev_conn = 0x19081962; 190 while (1) { 191 unsigned curr_conn = balancer.connections(); 192 if (!curr_conn) 193 break; 194 if (curr_conn != prev_conn) { 195 msg("There are still " << curr_conn << " connections\n"); 196 prev_conn = curr_conn; 197 } 198 sleep(1); 199 } 200 msg("XR is idle, stopping.\n"); 201 }