hunting/utils/TcpModule/src/TcpServerImpl.cpp
2024-06-18 23:49:14 +08:00

257 lines
8.4 KiB
C++

/*
* Copyright (c) 2023 Fancy Code.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TcpServerImpl.h"
#include "ILog.h"
#include "ITcpServer.h"
#include "TcpModule.h"
#include "hloop.h"
#include "hsocket.h"
#include <cstddef>
#include <memory>
#include <mutex>
#include <new>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
/**
 * Close callback installed on each accepted connection by on_accept().
 *
 * Logs the descriptor and error code of the connection, then asks the server
 * stored in the io's userdata to drop its record of that connection.
 *
 * @param io Connection whose close event is being reported.
 */
static void on_close(hio_t *io)
{
    LogInfo("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
    auto *owner = static_cast<TcpServerImpl *>(hevent_userdata(io));
    owner->RemoveClient(io);
}
/**
 * Close callback installed on the listening io by TcpServerImpl::Init().
 *
 * Logs the descriptor and error code, then notifies the owning server
 * (taken from the io's userdata) so it can stop its event loop.
 *
 * @param io Listening io whose close event is being reported.
 */
static void server_on_close(hio_t *io)
{
    LogInfo("server_on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
    auto *owner = static_cast<TcpServerImpl *>(hevent_userdata(io));
    owner->Closed();
}
/**
 * Read callback installed on each accepted connection by on_accept().
 *
 * Logs the endpoints and the received payload, then forwards the data to the
 * client object registered for this connection. If no client object is
 * registered (GetClient() returns nullptr, e.g. after the server released its
 * clients), the data is dropped with an error log instead of dereferencing a
 * null pointer.
 *
 * @param io        Connection the data arrived on.
 * @param buf       Received bytes; not required to be NUL terminated.
 * @param readbytes Number of valid bytes in buf.
 */
static void on_recv(hio_t *io, void *buf, int readbytes)
{
    LogInfo("on_recv fd=%d readbytes=%d\n", hio_fd(io), readbytes);
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    LogInfo(
        "[%s] <=> [%s]\n", SOCKADDR_STR(hio_localaddr(io), localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
    // "%.*s" bounds the print to readbytes because buf is not NUL terminated.
    LogInfo("< %.*s", readbytes, (char *)buf);
    TcpServerImpl *server = (TcpServerImpl *)hevent_userdata(io);
    if (nullptr == server) {
        LogError("on_recv: server is null, data dropped.\n");
        return;
    }
    std::shared_ptr<ITcpClientAccept> *client = server->GetClient(io);
    if (nullptr == client || nullptr == *client) {
        LogError("on_recv: no client for fd=%d, data dropped.\n", hio_fd(io));
        return;
    }
    (*client)->Readed((const char *)buf, readbytes);
}
/**
 * Accept callback passed to hloop_create_tcp_server() in TcpServerImpl::Init().
 *
 * Logs the new connection, installs the close/read callbacks, registers the
 * connection with the owning server and, if the connection is still open
 * afterwards, starts reading from it.
 *
 * @param io Newly accepted connection.
 */
static void on_accept(hio_t *io)
{
    LogInfo("on_accept connfd=%d\n", hio_fd(io));
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    LogInfo("accept connfd=%d [%s] <= [%s]\n",
            hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
    hio_setcb_close(io, on_close);
    hio_setcb_read(io, on_recv);
    TcpServerImpl *server = (TcpServerImpl *)hevent_userdata(io);
    if (nullptr == server) {
        LogError("on_accept: server is null, closing connection.\n");
        hio_close(io);
        return;
    }
    server->AddClient(io);
    // AddClient() closes the connection when the user rejects it (or when no
    // accept callback is configured); do not start reading on a closed io.
    if (!hio_is_opened(io)) {
        return;
    }
    hio_read_start(io);
}
/**
 * Builds the per-connection client object.
 *
 * @param io     Connection handle used later by Close() and Write().
 * @param object Opaque handle handed back to the user callbacks in Readed() and Closed().
 * @param param  Callback table (mReadFunc / mClosedFunc) copied into this object.
 */
TcpClientAcceptImpl::TcpClientAcceptImpl(const hio_t *io, const void *object, const ClientAcceptParam &param)
: mIo(io), mObjectThis(object), mParam(param)
{
}
/**
 * Requests that the underlying connection be closed.
 * Does nothing when this object holds no connection handle.
 */
void TcpClientAcceptImpl::Close(void)
{
    if (nullptr == mIo) {
        return;
    }
    // The handle is stored as pointer-to-const; hio_close() needs a mutable one.
    hio_close((hio_t *)mIo);
}
/**
 * Delivers received data to the user's read callback.
 *
 * @param data   Received bytes.
 * @param length Number of bytes in data.
 *
 * When no read callback is configured the data is discarded and a warning is
 * logged (the warning now names mReadFunc, the callback that is actually missing).
 */
void TcpClientAcceptImpl::Readed(const void *data, size_t length)
{
    if (nullptr != mParam.mReadFunc) {
        mParam.mReadFunc(data, length, mObjectThis);
        return;
    }
    LogWarning("mParam.mReadFunc is null\n");
}
/**
 * Writes data to the connection.
 *
 * @param data   Bytes to send.
 * @param length Number of bytes in data.
 * @return The value returned by hio_write(), or TCP_MODULE_WRITE_ERROR when
 *         this object holds no connection handle.
 */
ssize_t TcpClientAcceptImpl::Write(const void *data, size_t length)
{
    if (!mIo) {
        LogError("mIo is null\n");
        return TCP_MODULE_WRITE_ERROR;
    }
    // The handle is stored as pointer-to-const; hio_write() needs a mutable one.
    return hio_write((hio_t *)mIo, data, length);
}
/**
 * Notifies the user that this connection has been closed by invoking the
 * configured closed callback; logs a warning when none is configured.
 */
void TcpClientAcceptImpl::Closed(void)
{
    if (nullptr == mParam.mClosedFunc) {
        LogWarning("mParam.mClosedFunc is null\n");
        return;
    }
    mParam.mClosedFunc(mObjectThis);
}
/**
 * Stores a copy of the server parameters.
 * The event loop and listening io are left null here; Init() creates them.
 */
TcpServerImpl::TcpServerImpl(const TcpServerParam param) : mParam(param)
{
mLoop = nullptr;
mIo = nullptr;
}
void TcpServerImpl::Init(void)
{
constexpr int NO_FALGS = 0;
mLoop = hloop_new(NO_FALGS);
if (nullptr == mLoop) {
LogError("hloop_new failed\n");
return;
}
hio_t *listenio = hloop_create_tcp_server(mLoop, mParam.mIp, mParam.mPort, on_accept);
if (nullptr == listenio) {
LogError("hloop_create_tcp_server failed\n");
hloop_free(&mLoop);
mLoop = nullptr;
return;
}
LogInfo("listenfd=%d\n", hio_fd(listenio));
hevent_set_userdata(listenio, this);
mIo = listenio;
hio_setcb_close(mIo, server_on_close);
std::shared_ptr<ITcpServer> server = shared_from_this();
std::shared_ptr<TcpServerImpl> impl = std::dynamic_pointer_cast<TcpServerImpl>(server);
auto recvThread = [](std::shared_ptr<TcpServerImpl> tcpServer) {
tcpServer->Loop();
};
mTcpServerThread = std::thread(recvThread, impl);
}
/**
 * Shuts the server down: releases every registered client object, closes the
 * listening io (if any) and waits for the loop thread to finish.
 */
void TcpServerImpl::UnInit(void)
{
    LogInfo("UnInit TcpServerImpl\n");
    FreeClients();

    if (mIo != nullptr) {
        // server_on_close is installed on this io and stops the loop via Closed().
        hio_close(mIo);
        mIo = nullptr;
    }

    if (!mTcpServerThread.joinable()) {
        return;
    }
    mTcpServerThread.join();
}
/**
 * Thread body: runs the event loop until it returns, then frees the loop.
 * Logs an error and returns immediately when no loop has been created.
 */
void TcpServerImpl::Loop(void)
{
    if (mLoop != nullptr) {
        hloop_run(mLoop);
        hloop_free(&mLoop);
        mLoop = nullptr;
        return;
    }
    LogError("mLoop is null\n");
}
/**
 * Registers a newly accepted connection and offers it to the user.
 *
 * A client object is created for the connection, stored under the connection's
 * descriptor and passed to the user's accept callback together with the peer
 * address string. The connection is kept only when that callback returns true;
 * otherwise (callback rejects, no callback configured, or the client object
 * could not be created) the connection is closed.
 *
 * @param io Newly accepted connection.
 */
void TcpServerImpl::AddClient(hio_t *io)
{
    char localaddrstr[SOCKADDR_STRLEN] = {0};
    char peeraddrstr[SOCKADDR_STRLEN] = {0};
    LogInfo("accept connfd=%d [%s] <= [%s]\n",
            hio_fd(io),
            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));

    bool accepted = false;
    {
        // Scoped guard: the mutex is released on every path out of this block.
        std::lock_guard<std::mutex> locker(mMutex);
        std::shared_ptr<ITcpClientAccept> *addClient = NewTcpClientAccept(io, mParam.mClientAcceptParam);
        if (nullptr == addClient) {
            // Never store or hand out a null client object.
            LogError("NewTcpClientAccept failed.\n");
        }
        else {
            mClients[hio_fd(io)] = addClient;
            if (mParam.mAcceptClientFunc) {
                accepted = (mParam.mAcceptClientFunc(addClient, peeraddrstr) == true);
                if (!accepted) {
                    LogWarning("User did not accept client.\n");
                }
            }
        }
    }
    if (accepted) {
        return;
    }
    // Close outside the lock: the close callback re-enters RemoveClient(),
    // which takes the same mutex.
    hio_close(io);
    LogWarning("AddClient failed.\n");
}
/**
 * Looks up the client object registered for a connection.
 *
 * @param io Connection to look up (keyed by its descriptor).
 * @return The registered client object, or nullptr (with an error log) when
 *         the connection is unknown.
 */
std::shared_ptr<ITcpClientAccept> *TcpServerImpl::GetClient(hio_t *io)
{
    std::lock_guard<std::mutex> locker(mMutex);
    const auto found = mClients.find(hio_fd(io));
    if (found == mClients.end()) {
        LogError("GetClient failed, client not exit.\n");
        return nullptr;
    }
    return found->second;
}
/**
 * Removes the client object registered for a connection.
 *
 * When the stored object is non-null and passes TcpClientAcceptObjectCheck(),
 * its Closed() notification is delivered, the shared_ptr is released and the
 * storage allocated by NewTcpClientAccept() is freed. The map entry is erased
 * in every case. An unknown connection only produces an error log.
 *
 * @param io Connection being removed (keyed by its descriptor).
 */
void TcpServerImpl::RemoveClient(hio_t *io)
{
    std::lock_guard<std::mutex> locker(mMutex);
    auto it = mClients.find(hio_fd(io));
    if (it == mClients.end()) {
        LogError("RemoveClient failed, client not exit.\n");
        return;
    }
    std::shared_ptr<ITcpClientAccept> *client = it->second;
    // Null entries are skipped, matching the guard in FreeClients().
    if (nullptr != client && TcpClientAcceptObjectCheck((void *)client) == true) {
        (*client)->Closed();
        client->reset();
        // The handle points just past the header inside the malloc'd block.
        free(((char *)client) - sizeof(ITcpServerHeader));
    }
    mClients.erase(it);
}
void TcpServerImpl::FreeClients(void)
{
std::lock_guard<std::mutex> locker(mMutex);
for (auto &client : mClients) {
if (nullptr != client.second) {
void *object = (void *)client.second;
if (TcpClientAcceptObjectCheck(object) == true) {
(*(std::shared_ptr<ITcpClientAccept> *)object)->Closed();
(*(std::shared_ptr<ITcpClientAccept> *)object).reset();
free(((char *)object) - sizeof(ITcpServerHeader));
}
}
}
mClients.clear();
}
/**
 * Called when the listening io has closed: asks the event loop to stop.
 * Does nothing when no loop exists.
 */
void TcpServerImpl::Closed(void)
{
    if (nullptr == mLoop) {
        return;
    }
    LogInfo("Stop loop.\n");
    hloop_stop(mLoop);
}
std::shared_ptr<ITcpServer> *NewTcpServer(const TcpServerParam &param)
{
LogInfo("Create tcp server object.\n");
TcpServer *impl = (TcpServer *)malloc(sizeof(TcpServer));
if (nullptr == impl) {
LogError("NewTcpServer::malloc failed.\n");
return nullptr;
}
TcpServer tmp;
memcpy((void *)impl, (void *)&tmp, sizeof(TcpServer));
impl->mHeader.mCheckName = GetTcpServerModuleName();
impl->mTcpServer = std::make_shared<TcpServerImpl>(param);
return (std::shared_ptr<ITcpServer> *)(((char *)impl) + sizeof(ITcpServerHeader));
}
std::shared_ptr<ITcpClientAccept> *NewTcpClientAccept(const hio_t *io, const ClientAcceptParam &param)
{
LogInfo("Create tcp server object.\n");
TcpClientAccept *impl = (TcpClientAccept *)malloc(sizeof(TcpClientAccept));
if (nullptr == impl) {
LogError("NewTcpServer::malloc failed.\n");
return nullptr;
}
TcpClientAccept tmp;
memcpy((void *)impl, (void *)&tmp, sizeof(TcpClientAccept));
impl->mHeader.mCheckName = GetTcpClientAcceptName();
std::shared_ptr<ITcpClientAccept> *objectThis =
(std::shared_ptr<ITcpClientAccept> *)(((char *)impl) + sizeof(ITcpServerHeader));
impl->mTcpClientAccept = std::make_shared<TcpClientAcceptImpl>(io, objectThis, param);
return objectThis;
}