NSIS-ka
A free C++ implementation of NSIS protocols

Changeset 4452


Ignore:
Timestamp:
Sep 22, 2009, 5:58:10 PM (8 years ago)
Author:
stud-lenk
Message:

Multicast implementation of send_data_dmode() and send_data_cmode() -- first working version.

Sending data in D-mode should work as expected.

In case of data being sent in C-mode to a multicast destination, the data does not get enqueued for later re-transmissions. I consider the overhead of managing a data queue for every multcast peer as to big. Moreover multicast data transmission can only partly be reliable (C-mode) for some multicast inherent reasons.

This code needs some more intense testing.

Location:
ntlp/branches/20090723-multicast/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • ntlp/branches/20090723-multicast/src/ntlp_statemodule_data.cpp

    r4345 r4452  
    148148  r_entry->set_msg_id(msghandle);
    149149 
    150   appladdress* peer= new appladdress;
    151   peer->set_port( gconf.getpar<uint16>(gistconf_udpport) );
    152   peer->set_protocol(param.udp);
    153  
    154   // Abort on multicast request
    155   if (r_entry->is_multicast_QNode())
    156   { // TODO: implement MULTICAST
    157     ERRCLog(param.name, "send_data_dmode() for multicast target is not implemented yet. Skipping.");
    158     return;
    159   }
    160 
    161   peer->set_ip(r_entry->get_peer_nli()->get_if_address());
    162  
    163   SignalingMsgNTLP* sigmsg = new SignalingMsgNTLP();
    164   sigmsg->set_local_addr(r_entry->get_local_src()->copy());
    165 
    166   // Build a DATA Message
    167   sessionid* mysid = new sessionid(*r_key->sid);
    168 
    169   // take eventual payload from apimsg
    170   nslpdata* mydata=NULL;
    171   if (nslp_data)
    172   {
    173     mydata=new nslpdata(*nslp_data);
    174   }
    175 
    176   mri* mymri=r_key->mr->copy();
    177    
    178   nli* own_nli=build_local_nli(mymri->get_ip_version(), r_entry->get_peer_nli()->get_if_address(), r_entry);
    179    
    180   own_nli->set_ip_ttl(r_entry->get_ip_ttl());
    181    
    182 
    183   // construct DATA
    184   data* pdu = new data(mymri, mysid, own_nli, mydata);
    185 
    186   pdu->set_nslpid(r_key->nslpid);
    187   pdu->set_hops(r_entry->get_gist_hop_count());
    188  
    189   // if explicit routing took place (sending for DEAD state)
    190   if (r_entry->get_state() == dead)
    191   { 
    192     pdu->set_E(); 
    193   }
    194 
    195   // set S-flag
    196   pdu->set_S();
    197 
    198   // set IP TTL
    199   DLog(param.name, "Set IP TTL in target address");
    200   peer->set_ip_ttl(own_nli->get_ip_ttl());
    201 
    202   // set hops
    203   pdu->set_hops(1);
    204  
    205  
    206   // send it
    207   sigmsg->set_req(peer, pdu);
    208   sigmsg->send_or_delete();
     150  bool more_peers = false;
     151  hash_map<uint32, multicast_peerstatus>::const_iterator mcast_peer_it = r_entry->get_multicast_peers()->begin();
     152  do {
     153    appladdress* peer= new appladdress;
     154    peer->set_port( gconf.getpar<uint16>(gistconf_udpport) );
     155    peer->set_protocol(param.udp);
     156 
     157    const nli* peer_nli = r_entry->is_multicast_QNode() ? param.rt.findNLI(mcast_peer_it->first) : r_entry->get_peer_nli();
     158
     159    // set destination address
     160    peer->set_ip(peer_nli->get_if_address());
     161
     162    SignalingMsgNTLP* sigmsg = new SignalingMsgNTLP();
     163    sigmsg->set_local_addr(r_entry->get_local_src()->copy());
     164
     165    // Build a DATA Message
     166    sessionid* mysid = new sessionid(*r_key->sid);
     167
     168    // take eventual payload from apimsg
     169    nslpdata* mydata=NULL;
     170    if (nslp_data)
     171    {
     172      mydata=new nslpdata(*nslp_data);
     173    }
     174
     175    mri* mymri=r_key->mr->copy();
     176   
     177    nli* own_nli=build_local_nli(mymri->get_ip_version(), peer_nli->get_if_address(), r_entry);
     178 
     179    own_nli->set_ip_ttl(r_entry->get_ip_ttl());
     180   
     181
     182    // construct DATA
     183    data* pdu = new data(mymri, mysid, own_nli, mydata);
     184
     185    pdu->set_nslpid(r_key->nslpid);
     186    pdu->set_hops(r_entry->get_gist_hop_count());
     187 
     188    // if explicit routing took place (sending for DEAD state)
     189    if (r_entry->get_state() == dead)
     190    { 
     191      pdu->set_E(); 
     192    }
     193
     194    // set S-flag
     195    pdu->set_S();
     196
     197    // set IP TTL
     198    DLog(param.name, "Set IP TTL in target address");
     199    peer->set_ip_ttl(own_nli->get_ip_ttl());
     200
     201    // set hops
     202    pdu->set_hops(1);
     203 
     204 
     205    // send it
     206    sigmsg->set_req(peer, pdu);
     207    sigmsg->send_or_delete();
     208
     209    // iterate through more peers?
     210    if (r_entry->is_multicast_QNode()) {
     211      mcast_peer_it++;
     212      more_peers = (mcast_peer_it != r_entry->get_multicast_peers()->end());
     213    }
     214  } while (more_peers);
    209215} //end send_data_dmode
    210216
     
    228234  }
    229235 
    230   // Abort on multicast request
    231   if (r_entry->is_multicast_QNode())
    232   { // TODO: implement MULTICAST
    233     ERRCLog(param.name, "send_data_dmode() for multicast target is not implemented yet. Skipping.");
    234     return;
    235   }
    236    
    237   appladdress* peer= param.rt.lookup_ma_peeraddress(r_entry->get_peer_nli());
    238   if (peer)
    239   {
    240     // indicate activity to MA maintainance
    241     param.rt.activity_ind_ma(r_entry->get_peer_nli());
    242 
    243     SignalingMsgNTLP* sigmsg = new SignalingMsgNTLP();
    244     sigmsg->set_local_addr(r_entry->get_local_src()->copy());
    245    
    246     // Build a DATA Message
    247     sessionid* mysid = new sessionid(*(r_key->sid));
    248        
    249     // take eventual payload from apimsg
    250     nslpdata* mydata=NULL;
    251     if (payload) mydata=new nslpdata(*(payload));
    252        
    253     mri* mymri=r_key->mr->copy();
    254        
    255     // construct DATA
    256     data* pdu = new data(mymri, mysid, NULL, mydata);
    257        
    258     pdu->set_nslpid(r_key->nslpid);
    259        
    260     // set S-flag
    261     pdu->set_S();
    262 
    263     // set Hops
    264     pdu->set_hops(1);
    265 
    266     // send it
    267     sigmsg->set_req(peer, pdu);
    268     sigmsg->send_or_delete();
    269    
    270   }
    271   else
    272   {
    273     EVLog(param.name, color[green] << "No Messaging Association was found to send -> trying to re-establish" << color[off]);
    274    
    275     if (r_entry->is_responding_node()) {
     236  bool more_peers = false;
     237  hash_map<uint32, multicast_peerstatus>::const_iterator mcast_peer_it = r_entry->get_multicast_peers()->begin();
     238  do {
     239    const nli* peer_nli = r_entry->is_multicast_QNode() ? param.rt.findNLI(mcast_peer_it->first) : r_entry->get_peer_nli();
     240    appladdress* peer= param.rt.lookup_ma_peeraddress(peer_nli);
     241
     242    if (peer)
     243    {
     244      // indicate activity to MA maintainance
     245      param.rt.activity_ind_ma(peer_nli);
     246 
     247      SignalingMsgNTLP* sigmsg = new SignalingMsgNTLP();
     248      sigmsg->set_local_addr(r_entry->get_local_src()->copy());
    276249     
    277       ERRLog(param.name, color[green] << "I am Responder Node, I cannot establish a Messaging Association. Dropping data");
    278     }
    279     else
     250      // Build a DATA Message
     251      sessionid* mysid = new sessionid(*(r_key->sid));
     252       
     253      // take eventual payload from apimsg
     254      nslpdata* mydata=NULL;
     255      if (payload) mydata=new nslpdata(*(payload));
     256       
     257      mri* mymri=r_key->mr->copy();
     258       
     259      // construct DATA
     260      data* pdu = new data(mymri, mysid, NULL, mydata);
     261       
     262      pdu->set_nslpid(r_key->nslpid);
     263       
     264      // set S-flag
     265      pdu->set_S();
     266 
     267      // set Hops
     268      pdu->set_hops(1);
     269 
     270      // send it
     271      sigmsg->set_req(peer, pdu);
     272      sigmsg->send_or_delete();
     273    }
     274    else
    280275    {
    281       ERRLog(param.name, color[green] << "I am Querying Node, I will re-establish Routing State, enqueuing data, will be dropped after " << data_keep_timeout << "sec");
    282      
    283       enqueuedata(r_key, r_entry, payload, data_keep_timeout, msghandle);
    284      
    285       r_entry->set_state(qn_awaiting_response);
    286 
    287       // start NoResponse timer   
    288       starttimer(r_key, r_entry, noresponse, 0, r_entry->reset_retry_timeout());
    289 
    290     }
    291   }
     276      EVLog(param.name, color[green] << "No Messaging Association was found to send -> trying to re-establish" << color[off]);
     277
     278      if (r_entry->is_responding_node()) {
     279        ERRLog(param.name, color[green] << "I am Responder Node, I cannot establish a Messaging Association. Dropping data");
     280      }
     281      else
     282      {
     283        ERRLog(param.name, color[green] << "I am Querying Node, I will re-establish Routing State" << color[off]);
     284        if (r_entry->is_multicast_QNode()) {
     285          ERRLog(param.name, color[red] << "This is a multicast session. This failed data transmission to peer "
     286                                        << *peer_nli << " will not be repeated" << color[off]);
     287        } else {
     288          ERRLog(param.name, color[green] << "enqueuing data, will be dropped after " << data_keep_timeout << "sec");
     289          enqueuedata(r_key, r_entry, payload, data_keep_timeout, msghandle);
     290
     291          r_entry->set_state(qn_awaiting_response);
     292         
     293          // start NoResponse timer   
     294          starttimer(r_key, r_entry, noresponse, 0, r_entry->reset_retry_timeout());
     295        }
     296      }
     297    }
     298
     299    // iterate through more peers?
     300    if (r_entry->is_multicast_QNode()) {
     301      mcast_peer_it++;
     302      more_peers = (mcast_peer_it != r_entry->get_multicast_peers()->end());
     303    }
     304  } while (more_peers);
    292305} //end send_data_cmode
    293306
  • ntlp/branches/20090723-multicast/src/routingentry.h

    r4426 r4452  
    342342        // before being purged from the multicast peer list
    343343        uint32 get_max_response_miss_count() const;
     344
     345        // return a const pointer to the hash_map of multicast peers
     346        const hash_map<uint32, multicast_peerstatus>* get_multicast_peers() const { return &multicast_peer; }
    344347
    345348  /// enqueued DATA PAYLOAD
Note: See TracChangeset for help on using the changeset viewer.