/* -*-scala-*-
 *
 * Managing TrIPE administration connections
 *
 * (c) 2018 Straylight/Edgeware
 */

/*----- Licensing notice --------------------------------------------------*
 *
 * This file is part of the Trivial IP Encryption (TrIPE) Android app.
 *
 * TrIPE is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 3 of the License, or (at your
 * option) any later version.
 *
 * TrIPE is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
 */

package uk.org.distorted.tripe; package object admin {

/*----- Imports -----------------------------------------------------------*/

import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter};
import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};

import scala.collection.mutable.HashMap;
import scala.concurrent.Channel;
import scala.util.control.Breaks;

import Implicits._;
import sys.{serverInput, serverOutput};

/*----- Classification of server messages ---------------------------------*/

/* Everything `Connection.parseServerLine' can produce. */
sealed abstract class Message;

/* Messages which are delivered to a `Job' through its channel. */
sealed abstract class JobMessage extends Message;
case object JobOK extends JobMessage;           // `OK' / `BGOK': job finished.
final case class JobInfo(info: Seq[String]) extends JobMessage;
                                                // `INFO' / `BGINFO': output line.
final case class JobFail(err: Seq[String]) extends JobMessage;
                                                // `FAIL' / `BGFAIL': job failed.
case object JobLostConnection extends JobMessage;
                                                // Synthesized when the reader stops.

/* Wrappers for lines which name a background job by its tag. */
final case class BackgroundJobMessage(tag: String, msg: JobMessage)
        extends Message;
final case class JobDetached(tag: String) extends Message; // `BGDETACH'.

/* Messages which are passed to the `Connection' hook. */
sealed abstract class AsyncMessage extends Message;
final case class Trace(msg: String) extends AsyncMessage;         // `TRACE'.
final case class Warning(err: Seq[String]) extends AsyncMessage;  // `WARN'.
final case class Notify(note: Seq[String]) extends AsyncMessage;  // `NOTE'.
case object ConnectionLost extends AsyncMessage;
                                                // Synthesized when the reader stops.

/* Service-provider messages.  These are parsed but otherwise ignored by
 * the reader thread.
 */
sealed abstract class ServiceMessage extends Message;
final case class ServiceCancel(jobid: String) extends ServiceMessage;
final case class ServiceClaim(svc: String, version: String)
        extends ServiceMessage;
final case class ServiceJob(jobid: String, svc: String,
                            cmd: String, args: Seq[String])
        extends ServiceMessage;

/*----- Main code ---------------------------------------------------------*/

/* Thrown by `Connection.submit' if the connection is no longer alive. */
class ConnectionClosed extends Exception;

/* Thrown when the server's output can't be understood. */
class ServerFailed(msg: String) extends Exception(msg);

/* Thrown by a `Job' when the server reports failure; `msg' holds the
 * tokens from the failure line.
 */
class CommandFailed(val msg: Seq[String]) extends Exception {
  override def getMessage(): String =
    "%s(%s)".format(getClass.getName, quoteTokens(msg));
}

/* Thrown by a `Job' if the reader thread stopped while the job was
 * still outstanding.
 */
class ConnectionLostException extends Exception;

object Connection extends Hook[AsyncMessage]
{
  /* Synchronization.
   *
   * This class is complicatedly multithreaded.  The following fields must
   * only be accessed while the instance is locked.  To prevent deadlocks,
   * hold the `Connection' lock before locking any individual `Job' objects.
   */

  private var livep: Boolean = true; // Is this connection still alive?
  private var fgjob: Option[this.Job] = None; // Foreground job, if there is one.
  private val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
  private var bgseq = 0; // Next background job tag.

  private val in = new BufferedReader(new InputStreamReader(serverInput));
  private val out = new OutputStreamWriter(serverOutput);

  type Pub = Connection.type;

  /* A command in progress.
   *
   * Iterating over a job yields the token lists from its `JobInfo'
   * messages, blocking on the job's channel as necessary.  Iteration ends
   * (`hasNext' is false) once `JobOK' arrives.  A `JobFail' or
   * `JobLostConnection' message makes `hasNext' return true and `next'
   * throw `CommandFailed' or `ConnectionLostException' respectively.
   */
  class Job extends Iterator[Seq[String]] {
    /* Messages for this job; written by the reader thread. */
    private[Connection] val ch = new Channel[JobMessage];

    /* One message of lookahead, so that `hasNext' can peek. */
    private[this] var nextmsg: Option[JobMessage] = None;

    /* Make sure `nextmsg' is filled in, reading the channel if need be. */
    private[this] def fetchNext(): Unit =
      { if (nextmsg.isEmpty) nextmsg = Some(ch.read); }

    override def hasNext: Boolean = {
      fetchNext();
      nextmsg match {
        case Some(JobOK) => false
        case _ => true
      }
    }

    override def next(): Seq[String] = {
      fetchNext();
      nextmsg match {
        case None => unreachable;
        case Some(JobOK) => throw new NoSuchElementException;
        case Some(JobFail(msg)) => throw new CommandFailed(msg);
        case Some(JobLostConnection) => throw new ConnectionLostException;
        case Some(JobInfo(msg)) => nextmsg = None; msg
      }
    }

    /* Collect all of the job's output tokens, each of the form `KEY=VALUE'
     * (split at the first `='), into a map.  Throws `ServerFailed' if a
     * token has no `='.
     */
    def keyvals(): Map[String, String] = {
      val b = Map.newBuilder[String, String];
      for (line <- this; token <- line) {
        token.indexOf('=') match {
          case -1 => throw new ServerFailed("missing `=' in key-value list");
          case eq =>
            val k = token.substring(0, eq);
            val v = token.substring(eq + 1);
            b += k -> v;
        }
      }
      b.result
    }

    /* Collect output lines whose first token is either a single character
     * or a character followed by `+'.  Each line becomes a triple of the
     * character, whether the `+' was present, and the remaining tokens
     * joined with spaces.
     */
    def traceish(): Seq[(Char, Boolean, String)] = {
      val b = Seq.newBuilder[(Char, Boolean, String)];
      for (line <- this) line match {
        /* Match on `Seq', not `List': the line is only known to be a
         * `Seq[String]', and a non-`List' sequence must not be mistaken
         * for an empty line.
         */
        case Seq(key, desc@_*) =>
          val live = if (key.length == 1) false
                     else if (key.length == 2 && key(1) == '+') true
                     else throw new ServerFailed(
                       s"incomprehensible traceish key `$key'");
          b += ((key(0), live, desc.mkString(" ")));
        case _ => throw new ServerFailed("empty line in traceish output");
      }
      b.result
    }

    /* Check that the job finishes without producing any output.  If the
     * pending message is a failure or a lost connection, `next' raises the
     * corresponding exception, so that a failed command isn't misreported
     * as unexpected output.
     */
    def expectEmpty(): Unit = {
      if (hasNext) {
        next();
        throw new ServerFailed("no output expected");
      }
    }

    /* Return the job's single line of output.  Throws `ServerFailed' if
     * there are no lines or more than one; a failure or lost connection is
     * reported by `next' as usual.
     */
    def oneLine(): Seq[String] = {
      if (!hasNext) throw new ServerFailed("exactly one line expected");
      val line = next();
      if (hasNext) {
        next();                         // Surface a pending failure first.
        throw new ServerFailed("exactly one line expected");
      }
      line
    }
  }

  /* Send the command TOKS to the server and return a `Job' for collecting
   * its results.  If BG is true, `-background' and a fresh tag are inserted
   * after the command name.  Blocks until there is no foreground job;
   * throws `ConnectionClosed' if the connection is no longer alive.
   */
  def submit(bg: Boolean, toks: String*): this.Job = {
println(";; wait for lock");
    synchronized {
      val cmd: Seq[String] =
        if (!bg) toks
        else toks match {
          case Seq(verb, tail@_*) =>
            val tag = "J%05d".format(bgseq); bgseq += 1;
            verb +: "-background" +: tag +: tail;
          case _ =>
            /* Reject an empty command before consuming a tag. */
            throw new IllegalArgumentException("empty background command");
        }
println(";; wait for foreground");
      while (livep && fgjob != None) wait();
      if (!livep) throw new ConnectionClosed;
println(";; write command");
      /* If the write fails, we aren't going to become the foreground job
       * after all, so pass the wakeup on to another waiter.
       */
      try { out.write(quoteTokens(cmd)); out.write('\n'); out.flush(); }
      catch { case e: Throwable => notify(); throw e; }
      val j = new Job;
      fgjob = Some(j);
      j
    }
  }

  /* Submit TOKS as a foreground command. */
  def submit(toks: String*): this.Job = submit(false, toks: _*);

  /* These two expect the connection lock to be held. */
  def foregroundJob: Job =
    fgjob.getOrElse { throw new ServerFailed("no foreground job"); }
  def releaseForegroundJob(): Unit = { fgjob = None; notify(); }

  /* Classify a line S received from the server.  Throws `ServerFailed' if
   * the line is empty, or if its keyword or argument count isn't
   * recognized.
   */
  def parseServerLine(s: String): Message = nextToken(s) match {
    case None => throw new ServerFailed("empty line from server")
    /* The rest of a `TRACE' line is kept verbatim rather than tokenized. */
    case Some(("TRACE", next)) => Trace(s.substring(next))
    case Some((code, next)) => (code, splitTokens(s, next)) match {
      case ("OK", Seq()) => JobOK
      case ("INFO", tail) => JobInfo(tail)
      case ("FAIL", tail) => JobFail(tail)
      case ("BGDETACH", Seq(tag)) => JobDetached(tag)
      case ("BGOK", Seq(tag)) => BackgroundJobMessage(tag, JobOK)
      case ("BGINFO", Seq(tag, tail@_*)) =>
        BackgroundJobMessage(tag, JobInfo(tail))
      case ("BGFAIL", Seq(tag, tail@_*)) =>
        BackgroundJobMessage(tag, JobFail(tail))
      case ("WARN", tail) => Warning(tail)
      case ("NOTE", tail) => Notify(tail)
      case ("SVCCLAIM", Seq(svc, ver)) => ServiceClaim(svc, ver)
      case ("SVCJOB", Seq(tag, svc, cmd, args@_*)) =>
        ServiceJob(tag, svc, cmd, args)
      case ("SVCCANCEL", Seq(tag)) => ServiceCancel(tag)
      case (_, tail) => throw new ServerFailed(
        "incomprehensible line from server: " + quoteTokens(code +: tail))
    }
  }

  /* Deliver MSG to the job selected by GETJOB.  GETJOB is called with the
   * connection lock held; its argument is true if the job should be kept
   * because more messages are expected (i.e., MSG is a `JobInfo'), and
   * false if this is the job's final message.  The message is written to
   * the job's channel after the lock is released.
   */
  def processJobMessage(msg: JobMessage)
                       (getjob: (Boolean) => Job): Unit = {
    val keep = msg match {
      case _: JobInfo => true
      case _ => false
    }
    synchronized { getjob(keep); }.ch.write(msg);
  }

  /* Reading lines from the server. */
  val readthr = thread("admin reader") {
println(";; readthr running");
    try {
println(";; wait for line");
      /* `readLine' returns null at end of input. */
      Iterator.continually(in.readLine()).takeWhile(_ != null).foreach { line =>
println(s";; line: $line");
        parseServerLine(line) match {
          case JobDetached(tag) => synchronized {
            /* The foreground job has gone into the background: file it
             * under its tag and let the next command go ahead.
             */
            jobmap(tag) = foregroundJob; releaseForegroundJob();
          }
          case msg: JobMessage => processJobMessage(msg) { keep =>
            val j = foregroundJob; if (!keep) releaseForegroundJob(); j
          }
          case BackgroundJobMessage(tag, msg) =>
            processJobMessage(msg) { keep =>
              val j = jobmap.getOrElse(tag, throw new ServerFailed(
                s"no job with tag `${tag}'"));
              if (!keep) jobmap.remove(tag);
              j
            }
          case msg: AsyncMessage =>
            callHook(msg);
          case _: ServiceMessage =>
            ok;
        }
      }
    } catch {
      case e: Throwable => e.printStackTrace();
    } finally {
      synchronized {
        livep = false;
        for ((_, j) <- jobmap) j.ch.write(JobLostConnection);
        jobmap.clear();
        fgjob.foreach { j => j.ch.write(JobLostConnection); }
        fgjob = None;
        /* Wake every thread blocked in `submit', whether or not there was
         * a foreground job.  Waiters can remain even when `fgjob' is
         * `None': `releaseForegroundJob' wakes only one of them, and that
         * one will now fail with `ConnectionClosed' without passing the
         * wakeup on.
         */
        notifyAll();
      }
      callHook(ConnectionLost);
    }
  }
}

/*----- That's all, folks -------------------------------------------------*/

}