/*
 * Copyright (c) 2023 Fancy Code.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "TcpServerImpl.h"
#include "ILog.h"
#include "TcpServerHandle.h"

/**
 * @brief Read back the TcpServerImpl stored as the userdata of an io handle.
 *
 * Init() stores `this` on the listening io with hevent_set_userdata().
 * NOTE(review): the per-connection callbacks rely on accepted ios carrying
 * the same userdata as the listening io - confirm against the libhv version
 * in use.
 */
static TcpServerImpl *ServerOf(hio_t *io)
{
    return static_cast<TcpServerImpl *>(hevent_userdata(io));
}

/**
 * @brief Notify and destroy one client wrapper created by NewTcpClientAccept().
 *
 * The wrapper is only touched when TcpClientAcceptObjectCheck() accepts it.
 * The memory block handed to free() starts sizeof(ITcpServerHeader) bytes
 * before the shared_ptr, mirroring the layout built in NewTcpClientAccept().
 *
 * @param client Pointer stored in mClients; nullptr is ignored.
 */
static void ReleaseClient(std::shared_ptr<ITcpClientAccept> *client)
{
    if (nullptr == client) {
        return;
    }
    void *object = (void *)client;
    if (TcpClientAcceptObjectCheck(object) == true) {
        if (nullptr != *client) {
            (*client)->Closed();
        }
        // free() runs no destructors, so drop the shared_ptr's reference first.
        client->reset();
        free(((char *)object) - sizeof(ITcpServerHeader));
    }
}

/**
 * @brief Close callback of an accepted connection: unregisters the client.
 */
static void on_close(hio_t *io)
{
    LogInfo("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
    ServerOf(io)->RemoveClient(io);
}

/**
 * @brief Close callback of the listening io: asks the server to stop its loop.
 */
static void server_on_close(hio_t *io)
{
    LogInfo("server_on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
    ServerOf(io)->Closed();
}

/**
 * @brief Read callback of an accepted connection: forwards the data to the
 *        client object registered for this io.
 *
 * @param io        Connection the data arrived on.
 * @param buf       Received bytes.
 * @param readbytes Number of bytes in buf.
 */
static void on_recv(hio_t *io, void *buf, int readbytes)
{
    LogInfo("on_recv fd=%d readbytes=%d\n", hio_fd(io), readbytes);
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    LogInfo("[%s] <=> [%s]\n",
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
    LogInfo("< %.*s", readbytes, (char *)buf);
    TcpServerImpl *server = ServerOf(io);
    auto *client = server->GetClient(io);
    // GetClient() returns nullptr when the fd is not registered, e.g. after
    // FreeClients() has emptied the map while the connection is still open.
    if (nullptr == client || nullptr == *client) {
        LogError("on_recv: no client for fd=%d, data dropped.\n", hio_fd(io));
        return;
    }
    (*client)->Readed((const char *)buf, readbytes);
}

/**
 * @brief Accept callback: installs the per-connection callbacks, registers the
 *        connection with the server and starts reading from it.
 */
static void on_accept(hio_t *io)
{
    LogInfo("on_accept connfd=%d\n", hio_fd(io));
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    LogInfo("accept connfd=%d [%s] <= [%s]\n",
            hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);
    ServerOf(io)->AddClient(io);
    // AddClient() closes the connection when the client is rejected; do not
    // start reading on an io that has already been closed.
    if (!hio_is_closed(io)) {
        hio_read_start(io);
    }
}

/**
 * @brief Build the object representing one accepted connection.
 *
 * @param io     Connection handle used by Close() and Write().
 * @param object Opaque value passed back as the last argument of the
 *               read/closed callbacks.
 * @param param  Read/closed callbacks supplied by the user.
 */
TcpClientAcceptImpl::TcpClientAcceptImpl(const hio_t *io, const void *object, const ClientAcceptParam &param)
    : mIo(io), mObjectThis(object), mParam(param)
{
}

/**
 * @brief Close the connection if an io handle is set.
 */
void TcpClientAcceptImpl::Close(void)
{
    if (nullptr != mIo) {
        hio_t *io = (hio_t *)mIo;
        hio_close(io);
    }
}

/**
 * @brief Hand received data to the user's read callback.
 *
 * @param data   Received bytes.
 * @param length Number of bytes in data.
 */
void TcpClientAcceptImpl::Readed(const void *data, size_t length)
{
    if (nullptr != mParam.mReadFunc) {
        mParam.mReadFunc(data, length, mObjectThis);
        return;
    }
    LogWarning("mParam.mReadFunc is null\n");
}

/**
 * @brief Write data to the connection.
 *
 * @param data   Bytes to send.
 * @param length Number of bytes in data.
 * @return The result of hio_write(), or TCP_MODULE_WRITE_ERROR when no io
 *         handle is set.
 */
ssize_t TcpClientAcceptImpl::Write(const void *data, size_t length)
{
    if (mIo) {
        hio_t *io = (hio_t *)mIo;
        return hio_write(io, data, length);
    }
    LogError("mIo is null\n");
    return TCP_MODULE_WRITE_ERROR;
}

/**
 * @brief Tell the user, through the closed callback, that the connection is
 *        gone.
 */
void TcpClientAcceptImpl::Closed(void)
{
    if (nullptr != mParam.mClosedFunc) {
        mParam.mClosedFunc(mObjectThis);
        return;
    }
    LogWarning("mParam.mClosedFunc is null\n");
}

/**
 * @brief Store the server parameters; the event loop and the listening io are
 *        created later by Init().
 */
TcpServerImpl::TcpServerImpl(const TcpServerParam param) : mParam(param)
{
    mLoop = nullptr;
    mIo = nullptr;
}

/**
 * @brief Create the event loop and the listening socket, then run the loop on
 *        a dedicated thread.
 *
 * On failure an error is logged and the server stays un-started. The thread
 * holds a shared_ptr to this object, so the object must be owned by a
 * std::shared_ptr when Init() is called.
 */
void TcpServerImpl::Init(void)
{
    constexpr int NO_FLAGS = 0;
    mLoop = hloop_new(NO_FLAGS);
    if (nullptr == mLoop) {
        LogError("hloop_new failed\n");
        return;
    }
    hio_t *listenio = hloop_create_tcp_server(mLoop, mParam.mIp, mParam.mPort, on_accept);
    if (nullptr == listenio) {
        LogError("hloop_create_tcp_server failed\n");
        // Nothing will ever run Loop() for this loop, so release it here.
        hloop_free(&mLoop);
        mLoop = nullptr;
        return;
    }
    LogInfo("listenfd=%d\n", hio_fd(listenio));
    hevent_set_userdata(listenio, this);
    mIo = listenio;
    hio_setcb_close(mIo, server_on_close);
    std::shared_ptr<TcpServerImpl> impl = std::dynamic_pointer_cast<TcpServerImpl>(shared_from_this());
    mTcpServerThread = std::thread([impl]() {
        impl->Loop();
    });
}

/**
 * @brief Release every client, close the listening io and wait for the loop
 *        thread to finish.
 */
void TcpServerImpl::UnInit(void)
{
    LogInfo("UnInit TcpServerImpl\n");
    FreeClients();
    if (nullptr != mIo) {
        hio_close(mIo);
        mIo = nullptr;
    }
    if (mTcpServerThread.joinable()) {
        mTcpServerThread.join();
    }
}

/**
 * @brief Thread body: run the event loop until it stops, then free it.
 */
void TcpServerImpl::Loop(void)
{
    if (nullptr == mLoop) {
        LogError("mLoop is null\n");
        return;
    }
    hloop_run(mLoop);
    hloop_free(&mLoop);
    mLoop = nullptr;
}

/**
 * @brief Create and register the client object for a newly accepted
 *        connection and ask the user whether to keep it.
 *
 * The connection is closed when the client object cannot be created, when no
 * accept callback is configured, or when the callback rejects the client.
 *
 * @param io Newly accepted connection.
 */
void TcpServerImpl::AddClient(hio_t *io)
{
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    LogInfo("accept connfd=%d [%s] <= [%s]\n",
            hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
    bool accepted = false;
    {
        std::lock_guard<std::mutex> locker(mMutex);
        auto *addClient = NewTcpClientAccept(io, mParam.mClientAcceptParam);
        if (nullptr == addClient) {
            LogError("NewTcpClientAccept failed.\n");
        }
        else {
            mClients[hio_fd(io)] = addClient;
            if (mParam.mAcceptClientFunc) {
                accepted = (mParam.mAcceptClientFunc(addClient, peeraddrstr) == true);
                if (!accepted) {
                    LogWarning("User did not accept client.\n");
                }
            }
        }
    }
    if (accepted) {
        return;
    }
    // mMutex must be released before closing: the close callback calls
    // RemoveClient(), which takes the same mutex.
    hio_close(io);
    LogWarning("AddClient failed.\n");
}

/**
 * @brief Look up the client object registered for a connection.
 *
 * @param io Connection whose fd is used as the lookup key.
 * @return The registered client pointer, or nullptr when the fd is unknown.
 */
std::shared_ptr<ITcpClientAccept> *TcpServerImpl::GetClient(hio_t *io)
{
    std::lock_guard<std::mutex> locker(mMutex);
    const auto it = mClients.find(hio_fd(io));
    if (it == mClients.end()) {
        LogError("GetClient failed, client not exit.\n");
        return nullptr;
    }
    return it->second;
}

/**
 * @brief Unregister and destroy the client object of a closed connection.
 *
 * @param io Connection whose fd is used as the lookup key.
 */
void TcpServerImpl::RemoveClient(hio_t *io)
{
    std::lock_guard<std::mutex> locker(mMutex);
    const auto it = mClients.find(hio_fd(io));
    if (it == mClients.end()) {
        LogError("RemoveClient failed, client not exit.\n");
        return;
    }
    ReleaseClient(it->second);
    mClients.erase(it);
}

/**
 * @brief Destroy every registered client object and empty the registry.
 */
void TcpServerImpl::FreeClients(void)
{
    std::lock_guard<std::mutex> locker(mMutex);
    for (auto &client : mClients) {
        ReleaseClient(client.second);
    }
    mClients.clear();
}

/**
 * @brief Called when the listening io has been closed: stop the event loop so
 *        that Loop() returns.
 */
void TcpServerImpl::Closed(void)
{
    if (nullptr != mLoop) {
        LogInfo("Stop loop.\n");
        hloop_stop(mLoop);
    }
}

/**
 * @brief Allocate a TcpServer wrapper (header + shared_ptr) and create the
 *        server implementation inside it.
 *
 * @param param Server parameters forwarded to TcpServerImpl.
 * @return Pointer to the shared_ptr located sizeof(ITcpServerHeader) bytes
 *         into the malloc'ed wrapper, or nullptr when allocation fails.
 */
std::shared_ptr<ITcpServer> *NewTcpServer(const TcpServerParam &param)
{
    LogInfo("Create tcp server object.\n");
    TcpServer *impl = (TcpServer *)malloc(sizeof(TcpServer));
    if (nullptr == impl) {
        LogError("NewTcpServer::malloc failed.\n");
        return nullptr;
    }
    // Give the raw memory the bytes of a default-constructed wrapper before
    // assigning to its members.
    TcpServer tmp;
    memcpy((void *)impl, (void *)&tmp, sizeof(TcpServer));
    impl->mHeader.mCheckName = GetTcpServerModuleName();
    impl->mTcpServer = std::make_shared<TcpServerImpl>(param);
    return (std::shared_ptr<ITcpServer> *)(((char *)impl) + sizeof(ITcpServerHeader));
}

/**
 * @brief Allocate a TcpClientAccept wrapper (header + shared_ptr) and create
 *        the client implementation inside it.
 *
 * The returned pointer is also given to the implementation as its callback
 * context.
 *
 * @param io    Accepted connection the client object will use.
 * @param param Read/closed callbacks for this client.
 * @return Pointer to the shared_ptr located sizeof(ITcpServerHeader) bytes
 *         into the malloc'ed wrapper, or nullptr when allocation fails.
 */
std::shared_ptr<ITcpClientAccept> *NewTcpClientAccept(const hio_t *io, const ClientAcceptParam &param)
{
    LogInfo("Create tcp client accept object.\n");
    TcpClientAccept *impl = (TcpClientAccept *)malloc(sizeof(TcpClientAccept));
    if (nullptr == impl) {
        LogError("NewTcpClientAccept::malloc failed.\n");
        return nullptr;
    }
    // Give the raw memory the bytes of a default-constructed wrapper before
    // assigning to its members.
    TcpClientAccept tmp;
    memcpy((void *)impl, (void *)&tmp, sizeof(TcpClientAccept));
    impl->mHeader.mCheckName = GetTcpClientAcceptName();
    std::shared_ptr<ITcpClientAccept> *objectThis =
        (std::shared_ptr<ITcpClientAccept> *)(((char *)impl) + sizeof(ITcpServerHeader));
    impl->mTcpClientAccept = std::make_shared<TcpClientAcceptImpl>(io, objectThis, param);
    return objectThis;
}