keys.scala, etc.: Make merging public keys have a progress bar.
[tripe-android] / admin.scala
index 85978fe..9796cd9 100644 (file)
-package uk.org.distorted.tripe;
-
-import scala.collection.mutable.ArrayBuffer;
-
-object Admin {
-  val RX_ORDINARY = "^[^\\\\'\"\\s]+$".r;
-  val RX_WEIRD = "[\\\\'\"]".r;
-
-  def quote(v: Seq[String]) = {
-    val b = new StringBuilder;
-    var sep = false;
-    for (s <- v) {
-      if (!sep) sep = true;
-      else b.append(' ');
-      s match {
-       case RX_ORDINARY() => b.append(s);
-       case _ =>
-         b.append('"');
-         b.append(RX_WEIRD.replaceAllIn(s, "\\\\$0"));
-         b.append('"');
+/* -*-scala-*-
+ *
+ * Managing TrIPE administration connections
+ *
+ * (c) 2018 Straylight/Edgeware
+ */
+
+/*----- Licensing notice --------------------------------------------------*
+ *
+ * This file is part of the Trivial IP Encryption (TrIPE) Android app.
+ *
+ * TrIPE is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free
+ * Software Foundation; either version 3 of the License, or (at your
+ * option) any later version.
+ *
+ * TrIPE is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package uk.org.distorted.tripe; package object admin {
+
+/*----- Imports -----------------------------------------------------------*/
+
+import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter};
+import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};
+
+import scala.collection.mutable.HashMap;
+import scala.concurrent.Channel;
+import scala.util.control.Breaks;
+
+import Implicits._;
+import sys.{serverInput, serverOutput};
+
+/*----- Classification of server messages ---------------------------------*/
+
+sealed abstract class Message;
+
+sealed abstract class JobMessage extends Message;
+case object JobOK extends JobMessage;
+final case class JobInfo(info: Seq[String]) extends JobMessage;
+final case class JobFail(err: Seq[String]) extends JobMessage;
+case object JobLostConnection extends JobMessage;
+
+final case class BackgroundJobMessage(tag: String, msg: JobMessage)
+       extends Message;
+final case class JobDetached(tag: String) extends Message;
+
+sealed abstract class AsyncMessage extends Message;
+final case class Trace(msg: String) extends AsyncMessage;
+final case class Warning(err: Seq[String]) extends AsyncMessage;
+final case class Notify(note: Seq[String]) extends AsyncMessage;
+case object ConnectionLost extends AsyncMessage;
+
+sealed abstract class ServiceMessage extends Message;
+final case class ServiceCancel(jobid: String) extends ServiceMessage;
+final case class ServiceClaim(svc: String, version: String)
+       extends ServiceMessage;
+final case class ServiceJob(jobid: String, svc: String,
+                     cmd: String, args: Seq[String])
+       extends ServiceMessage;
+
+/*----- Main code ---------------------------------------------------------*/
+
+class ConnectionClosed extends Exception;
+
+class ServerFailed(msg: String) extends Exception(msg);
+
+class CommandFailed(val msg: Seq[String]) extends Exception {
+  override def getMessage(): String =
+    "%s(%s)".format(getClass.getName, quoteTokens(msg));
+}
+
+class ConnectionLostException extends Exception;
+
+object Connection extends Hook[AsyncMessage]
+{
+  /* Synchronization.
+   *
+   * This class is complicatedly multithreaded.  The following fields must
+   * only be accessed while the instance is locked.  To prevent deadlocks,
+   * hold the `Connection' lock before locking any individual `Job' objects.
+   */
+
+  private var livep: Boolean = true;   // Is this connection still alive?
+  private var fgjob: Option[this.Job] = None; // Foreground job, if there is one.
+  private val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
+  private var bgseq = 0;               // Next background job tag.
+
+  private val in = new BufferedReader(new InputStreamReader(serverInput));
+  private val out = new OutputStreamWriter(serverOutput);
+
+  type Pub = Connection.type;
+
+  class Job extends Iterator[Seq[String]] {
+    private[Connection] val ch = new Channel[JobMessage];
+    private[this] var nextmsg: Option[JobMessage] = None;
+
+    private[this] def fetchNext()
+      { if (!nextmsg) nextmsg = Some(ch.read); }
+    override def hasNext: Boolean = {
+      fetchNext();
+      nextmsg match {
+       case Some(JobOK) => false
+       case _ => true
+      }
+    }
+    override def next(): Seq[String] = {
+      fetchNext();
+      nextmsg match {
+       case None => unreachable;
+       case Some(JobOK) => throw new NoSuchElementException;
+       case Some(JobFail(msg)) => throw new CommandFailed(msg);
+       case Some(JobLostConnection) => throw new ConnectionLostException;
+       case Some(JobInfo(msg)) => nextmsg = None; msg
       }
     }
-    b.mkString
-  }
 
-  class InvalidQuotingException(msg: String) extends Exception(msg);
+    def keyvals(): Map[String, String] = {
+      val b = Map.newBuilder[String, String];
+      for (line <- this; token <- line) {
+       token.indexOf('=') match {
+         case -1 => throw new ServerFailed("missing `=' in key-value list");
+         case eq =>
+           val k = token.substring(0, eq);
+           val v = token.substring(eq + 1);
+           b += k -> v;
+       }
+      }
+      b.result
+    }
 
-  def split(s: String): Array[String] = {
-    val ab = new ArrayBuffer[String]();
-    val sb = new StringBuilder;
+    def traceish(): Seq[(Char, Boolean, String)] = {
+      val b = Seq.newBuilder[(Char, Boolean, String)];
+      for (line <- this) line match {
+       case List(key, desc@_*) =>
+         val live = if (key.length == 1) false
+                    else if (key.length == 2 && key(1) == '+') true
+                    else throw new ServerFailed(
+                      s"incomprehensible traceish key `$key'");
+         b += ((key(0), live, desc.mkString(" ")));
+       case _ => throw new ServerFailed("empty line in traceish output");
+      }
+      b.result
+    }
 
-    object State extends Enumeration {
-      val BETWEEN, WORD, SQUOTE, DQUOTE = Value;
+    def expectEmpty() {
+      if (hasNext) throw new ServerFailed("no output expected");
     }
-    import State.{Value => _, _};
-
-    val n = s.length;
-
-    def scan(pos: Int, st: State.Value, bs: Boolean)
-    {
-      if (pos >= n) {
-       if (bs)
-         throw new InvalidQuotingException("trailing `\\'");
-       else if (st == SQUOTE || st == DQUOTE)
-         throw new InvalidQuotingException("unmatched quote");
-       if (st != BETWEEN) ab += sb.mkString;
-      } else (st, bs, s(pos)) match {
-       case (BETWEEN, false, '\\') => scan(pos + 1, WORD, true);
-       case (_, false, '\\') => scan(pos + 1, st, true);
-       case (SQUOTE, false, ''') | (DQUOTE, false, '"') =>
-         scan(pos + 1, WORD, false);
-       case (BETWEEN | WORD, false, ''') => scan(pos + 1, SQUOTE, false);
-       case (BETWEEN | WORD, false, '"') => scan(pos + 1, DQUOTE, false);
-       case (BETWEEN, false, ch) if ch.isWhitespace =>
-         scan(pos + 1, st, false);
-       case (WORD, false, ch) if ch.isWhitespace =>
-         ab += sb.mkString; sb.clear();
-         scan(pos + 1, BETWEEN, false);
-       case (BETWEEN, _, ch) => sb.append(ch); scan(pos + 1, WORD, false);
-       case (_, _, ch) => sb.append(ch); scan(pos + 1, st, false);
+
+    def oneLine(): Seq[String] = {
+      if (hasNext) {
+       val line = next();
+       if (!hasNext) return line;
       }
+      throw new ServerFailed("exactly one line expected");
     }
-    scan(0, BETWEEN, false);
-    ab.toArray
   }
 
-  def main(args: Array[String])
-  {
-    if (args.length != 1) println(quote(args));
-    else for (s <- split(args(0))) println(s);
+  def submit(bg: Boolean, toks: String*): this.Job = {
+    var cmd = toks;
+println(";; wait for lock");
+    synchronized {
+      if (bg) {
+       val tag = bgseq formatted "J%05d"; bgseq += 1;
+       cmd = toks match {
+         case Seq(cmd, tail@_*) => cmd +: "-background" +: tag +: tail;
+       }
+      }
+println(";; wait for foreground");
+      while (livep && fgjob != None) wait();
+      if (!livep) throw new ConnectionClosed;
+println(";; write command");
+      try { out.write(quoteTokens(cmd)); out.write('\n'); out.flush(); }
+      catch { case e: Throwable => notify(); throw e; }
+      val j = new Job;
+      fgjob = Some(j);
+      j
+    }
+  }
+
+  def submit(toks: String*): this.Job = submit(false, toks: _*);
+
+  /* These two expect the connection lock to be held. */
+  def foregroundJob: Job =
+    fgjob.getOrElse { throw new ServerFailed("no foreground job"); }
+  def releaseForegroundJob() { fgjob = None; notify(); }
+
+  def parseServerLine(s: String): Message = nextToken(s) match {
+    case None => throw new ServerFailed("empty line from server")
+    case Some(("TRACE", next)) => Trace(s.substring(next))
+    case Some((code, next)) => (code, splitTokens(s, next)) match {
+      case ("OK", Seq()) => JobOK
+      case ("INFO", tail) => JobInfo(tail)
+      case ("FAIL", tail) => JobFail(tail)
+      case ("BGDETACH", Seq(tag)) => JobDetached(tag)
+      case ("BGOK", Seq(tag)) => BackgroundJobMessage(tag, JobOK)
+      case ("BGINFO", Seq(tag, tail@_*)) =>
+       BackgroundJobMessage(tag, JobInfo(tail))
+      case ("BGFAIL", Seq(tag, tail@_*)) =>
+       BackgroundJobMessage(tag, JobFail(tail))
+      case ("WARN", tail) => Warning(tail)
+      case ("NOTE", tail) => Notify(tail)
+      case ("SVCCLAIM", Seq(svc, ver)) => ServiceClaim(svc, ver)
+      case ("SVCJOB", Seq(tag, svc, cmd, args@_*)) =>
+       ServiceJob(tag, svc, cmd, args)
+      case ("SVCCANCEL", Seq(tag)) => ServiceCancel(tag)
+      case (_, tail) => throw new ServerFailed(
+       "incomprehensible line from server: " + quoteTokens(code +: tail))
+    }
   }
+
+  def processJobMessage(msg: JobMessage)
+                      (getjob: (Boolean) => Job) {
+    synchronized { getjob(msg.isInstanceOf[JobInfo]); }.ch.write(msg);
+  }
+
+  /* Reading lines from the server. */
+  val readthr = thread("admin reader") {
+println(";; readthr running");
+    val bin = in match {
+      case br: BufferedReader => br;
+      case _ => new BufferedReader(in)
+    }
+    var line: String = null;
+
+    try {
+println(";; wait for line");
+      while ({line = bin.readLine; line != null}) {
+println(s";; line: $line");
+       parseServerLine(line) match {
+         case JobDetached(tag) => synchronized {
+           jobmap(tag) = foregroundJob; releaseForegroundJob();
+         }
+         case msg: JobMessage => processJobMessage(msg) { keep =>
+           val j = foregroundJob; if (!keep) releaseForegroundJob(); j
+         }
+         case BackgroundJobMessage(tag, msg) =>
+           processJobMessage(msg) { keep =>
+             val j = jobmap.getOrElse(tag, throw new ServerFailed(
+               s"no job with tag `${tag}'"));
+             if (!keep) jobmap.remove(tag);
+             j
+           }
+         case msg: AsyncMessage =>
+           callHook(msg);
+         case _: ServiceMessage =>
+           ok;
+       }
+      }
+    } catch {
+      case e: Throwable => e.printStackTrace();
+    } finally {
+      synchronized {
+       livep = false;
+       for ((_, j) <- jobmap) j.ch.write(JobLostConnection);
+       fgjob match {
+         case Some(j) =>
+           j.ch.write(JobLostConnection);
+           fgjob = None;
+           notifyAll();
+         case None => ok;
+       }
+      }
+      callHook(ConnectionLost);
+    }
+  }
+}
+
+/*----- That's all, folks -------------------------------------------------*/
+
 }