Commit 011290ab authored by Juergen Enge

simplification of code

parent f8348a0e
Pipeline #17205 failed with stages in 36 seconds
@@ -3,9 +3,7 @@ package main
import (
"database/sql"
"flag"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/op/go-logging"
"gitlab.switch.ch/memoriav/memobase-2020/services/streaming-server/pkg/memostream"
"gitlab.switch.ch/memoriav/memobase-2020/services/url-checker/pkg/memocrawler"
"log"
@@ -19,31 +17,6 @@ import (
clear database: update test.`entities` set status="new", errormessage=null, mimetype=null, lastcheck=null, lastchange=null WHERE sig <> "xxx"
*/
type cronLogger struct {
log *logging.Logger
}
// Info logs routine messages about cron's operation.
func (cl cronLogger) Info(msg string, keysAndValues ...interface{}) {
str := msg
var name string
for _, val := range keysAndValues {
if name == "" {
name, _ = val.(string)
} else {
str += fmt.Sprintf("\n %v: %v", name, val)
name = ""
}
}
cl.log.Info(str)
}
// Error logs an error condition.
func (cl cronLogger) Error(err error, msg string, keysAndValues ...interface{}) {
keysAndValues = append(keysAndValues, err)
cl.log.Errorf(msg+": %v", keysAndValues...)
}
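For context, the removed adapter satisfied the Logger interface of robfig/cron/v3 (the package linked from the config below):

// cron.Logger, the interface cronLogger implemented:
type Logger interface {
	Info(msg string, keysAndValues ...interface{})
	Error(err error, msg string, keysAndValues ...interface{})
}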
func main() {
configFile := flag.String("cfg", "./memocrawler.toml", "config file location")
flag.Parse()
@@ -68,7 +41,7 @@ func main() {
config = LoadConfig(*configFile)
// create logger instance
log, lf := memostream.CreateLogger("memostream", config.Logfile, config.Loglevel)
log, lf := memostream.CreateLogger("urlchecker", config.Logfile, config.Loglevel)
defer lf.Close()
db, err := sql.Open("mysql", config.DB.Dsn)
@@ -95,7 +68,7 @@ func main() {
for _, val := range config.FileMap {
mapping[strings.ToLower(val.Alias)] = val.Folder
}
fm := memostream.NewFileMapper(mapping)
fm := memostream.NewFilesystemDisk(mapping)
cr := memocrawler.NewCrawler(
db,
@@ -109,13 +82,13 @@
config.Metadata.Timeout.Duration,
config.Metadata.Workers,
config.Metadata.PageSize,
config.Crawler.HeaderSize,
config.Indexer,
fm,
log)
cl := memocrawler.NewCrawlerLinkcheck(cr, config.Crawler.Timeout.Duration, config.Crawler.HeaderSize)
cr.SetCrawlerLinkcheck(cl)
cr.Start()
if err := cr.Start(); err != nil {
log.Errorf("error crawling: %v", err)
}
}
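Not visible in this hunk: when crawler.cron in the config below is non-empty, the crawl is presumably scheduled via robfig/cron instead of the single cr.Start() call. A minimal sketch under that assumption (the Cron field name is hypothetical):

// hypothetical scheduling wiring, not part of this diff
if config.Crawler.Cron == "" {
	if err := cr.Start(); err != nil { // one immediate crawl
		log.Errorf("error crawling: %v", err)
	}
} else {
	c := cron.New()
	c.AddFunc(config.Crawler.Cron, func() { // e.g. "42 * * * *"
		if err := cr.Start(); err != nil {
			log.Errorf("error crawling: %v", err)
		}
	})
	c.Start()
}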
@@ -10,13 +10,13 @@ tempdir = "C:/temp/"
indexer = "http://localhost:81"
[crawler]
headersize = 5000 # number of bytes which are sent to siegfried
headersize = 512 # number of bytes read from the file
pagesize = 1000 # number of entries done by one database access
ok = "600h" # re-check healthy files every 600 hours
error = "168h" # re-check failed entries at least every 168 hours
errornew = "2h" # re-check new errors after this short interval
cron = "" # "42 * * * *" # cron format (https://pkg.go.dev/github.com/robfig/cron?tab=doc); runs a single crawl immediately if empty
workers = 1
workers = 5
timeout = "5s"
[metadata]
@@ -36,11 +36,12 @@ folder = "c:/temp"
[DB]
#if dsn is empty, the static resolver will be used
#[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
#dsn = "test:HDNQiaqNqu4IjmUPATJr@tcp(localhost:3306)/test"
dsn = "urlchecker:siex5ieNguuQuei@tcp(localhost:4306)/medienserver"
dsn = "test:HDNQiaqNqu4IjmUPATJr@tcp(localhost:3306)/test"
#dsn = "urlchecker:siex5ieNguuQuei@tcp(localhost:4306)/medienserver"
#dsn = ""
# should be smaller than server connection timeout to allow controlled reconnect
connMaxTimeout = "4h"
#schema = "test"
schema = "medienserver"
schema = "test"
#schema = "medienserver"
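A minimal sketch of how these DB settings map onto database/sql (the ConnMaxTimeout field name is an assumption; the real wiring is in main.go above):

// illustrative only
db, err := sql.Open("mysql", config.DB.Dsn)
if err != nil {
	log.Fatalf("cannot open database: %v", err)
}
// keep pooled connections shorter-lived than the server timeout,
// so reconnects happen in a controlled way (see comment above)
db.SetConnMaxLifetime(config.DB.ConnMaxTimeout.Duration)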
@@ -35,14 +35,14 @@ type Crawler struct {
indexer string
bannerfolder string
bannertimeout time.Duration
mapping *memostream.FileMapper
cl *CrawlerLinkcheck
mapping memostream.Filesystem
jobQueue *JobQueue
jobChannel chan *Job
metaJobQueue *JobQueue
metaJobChannel chan *Job
bannerJobQueue *JobQueue
bannerJobChannel chan *Job
headerSize int
}
func NewCrawler(
@@ -55,8 +55,9 @@ func NewCrawler(
metaTimeout time.Duration,
metaWorkers int,
metaPageSize int,
headerSize int,
indexer string,
mapping *memostream.FileMapper,
mapping memostream.Filesystem,
log *logging.Logger) *Crawler {
cr := &Crawler{
db: db,
@@ -71,16 +72,13 @@
metaTimeout: metaTimeout,
metaWorkers: metaWorkers,
metaPageSize: metaPageSize,
headerSize: headerSize,
mapping: mapping,
log: log,
}
return cr
}
func (cr *Crawler) SetCrawlerLinkcheck(cl *CrawlerLinkcheck) {
cr.cl = cl
}
/*
getEntries loads all entries returned by the query into an array
*/
@@ -104,7 +102,7 @@ func (cr *Crawler) getEntries(sqlstr string, args ...interface{}) ([]*memostream
if err := rows.Scan(&signature, &uri, &access, &protocol, &status); err != nil {
return nil, emperror.Wrapf(err, "cannot scan values")
}
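// the extra empty/zero arguments in the new call satisfy the widened NewMediaEntry
// signature (presumably mimetype, dimensions, duration and the like, unknown at scan time)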
me, err := memostream.NewMediaEntry(signature, uri, access, protocol, status)
me, err := memostream.NewMediaEntry(signature, uri, access, protocol, status, "", "", 0, 0, 0, "", "")
if err != nil {
return nil, emperror.Wrapf(err, "cannot create MediaEntry")
}
@@ -131,7 +129,6 @@ func (cr *Crawler) CrawlNew() error {
for _, entry := range entries {
cr.jobQueue.AddBack(&Job{
ID: entry.Signature,
Type: JobType_Linkcheck,
cr: cr,
entry: entry,
})
@@ -157,14 +154,12 @@ func (cr *Crawler) CrawlError() error {
cr.log.Infof("start crawling entities with errors")
sqlstr := fmt.Sprintf("SELECT sig AS signature, uri, access, proto AS protocol, `status` "+
"FROM %s.entities "+
"WHERE status='error' "+
"AND lastcheck < TIMESTAMPADD(SECOND, ?, NOW()) "+
"AND (lastcheck < TIMESTAMPADD(SECOND, ?, NOW()) OR lastchange < TIMESTAMPADD(SECOND, ?, NOW())) "+
"AND 1=0 "+
"ORDER BY lastchange ASC", cr.schema)
" FROM %s.entities"+
" WHERE status='error'"+
" AND lastcheck < TIMESTAMPADD(SECOND, ?, NOW())"+
" ORDER BY lastchange ASC", cr.schema)
for {
entries, err := cr.getEntries(sqlstr, int64(cr.crawlErrorNew/time.Second), int64(cr.crawlError/time.Second), int64(cr.crawlErrorNew/time.Second))
entries, err := cr.getEntries(sqlstr, int64(cr.crawlErrorNew/time.Second))
if err != nil {
return emperror.Wrapf(err, "cannot get new entries")
}
@@ -175,7 +170,6 @@ func (cr *Crawler) CrawlError() error {
for _, entry := range entries {
cr.jobQueue.AddBack(&Job{
ID: entry.Signature,
Type: JobType_Linkcheck,
cr: cr,
entry: entry,
})
@@ -195,24 +189,31 @@
func (cr *Crawler) CrawlOK() error {
cr.log.Infof("start crawling entities without errors")
var lastFirst *memostream.MediaEntry
sqlstr := fmt.Sprintf("SELECT sig AS signature, uri, access, proto AS protocol, `status` "+
"FROM %s.entities "+
"WHERE status='ok' "+
"AND lastcheck < ? "+
"AND lastcheck >= TIMESTAMPADD(SECOND, ?, NOW())"+
"ORDER BY lastcheck ASC", cr.schema)
for {
entries, err := cr.getEntries(sqlstr, cr.crawlOK)
entries, err := cr.getEntries(sqlstr, int64(cr.crawlOK/time.Second))
if err != nil {
return emperror.Wrapf(err, "cannot get new entries")
}
if len(entries) == 0 {
break
}
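// guard against spinning forever: if the first entry of this page is the same
// as in the previous pass, re-queueing had no effect and we abort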
first := entries[0]
if lastFirst != nil && first.Signature == lastFirst.Signature {
return fmt.Errorf("runthrough has no effect for signature %s", lastFirst.Signature)
}
lastFirst = first
for _, entry := range entries {
cr.jobQueue.AddBack(&Job{
ID: entry.Signature,
Type: JobType_Linkcheck,
cr: cr,
entry: entry,
})
package memocrawler
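// CrawlerJob is (presumably) a placeholder marker interface for queueable jobs.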
type CrawlerJob interface {
}
// This file is part of Memobase Crawler which is released under GPLv3.
// See file license.txt for full license details.
//
// Author Juergen Enge <juergen@info-age.net>
//
// This code uses elements from
// * "Mediaserver" (Center for Digital Matter HGK FHNW, Basel)
// * "Remote Exhibition Project" (info-age GmbH, Basel)
//
package memocrawler
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/goph/emperror"
"gitlab.switch.ch/memoriav/memobase-2020/services/streaming-server/pkg/memostream"
"io"
"io/ioutil"
"mime"
"net/http"
"os"
"path/filepath"
"strings"
"time"
)
type CrawlerLinkcheck struct {
crawler *Crawler
timeout time.Duration
headerSize int
}
func NewCrawlerLinkcheck(crawler *Crawler, Timeout time.Duration, HeaderSize int) *CrawlerLinkcheck {
return &CrawlerLinkcheck{
crawler: crawler,
timeout: Timeout,
headerSize: HeaderSize,
}
}
func (cl *CrawlerLinkcheck) getContentHeader(entry *memostream.MediaEntry) (buf []byte, mimetype string, err error) {
if entry.Protocol == memostream.Media_Proxy || entry.Protocol == memostream.Media_Redirect {
cl.crawler.log.Infof("loading header from %s", entry.URI.String())
ctx, cancel := context.WithTimeout(context.Background(), cl.timeout)
defer cancel() // The cancel should be deferred so resources are cleaned up
// build range request. we do not want to load more than needed
req, err := http.NewRequestWithContext(ctx, "GET", entry.URI.String(), nil)
if err != nil {
return nil, "", emperror.Wrapf(err, "error creating request for uri", entry.URI.String())
}
req.Header.Set("Range", fmt.Sprintf("bytes=0-%d", cl.headerSize-1))
var client http.Client
resp, err := client.Do(req)
if err != nil {
return nil, "", emperror.Wrapf(err, "error querying uri")
}
// the default http.Client follows redirects automatically
defer resp.Body.Close()
// read head of content
buf = make([]byte, cl.headerSize)
num, err := io.ReadFull(resp.Body, buf)
if err != nil && err != io.ErrUnexpectedEOF {
return nil, "", emperror.Wrapf(err, "cannot read content from url %s", entry.URI.String())
}
if num == 0 {
return nil, "", errors.New(fmt.Sprintf("no content from url %s", entry.URI.String()))
}
// ************************************
// * get mimetype from response header
// ************************************
mimetype = resp.Header.Get("Content-type")
// try to get a clean mimetype
for _, v := range strings.Split(mimetype, ",") {
t, _, err := mime.ParseMediaType(v)
if err != nil {
continue
}
mimetype = t
break
}
} else if entry.Protocol == memostream.Media_File {
path, err := cl.crawler.mapping.Get(entry.URI)
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot map uri %s of signature %s", entry.URI.String(), entry.Signature)
}
f, err := os.Open(path)
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot open file %s for signature %s", path, entry.Signature)
}
defer f.Close() // close the handle; it previously leaked on every file check
buf = make([]byte, cl.headerSize)
if _, err := f.Read(buf); err != nil {
return nil, "", emperror.Wrapf(err, "cannot read from file %s for signature %s", path, entry.Signature)
}
} else {
return nil, "", errors.New(fmt.Sprintf("unknown access protocol %s for signature %s", entry.Protocol, entry.Signature))
}
return
}
/*
send the entry's url to the indexer service and derive mimetype and dimensions from the reply
*/
func (cl *CrawlerLinkcheck) linkCheck(entry *memostream.MediaEntry, siegfried bool) (
result map[string]interface{},
mimetype string,
width int64,
height int64,
duration int64,
err error) {
cl.crawler.log.Infof("checking %s", entry.Signature)
url := entry.URI.String()
if entry.URI.Scheme == "file" {
filename, err := cl.crawler.mapping.Get(entry.URI)
if err != nil {
return nil, "", 0, 0, 0, emperror.Wrapf(err, "cannot map uri to filename - %v", entry.URI.String())
}
url = fmt.Sprintf("file:///%s", filepath.ToSlash(filename))
}
jsonstr, err := json.Marshal(map[string]string{"url": url})
if err != nil {
return nil, "", 0, 0, 0, emperror.Wrapf(err, "cannot marshal json")
}
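// illustrative indexer exchange (values hypothetical, field set inferred from
// the response parsing below):
//   request:  POST {"url": "file:///c:/temp/example.mp4"}
//   response: {"mimetype": "video/mp4", "width": 1280, "height": 720, "duration": 42}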
resp, err := http.Post(cl.crawler.indexer, "application/json", bytes.NewBuffer(jsonstr))
if err != nil {
return nil, "", 0, 0,0 , emperror.Wrapf(err, "error calling call indexer")
}
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, "", 0, 0,0 , emperror.Wrapf(err, "error reading indexer result")
}
if err := json.Unmarshal(bodyBytes, &result); err != nil {
return nil, "", 0, 0, 0, emperror.Wrapf(err, "cannot unmarshal result")
}
mimetype, _ = result["mimetype"].(string)
_width, _ := result["width"].(float64)
width = int64(_width)
_height, _ := result["height"].(float64)
height = int64(_height)
_duration, _ := result["duration"].(float64)
duration = int64(_duration)
return
/*
// ************************************
// * get the first bytes of data
// ************************************
buf, mimetype, err := cl.getContentHeader(entry)
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot read content header").Error(), nil
}
// if there's no mimetype in response header try to detect
if mimetype == "" {
mimetype = http.DetectContentType(buf)
}
// ************************************
// * write data into file
// ************************************
// write buf to temp file
tmpfile, err := ioutil.TempFile(cl.crawler.tempDir, "siegfried")
if err != nil {
return nil, "", "", emperror.Wrapf(err, "cannot create tempfile")
}
defer os.Remove(tmpfile.Name()) // clean up
if _, err := tmpfile.Write(buf); err != nil {
return nil, "", "", emperror.Wrapf(err, "cannot write to tempfile %s", tmpfile.Name())
}
if err := tmpfile.Close(); err != nil {
return nil, "", "", emperror.Wrapf(err, "cannot close tempfile %s", tmpfile.Name())
}
// ************************************
// * ask siegfried for filetype
// ************************************
var sfMatches []SFMatches
if siegfried {
sfMatches, err = cl.crawler.siegfried.Get(tmpfile.Name())
if err != nil {
return nil, "", "", emperror.Wrapf(err, "cannot call siegfried for file %s", tmpfile.Name())
}
mrel := MimeRelevance(mimetype)
// set the mimetype if it's a better one...
for _, sfMatch := range sfMatches {
sfrel := MimeRelevance(sfMatch.Mime)
if sfrel > mrel {
mimetype = sfMatch.Mime
}
}
}
return sfMatches, mimetype, "", nil
*/
}
@@ -18,17 +18,8 @@ import (
"time"
)
type JobType int
const (
JobType_Linkcheck = 0
JobType_Banner = 1
JobType_Metadata = 2
)
type Job struct {
ID string
Type JobType
cr *Crawler
entry *memostream.MediaEntry
}
@@ -11,10 +11,14 @@
package memocrawler
import (
"encoding/json"
"bytes"
"context"
"fmt"
"github.com/goph/emperror"
"gitlab.switch.ch/memoriav/memobase-2020/services/streaming-server/pkg/memostream"
"io"
"net/http"
"time"
)
type Worker struct {
@@ -23,6 +27,9 @@ type Worker struct {
jobQueue *JobQueue
}
//var w.cr.headerSize int64 = 512
var timeout time.Duration = 5 * time.Second
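// assumption: this hardcoded timeout mirrors crawler.timeout from the config ("5s")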
// NewWorker creates a Worker from a numeric id and a shared job queue.
func NewWorker(id int, cr *Crawler, jobQueue *JobQueue) Worker {
return Worker{
@@ -32,14 +39,61 @@ func NewWorker(id int, cr *Crawler, jobQueue *JobQueue) Worker {
}
}
func (w Worker) checkFile(entry *memostream.MediaEntry) error {
rsc, _, err := w.cr.mapping.Open(entry.URI)
if err != nil {
return emperror.Wrapf(err, "cannot open file")
}
defer rsc.Close()
buf := make([]byte, 0, w.cr.headerSize)
bbuf := bytes.NewBuffer(buf)
written, err := io.CopyN(bbuf, rsc, int64(w.cr.headerSize))
if err != nil {
return emperror.Wrapf(err, "cannot read from %s", entry.URI.String())
}
if written != int64(w.cr.headerSize) {
return fmt.Errorf("could not read %v bytes from %s - %v bytes read", w.cr.headerSize, entry.URI.String(), written)
}
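// note: sources shorter than headerSize fail the checks above (io.CopyN returns
// io.EOF); presumably media files are always expected to be at least that large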
return nil
}
func (w Worker) checkURL(entry *memostream.MediaEntry) error {
client := &http.Client{}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() // release timer resources; the cancel func was previously discarded
req, err := http.NewRequestWithContext(ctx, "GET", entry.URI.String(), nil)
if err != nil {
return emperror.Wrapf(err, "cannot create GET request for %s", entry.URI.String())
}
// we'll try a range request
req.Header.Set("Range", fmt.Sprintf("bytes=0-%v", w.cr.headerSize-1))
resp, err := client.Do(req)
if err != nil {
return emperror.Wrapf(err, "cannot execute GET request for %s", entry.URI.String())
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return fmt.Errorf("response status %s", resp.Status)
}
buf := make([]byte, 0, w.cr.headerSize)
bbuf := bytes.NewBuffer(buf)
written, err := io.CopyN(bbuf, resp.Body, int64(w.cr.headerSize))
if err != nil {
return emperror.Wrapf(err, "cannot read from %s", entry.URI.String())
}
if written != int64(w.cr.headerSize) {
return fmt.Errorf("could not read %v bytes from %s - %v bytes read", w.cr.headerSize, entry.URI.String(), written)
}
return nil
}
func (w Worker) start() {
w.cr.log.Infof("starting Worker #%v", w.id)
w.cr.log.Infof("#%03d starting Worker ", w.id)
go func() {
for job := range w.jobQueue.GetChannel() {
w.jobQueue.addWorkerCounter(1)
w.cr.log.Infof("Worker%02d startet job %v", w.id, job.GetID())
w.cr.log.Debugf("#%03d [%v] starting job", w.id, job.GetID())
w.Do(job)
w.cr.log.Infof("Worker%02d completed job %v", w.id, job.GetID())
w.cr.log.Debugf("#%03d [%v] completed job", w.id, job.GetID())
w.jobQueue.addWorkerCounter(-1)
}
}()
@@ -49,109 +103,51 @@ func (w Worker) stop() {
//close(w.jobInput)
}
func (w Worker) Do(job *Job) error {
switch job.Type {
case JobType_Linkcheck:
if err := w.linkcheck(job.entry); err != nil {
w.cr.log.Errorf("Worker%02d: error checking link for job #%v: %v", w.id, job.ID, err)
}
case JobType_Banner:
func (w Worker) Do(job *Job) (err error) {
w.cr.log.Debugf("#%03d [%s] checking %s - %s", w.id, job.entry.Signature, memostream.MediaProtocolNum[job.entry.Protocol], job.entry.URI.String())
switch job.entry.Protocol {
case memostream.Media_File:
err = w.checkFile(job.entry)
case memostream.Media_Proxy, memostream.Media_Redirect:
// assumption: checkURL, otherwise unused, is the intended handler for remote entries
err = w.checkURL(job.entry)
default:
w.cr.log.Errorf("invalid protocol %v for job #%v", job.entry.Protocol, job.ID)
}
return err
}
func (w Worker) linkcheck( entry *memostream.MediaEntry ) error {
metadata, mimetype, width, height, duration, err := w.cr.cl.linkCheck(entry, true)
if err != nil {
err := emperror.Wrapf(err, "error checking entry %s", entry.Signature)
sqlstr := fmt.Sprintf("UPDATE %s.entities SET lastcheck=NOW(), status=?, errormessage=? WHERE sig=?", w.cr.schema)
params := []interface{}{"error", err.Error(), entry.Signature}
if _, err := w.cr.db.Exec(sqlstr, params...); err != nil {
return emperror.Wrapf(err, "error executing sql %s [%v]", sqlstr, params)
}
return err
}
if mimetype == "" {
err := fmt.Errorf("media not accessible: %v", metadata["errors"])
sqlstr := fmt.Sprintf("UPDATE %s.entities SET lastcheck=NOW(), status=?, errormessage=? WHERE sig=?", w.cr.schema)
params := []interface{}{"error", err.Error(), entry.Signature}
if _, err := w.cr.db.Exec(sqlstr, params...); err != nil {
return emperror.Wrapf(err, "error executing sql %s [%v]", sqlstr, params)
}
return err
}
metajson, err := json.Marshal(metadata)
if err != nil {
err := emperror.Wrapf(err, "cannot marshal metadata %v", metadata)
sqlstr := fmt.Sprintf("UPDATE %s.entities SET lastcheck=NOW(), status=?, errormessage=? WHERE sig=?", w.cr.schema)
params := []interface{}{"error", err.Error(), entry.Signature}
if _, err := w.cr.db.Exec(sqlstr, params...); err != nil {
return emperror.Wrapf(err, "error executing sql %s [%v]", sqlstr, params)
}
return