+ c.execute("""SELECT kind, groupdesc, g, x, m, n FROM top""")
+ kind, groupdesc, gstr, xstr, mstr, nstr = c.fetchone()
+ G = getgroup(kind, groupdesc)
+ g, x, m = G.elt(gstr), G.elt(xstr), C.MP(mstr)
+ n = nstr is not None and C.MP(nstr) or None
+ return G, g, x, m, n
+
+def get_job(db):
+ c = db.cursor()
+ c.execute("""SELECT p.p, p.e, p.k, p.n, p.dpbits
+ FROM progress AS p LEFT OUTER JOIN workers AS w
+ ON p.p = w.p and p.k = w.k
+ WHERE p.k < p.e AND (p.dpbits > 0 OR w.pid IS NULL)
+ LIMIT 1""")
+ row = c.fetchone()
+ if row is None: return None, None, None, None, None
+ else:
+ pstr, e, k, nstr, dpbits = row
+ p, n = C.MP(pstr), C.MP(nstr)
+ return p, e, k, n, dpbits
+
+def maybe_cleanup_worker(dir, db, pid):
+ c = db.cursor()
+ f = OS.path.join(dir, 'lk.%d' % pid)
+ state = 'LIVE'
+ try: fd = OS.open(f, OS.O_WRONLY)
+ except OSError, err:
+ if err.errno != E.ENOENT: raise ExpectedError, 'open lockfile: %s' % err
+ state = 'STALE'
+ else:
+ try: F.lockf(fd, F.LOCK_EX | F.LOCK_NB)
+ except IOError, err:
+ if err.errno != E.EAGAIN: raise ExpectedError, 'check lock: %s' % err
+ else:
+ state = 'STALE'
+ if state == 'STALE':
+ try: OS.unlink(f)
+ except OSError: pass
+ c.execute("""DELETE FROM workers WHERE pid = ?""", (pid,))
+
+def maybe_kill_worker(dir, pid):
+ f = OS.path.join(dir, 'lk.%d' % pid)
+ try: fd = OS.open(f, OS.O_RDWR)
+ except OSError, err:
+ if err.errno != E.ENOENT: raise ExpectedError, 'open lockfile: %s' % err
+ return
+ try: F.lockf(fd, F.LOCK_EX | F.LOCK_NB)
+ except IOError, err:
+ if err.errno != E.EAGAIN: raise ExpectedError, 'check lock: %s' % err
+ else: return
+ OS.kill(pid, SIG.SIGTERM)
+ try: OS.unlink(f)
+ except OSError: pass
+
+###--------------------------------------------------------------------------
+### Setup.