Source file blog/pipelines/bounded.go

     1  // +build OMIT
     2  
     3  package main
     4  
     5  import (
     6  	"crypto/md5"
     7  	"errors"
     8  	"fmt"
     9  	"io/ioutil"
    10  	"os"
    11  	"path/filepath"
    12  	"sort"
    13  	"sync"
    14  )
    15  
    16  // walkFiles starts a goroutine to walk the directory tree at root and send the
    17  // path of each regular file on the string channel.  It sends the result of the
    18  // walk on the error channel.  If done is closed, walkFiles abandons its work.
    19  func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    20  	paths := make(chan string)
    21  	errc := make(chan error, 1)
    22  	go func() { // HL
    23  		// Close the paths channel after Walk returns.
    24  		defer close(paths) // HL
    25  		// No select needed for this send, since errc is buffered.
    26  		errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
    27  			if err != nil {
    28  				return err
    29  			}
    30  			if !info.Mode().IsRegular() {
    31  				return nil
    32  			}
    33  			select {
    34  			case paths <- path: // HL
    35  			case <-done: // HL
    36  				return errors.New("walk canceled")
    37  			}
    38  			return nil
    39  		})
    40  	}()
    41  	return paths, errc
    42  }
    43  
    44  // A result is the outcome of reading one file and hashing its contents with MD5.
    45  type result struct {
    46  	path string // name of the file that was read
    47  	sum  [md5.Size]byte // MD5 digest of the bytes the read returned
    48  	err  error // error reported by the read; nil on success
    49  }
    50  
    51  // digester reads path names from paths and sends digests of the corresponding
    52  // files on c until either paths or done is closed.
    53  func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    54  	for path := range paths { // HLpaths
    55  		data, err := ioutil.ReadFile(path)
    56  		select {
    57  		case c <- result{path, md5.Sum(data), err}:
    58  		case <-done:
    59  			return
    60  		}
    61  	}
    62  }
    63  
    64  // MD5All reads all the files in the file tree rooted at root and returns a map
    65  // from file path to the MD5 sum of the file's contents.  If the directory walk
    66  // fails or any read operation fails, MD5All returns an error.  In that case,
    67  // MD5All does not wait for inflight read operations to complete.
    68  func MD5All(root string) (map[string][md5.Size]byte, error) {
    69  	// MD5All closes the done channel when it returns; it may do so before
    70  	// receiving all the values from c and errc.
    71  	done := make(chan struct{})
    72  	defer close(done)
    73  
    74  	paths, errc := walkFiles(done, root)
    75  
    76  	// Start a fixed number of goroutines to read and digest files.
    77  	c := make(chan result) // HLc
    78  	var wg sync.WaitGroup
    79  	const numDigesters = 20
    80  	wg.Add(numDigesters)
    81  	for i := 0; i < numDigesters; i++ {
    82  		go func() {
    83  			digester(done, paths, c) // HLc
    84  			wg.Done()
    85  		}()
    86  	}
    87  	go func() {
    88  		wg.Wait()
    89  		close(c) // HLc
    90  	}()
    91  	// End of pipeline. OMIT
    92  
    93  	m := make(map[string][md5.Size]byte)
    94  	for r := range c {
    95  		if r.err != nil {
    96  			return nil, r.err
    97  		}
    98  		m[r.path] = r.sum
    99  	}
   100  	// Check whether the Walk failed.
   101  	if err := <-errc; err != nil { // HLerrc
   102  		return nil, err
   103  	}
   104  	return m, nil
   105  }
   106  
   107  func main() {
   108  	// Calculate the MD5 sum of all files under the specified directory,
   109  	// then print the results sorted by path name.
   110  	m, err := MD5All(os.Args[1])
   111  	if err != nil {
   112  		fmt.Println(err)
   113  		return
   114  	}
   115  	var paths []string
   116  	for path := range m {
   117  		paths = append(paths, path)
   118  	}
   119  	sort.Strings(paths)
   120  	for _, path := range paths {
   121  		fmt.Printf("%x  %s\n", m[path], path)
   122  	}
   123  }
   124  

View as plain text