diff --git a/middleware/McuManager/src/McuDevice.cpp b/middleware/McuManager/src/McuDevice.cpp index ae736e1..624efc9 100644 --- a/middleware/McuManager/src/McuDevice.cpp +++ b/middleware/McuManager/src/McuDevice.cpp @@ -79,8 +79,8 @@ const StatusCode McuDevice::UnInit(void) DeleteAllAsk(); return CreateStatusCode(STATUS_CODE_OK); } -size_t McuDevice::WriteData(const void *buff, const size_t buffLength, std::shared_ptr &context, - const unsigned int &serialNumber) +ssize_t McuDevice::WriteData(const void *buff, const size_t buffLength, std::shared_ptr &context, + const unsigned int &serialNumber) { constexpr size_t WRITE_ERROR = -1; size_t length = WRITE_ERROR; diff --git a/middleware/McuManager/src/McuDevice.h b/middleware/McuManager/src/McuDevice.h index 7cda792..6a44dc6 100644 --- a/middleware/McuManager/src/McuDevice.h +++ b/middleware/McuManager/src/McuDevice.h @@ -29,8 +29,8 @@ public: const StatusCode UnInit(void) override; public: - size_t WriteData(const void *buff, const size_t buffLength, std::shared_ptr &context, - const unsigned int &serialNumber) override; + ssize_t WriteData(const void *buff, const size_t buffLength, std::shared_ptr &context, + const unsigned int &serialNumber) override; public: void GetIpcMissionReply(const unsigned int &serialNumber, const unsigned char &mission) override; diff --git a/test/utils/TcpModule/src/TcpModule_Test.cpp b/test/utils/TcpModule/src/TcpModule_Test.cpp index 39a4ea3..99db5eb 100644 --- a/test/utils/TcpModule/src/TcpModule_Test.cpp +++ b/test/utils/TcpModule/src/TcpModule_Test.cpp @@ -19,33 +19,42 @@ #include namespace TcpModuleTest { -// ../output_files/test/bin/TcpModuleTest --gtest_filter=TcpModuleTest.UNIT_TcpModule_AUTO_IllegalObject +// ../output_files/test/bin/TcpModuleTest --gtest_filter=TcpModuleTest.UNIT_TcpModule_EXAMPLE_AUTO_IllegalObject /** * TcpModule module api will not crash when object is illegal. */ -TEST(TcpModuleTest, UNIT_TcpModule_AUTO_IllegalObject) +TEST(TcpModuleTest, UNIT_TcpModule_EXAMPLE_AUTO_IllegalObject) { static void *tcpClientAccept = nullptr; - TcpParam param = { + TcpServerParam tcpServerparam = { .mIp = "127.0.0.1", .mPort = 9876, - .mReadFunc = [](const void *p, const size_t len, void *context) -> void { - return; - }, .mAcceptClientFunc = [](void *object, const char *ip) -> void { LogInfo("accept client, peer ip: %s", ip); tcpClientAccept = object; return; }, }; + TcpClientParam param = { + .mIp = "127.0.0.1", + .mPort = 9876, + .mReadFunc = [](const void *data, const ssize_t len, const void *context) -> void { + LogInfo("read data: %s", (char *)data); + return; + }, + }; CreateLogModule(); ILogInit(LOG_INSTANCE_TYPE_END); - void *tcpServer = CreateTcpServer(param); + void *tcpServer = CreateTcpServer(tcpServerparam); std::this_thread::sleep_for(std::chrono::milliseconds(500)); void *tcpClient = CreateTcpClient(param); std::this_thread::sleep_for(std::chrono::milliseconds(100)); TcpClientWrite(tcpClient, "123456789", strlen("123456789")); - std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + if (nullptr != tcpClientAccept) { + AcceptClientWrite(tcpClientAccept, "9876543210", strlen("9876543210")); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); if (nullptr != tcpClient) { FreeTcpClient(tcpClient); } diff --git a/utils/McuProtocol/include/McuProtocol.h b/utils/McuProtocol/include/McuProtocol.h index 6ed055d..08b7ea8 100644 --- a/utils/McuProtocol/include/McuProtocol.h +++ b/utils/McuProtocol/include/McuProtocol.h @@ -68,8 +68,8 @@ protected: { return std::make_shared(); } - virtual size_t WriteData(const void *buff, const size_t buffLength, std::shared_ptr &context, - const unsigned int &serialNumber) + virtual ssize_t WriteData(const void *buff, const size_t buffLength, std::shared_ptr &context, + const unsigned int &serialNumber) { return 0; } diff --git a/utils/TcpModule/include/TcpModule.h b/utils/TcpModule/include/TcpModule.h index 2044b24..a8b605d 100644 --- a/utils/TcpModule/include/TcpModule.h +++ b/utils/TcpModule/include/TcpModule.h @@ -16,23 +16,36 @@ #define TCP_MODULE_H #include #include +#define TCP_MODULE_WRITE_ERROR -1; #ifdef __cplusplus extern "C" { #endif -typedef void (*TcpReadFunction)(const void *, const size_t, void *); +typedef void (*TcpReadFunction)(const void *, const ssize_t, const void *); typedef void (*TcpAcceptClientFunction)(void *, const char *); +typedef void (*SocketClosedFunction)(const void *); +typedef struct tcp_server_parm +{ + const char *mIp; + const int mPort; + TcpAcceptClientFunction mAcceptClientFunc; +} TcpServerParam; typedef struct tcp_parm { const char *mIp; const int mPort; - TcpReadFunction mReadFunc; // TODO: delete this memeber - TcpAcceptClientFunction mAcceptClientFunc; -} TcpParam; -void *CreateTcpServer(const TcpParam param); + TcpReadFunction mReadFunc; + SocketClosedFunction mClosedFunc; +} TcpClientParam; +typedef struct client_accept_parm +{ + TcpReadFunction mReadFunc; + SocketClosedFunction mClosedFunc; +} ClientAcceptParam; +void *CreateTcpServer(const TcpServerParam param); void FreeTcpServer(void *object); -int AcceptClientSetReadFunc(void *object, TcpReadFunction readFunc); -int AcceptClientWrite(void *object, void *buf, const size_t bufLenght); -void *CreateTcpClient(const TcpParam param); +void AcceptClientSetParam(void *object, const ClientAcceptParam param); +ssize_t AcceptClientWrite(void *object, const void *buf, const size_t bufLenght); +void *CreateTcpClient(const TcpClientParam param); void FreeTcpClient(void *object); ssize_t TcpClientWrite(void *object, const void *buf, const size_t bufLenght); #ifdef __cplusplus diff --git a/utils/TcpModule/src/ITcpClient.cpp b/utils/TcpModule/src/ITcpClient.cpp index 0e0ca32..256ab40 100644 --- a/utils/TcpModule/src/ITcpClient.cpp +++ b/utils/TcpModule/src/ITcpClient.cpp @@ -14,6 +14,7 @@ */ #include "ITcpClient.h" #include "ILog.h" +#include "TcpModule.h" #include void ITcpClient::Init(void) { @@ -21,9 +22,15 @@ void ITcpClient::Init(void) void ITcpClient::UnInit(void) { } +void ITcpClient::Readed(const void *data, size_t length) +{ +} ssize_t ITcpClient::Write(const void *buf, const size_t bufLenght) { - return WRITE_ERROR; + return TCP_MODULE_WRITE_ERROR; +} +void ITcpClient::Closed(void) +{ } static const char *TCP_CLIENT_NAME = "tcp_client"; const char *GetTcpClientModuleName(void) diff --git a/utils/TcpModule/src/ITcpClient.h b/utils/TcpModule/src/ITcpClient.h index 992174b..8948a45 100644 --- a/utils/TcpModule/src/ITcpClient.h +++ b/utils/TcpModule/src/ITcpClient.h @@ -16,7 +16,6 @@ #define I_TCP_CLIENT_H #include "StatusCode.h" #include -constexpr int WRITE_ERROR = -1; class ITcpClient { public: @@ -24,7 +23,9 @@ public: virtual ~ITcpClient() = default; virtual void Init(void); virtual void UnInit(void); + virtual void Readed(const void *data, size_t length); virtual ssize_t Write(const void *buf, const size_t bufLenght); + virtual void Closed(void); }; typedef struct i_tcp_client_header { diff --git a/utils/TcpModule/src/ITcpServer.cpp b/utils/TcpModule/src/ITcpServer.cpp index 4c8c0e2..f61b8c4 100644 --- a/utils/TcpModule/src/ITcpServer.cpp +++ b/utils/TcpModule/src/ITcpServer.cpp @@ -14,10 +14,21 @@ */ #include "ITcpServer.h" #include "ILog.h" +#include "TcpModule.h" #include +void ITcpClientAccept::SetParam(const ClientAcceptParam ¶m) +{ +} void ITcpClientAccept::Readed(const void *data, size_t length) { } +ssize_t ITcpClientAccept::Write(const void *data, size_t length) +{ + return TCP_MODULE_WRITE_ERROR; +} +void ITcpClientAccept::Closed(void) +{ +} void ITcpServer::Init(void) { } @@ -33,4 +44,16 @@ const char *GetTcpServerModuleName(void) const char *GetTcpClientAcceptName(void) { return TCP_CLIENT_ACCEPT_NAME; +} +bool TcpClientAcceptObjectCheck(void *object) +{ + if (nullptr == object) { + LogError("nullptr object!\n"); + return false; + } + if (*((const char **)(((char *)object) - sizeof(ITcpServerHeader))) != GetTcpClientAcceptName()) { + LogError("Illegal object!\n"); + return false; + } + return true; } \ No newline at end of file diff --git a/utils/TcpModule/src/ITcpServer.h b/utils/TcpModule/src/ITcpServer.h index 34a4cc7..5f72f50 100644 --- a/utils/TcpModule/src/ITcpServer.h +++ b/utils/TcpModule/src/ITcpServer.h @@ -15,13 +15,17 @@ #ifndef I_TCP_SERVER_H #define I_TCP_SERVER_H #include "StatusCode.h" +#include "TcpModule.h" #include class ITcpClientAccept { public: ITcpClientAccept() = default; virtual ~ITcpClientAccept() = default; + virtual void SetParam(const ClientAcceptParam ¶m); virtual void Readed(const void *data, size_t length); + virtual ssize_t Write(const void *data, size_t length); + virtual void Closed(void); }; class ITcpServer { @@ -47,4 +51,5 @@ typedef struct tcp_client_accept } TcpClientAccept; const char *GetTcpServerModuleName(void); const char *GetTcpClientAcceptName(void); +bool TcpClientAcceptObjectCheck(void *object); #endif \ No newline at end of file diff --git a/utils/TcpModule/src/TcpClientImpl.cpp b/utils/TcpModule/src/TcpClientImpl.cpp index 532ad74..a7c2b6f 100644 --- a/utils/TcpModule/src/TcpClientImpl.cpp +++ b/utils/TcpModule/src/TcpClientImpl.cpp @@ -15,38 +15,26 @@ #include "TcpClientImpl.h" #include "ILog.h" #include -TcpClientImpl::TcpClientImpl(const TcpParam param) : mParam(param) -{ -} static void on_message(hio_t *io, void *buf, int len) { LogInfo("onmessage: %.*s\n", len, (char *)buf); - TcpClientImpl *cli = (TcpClientImpl *)hevent_userdata(io); - // ... + TcpClientImpl *tcpClient = (TcpClientImpl *)hevent_userdata(io); + tcpClient->Readed(buf, len); } static void on_connect(hio_t *io) { LogInfo("onconnect: connfd=%d\n", hio_fd(io)); - TcpClientImpl *cli = (TcpClientImpl *)hevent_userdata(io); - // cli->connected = 1; - - // hio_write(io, "hello\r\n", 7); - hio_setcb_read(io, on_message); hio_read(io); } static void on_close(hio_t *io) { LogInfo("onclose: connfd=%d error=%d\n", hio_fd(io), hio_error(io)); - TcpClientImpl *cli = (TcpClientImpl *)hevent_userdata(io); - // cli->connected = 0; - // reconnect - // if (cli->reconn_setting && reconn_setting_can_retry(cli->reconn_setting)) { - // uint32_t delay = reconn_setting_calc_delay(cli->reconn_setting); - // LogInfo("reconnect cnt=%d, delay=%d ...\n", cli->reconn_setting->cur_retry_cnt, - // cli->reconn_setting->cur_delay); cli->reconn_timer = htimer_add(cli->mLoop, reconnect_timer_cb, delay, 1); - // hevent_set_userdata(cli->reconn_timer, cli); - // } + TcpClientImpl *tcpClient = (TcpClientImpl *)hevent_userdata(io); + tcpClient->Closed(); +} +TcpClientImpl::TcpClientImpl(const TcpClientParam ¶m, const void *object) : mParam(param), mObjectThis(object) +{ } void TcpClientImpl::Init(void) { @@ -85,14 +73,30 @@ void TcpClientImpl::UnInit(void) mLoop = nullptr; mIo = nullptr; } +void TcpClientImpl::Readed(const void *data, size_t length) +{ + if (nullptr != mParam.mReadFunc) { + mParam.mReadFunc(data, length, nullptr); + return; + } + LogError("mParam.mReadFunc is nullptr.\n"); +} ssize_t TcpClientImpl::Write(const void *buf, const size_t bufLenght) { if (nullptr == mIo) { LogError("mIo is nullptr.\n"); - return WRITE_ERROR; + return TCP_MODULE_WRITE_ERROR; } return hio_write(mIo, buf, bufLenght); } +void TcpClientImpl::Closed(void) +{ + if (nullptr != mParam.mClosedFunc) { + mParam.mClosedFunc(mObjectThis); + return; + } + LogWarning("mParam.mClosedFunc is null\n"); +} void TcpClientImpl::Loop(void) { if (nullptr == mLoop) { @@ -101,7 +105,7 @@ void TcpClientImpl::Loop(void) } hloop_run(mLoop); } -std::shared_ptr *NewTcpClient(const TcpParam param) +std::shared_ptr *NewTcpClient(const TcpClientParam ¶m) { LogInfo("Create tcp server object.\n"); TcpClient *impl = (TcpClient *)malloc(sizeof(TcpClient)); @@ -112,6 +116,8 @@ std::shared_ptr *NewTcpClient(const TcpParam param) TcpClient tmp; memcpy((void *)impl, (void *)&tmp, sizeof(TcpClient)); impl->mHeader.mCheckName = GetTcpClientModuleName(); - impl->mTcpClient = std::make_shared(param); - return (std::shared_ptr *)(((char *)impl) + sizeof(ITcpClientHeader)); + std::shared_ptr *objectThis = + (std::shared_ptr *)(((char *)impl) + sizeof(ITcpClientHeader)); + impl->mTcpClient = std::make_shared(param, objectThis); + return objectThis; } \ No newline at end of file diff --git a/utils/TcpModule/src/TcpClientImpl.h b/utils/TcpModule/src/TcpClientImpl.h index 9404a89..c84cde9 100644 --- a/utils/TcpModule/src/TcpClientImpl.h +++ b/utils/TcpModule/src/TcpClientImpl.h @@ -24,18 +24,21 @@ class TcpClientImpl : public ITcpClient, public std::enable_shared_from_this { public: - TcpClientImpl(const TcpParam param); + TcpClientImpl(const TcpClientParam ¶m, const void *object); virtual ~TcpClientImpl() = default; void Init(void) override; void UnInit(void) override; + void Readed(const void *data, size_t length) override; ssize_t Write(const void *buf, const size_t bufLenght) override; + void Closed(void) override; void Loop(void); private: - hloop_t* mLoop; + hloop_t *mLoop; hio_t *mIo; - const TcpParam mParam; + const TcpClientParam mParam; std::thread mTcpClientThread; + const void *mObjectThis; }; -std::shared_ptr *NewTcpClient(const TcpParam param); +std::shared_ptr *NewTcpClient(const TcpClientParam ¶m); #endif \ No newline at end of file diff --git a/utils/TcpModule/src/TcpModule.cpp b/utils/TcpModule/src/TcpModule.cpp index 939c2a4..ab28bc9 100644 --- a/utils/TcpModule/src/TcpModule.cpp +++ b/utils/TcpModule/src/TcpModule.cpp @@ -42,7 +42,7 @@ static bool TcpClientObjectCheck(void *object) } return true; } -void *CreateTcpServer(const TcpParam param) +void *CreateTcpServer(const TcpServerParam param) { std::shared_ptr *server = TcpModuleMakePtr::GetInstance()->CreateTcpServer(param); if (nullptr != *server) { @@ -58,7 +58,20 @@ void FreeTcpServer(void *object) free(((char *)object) - sizeof(ITcpServerHeader)); // TODO: bug? } } -void *CreateTcpClient(const TcpParam param) +void AcceptClientSetParam(void *object, const ClientAcceptParam param) +{ + if (TcpClientAcceptObjectCheck(object) == true) { + (*(std::shared_ptr *)object)->SetParam(param); + } +} +ssize_t AcceptClientWrite(void *object, const void *buf, const size_t bufLenght) +{ + if (TcpClientAcceptObjectCheck(object) == true) { + return (*(std::shared_ptr *)object)->Write(buf, bufLenght); + } + return TCP_MODULE_WRITE_ERROR; +} +void *CreateTcpClient(const TcpClientParam param) { std::shared_ptr *client = TcpModuleMakePtr::GetInstance()->CreateTcpClient(param); if (nullptr != *client) { diff --git a/utils/TcpModule/src/TcpModuleImpl.cpp b/utils/TcpModule/src/TcpModuleImpl.cpp index 85e5619..08e20a8 100644 --- a/utils/TcpModule/src/TcpModuleImpl.cpp +++ b/utils/TcpModule/src/TcpModuleImpl.cpp @@ -20,7 +20,7 @@ // { // return TCP_MODULE_NAME; // } -// void *NewTcpModuleImpl(const TcpParam &tcpParam) +// void *NewTcpModuleImpl(const TcpClientParam &tcpParam) // { // if (nullptr == tcpParam.mIp) { // LogError("Parament error, nullptr == tcpParam.mIp\n"); @@ -34,7 +34,7 @@ // impl->mTcpImpl = std::make_shared(tcpParam); // return (void *)(((char *)impl) + sizeof(TcpModuleHeader)); // } -// void *NewTcpServer(const TcpParam &tcpParam) +// void *NewTcpServer(const TcpClientParam &tcpParam) // { // return nullptr; // } \ No newline at end of file diff --git a/utils/TcpModule/src/TcpModuleImpl.h b/utils/TcpModule/src/TcpModuleImpl.h index 6b58fa6..3320ea4 100644 --- a/utils/TcpModule/src/TcpModuleImpl.h +++ b/utils/TcpModule/src/TcpModuleImpl.h @@ -25,14 +25,14 @@ // class TcpModuleImpl // { // public: -// TcpModuleImpl(const TcpParam &uatrInfo); +// TcpModuleImpl(const TcpClientParam &uatrInfo); // virtual ~TcpModuleImpl() = default; // private: // const StatusCode SetConfig(void); // private: -// const TcpParam mUatrInfo; +// const TcpClientParam mUatrInfo; // int mFd; // }; // TODO: There may be a CPU byte alignment bug. @@ -42,8 +42,8 @@ // TcpModuleHeader mHeader; // std::shared_ptr mTcpImpl; // } TcpServer; -// void *NewTcpModuleImpl(const TcpParam &tcpParam); -// void *NewTcpServer(const TcpParam &tcpParam); +// void *NewTcpModuleImpl(const TcpClientParam &tcpParam); +// void *NewTcpServer(const TcpClientParam &tcpParam); // static inline TcpServer *TcpModuleImplConvert(void *object) // { // return ((TcpServer *)(((char *)object) - sizeof(TcpModuleHeader))); diff --git a/utils/TcpModule/src/TcpModuleMakePtr.cpp b/utils/TcpModule/src/TcpModuleMakePtr.cpp index 0c0d2f5..975d350 100644 --- a/utils/TcpModule/src/TcpModuleMakePtr.cpp +++ b/utils/TcpModule/src/TcpModuleMakePtr.cpp @@ -14,8 +14,8 @@ */ #include "TcpModuleMakePtr.h" #include "ILog.h" -#include "TcpServerImpl.h" #include "TcpClientImpl.h" +#include "TcpServerImpl.h" std::shared_ptr &TcpModuleMakePtr::GetInstance(std::shared_ptr *impl) { static auto instance = std::make_shared(); @@ -24,11 +24,11 @@ std::shared_ptr &TcpModuleMakePtr::GetInstance(std::shared_ptr } return instance; } -std::shared_ptr *TcpModuleMakePtr::CreateTcpServer(const TcpParam param) +std::shared_ptr *TcpModuleMakePtr::CreateTcpServer(const TcpServerParam ¶m) { return NewTcpServer(param); } -std::shared_ptr *TcpModuleMakePtr::CreateTcpClient(const TcpParam param) +std::shared_ptr *TcpModuleMakePtr::CreateTcpClient(const TcpClientParam ¶m) { return NewTcpClient(param); } \ No newline at end of file diff --git a/utils/TcpModule/src/TcpModuleMakePtr.h b/utils/TcpModule/src/TcpModuleMakePtr.h index 78f31bf..f381029 100644 --- a/utils/TcpModule/src/TcpModuleMakePtr.h +++ b/utils/TcpModule/src/TcpModuleMakePtr.h @@ -14,10 +14,10 @@ */ #ifndef TCP_MODULE_MAKE_PTR_H #define TCP_MODULE_MAKE_PTR_H +#include "ITcpClient.h" +#include "ITcpServer.h" #include "StatusCode.h" #include "TcpModule.h" -#include "ITcpServer.h" -#include "ITcpClient.h" #include class TcpModuleMakePtr { @@ -25,7 +25,7 @@ public: TcpModuleMakePtr() = default; virtual ~TcpModuleMakePtr() = default; static std::shared_ptr &GetInstance(std::shared_ptr *impl = nullptr); - virtual std::shared_ptr *CreateTcpServer(const TcpParam param); - virtual std::shared_ptr *CreateTcpClient(const TcpParam param); + virtual std::shared_ptr *CreateTcpServer(const TcpServerParam ¶m); + virtual std::shared_ptr *CreateTcpClient(const TcpClientParam ¶m); }; #endif // !TCP_MODULE_MAKE_PTR_H \ No newline at end of file diff --git a/utils/TcpModule/src/TcpServerImpl.cpp b/utils/TcpModule/src/TcpServerImpl.cpp index 5a24d54..dee7329 100644 --- a/utils/TcpModule/src/TcpServerImpl.cpp +++ b/utils/TcpModule/src/TcpServerImpl.cpp @@ -15,28 +15,11 @@ #include "TcpServerImpl.h" #include "ILog.h" #include "TcpServerHandle.h" -static bool TcpClientAcceptObjectCheck(void *object) -{ - if (nullptr == object) { - LogError("nullptr object!\n"); - return false; - } - if (*((const char **)(((char *)object) - sizeof(ITcpServerHeader))) != GetTcpClientAcceptName()) { - LogError("Illegal object!\n"); - return false; - } - return true; -} static void on_close(hio_t *io) { LogInfo("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io)); - // std::shared_ptr server; - // if (TcpServerHandle::GetInstance()->GetServer(hio_fd(io), server) == true) { - // std::shared_ptr impl = std::dynamic_pointer_cast(server); - // if (impl != nullptr) { - // impl->ClosedEvent(); - // } - // } + TcpServerImpl *server = (TcpServerImpl *)hevent_userdata(io); + server->RemoveClient(io); } static void on_recv(hio_t *io, void *buf, int readbytes) { @@ -46,19 +29,9 @@ static void on_recv(hio_t *io, void *buf, int readbytes) LogInfo( "[%s] <=> [%s]\n", SOCKADDR_STR(hio_localaddr(io), localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); LogInfo("< %.*s", readbytes, (char *)buf); - ITcpClientAccept *client = (ITcpClientAccept *)hevent_userdata(io); - client->Readed((const char *)buf, readbytes); - // server->RemoveClient(hio_fd(io)); - // std::shared_ptr server; - // if (TcpServerHandle::GetInstance()->GetServer(hio_fd(io), server) == true) { - // std::shared_ptr impl = std::dynamic_pointer_cast(server); - // if (impl != nullptr) { - // impl->TcpServerReaded(buf, readbytes); - // } - // } - // echo - // LogInfo("> %.*s", readbytes, (char *)buf); - // hio_write(io, buf, readbytes); + TcpServerImpl *server = (TcpServerImpl *)hevent_userdata(io); + std::shared_ptr *client = server->GetClient(io); + (*client)->Readed((const char *)buf, readbytes); } static void on_accept(hio_t *io) { @@ -74,25 +47,47 @@ static void on_accept(hio_t *io) hio_setcb_read(io, on_recv); std::shared_ptr *client = NewTcpClientAccept(io); TcpServerImpl *server = (TcpServerImpl *)hevent_userdata(io); - // std::shared_ptr server; - // if (TcpServerHandle::GetInstance()->GetServer(hio_fd(io), server) == true) { - // std::shared_ptr impl = std::dynamic_pointer_cast(server); - // if (impl != nullptr) { - // impl->SetIo(io); - // } - // } server->AddClient(io, client); - hevent_set_userdata(io, (*client).get()); hio_read_start(io); } -TcpClientAcceptImpl::TcpClientAcceptImpl(const hio_t *io) : mIo(io) +TcpClientAcceptImpl::TcpClientAcceptImpl(const hio_t *io, const void *object) : mIo(io), mObjectThis(object) { + mParam.mReadFunc = nullptr; + mParam.mClosedFunc = nullptr; +} +void TcpClientAcceptImpl::SetParam(const ClientAcceptParam ¶m) +{ + std::lock_guard locker(mMutex); + mParam = param; } void TcpClientAcceptImpl::Readed(const void *data, size_t length) { - LogInfo("TcpClientAcceptImpl::Readed\n"); + std::lock_guard locker(mMutex); + if (nullptr != mParam.mReadFunc) { + mParam.mReadFunc(data, length, mObjectThis); + return; + } + LogWarning("mParam.mClosedFunc is null\n"); } -TcpServerImpl::TcpServerImpl(const TcpParam param) : mParam(param) +ssize_t TcpClientAcceptImpl::Write(const void *data, size_t length) +{ + if (mIo) { + hio_t *io = (hio_t *)mIo; + return hio_write(io, data, length); + } + LogError("mIo is null\n"); + return TCP_MODULE_WRITE_ERROR; +} +void TcpClientAcceptImpl::Closed(void) +{ + std::lock_guard locker(mMutex); + if (nullptr != mParam.mClosedFunc) { + mParam.mClosedFunc(mObjectThis); + return; + } + LogWarning("mParam.mClosedFunc is null\n"); +} +TcpServerImpl::TcpServerImpl(const TcpServerParam param) : mParam(param) { mLoop = nullptr; mIo = nullptr; @@ -114,7 +109,6 @@ void TcpServerImpl::Init(void) hevent_set_userdata(listenio, this); mIo = listenio; std::shared_ptr server = shared_from_this(); - // TcpServerHandle::GetInstance()->AddServer(hio_fd(listenio), server); std::shared_ptr impl = std::dynamic_pointer_cast(server); auto recvThread = [](std::shared_ptr tcpServer) { tcpServer->Loop(); @@ -136,22 +130,6 @@ void TcpServerImpl::UnInit(void) mLoop = nullptr; mIo = nullptr; } -void TcpServerImpl::TcpServerReaded(const void *buf, size_t length) -{ -} -void TcpServerImpl::TcpServerWrite(const void *buf, size_t length) -{ - if (mIo) { - hio_write(mIo, buf, length); - return; - } - LogError("mIo is null\n"); -} -void TcpServerImpl::ClosedEvent(void) -{ - mLoop = nullptr; - mIo = nullptr; -} void TcpServerImpl::Loop(void) { if (nullptr == mLoop) { @@ -162,6 +140,7 @@ void TcpServerImpl::Loop(void) } void TcpServerImpl::AddClient(hio_t *io, std::shared_ptr *client) { + std::lock_guard locker(mMutex); char localaddrstr[SOCKADDR_STRLEN] = {0}; char peeraddrstr[SOCKADDR_STRLEN] = {0}; LogInfo("accept connfd=%d [%s] <= [%s]\n", @@ -173,37 +152,51 @@ void TcpServerImpl::AddClient(hio_t *io, std::shared_ptr *clie mParam.mAcceptClientFunc(client, peeraddrstr); } } +std::shared_ptr *TcpServerImpl::GetClient(hio_t *io) +{ + std::lock_guard locker(mMutex); + auto it = mClients.find(hio_fd(io)); + if (it != mClients.end()) { + return it->second; + } + else { + LogError("GetClient failed, client not exit.\n"); + return nullptr; + } +} +void TcpServerImpl::RemoveClient(hio_t *io) +{ + std::lock_guard locker(mMutex); + auto it = mClients.find(hio_fd(io)); + if (it != mClients.end()) { + void *object = (void *)it->second; + if (TcpClientAcceptObjectCheck(object) == true) { + (*(std::shared_ptr *)object)->Closed(); + (*(std::shared_ptr *)object).reset(); + free(((char *)object) - sizeof(ITcpServerHeader)); + } + mClients.erase(it); + } + else { + LogError("RemoveClient failed, client not exit.\n"); + } +} void TcpServerImpl::FreeClients(void) { + std::lock_guard locker(mMutex); for (auto &client : mClients) { if (nullptr != client.second) { - // delete client.second; - // client.second = nullptr; void *object = (void *)client.second; if (TcpClientAcceptObjectCheck(object) == true) { - // (*(std::shared_ptr *)object)->UnInit(); + (*(std::shared_ptr *)object)->Closed(); (*(std::shared_ptr *)object).reset(); - free(((char *)object) - sizeof(ITcpServerHeader)); // TODO: bug? + free(((char *)object) - sizeof(ITcpServerHeader)); } } } - // for (auto it = mClients.begin(); it != mClients.end();) { - // // 检查指针是否为空 - // if (it->second != nullptr) { - // // 释放 shared_ptr,这将减少引用计数 - // it->second->reset(); - // // 删除指针 - // delete it->second; - // // 从 map 中删除条目 - // it = mClients.erase(it); - // } - // else { - // // 指针为空,继续迭代 - // ++it; - // } - // } + mClients.clear(); } -std::shared_ptr *NewTcpServer(const TcpParam param) +std::shared_ptr *NewTcpServer(const TcpServerParam ¶m) { LogInfo("Create tcp server object.\n"); TcpServer *impl = (TcpServer *)malloc(sizeof(TcpServer)); @@ -228,6 +221,8 @@ std::shared_ptr *NewTcpClientAccept(const hio_t *io) TcpClientAccept tmp; memcpy((void *)impl, (void *)&tmp, sizeof(TcpClientAccept)); impl->mHeader.mCheckName = GetTcpClientAcceptName(); - impl->mTcpClientAccept = std::make_shared(io); - return (std::shared_ptr *)(((char *)impl) + sizeof(ITcpServerHeader)); + std::shared_ptr *objectThis = + (std::shared_ptr *)(((char *)impl) + sizeof(ITcpServerHeader)); + impl->mTcpClientAccept = std::make_shared(io, objectThis); + return objectThis; } \ No newline at end of file diff --git a/utils/TcpModule/src/TcpServerImpl.h b/utils/TcpModule/src/TcpServerImpl.h index 3db698c..5e92655 100644 --- a/utils/TcpModule/src/TcpServerImpl.h +++ b/utils/TcpModule/src/TcpServerImpl.h @@ -20,38 +20,45 @@ #include "hsocket.h" #include "hssl.h" #include +#include #include -class TcpClientAcceptImpl : public ITcpClientAccept +class TcpClientAcceptImpl : public ITcpClientAccept, public std::enable_shared_from_this { public: - TcpClientAcceptImpl(const hio_t *io); + TcpClientAcceptImpl(const hio_t *io, const void *object); virtual ~TcpClientAcceptImpl() = default; + void SetParam(const ClientAcceptParam ¶m) override; void Readed(const void *data, size_t length) override; + ssize_t Write(const void *data, size_t length) override; + void Closed(void) override; private: + std::mutex mMutex; const hio_t *mIo; + ClientAcceptParam mParam; + const void *mObjectThis; }; class TcpServerImpl : public ITcpServer, public std::enable_shared_from_this { public: - TcpServerImpl(const TcpParam param); + TcpServerImpl(const TcpServerParam param); virtual ~TcpServerImpl() = default; void Init(void) override; void UnInit(void) override; - void TcpServerReaded(const void *buf, size_t length); - void TcpServerWrite(const void *buf, size_t length); - void ClosedEvent(void); void Loop(void); void AddClient(hio_t *io, std::shared_ptr *client); + std::shared_ptr *GetClient(hio_t *io); + void RemoveClient(hio_t *io); void FreeClients(void); private: + std::mutex mMutex; hloop_t *mLoop; hio_t *mIo; - const TcpParam mParam; + const TcpServerParam mParam; std::thread mTcpServerThread; std::map *> mClients; }; -std::shared_ptr *NewTcpServer(const TcpParam param); +std::shared_ptr *NewTcpServer(const TcpServerParam ¶m); std::shared_ptr *NewTcpClientAccept(const hio_t *io); #endif \ No newline at end of file