/// User should not create this object. TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr);
voidTcpServer::newConnection(int sockfd, const InetAddress& peerAddr) { // FIXME poll with zero timeout to double confirm the new connection // FIXME use make_shared if necessary TcpConnectionPtr conn( new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr) ); }
// void send(string&& message); // C++11 voidsend(constvoid* message, int len); voidsend(const StringPiece& message); // void send(Buffer&& message); // C++11 voidsend(Buffer* message); // this one will swap data voidshutdown(); // NOT thread safe, no simultaneous calling // void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling voidforceClose(); voidforceCloseWithDelay(double seconds); voidsetTcpNoDelay(bool on); // reading or not voidstartRead(); voidstopRead(); boolisReading()const{ return reading_; }; // NOT thread safe, may race with start/stopReadInLoop
voidTcpConnection::forceClose() { // FIXME: use compare and swap if (state_ == kConnected || state_ == kDisconnecting) { setState(kDisconnecting); loop_->queueInLoop(std::bind(&TcpConnection::forceCloseInLoop, shared_from_this())); } }
voidTcpConnection::forceCloseInLoop() { loop_->assertInLoopThread(); if (state_ == kConnected || state_ == kDisconnecting) { // as if we received 0 byte in handleRead(); handleClose(); } }
voidTcpConnection::handleClose() { loop_->assertInLoopThread(); LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString(); assert(state_ == kConnected || state_ == kDisconnecting); // we don't close fd, leave it to dtor, so we can find leaks easily. setState(kDisconnected); channel_->disableAll(); TcpConnectionPtr guardThis(shared_from_this()); // 由于connectionCallback_可能析构,所以这里加shared_ptr guard connectionCallback_(guardThis); // must be the last line closeCallback_(guardThis); }
/// TCP server, supports single-threaded and thread-pool models. /// /// This is an interface class, so don't expose too much details. classTcpServer : noncopyable { public: typedef std::function<void(EventLoop*)> ThreadInitCallback; enumOption { kNoReusePort, kReusePort, };
voidConnector::startInLoop() { loop_->assertInLoopThread(); assert(state_ == kDisconnected); if (connect_) { connect(); } else { LOG_DEBUG << "do not connect"; } }
voidConnector::connect() { int sockfd = sockets::createNonblockingOrDie(serverAddr_.family()); int ret = sockets::connect(sockfd, serverAddr_.getSockAddr()); int savedErrno = (ret == 0) ? 0 : errno; switch (savedErrno) { case0: case EINPROGRESS: case EINTR: case EISCONN: connecting(sockfd); break;
case EAGAIN: case EADDRINUSE: case EADDRNOTAVAIL: case ECONNREFUSED: case ENETUNREACH: retry(sockfd); break;
case EACCES: case EPERM: case EAFNOSUPPORT: case EALREADY: case EBADF: case EFAULT: case ENOTSOCK: LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno; sockets::close(sockfd); break;
InetAddress localAddr(sockets::getLocalAddr(sockfd)); // FIXME poll with zero timeout to double confirm the new connection // FIXME use make_shared if necessary TcpConnectionPtr conn(new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));