X-Git-Url: https://git.distorted.org.uk/~mdw/tripe-android/blobdiff_plain/25c3546915ef2105c0d53983939da840ddbde795..HEAD:/admin.scala diff --git a/admin.scala b/admin.scala index 01d7995..9796cd9 100644 --- a/admin.scala +++ b/admin.scala @@ -27,14 +27,15 @@ package uk.org.distorted.tripe; package object admin { /*----- Imports -----------------------------------------------------------*/ -import java.io.{BufferedReader, Reader, Writer}; +import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}; import java.util.concurrent.locks.{Condition, ReentrantLock => Lock}; -import scala.collection.mutable.{HashMap, Publisher}; +import scala.collection.mutable.HashMap; import scala.concurrent.Channel; import scala.util.control.Breaks; import Implicits._; +import sys.{serverInput, serverOutput}; /*----- Classification of server messages ---------------------------------*/ @@ -42,33 +43,30 @@ sealed abstract class Message; sealed abstract class JobMessage extends Message; case object JobOK extends JobMessage; -case class JobInfo(info: Seq[String]) extends JobMessage; -case class JobFail(err: Seq[String]) extends JobMessage; +final case class JobInfo(info: Seq[String]) extends JobMessage; +final case class JobFail(err: Seq[String]) extends JobMessage; case object JobLostConnection extends JobMessage; -case class BackgroundJobMessage(tag: String, msg: JobMessage) +final case class BackgroundJobMessage(tag: String, msg: JobMessage) extends Message; -case class JobDetached(tag: String) extends Message; +final case class JobDetached(tag: String) extends Message; sealed abstract class AsyncMessage extends Message; -case class Trace(msg: String) extends AsyncMessage; -case class Warning(err: Seq[String]) extends AsyncMessage; -case class Notify(note: Seq[String]) extends AsyncMessage; +final case class Trace(msg: String) extends AsyncMessage; +final case class Warning(err: Seq[String]) extends AsyncMessage; +final case class Notify(note: Seq[String]) extends AsyncMessage; case object ConnectionLost extends AsyncMessage; sealed abstract class ServiceMessage extends Message; -case class ServiceCancel(jobid: String) extends ServiceMessage; -case class ServiceClaim(svc: String, version: String) +final case class ServiceCancel(jobid: String) extends ServiceMessage; +final case class ServiceClaim(svc: String, version: String) extends ServiceMessage; -case class ServiceJob(jobid: String, svc: String, +final case class ServiceJob(jobid: String, svc: String, cmd: String, args: Seq[String]) extends ServiceMessage; /*----- Main code ---------------------------------------------------------*/ -object Connection { -} - class ConnectionClosed extends Exception; class ServerFailed(msg: String) extends Exception(msg); @@ -80,8 +78,7 @@ class CommandFailed(val msg: Seq[String]) extends Exception { class ConnectionLostException extends Exception; -class Connection(val in: Reader, val out: Writer) - extends Publisher[AsyncMessage] +object Connection extends Hook[AsyncMessage] { /* Synchronization. * @@ -90,17 +87,22 @@ class Connection(val in: Reader, val out: Writer) * hold the `Connection' lock before locking any individual `Job' objects. */ - var livep: Boolean = true; // Is this connection still alive? - var fgjob: Option[this.Job] = None; // Foreground job, if there is one. - val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs. - var bgseq = 0; // Next background job tag. + private var livep: Boolean = true; // Is this connection still alive? + private var fgjob: Option[this.Job] = None; // Foreground job, if there is one. + private val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs. + private var bgseq = 0; // Next background job tag. + + private val in = new BufferedReader(new InputStreamReader(serverInput)); + private val out = new OutputStreamWriter(serverOutput); + + type Pub = Connection.type; class Job extends Iterator[Seq[String]] { private[Connection] val ch = new Channel[JobMessage]; private[this] var nextmsg: Option[JobMessage] = None; private[this] def fetchNext() - { if (nextmsg == None) nextmsg = Some(ch.read); } + { if (!nextmsg) nextmsg = Some(ch.read); } override def hasNext: Boolean = { fetchNext(); nextmsg match { @@ -111,10 +113,10 @@ class Connection(val in: Reader, val out: Writer) override def next(): Seq[String] = { fetchNext(); nextmsg match { - case None => ??? - case Some(JobOK) => throw new NoSuchElementException - case Some(JobFail(msg)) => throw new CommandFailed(msg) - case Some(JobLostConnection) => throw new ConnectionLostException + case None => unreachable; + case Some(JobOK) => throw new NoSuchElementException; + case Some(JobFail(msg)) => throw new CommandFailed(msg); + case Some(JobLostConnection) => throw new ConnectionLostException; case Some(JobInfo(msg)) => nextmsg = None; msg } } @@ -184,8 +186,6 @@ println(";; write command"); def submit(toks: String*): this.Job = submit(false, toks: _*); - def close() { synchronized { out.close(); } } - /* These two expect the connection lock to be held. */ def foregroundJob: Job = fgjob.getOrElse { throw new ServerFailed("no foreground job"); } @@ -248,9 +248,9 @@ println(s";; line: $line"); j } case msg: AsyncMessage => - publish(msg); + callHook(msg); case _: ServiceMessage => - (); + ok; } } } catch { @@ -264,11 +264,10 @@ println(s";; line: $line"); j.ch.write(JobLostConnection); fgjob = None; notifyAll(); - case None => (); + case None => ok; } } - publish(ConnectionLost); - in.close(); out.close(); + callHook(ConnectionLost); } } }