00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <libplumbing/config.h>
00019 #include <cerrno>
00020 #include <signal.h>
00021 #include <libexplain/select.h>
00022
00023 #include <libplumbing/reactor.h>
00024 #include <libplumbing/endpoint.h>
00025 #include <libplumbing/endpoint/functor.h>
00026 #include <libplumbing/logging.h>
00027 #include <libplumbing/time_value.h>
00028
00029
00030 plumbing::reactor::~reactor()
00031 {
00032 }
00033
00034
00035 void
00036 plumbing::reactor::fatal_error(const char *fmt, ...)
00037 {
00038 va_list ap;
00039 va_start(ap, fmt);
00040 fatal_error_v(fmt, ap);
00041 va_end(ap);
00042 }
00043
00044
00045 void
00046 plumbing::reactor::fatal_error_v(const char *fmt, va_list ap)
00047 {
00048 log().fatal_error_v(fmt, ap);
00049 }
00050
00051
00052 plumbing::reactor::reactor()
00053 {
00054
00055
00056
00057 signal(SIGPIPE, SIG_IGN);
00058
00059
00060
00061
00062 signal(SIGCHLD, SIG_IGN);
00063 }
00064
00065
00066 void
00067 plumbing::reactor::kill_suicidal_endpoints()
00068 {
00069 endpoints_t survivors;
00070 for
00071 (
00072 endpoints_t::iterator it = endpoints.begin();
00073 it != endpoints.end();
00074 ++it
00075 )
00076 {
00077 endpoint::pointer ep = *it;
00078 if (!ep->is_ready_to_die())
00079 survivors.push_back(ep);
00080 }
00081 endpoints.swap(survivors);
00082 }
00083
00084
00085 void
00086 plumbing::reactor::process(bool block)
00087 {
00088 fd_set readable;
00089 FD_ZERO(&readable);
00090 fd_set writable;
00091 FD_ZERO(&writable);
00092
00093
00094
00095
00096
00097 time_value maximum_sleep = at.get_maximum_sleep();
00098 int num_waiting = 0;
00099 for
00100 (
00101 endpoints_t::iterator it = endpoints.begin();
00102 it != endpoints.end();
00103 ++it
00104 )
00105 {
00106 endpoint::pointer ep = *it;
00107 assert(ep);
00108 int rfd = ep->get_read_file_descriptor();
00109 if (rfd >= 0)
00110 {
00111 FD_SET(rfd, &readable);
00112 ++num_waiting;
00113 }
00114 int wfd = ep->get_write_file_descriptor();
00115 if (wfd >= 0)
00116 {
00117 FD_SET(wfd, &writable);
00118 ++num_waiting;
00119 }
00120
00121 time_value dt = ep->get_maximum_sleep();
00122 if (maximum_sleep > dt)
00123 maximum_sleep = dt;
00124 }
00125
00126
00127
00128
00129
00130
00131 kill_suicidal_endpoints();
00132
00133
00134
00135
00136
00137 if (endpoints.empty() && at.empty())
00138 return;
00139
00140
00141
00142
00143
00144 time_value time_out(block ? maximum_sleep : time_value(0));
00145
00146
00147
00148
00149 int n = select(FD_SETSIZE, &readable, &writable, 0, &time_out);
00150 if (n < 0)
00151 {
00152 int err = errno;
00153 if (err != EINTR && err != EBADF)
00154 {
00155
00156
00157
00158
00159 log().fatal_error
00160 (
00161 "%s",
00162 explain_errno_select
00163 (
00164 err,
00165 FD_SETSIZE,
00166 &readable,
00167 &writable,
00168 0,
00169 &time_out
00170 )
00171 );
00172 }
00173 return;
00174 }
00175
00176
00177
00178
00179
00180
00181
00182
00183 if (n == 0)
00184 {
00185 for
00186 (
00187 endpoints_t::iterator it = endpoints.begin();
00188 it != endpoints.end();
00189 ++it
00190 )
00191 {
00192 endpoint::pointer ep = *it;
00193 ep->process_timeout();
00194 }
00195 }
00196 if (n > 0)
00197 {
00198
00199
00200
00201 for
00202 (
00203 endpoints_t::iterator it = endpoints.begin();
00204 it != endpoints.end();
00205 ++it
00206 )
00207 {
00208 endpoint::pointer ep = *it;
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218 int rfd = ep->get_read_file_descriptor();
00219 if (rfd >= 0 && FD_ISSET(rfd, &readable))
00220 {
00221 FD_CLR(rfd, &readable);
00222 ep->process_read();
00223 --n;
00224 }
00225 int wfd = ep->get_write_file_descriptor();
00226 if (wfd >= 0 && FD_ISSET(wfd, &writable))
00227 {
00228 FD_CLR(wfd, &writable);
00229 ep->process_write();
00230 --n;
00231 }
00232 if (n <= 0)
00233 break;
00234 }
00235 }
00236
00237
00238
00239
00240
00241
00242
00243
00244 at.process();
00245 }
00246
00247
00248 void
00249 plumbing::reactor::add_endpoint(const endpoint::pointer &sp)
00250 {
00251 endpoints.push_back(sp);
00252 }
00253
00254
00255 void
00256 plumbing::reactor::for_each_endpoint(endpoint_functor &func)
00257 {
00258 for
00259 (
00260 endpoints_t::iterator it = endpoints.begin();
00261 it != endpoints.end();
00262 ++it
00263 )
00264 {
00265 endpoint::pointer ep = *it;
00266 func(ep);
00267 }
00268 }
00269
00270
00271