Onega

a lot of VC++ posts, a few C# posts, and some miscellaneous stuff

Tuesday, November 15, 2005

ACE multicast example with thread

//Server

#include "stdafx.h"
#include "ace_auto_link.h"
#define WIN32_LEAN_AND_MEAN        // Exclude rarely-used stuff from Windows headers

#include <stdio.h>
#include <tchar.h>
#include "ace/OS.h"
#include "ace/SOCK_Dgram.h"
#include "ace/INET_Addr.h"
#include "ace/log_msg.h"
#include "ace/SOCK_Dgram_Mcast.h"
#include "ace/get_opt.h"
#include "process.h"

#define ARRAY_SIZE(X) sizeof(X)/sizeof(X[0]) 
#ifndef __WFUNCTION__

    #define WIDEN2(x) L ## x 
    #define WIDEN(x) WIDEN2(x) 
    #define __WFILE__ WIDEN(__FILE__) 
    #define __WFUNCTION__ WIDEN(__FUNCTION__) 
    #ifdef _UNICODE 
        #define __TFILE__ __WFILE__ 
        #define __TFUNCTION__ __WFUNCTION__ 
    #else 
        #define __TFILE__ __FILE__ 
        #define __TFUNCTION__ __FUNCTION__ 
    #endif 
#endif //__WFUNCTION__


#define __STR2__(x) #x 
#define __STR1__(x) __STR2__(x) 
//#define __LOC__ __FILE__ "("__STR1__(__LINE__)") : Warning Msg: " 
#define __LOC2__ __FILE__ "("__STR1__(__LINE__)") " 
#define __FLOC__ __TFILE__ "("__STR1__(__LINE__)"): " 
#define __MLOC__ __TFUNCTION__ "("__STR1__(__LINE__)"): " 
const int PATH_LEN = 1024+MAX_PATH;

#define DEFAULT_MULTICAST_ADDR ACE_TEXT("224.9.9.2")
//#define TIMEOUT 5



class BaseThread

{
// To create a thread, inherit from BaseThread and implement run()
// To start thread, call start_thread()/start_threadex()
// Do not close the handle returned by start_threadex()/get_handle() in inherited class

// To stop thread, call stop(), the thread is actually stopped when is_stopped() return ture

public:
    BaseThread():m_working(true),m_stopped(false),m_return_code(0),m_thread_id(0)
        ,m_thread_handle(NULL)
    {}
    ~BaseThread()
    {
        if(m_thread_handle)
            CloseHandle(get_handle());
        m_thread_handle = NULL;
    }
    inline void start_thread()
    {
        start_threadex();
    }
    inline uintptr_t start_threadex()
    {
        return m_thread_handle = _beginthreadex( NULL, 0, &ExThreadProc, this, 0, &m_thread_id );
    }
    virtual long run() =0;
    inline void stop()
    {// tell the thread to stop

        m_working = false;
    }
    inline bool is_stopped()
    {// if the thread is actually stopped

        return m_stopped;
    }
    inline long get_return_code()
    {// the value returned by run()
        return m_return_code;
    }
    inline HANDLE get_handle()
    {
        return reinterpret_cast<HANDLE>(m_thread_handle); 
    }
    inline bool is_working()
    {// run() shall check this value

        return m_working;
    }
private:
    static void __cdecl BasicThreadProc( void* pArguments )
    {//http://fruitfruit.blogspot.com/2005_05_01_fruitfruit_archive.html

        ExThreadProc(pArguments);
    } 
    static unsigned __stdcall ExThreadProc( void* pArguments )
    {//http://fruitfruit.blogspot.com/2005_05_01_fruitfruit_archive.html

        BaseThread* pb = reinterpret_cast<BaseThread*>(pArguments);
        pb->m_return_code = pb->run();
        pb->stop();
        pb->m_stopped = true;
        _endthreadex( 0 );
        return 0;
    }
    volatile bool m_stopped;
    long m_return_code;
    uintptr_t m_thread_handle;
    unsigned m_thread_id;
    volatile bool m_working;
};

//The following class is used to receive multicast messages from

//any sender.
class Receiver_Multicast:public BaseThread

{
public:
    Receiver_Multicast(int port):
      m_multicast_addr(port,DEFAULT_MULTICAST_ADDR),m_remote_addr((u_short)0)
          ,recv_timeout(1)
      {
          // Subscribe to multicast address.
          if (mcast_dgram_.subscribe (m_multicast_addr) == -1)
          {
              //ACE_DEBUG((LM_DEBUG,ACE_TEXT("Error in subscribing to Multicast address \n")));
              exit(-1);
          }
      }
      ~Receiver_Multicast()
      {
          mcast_dgram_.close();
          //if(mcast_dgram_.unsubscribe()==-1)
             // ACE_DEBUG((LM_ERROR,"Error in unsubscribing from Mcast group\n"));
      }
      //Receive data from someone who is sending data on the multicast group

      //address. To do so it must use the multicast datagram component

      //mcast_dgram_.
      int recv_multicast()
      {
          //get ready to receive data from the sender.
          ACE_Time_Value ace_recv_timeout (recv_timeout);
          if(mcast_dgram_.recv (&mcast_info,sizeof (mcast_info),m_remote_addr,0,&ace_recv_timeout)==-1)
          {
              ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) %s end, received NOTHING*****\n"),__TFUNCTION__));
              return -1;
          }
          else

          {
              ACE_UINT32 ip = m_remote_addr.get_ip_address();
              TCHAR ipbuf[64] = {0};
              _stprintf( ipbuf, _T("%d.%d.%d.%d"), (ip>>24)&255, (ip>>16)&255, (ip>>8)&255, ip&255 ); 
              ACE_TCHAR host_name_buf[64] = {0};
              m_remote_addr.get_host_name(host_name_buf, ARRAY_SIZE(host_name_buf));
              ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) Received multicast from {%s, %s}:%d.\n"),
                  host_name_buf, ipbuf, m_remote_addr.get_port_number()));
              ACE_DEBUG((LM_DEBUG,ACE_TEXT("Successfully received %d\n"), mcast_info));
              ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) %s end\n"),__TFUNCTION__));
              return 0;
          }
      }
    long run( )
    {
        while( is_working())
        {
            if(m_multicast_addr.get_port_number()>0)
            {
                recv_multicast();
            }
            Sleep(1000);
        }
        ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) %s end\n"),__TFUNCTION__));
        return 0;
    } 
    void set_recv_timeout(int v)
    {
        if(v>0)
        recv_timeout = v;
    }
private:
    ACE_INET_Addr m_multicast_addr;
    ACE_INET_Addr m_remote_addr;
    ACE_SOCK_Dgram_Mcast mcast_dgram_;
    int mcast_info;
    int recv_timeout;
};

class Sender_Multicast:public BaseThread

{
public:
    Sender_Multicast(int port):
      local_addr_((u_short)0),dgram_(local_addr_),
          multicast_addr_(port,DEFAULT_MULTICAST_ADDR)
      {
      }
      //Method which uses a simple datagram component to send data to the //multicast group.
      int send_to_multicast_group()
      {
          //Convert the information we wish to send into network byte order

          mcast_info= htons (1000);
          // Send multicast

          if(dgram_.send (&mcast_info, sizeof (mcast_info), multicast_addr_)==-1)
          {
              ACE_DEBUG((LM_ERROR,ACE_TEXT("Send to Multicast group failed \n")));
              return -1;
          }
          ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT("%s: Sent multicast to group. Number sent is %d.\n"),
              __TFUNCTION__,
              mcast_info));
          return 0;
      }
    long run(  )
    {
        while( this->is_working())
        {
            if(this->multicast_addr_.get_port_number()>0)
            {
                this->send_to_multicast_group();
            }
            Sleep(7000);
        }
        ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) %s end\n"),__TFUNCTION__));
        return 0;
    } 
private:
    ACE_INET_Addr multicast_addr_;
    ACE_INET_Addr local_addr_;
    ACE_SOCK_Dgram dgram_;
    int mcast_info;
};


int ACE_TMAIN(int argc, TCHAR* argv[])
{
    ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) %s starts\n"),__TFUNCTION__));
    Sender_Multicast sender_thread(2000);
    Receiver_Multicast recv_thread(2000);
    sender_thread.start_thread();
    recv_thread.start_thread();
    getchar();
    sender_thread.stop();
    recv_thread.stop();
    while( sender_thread.is_stopped() == false 
        || recv_thread.is_stopped() == false)
    {
        Sleep(1);
    }
    ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) %s end\n"),__TFUNCTION__));
    return 0;
}

0 Comments:

Post a Comment

<< Home