Commit 04eab53e authored by Jürgen Enge's avatar Jürgen Enge

concurrent workers

parent 9df4ddb2
......@@ -19,6 +19,7 @@ func (d *duration) UnmarshalText(text []byte) error {
// CfgDBMySQL holds the MySQL connection settings read from the config file.
type CfgDBMySQL struct {
Dsn string // data source name passed to the sql driver
ConnMaxTimeout duration // max connection lifetime; kept below the server timeout per the config comment
Schema string // database schema name used when building SQL statements
}
type FileMap struct {
......@@ -26,26 +27,38 @@ type FileMap struct {
Folder string
}
// Banner configures the banner (preview image) generation
// (used to build the CrawlerBanner in main).
type Banner struct {
Timeout duration // timeout for the ffmpeg frame extraction
Folder string // output folder for generated banner images
Timestamp string // video position of the frame to grab, e.g. "00:00:12"
}
// Crawler holds the [crawler] section of the TOML configuration.
type Crawler struct {
OK duration // re-check interval for entries in OK state
Error duration // minimum re-check interval for entries in error state
ErrorNew duration // re-check interval for newly failed entries
Cron string // cron spec for periodic crawling (robfig/cron format)
Timeout duration // link-check timeout
PageSize int // number of entries fetched per database access
HeaderSize int // number of bytes sent to siegfried
Workers int // number of concurrent crawl workers
}
// Config is the top-level application configuration loaded by LoadConfig.
// NOTE(review): this diff view interleaves the struct's old field list
// (PageSize, HeaderSize, Crawl*, Cron, BannerFolder) with the new one, where
// those settings moved into the nested Crawler and Banner structs; only one
// of the two overlapping field sets exists in the actual file.
type Config struct {
Logfile string
Loglevel string
CertPEM string
KeyPEM string
Addr string
JwtKey string
DB CfgDBMySQL
PageSize int
HeaderSize int
TempDir string
Siegfried string
CrawlOK duration
CrawlError duration
CrawlErrorNew duration
Cron string
Ffmpeg string
Ffprobe string
BannerFolder string
FileMap []FileMap
Logfile string
Loglevel string
CertPEM string
KeyPEM string
Addr string
JwtKey string
DB CfgDBMySQL
TempDir string // directory for siegfried temp files
Siegfried string // siegfried identify URL template, [[PATH]] is replaced
Ffmpeg string // path to the ffmpeg binary
Ffprobe string // path to the ffprobe binary
Crawler Crawler // [crawler] section
Banner Banner // [banner] section
FileMap []FileMap // [[filemap]] alias -> folder entries
}
func LoadConfig(filepath string) Config {
......
......@@ -104,25 +104,45 @@ func main() {
cr := memocrawler.NewCrawler(
db,
config.HeaderSize,
config.PageSize,
config.Crawler.Workers,
config.Crawler.PageSize,
config.TempDir,
config.Siegfried,
config.CrawlOK.Duration,
config.CrawlError.Duration,
config.CrawlErrorNew.Duration,
config.Crawler.OK.Duration,
config.Crawler.Error.Duration,
config.Crawler.ErrorNew.Duration,
config.Ffmpeg,
config.Ffprobe,
config.BannerFolder,
fm,
log)
cr.CrawlAll()
cb := memocrawler.NewCrawlerBanner(cr, config.Banner.Timeout.Duration, config.Banner.Folder, config.Banner.Timestamp)
cr.SetCrawlerBanner(cb)
cl := memocrawler.NewCrawlerLinkcheck(cr, config.Crawler.Timeout.Duration, config.Crawler.HeaderSize)
cr.SetCrawlerLinkcheck(cl)
/*
sqlstr := fmt.Sprintf("INSERT INTO %s.test2 (sig, uri, access, proto, status) VALUES (?, ?, ?, ?, ?)", config.DB.Schema)
for i := 1000; i < 5000; i++ {
var params []interface{}
params = append(params, fmt.Sprintf("sig-%v", i),
"https://ba14ns21403.fhnw.ch/video/open/performance/2002_B_B_Yours_Sincerly.mov.mp4",
"public",
"redirect",
"new")
_, err := db.Exec(sqlstr, params...)
if err != nil {
log.Panicf("%s, %v: %v", sqlstr, params, err)
}
}
*/
cr.Start()
return
c := cron.New(cron.WithLogger(cronLogger{log:log}))
c.AddFunc(config.Cron, func() {
if err := cr.CrawlAll(); err != nil {
c.AddFunc(config.Crawler.Cron, func() {
if err := cr.Start(); err != nil {
log.Errorf( "crawl error: %v", err)
}
})
......
......@@ -5,18 +5,25 @@ addr = "localhost:81"
certpem = "" # tls client certificate file in PEM format
keypem = "" # tls client key file in PEM format
jwtkey = "swordfish"
headersize = 5000 # number of bytes which are sent to siegfried
pagesize = 100 # number of entries done by one database access
tempdir = "C:/temp/"
siegfried = "http://localhost:5138/identify/[[PATH]]?format=json"
crawlok = "600h" # check files every 600 hours
crawlerror = "168h" # if there's an error, check all 168 hours minimum
crawlerrornew = "22h" # new errors should be checked the next day
ffmpeg = "/usr/local/bin/ffmpeg2"
ffprobe = "/usr/local/bin/ffprobe2"
bannerfolder = "c:/temp/banner"
cron = "42 * * * *" # cron format (https://pkg.go.dev/github.com/robfig/cron?tab=doc)
[crawler]
headersize = 5000 # number of bytes which are sent to siegfried
pagesize = 500 # number of entries done by one database access
ok = "600h" # check files every 600 hours
error = "168h" # if there's an error, check all 168 hours minimum
errornew = "22h" # new errors should be checked the next day
cron = "42 * * * *" # cron format (https://pkg.go.dev/github.com/robfig/cron?tab=doc)
workers = 4
timeout = "5s"
[banner]
folder = "c:/temp/banner"
timeout = "30s"
timestamp = "00:00:12"
[[filemap]]
alias = "c"
......@@ -33,4 +40,5 @@ folder = "c:/temp"
#dsn = ""
# should be smaller than server connection timeout to allow controlled reconnect
connMaxTimeout = "4h"
schema = "test"
......@@ -4,28 +4,16 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/goph/emperror"
"github.com/op/go-logging"
"gitlab.switch.ch/memoriav/memobase-2020/streaming-server/memostream"
"os/exec"
"io"
"io/ioutil"
"mime"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"time"
)
type Crawler struct {
db *sql.DB
log *logging.Logger
headerSize int
workers int
pageSize int
tempDir string
siegfried Siegfried
......@@ -35,21 +23,26 @@ type Crawler struct {
ffmpeg string
ffprobe string
bannerfolder string
bannertimeout time.Duration
mapping *memostream.FileMapper
cb *CrawlerBanner
cl *CrawlerLinkcheck
jobQueue *JobQueue
jobChannel chan *Job
}
func NewCrawler(
db *sql.DB,
headerSize, pageSize int,
workers int,
pageSize int,
tempDir, siegfried string,
crawlOK, crawlError, crawlErrorNew time.Duration,
ffmpeg, ffprobe string,
bannerfolder string,
mapping *memostream.FileMapper,
log *logging.Logger) *Crawler {
cr := &Crawler{
db: db,
headerSize: headerSize,
workers: workers,
pageSize: pageSize,
tempDir: tempDir,
siegfried: Siegfried{surl: siegfried},
......@@ -58,7 +51,6 @@ func NewCrawler(
crawlErrorNew: crawlErrorNew,
ffmpeg: ffmpeg,
ffprobe: ffprobe,
bannerfolder: bannerfolder,
mapping: mapping,
log: log,
}
......@@ -69,6 +61,14 @@ type Metadata struct {
SFMatches []SFMatches `json:"siegfried"'`
}
// SetCrawlerBanner wires the banner generator into the crawler.
func (cr *Crawler) SetCrawlerBanner(cb *CrawlerBanner) {
cr.cb = cb
}
// SetCrawlerLinkcheck wires the link checker into the crawler.
func (cr *Crawler) SetCrawlerLinkcheck(cl *CrawlerLinkcheck) {
cr.cl = cl
}
/*
load all entries returned by the query into an array
*/
......@@ -76,7 +76,7 @@ func (cr *Crawler) getEntries(sqlstr string, args ...interface{}) ([]*memostream
entries := []*memostream.MediaEntry{}
sqlstr += " LIMIT 0, ?"
args = append(args, cr.headerSize)
args = append(args, cr.pageSize)
rows, err := cr.db.Query(sqlstr, args...)
if err == sql.ErrNoRows { // dataset not found
......@@ -101,183 +101,10 @@ func (cr *Crawler) getEntries(sqlstr string, args ...interface{}) ([]*memostream
return entries, nil
}
// getContentHeader loads the first cr.headerSize bytes of an entry's content
// and, for HTTP-backed entries, the mimetype reported in the response header.
// Supported protocols: Media_Proxy / Media_Redirect (HTTP range request) and
// Media_File (path resolved via cr.mapping). Any other protocol is an error.
func (cr *Crawler) getContentHeader(entry *memostream.MediaEntry) (buf []byte, mimetype string, err error) {
	if entry.Protocol == memostream.Media_Proxy || entry.Protocol == memostream.Media_Redirect {
		cr.log.Infof("loading header from %s", entry.URI.String())
		// build range request. we do not want to load more than needed
		req, err := http.NewRequest("GET", entry.URI.String(), nil)
		if err != nil {
			return nil, "", emperror.Wrapf(err, "error creating request for uri", entry.URI.String())
		}
		req.Header.Set("Range", fmt.Sprintf("bytes=0-%d", cr.headerSize-1))
		// NOTE(review): client has no timeout; a stalled server blocks the
		// caller indefinitely — consider http.Client{Timeout: ...}.
		var client http.Client
		resp, err := client.Do(req)
		if err != nil {
			return nil, "", emperror.Wrapf(err, "error querying uri")
		}
		// default should follow redirects
		defer resp.Body.Close()
		// read head of content; ErrUnexpectedEOF is fine (content shorter than headerSize)
		buf = make([]byte, cr.headerSize)
		num, err := io.ReadFull(resp.Body, buf)
		if err != nil && err != io.ErrUnexpectedEOF {
			return nil, "", emperror.Wrapf(err, "cannot read content from url %s", entry.URI.String())
		}
		if num == 0 {
			return nil, "", errors.New(fmt.Sprintf("no content from url %s", entry.URI.String()))
		}
		// ************************************
		// * get mimetype from response header
		// ************************************
		mimetype = resp.Header.Get("Content-type")
		// try to get a clean mimetype (take the first parseable entry)
		for _, v := range strings.Split(mimetype, ",") {
			t, _, err := mime.ParseMediaType(v)
			if err != nil {
				continue
			}
			mimetype = t
			break
		}
	} else if entry.Protocol == memostream.Media_File {
		path, err := cr.mapping.Get(entry.URI)
		if err != nil {
			return nil, "", emperror.Wrapf(err, "cannot map uri %s of signature %s", entry.URI.String(), entry.Signature)
		}
		f, err := os.Open(path)
		if err != nil {
			return nil, "", emperror.Wrapf(err, "cannot open file %s for signature %s", path, entry.Signature)
		}
		// fix: the file handle was previously leaked — close it on every path
		defer f.Close()
		// NOTE(review): a short read leaves the tail of buf zeroed and is not
		// reported; io.ReadFull would be stricter — confirm intended.
		buf = make([]byte, cr.headerSize)
		if _, err := f.Read(buf); err != nil {
			return nil, "", emperror.Wrapf(err, "cannot read from file %s for signature %s", path, entry.Signature)
		}
	} else {
		return nil, "", errors.New(fmt.Sprintf("unknown access protocol %s for signature %s", entry.Protocol, entry.Signature))
	}
	return
}
// getBanner extracts a single frame from the entry's video with ffmpeg and
// writes it as <signature>.png into cr.bannerfolder.
// Returns the generated file name (without path).
func (cr *Crawler) getBanner(entry *memostream.MediaEntry) (string, error) {
	var prg string
	var params []string
	var inputfilename string
	var err error
	if entry.Protocol == memostream.Media_File {
		inputfilename, err = cr.mapping.Get(entry.URI)
		if runtime.GOOS == "windows" {
			inputfilename = strings.Replace(filepath.ToSlash(inputfilename), "c:", "/mnt/c", -1)
		}
	} else {
		inputfilename = entry.URI.String()
	}
	if err != nil {
		return "", emperror.Wrapf(err, "cannot get path for signature %s", entry.Signature)
	}
	outputfilename := filepath.Join(cr.bannerfolder, fmt.Sprintf("%s.png", entry.Signature))
	// todo: bad hack for windows wsl...
	if runtime.GOOS == "windows" {
		prg = "wsl.exe"
		params = append(params, cr.ffmpeg)
		outputfilename = strings.Replace(filepath.ToSlash(outputfilename), "c:", "/mnt/c", -1)
	} else {
		prg = cr.ffmpeg
	}
	params = append(params,
		"-ss", "00:00:12", // frame position is hard-coded here
		"-i", inputfilename,
		"-vframes", "1",
		"-q:v", "2",
		outputfilename)
	cmd := exec.Command(prg, params...)
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return "", emperror.Wrapf(err, "cannot get stderr output pipe")
	}
	// fix: stderr must be read BEFORE waiting for the command — cmd.Run
	// (= Start+Wait) closes the pipe on exit, so the previous
	// read-after-Run always lost the ffmpeg error output.
	if err := cmd.Start(); err != nil {
		return "", emperror.Wrapf(err, "cannot start %s %s", prg, strings.Join(params, " "))
	}
	slurp, _ := ioutil.ReadAll(stderr)
	if err := cmd.Wait(); err != nil {
		return "", emperror.Wrapf(err, "cannot execute %s %s: %s", prg, strings.Join(params, " "), slurp)
	}
	return fmt.Sprintf("%s.png", entry.Signature), nil
}
/*
load the first bytes of an entry's content and send them to siegfried
*/
// check retrieves the first bytes of an entry's content, determines its
// mimetype (optionally refined by siegfried) and optionally generates a
// banner image.
// Returns (siegfried matches, mimetype, banner file name, error message, error).
// Dual error channel: a failure to read the content header is reported through
// the 4th return value (errMsg) with a nil error, so callers can mark the
// entry broken without aborting the crawl; infrastructure failures
// (tempfile handling, siegfried call) use the error return instead.
func (cr *Crawler) check(entry *memostream.MediaEntry, siegfried bool, banner bool) ([]SFMatches, string, string, string, error) {
cr.log.Infof("checking %s", entry.Signature)
// ************************************
// * get the first bytes of data
// ************************************
buf, mimetype, err := cr.getContentHeader(entry)
if err != nil {
// content errors become errMsg (4th value), not a hard error
return nil, "", "", emperror.Wrapf(err, "cannot read content header").Error(), nil
}
// if there's no mimetype in response header try to detect
if mimetype == "" {
mimetype = http.DetectContentType(buf)
}
// ************************************
// * write data into file
// ************************************
// write buf to temp file
tmpfile, err := ioutil.TempFile(cr.tempDir, "siegfried")
if err != nil {
return nil, "", "", "", emperror.Wrapf(err, "cannot create tempfile")
}
defer os.Remove(tmpfile.Name()) // clean up
if _, err := tmpfile.Write(buf); err != nil {
return nil, "", "", "", emperror.Wrapf(err, "cannot write to tempfile %s", tmpfile.Name())
}
if err := tmpfile.Close(); err != nil {
return nil, "", "", "", emperror.Wrapf(err, "cannot close tempfile %s", tmpfile.Name())
}
// ************************************
// * ask siegfried for filetype
// ************************************
var sfMatches []SFMatches
if siegfried {
sfMatches, err = cr.siegfried.Get(tmpfile.Name())
if err != nil {
return nil, "", "", "", emperror.Wrapf(err, "cannot call siegfried for file %s", tmpfile.Name())
}
mrel := MimeRelevance(mimetype)
// set the mimetype if it's a better one...
// NOTE(review): mrel is not updated inside the loop, so the LAST match
// beating the original relevance wins — confirm this is intended.
for _, sfMatch := range sfMatches {
sfrel := MimeRelevance(sfMatch.Mime)
if sfrel > mrel {
mimetype = sfMatch.Mime
}
}
}
var bannerfile string
if banner {
bannerfile, err = cr.getBanner(entry)
if err != nil {
// banner generation is best effort: log and continue without a banner
bannerfile = ""
cr.log.Errorf("cannot get banner: %v", err.Error())
}
}
return sfMatches, mimetype, bannerfile, "", nil
}
func (cr *Crawler) checkList(entries []*memostream.MediaEntry) error {
for _, entry := range entries {
sfMatches, mimetype, bannerfile, errMsg, err := cr.check(entry, true, true)
sfMatches, mimetype, bannerfile, errMsg, err := cr.cl.linkCheck(entry, true, true)
if err != nil {
return emperror.Wrapf(err, "error checking entry %s", entry.Signature)
}
......@@ -290,7 +117,7 @@ func (cr *Crawler) checkList(entries []*memostream.MediaEntry) error {
if entry.Status != memostream.Media_Error {
sqlstr += ", lastchange=NOW()"
}
// return emperror.Wrapf(err, "cannot check entry %s", entry.Signature)
// return emperror.Wrapf(err, "cannot linkCheck entry %s", entry.Signature)
} else {
statusStr = memostream.MediaStatusNum[memostream.Media_OK]
meta := &Metadata{SFMatches: sfMatches}
......@@ -340,8 +167,26 @@ func (cr *Crawler) CrawlNew() error {
if len(entries) == 0 {
break
}
for _, entry := range entries {
cr.jobQueue.AddBack(&Job{
ID: entry.Signature,
Type: JobType_Linkcheck,
cr: cr,
entry: entry,
})
}
/*
if err := cr.checkList(entries); err != nil {
return emperror.Wrapf(err, "cannot check result list")
return emperror.Wrapf(err, "cannot linkCheck result list")
}
*/
// wait until the last worker is done
for {
if cr.jobQueue.isIdle() {
break;
}
time.Sleep(1*time.Second)
}
}
return nil
......@@ -364,8 +209,22 @@ func (cr *Crawler) CrawlError() error {
if len(entries) == 0 {
break
}
if err := cr.checkList(entries); err != nil {
return emperror.Wrapf(err, "cannot check result list")
for _, entry := range entries {
cr.jobQueue.AddBack(&Job{
ID: entry.Signature,
Type: JobType_Linkcheck,
cr: cr,
entry: entry,
})
}
// wait until the last worker is done
for {
if cr.jobQueue.isIdle() {
break;
}
time.Sleep(1*time.Second)
}
}
return nil
......@@ -387,14 +246,38 @@ func (cr *Crawler) CrawlOK() error {
if len(entries) == 0 {
break
}
if err := cr.checkList(entries); err != nil {
return emperror.Wrapf(err, "cannot check result list")
for _, entry := range entries {
cr.jobQueue.AddBack(&Job{
ID: entry.Signature,
Type: JobType_Linkcheck,
cr: cr,
entry: entry,
})
}
// wait until the last worker is done
for {
if cr.jobQueue.isIdle() {
break;
}
time.Sleep(1*time.Second)
}
}
return nil
}
func (cr *Crawler) CrawlAll() error {
func (cr *Crawler) Start() error {
// setup jobs
cr.jobChannel = make(chan *Job)
cr.jobQueue = NewJobQueue(cr.jobChannel, cr.log)
cr.jobQueue.Start()
for i := 0; i < cr.workers; i++ {
w := NewWorker(i+1, cr, cr.jobQueue)
w.start()
}
cr.log.Infof("start crawling")
if err := cr.CrawlNew(); err != nil {
return emperror.Wrap(err, "error crawling new entities")
......
package memocrawler
import (
"context"
"fmt"
"github.com/goph/emperror"
"gitlab.switch.ch/memoriav/memobase-2020/streaming-server/memostream"
"io/ioutil"
"os/exec"
"path/filepath"
"runtime"
"strings"
"time"
)
// CrawlerBanner generates preview ("banner") images for media entries by
// grabbing one video frame with ffmpeg.
type CrawlerBanner struct {
crawler *Crawler // parent crawler (supplies ffmpeg path and file mapping)
timeout time.Duration // timeout for the ffmpeg run (via context.WithTimeout)
folder string // output directory for the generated .png files
timestamp string // configured frame position (e.g. "00:00:12"); NOTE(review): the visible getBanner still hard-codes "00:00:12" — confirm this field is used
}
// NewCrawlerBanner creates a banner generator bound to crawler.
// timeout limits the ffmpeg run, folder is the output directory for the
// generated images, and timestamp (e.g. "00:00:12") selects the video frame.
// (Parameters renamed to lowercase per Go naming convention; the positional
// call interface is unchanged.)
func NewCrawlerBanner(crawler *Crawler,
	timeout time.Duration,
	folder string,
	timestamp string,
) *CrawlerBanner {
	return &CrawlerBanner{
		crawler:   crawler,
		timeout:   timeout,
		folder:    folder,
		timestamp: timestamp,
	}
}
func (cb *CrawlerBanner) getBanner(entry *memostream.MediaEntry) (string, error) {
var prg string
var params []string
var inputfilename string
var err error
if entry.Protocol == memostream.Media_File {
inputfilename, err = cb.crawler.mapping.Get(entry.URI)
if runtime.GOOS == "windows" {
inputfilename = strings.Replace(filepath.ToSlash(inputfilename), "c:", "/mnt/c", -1)
}
} else {
inputfilename = entry.URI.String()
}
if err != nil {
return "", emperror.Wrapf(err, "cannot get path for signature %s", entry.Signature)
}
outputfilename := filepath.Join(cb.folder, fmt.Sprintf("%s.png", entry.Signature))
// todo: bad hack for windows wsl...
if runtime.GOOS == "windows" {
prg = "wsl.exe"
params = append(params, cb.crawler.ffmpeg)
outputfilename = strings.Replace(filepath.ToSlash(outputfilename), "c:", "/mnt/c", -1)
} else {
prg = cb.crawler.ffmpeg
}
params = append(params,
"-ss", "00:00:12",
"-i", inputfilename,
"-vframes", "1",
"-q:v", "2",
outputfilename)
ctx, cancel := context.WithTimeout(context.Background(), cb.timeout)
defer cancel() // The cancel should be deferred so resources are cleaned up
cmd := exec.CommandContext(ctx, prg, params...)
stderr, err := cmd.StderrPipe()
if err != nil {
return "", emperror.Wrapf(err, "cannot get stderr output pipe")
}
output, err := cmd.Output()
if err != nil {
slurp, _ := ioutil.ReadAll(stderr)