~elektito/gemplex

70352b2ab9ef5d9532595f1a7d1d6fe73e98b89e — Mostafa Razavi 1 year, 7 months ago 5f37fbd
Remove some extraneous url parsing in crawler
3 files changed, 107 insertions(+), 90 deletions(-)

M cmd/gemplex/crawl.go
M pkg/gcrawler/gcrawler.go
M pkg/gsearch/gsearch.go
M cmd/gemplex/crawl.go => cmd/gemplex/crawl.go +88 -86
@@ -37,7 +37,7 @@ const (
)

type VisitResult struct {
-	url         string
+	url         gcrawler.PreparedUrl
	error       error
	statusCode  int
	meta        string


@@ -157,7 +157,7 @@ redirect:
	return
}

-func visitor(visitorId string, urls <-chan string, results chan<- VisitResult, done <-chan bool) {
+func visitor(visitorId string, urls <-chan gcrawler.PreparedUrl, results chan<- VisitResult, done <-chan bool) {
	client := gemini.NewClient()
	ctx, cancelFunc := context.WithCancel(context.Background())



@@ -166,32 +166,37 @@ func visitor(visitorId string, urls <-chan string, results chan<- VisitResult, d
		cancelFunc()
	}()

-	for urlStr := range urls {
-		log.Printf("[crawl][%s] Processing: %s\n", visitorId, urlStr)
-		u, _ := url.Parse(urlStr)
+	for u := range urls {
+		log.Printf("[crawl][%s] Processing: %s\n", visitorId, u)

-		body, code, meta, u, err := readGemini(ctx, client, u, visitorId)
+		body, code, meta, finalUrl, err := readGemini(ctx, client, u.Parsed, visitorId)
		if errors.Is(err, context.Canceled) {
			break
		}
		if err != nil {
			log.Printf("[crawl][%s] Error: %s url=%s\n", visitorId, err, urlStr)
			log.Printf("[crawl][%s] Error: %s url=%s\n", visitorId, err, u)
			results <- VisitResult{
-				url:        urlStr,
-				meta:       meta,
-				error:      err,
-				statusCode: -1,
+				url:         u,
+				error:       err,
+				statusCode:  -1,
+				meta:        meta,
+				page:        gparse.Page{},
+				contents:    []byte{},
+				contentType: "",
+				visitTime:   time.Time{},
+				banned:      false,
+				isHostVisit: false,
			}
			continue
		}

		if code/10 == 2 { // SUCCESS
			contentType := meta
-			page, err := gparse.ParsePage(body, u, contentType)
+			page, err := gparse.ParsePage(body, finalUrl, contentType)
			if err != nil {
				log.Printf("[crawl][%s]Error parsing page: %s\n", visitorId, err)
				results <- VisitResult{
-					url:         urlStr,
+					url:         u,
					statusCode:  code,
					meta:        meta,
					contentType: contentType,


@@ -200,7 +205,7 @@ func visitor(visitorId string, urls <-chan string, results chan<- VisitResult, d
				}
			} else {
				results <- VisitResult{
-					url:         urlStr,
+					url:         u,
					statusCode:  code,
					meta:        meta,
					page:        page,


@@ -211,10 +216,16 @@ func visitor(visitorId string, urls <-chan string, results chan<- VisitResult, d
			}
		} else {
			results <- VisitResult{
-				url:        urlStr,
-				meta:       meta,
-				error:      fmt.Errorf("STATUS: %d META: %s", code, meta),
-				statusCode: code,
+				url:         u,
+				error:       fmt.Errorf("STATUS: %d META: %s", code, meta),
+				statusCode:  code,
+				meta:        meta,
+				page:        gparse.Page{},
+				contents:    []byte{},
+				contentType: "",
+				visitTime:   time.Time{},
+				banned:      false,
+				isHostVisit: false,
			}
		}



@@ -244,7 +255,7 @@ update urls
set banned = $1
where url = $2
`
-	_, err := Db.Exec(q, r.banned, r.url)
+	_, err := Db.Exec(q, r.banned, r.url.String())
	utils.PanicOnErr(err)
}



@@ -282,7 +293,7 @@ func updateDbSuccessfulVisit(r VisitResult) {
		contentHash, r.contents, r.page.Text, r.page.Lang, kind, ct, ctArgs, r.page.Title, r.visitTime,
	).Scan(&contentId)
	if err != nil {
		log.Println("[crawl] Database error when inserting contents for url:", r.url)
		log.Println("[crawl] Database error when inserting contents for url:", r.url.String())
		panic(err)
	}



@@ -293,7 +304,7 @@ insert into images (image_hash, image, alt, content_hash, url, fetch_time)
values ($1, $2, $3, $4, $5, $6)
on conflict (image_hash)
do nothing
-`, imgHash, img.Value, img.AltText, contentHash, r.url, r.visitTime)
+`, imgHash, img.Value, img.AltText, contentHash, r.url.String(), r.visitTime)
		utils.PanicOnErr(err)
	}



@@ -307,21 +318,21 @@ do nothing
                 retry_time = case when content_id = $1 then least(retry_time + $3, $4) else $5 end
                 where url = $6
                 returning id`,
-		contentId, r.statusCode, revisitTimeIncrementNoChange, maxRevisitTime, revisitTimeAfterChange, r.url,
+		contentId, r.statusCode, revisitTimeIncrementNoChange, maxRevisitTime, revisitTimeAfterChange, r.url.String(),
	).Scan(&urlId)
	if err == sql.ErrNoRows {
		log.Printf("[crawl] WARNING: URL not in the database, even though it should be; this is a bug! (%s)\n", r.url)
		log.Printf("[crawl] WARNING: URL not in the database, even though it should be; this is a bug! (%s)\n", r.url.String())
		return
	}
	if err != nil {
		log.Println("[crawl] Database error when updating url info:", r.url)
		log.Println("[crawl] Database error when updating url info:", r.url.String())
		panic(err)
	}

	// remove all existing links for this url
	_, err = tx.Exec(`delete from links where src_url_id = $1`, urlId)
	if err != nil {
		log.Println("[crawl] Database error when deleting existing links for url:", r.url)
		log.Println("[crawl] Database error when deleting existing links for url:", r.url.String())
		panic(err)
	}



@@ -361,12 +372,6 @@ func updateDbSlowDownError(r VisitResult) {
		updateDbTempError(r)
	}

-	// then also mark the hostname for slowdown
-	uparsed, err := url.Parse(r.url)
-	if err != nil {
-		return
-	}
-
	intervalSeconds, err := strconv.Atoi(r.meta)
	if err != nil {
		return


@@ -377,7 +382,7 @@ update hosts
set slowdown_until = now() + make_interval(secs => $1)
where hostname = $2
`
-	_, err = Db.Exec(q, intervalSeconds, uparsed.Host)
+	_, err = Db.Exec(q, intervalSeconds, r.url.Parsed.Host)
	utils.PanicOnErr(err)
}



@@ -389,7 +394,7 @@ func updateDbPermanentError(r VisitResult) {
                 status_code = $2,
                 retry_time = $3
                 where url = $4`,
-		r.error.Error(), r.statusCode, permanentErrorRetry, r.url)
+		r.error.Error(), r.statusCode, permanentErrorRetry, r.url.String())
	utils.PanicOnErr(err)
}



@@ -402,7 +407,7 @@ func updateDbTempError(r VisitResult) {
                 status_code = $2,
                 retry_time = case when retry_time is null then $3 else least(retry_time * 2, $4) end
                 where url = $5`,
-		r.error.Error(), r.statusCode, tempErrorMinRetry, maxRevisitTime, r.url)
+		r.error.Error(), r.statusCode, tempErrorMinRetry, maxRevisitTime, r.url.String())
	utils.PanicOnErr(err)
}



@@ -445,9 +450,9 @@ func hashString(input string) uint64 {
	return h.Sum64()
}

-func isBanned(parsedLink *url.URL, robotsPrefixes []string) bool {
+func isBanned(u gcrawler.PreparedUrl, robotsPrefixes []string) bool {
	for _, prefix := range robotsPrefixes {
-		if strings.HasPrefix(parsedLink.Path, prefix) {
+		if strings.HasPrefix(u.Parsed.Path, prefix) {
			return true
		}
	}


@@ -455,7 +460,7 @@ func isBanned(parsedLink *url.URL, robotsPrefixes []string) bool {
	return false
}

-func coordinator(nprocs int, visitorInputs []chan string, urlChan <-chan string, done chan bool, wg *sync.WaitGroup) {
+func coordinator(nprocs int, visitorInputs []chan gcrawler.PreparedUrl, urlChan <-chan gcrawler.PreparedUrl, done chan bool, wg *sync.WaitGroup) {
	defer wg.Done()

	host2ip := map[string]string{}


@@ -464,18 +469,14 @@ func coordinator(nprocs int, visitorInputs []chan string, urlChan <-chan string,
loop:
	for {
		select {
-		case link := <-urlChan:
-			if _, ok := seen[link]; ok {
+		case u := <-urlChan:
+			if _, ok := seen[u.String()]; ok {
				continue
			}

-			seen[link] = true
+			seen[u.String()] = true

-			// urls should already be error checked (in GetLinks), so we ignore the
-			// error here
-			u, _ := url.Parse(link)
-
-			host := u.Hostname()
+			host := u.Parsed.Hostname()
			ip, ok := host2ip[host]
			if !ok {
				ips, err := net.LookupIP(host)


@@ -496,7 +497,7 @@ loop:
			n := int(hashString(ip) % uint64(nprocs))

			select {
-			case visitorInputs[n] <- link:
+			case visitorInputs[n] <- u:
			case <-done:
				break loop
			default:


@@ -511,7 +512,7 @@ loop:
	log.Println("[crawl][coord] Exited.")
}

-func getDueUrls(ctx context.Context, c chan<- string) {
+func getDueUrls(ctx context.Context, c chan<- gcrawler.PreparedUrl) {
	rows, err := Db.QueryContext(ctx, `
select url from urls u
left join hosts h on u.hostname = h.hostname


@@ -525,15 +526,21 @@ where not banned and (h.slowdown_until is null or h.slowdown_until < now()) and

loop:
	for rows.Next() {
-		var url string
-		err = rows.Scan(&url)
+		var ustr string
+		err = rows.Scan(&ustr)
		if errors.Is(err, context.Canceled) {
			break
		}
		utils.PanicOnErr(err)

+		uparsed, err := url.Parse(ustr)
+		if err != nil {
+			log.Println("Read invalid url from db:", ustr)
+			continue
+		}
+
		select {
-		case c <- url:
+		case c <- gcrawler.PreparedUrl{Parsed: uparsed, NonParsed: ustr}:
		case <-ctx.Done():
			break loop
		}


@@ -541,10 +548,10 @@ loop:
	close(c)
}

-func fetchRobotsRules(ctx context.Context, u *url.URL, client *gemini.Client, visitorId string) (prefixes []string, err error) {
+func fetchRobotsRules(ctx context.Context, u gcrawler.PreparedUrl, client *gemini.Client, visitorId string) (prefixes []string, err error) {
	prefixes = make([]string, 0)

-	robotsUrl, err := url.Parse("gemini://" + u.Host + "/robots.txt")
+	robotsUrl, err := url.Parse("gemini://" + u.Parsed.Host + "/robots.txt")
	if err != nil {
		return
	}


@@ -565,7 +572,7 @@ func fetchRobotsRules(ctx context.Context, u *url.URL, client *gemini.Client, vi
	} else if code/10 != 2 {
		// we'll still treat it as an empty list, but we'll log something about
		// it
		log.Printf("Cannot read robots.txt for hostname %s: got code %d. Treating it as no robots.txt.", u.Host, code)
		log.Printf("Cannot read robots.txt for hostname %s: got code %d. Treating it as no robots.txt.", u.Parsed.Host, code)
		return
	} else if finalUrl.String() != robotsUrl.String() {
		log.Printf("robots.txt redirected from %s to %s; treating it as no robots.txt.", robotsUrl.String(), finalUrl.String())


@@ -626,7 +633,7 @@ func fetchRobotsRules(ctx context.Context, u *url.URL, client *gemini.Client, vi
	return
}

-func getRobotsPrefixesFromDb(u *url.URL) (prefixes []string, validUntil time.Time, err error) {
+func getRobotsPrefixesFromDb(u gcrawler.PreparedUrl) (prefixes []string, validUntil time.Time, err error) {
	var prefixesStr sql.NullString
	var nextTryTime sql.NullTime
	var validUntilNullable sql.NullTime


@@ -635,7 +642,7 @@ select
    robots_prefixes, robots_valid_until, robots_last_visited + robots_retry_time
from hosts
where hostname = $1`
-	row := Db.QueryRow(q, u.Host)
+	row := Db.QueryRow(q, u.Parsed.Host)
	err = row.Scan(&prefixesStr, &validUntilNullable, &nextTryTime)
	if err == sql.ErrNoRows {
		return


@@ -657,7 +664,7 @@ where hostname = $1`
	return
}

-func updateRobotsRulesInDbWithError(u *url.URL, permanentError bool) {
+func updateRobotsRulesInDbWithError(u gcrawler.PreparedUrl, permanentError bool) {
	var err error
	if permanentError {
		q := `


@@ -670,7 +677,7 @@ set robots_prefixes = null,
    robots_last_visited = now(),
    robots_retry_time = $2,
    slowdown_until = now() + $2`
-		_, err = Db.Exec(q, u.Host, permanentErrorRetry)
+		_, err = Db.Exec(q, u.Parsed.Host, permanentErrorRetry)
	} else {
		q := `
insert into hosts


@@ -686,13 +693,13 @@ set robots_prefixes = null,
    slowdown_until = now() + (case when excluded.robots_retry_time is null
                              then $2
                              else least(excluded.robots_retry_time * 2, $3) end)`
-		_, err = Db.Exec(q, u.Host, tempErrorMinRetry, maxRevisitTime)
+		_, err = Db.Exec(q, u.Parsed.Host, tempErrorMinRetry, maxRevisitTime)
	}

	utils.PanicOnErr(err)
}

-func updateRobotsRulesInDbWithSuccess(u *url.URL, prefixes []string) {
+func updateRobotsRulesInDbWithSuccess(u gcrawler.PreparedUrl, prefixes []string) {
	prefixesStr := strings.Join(prefixes, "\n")
	q := `
insert into hosts


@@ -705,7 +712,7 @@ on conflict (hostname) do update set
    robots_last_visited = now(),
    robots_retry_time = null
`
-	_, err := Db.Exec(q, prefixesStr, robotsTxtValidity, u.Host)
+	_, err := Db.Exec(q, prefixesStr, robotsTxtValidity, u.Parsed.Host)
	utils.PanicOnErr(err)
}



@@ -726,7 +733,7 @@ func isPermanentNetworkError(err error) bool {
	return false
}

-func seeder(output chan<- string, visitResults chan VisitResult, done chan bool, wg *sync.WaitGroup) {
+func seeder(output chan<- gcrawler.PreparedUrl, visitResults chan VisitResult, done chan bool, wg *sync.WaitGroup) {
	defer wg.Done()

	client := gemini.NewClient()


@@ -736,19 +743,19 @@ func seeder(output chan<- string, visitResults chan VisitResult, done chan bool,
		err        error
	}
	robotsCache := map[string]RobotsRecord{}
-	getOrFetchRobotsPrefixes := func(ctx context.Context, u *url.URL) (results []string, err error) {
-		hit, ok := robotsCache[u.Host]
+	getOrFetchRobotsPrefixes := func(ctx context.Context, u gcrawler.PreparedUrl) (results []string, err error) {
+		hit, ok := robotsCache[u.Parsed.Host]
		if ok && hit.validUntil.Before(time.Now()) {
			results = hit.prefixes
			err = hit.err
			return
		} else if ok {
-			delete(robotsCache, u.Host)
+			delete(robotsCache, u.Parsed.Host)
		}

		results, validUntil, err := getRobotsPrefixesFromDb(u)
		if err == nil {
-			robotsCache[u.Host] = RobotsRecord{
+			robotsCache[u.Parsed.Host] = RobotsRecord{
				prefixes:   results,
				validUntil: validUntil,
			}


@@ -764,7 +771,7 @@ func seeder(output chan<- string, visitResults chan VisitResult, done chan bool,
			return
		} else if errors.As(err, &slowdownErr) {
			updateDbSlowDownError(VisitResult{
-				url:         u.String(),
+				url:         u,
				meta:        slowdownErr.Meta,
				isHostVisit: true,
			})


@@ -789,19 +796,14 @@ func seeder(output chan<- string, visitResults chan VisitResult, done chan bool,

loop:
	for {
-		c := make(chan string)
+		c := make(chan gcrawler.PreparedUrl)
		go getDueUrls(ctx, c)
-		for urlString := range c {
-			urlParsed, err := url.Parse(urlString)
-			if err != nil {
-				continue
-			}
-
-			if gcrawler.IsBlacklisted(urlString, urlParsed) {
+		for u := range c {
+			if gcrawler.IsBlacklisted(u) {
				continue
			}

-			robotsPrefixes, err := getOrFetchRobotsPrefixes(ctx, urlParsed)
+			robotsPrefixes, err := getOrFetchRobotsPrefixes(ctx, u)
			if errors.Is(err, context.Canceled) {
				break loop
			}


@@ -809,20 +811,20 @@ loop:
				if err == ErrRobotsBackoff {
					// don't report these so logs aren't spammed
				} else {
					log.Printf("[crawl][seeder] Cannot read robots.txt for url %s: %s\n", urlString, err)
					log.Printf("[crawl][seeder] Cannot read robots.txt for url %s: %s\n", u.String(), err)
				}
				continue
			}
-			if isBanned(urlParsed, robotsPrefixes) {
+			if isBanned(u, robotsPrefixes) {
				visitResults <- VisitResult{
-					url:    urlString,
+					url:    u,
					banned: true,
				}
				continue
			}

			select {
-			case output <- urlString:
+			case output <- u:
			case <-ctx.Done():
				break loop
			}


@@ -961,7 +963,7 @@ func logSizeGroups(sizeGroups map[int]int) {
	log.Println(msg)
}

-func dumpCrawlerState(filename string, nprocs int, urls [][]string) {
+func dumpCrawlerState(filename string, nprocs int, urls [][]gcrawler.PreparedUrl) {
	f, err := os.Create(filename)
	utils.PanicOnErr(err)
	defer f.Close()


@@ -973,7 +975,7 @@ func dumpCrawlerState(filename string, nprocs int, urls [][]string) {

		f.WriteString(fmt.Sprintf("---- channel %d ----\n", i))
		for _, u := range urls[i] {
-			f.WriteString(u + "\n")
+			f.WriteString(u.String() + "\n")
		}
	}



@@ -987,10 +989,10 @@ func crawl(done chan bool, wg *sync.WaitGroup) {

	// create an array of channel, which will each serve as the input to each
	// processor.
-	inputUrls := make([]chan string, nprocs)
+	inputUrls := make([]chan gcrawler.PreparedUrl, nprocs)
	visitorDone := make([]chan bool, nprocs)
	for i := 0; i < nprocs; i++ {
-		inputUrls[i] = make(chan string, 1000)
+		inputUrls[i] = make(chan gcrawler.PreparedUrl, 1000)
		visitorDone[i] = make(chan bool)
	}



@@ -1000,7 +1002,7 @@ func crawl(done chan bool, wg *sync.WaitGroup) {
		go visitor(strconv.Itoa(i), inputUrls[i], visitResults, visitorDone[i])
	}

-	urlChan := make(chan string, 100000)
+	urlChan := make(chan gcrawler.PreparedUrl, 100000)
	coordDone := make(chan bool, 1)
	seedDone := make(chan bool, 1)
	flushDone := make(chan bool, 1)


@@ -1057,9 +1059,9 @@ loop:
	}

	log.Println("[crawl] Draining channels...")
-	urls := make([][]string, nprocs)
+	urls := make([][]gcrawler.PreparedUrl, nprocs)
	for i := 0; i < nprocs; i++ {
-		urls[i] = make([]string, 0)
+		urls[i] = make([]gcrawler.PreparedUrl, 0)
	}
	for i, c := range inputUrls {
		for u := range c {

M pkg/gcrawler/gcrawler.go => pkg/gcrawler/gcrawler.go +18 -3
@@ -1,6 +1,7 @@
package gcrawler

import (
+	"fmt"
	"net/url"
	"strings"
)


@@ -47,13 +48,27 @@ var blacklistedPrefixes = []string{
	"gemini://gemlog.stargrave.org/?",
}

-func IsBlacklisted(link string, parsedLink *url.URL) bool {
-	if _, ok := blacklistedDomains[parsedLink.Hostname()]; ok {
+// since we frequently need both the parsed and non-parsed form of the url,
+// we'll be passing this url around so we only need to parse once, and not have
+// to reassemble the parsed url either.
+type PreparedUrl struct {
+	Parsed    *url.URL
+	NonParsed string
+}
+
+func (u PreparedUrl) String() string {
+	return u.NonParsed
+}
+
+var _ fmt.Stringer = (*PreparedUrl)(nil)
+
+func IsBlacklisted(u PreparedUrl) bool {
+	if _, ok := blacklistedDomains[u.Parsed.Hostname()]; ok {
		return true
	}

	for _, prefix := range blacklistedPrefixes {
-		if strings.HasPrefix(link, prefix) {
+		if strings.HasPrefix(u.String(), prefix) {
			return true
		}
	}

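The PreparedUrl type introduced above is meant to be built once, when a URL string first enters the system, and then handed around so callers never re-parse or re-assemble it. Below is a minimal sketch of that usage, not taken from this commit; the module path git.sr.ht/~elektito/gemplex and the example URL are assumptions for illustration only.

package main

import (
	"fmt"
	"net/url"

	"git.sr.ht/~elektito/gemplex/pkg/gcrawler" // assumed module path
)

func main() {
	raw := "gemini://example.org/docs/index.gmi" // hypothetical URL, for illustration only

	// Parse exactly once; afterwards both forms travel together.
	parsed, err := url.Parse(raw)
	if err != nil {
		return
	}
	u := gcrawler.PreparedUrl{Parsed: parsed, NonParsed: raw}

	// Components come from the already-parsed form...
	fmt.Println(u.Parsed.Hostname(), u.Parsed.Path)

	// ...while the original string comes back through the String method,
	// so %s/%v formatting and database writes need no reconstruction.
	fmt.Printf("url=%s blacklisted=%v\n", u, gcrawler.IsBlacklisted(u))
}

Because String is declared on the value receiver, both PreparedUrl values and pointers satisfy fmt.Stringer; that is what lets the crawl.go log calls above pass u straight to log.Printf with %s, and the var _ fmt.Stringer assertion makes the compiler verify it.
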
M pkg/gsearch/gsearch.go => pkg/gsearch/gsearch.go +1 -1
@@ -325,7 +325,7 @@ loop:
		urlParsed, err = url.Parse(urlStr)
		if err != nil {
			log.Printf("WARNING: URL stored in db cannot be parsed: url=%s error=%s\n", urlStr, err)
-		} else if gcrawler.IsBlacklisted(urlStr, urlParsed) {
+		} else if gcrawler.IsBlacklisted(gcrawler.PreparedUrl{Parsed: urlParsed, NonParsed: urlStr}) {
			continue
		}