From 62edcae233b50265a33ea3fe3005540fe02d8ea9 Mon Sep 17 00:00:00 2001 From: Damien Robichaud Date: Tue, 9 Jul 2019 15:15:19 -0700 Subject: [PATCH] Implementation of configurable github crawler. Currently I've left the search splitting by file size out of this commit since it's ~200 lines of logic, and I think it's best to get it reviewed separately. In it's current state the crawler would only be able to get the last 1000 indexed files by Github, but it does show the general structure of how the crawler is implemented. --- internal/search/crawler/github/crawler.go | 421 ++++++++++++++++++++++ 1 file changed, 421 insertions(+) create mode 100644 internal/search/crawler/github/crawler.go diff --git a/internal/search/crawler/github/crawler.go b/internal/search/crawler/github/crawler.go new file mode 100644 index 000000000..2c0ea9ed9 --- /dev/null +++ b/internal/search/crawler/github/crawler.go @@ -0,0 +1,421 @@ +// Package github implements the crawler.Crawler interface, getting data +// from the Github search API. +package github + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "math" + "net/http" + "os" + "regexp" + "strconv" + "strings" + "time" + + "sigs.k8s.io/kustomize/internal/search/doc" +) + +var logger = log.New(os.Stdout, "Github Crawler: ", + log.LstdFlags|log.LUTC|log.Llongfile) + +// Implements crawler.Crawler. +type githubCrawler struct { + rc RequestConfig + query Query +} + +func NewCrawler( + accessToken string, retryCount uint64, query Query) githubCrawler { + + return githubCrawler{ + rc: RequestConfig{ + perPage: githubMaxPageSize, + retryCount: retryCount, + accessToken: accessToken, + }, + query: query, + } +} + +// Implements crawler.Crawler. +func (gc githubCrawler) Crawl( + ctx context.Context, output chan<- *doc.KustomizationDocument) error { + + // Range finding will be added in the next PR to make this one + // simpler/shorter. It would return multiple search queries for the + // Github API such that all documents can be retrieved. This is + // required since Github returns a max of 1000 results per query, so + // multiple queries that split the search space into chunks of 1000 + // kustomization files is required. + ranges := []string{gc.query.String()} + + // Query each range for files. + errs := make(multiError, 0) + for _, query := range ranges { + err := processQuery(ctx, gc.rc, query, output) + if err != nil { + errs = append(errs, err) + } + } + + return errs +} + +// processQuery follows all of the pages in a query, and updates/adds the +// documents from the crawl to the datastore/index. +func processQuery(ctx context.Context, rc RequestConfig, query string, + output chan<- *doc.KustomizationDocument) error { + + queryPages := make(chan GithubResponseInfo) + + go func() { + // Forward the document metadata to the retrieval channel. + // This separation allows for concurrent requests for the code + // search, and the retrieval portions of the API. + err := ForwardPaginatedQuery( + ctx, query, rc.retryCount, queryPages) + if err != nil { + // TODO(damienr74) handle this error with redis? + logger.Println(err) + } + close(queryPages) + }() + + errs := make(multiError, 0) + for page := range queryPages { + if page.Error != nil { + errs = append(errs, page.Error) + continue + } + + for _, file := range page.Parsed.Items { + // TODO(damienr74) This is where we'd need to + // communicate with redis. Currently always doing a full + // reindex of the documents. Since the documents are in + // sorted order in each bucket, we can short circuit the + // search when we find a file that has been seen, or we + // can choose to selectively update files. + + k, err := kustomizationResultAdapter(rc, file) + if err != nil { + errs = append(errs, err) + continue + } + output <- k + } + } + + return errs +} + +func kustomizationResultAdapter(rc RequestConfig, k GithubFileSpec) ( + *doc.KustomizationDocument, error) { + + data, err := GetFileData(rc, k) + if err != nil { + return nil, err + } + + creationTime, err := GetFileCreationTime(rc, k) + if err != nil { + logger.Printf("(Error: %v) initializing to current time.", err) + } + + doc := doc.KustomizationDocument{ + DocumentData: string(data), + FilePath: doc.Atom(k.Path), + RepositoryURL: doc.Atom(k.Repository.URL), + CreationTime: creationTime, + } + + return &doc, nil +} + +// ForwardPaginatedQuery follows the links to the next pages and performs all of +// the queries for a given search query, relaying the data from each request +// back to an output channel. +func ForwardPaginatedQuery(ctx context.Context, query string, retryCount uint64, + output chan<- GithubResponseInfo) error { + + response := parseGithubResponse(query, retryCount) + if response.Error != nil { + return response.Error + } + + output <- response + + for response.LastURL != "" && response.NextURL != "" { + select { + case <-ctx.Done(): + return nil + default: + response = parseGithubResponse(response.NextURL, retryCount) + if response.Error != nil { + return response.Error + } + + output <- response + } + } + + return nil +} + +// GetFileData gets the bytes from a file. +func GetFileData(rc RequestConfig, k GithubFileSpec) ([]byte, error) { + + url := rc.ContentsRequest(k.Repository.FullName, k.Path) + + logger.Println("content-url ", url) + resp, err := GetReposData(url, rc.RetryCount()) + if err != nil { + return nil, err + } + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + type githubContentRawURL struct { + DownloadURL string `json:"download_url,omitempty"` + } + var rawURL githubContentRawURL + err = json.Unmarshal(data, &rawURL) + if err != nil { + return nil, err + } + + logger.Println("raw-data-url", rawURL.DownloadURL) + resp, err = GetReposData(rawURL.DownloadURL, rc.RetryCount()) + if err != nil { + return nil, err + } + + return ioutil.ReadAll(resp.Body) +} + +// GetFileCreationTime gets the earliest date of a file. +func GetFileCreationTime( + rc RequestConfig, k GithubFileSpec) (time.Time, error) { + + url := rc.CommitsRequest(k.Repository.FullName, k.Path) + + defaultTime := time.Now() + + logger.Println("commits-url", url) + resp, err := GetReposData(url, rc.RetryCount()) + if err != nil { + return defaultTime, err + } + + type DateSpec struct { + Commit struct { + Author struct { + Date string `json:"date,omitempty"` + } `json:"author,omitempty"` + } `json:"commit,omitempty"` + } + + _, lastURL := parseGithubLinkFormat(resp.Header.Get("link")) + if lastURL != "" { + resp, err = GetReposData(lastURL, rc.RetryCount()) + if err != nil { + return defaultTime, err + } + } + + data, err := ioutil.ReadAll(resp.Body) + earliestDate := []DateSpec{} + err = json.Unmarshal(data, &earliestDate) + size := len(earliestDate) + if err != nil || size == 0 { + return defaultTime, err + } + + return time.Parse(time.RFC3339, earliestDate[size-1].Commit.Author.Date) +} + +// TODO(damienr74) change the tickers to actually check api rate limits, reset +// times, and throttle requests dynamically based off of current utilization, +// instead of hardcoding the documented values, these calls are not quota'd. +// +// See https://developer.github.com/v3/rate_limit/ for details. +var ( + searchRateTicker = time.NewTicker(time.Second * 2) + contentRateTicker = time.NewTicker(time.Second * 1) +) + +func throttleSearchAPI() { + <-searchRateTicker.C +} + +func throttleRepoAPI() { + <-contentRateTicker.C +} + +const ( + accessTokenKeyword = "access_token=" + perPageKeyword = "per_page=" + contentSearchURL = "https://api.github.com/repos" + contentKeyword = "contents" +) + +type multiError []error + +func (me multiError) Error() string { + size := len(me) + 2 + strs := make([]string, size) + strs[0] = "Errors [\n\t" + for i, err := range me { + strs[i+1] = err.Error() + } + strs[size-1] = "\n]" + return strings.Join(strs, "\n\t") +} + +type GithubFileSpec struct { + Path string `json:"path,omitempty"` + Repository struct { + URL string `json:"html_url,omitempty"` + FullName string `json:"full_name,omitempty"` + } `json:"repository,omitempty"` +} + +type githubResponse struct { + // MaxUint is reserved as a sentinel value. + // This is the number of files that match the query. + TotalCount uint64 `json:"total_count,omitempty"` + + // Github representation of a file. + Items []GithubFileSpec `json:"items,omitempty"` +} + +type GithubResponseInfo struct { + *http.Response + Parsed *githubResponse + Error error + NextURL string + LastURL string +} + +func parseGithubLinkFormat(links string) (string, string) { + const ( + linkNext = "next" + linkLast = "last" + linkInfoURL = 1 + linkInfoRel = 2 + ) + + next, last := "", "" + linkInfo := regexp.MustCompile(`<(.*)>.*; rel="(last|next)"`) + + for _, link := range strings.Split(links, ",") { + linkParse := linkInfo.FindStringSubmatch(link) + if len(linkParse) != 3 { + continue + } + + url := linkParse[linkInfoURL] + switch linkParse[linkInfoRel] { + case linkNext: + next = url + case linkLast: + last = url + default: + } + } + + return next, last +} + +func parseGithubResponse(getRequest string, retryCount uint64) GithubResponseInfo { + resp, err := SearchGithubAPI(getRequest, retryCount) + requestInfo := GithubResponseInfo{ + Response: resp, + Error: err, + Parsed: nil, + } + + if err != nil || resp == nil { + return requestInfo + } + + var data []byte + data, requestInfo.Error = ioutil.ReadAll(resp.Body) + if requestInfo.Error != nil { + return requestInfo + } + + if resp.StatusCode != http.StatusOK { + logger.Println("Query: ", getRequest) + logger.Println("Status not OK at the source") + logger.Println("Header Dump", resp.Header) + logger.Println("Body Dump", string(data)) + requestInfo.Error = fmt.Errorf("Request Rejected, Status '%s'", + resp.Status) + return requestInfo + } + + requestInfo.NextURL, requestInfo.LastURL = + parseGithubLinkFormat(resp.Header.Get("link")) + + resultCount := githubResponse{ + TotalCount: math.MaxUint64, + } + requestInfo.Error = json.Unmarshal(data, &resultCount) + if requestInfo.Error != nil { + return requestInfo + } + + requestInfo.Parsed = &resultCount + + return requestInfo + +} + +// SearchGithubAPI performs a search query and handles rate limitting for +// the 'code/search?' endpoint as well as timed retries in the case of abuse +// prevention. +func SearchGithubAPI(query string, retryCount uint64) (*http.Response, error) { + throttleSearchAPI() + return getWithRetry(query, retryCount) +} + +// GetReposData performs a search query and handles rate limitting for +// the '/repos' endpoint as well as timed retries in the case of abuse +// prevention. +func GetReposData(query string, retryCount uint64) (*http.Response, error) { + throttleRepoAPI() + return getWithRetry(query, retryCount) +} + +func getWithRetry( + query string, retryCount uint64) (resp *http.Response, err error) { + + resp, err = http.Get(query) + + for err == nil && + resp.StatusCode == http.StatusForbidden && + retryCount > 0 { + + retryTime := resp.Header.Get("Retry-After") + i, err := strconv.Atoi(retryTime) + if err != nil { + return resp, fmt.Errorf("Forbidden without 'Retry-After'") + } + logger.Printf( + "Status Forbidden, retring %d more times\n", retryCount) + + logger.Printf("Waiting %d seconds before retrying\n", i) + time.Sleep(time.Second * time.Duration(i)) + retryCount-- + resp, err = http.Get(query) + } + + return resp, err +}