Source file blog/pipelines/parallel.go

     1  // +build OMIT
     2  
     3  package main
     4  
     5  import (
     6  	"crypto/md5"
     7  	"errors"
     8  	"fmt"
     9  	"io/ioutil"
    10  	"os"
    11  	"path/filepath"
    12  	"sort"
    13  	"sync"
    14  )
    15  
    16  // A result is the product of reading and summing a file using MD5.
    17  type result struct {
    18  	path string
    19  	sum  [md5.Size]byte
    20  	err  error
    21  }
    22  
    23  // sumFiles starts goroutines to walk the directory tree at root and digest each
    24  // regular file.  These goroutines send the results of the digests on the result
    25  // channel and send the result of the walk on the error channel.  If done is
    26  // closed, sumFiles abandons its work.
    27  func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    28  	// For each regular file, start a goroutine that sums the file and sends
    29  	// the result on c.  Send the result of the walk on errc.
    30  	c := make(chan result)
    31  	errc := make(chan error, 1)
    32  	go func() { // HL
    33  		var wg sync.WaitGroup
    34  		err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
    35  			if err != nil {
    36  				return err
    37  			}
    38  			if !info.Mode().IsRegular() {
    39  				return nil
    40  			}
    41  			wg.Add(1)
    42  			go func() { // HL
    43  				data, err := ioutil.ReadFile(path)
    44  				select {
    45  				case c <- result{path, md5.Sum(data), err}: // HL
    46  				case <-done: // HL
    47  				}
    48  				wg.Done()
    49  			}()
    50  			// Abort the walk if done is closed.
    51  			select {
    52  			case <-done: // HL
    53  				return errors.New("walk canceled")
    54  			default:
    55  				return nil
    56  			}
    57  		})
    58  		// Walk has returned, so all calls to wg.Add are done.  Start a
    59  		// goroutine to close c once all the sends are done.
    60  		go func() { // HL
    61  			wg.Wait()
    62  			close(c) // HL
    63  		}()
    64  		// No select needed here, since errc is buffered.
    65  		errc <- err // HL
    66  	}()
    67  	return c, errc
    68  }
    69  
    70  // MD5All reads all the files in the file tree rooted at root and returns a map
    71  // from file path to the MD5 sum of the file's contents.  If the directory walk
    72  // fails or any read operation fails, MD5All returns an error.  In that case,
    73  // MD5All does not wait for inflight read operations to complete.
    74  func MD5All(root string) (map[string][md5.Size]byte, error) {
    75  	// MD5All closes the done channel when it returns; it may do so before
    76  	// receiving all the values from c and errc.
    77  	done := make(chan struct{}) // HLdone
    78  	defer close(done)           // HLdone
    79  
    80  	c, errc := sumFiles(done, root) // HLdone
    81  
    82  	m := make(map[string][md5.Size]byte)
    83  	for r := range c { // HLrange
    84  		if r.err != nil {
    85  			return nil, r.err
    86  		}
    87  		m[r.path] = r.sum
    88  	}
    89  	if err := <-errc; err != nil {
    90  		return nil, err
    91  	}
    92  	return m, nil
    93  }
    94  
    95  func main() {
    96  	// Calculate the MD5 sum of all files under the specified directory,
    97  	// then print the results sorted by path name.
    98  	m, err := MD5All(os.Args[1])
    99  	if err != nil {
   100  		fmt.Println(err)
   101  		return
   102  	}
   103  	var paths []string
   104  	for path := range m {
   105  		paths = append(paths, path)
   106  	}
   107  	sort.Strings(paths)
   108  	for _, path := range paths {
   109  		fmt.Printf("%x  %s\n", m[path], path)
   110  	}
   111  }
   112  

View as plain text