NSIS-ka
A free C++ implementation of NSIS protocols

Changeset 4420


Ignore:
Timestamp:
Sep 21, 2009, 10:28:37 AM (8 years ago)
Author:
stud-lenk
Message:

Second step towards a multicast implementation

  • extended routingentry by an additional timer slot (timer_type_multicast and timer_id_multicast) used for multicast specific timer handling.
  • extended routingentry by an additional timeval struct used to save the absolute deadline of the running refresh_QNode_timeout timer. This is used for calculating the remaining response time when receiving the first response to a multicast query.

The new multicast timer slot is used to decide when to purge unresponsive
multicast peers from the peer list. The timer is started whenever the
Query Node State Machine enters the state ESTABLISHED and runs for the
remaining time Refresh_QNode would have run to reach its deadline.
When the timer expires (and the node is still in state ESTABLISHED) the list of
multicast peers is checked for peers that did not send a RESPONSE to the
previous QUERY. Those peers will then be removed from the list.

Location:
ntlp/branches/20090723-multicast/src
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • ntlp/branches/20090723-multicast/src/ntlp_statemodule.h

    r4245 r4420  
    292292  void to_secrets_refresh();
    293293
     294  /// Timeout on collecting multicast responses
     295  void to_mcast_collect(const routingkey* r_key, TimerMsg* timermsg);
     296
    294297  /// SendMessage API Call processing
    295298  void tg_send_message(APIMsg* apimsg);
  • ntlp/branches/20090723-multicast/src/ntlp_statemodule_main.cpp

    r4271 r4420  
    407407        break;
    408408
     409      // Purge all unresponsive multicast peers NOW
     410      case mcast_collect:
     411        rk = static_cast<routingkey*>(timermsg->get_param2());
     412        to_mcast_collect(rk, timermsg);
     413        break;
     414
    409415      // Stopped
    410416      case none:
     
    13841390
    13851391    msg->send_to(message::qaddr_timer);
     1392    if (timer == noresponse) {
     1393      r_entry->save_no_response_deadline(sec, msec);
     1394    }
    13861395  }
    13871396
     
    14061415  }   
    14071416
     1417  // separate timer for collecting more responses (in reseponse to multicast queries)
     1418  if (timer == mcast_collect)
     1419  {
     1420    if ((r_entry->get_timer_id_multicast() != 0) && stop)
     1421    {
     1422      // stop the current timer
     1423      TimerMsg* stopmsg = new TimerMsg(message::qaddr_coordination, true);
     1424      stopmsg->stop(r_entry->get_timer_id_multicast());
     1425      stopmsg->send_to(message::qaddr_timer);
     1426    }
     1427
     1428    TimerMsg* msg = new RoutingTableTimerMsg(*r_key, timer, sec, msec);
     1429
     1430    r_entry->set_timer_type_multicast(timer);
     1431    r_entry->set_timer_id_multicast(msg->get_id());
     1432    mid = r_entry->get_timer_id_multicast();
     1433
     1434    msg->send_to(message::qaddr_timer);
     1435  }
     1436
    14081437  DLog(param.name, "Started timer: " << color[magenta] << timername(timer)<< " " << ((sec * 1000) + msec) << " ms" << " mid " << mid << color[off]);
    14091438}
     
    14501479    TimerMsg* stopmsg = new TimerMsg(message::qaddr_coordination, true);
    14511480    stopmsg->stop(r_entry->get_timer_id_3());
     1481    stopmsg->send_to(message::qaddr_timer);
     1482  }
     1483
     1484  // timers on multicast slot
     1485  if (r_entry->get_timer_type_multicast() != none)
     1486  {
     1487    // this will cause only ignoring any timer
     1488    r_entry->set_timer_type_multicast(none);
     1489
     1490    EVLog(param.name, color[magenta] << "Trying to stop timer for multicast" << color[off]);
     1491    // really stop the current timer
     1492    TimerMsg* stopmsg = new TimerMsg(message::qaddr_coordination, true);
     1493    stopmsg->stop(r_entry->get_timer_id_multicast());
    14521494    stopmsg->send_to(message::qaddr_timer);
    14531495  }
  • ntlp/branches/20090723-multicast/src/ntlp_statemodule_querier.cpp

    r4346 r4420  
    418418        ERRCLog(param.name, "incoming_pdu has no NLI. There is something really wrong!");
    419419      }
     420
     421      uint32 remaining_responses_time = r_entry->get_remaining_response_time();
     422      DLog(param.name, "Remaining time for other responses: " << remaining_responses_time << " ms");
     423      if (r_entry->get_min_peer_rs_validity_time() < remaining_responses_time) {
     424        WLog(param.name, "Trimming remaining time for more responses ("
     425                         << remaining_responses_time << " ms) to shorter peer RS validity time: "
     426                         << r_entry->get_min_peer_rs_validity_time() << " ms");
     427        remaining_responses_time = r_entry->get_min_peer_rs_validity_time();
     428        // TODO MULTICAST: shorten running timer for to_Refresh_QNode
     429      }
     430      if (remaining_responses_time == 0) {
     431        // purge list of unresponding peers now
     432        if (r_entry->get_timer_type_multicast() != none) {
     433          ERRCLog(param.name, "timer_type_multicast is not 'none' - this should never happen");
     434          r_entry->set_timer_type_multicast(none);
     435        }
     436        r_entry->purge_multicast_peers();
     437      } else {
     438        // purge list of unresponsive peers later
     439        starttimer(r_key, r_entry, mcast_collect, 0, remaining_responses_time);
     440      }
    420441    }
    421442    else
     
    535556
    536557  if (r_entry->is_multicast_QNode()) {
     558    // Add peer to list of multicast peers
    537559    uint32 sii_handle = param.rt.generate_sii_handle(incoming_pdu->get_nli());
    538560    if (incoming_pdu->get_nli())
    539561      r_entry->trim_peer_rs_validity_time(incoming_pdu->get_nli()->get_rs_validity_time());
    540562    r_entry->add_multicast_peer(sii_handle);
     563     
     564    uint32 remaining_responses_time = r_entry->get_remaining_response_time();
     565    DLog(param.name, "Remaining time for other responses: " << remaining_responses_time << " ms");
     566    if (r_entry->get_min_peer_rs_validity_time() < remaining_responses_time) {
     567      WLog(param.name, "Trimming remaining time for more responses ("
     568                        << remaining_responses_time << " ms) to shorter peer RS validity time: "
     569                        << r_entry->get_min_peer_rs_validity_time() << " ms");
     570      remaining_responses_time = r_entry->get_min_peer_rs_validity_time();
     571      // TODO MULTICAST: shorten running timer for to_Refresh_QNode
     572    }
     573    if (remaining_responses_time == 0) {
     574      // purge list of unresponding peers now
     575      if (r_entry->get_timer_type_multicast() != none) {
     576        ERRCLog(param.name, "timer_type_multicast is not 'none' - this should never happen");
     577        r_entry->set_timer_type_multicast(none);
     578      }
     579      r_entry->purge_multicast_peers();
     580    } else {
     581      // purge list of unresponsive peers later
     582      starttimer(r_key, r_entry, mcast_collect, 0, remaining_responses_time);
     583    }
    541584
    542585    //******************************************************************************
    543586    // TODO MULTICAST:
     587    // Traq ticket #126 (http://projekte.tm.uka.de/trac/NSIS/ticket/126)
    544588    // Detection of Routing Changes DOES NOT WORK for multicast yet
    545589    // Possible solution:
     
    11211165      if (r_entry->is_multicast_QNode())
    11221166      {
     1167        if (r_entry->get_timer_type_multicast() == mcast_collect) {
     1168          ERRCLog(param.name, "List of multicast peers hasn't been purged (mcast_collect timer should have fired before but didn't). Skipping purge for this time");
     1169          // Ensure later timer events on mcast_collect get ignored
     1170          //
     1171          // We do not need to stoptimer() because timer handling checks for
     1172          // the correct value in routingentry::timer_type_multicast and
     1173          // routingentry::timer_id_multicast. If a new timer gets started
     1174          // the latter will get assigned a new value and thus will entirely
     1175          // invalidate the old timer event
     1176          r_entry->set_timer_type_multicast(none);
     1177        }
    11231178        r_entry->expire_multicast_peers();
    11241179        // restart detection of lowest RSV time
     
    11541209}
    11551210
     1211
     1212/**
     1213 * McastCollect Timer processing. Purge unresponsive peers from multicast peer list
     1214 * @param r_key -- the routing key (may NOT be NULL)
     1215 * @param timermsg -- the timer message
     1216 */
     1217void Statemodule::to_mcast_collect(const routingkey* r_key, TimerMsg* timermsg)
     1218{
     1219  assert( r_key != NULL );
     1220
     1221  DLog(param.name, "to_mcast_collect(): looking up routing key");
     1222   
     1223  routingentry* r_entry = param.rt.lookup(r_key);
     1224   
     1225  if (r_entry)
     1226  {
     1227    if (r_entry->get_timer_type_multicast() != mcast_collect) {
     1228      DLog(param.name, "Deliberately stopped timer, ignoring");
     1229      param.rt.unlock(r_key);
     1230      return;
     1231    }
     1232
     1233    if (r_entry->get_state() != qn_established) {
     1234      ERRLog(param.name, "Got a timeout on timer " << timerstring[mcast_collect]
     1235                        << " but I am in state " << r_entry->get_state_name()
     1236                        << " (expected to be in state " << statestring[qn_established] << ")");
     1237      param.rt.unlock(r_key);
     1238      return;
     1239    }
     1240
     1241    if (r_entry->get_timer_id_multicast() == timermsg->get_id())
     1242    {
     1243      DLog(param.name, "Timer slot multicast, timer is valid, id: " << timermsg->get_id());
     1244
     1245
     1246      EVLog(param.name, color[magenta] << "MCastCollect Timer went off" << color[off]);
     1247      r_entry->purge_multicast_peers();
     1248           
     1249      param.rt.unlock(r_key);
     1250    } // end if timer valid
     1251  } // if r_entry
     1252}
     1253
    11561254//@}
    11571255
  • ntlp/branches/20090723-multicast/src/routingentry.cpp

    r4345 r4420  
    3030#include "gist_conf.h"
    3131#include "routingentry.h"
     32#include "sys/time.h"
    3233
    3334using namespace ntlp;
     
    113114routingentry::purge_multicast_peers()
    114115{
    115   if (!is_multicast_QNode()) { ERRCLog("purge_multicast_peers()", "Not a multicast QNode"); return; }
    116   hash_map<uint32, multicast_peerstatus>::iterator it;;
     116  if (!is_multicast_QNode())
     117  {
     118    ERRCLog("purge_multicast_peers()", "called on a unicast QNode - this should never happen!");
     119    return;
     120  }
     121
     122  unsigned int purged_peer_count = 0;
     123  hash_map<uint32, multicast_peerstatus>::iterator it;
    117124  for (it=multicast_peer.begin(); it != multicast_peer.end(); it++)
    118125  {
    119126    if (it->second.active == false) {
     127      purged_peer_count++;
    120128      multicast_peer.erase(it);
    121129      DLog("MulticastQN", "Multicast peer with SII " << it->first << " purged");
     130
     131      // TODO MULTICAST:
     132      // Give peers a fair chance if they missed a refreshing query
     133
     134      // TODO MULTICAST:
     135      // Do we need to do a networknotification() to NSLP???
     136
    122137    }
    123138  }
     139  DLog("MulticastQN", "purge_multicast_peers(): " << purged_peer_count << " peers removed from list of multicast peers");
     140
     141  // ensure later timer events on mcast_collect get ignored:
     142  set_timer_type_multicast(none);
    124143}
     144
     145void
     146routingentry::save_no_response_deadline(uint32 sec, uint32 msec)
     147{
     148  struct timeval now = {0,0};
     149  struct timeval delta = {sec, msec * 1000};
     150  gettimeofday(&now, NULL);
     151  timeradd(&now, &delta, &no_response_deadline);
     152}
     153
     154uint32
     155routingentry::get_remaining_response_time()
     156{
     157  // shortcut gettimeofday() if there is no valid no_response_deadline
     158  if (no_response_deadline.tv_sec == 0 && no_response_deadline.tv_usec == 0) return 0;
     159
     160  uint32 remaining_time = 0;
     161  struct timeval now = {0,0};
     162  gettimeofday(&now, NULL);
     163  if (timercmp(&now, &no_response_deadline, <))
     164  {
     165    struct timeval delta;
     166    timersub(&no_response_deadline, &now, &delta);
     167    assert(delta.tv_sec >= 0 && delta.tv_usec >= 0);
     168    remaining_time = delta.tv_sec * 1000 + delta.tv_usec / 1000;
     169  }
     170  else
     171  { // invalidate expired no_response_deadline
     172    no_response_deadline.tv_sec = 0;
     173    no_response_deadline.tv_usec = 0;
     174  }
     175  return remaining_time;
     176}
  • ntlp/branches/20090723-multicast/src/routingentry.h

    r4346 r4420  
    6060  timer_stopped  = 12,
    6161  send_mresponse = 13,
    62   last_timer_type= 14,
     62  mcast_collect  = 14,
     63  last_timer_type= 15,
    6364  none           = 255
    6465} timer_type_t;
     
    106107  "STOPPED",
    107108  "SEND_MCAST_RESPONSE",
     109  "MULTICAST_COLLECT",
    108110  "INVALID_TYPE_LAST_TIMER"
    109111}; // end timerstring
     
    158160          timer_type_2(none), // NOTE: not a true copy
    159161          timer_type_3(none), // NOTE: not a true copy
     162          timer_type_multicast(none), // NOTE: not a true copy
    160163          timer_id(0), // NOTE: not a true copy
    161164          timer_id_2(0), // NOTE: not a true copy
    162165          timer_id_3(0), // NOTE: not a true copy
     166          timer_id_multicast(0), // NOTE: not a true copy
    163167          rs_validity_time(n.rs_validity_time),
    164168          min_peer_rs_validity_time(n.min_peer_rs_validity_time),
     
    179183  {
    180184    dataqueue.clear();
     185    no_response_deadline.tv_sec = 0;
     186    no_response_deadline.tv_usec = 0;
    181187  } // end copy constructor
    182188
     
    287293        timer_type_t get_timer_type_3() const { return timer_type_3; }
    288294        void set_timer_type_3(timer_type_t type) { timer_type_3= type; }
     295        timer_type_t get_timer_type_multicast() const { return timer_type_multicast; }
     296        void set_timer_type_multicast(timer_type_t type) { timer_type_multicast= type; }
    289297
    290298        uint64 get_timer_id_1() const { return timer_id; }
     
    294302        uint64 get_timer_id_3() const { return timer_id_3; }
    295303        void set_timer_id_3(uint64 id) { timer_id_3= id; }
     304        uint64 get_timer_id_multicast() const { return timer_id_multicast; }
     305        void set_timer_id_multicast(uint64 id) { timer_id_multicast= id; }
    296306
    297307        const char* get_state_name() const { return (state<rt_state_max) ? statestring[state] : "state variable invalid - internal error"; };
     
    319329        void expire_multicast_peers();
    320330        void purge_multicast_peers();
     331
     332        // calculates the absolute deadline from the given relative deadline
     333        // (sec, msec like in starttimer()) and saves the result to
     334        // routingentry::no_response_deadline (a timeval)
     335        void save_no_response_deadline(uint32 sec, uint32 msec);
     336
     337        // checks the absolute deadline previously saved by
     338        // save_no_response_deadline() and returns the remaining time (ms)
     339        uint32 get_remaining_response_time();
    321340
    322341  /// enqueued DATA PAYLOAD
     
    361380  timer_type_t timer_type_2;
    362381  timer_type_t timer_type_3;
     382  // used for timeout of collecting multicast responses
     383  timer_type_t timer_type_multicast;
    363384   
    364385  /// the ID of the timers 1 and 2
     
    366387  uint64 timer_id_2;
    367388  uint64 timer_id_3;   
     389  // used for timeout of collecting multicast responses
     390  uint64 timer_id_multicast;
     391  // needed for calculation of remaining no_response_timeout time
     392  // when receiving first response to multicast query
     393  struct timeval no_response_deadline;
    368394   
    369395  /// RSV time (ms)
     
    430456    timer_type_2(none),
    431457    timer_type_3(none),
     458    timer_type_multicast(none),
    432459    timer_id(0),
    433460    timer_id_2(0),
    434461    timer_id_3(0),
     462    timer_id_multicast(0),
    435463    rs_validity_time(0),
    436464    min_peer_rs_validity_time(0),
     
    451479{
    452480  dataqueue.clear();
     481  no_response_deadline.tv_sec = 0;
     482  no_response_deadline.tv_usec = 0;
    453483} // end constructor
    454484
Note: See TracChangeset for help on using the changeset viewer.