X-Git-Url: https://git.distorted.org.uk/~mdw/tripe-android/blobdiff_plain/c8292b34485a2e00e676023d4164dd5841e4659f..HEAD:/admin.scala?ds=sidebyside diff --git a/admin.scala b/admin.scala index 56f1c30..9796cd9 100644 --- a/admin.scala +++ b/admin.scala @@ -27,14 +27,15 @@ package uk.org.distorted.tripe; package object admin { /*----- Imports -----------------------------------------------------------*/ -import java.io.{BufferedReader, Reader, Writer}; +import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}; import java.util.concurrent.locks.{Condition, ReentrantLock => Lock}; -import scala.collection.mutable.{HashMap, Publisher}; +import scala.collection.mutable.HashMap; import scala.concurrent.Channel; import scala.util.control.Breaks; import Implicits._; +import sys.{serverInput, serverOutput}; /*----- Classification of server messages ---------------------------------*/ @@ -77,8 +78,7 @@ class CommandFailed(val msg: Seq[String]) extends Exception { class ConnectionLostException extends Exception; -class Connection(val in: Reader, val out: Writer) - extends Publisher[AsyncMessage] +object Connection extends Hook[AsyncMessage] { /* Synchronization. * @@ -87,17 +87,22 @@ class Connection(val in: Reader, val out: Writer) * hold the `Connection' lock before locking any individual `Job' objects. */ - var livep: Boolean = true; // Is this connection still alive? - var fgjob: Option[this.Job] = None; // Foreground job, if there is one. - val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs. - var bgseq = 0; // Next background job tag. + private var livep: Boolean = true; // Is this connection still alive? + private var fgjob: Option[this.Job] = None; // Foreground job, if there is one. + private val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs. + private var bgseq = 0; // Next background job tag. + + private val in = new BufferedReader(new InputStreamReader(serverInput)); + private val out = new OutputStreamWriter(serverOutput); + + type Pub = Connection.type; class Job extends Iterator[Seq[String]] { private[Connection] val ch = new Channel[JobMessage]; private[this] var nextmsg: Option[JobMessage] = None; private[this] def fetchNext() - { if (nextmsg == None) nextmsg = Some(ch.read); } + { if (!nextmsg) nextmsg = Some(ch.read); } override def hasNext: Boolean = { fetchNext(); nextmsg match { @@ -108,10 +113,10 @@ class Connection(val in: Reader, val out: Writer) override def next(): Seq[String] = { fetchNext(); nextmsg match { - case None => ??? - case Some(JobOK) => throw new NoSuchElementException - case Some(JobFail(msg)) => throw new CommandFailed(msg) - case Some(JobLostConnection) => throw new ConnectionLostException + case None => unreachable; + case Some(JobOK) => throw new NoSuchElementException; + case Some(JobFail(msg)) => throw new CommandFailed(msg); + case Some(JobLostConnection) => throw new ConnectionLostException; case Some(JobInfo(msg)) => nextmsg = None; msg } } @@ -181,8 +186,6 @@ println(";; write command"); def submit(toks: String*): this.Job = submit(false, toks: _*); - def close() { synchronized { out.close(); } } - /* These two expect the connection lock to be held. */ def foregroundJob: Job = fgjob.getOrElse { throw new ServerFailed("no foreground job"); } @@ -245,7 +248,7 @@ println(s";; line: $line"); j } case msg: AsyncMessage => - publish(msg); + callHook(msg); case _: ServiceMessage => ok; } @@ -264,8 +267,7 @@ println(s";; line: $line"); case None => ok; } } - publish(ConnectionLost); - in.close(); out.close(); + callHook(ConnectionLost); } } }