X-Git-Url: https://git.distorted.org.uk/~mdw/tripe/blobdiff_plain/a62f8e8a94bf56194539f7140a1215bc74309b36..76a24ae6cece0604e3155b628622a4b425cb77f3:/svc/watch.in diff --git a/svc/watch.in b/svc/watch.in index bfad160c..140f1fce 100644 --- a/svc/watch.in +++ b/svc/watch.in @@ -399,12 +399,22 @@ class PingPeer (object): """ S.rawcommand(T.TripeAsynchronousCommand( me._q, (me._peer, me.seq), - ['PING', + ['EPING', '-background', S.bgtag(), '-timeout', str(me._timeout), '--', me._peer])) + def _reconnect(me): + info = peerinfo(me._peer) + if 'connect' in info: + S.warn('watch', 'reconnecting', me._peer) + S.forcekx(me._peer) + T.spawn(connect, me._peer) + me._timer = M.SelTimer(time() + me._every, me._time) + else: + S.kill(me._peer) + def event(me, code, stuff): """ Respond to an event which happened to this peer. @@ -422,8 +432,12 @@ class PingPeer (object): me._ping() elif code == 'FAIL': S.notify('watch', 'ping-failed', me._peer, *stuff) - if stuff and stuff[0] == 'unknown-peer': + if not stuff: + pass + elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer) + elif stuff[0] == 'ping-send-failed': + me._reconnect() elif code == 'INFO': if stuff[0] == 'ping-ok': if me._failures > 0: @@ -436,14 +450,7 @@ class PingPeer (object): if me._failures < me._retries: me._ping() else: - info = peerinfo(me._peer) - if 'connect' in info: - S.warn('watch', 'reconnecting', me._peer) - S.forcekx(me._peer) - T.spawn(T.Coroutine(connect), me._peer) - me._timer = M.SelTimer(time() + me._every, me._time) - else: - S.kill(me._peer) + me._reconnect() elif stuff[0] == 'ping-peer-died': me._pinger.kill(me._peer) @@ -518,26 +525,35 @@ class Pinger (T.Coroutine): installed, and again by the dbwatcher coroutine when it detects a change to the database. """ + if T._debug: print '# rescan peers' correct = {} + start = {} for peer in S.list(): try: info = peerinfo(peer) except KeyError: continue if boolean(info, 'watch', False): - correct[peer] = info + if T._debug: print '# interesting peer %s' % peer + correct[peer] = start[peer] = info + elif startup: + if T._debug: print '# peer %s ready for adoption' % peer + start[peer] = info for peer, obj in me._peers.items(): if peer in correct: obj.update(correct[peer]) else: + if T._debug: print '# peer %s vanished' % peer del me._peers[peer] - for peer, info in correct.iteritems(): + for peer, info in start.iteritems(): if peer not in me._peers: if startup: + if T._debug: print '# setting up peer %s' % peer ifname = S.ifname(peer) addr = S.addr(peer) - addpeer(info, peer, ifname, *addr) + T.defer(addpeer, info, peer, ifname, *addr) else: + if T._debug: print '# adopting new peer %s' % peer me.add(peer, info, True) return me @@ -607,9 +623,11 @@ def addpeer(info, peer, ifname, *addr): except KeyError: return if 'ifup' in info: - T.Coroutine(ifupdown).switch('ifup', peer, info, ifname, *addr) + T.Coroutine(ifupdown, name = 'ifup %s' % peer) \ + .switch('ifup', peer, info, ifname, *addr) if 'connect' in info: - T.Coroutine(connect).switch(peer, info['connect']) + T.Coroutine(connect, name = 'connect %s' % peer) \ + .switch(peer, info['connect']) if boolean(info, 'watch', False): pinger.add(peer, info, False) @@ -624,7 +642,8 @@ def delpeer(peer): except KeyError: pass if 'ifdown' in info: - T.Coroutine(ifupdown).switch('ifdown', peer, info) + T.Coroutine(ifupdown, name = 'ifdown %s' % peer) \ + .switch('ifdown', peer, info) def notify(_, code, *rest): """ @@ -650,7 +669,7 @@ def cmd_kick(peer): """ if peer not in pinger.adopted(): raise T.TripeJobError('peer-not-adopted', peer) - T.spawn(T.Coroutine(connect), peer) + T.spawn(connect, peer) def cmd_adopted(): """ @@ -680,7 +699,7 @@ def init(): errorwatch = ErrorWatch() childwatch = ChildWatch() pinger = Pinger() - T.Coroutine(dbwatch).switch() + T.Coroutine(dbwatch, name = 'dbwatch').switch() errorwatch.switch() pinger.switch()