NSIS-ka
A free C++ implementation of NSIS protocols

Changeset 4491


Ignore:
Timestamp:
Sep 25, 2009, 4:02:08 PM (8 years ago)
Author:
stud-lenk
Message:

First allmost completely working multicast NTLP implementation!

The changes in detail:

  • Extend send_data_dmode() and send_data_cmode() by optional parameter "peer_nli". If this parameter is specified the data is send to this peer. Otherwise the peer_nli from the given routingentry is used (unicast only). This makes most of the changes from r4452 obsolete, so reverted them.
  • Rework tg_send_message() to make use of this nli. If it is called on a Querying Node targeting a multicast flow, tg_send_message() now iterates through the list of all multicast peers and sends the message to each listed peer.
Location:
ntlp/branches/20090723-multicast/src
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • ntlp/branches/20090723-multicast/src/ntlp_statemodule.h

    r4485 r4491  
    226226
    227227  /// send GIST Data in D-Mode
    228   void send_data_dmode(nslpdata* nslp_data, routingkey* r_key, routingentry* r_entry, msghandle_t msghandle);
     228  void send_data_dmode(nslpdata* nslp_data, routingkey* r_key, routingentry* r_entry, msghandle_t msghandle, const nli* peer_nli = NULL);
    229229
    230230  /// send GIST Data in C-Mode
    231   void send_data_cmode(nslpdata* nslp_data, routingkey* r_key, routingentry* r_entry, msghandle_t msghandle);
     231  void send_data_cmode(nslpdata* nslp_data, routingkey* r_key, routingentry* r_entry, msghandle_t msghandle, const nli* peer_nli = NULL);
    232232
    233233  /// process timers
  • ntlp/branches/20090723-multicast/src/ntlp_statemodule_api.cpp

    r4485 r4491  
    629629      case qn_established:
    630630        {
     631          mcast_peer_list_t::const_iterator mcast_peer_it = r_entry->get_multicast_peers()->begin();
     632          const nli* peer_nli = r_entry->is_multicast_QNode() ? &mcast_peer_it->first : r_entry->get_peer_nli();
     633          assert(peer_nli);
     634          while (peer_nli) {
    631635          // if it is set up in D-Mode and MA re-use not requested
    632           // TODO MULTICAST - this DOES NOT work for C-/D-mode-mixed peers:
    633           if (r_entry->is_dmode() && (r_entry->is_ma_reuse_requested() == false))
     636          if (r_entry->is_dmode() && (r_entry->is_ma_reuse_requested(peer_nli) == false))
    634637          {
    635638            // if transfer type is dmode -> OK, send in D-MODE
     
    637640            {
    638641              DLog(param.name, "Routing State set up in D-Mode, this suits, we send data Message in D-Mode");
    639               send_data_dmode(apimsg->get_data(), r_key, r_entry, apimsg->get_nslpid());
     642              send_data_dmode(apimsg->get_data(), r_key, r_entry, apimsg->get_nslpid(), peer_nli);
    640643            }
    641644                   
     
    672675            {
    673676              DLog(param.name, "Routing State set up in C-Mode, we use it however it was not required.");
    674               send_data_cmode(apimsg->get_data(), r_key, r_entry, apimsg->get_nslpmsghandle());
     677              send_data_cmode(apimsg->get_data(), r_key, r_entry, apimsg->get_nslpmsghandle(), peer_nli);
    675678            }
    676679                   
     
    679682            {
    680683              DLog(param.name, "Routing State set up in C-Mode, we use it as it was required.");
    681               send_data_cmode(apimsg->get_data(), r_key, r_entry, apimsg->get_nslpmsghandle());
     684              send_data_cmode(apimsg->get_data(), r_key, r_entry, apimsg->get_nslpmsghandle(), peer_nli);
    682685            }
    683686                   
     
    686689            {
    687690              DLog(param.name, "Routing State set up in C-Mode with security, we use it.");
    688               send_data_cmode(apimsg->get_data(), r_key, r_entry, apimsg->get_nslpmsghandle());
     691              send_data_cmode(apimsg->get_data(), r_key, r_entry, apimsg->get_nslpmsghandle(), peer_nli);
    689692            }
    690693                   
     
    710713            } // endif set up cmode without security and transfer type is cmode_sec
    711714          } // endif C-Mode or MA re-use desired
     715          if (r_entry->is_multicast_QNode()) {
     716            // iterate to next NLI
     717            mcast_peer_it++;
     718            peer_nli = mcast_peer_it == r_entry->get_multicast_peers()->end() ? NULL : &mcast_peer_it->first;
     719          } else { // terminate while-loop in unicast case
     720            peer_nli = NULL;
     721          }
     722          } // while peer_nli
    712723          break;
    713724        } // end case
  • ntlp/branches/20090723-multicast/src/ntlp_statemodule_data.cpp

    r4485 r4491  
    135135 * @param r_entry -- the routing entry itself
    136136 * @param msghandle -- the NSLP MsgHandle, given here for eventual error messages
    137  */
    138 void
    139 Statemodule::send_data_dmode(nslpdata*  nslp_data, routingkey* r_key, routingentry* r_entry,  msghandle_t msghandle)
     137 * @param peer_nli -- the target peer's NLI. When NULL the NLI saved in the routingentry is used (unicast only).
     138 */
     139void
     140Statemodule::send_data_dmode(nslpdata*  nslp_data, routingkey* r_key, routingentry* r_entry,  msghandle_t msghandle, const nli* peer_nli)
    140141{
    141142  if (r_entry->get_state()==qn_established || r_entry->get_state()==qn_awaiting_refresh)
     
    148149  r_entry->set_msg_id(msghandle);
    149150 
    150   bool more_peers = false;
    151   mcast_peer_list_t::const_iterator mcast_peer_it = r_entry->get_multicast_peers()->begin();
    152   do {
    153     appladdress* peer= new appladdress;
    154     peer->set_port( gconf.getpar<uint16>(gistconf_udpport) );
    155     peer->set_protocol(param.udp);
    156  
    157     const nli* peer_nli = r_entry->is_multicast_QNode() ? &mcast_peer_it->first : r_entry->get_peer_nli();
    158     assert(peer_nli);
    159 
    160     // check for MA reuse
    161     if (r_entry->is_ma_reuse_requested(peer_nli)) {
    162       ERRCLog(param.name, "Peer " << peer_nli->get_if_address() << " << requested MA reuse. We can't handle that right now -- Skip sending data to this peer.");
    163       mcast_peer_it++;
    164       more_peers = (mcast_peer_it != r_entry->get_multicast_peers()->end());
    165       continue;
    166     }
    167 
    168     // set destination address
    169     peer->set_ip(peer_nli->get_if_address());
    170 
    171     SignalingMsgNTLP* sigmsg = new SignalingMsgNTLP();
    172     sigmsg->set_local_addr(r_entry->get_local_src()->copy());
    173 
    174     // Build a DATA Message
    175     sessionid* mysid = new sessionid(*r_key->sid);
    176 
    177     // take eventual payload from apimsg
    178     nslpdata* mydata=NULL;
    179     if (nslp_data)
    180     {
    181       mydata=new nslpdata(*nslp_data);
    182     }
    183 
    184     mri* mymri=r_key->mr->copy();
    185    
    186     nli* own_nli=build_local_nli(mymri->get_ip_version(), peer_nli->get_if_address(), r_entry);
    187  
    188     own_nli->set_ip_ttl(r_entry->get_ip_ttl());
    189    
    190 
    191     // construct DATA
    192     data* pdu = new data(mymri, mysid, own_nli, mydata);
    193 
    194     pdu->set_nslpid(r_key->nslpid);
    195     pdu->set_hops(r_entry->get_gist_hop_count());
    196  
    197     // if explicit routing took place (sending for DEAD state)
    198     if (r_entry->get_state() == dead)
    199     { 
    200       pdu->set_E(); 
    201     }
    202 
    203     // set S-flag
    204     pdu->set_S();
    205 
    206     // set IP TTL
    207     DLog(param.name, "Set IP TTL in target address");
    208     peer->set_ip_ttl(own_nli->get_ip_ttl());
    209 
    210     // set hops
    211     pdu->set_hops(1);
    212  
    213  
    214     // send it
    215     sigmsg->set_req(peer, pdu);
    216     sigmsg->send_or_delete();
    217 
    218     // iterate through more peers?
    219     if (r_entry->is_multicast_QNode()) {
    220       mcast_peer_it++;
    221       more_peers = (mcast_peer_it != r_entry->get_multicast_peers()->end());
    222     }
    223   } while (more_peers);
     151  appladdress* peer= new appladdress;
     152  peer->set_port( gconf.getpar<uint16>(gistconf_udpport) );
     153  peer->set_protocol(param.udp);
     154 
     155  if (peer_nli == NULL) {
     156    if (r_entry->is_multicast_QNode())
     157    { // Abort on multicast request without NLI
     158      ERRCLog(param.name, color[red] << "send_data_dmode() called for multicast target, but no target NLI given. Skipping." << color[off]);
     159      return;
     160    }
     161    // otherwise use peer_nli from routing entry
     162    peer_nli = r_entry->get_peer_nli();
     163  }
     164  assert(peer_nli);
     165
     166  // check for MA reuse
     167  if (r_entry->is_ma_reuse_requested(peer_nli)) {
     168    ERRCLog(param.name, color[red] << "send_data_dmode(): ERROR: Peer " << peer_nli->get_if_address()
     169                                   << " requested MA reuse. This peer should have been handled by send_data_cmode()");
     170    return;
     171  }
     172
     173  peer->set_ip(peer_nli->get_if_address());
     174 
     175  SignalingMsgNTLP* sigmsg = new SignalingMsgNTLP();
     176  sigmsg->set_local_addr(r_entry->get_local_src()->copy());
     177
     178  // Build a DATA Message
     179  sessionid* mysid = new sessionid(*r_key->sid);
     180
     181  // take eventual payload from apimsg
     182  nslpdata* mydata=NULL;
     183  if (nslp_data)
     184  {
     185    mydata=new nslpdata(*nslp_data);
     186  }
     187
     188  mri* mymri=r_key->mr->copy();
     189   
     190  nli* own_nli=build_local_nli(mymri->get_ip_version(), peer_nli->get_if_address(), r_entry);
     191   
     192  own_nli->set_ip_ttl(r_entry->get_ip_ttl());
     193   
     194
     195  // construct DATA
     196  data* pdu = new data(mymri, mysid, own_nli, mydata);
     197
     198  pdu->set_nslpid(r_key->nslpid);
     199  pdu->set_hops(r_entry->get_gist_hop_count());
     200 
     201  // if explicit routing took place (sending for DEAD state)
     202  if (r_entry->get_state() == dead)
     203  { 
     204    pdu->set_E(); 
     205  }
     206
     207  // set S-flag
     208  pdu->set_S();
     209
     210  // set IP TTL
     211  DLog(param.name, "Set IP TTL in target address");
     212  peer->set_ip_ttl(own_nli->get_ip_ttl());
     213
     214  // set hops
     215  pdu->set_hops(1);
     216 
     217 
     218  // send it
     219  sigmsg->set_req(peer, pdu);
     220  sigmsg->send_or_delete();
    224221} //end send_data_dmode
    225222
     
    232229 * @param r_entry -- the routing entry itself
    233230 * @param msghandle -- the NSLP MsgHandle, given here for eventual error messages
    234  */
    235 void
    236 Statemodule::send_data_cmode(nslpdata* payload, routingkey* r_key, routingentry* r_entry, msghandle_t msghandle)
     231 * @param peer_nli -- the target peer's NLI. When NULL the NLI saved in the routingentry is used (unicast only).
     232 */
     233void
     234Statemodule::send_data_cmode(nslpdata* payload, routingkey* r_key, routingentry* r_entry, msghandle_t msghandle, const nli* peer_nli)
    237235{
    238236
     
    243241  }
    244242 
    245   bool more_peers = false;
    246   mcast_peer_list_t::const_iterator mcast_peer_it = r_entry->get_multicast_peers()->begin();
    247   do {
    248     const nli* peer_nli = r_entry->is_multicast_QNode() ? &mcast_peer_it->first : r_entry->get_peer_nli();
    249     assert(peer_nli);
    250     appladdress* peer= param.rt.lookup_ma_peeraddress(peer_nli);
    251 
    252     if (peer)
     243  if (peer_nli == NULL)
     244  {
     245    if (r_entry->is_multicast_QNode())
     246    { // Abort on multicast request
     247      ERRCLog(param.name, color[red] << "send_data_cmode() called for multicast target, but no target NLI given. Skipping." << color[off]);
     248      return;
     249    }
     250    // otherwise use peer_nli from routing entry
     251    peer_nli = r_entry->get_peer_nli();
     252  }
     253  assert(peer_nli);
     254   
     255  appladdress* peer= param.rt.lookup_ma_peeraddress(peer_nli);
     256  if (peer)
     257  {
     258    // indicate activity to MA maintainance
     259    param.rt.activity_ind_ma(peer_nli);
     260
     261    SignalingMsgNTLP* sigmsg = new SignalingMsgNTLP();
     262    sigmsg->set_local_addr(r_entry->get_local_src()->copy());
     263   
     264    // Build a DATA Message
     265    sessionid* mysid = new sessionid(*(r_key->sid));
     266       
     267    // take eventual payload from apimsg
     268    nslpdata* mydata=NULL;
     269    if (payload) mydata=new nslpdata(*(payload));
     270       
     271    mri* mymri=r_key->mr->copy();
     272       
     273    // construct DATA
     274    data* pdu = new data(mymri, mysid, NULL, mydata);
     275       
     276    pdu->set_nslpid(r_key->nslpid);
     277       
     278    // set S-flag
     279    pdu->set_S();
     280
     281    // set Hops
     282    pdu->set_hops(1);
     283
     284    // send it
     285    sigmsg->set_req(peer, pdu);
     286    sigmsg->send_or_delete();
     287   
     288  }
     289  else
     290  {
     291    EVLog(param.name, color[green] << "No Messaging Association was found to send -> trying to re-establish" << color[off]);
     292   
     293    if (r_entry->is_responding_node()) {
     294     
     295      ERRLog(param.name, color[green] << "I am Responder Node, I cannot establish a Messaging Association. Dropping data");
     296    }
     297    else
    253298    {
    254       // indicate activity to MA maintainance
    255       param.rt.activity_ind_ma(peer_nli);
    256  
    257       SignalingMsgNTLP* sigmsg = new SignalingMsgNTLP();
    258       sigmsg->set_local_addr(r_entry->get_local_src()->copy());
     299      ERRLog(param.name, color[green] << "I am Querying Node, I will re-establish Routing State, enqueuing data, will be dropped after " << data_keep_timeout << "sec");
    259300     
    260       // Build a DATA Message
    261       sessionid* mysid = new sessionid(*(r_key->sid));
    262        
    263       // take eventual payload from apimsg
    264       nslpdata* mydata=NULL;
    265       if (payload) mydata=new nslpdata(*(payload));
    266        
    267       mri* mymri=r_key->mr->copy();
    268        
    269       // construct DATA
    270       data* pdu = new data(mymri, mysid, NULL, mydata);
    271        
    272       pdu->set_nslpid(r_key->nslpid);
    273        
    274       // set S-flag
    275       pdu->set_S();
    276  
    277       // set Hops
    278       pdu->set_hops(1);
    279  
    280       // send it
    281       sigmsg->set_req(peer, pdu);
    282       sigmsg->send_or_delete();
    283     }
    284     else
    285     {
    286       EVLog(param.name, color[green] << "No Messaging Association was found to send -> trying to re-establish" << color[off]);
    287 
    288       if (r_entry->is_responding_node()) {
    289         ERRLog(param.name, color[green] << "I am Responder Node, I cannot establish a Messaging Association. Dropping data");
    290       }
    291       else
    292       {
    293         ERRLog(param.name, color[green] << "I am Querying Node, I will re-establish Routing State" << color[off]);
    294         if (r_entry->is_multicast_QNode()) {
    295           ERRLog(param.name, color[red] << "This is a multicast session. This failed data transmission to peer "
    296                                         << *peer_nli << " will not be repeated" << color[off]);
    297         } else {
    298           ERRLog(param.name, color[green] << "enqueuing data, will be dropped after " << data_keep_timeout << "sec");
    299           enqueuedata(r_key, r_entry, payload, data_keep_timeout, msghandle);
    300 
    301           r_entry->set_state(qn_awaiting_response);
    302          
    303           // start NoResponse timer   
    304           starttimer(r_key, r_entry, noresponse, 0, r_entry->reset_retry_timeout());
    305         }
    306       }
    307     }
    308 
    309     // iterate through more peers?
    310     if (r_entry->is_multicast_QNode()) {
    311       mcast_peer_it++;
    312       more_peers = (mcast_peer_it != r_entry->get_multicast_peers()->end());
    313     }
    314   } while (more_peers);
     301      enqueuedata(r_key, r_entry, payload, data_keep_timeout, msghandle);
     302     
     303      r_entry->set_state(qn_awaiting_response);
     304
     305      // start NoResponse timer   
     306      starttimer(r_key, r_entry, noresponse, 0, r_entry->reset_retry_timeout());
     307
     308    }
     309  }
    315310} //end send_data_cmode
    316311
     
    359354    {
    360355      mydata=entry->dataqueue[i]->data;   
    361       // this cannot be explicitly routed, as we would not use C-Mode
    362       // TODO MULTICAST: This does not work for C-/D-mode mixed peers
    363       if (entry->is_dmode() && !entry->is_ma_reuse_requested()) {
    364         send_data_dmode(mydata, key, entry, entry->dataqueue[i]->msghandle);
     356      if (entry->is_multicast_QNode()) {
     357        // send queued data to each multicast peer
     358        mcast_peer_list_t::const_iterator mc_peer_it;
     359        for (mc_peer_it = entry->get_multicast_peers()->begin();
     360             mc_peer_it != entry->get_multicast_peers()->end(); mc_peer_it++)
     361        {
     362          const nli* peer_nli = &mc_peer_it->first;
     363          if (entry->is_dmode() && !entry->is_ma_reuse_requested(peer_nli))
     364            send_data_dmode(mydata, key, entry, entry->dataqueue[i]->msghandle, peer_nli);
     365          else
     366            send_data_cmode(mydata, key, entry, entry->dataqueue[i]->msghandle, peer_nli);
     367        }
     368      } else {
     369        // send queued data to unicast peer
     370        if (entry->is_dmode() && !entry->is_ma_reuse_requested())
     371          send_data_dmode(mydata, key, entry, entry->dataqueue[i]->msghandle);
     372        else
     373          send_data_cmode(mydata, key, entry, entry->dataqueue[i]->msghandle);
    365374      }
    366       else
    367         send_data_cmode(mydata, key, entry, entry->dataqueue[i]->msghandle);
    368375    }
    369376    // notify about sent message!
Note: See TracChangeset for help on using the changeset viewer.