///////////////////////////////////////////////////////////////////////////// // Name: samples/sockbase/client.cpp // Purpose: Sockets sample for wxBase // Author: Lukasz Michalski // Modified by: // Created: 27.06.2005 // RCS-ID: $Id$ // Copyright: (c) 2005 Lukasz Michalski // Licence: wxWindows license ///////////////////////////////////////////////////////////////////////////// // ============================================================================ // declarations // ============================================================================ // ---------------------------------------------------------------------------- // headers // ---------------------------------------------------------------------------- #include "wx/wx.h" #include "wx/socket.h" #include "wx/event.h" #include "wx/list.h" #include "wx/cmdline.h" #include "wx/ffile.h" #include "wx/datetime.h" #include "wx/timer.h" #include "wx/thread.h" const wxEventType wxEVT_WORKER = wxNewEventType(); #define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ), const int timeout_val = 1000; class WorkerEvent : public wxEvent { public: typedef enum { CONNECTING, SENDING, RECEIVING, DISCONNECTING, DONE } evt_type; WorkerEvent(void* pSender, evt_type type) { SetId(-1); SetEventType(wxEVT_WORKER); m_sender = pSender; m_eventType = type; m_isFailed = false; } void setFailed() { m_isFailed = true; } bool isFailed() const { return m_isFailed; } virtual wxEvent* Clone() const { return new WorkerEvent(*this); } void* m_sender; bool m_isFailed; wxString m_workerIdent; evt_type m_eventType; }; typedef void (wxEvtHandler::*WorkerEventFunction)(WorkerEvent&); class ThreadWorker; class EventWorker; WX_DECLARE_LIST(ThreadWorker, TList); WX_DECLARE_LIST(EventWorker, EList); class Client : public wxApp { DECLARE_EVENT_TABLE(); public: void RemoveEventWorker(EventWorker* p_worker); private: typedef enum { THREADS, EVENTS } workMode; typedef enum { SEND_RANDOM, SEND_MESSAGE, STRESS_TEST } sendType; workMode m_workMode; sendType m_sendType; wxString m_message; wxString m_host; long m_stressWorkers; virtual bool OnInit(); virtual int OnRun(); virtual int OnExit(); void OnInitCmdLine(wxCmdLineParser& pParser); bool OnCmdLineParsed(wxCmdLineParser& pParser); void OnWorkerEvent(WorkerEvent& pEvent); void OnTimerEvent(wxTimerEvent& pEvent); void StartWorker(workMode pMode, const wxString& pMessage); void StartWorker(workMode pMode); char* CreateBuffer(int *msgsize); void dumpStatistics(); TList m_threadWorkers; EList m_eventWorkers; unsigned m_statConnecting; unsigned m_statSending; unsigned m_statReceiving; unsigned m_statDisconnecting; unsigned m_statDone; unsigned m_statFailed; wxTimer mTimer; }; DECLARE_APP(Client); class ThreadWorker : public wxThread { public: ThreadWorker(const wxString& p_host, char* p_buf, int p_size); virtual ExitCode Entry(); private: wxString m_host; wxSocketClient* m_clientSocket; char* m_inbuf; char* m_outbuf; int m_outsize; int m_insize; wxString m_workerIdent; }; class EventWorker : public wxEvtHandler { DECLARE_EVENT_TABLE(); public: EventWorker(const wxString& p_host, char* p_buf, int p_size); void Run(); virtual ~EventWorker(); private: wxString m_host; wxSocketClient* m_clientSocket; char* m_inbuf; char* m_outbuf; int m_outsize; int m_written; int m_insize; int m_readed; WorkerEvent::evt_type m_currentType; bool m_doneSent; wxIPV4address m_localaddr; void OnSocketEvent(wxSocketEvent& pEvent); void SendEvent(bool failed); }; /******************* Implementation ******************/ IMPLEMENT_APP_CONSOLE(Client); #include WX_DEFINE_LIST(TList); WX_DEFINE_LIST(EList); wxString CreateIdent(const wxIPV4address& addr) { return wxString::Format(wxT("%s:%d"),addr.IPAddress().c_str(),addr.Service()); } void Client::OnInitCmdLine(wxCmdLineParser& pParser) { wxApp::OnInitCmdLine(pParser); pParser.AddSwitch(wxT("e"),wxT("event"),_("Use event based worker (default)"),wxCMD_LINE_PARAM_OPTIONAL); pParser.AddSwitch(wxT("t"),wxT("thread"),_("Use thread based worker"),wxCMD_LINE_PARAM_OPTIONAL); pParser.AddSwitch(wxT("r"),wxT("random"),_("Send radnom data (default)"),wxCMD_LINE_PARAM_OPTIONAL); pParser.AddOption(wxT("m"),wxT("message"),_("Send message from "),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL); pParser.AddOption(wxT("f"),wxT("file"),_("Send contents of "),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL); pParser.AddOption(wxT("H"),wxT("hostname"),_("IP or name of host to connect to"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL); pParser.AddOption(wxT("s"),wxT("stress"),_("stress test with concurrent connections"),wxCMD_LINE_VAL_NUMBER,wxCMD_LINE_PARAM_OPTIONAL); } bool Client::OnCmdLineParsed(wxCmdLineParser& pParser) { wxString fname; m_workMode = EVENTS; m_stressWorkers = 50; if (pParser.Found(_("verbose"))) { wxLog::AddTraceMask(wxT("wxSocket")); wxLog::AddTraceMask(wxT("epolldispatcher")); wxLog::AddTraceMask(wxT("selectdispatcher")); wxLog::AddTraceMask(wxT("thread")); wxLog::AddTraceMask(wxT("events")); } if (pParser.Found(wxT("t"))) m_workMode = THREADS; m_sendType = SEND_RANDOM; if (pParser.Found(wxT("m"),&m_message)) m_sendType = SEND_MESSAGE; else if (pParser.Found(wxT("f"),&fname)) { wxFFile file(fname); if (!file.IsOpened()) { wxLogError(wxT("Cannot open file %s"),fname.c_str()); return false; }; if (!file.ReadAll(&m_message)) { wxLogError(wxT("Cannot read conten of file %s"),fname.c_str()); return false; }; m_sendType = SEND_MESSAGE; }; if (pParser.Found(wxT("s"),&m_stressWorkers)) m_sendType = STRESS_TEST; m_host = wxT("127.0.0.1"); pParser.Found(wxT("H"),&m_host); return wxApp::OnCmdLineParsed(pParser); }; bool Client::OnInit() { if (!wxApp::OnInit()) return false; srand(wxDateTime::Now().GetTicks()); mTimer.SetOwner(this); m_statConnecting = 0; m_statSending = 0; m_statReceiving = 0; m_statDisconnecting = 0; m_statDone = 0; m_statFailed = 0; return true; } int Client::OnRun() { switch(m_sendType) { case STRESS_TEST: switch(m_workMode) { case THREADS: for (int i = 0; i < m_stressWorkers; i++) { if (m_message.empty()) StartWorker(THREADS); else StartWorker(THREADS, m_message); } break; case EVENTS: for (int i = 0; i < m_stressWorkers; i++) { if (m_message.empty()) StartWorker(EVENTS); else StartWorker(EVENTS, m_message); } break; default: for (int i = 0; i < m_stressWorkers; i++) { if (m_message.empty()) StartWorker(i % 5 == 0 ? THREADS : EVENTS); else StartWorker(i % 5 == 0 ? THREADS : EVENTS, m_message); } break; } break; case SEND_MESSAGE: StartWorker(m_workMode,m_message); break; case SEND_RANDOM: StartWorker(m_workMode); break; } mTimer.Start(timeout_val,true); return wxApp::OnRun(); } int Client::OnExit() { for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it->GetNext()) { delete it->GetData(); } return 0; } // Create buffer to be sent by client. Buffer contains test indicator // message size and place for data // msgsize parameter contains size of data in bytes and // if input value does not fit into 250 bytes then // on exit is updated to new value that is multiply of 1024 bytes char* Client::CreateBuffer(int* msgsize) { int bufsize = 0; char* buf; //if message should have more than 256 bytes then set it as //test3 for compatibility with GUI server sample if ((*msgsize) > 250) { //send at least one kb of data int size = (*msgsize)/1024 + 1; //returned buffer will contain test indicator, message size in kb and data bufsize = size*1024+2; buf = new char[bufsize]; buf[0] = 0xDE; //second byte contains size in kilobytes buf[1] = (char)(size); *msgsize = size*1024; } else { //returned buffer will contain test indicator, message size in kb and data bufsize = (*msgsize)+2; buf = new char[bufsize]; buf[0] = 0xBE; //second byte contains size in bytes buf[1] = (char)(*msgsize); } return buf; } void Client::StartWorker(workMode pMode) { int msgsize = 1 + (int) (250000.0 * (rand() / (RAND_MAX + 1.0))); char* buf = CreateBuffer(&msgsize); //fill data part of buffer with random bytes for (int i = 2; i < (msgsize); i++) { buf[i] = i % 256; } if (pMode == THREADS) { ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2); if (c->Create() != wxTHREAD_NO_ERROR) { wxLogError(wxT("Cannot create more threads")); } else { c->Run(); m_threadWorkers.Append(c); } } else { EventWorker* e = new EventWorker(m_host,buf,msgsize+2); e->Run(); m_eventWorkers.Append(e); } m_statConnecting++; } void Client::StartWorker(workMode pMode, const wxString& pMessage) { char* tmpbuf = strdup(pMessage.mb_str()); int msgsize = strlen(tmpbuf); char* buf = CreateBuffer(&msgsize); memset(buf+2,0x0,msgsize); memcpy(buf+2,tmpbuf,msgsize); free(tmpbuf); if (pMode == THREADS) { ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2); if (c->Create() != wxTHREAD_NO_ERROR) { wxLogError(wxT("Cannot create more threads")); } else { c->Run(); m_threadWorkers.Append(c); } } else { EventWorker* e = new EventWorker(m_host,buf,msgsize+2); e->Run(); m_eventWorkers.Append(e); } m_statConnecting++; } void Client::OnWorkerEvent(WorkerEvent& pEvent) { switch (pEvent.m_eventType) { case WorkerEvent::CONNECTING: if (pEvent.isFailed()) { m_statConnecting--; m_statFailed++; } break; case WorkerEvent::SENDING: if (pEvent.isFailed()) { m_statFailed++; m_statSending--; } else { m_statConnecting--; m_statSending++; } break; case WorkerEvent::RECEIVING: if (pEvent.isFailed()) { m_statReceiving--; m_statFailed++; } else { m_statSending--; m_statReceiving++; } break; case WorkerEvent::DISCONNECTING: if (pEvent.isFailed()) { m_statDisconnecting--; m_statFailed++; } else { m_statReceiving--; m_statDisconnecting++; } break; case WorkerEvent::DONE: m_statDone++; m_statDisconnecting--; break; }; if (pEvent.isFailed() || pEvent.m_eventType == WorkerEvent::DONE) { for(TList::compatibility_iterator it = m_threadWorkers.GetFirst(); it ; it = it->GetNext()) { if (it->GetData() == pEvent.m_sender) { m_threadWorkers.DeleteNode(it); break; } } for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it = it->GetNext()) { if (it->GetData() == pEvent.m_sender) { delete it->GetData(); m_eventWorkers.DeleteNode(it); break; } } if ((m_threadWorkers.GetCount() == 0) && (m_eventWorkers.GetCount() == 0)) { mTimer.Stop(); dumpStatistics(); wxSleep(2); ExitMainLoop(); } else { mTimer.Start(timeout_val,true); } } } void Client::RemoveEventWorker(EventWorker* p_worker) { for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it = it->GetNext()) { if (it->GetData() == p_worker) { //wxLogDebug(wxT("Deleting event worker")); delete it->GetData(); m_eventWorkers.DeleteNode(it); return; } } } void Client::dumpStatistics() { wxString msg( wxString::Format(_("Connecting:\t%d\nSending\t\t%d\nReceiving\t%d\nDisconnecting:\t%d\nDone:\t\t%d\nFailed:\t\t%d\n"), m_statConnecting, m_statSending, m_statReceiving, m_statDisconnecting, m_statDone, m_statFailed )); wxLogMessage(wxT("Current status:\n%s\n"),msg.c_str()); } void Client::OnTimerEvent(wxTimerEvent&) { dumpStatistics(); } BEGIN_EVENT_TABLE(Client,wxEvtHandler) EVT_WORKER(Client::OnWorkerEvent) EVT_TIMER(wxID_ANY,Client::OnTimerEvent) END_EVENT_TABLE() EventWorker::EventWorker(const wxString& p_host, char* p_buf, int p_size) : m_host(p_host), m_outbuf(p_buf), m_outsize(p_size), m_written(0), m_readed(0) { m_clientSocket = new wxSocketClient(wxSOCKET_NOWAIT); m_clientSocket->SetEventHandler(*this); m_insize = m_outsize - 2; m_inbuf = new char[m_insize]; } void EventWorker::Run() { wxIPV4address ca; ca.Hostname(m_host); ca.Service(3000); m_clientSocket->SetNotify(wxSOCKET_CONNECTION_FLAG|wxSOCKET_LOST_FLAG|wxSOCKET_OUTPUT_FLAG|wxSOCKET_INPUT_FLAG); m_clientSocket->Notify(true); m_currentType = WorkerEvent::CONNECTING; m_doneSent = false; //wxLogMessage(wxT("EventWorker: Connecting.....")); m_clientSocket->Connect(ca,false); } void EventWorker::OnSocketEvent(wxSocketEvent& pEvent) { switch(pEvent.GetSocketEvent()) { case wxSOCKET_INPUT: //wxLogDebug(wxT("EventWorker: INPUT")); do { if (m_readed == m_insize) return; //event already posted m_clientSocket->Read(m_inbuf + m_readed, m_insize - m_readed); if (m_clientSocket->Error()) { if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK) { wxLogError(wxT("%s: read error"),CreateIdent(m_localaddr).c_str()); SendEvent(true); } } m_readed += m_clientSocket->LastCount(); //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(), m_insize - m_readed); if (m_readed == m_insize) { if (!memcmp(m_inbuf,m_outbuf,m_insize)) { wxLogError(wxT("%s: data mismatch"),CreateIdent(m_localaddr).c_str()); SendEvent(true); } m_currentType = WorkerEvent::DISCONNECTING; wxLogDebug(wxT("%s: DISCONNECTING"),CreateIdent(m_localaddr).c_str()); SendEvent(false); //wxLogDebug(wxT("EventWorker %p closing"),this); m_clientSocket->Close(); m_currentType = WorkerEvent::DONE; wxLogDebug(wxT("%s: DONE"),CreateIdent(m_localaddr).c_str()); SendEvent(false); } } while (!m_clientSocket->Error()); break; case wxSOCKET_OUTPUT: //wxLogDebug(wxT("EventWorker: OUTPUT")); do { if (m_written == m_outsize) return; if (m_written == 0) { m_currentType = WorkerEvent::SENDING; wxLogDebug(wxT("%s: SENDING"),CreateIdent(m_localaddr).c_str()); } m_clientSocket->Write(m_outbuf + m_written, m_outsize - m_written); if (m_clientSocket->Error()) { if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK) { wxLogError(wxT("%s: Write error"),CreateIdent(m_localaddr).c_str()); SendEvent(true); } } m_written += m_clientSocket->LastCount(); if (m_written != m_outsize) { //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),m_outsize - m_written); } else { //wxLogDebug(wxT("EventWorker %p SENDING->RECEIVING"),this); m_currentType = WorkerEvent::RECEIVING; wxLogDebug(wxT("%s: RECEIVING"),CreateIdent(m_localaddr).c_str()); SendEvent(false); } } while(!m_clientSocket->Error()); break; case wxSOCKET_CONNECTION: { //wxLogMessage(wxT("EventWorker: got connection")); wxLogMessage(wxT("%s: starting writing message (2 bytes for signature and %d bytes of data to write)"),CreateIdent(m_localaddr).c_str(),m_outsize-2); if (!m_clientSocket->GetLocal(m_localaddr)) wxLogError(_("Cannot get peer data for socket %p"),m_clientSocket); m_currentType = WorkerEvent::SENDING; wxLogDebug(wxT("%s: CONNECTING"),CreateIdent(m_localaddr).c_str()); SendEvent(false); } break; case wxSOCKET_LOST: { wxLogError(_("%s: connection lost"),CreateIdent(m_localaddr).c_str()); SendEvent(true); } break; } } void EventWorker::SendEvent(bool failed) { if (m_doneSent) return; WorkerEvent e(this,m_currentType); if (failed) e.setFailed(); wxGetApp().AddPendingEvent(e); m_doneSent = failed || m_currentType == WorkerEvent::DONE; }; EventWorker::~EventWorker() { m_clientSocket->Destroy(); delete [] m_outbuf; delete [] m_inbuf; } BEGIN_EVENT_TABLE(EventWorker,wxEvtHandler) EVT_SOCKET(wxID_ANY,EventWorker::OnSocketEvent) END_EVENT_TABLE() ThreadWorker::ThreadWorker(const wxString& p_host, char* p_buf, int p_size) : wxThread(wxTHREAD_DETACHED), m_host(p_host), m_outbuf(p_buf), m_outsize(p_size) { m_clientSocket = new wxSocketClient(wxSOCKET_BLOCK|wxSOCKET_WAITALL); m_insize = m_outsize - 2; m_inbuf = new char[m_insize]; } wxThread::ExitCode ThreadWorker::Entry() { wxIPV4address ca; ca.Hostname(m_host); ca.Service(5678); //wxLogDebug(wxT("ThreadWorker: Connecting.....")); m_clientSocket->SetTimeout(60); bool failed = false; WorkerEvent::evt_type etype = WorkerEvent::CONNECTING; if (!m_clientSocket->Connect(ca)) { wxLogError(wxT("Cannot connect to %s:%d"),ca.IPAddress().c_str(), ca.Service()); failed = true; } else { //wxLogMessage(wxT("ThreadWorker: Connected. Sending %d bytes of data"),m_outsize); etype = WorkerEvent::SENDING; WorkerEvent e(this,etype); wxGetApp().AddPendingEvent(e); int to_process = m_outsize; do { m_clientSocket->Write(m_outbuf,m_outsize); if (m_clientSocket->Error()) { wxLogError(wxT("ThreadWorker: Write error")); failed = true; } to_process -= m_clientSocket->LastCount(); //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process); } while(!m_clientSocket->Error() && to_process != 0); if (!failed) { etype = WorkerEvent::RECEIVING; WorkerEvent e(this,etype); wxGetApp().AddPendingEvent(e); to_process = m_insize; do { m_clientSocket->Read(m_inbuf,m_insize); if (m_clientSocket->Error()) { wxLogError(wxT("ThreadWorker: Read error")); failed = true; break; } to_process -= m_clientSocket->LastCount(); //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process); } while(!m_clientSocket->Error() && to_process != 0); } char* outdat = (char*)m_outbuf+2; if (!failed && (memcmp(m_inbuf,outdat,m_insize) != 0)) { wxLogError(wxT("Data mismatch")); failed = true; } } //wxLogDebug(wxT("ThreadWorker: Finished")); if (!failed) { etype = WorkerEvent::DISCONNECTING; WorkerEvent e(this,etype); wxGetApp().AddPendingEvent(e); }; m_clientSocket->Close(); m_clientSocket->Destroy(); m_clientSocket = NULL; delete [] m_outbuf; delete [] m_inbuf; if (!failed) etype = WorkerEvent::DONE; WorkerEvent e(this,etype); if (failed) e.setFailed(); wxGetApp().AddPendingEvent(e); return 0; }