From 9a4b76f610fc0f968d5750a20e21275a21afef80 Mon Sep 17 00:00:00 2001 From: Stephen McQuay Date: Thu, 13 Mar 2014 00:12:21 -0700 Subject: [PATCH] Added io.Reader and io.Writer wrapper This finalizes the work on getting the server to support multiple encodings for players and spectators. --- player.go | 118 ++++++++++++++++++++++++++++++---------------------- protocol.go | 4 +- 2 files changed, 70 insertions(+), 52 deletions(-) diff --git a/player.go b/player.go index 97b303d..b28a30c 100644 --- a/player.go +++ b/player.go @@ -1,9 +1,8 @@ package main import ( + "encoding/gob" "encoding/json" - "errors" - "fmt" "log" "bitbucket.org/smcquay/bandwidth" @@ -12,78 +11,95 @@ import ( const maxMessageSize = 1024 -type protoTalker struct { - ws *websocket.Conn - bw *bandwidth.Bandwidth - send chan Message - buff []byte - Id string +type encoder interface { + Encode(v interface{}) error } -func NewProtoTalker(id string, ws *websocket.Conn, bw *bandwidth.Bandwidth) *protoTalker { +type decoder interface { + Decode(v interface{}) error +} + +type streamCounter struct { + ws *websocket.Conn + bw *bandwidth.Bandwidth +} + +func (sc *streamCounter) Read(p []byte) (n int, err error) { + n, err = sc.ws.Read(p) + sc.bw.AddRx <- n + return n, err +} + +func (sc *streamCounter) Write(p []byte) (n int, err error) { + n, err = sc.ws.Write(p) + sc.bw.AddTx <- n + return n, err +} + +func (sc *streamCounter) Close() error { + return sc.ws.Close() +} + +type protoTalker struct { + enc encoder + dec decoder + counter *streamCounter + send chan Message + Id string +} + +func NewProtoTalker(id string, ws *websocket.Conn, bw *bandwidth.Bandwidth, encoding string) *protoTalker { + var enc encoder + var dec decoder + comptroller := &streamCounter{ + ws: ws, + bw: bw, + } + if encoding == "json" { + enc = json.NewEncoder(comptroller) + dec = json.NewDecoder(comptroller) + } else { + enc = gob.NewEncoder(comptroller) + dec = gob.NewDecoder(comptroller) + } return &protoTalker{ - send: make(chan Message, 16), - ws: ws, - bw: bw, - Id: id, - buff: make([]byte, maxMessageSize), + send: make(chan Message, 16), + enc: enc, + dec: dec, + counter: comptroller, + Id: id, } } func (pt *protoTalker) sender() { log.Printf("%s: client launched", pt.Id) for things := range pt.send { - b, err := json.Marshal(things) - if err != nil { - break - } - n, err := pt.ws.Write(b) - pt.bw.AddTx <- n + err := pt.enc.Encode(things) if err != nil { break } } - pt.ws.Close() + pt.counter.Close() log.Printf("%s: spectator sender close", pt.Id) } -func (pt *protoTalker) readJSON() (map[string]Instruction, error) { - msg := map[string]Instruction{} - n, err := pt.ws.Read(pt.buff) - if err != nil { - log.Printf("%s: problem reading from player: %s", pt.Id, err) - return nil, err - } - pt.bw.AddRx <- n - if n == len(pt.buff) { - errMsg := fmt.Sprintf("%s: read buffer overfull: %s", pt.Id, string(pt.buff)) - log.Printf(errMsg) - return msg, errors.New(errMsg) - } - err = json.Unmarshal(pt.buff[:n], &msg) - if err != nil { - log.Printf("%s: problem reading from player: %s", pt.Id, err) - return nil, err - } - return msg, nil -} - type player struct { Robots []*Robot Instruction Instruction protoTalker } -func NewPlayer(id string, ws *websocket.Conn, bw *bandwidth.Bandwidth) *player { +func NewPlayer(id string, ws *websocket.Conn, bw *bandwidth.Bandwidth, encoding string) *player { return &player{ Robots: []*Robot{}, - protoTalker: *NewProtoTalker(id, ws, bw), + protoTalker: *NewProtoTalker(id, ws, bw, encoding), } } func (p *player) recv() { for { - msgs, err := p.readJSON() + var msgs map[string]Instruction + err := p.dec.Decode(&msgs) if err != nil { log.Printf("%s: %s", p.Id, err) break @@ -149,26 +165,28 @@ func (p *player) recv() { } log.Printf("%s: recv close", p.Id) - p.ws.Close() + p.counter.Close() } type Spectator struct { protoTalker } -func NewSpectator(id string, ws *websocket.Conn, bw *bandwidth.Bandwidth) *Spectator { +func NewSpectator(id string, ws *websocket.Conn, bw *bandwidth.Bandwidth, encoding string) *Spectator { return &Spectator{ - protoTalker: *NewProtoTalker(id, ws, bw), + protoTalker: *NewProtoTalker(id, ws, bw, encoding), } } func (s *Spectator) recv() { for { - _, err := s.readJSON() + var msgs interface{} + err := s.dec.Decode(&msgs) if err != nil { log.Printf("%s: %s", s.Id, err) break } + // After the first bit of handshaking, the rest of the messages should // only be "{}" for spectators, and the following could hold true: // @@ -178,5 +196,5 @@ func (s *Spectator) recv() { // } } log.Printf("%s: recv close", s.Id) - s.ws.Close() + s.counter.Close() } diff --git a/protocol.go b/protocol.go index 0cb79d5..f9ea4ff 100644 --- a/protocol.go +++ b/protocol.go @@ -275,7 +275,7 @@ encodingLoops: } } - p := NewPlayer(player_id, ws, game.bw) + p := NewPlayer(player_id, ws, game.bw, encoding) log.Printf("%s: made a player: %s", gid.Id, p.Id) convertedStats := map[string]Stats{} @@ -330,7 +330,7 @@ encodingLoops: gid.Id, ) case "spectator": - s := NewSpectator(player_id, ws, game.bw) + s := NewSpectator(player_id, ws, game.bw, encoding) log.Printf("%s, %s: about to register this spectator", gid.Id, s.Id) game.sregister <- s log.Printf("%s, %s: registered spectator", gid.Id, s.Id)