Add Amazon S3 extractor #153

Merged
merged 11 commits on Oct 30, 2024
4 changes: 3 additions & 1 deletion go.mod
@@ -3,7 +3,6 @@ module github.com/internetarchive/Zeno
 go 1.22.4
 
 require (
-	github.com/internetarchive/gocrawlhq v1.2.14
 	github.com/CorentinB/warc v0.8.53
 	github.com/PuerkitoBio/goquery v1.9.3
 	github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
@@ -14,6 +13,7 @@ require (
 	github.com/gosuri/uilive v0.0.4
 	github.com/gosuri/uitable v0.0.4
 	github.com/grafov/m3u8 v0.12.0
+	github.com/internetarchive/gocrawlhq v1.2.14
 	github.com/paulbellamy/ratecounter v0.2.0
 	github.com/philippgille/gokv/leveldb v0.7.0
 	github.com/prometheus/client_golang v1.20.4
@@ -32,6 +32,7 @@
 require (
 	github.com/andybalholm/brotli v1.1.0 // indirect
 	github.com/andybalholm/cascadia v1.3.2 // indirect
+	github.com/aws/aws-sdk-go v1.55.5 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/cloudflare/circl v1.4.0 // indirect
@@ -49,6 +50,7 @@
 	github.com/google/go-cmp v0.6.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
+	github.com/jmespath/go-jmespath v0.4.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/klauspost/compress v1.17.10 // indirect
 	github.com/magiconair/properties v1.8.7 // indirect
6 changes: 6 additions & 0 deletions go.sum
@@ -16,6 +16,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
 github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
 github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
+github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
+github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -82,6 +84,9 @@ github.com/internetarchive/gocrawlhq v1.2.13 h1:ALfUrWR7nRez5gWhHRJ7ZklIpGMjERGM
 github.com/internetarchive/gocrawlhq v1.2.13/go.mod h1:JQIKgebFmpbxmEalNRjID3RwCxHkslt3PHAnum82KtM=
 github.com/internetarchive/gocrawlhq v1.2.14 h1:g3MPMonpA6mTkCpjBvW3paeBHiH+gGgwSvkyX/lxu7s=
 github.com/internetarchive/gocrawlhq v1.2.14/go.mod h1:IOHVfWsptADzh+r2J+UnSm22EB9r8TiVVeAuP9WRFoc=
+github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
@@ -264,6 +269,7 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
170 changes: 87 additions & 83 deletions internal/pkg/crawl/capture.go
@@ -439,16 +439,25 @@ func (c *Crawl) Capture(item *queue.Item) error {
 	}
 
 	// If the response is an XML document, we want to scrape it for links
+	var outlinks []*url.URL
 	if strings.Contains(resp.Header.Get("Content-Type"), "xml") {
-		URLsFromXML, isSitemap, err := extractor.XML(resp)
-		if err != nil {
-			c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("unable to extract URLs from XML")
-		}
-
-		if isSitemap {
-			waitGroup.Add(1)
-			go c.queueOutlinks(URLsFromXML, item, &waitGroup)
+		if extractor.IsS3(resp) {
+			URLsFromS3, err := extractor.S3(resp)
+			if err != nil {
+				c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting URLs from S3")
+			}
+
+			outlinks = append(outlinks, URLsFromS3...)
 		} else {
-			assets = append(assets, URLsFromXML...)
+			URLsFromXML, isSitemap, err := extractor.XML(resp)
+			if err != nil {
+				c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("unable to extract URLs from XML")
+			} else {
+				if isSitemap {
+					outlinks = append(outlinks, URLsFromXML...)
+				} else {
+					assets = append(assets, URLsFromXML...)
+				}
+			}
 		}
 	} else if strings.Contains(resp.Header.Get("Content-Type"), "json") {
@@ -470,100 +479,95 @@ func (c *Crawl) Capture(item *queue.Item) error {
 		}
 
 		return err
-	}
-
-	// Turn the response into a doc that we will scrape for outlinks and assets.
-	doc, err := goquery.NewDocumentFromReader(resp.Body)
-	if err != nil {
-		c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while creating goquery document")
-		return err
-	}
-
-	// Execute site-specific code on the document
-	if strings.Contains(base.Host, "cloudflarestream.com") {
-		// Look for JS files necessary for the playback of the video
-		cfstreamURLs, err := cloudflarestream.GetJSFiles(doc, base, *c.Client)
-		if err != nil {
-			c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting JS files from cloudflarestream")
-			return err
-		}
-
-		// Seencheck the URLs we captured, we ignore the returned value here
-		// because we already archived the URLs, we just want them to be added
-		// to the seencheck table.
-		if c.UseSeencheck {
-			if c.UseHQ {
-				_, err := c.HQSeencheckURLs(utils.StringSliceToURLSlice(cfstreamURLs))
-				if err != nil {
-					c.Log.WithFields(c.genLogFields(err, item.URL, map[string]interface{}{
-						"urls": cfstreamURLs,
-					})).Error("error while seenchecking assets via HQ")
-				}
-			} else {
-				for _, cfstreamURL := range cfstreamURLs {
-					c.Seencheck.SeencheckURL(cfstreamURL, "asset")
-				}
-			}
-		}
-		// Log the archived URLs
-		for _, cfstreamURL := range cfstreamURLs {
-			c.Log.WithFields(c.genLogFields(err, cfstreamURL, map[string]interface{}{
-				"parentHop": item.Hop,
-				"parentUrl": utils.URLToString(item.URL),
-				"type":      "asset",
-			})).Info("URL archived")
-		}
-	}
-
-	// Websites can use a <base> tag to specify a base for relative URLs in every other tags.
-	// This checks for the "base" tag and resets the "base" URL variable with the new base URL specified
-	// https://developer.mozilla.org/en-US/docs/Web/HTML/Element/base
-	if !utils.StringInSlice("base", c.DisabledHTMLTags) {
-		oldBase := base
-
-		doc.Find("base").Each(func(index int, goitem *goquery.Selection) {
-			// If a new base got scraped, stop looking for one
-			if oldBase != base {
-				return
-			}
-
-			// Attempt to get a new base value from the base HTML tag
-			link, exists := goitem.Attr("href")
-			if exists {
-				baseTagValue, err := url.Parse(link)
-				if err != nil {
-					c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing base tag value")
-				} else {
-					base = baseTagValue
-				}
-			}
-		})
-	}
-
-	// Extract outlinks
-	outlinks, err := c.extractOutlinks(base, doc)
-	if err != nil {
-		c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting outlinks")
-		return err
-	}
-
-	waitGroup.Add(1)
-	go c.queueOutlinks(outlinks, item, &waitGroup)
-
-	if c.DisableAssetsCapture {
-		return err
-	}
-
-	// Extract and capture assets (only if we didn't use an extractor that produce assets)
-	if len(assets) == 0 {
-		assets, err = c.extractAssets(base, item, doc)
-		if err != nil {
-			c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting assets")
-			return err
-		}
-	}
-
-	if len(assets) != 0 {
+	} else {
+		// Turn the response into a doc that we will scrape for outlinks and assets.
+		doc, err := goquery.NewDocumentFromReader(resp.Body)
+		if err != nil {
+			c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while creating goquery document")
+			return err
+		}
+
+		// Execute site-specific code on the document
+		if strings.Contains(base.Host, "cloudflarestream.com") {
+			// Look for JS files necessary for the playback of the video
+			cfstreamURLs, err := cloudflarestream.GetJSFiles(doc, base, *c.Client)
+			if err != nil {
+				c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting JS files from cloudflarestream")
+				return err
+			}
+
+			// Seencheck the URLs we captured, we ignore the returned value here
+			// because we already archived the URLs, we just want them to be added
+			// to the seencheck table.
+			if c.UseSeencheck {
+				if c.UseHQ {
+					_, err := c.HQSeencheckURLs(utils.StringSliceToURLSlice(cfstreamURLs))
+					if err != nil {
+						c.Log.WithFields(c.genLogFields(err, item.URL, map[string]interface{}{
+							"urls": cfstreamURLs,
+						})).Error("error while seenchecking assets via HQ")
+					}
+				} else {
+					for _, cfstreamURL := range cfstreamURLs {
+						c.Seencheck.SeencheckURL(cfstreamURL, "asset")
+					}
+				}
+			}
+			// Log the archived URLs
+			for _, cfstreamURL := range cfstreamURLs {
+				c.Log.WithFields(c.genLogFields(err, cfstreamURL, map[string]interface{}{
+					"parentHop": item.Hop,
+					"parentUrl": utils.URLToString(item.URL),
+					"type":      "asset",
+				})).Info("URL archived")
+			}
+		}
+
+		// Websites can use a <base> tag to specify a base for relative URLs in every other tags.
+		// This checks for the "base" tag and resets the "base" URL variable with the new base URL specified
+		// https://developer.mozilla.org/en-US/docs/Web/HTML/Element/base
+		if !utils.StringInSlice("base", c.DisabledHTMLTags) {
+			oldBase := base
+
+			doc.Find("base").Each(func(index int, goitem *goquery.Selection) {
+				// If a new base got scraped, stop looking for one
+				if oldBase != base {
+					return
+				}
+
+				// Attempt to get a new base value from the base HTML tag
+				link, exists := goitem.Attr("href")
+				if exists {
+					baseTagValue, err := url.Parse(link)
+					if err != nil {
+						c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing base tag value")
+					} else {
+						base = baseTagValue
+					}
+				}
+			})
+		}
+
+		// Extract outlinks
+		outlinks, err = c.extractOutlinks(base, doc)
+		if err != nil {
+			c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting outlinks")
+			return err
+		}
+
+		if !c.DisableAssetsCapture {
+			assets, err = c.extractAssets(base, item, doc)
+			if err != nil {
+				c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting assets")
+				return err
+			}
+		}
+	}
+
+	waitGroup.Add(1)
+	go c.queueOutlinks(outlinks, item, &waitGroup)
+
+	if !c.DisableAssetsCapture && len(assets) != 0 {
 		assets = c.seencheckAssets(assets, item)
 		if len(assets) != 0 {
 			c.captureAssets(item, assets, resp.Cookies(), nil)
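Side note on the new routing: extractor.IsS3 decides between the S3 extractor and the sitemap/XML extractor purely from the Server response header, so the dispatch can be exercised without a live bucket. A minimal sketch (the response is fabricated, and it only compiles from inside the Zeno module because the extractor package is internal):

package main

import (
	"fmt"
	"net/http"

	"github.com/internetarchive/Zeno/internal/pkg/crawl/extractor"
)

func main() {
	// Fabricated response carrying only the headers the dispatch looks at.
	resp := &http.Response{Header: http.Header{}}
	resp.Header.Set("Content-Type", "application/xml")
	resp.Header.Set("Server", "AmazonS3")

	// Prints true: capture.go would call extractor.S3 instead of extractor.XML.
	fmt.Println(extractor.IsS3(resp))
}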
124 changes: 124 additions & 0 deletions internal/pkg/crawl/extractor/s3.go
@@ -0,0 +1,124 @@
package extractor

import (
"encoding/xml"
"fmt"
"io"
"net/http"
"net/url"

"github.com/internetarchive/Zeno/internal/pkg/utils"
)

var validS3Servers = []string{
"AmazonS3",
"WasabiS3",
"UploadServer", // Google Cloud Storage
"Windows-Azure-Blob",
"AliyunOSS", // Alibaba Object Storage Service
}

// S3ListBucketResult represents the XML structure of an S3 bucket listing
type S3ListBucketResult struct {
XMLName xml.Name `xml:"ListBucketResult"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix"`
Marker string `xml:"Marker"`
Contents []S3Object `xml:"Contents"`
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
IsTruncated bool `xml:"IsTruncated"`
NextContinuationToken string `xml:"NextContinuationToken"`
}

type S3Object struct {
Key string `xml:"Key"`
LastModified string `xml:"LastModified"`
Size int64 `xml:"Size"`
}

type CommonPrefix struct {
Prefix string `xml:"Prefix"`
}

// IsS3 checks if the response is from an S3 server
func IsS3(resp *http.Response) bool {
return utils.StringContainsSliceElements(resp.Header.Get("Server"), validS3Servers)
}

// S3 takes a bucket-listing response and returns URLs of either files or prefixes
// at the current level, plus a continuation URL if more results exist
func S3(resp *http.Response) ([]*url.URL, error) {
result, err := S3ProcessResponse(resp)
if err != nil {
return nil, err
}

// Extract base URL from the response URL
reqURL := resp.Request.URL
requestQuery := reqURL.Query()
baseURL := fmt.Sprintf("https://%s", reqURL.Host)
parsedBase, err := url.Parse(baseURL)
if err != nil {
return nil, fmt.Errorf("invalid base URL: %v", err)
}

var urls []string

	// ListObjects (V1) paginates with the "marker" parameter: if this isn't a
	// ListObjectsV2 request, queue the next page using the last key as the marker
	if requestQuery.Get("list-type") != "2" && len(result.Contents) > 0 {
nextURL := *reqURL
q := nextURL.Query()
q.Set("marker", result.Contents[len(result.Contents)-1].Key)
nextURL.RawQuery = q.Encode()
urls = append(urls, nextURL.String())
}

	// If the listing is grouped by prefix (CommonPrefixes are returned when a delimiter is used), queue the prefix listings
if len(result.CommonPrefixes) > 0 {
for _, prefix := range result.CommonPrefixes {
nextURL := *reqURL
q := nextURL.Query()
q.Set("prefix", prefix.Prefix)
nextURL.RawQuery = q.Encode()
urls = append(urls, nextURL.String())
}
} else {
// Otherwise return file URLs
for _, obj := range result.Contents {
if obj.Size > 0 {
fileURL := *parsedBase
fileURL.Path += "/" + obj.Key
urls = append(urls, fileURL.String())
}
}
}

// If there's a continuation token, add the continuation URL
if result.IsTruncated && result.NextContinuationToken != "" {
nextURL := *reqURL
q := nextURL.Query()
q.Set("continuation-token", result.NextContinuationToken)
nextURL.RawQuery = q.Encode()
urls = append(urls, nextURL.String())
}

return utils.StringSliceToURLSlice(urls), nil
}

// S3ProcessResponse parses an HTTP response into an S3ListBucketResult
func S3ProcessResponse(resp *http.Response) (*S3ListBucketResult, error) {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %v", err)
}
defer resp.Body.Close()

var result S3ListBucketResult
if err := xml.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("error parsing XML: %v", err)
}

return &result, nil
}
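As a rough illustration of what the extractor emits, here is a test-style sketch that could sit in an s3_test.go next to this file (the bucket URL and object key are made up): a ListObjects V1 listing with a single non-empty object yields a "marker" URL for the next page plus that object's URL, per the pagination logic above.

package extractor

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
)

func ExampleS3() {
	// Hypothetical V1 bucket listing: one object, no delimiter, not truncated.
	body := `<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult>
  <Name>example-bucket</Name>
  <Contents>
    <Key>crawls/file1.warc.gz</Key>
    <Size>1024</Size>
  </Contents>
  <IsTruncated>false</IsTruncated>
</ListBucketResult>`

	reqURL, _ := url.Parse("https://example-bucket.s3.amazonaws.com/")
	resp := &http.Response{
		Request: &http.Request{URL: reqURL},
		Header:  http.Header{"Server": []string{"AmazonS3"}},
		Body:    io.NopCloser(strings.NewReader(body)),
	}

	urls, _ := S3(resp)
	for _, u := range urls {
		fmt.Println(u.String())
	}
	// Output:
	// https://example-bucket.s3.amazonaws.com/?marker=crawls%2Ffile1.warc.gz
	// https://example-bucket.s3.amazonaws.com/crawls/file1.warc.gz
}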