From dd9af5cb6877b3c6c6b33d99800ec35745272729 Mon Sep 17 00:00:00 2001 From: Richard Kettlewell Date: Sun, 18 May 2008 17:51:36 +0100 Subject: [PATCH] Add a new 'wait' flag to the rescan command. This allows the caller to request that the rescan command blocks until the rescan is complete. The reason is that if you run the tests on a Linux tmpfs they would with high probability hang, due to the rescan completing before the rescan_monitor had started up. The flags is available in the Python interface but not the C interface or the command-line client. This could easily be fixed if there is demand. There's also a 'fresh' flag, to demand that the rescan start after the receipt of the command (i.e. to guarantee your new tracks make it in) but I disabled that due to the inconvenience of testing it. However the code is still there if anyone feels like writing tests. --- doc/disorder_protocol.5.in | 14 +++++- lib/trackdb.c | 43 +++++++++++++++++- lib/trackdb.h | 7 ++- python/disorder.py.in | 4 +- server/disorderd.c | 2 +- server/server.c | 110 ++++++++++++++++++++++++++++++++++++++++++--- server/state.c | 2 +- tests/dtest.py | 10 +---- 8 files changed, 170 insertions(+), 22 deletions(-) diff --git a/doc/disorder_protocol.5.in b/doc/disorder_protocol.5.in index 5a2bf83..0847050 100644 --- a/doc/disorder_protocol.5.in +++ b/doc/disorder_protocol.5.in @@ -255,9 +255,21 @@ Requires one of the \fBremove mine\fR, \fBremove random\fR or \fBremove any\fR rights depending on how the track came to be added to the queue. .TP -.B rescan +.B rescan \fR[\fBwait\fR] \fR[\fBfresh\fR] Rescan all roots for new or obsolete tracks. Requires the \fBrescan\fR right. +.IP +If the \fBwait\fR flag is present then the response is delayed until the rescan +completes. +Otherwise the response arrives immediately. +This is primarily intended for testing. +.IP +If the \fBfresh\fR flag is present a rescan is already underway then a second +rescan will be started when it completes. +The default behavior is to piggyback on the existing rescan. +.IP +NB that \fBfresh\fR is currently disabled in the server source, so using this +flag will just provoke an error. .TP .B resolve \fITRACK\fR Resolve a track name, i.e. if this is an alias then return the real track name. diff --git a/lib/trackdb.c b/lib/trackdb.c index 54016ba..2080e63 100644 --- a/lib/trackdb.c +++ b/lib/trackdb.c @@ -2097,6 +2097,28 @@ int trackdb_scan(const char *root, /* trackdb_rescan ************************************************************/ +/** @brief Node in the list of rescan-complete callbacks */ +struct rescanned_node { + struct rescanned_node *next; + void (*rescanned)(void *ru); + void *ru; +}; + +/** @brief List of rescan-complete callbacks */ +static struct rescanned_node *rescanned_list; + +/** @brief Add a rescan completion callback */ +void trackdb_add_rescanned(void (*rescanned)(void *ru), + void *ru) { + if(rescanned) { + struct rescanned_node *n = xmalloc(sizeof *n); + n->next = rescanned_list; + n->rescanned = rescanned; + n->ru = ru; + rescanned_list = n; + } +} + /* called when the rescanner terminates */ static int reap_rescan(ev_source attribute((unused)) *ev, pid_t pid, @@ -2111,23 +2133,37 @@ static int reap_rescan(ev_source attribute((unused)) *ev, /* Our cache of file lookups is out of date now */ cache_clean(&cache_files_type); eventlog("rescanned", (char *)0); + /* Call rescanned callbacks */ + while(rescanned_list) { + void (*rescanned)(void *u) = rescanned_list->rescanned; + void *ru = rescanned_list->ru; + + rescanned_list = rescanned_list->next; + rescanned(ru); + } return 0; } /** @brief Initiate a rescan * @param ev Event loop or 0 to block * @param recheck 1 to recheck lengths, 0 to suppress check + * @param rescanned Called on completion (if not NULL) + * @param u Passed to @p rescanned */ -void trackdb_rescan(ev_source *ev, int recheck) { +void trackdb_rescan(ev_source *ev, int recheck, + void (*rescanned)(void *ru), + void *ru) { int w; if(rescan_pid != -1) { + trackdb_add_rescanned(rescanned, ru); error(0, "rescan already underway"); return; } rescan_pid = subprogram(ev, -1, RESCAN, recheck ? "--check" : "--no-check", (char *)0); + trackdb_add_rescanned(rescanned, ru); if(ev) { ev_child(ev, rescan_pid, 0, reap_rescan, 0); D(("started rescanner")); @@ -2147,6 +2183,11 @@ int trackdb_rescan_cancel(void) { return 1; } +/** @brief Return true if a rescan is underway */ +int trackdb_rescan_underway(void) { + return rescan_pid != -1; +} + /* global prefs **************************************************************/ void trackdb_set_global(const char *name, diff --git a/lib/trackdb.h b/lib/trackdb.h index f372f4c..f97ee5f 100644 --- a/lib/trackdb.h +++ b/lib/trackdb.h @@ -136,7 +136,9 @@ char **trackdb_search(char **wordlist, int nwordlist, int *ntracks); /* return a list of tracks containing all of the words given. If you * ask for only stopwords you get no tracks. */ -void trackdb_rescan(struct ev_source *ev, int recheck); +void trackdb_rescan(struct ev_source *ev, int recheck, + void (*rescanned)(void *ru), + void *ru); /* Start a rescan, if one is not running already */ int trackdb_rescan_cancel(void); @@ -177,6 +179,9 @@ typedef void random_callback(struct ev_source *ev, const char *track); int trackdb_request_random(struct ev_source *ev, random_callback *callback); +void trackdb_add_rescanned(void (*rescanned)(void *ru), + void *ru); +int trackdb_rescan_underway(void); #endif /* TRACKDB_H */ diff --git a/python/disorder.py.in b/python/disorder.py.in index 23f840c..ef33106 100644 --- a/python/disorder.py.in +++ b/python/disorder.py.in @@ -479,12 +479,12 @@ class client: """ self._simple("reconfigure") - def rescan(self): + def rescan(self, *flags): """Rescan one or more collections. Only trusted users can perform this operation. """ - self._simple("rescan") + self._simple("rescan", *flags) def version(self): """Return the server's version number.""" diff --git a/server/disorderd.c b/server/disorderd.c index 3a5e093..9f444b6 100644 --- a/server/disorderd.c +++ b/server/disorderd.c @@ -160,7 +160,7 @@ static void create_periodic(ev_source *ev_, } static void periodic_rescan(ev_source *ev_) { - trackdb_rescan(ev_, 1/*check*/); + trackdb_rescan(ev_, 1/*check*/, 0, 0); } static void periodic_database_gc(ev_source attribute((unused)) *ev_) { diff --git a/server/server.c b/server/server.c index fd0fcd8..60cb869 100644 --- a/server/server.c +++ b/server/server.c @@ -124,6 +124,8 @@ struct conn { rights_type rights; /** @brief Next connection */ struct conn *next; + /** @brief True if pending rescan had 'wait' set */ + int rescan_wait; }; /** @brief Linked list of connections */ @@ -355,13 +357,107 @@ static int c_reconfigure(struct conn *c, return 1; /* completed */ } +static void finished_rescan(void *ru) { + struct conn *const c = ru; + + sink_writes(ev_writer_sink(c->w), "250 rescan completed\n"); + /* Turn this connection back on */ + ev_reader_enable(c->r); +} + +static void start_fresh_rescan(void *ru) { + struct conn *const c = ru; + + if(trackdb_rescan_underway()) { + /* Some other waiter beat us to it. However in this case we're happy to + * piggyback; the requirement is that a new rescan be started, not that it + * was _our_ rescan. */ + if(c->rescan_wait) { + /* We block until the rescan completes */ + trackdb_add_rescanned(finished_rescan, c); + } else { + /* We report that the new rescan has started */ + sink_writes(ev_writer_sink(c->w), "250 rescan initiated\n"); + /* Turn this connection back on */ + ev_reader_enable(c->r); + } + } else { + /* We are the first connection to get a callback so we must start a + * rescan. */ + if(c->rescan_wait) { + /* We want to block until the new rescan completes */ + trackdb_rescan(c->ev, 1/*check*/, finished_rescan, c); + } else { + /* We can report back immediately */ + trackdb_rescan(c->ev, 1/*check*/, 0, 0); + sink_writes(ev_writer_sink(c->w), "250 rescan initiated\n"); + /* Turn this connection back on */ + ev_reader_enable(c->r); + } + } +} + static int c_rescan(struct conn *c, - char attribute((unused)) **vec, - int attribute((unused)) nvec) { - info("S%x rescan by %s", c->tag, c->who); - trackdb_rescan(c->ev, 1/*check*/); - sink_writes(ev_writer_sink(c->w), "250 initiated rescan\n"); - return 1; /* completed */ + char **vec, + int nvec) { + int wait = 0, fresh = 0, n; + + /* Parse flags */ + for(n = 0; n < nvec; ++n) { + if(!strcmp(vec[n], "wait")) + wait = 1; /* wait for rescan to complete */ +#if 0 + /* Currently disabled because untested (and hard to test). */ + else if(!strcmp(vec[n], "fresh")) + fresh = 1; /* don't piggyback underway rescan */ +#endif + else { + sink_writes(ev_writer_sink(c->w), "550 unknown flag\n"); + return 1; /* completed */ + } + } + /* Report what was requested */ + info("S%x rescan by %s (%s %s)", c->tag, c->who, + wait ? "wait" : "", + fresh ? "fresh" : ""); + if(trackdb_rescan_underway()) { + if(fresh) { + /* We want a fresh rescan but there is already one underway. Arrange a + * callback when it completes and then set off a new one. */ + c->rescan_wait = wait; + trackdb_add_rescanned(start_fresh_rescan, c); + if(wait) + return 0; + else { + sink_writes(ev_writer_sink(c->w), "250 rescan queued\n"); + return 1; + } + } else { + /* There's a rescan underway, and it's acceptable to piggyback on it */ + if(wait) { + /* We want to block until completion. */ + trackdb_add_rescanned(finished_rescan, c); + return 0; + } else { + /* We don't want to block. So we just report that things are in + * hand. */ + sink_writes(ev_writer_sink(c->w), "250 rescan already underway\n"); + return 1; + } + } + } else { + /* No rescan is underway. fresh is therefore irrelevant. */ + if(wait) { + /* We want to block until completion */ + trackdb_rescan(c->ev, 1/*check*/, finished_rescan, c); + return 0; + } else { + /* We don't want to block. */ + trackdb_rescan(c->ev, 1/*check*/, 0, 0); + sink_writes(ev_writer_sink(c->w), "250 rescan initiated\n"); + return 1; /* completed */ + } + } } static int c_version(struct conn *c, @@ -1465,7 +1561,7 @@ static const struct command { { "register", 3, 3, c_register, RIGHT_REGISTER|RIGHT__LOCAL }, { "reminder", 1, 1, c_reminder, RIGHT__LOCAL }, { "remove", 1, 1, c_remove, RIGHT_REMOVE__MASK }, - { "rescan", 0, 0, c_rescan, RIGHT_RESCAN }, + { "rescan", 0, INT_MAX, c_rescan, RIGHT_RESCAN }, { "resolve", 1, 1, c_resolve, RIGHT_READ }, { "resume", 0, 0, c_resume, RIGHT_PAUSE }, { "revoke", 0, 0, c_revoke, RIGHT_READ }, diff --git a/server/state.c b/server/state.c index 3adcf2b..f88067b 100644 --- a/server/state.c +++ b/server/state.c @@ -154,7 +154,7 @@ int reconfigure(ev_source *ev, int reload) { /* We only allow for upgrade at startup */ trackdb_open(TRACKDB_CAN_UPGRADE); if(need_another_rescan) - trackdb_rescan(ev, 1/*check*/); + trackdb_rescan(ev, 1/*check*/, 0, 0); if(!ret) { queue_read(); recent_read(); diff --git a/tests/dtest.py b/tests/dtest.py index 9439bce..5eac2ea 100644 --- a/tests/dtest.py +++ b/tests/dtest.py @@ -271,16 +271,10 @@ def create_user(username="fred", password="fredpass"): "--user", "root", "edituser", username, "rights", "all"]) def rescan(c=None): - class rescan_monitor(disorder.monitor): - def rescanned(self): - return False + print " initiating rescan" if c is None: c = disorder.client() - m = rescan_monitor() - print " initiating rescan" - c.rescan() - print " waiting for rescan to complete" - m.run() + c.rescan('wait') print " rescan completed" def stop_daemon(): -- 2.11.0