00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <libplumbing/config.h>
00019 #include <arpa/inet.h>
00020 #include <cassert>
00021 #include <cerrno>
00022 #include <cstring>
00023 #include <fcntl.h>
00024 #include <netdb.h>
00025 #include <sys/types.h>
00026 #include <sys/socket.h>
00027 #include <libexplain/connect.h>
00028 #include <libexplain/fcntl.h>
00029 #include <libexplain/getsockopt.h>
00030 #include <libexplain/read.h>
00031 #include <libexplain/socket.h>
00032 #include <libexplain/write.h>
00033
00034 #include <libplumbing/logging.h>
00035 #include <libplumbing/endpoint/client.h>
00036
00037
00038 plumbing::endpoint_client::~endpoint_client()
00039 {
00040 delete (struct sockaddr_in *)sock_addr;
00041 }
00042
00043
00044 plumbing::endpoint_client::endpoint_client(const std::string &a_hostname,
00045 int a_port) :
00046 endpoint(-1),
00047 hostname(a_hostname),
00048 port(a_port),
00049 sock_addr(0),
00050 sock_addr_size(0),
00051 connecting(false),
00052 retry_after(0)
00053 {
00054 char buf[100];
00055 snprintf(buf, sizeof(buf), "host \"%s\": port %d", hostname.c_str(), port);
00056 set_peer_name(buf);
00057 try_to_connect();
00058 }
00059
00060
00061 plumbing::endpoint_client::endpoint_client(const std::string &a_hostname,
00062 const std::string &a_port) :
00063 endpoint(-1),
00064 hostname(a_hostname),
00065 port(0),
00066 sock_addr(0),
00067 sock_addr_size(0),
00068 connecting(false),
00069 retry_after(0)
00070 {
00071 char buf[100];
00072 snprintf
00073 (
00074 buf,
00075 sizeof(buf),
00076 "host \"%s\": port \"%s\"",
00077 hostname.c_str(),
00078 a_port.c_str()
00079 );
00080 set_peer_name(buf);
00081
00082 port = parse_port_number_or_die(a_port);
00083
00084 try_to_connect();
00085 }
00086
00087
00088 void
00089 plumbing::endpoint_client::write(const std::string &text)
00090 {
00091 write_queue.push_back(text + "\n");
00092 }
00093
00094
00095 int
00096 plumbing::endpoint_client::get_write_file_descriptor()
00097 {
00098 try_to_connect();
00099 if (connecting)
00100 return fd;
00101 return (write_queue.empty() ? -1 : fd);
00102 }
00103
00104
00105 void
00106 plumbing::endpoint_client::process_write()
00107 {
00108 if (connecting)
00109 {
00110 assert(fd >= 0);
00111 connecting = false;
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131 int error = 0;
00132 socklen_t len = sizeof(error);
00133 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
00134 {
00135 fatal_error
00136 (
00137 "%s",
00138 explain_getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len)
00139 );
00140 }
00141 errno = error;
00142 switch (errno)
00143 {
00144 case 0:
00145
00146 break;
00147
00148 case ECONNREFUSED:
00149 case ENETUNREACH:
00150
00151
00152
00153
00154 warning
00155 (
00156 "%s",
00157 explain_errno_connect
00158 (
00159 error,
00160 fd,
00161 (const struct sockaddr *)sock_addr,
00162 sock_addr_size
00163 )
00164 );
00165 close();
00166 kill_me_now();
00167 return;
00168
00169 default:
00170 fatal_error
00171 (
00172 "%s",
00173 explain_errno_connect
00174 (
00175 error,
00176 fd,
00177 (const struct sockaddr *)sock_addr,
00178 sock_addr_size
00179 )
00180 );
00181
00182 return;
00183 }
00184
00185
00186
00187
00188 int flags = fcntl(fd, F_GETFL, 0);
00189 if (flags < 0)
00190 fatal_error("%s", explain_fcntl(fd, F_GETFL, 0));
00191 flags &= ~O_NONBLOCK;
00192 int err = fcntl(fd, F_SETFL, flags);
00193 if (err < 0)
00194 fatal_error("%s", explain_fcntl(fd, F_SETFL, flags));
00195 }
00196
00197 if (fd < 0)
00198 return;
00199 if (write_queue.empty())
00200 return;
00201 std::string s = write_queue.front();
00202 ssize_t n = ::write(fd, s.c_str(), s.size());
00203 int err = errno;
00204 if (n != ssize_t(s.size()))
00205 {
00206
00207 warning("%s", explain_errno_write(err, fd, s.c_str(), s.size()));
00208 close();
00209 kill_me_now();
00210 return;
00211 }
00212 write_queue.pop_front();
00213 }
00214
00215
00216 int
00217 plumbing::endpoint_client::get_read_file_descriptor()
00218 {
00219 try_to_connect();
00220 if (connecting)
00221 return -1;
00222 return fd;
00223 }
00224
00225
00226 void
00227 plumbing::endpoint_client::process_read()
00228 {
00229 assert(fd >= 0);
00230 if (fd < 0)
00231 return;
00232
00233 char buffer[1 << 12];
00234 size_t nbytes = sizeof(buffer);
00235 ssize_t n = ::read(fd, buffer, nbytes);
00236 if (n == -1)
00237 {
00238 int err = errno;
00239 fatal_error("%s", explain_errno_read(err, fd, buffer, nbytes));
00240 }
00241 if (n == 0)
00242 {
00243
00244
00245
00246
00247
00248 warning("connection closed by peer");
00249 data_received(0, 0);
00250 close();
00251
00252
00253
00254
00255
00256
00257 return;
00258 }
00259
00260
00261
00262
00263 data_received(buffer, n);
00264 }
00265
00266
00267 void
00268 plumbing::endpoint_client::try_to_connect()
00269 {
00270 if (fd >= 0)
00271 return;
00272 if (connecting)
00273 return;
00274
00275
00276
00277
00278 time_t now;
00279 time(&now);
00280 if (now < retry_after)
00281 return;
00282 retry_after = now + 10;
00283
00284 if (!sock_addr)
00285 {
00286 struct hostent *he = gethostbyname(hostname.c_str());
00287 if (!he)
00288 {
00289 warning("gethostbyname: %s", hstrerror(h_errno));
00290 kill_me_now();
00291 return;
00292 }
00293
00294
00295
00296
00297 struct sockaddr_in *sain_p = new struct sockaddr_in;
00298 memset(sain_p, 0, sizeof(*sain_p));
00299 sain_p->sin_addr.s_addr = *(in_addr_t *)he->h_addr;
00300 sain_p->sin_family = AF_INET;
00301 sain_p->sin_port = htons(port);
00302
00303 sock_addr = sain_p;
00304 sock_addr_size = sizeof(*sain_p);
00305 }
00306
00307 connecting = true;
00308
00309
00310
00311
00312 fd = socket(AF_INET, SOCK_STREAM, 0);
00313 if (fd < 0)
00314 fatal_error("%s", explain_socket(AF_INET, SOCK_STREAM, 0));
00315
00316
00317
00318
00319 int flags = fcntl(fd, F_GETFL);
00320 if (flags < 0)
00321 fatal_error("%s", explain_fcntl(fd, F_GETFL, 0));
00322 flags |= O_NONBLOCK;
00323 int err = fcntl(fd, F_SETFL, flags);
00324 if (err < 0)
00325 fatal_error("%s", explain_fcntl(fd, F_SETFL, flags));
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340 err = connect(fd, (const struct sockaddr *)sock_addr, sock_addr_size);
00341
00342
00343
00344
00345
00346 if (err == 0)
00347 {
00348 connecting = false;
00349 return;
00350 }
00351
00352
00353
00354
00355 int error_num = errno;
00356 switch (error_num)
00357 {
00358 case EAGAIN:
00359 case EALREADY:
00360 case EINPROGRESS:
00361
00362
00363
00364 return;
00365
00366 case ECONNREFUSED:
00367 case ENETUNREACH:
00368
00369
00370
00371
00372 warning
00373 (
00374 "%s",
00375 explain_errno_connect
00376 (
00377 error_num,
00378 fd,
00379 (const struct sockaddr *)sock_addr,
00380 sock_addr_size
00381 )
00382 );
00383 connecting = false;
00384 close();
00385 kill_me_now();
00386 return;
00387
00388 default:
00389 break;
00390 }
00391
00392 fatal_error
00393 (
00394 "%s",
00395 explain_errno_connect
00396 (
00397 error_num,
00398 fd,
00399 (const struct sockaddr *)sock_addr,
00400 sock_addr_size
00401 )
00402 );
00403
00404 }
00405
00406
00407