Commit d7c7e457 authored by Jürgen Enge's avatar Jürgen Enge

big changes

parent 577cab7e
......@@ -27,8 +27,18 @@ type FileMap struct {
Folder string
}
// Meta groups the settings of the metadata stage of the crawler.
//
// NOTE(review): presumably filled from the [metadata] section of the
// configuration file (workers, timeout, pagesize, cron) — confirm against the
// config loader.
type Meta struct {
	// Timeout wraps a duration; main hands Timeout.Duration to the crawler
	// constructor.
	Timeout duration
	// Workers and PageSize are handed to the crawler constructor right after
	// the timeout.
	Workers, PageSize int
	// Cron is a schedule expression; it is not read anywhere in the code
	// visible here.
	Cron string
}
// Banner groups the settings of the banner stage of the crawler.
//
// NOTE(review): presumably filled from the [banner] section of the
// configuration file (workers, folder, timeout, pagesize, timestamp, cron) —
// confirm against the config loader.
type Banner struct {
	// Timeout wraps a duration; main hands Timeout.Duration to the crawler
	// constructor.
	Timeout duration
	// Workers and PageSize are handed to the crawler constructor right after
	// the timeout.
	Workers, PageSize int
	// Cron is not read anywhere in the code visible here; Folder and
	// Timestamp are both handed to the crawler constructor.
	Cron, Folder, Timestamp string
}
......@@ -42,7 +52,6 @@ type Crawler struct {
PageSize int
HeaderSize int
Workers int
Table string
}
type Config struct {
......@@ -52,12 +61,14 @@ type Config struct {
KeyPEM string
Addr string
JwtKey string
JwtAlg []string
DB CfgDBMySQL
TempDir string
Siegfried string
Ffmpeg string
Ffprobe string
Crawler Crawler
Metadata Meta
Banner Banner
FileMap []FileMap
}
......
......@@ -107,12 +107,19 @@ func main() {
config.Crawler.Workers,
config.Crawler.PageSize,
config.DB.Schema,
config.Crawler.Table,
config.TempDir,
config.Siegfried,
config.Crawler.OK.Duration,
config.Crawler.Error.Duration,
config.Crawler.ErrorNew.Duration,
config.Metadata.Timeout.Duration,
config.Metadata.Workers,
config.Metadata.PageSize,
config.Banner.Timeout.Duration,
config.Banner.Workers,
config.Banner.PageSize,
config.Banner.Timestamp,
config.Banner.Folder,
config.Ffmpeg,
config.Ffprobe,
fm,
......
......@@ -5,6 +5,7 @@ addr = "localhost:81"
certpem = "" # tls client certificate file in PEM format
keypem = "" # tls client key file in PEM format
jwtkey = "swordfish"
jwtalg = ["HS256", "HS384", "HS512"] # "hs256" "hs384" "hs512" "es256" "es384" "es512" "ps256" "ps384" "ps512"
tempdir = "C:/temp/"
siegfried = "http://localhost:5138/identify/[[PATH]]?format=json"
ffmpeg = "/usr/local/bin/ffmpeg2"
......@@ -19,12 +20,20 @@ ffprobe = "/usr/local/bin/ffprobe2"
cron = "42 * * * *" # cron format (https://pkg.go.dev/github.com/robfig/cron?tab=doc)
workers = 7
timeout = "5s"
table = "test2"
[metadata]
workers = 3
timeout = "30s"
pagesize = 400
cron = "32 * * * *" # cron format (https://pkg.go.dev/github.com/robfig/cron?tab=doc)
[banner]
workers = 2
folder = "c:/temp/banner"
timeout = "30s"
pagesize = 200
timestamp = "00:00:12"
cron = "32 * * * *" # cron format (https://pkg.go.dev/github.com/robfig/cron?tab=doc)
[[filemap]]
alias = "c"
......
// This file is part of Memobase Crawler which is released under GPLv3.
// See file license.txt for full license details.
//
// Author Juergen Enge <juergen@info-age.net>
//
// This code uses elements from
// * "Mediaserver" (Center for Digital Matter HGK FHNW, Basel)
// * "Remote Exhibition Project" (info-age GmbH, Basel)
//
package memocrawler
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/goph/emperror"
"github.com/op/go-logging"
"gitlab.switch.ch/memoriav/memobase-2020/streaming-server/memostream"
......@@ -11,26 +22,37 @@ import (
)
// Crawler carries the state that the crawl methods (checkList, MetaNew,
// CrawlNew, CrawlError, CrawlOK) read through their receiver: database handle,
// logger, schema name, timing settings, helper objects and job queues.
//
// NOTE(review): this listing comes from a rendered diff, so the field list
// appears twice — presumably the pre-change fields first and the post-change
// fields second. Confirm against the repository before relying on it.
type Crawler struct {
db *sql.DB
log *logging.Logger
workers int
pageSize int
schema string
table string
tempDir string
siegfried Siegfried
crawlOK time.Duration
crawlError time.Duration
crawlErrorNew time.Duration
ffmpeg string
ffprobe string
bannerfolder string
bannertimeout time.Duration
mapping *memostream.FileMapper
cb *CrawlerBanner
cl *CrawlerLinkcheck
jobQueue *JobQueue
jobChannel chan *Job
// NOTE(review): the second run of fields starts here; compared with the
// first run it has no `table` field and adds the meta*/banner* settings and
// the meta/banner job queues and channels.
db *sql.DB
log *logging.Logger
workers int
pageSize int
schema string
tempDir string
siegfried Siegfried
crawlOK time.Duration
crawlError time.Duration
crawlErrorNew time.Duration
metaTimeout time.Duration
metaWorkers int
metaPageSize int
bannerTimeout time.Duration
bannerWorkers int
bannerPageSize int
bannerTimestamp string
bannerFolder string
ffmpeg string
ffprobe string
// NOTE(review): bannerfolder/bannertimeout sit next to bannerFolder/
// bannerTimeout and differ only in letter case; the NewCrawler literal
// visible in this diff sets only the camel-case pair — check whether the
// lowercase pair is still needed.
bannerfolder string
bannertimeout time.Duration
mapping *memostream.FileMapper
cb *CrawlerBanner
cl *CrawlerLinkcheck
jobQueue *JobQueue
jobChannel chan *Job
metaJobQueue *JobQueue
metaJobChannel chan *Job
bannerJobQueue *JobQueue
bannerJobChannel chan *Job
}
func NewCrawler(
......@@ -38,27 +60,41 @@ func NewCrawler(
workers int,
pageSize int,
schema string,
table string,
tempDir, siegfried string,
crawlOK, crawlError, crawlErrorNew time.Duration,
metaTimeout time.Duration,
metaWorkers int,
metaPageSize int,
bannerTimeout time.Duration,
bannerWorkers int,
bannerPageSize int,
bannerTimestamp string,
bannerFolder string,
ffmpeg, ffprobe string,
mapping *memostream.FileMapper,
log *logging.Logger) *Crawler {
cr := &Crawler{
db: db,
workers: workers,
pageSize: pageSize,
schema: schema,
table: table,
tempDir: tempDir,
siegfried: Siegfried{surl: siegfried},
crawlOK: crawlOK,
crawlError: crawlError,
crawlErrorNew: crawlErrorNew,
ffmpeg: ffmpeg,
ffprobe: ffprobe,
mapping: mapping,
log: log,
db: db,
workers: workers,
pageSize: pageSize,
schema: schema,
tempDir: tempDir,
siegfried: Siegfried{surl: siegfried},
crawlOK: crawlOK,
crawlError: crawlError,
crawlErrorNew: crawlErrorNew,
metaTimeout: metaTimeout,
metaWorkers: metaWorkers,
metaPageSize: metaPageSize,
bannerTimeout: bannerTimeout,
bannerWorkers: bannerWorkers,
bannerPageSize: bannerPageSize,
bannerTimestamp: bannerTimestamp,
bannerFolder: bannerFolder,
ffmpeg: ffmpeg,
ffprobe: ffprobe,
mapping: mapping,
log: log,
}
return cr
}
......@@ -110,11 +146,11 @@ func (cr *Crawler) getEntries(sqlstr string, args ...interface{}) ([]*memostream
func (cr *Crawler) checkList(entries []*memostream.MediaEntry) error {
for _, entry := range entries {
sfMatches, mimetype, bannerfile, errMsg, err := cr.cl.linkCheck(entry, true, true)
sfMatches, mimetype, errMsg, err := cr.cl.linkCheck(entry, true)
if err != nil {
return emperror.Wrapf(err, "error checking entry %s", entry.Signature)
}
sqlstr := "UPDATE test.test2 SET lastcheck=NOW()"
sqlstr := fmt.Sprintf("UPDATE %s.entities SET lastcheck=NOW()", cr.schema)
var statusStr string
var params []interface{}
......@@ -141,12 +177,6 @@ func (cr *Crawler) checkList(entries []*memostream.MediaEntry) error {
if entry.Status != memostream.Media_OK {
sqlstr += ", lastchange=NOW()"
}
if bannerfile != "" {
if mimetype != "" {
sqlstr += ", banner=?"
params = append(params, bannerfile)
}
}
}
sqlstr += ", status=?, errormessage=? WHERE sig=?"
......@@ -158,13 +188,48 @@ func (cr *Crawler) checkList(entries []*memostream.MediaEntry) error {
return nil
}
// MetaNew queues a metadata job for every entity that has status "ok" and no
// modification time in the metadata table.
//
// Each pass runs the same SELECT through cr.getEntries, pushes one
// JobType_Metadata job per returned entry onto cr.metaJobQueue and then polls
// the queue once per second until it reports idle. It returns nil as soon as
// a pass yields no entries, or the wrapped error if fetching entries fails.
//
// NOTE(review): the loop only ends when the query comes back empty, so it
// presumably relies on the workers writing metadata (or changing the status)
// for every entry they process — confirm against the worker code.
func (cr *Crawler) MetaNew() error {
	cr.log.Infof("start crawling metadata for new entities")

	// The LEFT JOIN leaves m.modificationtime NULL for entities that have no
	// matching metadata row, which is what the WHERE clause selects.
	query := fmt.Sprintf("SELECT e.sig AS signature, e.uri, e.access, e.proto AS protocol, e.`status` "+
		"FROM %s.entities e LEFT JOIN %s.metadata m ON e.sig=m.sig "+
		"WHERE e.status=? AND m.modificationtime IS NULL "+
		"ORDER BY e.creationtime ASC", cr.schema, cr.schema)

	for {
		batch, err := cr.getEntries(query, "ok")
		if err != nil {
			return emperror.Wrapf(err, "cannot get new entries")
		}
		if len(batch) == 0 {
			// nothing left to process
			return nil
		}

		for i := range batch {
			job := &Job{
				ID:    batch[i].Signature,
				Type:  JobType_Metadata,
				cr:    cr,
				entry: batch[i],
			}
			cr.metaJobQueue.AddBack(job)
		}

		// wait until the last worker is done before fetching again
		for !cr.metaJobQueue.isIdle() {
			time.Sleep(time.Second)
		}
	}
}
func (cr *Crawler) CrawlNew() error {
cr.log.Infof("start crawling new entities")
sqlstr := "SELECT sig AS signature, uri, access, proto AS protocol, `status` " +
"FROM test.test2 " +
"WHERE status='new' " +
"ORDER BY creationtime ASC"
sqlstr := fmt.Sprintf("SELECT sig AS signature, uri, access, proto AS protocol, `status` "+
"FROM %s.entities "+
"WHERE status='new' "+
"ORDER BY creationtime ASC", cr.schema)
for {
entries, err := cr.getEntries(sqlstr)
if err != nil {
......@@ -201,12 +266,12 @@ func (cr *Crawler) CrawlNew() error {
func (cr *Crawler) CrawlError() error {
cr.log.Infof("start crawling entities with errors")
sqlstr := "SELECT sig AS signature, uri, access, proto AS protocol, `status` " +
"FROM test.test2 " +
"WHERE status='error' " +
"AND lastcheck < ? " +
"AND (lastcheck < ? OR lastchange > ?)" +
"ORDER BY lastchange ASC"
sqlstr := fmt.Sprintf("SELECT sig AS signature, uri, access, proto AS protocol, `status` "+
"FROM %s.entities "+
"WHERE status='error' "+
"AND lastcheck < ? "+
"AND (lastcheck < ? OR lastchange > ?)"+
"ORDER BY lastchange ASC", cr.schema)
for {
entries, err := cr.getEntries(sqlstr, cr.crawlErrorNew, cr.crawlError, cr.crawlErrorNew)
if err != nil {
......@@ -239,11 +304,11 @@ func (cr *Crawler) CrawlError() error {
func (cr *Crawler) CrawlOK() error {
cr.log.Infof("start crawling entities without errors")
sqlstr := "SELECT sig AS signature, uri, access, proto AS protocol, `status` " +
"FROM test.test2 " +
"WHERE status='ok' " +
"AND lastcheck < ? " +
"ORDER BY lastcheck ASC"
sqlstr := fmt.Sprintf("SELECT sig AS signature, uri, access, proto AS protocol, `status` "+
"FROM %s.entities "+
"WHERE status='ok' "+
"AND lastcheck < ? "+
"ORDER BY lastcheck ASC", cr.schema)
for {
entries, err := cr.getEntries(sqlstr, cr.crawlOK)
if err != nil {
......
// This file is part of Memobase Crawler which is released under GPLv3.
// See file license.txt for full license details.
//
// Author Juergen Enge <juergen@info-age.net>
//
// This code uses elements from
// * "Mediaserver" (Center for Digital Matter HGK FHNW, Basel)
// * "Remote Exhibition Project" (info-age GmbH, Basel)
//
package memocrawler
import (
......
package memocrawler
// CrawlerJob declares no methods yet, so every value satisfies it.
type CrawlerJob interface{}
// This file is part of Memobase Crawler which is released under GPLv3.
// See file license.txt for full license details.
//
// Author Juergen Enge <juergen@info-age.net>
//
// This code uses elements from
// * "Mediaserver" (Center for Digital Matter HGK FHNW, Basel)
// * "Remote Exhibition Project" (info-age GmbH, Basel)
//
package memocrawler
import (
......@@ -94,7 +104,7 @@ func (cl *CrawlerLinkcheck) getContentHeader(entry *memostream.MediaEntry) (buf
/*
load 500 bytes from a URL and send them to siegfried
*/
func (cl *CrawlerLinkcheck) linkCheck(entry *memostream.MediaEntry, siegfried bool, banner bool) ([]SFMatches, string, string, string, error) {
func (cl *CrawlerLinkcheck) linkCheck(entry *memostream.MediaEntry, siegfried bool) ([]SFMatches, string, string, error) {
cl.crawler.log.Infof("checking %s", entry.Signature)
// ************************************
......@@ -102,7 +112,7 @@ func (cl *CrawlerLinkcheck) linkCheck(entry *memostream.MediaEntry, siegfried bo
// ************************************
buf, mimetype, err := cl.getContentHeader(entry)
if err != nil {
return nil, "", "", emperror.Wrapf(err, "cannot read content header").Error(), nil
return nil, "", emperror.Wrapf(err, "cannot read content header").Error(), nil
}
// if there's no mimetype in response header try to detect
......@@ -116,15 +126,15 @@ func (cl *CrawlerLinkcheck) linkCheck(entry *memostream.MediaEntry, siegfried bo
// write buf to temp file
tmpfile, err := ioutil.TempFile(cl.crawler.tempDir, "siegfried")
if err != nil {
return nil, "", "", "", emperror.Wrapf(err, "cannot create tempfile")
return nil, "", "", emperror.Wrapf(err, "cannot create tempfile")
}
defer os.Remove(tmpfile.Name()) // clean up
if _, err := tmpfile.Write(buf); err != nil {
return nil, "", "", "", emperror.Wrapf(err, "cannot write to tempfile %s", tmpfile.Name())
return nil, "", "", emperror.Wrapf(err, "cannot write to tempfile %s", tmpfile.Name())
}
if err := tmpfile.Close(); err != nil {
return nil, "", "", "", emperror.Wrapf(err, "cannot close tempfile %s", tmpfile.Name())
return nil, "", "", emperror.Wrapf(err, "cannot close tempfile %s", tmpfile.Name())
}
// ************************************
......@@ -134,7 +144,7 @@ func (cl *CrawlerLinkcheck) linkCheck(entry *memostream.MediaEntry, siegfried bo
if siegfried {
sfMatches, err = cl.crawler.siegfried.Get(tmpfile.Name())
if err != nil {
return nil, "", "", "", emperror.Wrapf(err, "cannot call siegfried for file %s", tmpfile.Name())
return nil, "", "", emperror.Wrapf(err, "cannot call siegfried for file %s", tmpfile.Name())
}
mrel := MimeRelevance(mimetype)
// set the mimetype if it's a better one...
......@@ -145,14 +155,6 @@ func (cl *CrawlerLinkcheck) linkCheck(entry *memostream.MediaEntry, siegfried bo
}
}
}
var bannerfile string
if banner {
bannerfile, err = cl.crawler.cb.getBanner(entry)
if err != nil {
bannerfile = ""
cl.crawler.log.Errorf("cannot get banner: %v", err.Error())
}
}
return sfMatches, mimetype, bannerfile, "", nil
return sfMatches, mimetype, "", nil
}
// This file is part of Memobase Crawler which is released under GPLv3.
// See file license.txt for full license details.
//
// Author Juergen Enge <juergen@info-age.net>
//
// This code uses elements from
// * "Mediaserver" (Center for Digital Matter HGK FHNW, Basel)
// * "Remote Exhibition Project" (info-age GmbH, Basel)
//
package memocrawler
import (
......
// This file is part of Memobase Crawler which is released under GPLv3.
// See file license.txt for full license details.
//
// Author Juergen Enge <juergen@info-age.net>
//
// This code uses elements from
// * "Mediaserver" (Center for Digital Matter HGK FHNW, Basel)
// * "Remote Exhibition Project" (info-age GmbH, Basel)
//
package memocrawler
import (
......@@ -13,6 +23,7 @@ type JobType int
// Job kinds stored in Job.Type. The values are untyped integer constants
// numbered consecutively from zero.
const (
	JobType_Linkcheck = iota // 0
	JobType_Banner           // 1
	JobType_Metadata         // 2
)
type Job struct {
......
// This file is part of Memobase Crawler which is released under GPLv3.
// See file license.txt for full license details.
//
// Author Juergen Enge <juergen@info-age.net>
//
// This code uses elements from
// * "Mediaserver" (Center for Digital Matter HGK FHNW, Basel)
// * "Remote Exhibition Project" (info-age GmbH, Basel)
//
package memocrawler
import (
......
// This file is part of Memobase Crawler which is released under GPLv3.
// See file license.txt for full license details.
//
// Author Juergen Enge <juergen@info-age.net>
//
// This code uses elements from
// * "Mediaserver" (Center for Digital Matter HGK FHNW, Basel)
// * "Remote Exhibition Project" (info-age GmbH, Basel)
//
package memocrawler
import (
......@@ -43,7 +53,7 @@ func (w Worker) Do(job *Job) error {
switch job.Type {
case JobType_Linkcheck:
if err := w.linkcheck(job.entry); err != nil {
w.cr.log.Errorf("Worker%02d: error checking link for job #%v", w.id, job.ID)
w.cr.log.Errorf("Worker%02d: error checking link for job #%v: %v", w.id, job.ID, err)
}
case JobType_Banner:
default:
......@@ -54,11 +64,11 @@ func (w Worker) Do(job *Job) error {
}
func (w Worker) linkcheck( entry *memostream.MediaEntry ) error {
sfMatches, mimetype, bannerfile, errMsg, err := w.cr.cl.linkCheck(entry, true, false)
sfMatches, mimetype, errMsg, err := w.cr.cl.linkCheck(entry, true)
if err != nil {
return emperror.Wrapf(err, "error checking entry %s", entry.Signature)
}
sqlstr := fmt.Sprintf("UPDATE %s.%s SET lastcheck=NOW()", w.cr.schema, w.cr.table)
sqlstr := fmt.Sprintf("UPDATE %s.entities SET lastcheck=NOW()", w.cr.schema)
var statusStr string
var params []interface{}
......@@ -75,7 +85,7 @@ func (w Worker) linkcheck( entry *memostream.MediaEntry ) error {
if err != nil {
return emperror.Wrapf(err, "cannot marshal %v", meta)
}
sqlstr += ", metadata=?"
sqlstr += ", siegfried=?"
params = append(params, string(metajson))
if mimetype != "" {
......@@ -85,12 +95,6 @@ func (w Worker) linkcheck( entry *memostream.MediaEntry ) error {
if entry.Status != memostream.Media_OK {
sqlstr += ", lastchange=NOW()"
}
if bannerfile != "" {
if mimetype != "" {
sqlstr += ", banner=?"
params = append(params, bannerfile)
}
}
}
sqlstr += ", status=?, errormessage=? WHERE sig=?"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment