wxWidgets/samples/sockets/baseclient.cpp
Vadim Zeitlin 3f66f6a5b3 Remove all lines containing cvs/svn "$Id$" keyword.
This keyword is not expanded by Git which means it's not replaced with the
correct revision value in the releases made using git-based scripts and it's
confusing to have lines with unexpanded "$Id$" in the released files. As
expanding them with Git is not that simple (it could be done with git archive
and export-subst attribute) and there are not many benefits in having them in
the first place, just remove all these lines.

If nothing else, this will make an eventual transition to Git simpler.

Closes #14487.

git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@74602 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775
2013-07-26 16:02:46 +00:00

736 lines
22 KiB
C++

/////////////////////////////////////////////////////////////////////////////
// Name: samples/sockbase/client.cpp
// Purpose: Sockets sample for wxBase
// Author: Lukasz Michalski
// Modified by:
// Created: 27.06.2005
// Copyright: (c) 2005 Lukasz Michalski <lmichalski@sf.net>
// Licence: wxWindows licence
/////////////////////////////////////////////////////////////////////////////
// ============================================================================
// declarations
// ============================================================================
// ----------------------------------------------------------------------------
// headers
// ----------------------------------------------------------------------------
#include "wx/wx.h"
#include "wx/socket.h"
#include "wx/event.h"
#include "wx/list.h"
#include "wx/cmdline.h"
#include "wx/ffile.h"
#include "wx/datetime.h"
#include "wx/timer.h"
#include "wx/thread.h"
const wxEventType wxEVT_WORKER = wxNewEventType();
#define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
const int timeout_val = 1000;
class WorkerEvent : public wxEvent {
public:
typedef enum {
CONNECTING,
SENDING,
RECEIVING,
DISCONNECTING,
DONE
} evt_type;
WorkerEvent(void* pSender, evt_type type)
{
SetId(-1);
SetEventType(wxEVT_WORKER);
m_sender = pSender;
m_eventType = type;
m_isFailed = false;
}
void setFailed() { m_isFailed = true; }
bool isFailed() const { return m_isFailed; }
virtual wxEvent* Clone() const
{
return new WorkerEvent(*this);
}
void* m_sender;
bool m_isFailed;
wxString m_workerIdent;
evt_type m_eventType;
};
typedef void (wxEvtHandler::*WorkerEventFunction)(WorkerEvent&);
class ThreadWorker;
class EventWorker;
WX_DECLARE_LIST(ThreadWorker, TList);
WX_DECLARE_LIST(EventWorker, EList);
class Client : public wxApp {
DECLARE_EVENT_TABLE()
public:
void RemoveEventWorker(EventWorker* p_worker);
private:
typedef enum
{
THREADS,
EVENTS
} workMode;
typedef enum
{
SEND_RANDOM,
SEND_MESSAGE,
STRESS_TEST
} sendType;
workMode m_workMode;
sendType m_sendType;
wxString m_message;
wxString m_host;
long m_stressWorkers;
virtual bool OnInit();
virtual int OnRun();
virtual int OnExit();
void OnInitCmdLine(wxCmdLineParser& pParser);
bool OnCmdLineParsed(wxCmdLineParser& pParser);
void OnWorkerEvent(WorkerEvent& pEvent);
void OnTimerEvent(wxTimerEvent& pEvent);
void StartWorker(workMode pMode, const wxString& pMessage);
void StartWorker(workMode pMode);
char* CreateBuffer(int *msgsize);
void dumpStatistics();
TList m_threadWorkers;
EList m_eventWorkers;
unsigned m_statConnecting;
unsigned m_statSending;
unsigned m_statReceiving;
unsigned m_statDisconnecting;
unsigned m_statDone;
unsigned m_statFailed;
wxTimer mTimer;
};
DECLARE_APP(Client);
class ThreadWorker : public wxThread
{
public:
ThreadWorker(const wxString& p_host, char* p_buf, int p_size);
virtual ExitCode Entry();
private:
wxString m_host;
wxSocketClient* m_clientSocket;
char* m_inbuf;
char* m_outbuf;
int m_outsize;
int m_insize;
wxString m_workerIdent;
};
class EventWorker : public wxEvtHandler
{
DECLARE_EVENT_TABLE()
public:
EventWorker(const wxString& p_host, char* p_buf, int p_size);
void Run();
virtual ~EventWorker();
private:
wxString m_host;
wxSocketClient* m_clientSocket;
char* m_inbuf;
char* m_outbuf;
int m_outsize;
int m_written;
int m_insize;
int m_readed;
WorkerEvent::evt_type m_currentType;
bool m_doneSent;
wxIPV4address m_localaddr;
void OnSocketEvent(wxSocketEvent& pEvent);
void SendEvent(bool failed);
};
/******************* Implementation ******************/
IMPLEMENT_APP_CONSOLE(Client);
#include <wx/listimpl.cpp>
WX_DEFINE_LIST(TList);
WX_DEFINE_LIST(EList);
wxString
CreateIdent(const wxIPV4address& addr)
{
return wxString::Format(wxT("%s:%d"),addr.IPAddress().c_str(),addr.Service());
}
void
Client::OnInitCmdLine(wxCmdLineParser& pParser)
{
wxApp::OnInitCmdLine(pParser);
pParser.AddSwitch(wxT("e"),wxT("event"),_("Use event based worker (default)"),wxCMD_LINE_PARAM_OPTIONAL);
pParser.AddSwitch(wxT("t"),wxT("thread"),_("Use thread based worker"),wxCMD_LINE_PARAM_OPTIONAL);
pParser.AddSwitch(wxT("r"),wxT("random"),_("Send radnom data (default)"),wxCMD_LINE_PARAM_OPTIONAL);
pParser.AddOption(wxT("m"),wxT("message"),_("Send message from <str>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
pParser.AddOption(wxT("f"),wxT("file"),_("Send contents of <file>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
pParser.AddOption(wxT("H"),wxT("hostname"),_("IP or name of host to connect to"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
pParser.AddOption(wxT("s"),wxT("stress"),_("stress test with <num> concurrent connections"),wxCMD_LINE_VAL_NUMBER,wxCMD_LINE_PARAM_OPTIONAL);
}
bool
Client::OnCmdLineParsed(wxCmdLineParser& pParser)
{
wxString fname;
m_workMode = EVENTS;
m_stressWorkers = 50;
if (pParser.Found(_("verbose")))
{
wxLog::AddTraceMask(wxT("wxSocket"));
wxLog::AddTraceMask(wxT("epolldispatcher"));
wxLog::AddTraceMask(wxT("selectdispatcher"));
wxLog::AddTraceMask(wxT("thread"));
wxLog::AddTraceMask(wxT("events"));
}
if (pParser.Found(wxT("t")))
m_workMode = THREADS;
m_sendType = SEND_RANDOM;
if (pParser.Found(wxT("m"),&m_message))
m_sendType = SEND_MESSAGE;
else if (pParser.Found(wxT("f"),&fname))
{
wxFFile file(fname);
if (!file.IsOpened()) {
wxLogError(wxT("Cannot open file %s"),fname.c_str());
return false;
};
if (!file.ReadAll(&m_message)) {
wxLogError(wxT("Cannot read conten of file %s"),fname.c_str());
return false;
};
m_sendType = SEND_MESSAGE;
};
if (pParser.Found(wxT("s"),&m_stressWorkers))
m_sendType = STRESS_TEST;
m_host = wxT("127.0.0.1");
pParser.Found(wxT("H"),&m_host);
return wxApp::OnCmdLineParsed(pParser);
};
bool
Client::OnInit()
{
if (!wxApp::OnInit())
return false;
srand(wxDateTime::Now().GetTicks());
mTimer.SetOwner(this);
m_statConnecting = 0;
m_statSending = 0;
m_statReceiving = 0;
m_statDisconnecting = 0;
m_statDone = 0;
m_statFailed = 0;
return true;
}
int
Client::OnRun()
{
int i;
switch(m_sendType)
{
case STRESS_TEST:
switch(m_workMode)
{
case THREADS:
for (i = 0; i < m_stressWorkers; i++) {
if (m_message.empty())
StartWorker(THREADS);
else
StartWorker(THREADS, m_message);
}
break;
case EVENTS:
for (i = 0; i < m_stressWorkers; i++) {
if (m_message.empty())
StartWorker(EVENTS);
else
StartWorker(EVENTS, m_message);
}
break;
default:
for (i = 0; i < m_stressWorkers; i++) {
if (m_message.empty())
StartWorker(i % 5 == 0 ? THREADS : EVENTS);
else
StartWorker(i % 5 == 0 ? THREADS : EVENTS, m_message);
}
break;
}
break;
case SEND_MESSAGE:
StartWorker(m_workMode,m_message);
break;
case SEND_RANDOM:
StartWorker(m_workMode);
break;
}
mTimer.Start(timeout_val,true);
return wxApp::OnRun();
}
int
Client::OnExit()
{
for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it->GetNext()) {
delete it->GetData();
}
return 0;
}
// Create buffer to be sent by client. Buffer contains test indicator
// message size and place for data
// msgsize parameter contains size of data in bytes and
// if input value does not fit into 250 bytes then
// on exit is updated to new value that is multiply of 1024 bytes
char*
Client::CreateBuffer(int* msgsize)
{
int bufsize = 0;
char* buf;
//if message should have more than 256 bytes then set it as
//test3 for compatibility with GUI server sample
if ((*msgsize) > 250)
{
//send at least one kb of data
int size = (*msgsize)/1024 + 1;
//returned buffer will contain test indicator, message size in kb and data
bufsize = size*1024+2;
buf = new char[bufsize];
buf[0] = (unsigned char)0xDE; //second byte contains size in kilobytes
buf[1] = (char)(size);
*msgsize = size*1024;
}
else
{
//returned buffer will contain test indicator, message size in kb and data
bufsize = (*msgsize)+2;
buf = new char[bufsize];
buf[0] = (unsigned char)0xBE; //second byte contains size in bytes
buf[1] = (char)(*msgsize);
}
return buf;
}
void
Client::StartWorker(workMode pMode) {
int msgsize = 1 + (int) (250000.0 * (rand() / (RAND_MAX + 1.0)));
char* buf = CreateBuffer(&msgsize);
//fill data part of buffer with random bytes
for (int i = 2; i < (msgsize); i++) {
buf[i] = i % 256;
}
if (pMode == THREADS) {
ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2);
if (c->Create() != wxTHREAD_NO_ERROR) {
wxLogError(wxT("Cannot create more threads"));
} else {
c->Run();
m_threadWorkers.Append(c);
}
} else {
EventWorker* e = new EventWorker(m_host,buf,msgsize+2);
e->Run();
m_eventWorkers.Append(e);
}
m_statConnecting++;
}
void
Client::StartWorker(workMode pMode, const wxString& pMessage) {
char* tmpbuf = wxStrdup(pMessage.mb_str());
int msgsize = strlen(tmpbuf);
char* buf = CreateBuffer(&msgsize);
memset(buf+2,0x0,msgsize);
memcpy(buf+2,tmpbuf,msgsize);
free(tmpbuf);
if (pMode == THREADS) {
ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2);
if (c->Create() != wxTHREAD_NO_ERROR) {
wxLogError(wxT("Cannot create more threads"));
} else {
c->Run();
m_threadWorkers.Append(c);
}
} else {
EventWorker* e = new EventWorker(m_host,buf,msgsize+2);
e->Run();
m_eventWorkers.Append(e);
}
m_statConnecting++;
}
void
Client::OnWorkerEvent(WorkerEvent& pEvent) {
switch (pEvent.m_eventType) {
case WorkerEvent::CONNECTING:
if (pEvent.isFailed())
{
m_statConnecting--;
m_statFailed++;
}
break;
case WorkerEvent::SENDING:
if (pEvent.isFailed())
{
m_statFailed++;
m_statSending--;
}
else
{
m_statConnecting--;
m_statSending++;
}
break;
case WorkerEvent::RECEIVING:
if (pEvent.isFailed())
{
m_statReceiving--;
m_statFailed++;
}
else
{
m_statSending--;
m_statReceiving++;
}
break;
case WorkerEvent::DISCONNECTING:
if (pEvent.isFailed())
{
m_statDisconnecting--;
m_statFailed++;
}
else
{
m_statReceiving--;
m_statDisconnecting++;
}
break;
case WorkerEvent::DONE:
m_statDone++;
m_statDisconnecting--;
break;
};
if (pEvent.isFailed() || pEvent.m_eventType == WorkerEvent::DONE)
{
for(TList::compatibility_iterator it = m_threadWorkers.GetFirst(); it ; it = it->GetNext()) {
if (it->GetData() == pEvent.m_sender) {
m_threadWorkers.DeleteNode(it);
break;
}
}
for(EList::compatibility_iterator it2 = m_eventWorkers.GetFirst(); it2 ; it2 = it2->GetNext())
{
if (it2->GetData() == pEvent.m_sender) {
delete it2->GetData();
m_eventWorkers.DeleteNode(it2);
break;
}
}
if ((m_threadWorkers.GetCount() == 0) && (m_eventWorkers.GetCount() == 0))
{
mTimer.Stop();
dumpStatistics();
wxSleep(2);
ExitMainLoop();
}
else
{
mTimer.Start(timeout_val,true);
}
}
}
void
Client::RemoveEventWorker(EventWorker* p_worker) {
for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it = it->GetNext()) {
if (it->GetData() == p_worker) {
//wxLogDebug(wxT("Deleting event worker"));
delete it->GetData();
m_eventWorkers.DeleteNode(it);
return;
}
}
}
void
Client::dumpStatistics() {
wxString msg(
wxString::Format(_("Connecting:\t%d\nSending\t\t%d\nReceiving\t%d\nDisconnecting:\t%d\nDone:\t\t%d\nFailed:\t\t%d\n"),
m_statConnecting,
m_statSending,
m_statReceiving,
m_statDisconnecting,
m_statDone,
m_statFailed
));
wxLogMessage(wxT("Current status:\n%s\n"),msg.c_str());
}
void
Client::OnTimerEvent(wxTimerEvent&) {
dumpStatistics();
}
BEGIN_EVENT_TABLE(Client,wxEvtHandler)
EVT_WORKER(Client::OnWorkerEvent)
EVT_TIMER(wxID_ANY,Client::OnTimerEvent)
END_EVENT_TABLE()
EventWorker::EventWorker(const wxString& p_host, char* p_buf, int p_size)
: m_host(p_host),
m_outbuf(p_buf),
m_outsize(p_size),
m_written(0),
m_readed(0)
{
m_clientSocket = new wxSocketClient(wxSOCKET_NOWAIT);
m_clientSocket->SetEventHandler(*this);
m_insize = m_outsize - 2;
m_inbuf = new char[m_insize];
}
void
EventWorker::Run() {
wxIPV4address ca;
ca.Hostname(m_host);
ca.Service(3000);
m_clientSocket->SetNotify(wxSOCKET_CONNECTION_FLAG|wxSOCKET_LOST_FLAG|wxSOCKET_OUTPUT_FLAG|wxSOCKET_INPUT_FLAG);
m_clientSocket->Notify(true);
m_currentType = WorkerEvent::CONNECTING;
m_doneSent = false;
//wxLogMessage(wxT("EventWorker: Connecting....."));
m_clientSocket->Connect(ca,false);
}
void
EventWorker::OnSocketEvent(wxSocketEvent& pEvent) {
switch(pEvent.GetSocketEvent()) {
case wxSOCKET_INPUT:
//wxLogDebug(wxT("EventWorker: INPUT"));
do {
if (m_readed == m_insize)
return; //event already posted
m_clientSocket->Read(m_inbuf + m_readed, m_insize - m_readed);
if (m_clientSocket->Error())
{
if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK)
{
wxLogError(wxT("%s: read error"),CreateIdent(m_localaddr).c_str());
SendEvent(true);
}
}
m_readed += m_clientSocket->LastCount();
//wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(), m_insize - m_readed);
if (m_readed == m_insize)
{
if (!memcmp(m_inbuf,m_outbuf,m_insize)) {
wxLogError(wxT("%s: data mismatch"),CreateIdent(m_localaddr).c_str());
SendEvent(true);
}
m_currentType = WorkerEvent::DISCONNECTING;
wxLogDebug(wxT("%s: DISCONNECTING"),CreateIdent(m_localaddr).c_str());
SendEvent(false);
//wxLogDebug(wxT("EventWorker %p closing"),this);
m_clientSocket->Close();
m_currentType = WorkerEvent::DONE;
wxLogDebug(wxT("%s: DONE"),CreateIdent(m_localaddr).c_str());
SendEvent(false);
}
} while (!m_clientSocket->Error());
break;
case wxSOCKET_OUTPUT:
//wxLogDebug(wxT("EventWorker: OUTPUT"));
do {
if (m_written == m_outsize)
return;
if (m_written == 0)
{
m_currentType = WorkerEvent::SENDING;
wxLogDebug(wxT("%s: SENDING"),CreateIdent(m_localaddr).c_str());
}
m_clientSocket->Write(m_outbuf + m_written, m_outsize - m_written);
if (m_clientSocket->Error())
{
if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK) {
wxLogError(wxT("%s: Write error"),CreateIdent(m_localaddr).c_str());
SendEvent(true);
}
}
m_written += m_clientSocket->LastCount();
if (m_written != m_outsize)
{
//wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),m_outsize - m_written);
}
else
{
//wxLogDebug(wxT("EventWorker %p SENDING->RECEIVING"),this);
m_currentType = WorkerEvent::RECEIVING;
wxLogDebug(wxT("%s: RECEIVING"),CreateIdent(m_localaddr).c_str());
SendEvent(false);
}
} while(!m_clientSocket->Error());
break;
case wxSOCKET_CONNECTION:
{
//wxLogMessage(wxT("EventWorker: got connection"));
wxLogMessage(wxT("%s: starting writing message (2 bytes for signature and %d bytes of data to write)"),CreateIdent(m_localaddr).c_str(),m_outsize-2);
if (!m_clientSocket->GetLocal(m_localaddr))
{
wxLogError(_("Cannot get peer data for socket %p"),m_clientSocket);
}
m_currentType = WorkerEvent::SENDING;
wxLogDebug(wxT("%s: CONNECTING"),CreateIdent(m_localaddr).c_str());
SendEvent(false);
}
break;
case wxSOCKET_LOST:
{
wxLogError(_("%s: connection lost"),CreateIdent(m_localaddr).c_str());
SendEvent(true);
}
break;
}
}
void
EventWorker::SendEvent(bool failed) {
if (m_doneSent)
return;
WorkerEvent e(this,m_currentType);
if (failed) e.setFailed();
wxGetApp().AddPendingEvent(e);
m_doneSent = failed || m_currentType == WorkerEvent::DONE;
};
EventWorker::~EventWorker() {
m_clientSocket->Destroy();
delete [] m_outbuf;
delete [] m_inbuf;
}
BEGIN_EVENT_TABLE(EventWorker,wxEvtHandler)
EVT_SOCKET(wxID_ANY,EventWorker::OnSocketEvent)
END_EVENT_TABLE()
ThreadWorker::ThreadWorker(const wxString& p_host, char* p_buf, int p_size)
: wxThread(wxTHREAD_DETACHED),
m_host(p_host),
m_outbuf(p_buf),
m_outsize(p_size)
{
m_clientSocket = new wxSocketClient(wxSOCKET_BLOCK|wxSOCKET_WAITALL);
m_insize = m_outsize - 2;
m_inbuf = new char[m_insize];
}
wxThread::ExitCode ThreadWorker::Entry()
{
wxIPV4address ca;
ca.Hostname(m_host);
ca.Service(5678);
//wxLogDebug(wxT("ThreadWorker: Connecting....."));
m_clientSocket->SetTimeout(60);
bool failed = false;
WorkerEvent::evt_type etype = WorkerEvent::CONNECTING;
if (!m_clientSocket->Connect(ca)) {
wxLogError(wxT("Cannot connect to %s:%d"),ca.IPAddress().c_str(), ca.Service());
failed = true;
} else {
//wxLogMessage(wxT("ThreadWorker: Connected. Sending %d bytes of data"),m_outsize);
etype = WorkerEvent::SENDING;
WorkerEvent e(this,etype);
wxGetApp().AddPendingEvent(e);
int to_process = m_outsize;
do {
m_clientSocket->Write(m_outbuf,m_outsize);
if (m_clientSocket->Error()) {
wxLogError(wxT("ThreadWorker: Write error"));
failed = true;
}
to_process -= m_clientSocket->LastCount();
//wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
} while(!m_clientSocket->Error() && to_process != 0);
if (!failed) {
etype = WorkerEvent::RECEIVING;
WorkerEvent e(this,etype);
wxGetApp().AddPendingEvent(e);
to_process = m_insize;
do {
m_clientSocket->Read(m_inbuf,m_insize);
if (m_clientSocket->Error()) {
wxLogError(wxT("ThreadWorker: Read error"));
failed = true;
break;
}
to_process -= m_clientSocket->LastCount();
//wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
} while(!m_clientSocket->Error() && to_process != 0);
}
char* outdat = (char*)m_outbuf+2;
if (!failed && (memcmp(m_inbuf,outdat,m_insize) != 0))
{
wxLogError(wxT("Data mismatch"));
failed = true;
}
}
//wxLogDebug(wxT("ThreadWorker: Finished"));
if (!failed) {
etype = WorkerEvent::DISCONNECTING;
WorkerEvent e(this,etype);
wxGetApp().AddPendingEvent(e);
};
m_clientSocket->Close();
m_clientSocket->Destroy();
m_clientSocket = NULL;
delete [] m_outbuf;
delete [] m_inbuf;
if (!failed)
etype = WorkerEvent::DONE;
WorkerEvent e(this,etype);
if (failed) e.setFailed();
wxGetApp().AddPendingEvent(e);
return 0;
}