Иван обнови решението на 10.12.2014 17:12 (преди над 3 години)
+package main
+
+import (
+ "errors"
+ "io/ioutil"
+ "net/http"
+ "sync"
+ "time"
+)
+
+type manager struct {
+ maxWorkers int
+ inProgress int
+ mtx sync.Mutex
+ input chan string
+ workQueue chan string
+ output chan string
+ callback func(string) bool
+ done chan struct{}
+}
+
+func NewManager(maxWorkers int, input chan string, callback func(string) bool) *manager {
+ m := &manager{}
+
+ m.maxWorkers = maxWorkers
+ m.callback = callback
+ m.input = input
+
+ m.inProgress = 0
+ m.done = make(chan struct{})
+ m.output = make(chan string)
+ m.workQueue = make(chan string, 1)
+ go m.start()
+ return m
+}
+
+func (m *manager) OutputChan() chan string {
+ return m.output
+}
+
+func (m *manager) start() {
+ for {
+ select {
+ case work := <-m.input:
+ m.workQueue <- work
+ if m.inProgress < m.maxWorkers {
+ m.mtx.Lock()
+ m.inProgress++
+ m.mtx.Unlock()
+ m.startWorker()
+ }
+ case <-m.done:
+ return
+ }
+ }
+}
+
+func (m *manager) startWorker() {
+ go func() {
+ MainLoop:
+ for {
+
+ select {
+ case url := <-m.workQueue:
+ client := http.Client{
+ Timeout: time.Duration(time.Second * 3),
+ }
+
+ response, err := client.Get(url)
+ if err != nil {
+ continue MainLoop
+ } else {
+ defer response.Body.Close()
+ content, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ continue MainLoop
+ }
+
+ if m.callback(string(content)) {
+ m.output <- url
+ m.done <- struct{}{}
+ }
+ }
+ case <-time.After(time.Millisecond * 200):
+ m.mtx.Lock()
+ m.inProgress--
+ m.mtx.Unlock()
+ return
+ }
+
+ }
+ }()
+}
+
+func dispatch(from <-chan []string, to chan<- string, stopped chan struct{}) {
+ for input := range from {
+ go func(urls []string) {
+ for _, url := range urls {
+ to <- url
+ }
+ }(input)
+ }
+ stopped <- struct{}{}
+}
+
+func SeekAndDestroy(callback func(string) bool, chunkedUrlsToCheck <-chan []string, workersCount int) (string, error) {
+ if workersCount < 1 {
+ return "", errors.New("Expected positive integer")
+ }
+ if chunkedUrlsToCheck == nil {
+ return "", errors.New("Expected initialized channel")
+ }
+ input := make(chan string)
+ inputClosed := make(chan struct{})
+ go dispatch(chunkedUrlsToCheck, input, inputClosed)
+ m := NewManager(workersCount, input, callback)
+
+ result := m.OutputChan()
+ select {
+ case result := <-result:
+ return result, nil
+ case <-inputClosed:
+ return "", errors.New("Input closed")
+ case <-time.After(time.Second * 15):
+ return "", errors.New("An error occurred - probably a timeout :)")
+ }
+}