Мартин обнови решението на 09.12.2014 17:57 (преди над 3 години)
+package main
+
+import (
+ //"fmt"
+ "errors"
+ "io/ioutil"
+ "net/http"
+ "sync"
+ "time"
+)
+
+type queue struct {
+ conteiner []string
+ // pushEvent chan interface{}
+ empty bool
+ mutex sync.Mutex
+}
+
+func newQueue() *queue {
+ res := new(queue)
+ res.conteiner = make([]string, 0)
+ return res
+}
+
+func (q *queue) Empty() bool {
+ return q.empty
+}
+
+func (q *queue) Push(s string) {
+ q.mutex.Lock()
+ defer q.mutex.Unlock()
+ q.conteiner = append(q.conteiner, s)
+ q.empty = true
+}
+
+func (q *queue) Pop() string {
+ q.mutex.Lock()
+ defer q.mutex.Unlock()
+ res := q.conteiner[0]
+ q.conteiner = append(q.conteiner[:0], q.conteiner[1:]...)
+ if len(q.conteiner) == 0 {
+ q.empty = false
+ }
+ return res
+}
+
+type manager struct {
+ limit int
+ current int
+ release chan interface{}
+ mutex sync.Mutex
+}
+
+func newManager(limit int) *manager {
+ res := new(manager)
+ res.limit = limit
+ res.release = make(chan interface{})
+ res.current = 0
+
+ go func(m *manager) {
+ for {
+ _, ok := <-m.release
+ if !ok {
+ return
+ }
+ m.current--
+ time.Sleep(2 * time.Microsecond)
+ }
+ }(res)
+
+ return res
+}
+
+func (m *manager) AddTask(callback func(string) bool, url string, output chan<- string) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for {
+ if m.current < m.limit {
+ m.current++
+ go worker(callback, url, output, m)
+ return
+ }
+ time.Sleep(2 * time.Microsecond)
+ }
+}
+
+func worker(callback func(string) bool, url string, output chan<- string, m *manager) {
+
+ defer func(m *manager) {
+ m.release <- struct{}{}
+ }(m)
+ response, err := http.Get(url)
+ if err != nil {
+ return
+ } else {
+ defer response.Body.Close()
+ contents, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ return
+ }
+ con := string(contents[:])
+ if callback(con) {
+ output <- url
+ }
+ }
+}
+
+func bufferedUrls(buffer *queue, chunkedUrlsToCheck <-chan []string) {
+ for {
+ select {
+ case urls, ok := <-chunkedUrlsToCheck:
+ if !ok {
+ return
+ }
+ for _, val := range urls {
+ buffer.Push(val)
+ }
+ }
+ }
+}
+
+func helpSaD(callback func(string) bool, buffer *queue, workersCount int, output chan<- string) {
+ m := newManager(workersCount)
+ for {
+
+ if buffer.Empty() {
+ m.AddTask(callback, buffer.Pop(), output)
+ }
+ time.Sleep(2 * time.Microsecond)
+ }
+}
+
+func SeekAndDestroy(callback func(string) bool, chunkedUrlsToCheck <-chan []string, workersCount int) (string, error) {
+ buffer := newQueue()
+ go bufferedUrls(buffer, chunkedUrlsToCheck)
+ output := make(chan string)
+ go helpSaD(callback, buffer, workersCount, output)
+ select {
+ case res := <-output:
+ return res, nil
+ case <-time.After(15 * time.Second):
+ return "", errors.New("probably a timeout")
+ }
+}