X-Git-Url: https://git.distorted.org.uk/~mdw/tripe-android/blobdiff_plain/25c3546915ef2105c0d53983939da840ddbde795..3bb2303d42adb3f37420f168b009ecfe64f888cd:/admin.scala diff --git a/admin.scala b/admin.scala index 01d7995..52a2912 100644 --- a/admin.scala +++ b/admin.scala @@ -27,7 +27,7 @@ package uk.org.distorted.tripe; package object admin { /*----- Imports -----------------------------------------------------------*/ -import java.io.{BufferedReader, Reader, Writer}; +import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}; import java.util.concurrent.locks.{Condition, ReentrantLock => Lock}; import scala.collection.mutable.{HashMap, Publisher}; @@ -35,6 +35,7 @@ import scala.concurrent.Channel; import scala.util.control.Breaks; import Implicits._; +import sys.{serverInput, serverOutput}; /*----- Classification of server messages ---------------------------------*/ @@ -42,33 +43,30 @@ sealed abstract class Message; sealed abstract class JobMessage extends Message; case object JobOK extends JobMessage; -case class JobInfo(info: Seq[String]) extends JobMessage; -case class JobFail(err: Seq[String]) extends JobMessage; +final case class JobInfo(info: Seq[String]) extends JobMessage; +final case class JobFail(err: Seq[String]) extends JobMessage; case object JobLostConnection extends JobMessage; -case class BackgroundJobMessage(tag: String, msg: JobMessage) +final case class BackgroundJobMessage(tag: String, msg: JobMessage) extends Message; -case class JobDetached(tag: String) extends Message; +final case class JobDetached(tag: String) extends Message; sealed abstract class AsyncMessage extends Message; -case class Trace(msg: String) extends AsyncMessage; -case class Warning(err: Seq[String]) extends AsyncMessage; -case class Notify(note: Seq[String]) extends AsyncMessage; +final case class Trace(msg: String) extends AsyncMessage; +final case class Warning(err: Seq[String]) extends AsyncMessage; +final case class Notify(note: Seq[String]) extends AsyncMessage; case object ConnectionLost extends AsyncMessage; sealed abstract class ServiceMessage extends Message; -case class ServiceCancel(jobid: String) extends ServiceMessage; -case class ServiceClaim(svc: String, version: String) +final case class ServiceCancel(jobid: String) extends ServiceMessage; +final case class ServiceClaim(svc: String, version: String) extends ServiceMessage; -case class ServiceJob(jobid: String, svc: String, +final case class ServiceJob(jobid: String, svc: String, cmd: String, args: Seq[String]) extends ServiceMessage; /*----- Main code ---------------------------------------------------------*/ -object Connection { -} - class ConnectionClosed extends Exception; class ServerFailed(msg: String) extends Exception(msg); @@ -80,8 +78,7 @@ class CommandFailed(val msg: Seq[String]) extends Exception { class ConnectionLostException extends Exception; -class Connection(val in: Reader, val out: Writer) - extends Publisher[AsyncMessage] +object Connection extends Publisher[AsyncMessage] { /* Synchronization. * @@ -90,10 +87,15 @@ class Connection(val in: Reader, val out: Writer) * hold the `Connection' lock before locking any individual `Job' objects. */ - var livep: Boolean = true; // Is this connection still alive? - var fgjob: Option[this.Job] = None; // Foreground job, if there is one. - val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs. - var bgseq = 0; // Next background job tag. + private var livep: Boolean = true; // Is this connection still alive? + private var fgjob: Option[this.Job] = None; // Foreground job, if there is one. + private val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs. + private var bgseq = 0; // Next background job tag. + + private val in = new BufferedReader(new InputStreamReader(serverInput)); + private val out = new OutputStreamWriter(serverOutput); + + type Pub = Connection.type; class Job extends Iterator[Seq[String]] { private[Connection] val ch = new Channel[JobMessage]; @@ -184,8 +186,6 @@ println(";; write command"); def submit(toks: String*): this.Job = submit(false, toks: _*); - def close() { synchronized { out.close(); } } - /* These two expect the connection lock to be held. */ def foregroundJob: Job = fgjob.getOrElse { throw new ServerFailed("no foreground job"); } @@ -250,7 +250,7 @@ println(s";; line: $line"); case msg: AsyncMessage => publish(msg); case _: ServiceMessage => - (); + ok; } } } catch { @@ -264,11 +264,10 @@ println(s";; line: $line"); j.ch.write(JobLostConnection); fgjob = None; notifyAll(); - case None => (); + case None => ok; } } publish(ConnectionLost); - in.close(); out.close(); } } }