-package uk.org.distorted.tripe;
-
-import scala.collection.mutable.ArrayBuffer;
-
-object Admin {
- val RX_ORDINARY = "^[^\\\\'\"\\s]+$".r;
- val RX_WEIRD = "[\\\\'\"]".r;
-
- def quote(v: Seq[String]) = {
- val b = new StringBuilder;
- var sep = false;
- for (s <- v) {
- if (!sep) sep = true;
- else b.append(' ');
- s match {
- case RX_ORDINARY() => b.append(s);
- case _ =>
- b.append('"');
- b.append(RX_WEIRD.replaceAllIn(s, "\\\\$0"));
- b.append('"');
+/* -*-scala-*-
+ *
+ * Managing TrIPE administration connections
+ *
+ * (c) 2018 Straylight/Edgeware
+ */
+
+/*----- Licensing notice --------------------------------------------------*
+ *
+ * This file is part of the Trivial IP Encryption (TrIPE) Android app.
+ *
+ * TrIPE is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free
+ * Software Foundation; either version 3 of the License, or (at your
+ * option) any later version.
+ *
+ * TrIPE is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package uk.org.distorted.tripe; package object admin {
+
+/*----- Imports -----------------------------------------------------------*/
+
+import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter};
+import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};
+
+import scala.collection.mutable.HashMap;
+import scala.concurrent.Channel;
+import scala.util.control.Breaks;
+
+import Implicits._;
+import sys.{serverInput, serverOutput};
+
+/*----- Classification of server messages ---------------------------------*/
+
+/* Root of the classification: every value returned by `parseServerLine'
+ * is a `Message'.
+ */
+sealed abstract class Message;
+
+/* Messages which are delivered to a `Job' through its channel. */
+sealed abstract class JobMessage extends Message;
+/* The job completed successfully (`OK', or `BGOK' for a tagged job). */
+case object JobOK extends JobMessage;
+/* One line of job output (`INFO' or `BGINFO'), already split into tokens. */
+final case class JobInfo(info: Seq[String]) extends JobMessage;
+/* The job failed (`FAIL' or `BGFAIL'); `err' holds the error tokens. */
+final case class JobFail(err: Seq[String]) extends JobMessage;
+/* Not parsed from a server line: the reader thread writes this to every
+ * outstanding job when it shuts down.
+ */
+case object JobLostConnection extends JobMessage;
+
+/* A job message for the tagged job `tag' (`BGOK', `BGINFO', `BGFAIL'). */
+final case class BackgroundJobMessage(tag: String, msg: JobMessage)
+ extends Message;
+/* `BGDETACH': the reader thread files the current foreground job under
+ * `tag' and releases the foreground slot.
+ */
+final case class JobDetached(tag: String) extends Message;
+
+/* Messages not tied to any job; the reader thread passes them to the
+ * connection's hook.
+ */
+sealed abstract class AsyncMessage extends Message;
+/* `TRACE': the remainder of the line, kept as a single unsplit string. */
+final case class Trace(msg: String) extends AsyncMessage;
+/* `WARN': the warning tokens. */
+final case class Warning(err: Seq[String]) extends AsyncMessage;
+/* `NOTE': the notification tokens. */
+final case class Notify(note: Seq[String]) extends AsyncMessage;
+/* Not parsed from a server line: announced through the hook when the
+ * reader thread shuts down.
+ */
+case object ConnectionLost extends AsyncMessage;
+
+/* Service-related lines (`SVCCANCEL', `SVCCLAIM', `SVCJOB').  The reader
+ * thread recognizes these but takes no action on them.
+ */
+sealed abstract class ServiceMessage extends Message;
+final case class ServiceCancel(jobid: String) extends ServiceMessage;
+final case class ServiceClaim(svc: String, version: String)
+ extends ServiceMessage;
+final case class ServiceJob(jobid: String, svc: String,
+ cmd: String, args: Seq[String])
+ extends ServiceMessage;
+
+/*----- Main code ---------------------------------------------------------*/
+
+/* Thrown by `Connection.submit' when the connection is no longer live. */
+class ConnectionClosed extends Exception;
+
+/* Thrown when the server's output cannot be understood, or does not have
+ * the shape the caller expected.
+ */
+class ServerFailed(msg: String) extends Exception(msg);
+
+/* Thrown by `Job.next' when the server reports that a command failed;
+ * `msg' holds the error tokens from the failure line.
+ */
+class CommandFailed(val msg: Seq[String]) extends Exception {
+ /* Render as `CLASSNAME(TOKENS)', with the tokens quoted by
+ * `quoteTokens'.
+ */
+ override def getMessage(): String =
+ "%s(%s)".format(getClass.getName, quoteTokens(msg));
+}
+
+/* Thrown by `Job.next' when the job was cut short because the connection
+ * to the server was lost.
+ */
+class ConnectionLostException extends Exception;
+
+object Connection extends Hook[AsyncMessage]
+{
+ /* Synchronization.
+ *
+ * This class is complicatedly multithreaded. The following fields must
+ * only be accessed while the instance is locked. To prevent deadlocks,
+ * hold the `Connection' lock before locking any individual `Job' objects.
+ */
+
+ private var livep: Boolean = true; // Is this connection still alive?
+ private var fgjob: Option[this.Job] = None; // Foreground job, if there is one.
+ private val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
+ private var bgseq = 0; // Next background job tag.
+
+ private val in = new BufferedReader(new InputStreamReader(serverInput));
+ private val out = new OutputStreamWriter(serverOutput);
+
+ type Pub = Connection.type;
+
+ class Job extends Iterator[Seq[String]] {
+ private[Connection] val ch = new Channel[JobMessage];
+ private[this] var nextmsg: Option[JobMessage] = None;
+
+ /* Make sure `nextmsg' holds a message, reading one from the job's
+ * channel if nothing is buffered.
+ *
+ * NOTE(review): `!nextmsg' applies `!' to an `Option'; presumably a
+ * conversion brought in by `Implicits._' makes this mean `is empty' --
+ * confirm.
+ */
+ private[this] def fetchNext()
+ { if (!nextmsg) nextmsg = Some(ch.read); }
+ /* Iterator protocol: report whether `next' has anything to deliver.
+ *
+ * Only a buffered `JobOK' ends the iteration.  Failure and
+ * lost-connection messages still answer true, so that the following
+ * call to `next' gets the chance to raise the corresponding exception.
+ */
+ override def hasNext: Boolean = {
+   fetchNext();
+   nextmsg.forall(_ != JobOK)
+ }
+ /* Return the next line of job output, as a sequence of tokens.
+ *
+ * Throws `NoSuchElementException' if the job has finished successfully,
+ * `CommandFailed' if the server reported failure, and
+ * `ConnectionLostException' if the connection went away.  Only a
+ * `JobInfo' message is consumed: the terminal messages stay buffered in
+ * `nextmsg', so repeated calls keep raising the same exception.
+ */
+ override def next(): Seq[String] = {
+ fetchNext();
+ nextmsg match {
+ case None => unreachable;
+ case Some(JobOK) => throw new NoSuchElementException;
+ case Some(JobFail(msg)) => throw new CommandFailed(msg);
+ case Some(JobLostConnection) => throw new ConnectionLostException;
+ case Some(JobInfo(msg)) => nextmsg = None; msg
 }
 }
- b.mkString
- }
- class InvalidQuotingException(msg: String) extends Exception(msg);
+ /* Drain the job's output, reading every token on every line as a
+ * `KEY=VALUE' pair, and return the pairs as a map.
+ *
+ * The split is made at the first `=', so values may themselves contain
+ * `='.  Throws `ServerFailed' if some token has no `=' at all; errors
+ * raised while iterating over the job propagate unchanged.
+ */
+ def keyvals(): Map[String, String] = {
+   val pairs = Map.newBuilder[String, String];
+   for (line <- this; token <- line) {
+     val eq = token.indexOf('=');
+     if (eq < 0) throw new ServerFailed("missing `=' in key-value list");
+     pairs += token.substring(0, eq) -> token.substring(eq + 1);
+   }
+   pairs.result
+ }
- def split(s: String): Array[String] = {
- val ab = new ArrayBuffer[String]();
- val sb = new StringBuilder;
+ /* Drain the job's output, reading each line as a `traceish' entry.
+ *
+ * The first token of a line is the key: either a single character, or a
+ * single character followed by `+'.  Each entry in the result is the key
+ * character, a flag saying whether the `+' was present, and the remaining
+ * tokens joined by single spaces.
+ *
+ * Throws `ServerFailed' if a line has no tokens or the key has any other
+ * form; errors raised while iterating over the job propagate unchanged.
+ */
+ def traceish(): Seq[(Char, Boolean, String)] = {
+   val b = Seq.newBuilder[(Char, Boolean, String)];
+   for (line <- this) line match {
+     /* Match on `Seq', not `List': the line is only known to be some
+      * `Seq[String]', and a `List' pattern would reject every other
+      * implementation and misreport it as an empty line.
+      */
+     case Seq(key, desc@_*) =>
+       val live = if (key.length == 1) false
+         else if (key.length == 2 && key(1) == '+') true
+         else throw new ServerFailed(
+           s"incomprehensible traceish key `$key'");
+       b += ((key(0), live, desc.mkString(" ")));
+     case _ => throw new ServerFailed("empty line in traceish output");
+   }
+   b.result
+ }
- object State extends Enumeration {
- val BETWEEN, WORD, SQUOTE, DQUOTE = Value;
+ /* Check that the job completes successfully without producing output.
+ *
+ * Throws `ServerFailed' if the job produced an output line.  If the job
+ * failed, or the connection was lost, the corresponding exception from
+ * `next' is raised instead.
+ */
+ def expectEmpty() {
+   if (hasNext) {
+     /* `hasNext' is also true for failure and lost-connection messages:
+      * let `next' raise those, so that the real error isn't hidden
+      * behind a complaint about unexpected output.
+      */
+     next();
+     throw new ServerFailed("no output expected");
+   }
 }
- import State.{Value => _, _};
-
- val n = s.length;
-
- def scan(pos: Int, st: State.Value, bs: Boolean)
- {
- if (pos >= n) {
- if (bs)
- throw new InvalidQuotingException("trailing `\\'");
- else if (st == SQUOTE || st == DQUOTE)
- throw new InvalidQuotingException("unmatched quote");
- if (st != BETWEEN) ab += sb.mkString;
- } else (st, bs, s(pos)) match {
- case (BETWEEN, false, '\\') => scan(pos + 1, WORD, true);
- case (_, false, '\\') => scan(pos + 1, st, true);
- case (SQUOTE, false, ''') | (DQUOTE, false, '"') =>
- scan(pos + 1, WORD, false);
- case (BETWEEN | WORD, false, ''') => scan(pos + 1, SQUOTE, false);
- case (BETWEEN | WORD, false, '"') => scan(pos + 1, DQUOTE, false);
- case (BETWEEN, false, ch) if ch.isWhitespace =>
- scan(pos + 1, st, false);
- case (WORD, false, ch) if ch.isWhitespace =>
- ab += sb.mkString; sb.clear();
- scan(pos + 1, BETWEEN, false);
- case (BETWEEN, _, ch) => sb.append(ch); scan(pos + 1, WORD, false);
- case (_, _, ch) => sb.append(ch); scan(pos + 1, st, false);
+
+ /* Return the job's single line of output.
+ *
+ * Throws `ServerFailed' if the job finishes without output, or if
+ * anything follows the first line.  Exceptions raised by `next' for the
+ * first line propagate unchanged.
+ *
+ * NOTE(review): `hasNext' is also true for a buffered failure message,
+ * so a failure arriving after the first line is reported as `exactly one
+ * line expected' -- confirm that this is intended.
+ */
+ def oneLine(): Seq[String] = {
+ if (hasNext) {
+ val line = next();
+ if (!hasNext) return line;
 }
+ throw new ServerFailed("exactly one line expected");
 }
- scan(0, BETWEEN, false);
- ab.toArray
}
- def main(args: Array[String])
- {
- if (args.length != 1) println(quote(args));
- else for (s <- split(args(0))) println(s);
+ /* Send a command to the server and return the `Job' which will receive
+ * its replies.
+ *
+ * The tokens `toks' are quoted with `quoteTokens' and written as a single
+ * line.  If `bg' is set, `-background' and a freshly allocated tag (`J'
+ * followed by a five-digit sequence number) are inserted after the first
+ * token.  Either way, the new job starts out as the foreground job; the
+ * reader thread files it under its tag if the server detaches it.
+ *
+ * Blocks while another job occupies the foreground.  Throws
+ * `ConnectionClosed' if the connection is no longer live; a failure to
+ * write the command is rethrown to the caller.
+ *
+ * NOTE(review): with `bg' set and no tokens at all, the inner match has
+ * no applicable case and raises `MatchError'.
+ * NOTE(review): the `;;' `println' calls look like debugging traces.
+ */
+ def submit(bg: Boolean, toks: String*): this.Job = {
+ var cmd = toks;
+println(";; wait for lock");
+ synchronized {
+ if (bg) {
+ val tag = bgseq formatted "J%05d"; bgseq += 1;
+ cmd = toks match {
+ case Seq(cmd, tail@_*) => cmd +: "-background" +: tag +: tail;
+ }
+ }
+println(";; wait for foreground");
+ /* Wait our turn: only one job may be in the foreground at a time. */
+ while (livep && fgjob != None) wait();
+ if (!livep) throw new ConnectionClosed;
+println(";; write command");
+ try { out.write(quoteTokens(cmd)); out.write('\n'); out.flush(); }
+ /* We aren't going to claim the foreground slot after all; presumably
+ * the `notify' hands the opportunity on to another waiting submitter.
+ */
+ catch { case e: Throwable => notify(); throw e; }
+ val j = new Job;
+ fgjob = Some(j);
+ j
+ }
+ }
+
+ /* Convenience form of `submit' which never asks for a background job. */
+ def submit(toks: String*): this.Job = submit(false, toks: _*);
+
+ /* These two expect the connection lock to be held by the caller.
+ *
+ * `foregroundJob' returns the current foreground job, or throws
+ * `ServerFailed' if there is none.  `releaseForegroundJob' clears the
+ * foreground slot and wakes one thread waiting on the connection lock.
+ */
+ def foregroundJob: Job =
+ fgjob.getOrElse { throw new ServerFailed("no foreground job"); }
+ def releaseForegroundJob() { fgjob = None; notify(); }
+
+ /* Classify one line `s' received from the server.
+ *
+ * The first token selects the message type.  For `TRACE', the rest of
+ * the line is kept as a single unsplit string; for everything else, the
+ * rest is split into tokens with `splitTokens' and matched against the
+ * shape expected for that keyword.
+ *
+ * Throws `ServerFailed' if the line is empty, the keyword is unknown, or
+ * the keyword's arguments don't have the expected shape (e.g., `OK' with
+ * trailing tokens).
+ */
+ def parseServerLine(s: String): Message = nextToken(s) match {
+ case None => throw new ServerFailed("empty line from server")
+ case Some(("TRACE", next)) => Trace(s.substring(next))
+ case Some((code, next)) => (code, splitTokens(s, next)) match {
+ case ("OK", Seq()) => JobOK
+ case ("INFO", tail) => JobInfo(tail)
+ case ("FAIL", tail) => JobFail(tail)
+ case ("BGDETACH", Seq(tag)) => JobDetached(tag)
+ case ("BGOK", Seq(tag)) => BackgroundJobMessage(tag, JobOK)
+ case ("BGINFO", Seq(tag, tail@_*)) =>
+ BackgroundJobMessage(tag, JobInfo(tail))
+ case ("BGFAIL", Seq(tag, tail@_*)) =>
+ BackgroundJobMessage(tag, JobFail(tail))
+ case ("WARN", tail) => Warning(tail)
+ case ("NOTE", tail) => Notify(tail)
+ case ("SVCCLAIM", Seq(svc, ver)) => ServiceClaim(svc, ver)
+ case ("SVCJOB", Seq(tag, svc, cmd, args@_*)) =>
+ ServiceJob(tag, svc, cmd, args)
+ case ("SVCCANCEL", Seq(tag)) => ServiceCancel(tag)
+ /* Anything else: unknown keyword, or wrong number of arguments. */
+ case (_, tail) => throw new ServerFailed(
+ "incomprehensible line from server: " + quoteTokens(code +: tail))
+ }
 }
+
+ /* Deliver the job message `msg' to the job selected by `getjob'.
+ *
+ * `getjob' is called with the connection lock held.  Its argument is
+ * true if `msg' is a `JobInfo', and false otherwise; the callers in this
+ * file use it to decide whether to keep or drop the job's registration.
+ * The message is written to the job's channel only after the lock has
+ * been released.
+ */
+ def processJobMessage(msg: JobMessage)
+ (getjob: (Boolean) => Job) {
+ synchronized { getjob(msg.isInstanceOf[JobInfo]); }.ch.write(msg);
+ }
+
+ /* Reading lines from the server.
+ *
+ * The reader thread reads lines until the input is exhausted, classifies
+ * each with `parseServerLine', and dispatches it: job messages go to the
+ * foreground job or the tagged background job, asynchronous messages go
+ * to the hook, and service messages are ignored.
+ *
+ * When the loop ends, for whatever reason (any exception is printed and
+ * otherwise dropped), the connection is marked dead, every outstanding
+ * job is sent `JobLostConnection', every thread blocked in `submit' is
+ * woken, and the hook is told `ConnectionLost'.
+ */
+ val readthr = thread("admin reader") {
+println(";; readthr running");
+   val bin = in match {
+     case br: BufferedReader => br;
+     case _ => new BufferedReader(in)
+   }
+   var line: String = null;
+
+   try {
+println(";; wait for line");
+     while ({line = bin.readLine; line != null}) {
+println(s";; line: $line");
+       parseServerLine(line) match {
+         case JobDetached(tag) => synchronized {
+           /* The foreground job has gone into the background: file it
+            * under its tag and let the next command through.
+            */
+           jobmap(tag) = foregroundJob; releaseForegroundJob();
+         }
+         case msg: JobMessage => processJobMessage(msg) { keep =>
+           val j = foregroundJob; if (!keep) releaseForegroundJob(); j
+         }
+         case BackgroundJobMessage(tag, msg) =>
+           processJobMessage(msg) { keep =>
+             val j = jobmap.getOrElse(tag, throw new ServerFailed(
+               s"no job with tag `${tag}'"));
+             if (!keep) jobmap.remove(tag);
+             j
+           }
+         case msg: AsyncMessage =>
+           callHook(msg);
+         case _: ServiceMessage =>
+           ok;
+       }
+     }
+   } catch {
+     case e: Throwable => e.printStackTrace();
+   } finally {
+     synchronized {
+       livep = false;
+       for ((_, j) <- jobmap) j.ch.write(JobLostConnection);
+       fgjob.foreach(_.ch.write(JobLostConnection));
+       fgjob = None;
+       /* Wake all of the waiting submitters, even if there's no
+        * foreground job right now.  A submitter can still be blocked
+        * after the slot has been released, because `notify' wakes only
+        * one waiter; nothing else would ever wake the others once the
+        * connection is dead.
+        */
+       notifyAll();
+     }
+     callHook(ConnectionLost);
+   }
+ }
+}
+
+/*----- That's all, folks -------------------------------------------------*/
+
}