From c5cd77676ec91b3867fc653d3257de95aed346d1 Mon Sep 17 00:00:00 2001 From: "Stephen McQuay (smcquay)" Date: Wed, 16 Nov 2016 01:18:35 -0800 Subject: [PATCH] rearrange, rename, refactor --- check.go | 174 ++++++++++++++++++++++++++++--------------------------- hash.go | 120 ++++++++++++++++++++++++++++++++++++++ main.go | 108 ---------------------------------- 3 files changed, 209 insertions(+), 193 deletions(-) create mode 100644 hash.go diff --git a/check.go b/check.go index 1475b47..3318f2b 100644 --- a/check.go +++ b/check.go @@ -15,78 +15,38 @@ import ( "sync" ) -type checksum struct { - filename string - hash hash.Hash - checksum string -} - -func parseCS(line string) (checksum, error) { - elems := strings.Fields(line) - if len(elems) != 2 { - return checksum{}, fmt.Errorf("unexpected content: %d != 2", len(elems)) - } - cs, f := elems[0], elems[1] - var hsh hash.Hash - switch len(cs) { - case 32: - hsh = md5.New() - case 40: - hsh = sha1.New() - case 64: - hsh = sha256.New() - case 128: - hsh = sha512.New() - default: - return checksum{}, fmt.Errorf("unknown format: %q", line) - } - return checksum{filename: f, hash: hsh, checksum: cs}, nil -} - +// input contains a file-ish piece of work to perform type input struct { f io.ReadCloser err error } -type work struct { - cs checksum - err error +// checksum contains the path to a file, a way to hash it, and the results of +// the hash +type checksum struct { + filename string + hash hash.Hash + checksum string + err error } -func streams(files []string) chan input { - r := make(chan input) +// check is the entry point for -c operation. +func check(args []string) chan error { + jobs := make(chan checksum) go func() { - for _, name := range files { - f, err := os.Open(name) - r <- input{f, err} - } - if len(files) == 0 { - r <- input{f: os.Stdin} - } - close(r) - }() - - return r -} - -func check(files []string) chan error { - jobs := make(chan work) - - go func() { - for stream := range streams(files) { - if stream.err != nil { - jobs <- work{err: stream.err} + for i := range toInput(args) { + if i.err != nil { + jobs <- checksum{err: i.err} break } - s := bufio.NewScanner(stream.f) + s := bufio.NewScanner(i.f) for s.Scan() { - cs, err := parseCS(s.Text()) - jobs <- work{cs, err} + jobs <- parseCS(s.Text()) } - stream.f.Close() + i.f.Close() if s.Err() != nil { - jobs <- work{err: s.Err()} + jobs <- checksum{err: s.Err()} } } close(jobs) @@ -101,6 +61,77 @@ func check(files []string) chan error { return merge(results) } +// toInput converts args to a stream of input +func toInput(args []string) chan input { + r := make(chan input) + + go func() { + for _, name := range args { + f, err := os.Open(name) + r <- input{f, err} + } + if len(args) == 0 { + r <- input{f: os.Stdin} + } + close(r) + }() + + return r +} + +// parseCS picks apart a line from a checksum file and returns everything +// needed to perform a checksum. +func parseCS(line string) checksum { + elems := strings.Fields(line) + if len(elems) != 2 { + return checksum{err: fmt.Errorf("unexpected content: %d != 2", len(elems))} + } + cs, f := elems[0], elems[1] + var hsh hash.Hash + switch len(cs) { + case 32: + hsh = md5.New() + case 40: + hsh = sha1.New() + case 64: + hsh = sha256.New() + case 128: + hsh = sha512.New() + default: + return checksum{err: fmt.Errorf("unknown format: %q", line)} + } + return checksum{filename: f, hash: hsh, checksum: cs} +} + +// verify does grunt work of verifying a stream of jobs (filenames). +func verify(jobs chan checksum) chan error { + r := make(chan error) + go func() { + for job := range jobs { + if job.err != nil { + log.Printf("%+v", job.err) + continue + } + f, err := os.Open(job.filename) + if err != nil { + r <- err + continue + } + if _, err := io.Copy(job.hash, f); err != nil { + r <- err + continue + } + f.Close() + if fmt.Sprintf("%x", job.hash.Sum(nil)) != job.checksum { + r <- fmt.Errorf("%s: bad", job.filename) + } + } + close(r) + }() + return r +} + +// merge is simple error fan-in func merge(cs []<-chan error) chan error { out := make(chan error) @@ -124,30 +155,3 @@ func merge(cs []<-chan error) chan error { }() return out } - -func verify(jobs chan work) chan error { - r := make(chan error) - go func() { - for job := range jobs { - if job.err != nil { - log.Printf("%+v", job.err) - continue - } - f, err := os.Open(job.cs.filename) - if err != nil { - r <- fmt.Errorf("open: %v", err) - continue - } - if _, err := io.Copy(job.cs.hash, f); err != nil { - r <- err - continue - } - f.Close() - if fmt.Sprintf("%x", job.cs.hash.Sum(nil)) != job.cs.checksum { - r <- fmt.Errorf("%s: bad", job.cs.filename) - } - } - close(r) - }() - return r -} diff --git a/hash.go b/hash.go new file mode 100644 index 0000000..4a80a0e --- /dev/null +++ b/hash.go @@ -0,0 +1,120 @@ +package main + +import ( + "crypto/md5" + "crypto/sha1" + "crypto/sha256" + "crypto/sha512" + "fmt" + "hash" + "io" + "os" + "sync" +) + +// result is a message or error payload +type result struct { + msg string + err error +} + +// hashr exists so that we can make a thing that can return valid hash.Hash +// interfaces. +type hashr func() hash.Hash + +// hsh figures out which hash algo to use, and distributes the work of hashing +func hsh(files []string) chan result { + var h hashr + switch *algo { + case "sha1", "1": + h = sha1.New + case "sha256", "256": + h = sha256.New + case "sha512", "512": + h = sha512.New + case "md5": + h = md5.New + default: + r := make(chan result) + go func() { + r <- result{err: fmt.Errorf("unsupported algorithm: %v", *algo)} + }() + return r + } + + if len(files) == 0 { + hsh := h() + _, err := io.Copy(hsh, os.Stdin) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } + fmt.Printf("%x -\n", hsh.Sum(nil)) + return nil + } + + jobs := make(chan checksum) + go func() { + for _, name := range files { + jobs <- checksum{filename: name} + } + close(jobs) + }() + + res := []<-chan result{} + for w := 0; w < *ngo; w++ { + res = append(res, compute(h, jobs)) + } + + return rmerge(res) +} + +// compute is the checksumming workhorse +func compute(h hashr, jobs chan checksum) chan result { + hsh := h() + r := make(chan result) + go func() { + for job := range jobs { + f, err := os.Open(job.filename) + if err != nil { + r <- result{err: err} + continue + } + hsh.Reset() + _, err = io.Copy(hsh, f) + f.Close() + if err != nil { + r <- result{err: err} + continue + } + r <- result{msg: fmt.Sprintf("%x %s", hsh.Sum(nil), job.filename)} + } + close(r) + }() + return r +} + +// rmerge implements fan-in +func rmerge(cs []<-chan result) chan result { + out := make(chan result) + + var wg sync.WaitGroup + + output := func(c <-chan result) { + for n := range c { + out <- n + } + wg.Done() + } + + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + go func() { + wg.Wait() + close(out) + }() + return out +} diff --git a/main.go b/main.go index 1796293..661d02f 100644 --- a/main.go +++ b/main.go @@ -1,17 +1,10 @@ package main import ( - "crypto/md5" - "crypto/sha1" - "crypto/sha256" - "crypto/sha512" "flag" "fmt" - "hash" - "io" "os" "runtime" - "sync" ) var algo = flag.String("a", "sha1", "algorithm to use") @@ -46,104 +39,3 @@ func main() { } } } - -type hashr func() hash.Hash - -func hsh(files []string) chan result { - var h hashr - switch *algo { - case "sha1", "1": - h = sha1.New - case "sha256", "256": - h = sha256.New - case "sha512", "512": - h = sha512.New - case "md5": - h = md5.New - default: - r := make(chan result) - go func() { - r <- result{err: fmt.Errorf("unsupported algorithm: %v", *algo)} - }() - return r - } - - if len(files) == 0 { - hsh := h() - _, err := io.Copy(hsh, os.Stdin) - if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - os.Exit(1) - } - fmt.Printf("%x -\n", hsh.Sum(nil)) - return nil - } - - jobs := make(chan work) - go func() { - for _, name := range files { - jobs <- work{cs: checksum{filename: name}} - } - close(jobs) - }() - - res := []<-chan result{} - for w := 0; w < *ngo; w++ { - res = append(res, compute(h, jobs)) - } - - return rmerge(res) -} - -type result struct { - msg string - err error -} - -func compute(h hashr, jobs chan work) chan result { - hsh := h() - r := make(chan result) - go func() { - for job := range jobs { - f, err := os.Open(job.cs.filename) - if err != nil { - r <- result{err: err} - continue - } - hsh.Reset() - _, err = io.Copy(hsh, f) - f.Close() - if err != nil { - r <- result{err: err} - continue - } - r <- result{msg: fmt.Sprintf("%x %s", hsh.Sum(nil), job.cs.filename)} - } - close(r) - }() - return r -} - -func rmerge(cs []<-chan result) chan result { - out := make(chan result) - - var wg sync.WaitGroup - - output := func(c <-chan result) { - for n := range c { - out <- n - } - wg.Done() - } - - wg.Add(len(cs)) - for _, c := range cs { - go output(c) - } - - go func() { - wg.Wait() - close(out) - }() - return out -}