* wxThread fixes

* wxStream fix (Read(wxStreamBase))
* wxEvent: GTK idle loop waking up was actually good, reenabled.
* wxSocket: all features working on Linux/RH6 (including high-level protocols)
       Needs testing on other platforms now.


git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@2563 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775
This commit is contained in:
Guilhem Lavaux 1999-05-25 17:14:56 +00:00
parent 4d0f3cd6ac
commit 062c486171
11 changed files with 204 additions and 53 deletions

View File

@ -82,7 +82,11 @@ class SocketWaiter: public wxThread {
int m_fd;
};
class SocketRequester: public wxThread {
class SocketRequester
#if wxUSE_THREADS
: public wxThread
#endif
{
public:
SocketRequester(wxSocketBase *socket, wxSocketInternal *internal);
~SocketRequester();
@ -136,8 +140,8 @@ class wxSocketInternal {
wxMutex m_socket_locker, m_fd_locker, m_request_locker, m_end_requester;
wxCondition m_socket_cond;
SocketWaiter *m_thread_waiter;
SocketRequester *m_thread_requester;
#endif
SocketRequester *m_thread_requester;
wxList m_requests;
int m_fd;
bool m_invalid_requester;

View File

@ -95,6 +95,8 @@ public:
wxSocketBase& Read(char* buffer, size_t nbytes);
wxSocketBase& Write(const char *buffer, size_t nbytes);
wxSocketBase& Unread(const char *buffer, size_t nbytes);
wxSocketBase& ReadMsg(char *buffer, size_t nbytes);
wxSocketBase& WriteMsg(const char *buffer, size_t nbytes);
void Discard();
// Try not to use this two methods (they sould be protected)

View File

@ -3,7 +3,7 @@
* Purpose: wxSocket: client demo
* Author: LAVAUX Guilhem
* Created: June 1997
* Updated:
* CVS ID: $Id$
* Copyright: (c) 1997, LAVAUX Guilhem
*/
@ -163,11 +163,8 @@ void MyFrame::OnExecOpenConnection(wxCommandEvent& WXUNUSED(evt))
if (sock->IsConnected())
sock->Close();
/*
wxString hname = wxGetTextFromUser("Enter the address of the wxSocket Sample Server",
"Connect ...", "localhost");
*/
wxString hname = "localhost";
addr.Hostname(hname);
addr.Service(3000);
sock->SetNotify(0);
@ -282,13 +279,15 @@ void MyFrame::OnExecUrlTest(wxCommandEvent& WXUNUSED(evt))
wxURL url(urlname);
wxInputStream *datas = url.GetInputStream();
if (!datas)
wxMessageBox("Error in getting data from the URL.", "Alert !");
else {
if (!datas) {
wxString error;
error.Printf(_T("Error in getting data from the URL. (error = %d)"), url.GetError());
wxMessageBox(error, "Alert !");
} else {
wxFileOutputStream *str_out = new wxFileOutputStream("test.url");
str_out->Write(*datas);
wxMessageBox("Success !! Click on OK to see the text.", "OK");
wxMessageBox(_T("Success !! Click on OK to see the text."), "OK");
delete datas;
delete str_out;
}

View File

@ -3,7 +3,7 @@
* Purpose: wxSocket: server demo
* Author: LAVAUX Guilhem
* Created: June 1997
* Updated:
* CVS Id: $Id$
* Copyright: (C) 1997, LAVAUX Guilhem
*/
@ -102,6 +102,9 @@ void MyFrame::OnSockRequest(wxSocketEvent& evt)
waiting socket thread, i.e. here we are
not in the main GUI thread and thus we
must not call any GUI function here. */
/* Wrong ! This routine is called by the main GUI thread
because the main GUI thread received a signal from the other
thread using wxEvent::ProcessThreadEvent */
wxSocketBase *sock = evt.Socket();
@ -119,6 +122,7 @@ void MyFrame::OnSockRequest(wxSocketEvent& evt)
case wxSocketBase::EVT_LOST:
printf("Destroying socket\n");
wxPendingDelete.Append(sock);
UpdateStatus(-1);
return;
break;
}
@ -132,17 +136,21 @@ void MyFrame::OnSockRequestServer(wxSocketEvent& evt)
waiting socket thread, i.e. here we are
not in the main GUI thread and thus we
must not call any GUI function here. */
/* Wrong ! This routine is called by the main GUI thread
because the main GUI thread received a signal from the other
thread using wxEvent::ProcessThreadEvent */
wxSocketBase *sock2;
wxSocketServer *server = (wxSocketServer *) evt.Socket();
printf("OnSockRequestServer OK\n");
printf("OnSockRequest (event = %d)\n",evt.SocketEvent());
printf("OnSockRequest (Main = %d) (event = %d)\n",wxThread::IsMain(), evt.SocketEvent());
sock2 = server->Accept();
if (sock2 == NULL)
return;
UpdateStatus(1);
sock2->SetFlags(wxSocketBase::SPEED);
sock2->Notify(TRUE);
sock2->SetEventHandler(*this, SKDEMO_SOCKET);

View File

@ -563,8 +563,8 @@ bool wxEvtHandler::ProcessThreadEvent(wxEvent& event)
wxPendingEventsLocker->Leave();
#ifdef __WXGTK__
// if (g_isIdle)
// wxapp_install_idle_handler();
if (g_isIdle)
wxapp_install_idle_handler();
#endif
return TRUE;

View File

@ -355,6 +355,7 @@ wxInputStream *wxFTP::GetInputStream(const wxString& path)
in_stream->m_ftpsize = wxAtoi(WXSTRINGCAST str_size);
}
sock->SetFlags(WAITALL);
return in_stream;
}
@ -401,13 +402,6 @@ wxList *wxFTP::GetList(const wxString& wildcard)
return NULL;
}
// Ininterresting ?!
/*
sock->SetEventHandler(*GetNextHandler(), m_id);
sock->Notify(m_notifyme);
sock->SetNotify(m_neededreq);
*/
return file_list;
}
#endif

View File

@ -195,8 +195,8 @@ bool wxHTTP::BuildRequest(const wxString& path, wxHTTP_Req req)
}
SaveState();
SetFlags(NONE);
Notify(FALSE);
SetFlags(WAITALL);
sprintf(buf, "%s %s HTTP/1.0\n\r", tmp_buf, (const char*)pathbuf);
Write(buf, strlen(buf));
@ -275,6 +275,7 @@ wxInputStream *wxHTTP::GetInputStream(const wxString& path)
if (!GetHeader(_T("Content-Length")).IsEmpty())
inp_stream->m_httpsize = wxAtoi(WXSTRINGCAST GetHeader(_T("Content-Length")));
SetFlags(WAITALL);
return inp_stream;
}

View File

@ -100,7 +100,9 @@ void SocketWaiter::ProcessReadEvent()
int ret;
char c;
m_internal->AcquireFD();
ret = recv(m_fd, &c, 1, MSG_PEEK);
m_internal->ReleaseFD();
// We are a server => emit a EVT_ACCEPT event.
if (ret == -1 && m_socket->GetType() == wxSocketBase::SOCK_SERVER) {
@ -167,14 +169,12 @@ void *SocketWaiter::Entry()
#endif
#endif
if (ret == 0)
// If nothing happened, we wait for 100 ms.
wxUsleep(10);
// We wait for 100 ms to prevent the CPU from burning.
wxUsleep(100);
// Check whether we should exit.
if (TestDestroy()) {
if (TestDestroy())
return NULL;
}
}
return NULL;
}
@ -185,7 +185,10 @@ void *SocketWaiter::Entry()
SocketRequester::SocketRequester(wxSocketBase *socket,
wxSocketInternal *internal)
: wxThread(),
:
#if wxUSE_THREADS
wxThread(),
#endif
m_socket(socket), m_internal(internal), m_fd(internal->GetFD())
{
}
@ -204,12 +207,12 @@ bool SocketRequester::WaitFor(wxSocketBase::wxRequestNotify req, int millisec)
tv.tv_sec = millisec / 1000;
tv.tv_usec = (millisec % 1000) * 1000;
if ((req & READ_MASK) != 0)
FD_ZERO(&sockrd_set);
FD_ZERO(&sockrd_set);
FD_ZERO(&sockwr_set);
FD_SET(m_fd, &sockrd_set);
FD_SET(m_fd, &sockwr_set);
if ((req & READ_MASK) != 0)
FD_SET(m_fd, &sockrd_set);
if ((req & WRITE_MASK) != 0)
FD_SET(m_fd, &sockwr_set);
m_internal->AcquireFD();
ret = select(m_fd+1, &sockrd_set, &sockwr_set, NULL, &tv);
@ -247,6 +250,9 @@ void SocketRequester::ProcessReadEvent(SockRequest *req)
req->size -= len;
req->io_nbytes += len;
req->buffer += len;
if (len == 0)
m_internal->EndRequest(req);
return;
}
// The End.
@ -337,21 +343,17 @@ void *SocketRequester::Entry()
wxSocketInternal::wxSocketInternal(wxSocketBase *socket)
{
m_socket = socket;
#if wxUSE_THREADS
m_thread_requester = NULL;
m_thread_waiter = NULL;
m_invalid_requester = TRUE;
#endif
}
wxSocketInternal::~wxSocketInternal()
{
#if wxUSE_THREADS
StopRequester();
wxASSERT(m_thread_requester == NULL);
StopWaiter();
wxASSERT(m_thread_waiter == NULL);
#endif
}
// ----------------------------------------------------------------------
@ -363,10 +365,12 @@ SockRequest *wxSocketInternal::WaitForReq()
#if wxUSE_THREADS
wxNode *node;
// First try.
node = m_requests.First();
if (node == NULL) {
m_socket_cond.Wait(m_request_locker, 10, 0);
// Second try, if it is unsuccessul we give up.
node = m_requests.First();
if (node == NULL)
return NULL;
@ -425,14 +429,17 @@ void wxSocketInternal::ResumeRequester()
#if wxUSE_THREADS
wxThreadError err;
wxASSERT(m_thread_requester == NULL || m_invalid_requester);
wxASSERT(m_invalid_requester);
m_end_requester.Lock();
if (m_invalid_requester) {
if (m_thread_requester != NULL)
delete m_thread_requester;
m_invalid_requester = FALSE;
if (m_thread_requester != NULL) {
m_thread_requester->Delete(); // We must join it.
delete m_thread_requester;
}
m_invalid_requester = FALSE;
m_end_requester.Unlock();
m_thread_requester = new SocketRequester(m_socket, this);
@ -442,7 +449,11 @@ void wxSocketInternal::ResumeRequester()
err = m_thread_requester->Run();
wxASSERT(err == wxTHREAD_NO_ERROR);
#else
if (!m_invalid_requester)
return;
m_thread_requester = new SocketRequester(m_socket, this);
m_invalid_requester = FALSE;
#endif
}
@ -453,6 +464,7 @@ void wxSocketInternal::StopRequester()
if (m_invalid_requester) {
m_end_requester.Unlock();
if (m_thread_requester) {
m_thread_requester->Delete();
delete m_thread_requester;
m_thread_requester = NULL;
}
@ -475,6 +487,11 @@ void wxSocketInternal::StopRequester()
delete m_thread_requester;
m_thread_requester = NULL;
m_invalid_requester = TRUE;
#else
delete m_thread_requester;
m_thread_requester = NULL;
m_invalid_requester = TRUE;
#endif
}
@ -488,6 +505,8 @@ void wxSocketInternal::ResumeWaiter()
m_thread_waiter = new SocketWaiter(m_socket, this);
m_thread_waiter->SetPriority(WXTHREAD_MIN_PRIORITY);
err = m_thread_waiter->Create();
wxASSERT(err == wxTHREAD_NO_ERROR);
@ -514,11 +533,10 @@ void wxSocketInternal::StopWaiter()
// ----------------------------------------------------------------------
void wxSocketInternal::QueueRequest(SockRequest *request, bool async)
{
#if wxUSE_THREADS
if (m_invalid_requester)
ResumeRequester();
async = FALSE;
#if wxUSE_THREADS
if (async) {
m_request_locker.Lock();
@ -541,6 +559,7 @@ void wxSocketInternal::QueueRequest(SockRequest *request, bool async)
}
} else {
m_request_locker.Lock();
#endif
if ((request->type & wxSocketBase::REQ_WAIT) != 0) {
m_thread_requester->ProcessWaitEvent(request);
@ -559,6 +578,7 @@ void wxSocketInternal::QueueRequest(SockRequest *request, bool async)
}
}
request->done = TRUE;
#if wxUSE_THREADS
m_request_locker.Unlock();
}
#endif

View File

@ -260,6 +260,65 @@ wxSocketBase& wxSocketBase::Read(char* buffer, size_t nbytes)
return *this;
}
wxSocketBase& wxSocketBase::ReadMsg(char* buffer, size_t nbytes)
{
unsigned long len, len2, sig;
struct {
char sig[4];
char len[4];
} msg;
// sig should be an explicit 32-bit unsigned integer; I've seen
// compilers in which size_t was actually a 16-bit unsigned integer
Read((char *)&msg, sizeof(msg));
if (m_lcount != sizeof(msg))
return *this;
sig = msg.sig[0] & 0xff;
sig |= (size_t)(msg.sig[1] & 0xff) << 8;
sig |= (size_t)(msg.sig[2] & 0xff) << 16;
sig |= (size_t)(msg.sig[3] & 0xff) << 24;
if (sig != 0xfeeddead)
return *this;
len = msg.len[0] & 0xff;
len |= (size_t)(msg.len[1] & 0xff) << 8;
len |= (size_t)(msg.len[2] & 0xff) << 16;
len |= (size_t)(msg.len[3] & 0xff) << 24;
// len2 is incorrectly computed in the original; this sequence is
// the fix
if (len > nbytes) {
len2 = len - nbytes;
len = nbytes;
}
else
len2 = 0;
// the "len &&" in the following statement is necessary so that
// we don't attempt to read (and possibly hang the system)
// if the message was zero bytes long
if (len && Read(buffer, len).LastCount() != len)
return *this;
if (len2 && (Read(NULL, len2).LastCount() != len2))
return *this;
if (Read((char *)&msg, sizeof(msg)).LastCount() != sizeof(msg))
return *this;
sig = msg.sig[0] & 0xff;
sig |= (size_t)(msg.sig[1] & 0xff) << 8;
sig |= (size_t)(msg.sig[2] & 0xff) << 16;
sig |= (size_t)(msg.sig[3] & 0xff) << 24;
// ERROR
// we return *this either way, so a smart optimizer will
// optimize the following sequence out; I'm leaving it in anyway
if (sig != 0xdeadfeed)
return *this;
return *this;
}
wxSocketBase& wxSocketBase::Peek(char* buffer, size_t nbytes)
{
m_lcount = GetPushback(buffer, nbytes, TRUE);
@ -282,9 +341,54 @@ wxSocketBase& wxSocketBase::Write(const char *buffer, size_t nbytes)
return *this;
}
wxSocketBase& wxSocketBase::WriteMsg(const char *buffer, size_t nbytes)
{
struct {
char sig[4];
char len[4];
} msg;
// warning about 'cast truncates constant value'
#ifdef __VISUALC__
#pragma warning(disable: 4310)
#endif // __VISUALC__
msg.sig[0] = (char) 0xad;
msg.sig[1] = (char) 0xde;
msg.sig[2] = (char) 0xed;
msg.sig[3] = (char) 0xfe;
msg.len[0] = (char) nbytes & 0xff;
msg.len[1] = (char) (nbytes >> 8) & 0xff;
msg.len[2] = (char) (nbytes >> 16) & 0xff;
msg.len[3] = (char) (nbytes >> 24) & 0xff;
if (Write((char *)&msg, sizeof(msg)).LastCount() < sizeof(msg))
return *this;
if (Write(buffer, nbytes).LastCount() < nbytes)
return *this;
msg.sig[0] = (char) 0xed;
msg.sig[1] = (char) 0xfe;
msg.sig[2] = (char) 0xad;
msg.sig[3] = (char) 0xde;
msg.len[0] = msg.len[1] = msg.len[2] = msg.len[3] = (char) 0;
Write((char *)&msg, sizeof(msg));
return *this;
#ifdef __VISUALC__
#pragma warning(default: 4310)
#endif // __VISUALC__
}
wxSocketBase& wxSocketBase::Unread(const char *buffer, size_t nbytes)
{
CreatePushbackAfter(buffer, nbytes);
m_lcount = 0;
if (nbytes != 0) {
CreatePushbackAfter(buffer, nbytes);
m_lcount = nbytes;
}
return *this;
}
@ -304,7 +408,7 @@ bool wxSocketBase::IsData() const
tv.tv_usec = 0;
FD_ZERO(&sock_set);
FD_SET(m_fd, &sock_set);
select(FD_SETSIZE, &sock_set, NULL, NULL, &tv);
select(m_fd+1, &sock_set, NULL, NULL, &tv);
m_internal->ReleaseFD();
@ -424,7 +528,9 @@ void wxSocketBase::RestoreState()
state = (SocketState *)node->Data();
SetFlags(state->socket_flags);
m_internal->AcquireData();
m_neededreq = state->evt_notify_state;
m_internal->ReleaseData();
m_cbk = state->c_callback;
m_cdata = state->c_callback_data;
Notify(state->notify_state);
@ -682,7 +788,6 @@ void wxSocketBase::WantBuffer(char *buffer, size_t nbytes,
SockRequest *buf = new SockRequest;
SaveState();
m_internal->StopWaiter();
buf->buffer = buffer;
buf->size = nbytes;
buf->done = FALSE;

View File

@ -340,7 +340,7 @@ size_t wxStreamBuffer::Read(wxStreamBuffer *s_buf)
if (m_mode == write)
return 0;
while (bytes_read == BUF_TEMP_SIZE) {
while (bytes_read != 0) {
bytes_read = Read(buf, bytes_read);
bytes_read = s_buf->Write(buf, bytes_read);
s += bytes_read;

View File

@ -271,6 +271,8 @@ private:
// state
// 2. The Delete() function blocks until the condition is signaled when the
// thread exits.
// GL: On Linux, this may fail because we can have a deadlock in either
// SignalExit() or Wait(): so we add m_end_mutex for the finalization.
wxMutex m_mutex, m_end_mutex;
wxCondition m_cond;
@ -299,7 +301,7 @@ void *wxThreadInternal::PthreadStart(void *ptr)
}
#if HAVE_THREAD_CLEANUP_FUNCTIONS
// Install the cleanup handler.
// pthread_cleanup_push(wxThreadInternal::PthreadCleanup, ptr);
pthread_cleanup_push(wxThreadInternal::PthreadCleanup, ptr);
#endif
// wait for the condition to be signaled from Run()
@ -311,7 +313,7 @@ void *wxThreadInternal::PthreadStart(void *ptr)
status = thread->Entry();
#if HAVE_THREAD_CLEANUP_FUNCTIONS
// pthread_cleanup_pop(FALSE);
pthread_cleanup_pop(FALSE);
#endif
// terminate the thread
@ -371,8 +373,16 @@ wxThreadInternal::~wxThreadInternal()
// note that m_mutex will be unlocked by the thread which waits for our
// termination
// m_end_mutex can be unlocked here.
m_end_mutex.Unlock();
// In the case, we didn't start the thread, all these mutex are locked:
// we must unlock them.
if (m_mutex.IsLocked())
m_mutex.Unlock();
if (m_end_mutex.IsLocked())
m_end_mutex.Unlock();
if (m_mutexSuspend.IsLocked())
m_mutexSuspend.Unlock();
}
wxThreadError wxThreadInternal::Run()
@ -748,6 +758,14 @@ bool wxThread::TestDestroy()
wxThread::~wxThread()
{
m_critsect.Enter();
if (p_internal->GetState() != STATE_EXITED &&
p_internal->GetState() != STATE_NEW)
wxLogDebug(_T("The thread is being destroyed althought it is still running ! The application may crash."));
m_critsect.Leave();
delete p_internal;
// remove this thread from the global array
gs_allThreads.Remove(this);
}