Commit e0b2dd42 authored by Jürgen Enge

cron added

parent db077db5
......@@ -21,18 +21,31 @@ type CfgDBMySQL struct {
ConnMaxTimeout duration
}
type FileMap struct {
Alias string
Folder string
}
type Config struct {
Logfile string
Loglevel string
CertPEM string
KeyPEM string
Addr string
JwtKey string
DB CfgDBMySQL
PageSize int
HeaderSize int
TempDir string
Siegfried string
Logfile string
Loglevel string
CertPEM string
KeyPEM string
Addr string
JwtKey string
DB CfgDBMySQL
PageSize int
HeaderSize int
TempDir string
Siegfried string
CrawlOK duration
CrawlError duration
CrawlErrorNew duration
Cron string
Ffmpeg string
Ffprobe string
BannerFolder string
FileMap []FileMap
}
func LoadConfig(filepath string) Config {
......
......@@ -4,12 +4,16 @@ import (
"context"
"database/sql"
"flag"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/op/go-logging"
"github.com/robfig/cron"
"gitlab.switch.ch/memoriav/memobase-2020/url-checker/memocrawler"
"log"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
)
......@@ -18,6 +22,32 @@ import (
clear database: update `test2` set status="new", metadata=null,errormessage=null,mimetype=null,lastcheck=null,lastchange=null
*/
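// cronLogger adapts the go-logging *logging.Logger to the cron.Logger
// interface (Info/Error) expected by cron.WithLogger.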
type cronLogger struct {
log *logging.Logger
}
// Info logs routine messages about cron's operation.
func (cl cronLogger) Info(msg string, keysAndValues ...interface{}) {
str := msg
var name string
// keysAndValues alternate between key and value entries
for _, val := range keysAndValues {
if name == "" {
name, _ = val.(string)
} else {
str += fmt.Sprintf("\n %v: %v", name, val)
name = ""
}
}
cl.log.Info(str)
}
// Error logs an error condition.
func (cl cronLogger) Error(err error, msg string, keysAndValues ...interface{}) {
cl.log.Errorf("%s: %v %v", msg, err, keysAndValues)
}
func main() {
configFile := flag.String("cfg", "./memocrawler.toml", "config file location")
flag.Parse()
......@@ -65,10 +95,35 @@ func main() {
}
defer db.Close()
cr := memocrawler.NewCrawler(db, config.HeaderSize, config.PageSize, config.TempDir, config.Siegfried)
// go cr.Runner()
cr.CrawlNew()
return
mapping := map[string]string{}
for _, val := range config.FileMap {
mapping[strings.ToLower(val.Alias)] = val.Folder
}
fm := memocrawler.NewFileMapper(mapping)
cr := memocrawler.NewCrawler(
db,
config.HeaderSize,
config.PageSize,
config.TempDir,
config.Siegfried,
config.CrawlOK.Duration,
config.CrawlError.Duration,
config.CrawlErrorNew.Duration,
config.Ffmpeg,
config.Ffprobe,
config.BannerFolder,
fm,
log)
c := cron.New(cron.WithLogger(cronLogger{log: log}))
c.AddFunc(config.Cron, func() {
if err := cr.CrawlAll(); err != nil {
log.Errorf("crawl error: %v", err)
}
})
c.Start()
end := make(chan bool, 1)
......@@ -89,6 +144,7 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
c.Stop()
cr.Shutdown(ctx)
end <- true
......
......@@ -9,6 +9,22 @@ headersize = 5000 # number of bytes which are sent to siegfried
pagesize = 100 # number of entries fetched per database access
tempdir = "C:/temp/"
siegfried = "http://localhost:5138/identify/[[PATH]]?format=json"
crawlok = "600h" # re-check files with status ok every 600 hours
crawlerror = "168h" # entries with errors are re-checked after 168 hours at the earliest
crawlerrornew = "22h" # new errors should be re-checked the next day
ffmpeg = "/usr/local/bin/ffmpeg"
ffprobe = "/usr/local/bin/ffprobe"
bannerfolder = "c:/temp/banner"
cron = "10 * * * *" # cron format (https://pkg.go.dev/github.com/robfig/cron?tab=doc)
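# the example "10 * * * *" fires at minute 10 of every hour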
[[filemap]]
alias = "main"
folder = "c:/temp"
[[filemap]]
alias = "blah"
folder = "c:/temp"
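# a filemap entry resolves the host part of file:// URIs, e.g. with the
# "main" alias above, file://main/video.mp4 maps to c:/temp/video.mp4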
[DB]
#if dsn is empty, the static resolver will be used
......
......@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"github.com/goph/emperror"
"github.com/op/go-logging"
"gitlab.switch.ch/memoriav/memobase-2020/streaming-server/memostream"
"io"
"io/ioutil"
......@@ -16,23 +17,48 @@ import (
"path/filepath"
"runtime"
"strings"
"time"
)
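// Crawler periodically walks the media entries in the database, loads the
// first bytes of each file or URL, identifies the format via siegfried and
// writes status, mimetype and metadata back to the database.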
type Crawler struct {
db *sql.DB
headerSize int
pageSize int
tempDir string
siegfried Siegfried
db *sql.DB
log *logging.Logger
headerSize int
pageSize int
tempDir string
siegfried Siegfried
crawlOK time.Duration
crawlError time.Duration
crawlErrorNew time.Duration
ffmpeg string
ffprobe string
bannerfolder string
mapping *FileMapper
}
func NewCrawler(db *sql.DB, headerSize, pageSize int, tempDir, siegfried string) *Crawler {
func NewCrawler(
db *sql.DB,
headerSize, pageSize int,
tempDir, siegfried string,
crawlOK, crawlError, crawlErrorNew time.Duration,
ffmpeg, ffprobe string,
bannerfolder string,
mapping *FileMapper,
log *logging.Logger) *Crawler {
cr := &Crawler{
db: db,
headerSize: headerSize,
pageSize: pageSize,
tempDir: tempDir,
siegfried: Siegfried{surl: siegfried},
db: db,
headerSize: headerSize,
pageSize: pageSize,
tempDir: tempDir,
siegfried: Siegfried{surl: siegfried},
crawlOK: crawlOK,
crawlError: crawlError,
crawlErrorNew: crawlErrorNew,
ffmpeg: ffmpeg,
ffprobe: ffprobe,
bannerfolder: bannerfolder,
mapping: mapping,
log: log,
}
return cr
}
......@@ -44,26 +70,29 @@ type Metadata struct {
/*
load all entries returned by the query into an array
*/
func (cr *Crawler) getEntries(sqlstr string) ([]*memostream.MediaEntry, error) {
func (cr *Crawler) getEntries(sqlstr string, args ...interface{}) ([]*memostream.MediaEntry, error) {
entries := []*memostream.MediaEntry{}
// fetch at most one page of entries per query
rows, err := cr.db.Query(sqlstr+" LIMIT 0, ?", cr.headerSize)
sqlstr += " LIMIT 0, ?"
args = append(args, cr.pageSize)
rows, err := cr.db.Query(sqlstr, args...)
if err == sql.ErrNoRows { // dataset not found
return entries, nil
}
if err != nil { // something strange happenz
return entries, emperror.Wrapf(err, "error querying %s", sqlstr)
return nil, emperror.Wrapf(err, "error querying %s", sqlstr)
}
defer rows.Close()
for rows.Next() {
var signature, access, protocol, uri, status string
// get data
if err := rows.Scan(&signature, &uri, &access, &protocol, &status); err != nil {
return entries, emperror.Wrapf(err, "cannot scan values")
return nil, emperror.Wrapf(err, "cannot scan values")
}
me, err := memostream.NewMediaEntry(signature, uri, access, protocol, status)
if err != nil {
return entries, emperror.Wrapf(err, "cannot create MediaEntry")
return nil, emperror.Wrapf(err, "cannot create MediaEntry")
}
entries = append(entries, me)
}
......@@ -72,6 +101,8 @@ func (cr *Crawler) getEntries(sqlstr string) ([]*memostream.MediaEntry, error) {
func (cr *Crawler) getContentHeader(entry *memostream.MediaEntry) (buf []byte, mimetype string, err error) {
if entry.Protocol == memostream.Media_Proxy || entry.Protocol == memostream.Media_Redirect {
cr.log.Infof("loading header from %s", entry.URI.String())
// build range request. we do not want to load more than needed
req, err := http.NewRequest("GET", entry.URI.String(), nil)
if err != nil {
......@@ -110,9 +141,9 @@ func (cr *Crawler) getContentHeader(entry *memostream.MediaEntry) (buf []byte, m
break
}
} else if entry.Protocol == memostream.Media_File {
path := filepath.Clean(entry.URI.Path)
if runtime.GOOS == "windows" {
path = strings.TrimLeft(path, string(filepath.Separator))
path, err := cr.mapping.get(entry.URI)
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot map uri %s of signature %s", entry.URI.String(), entry.Signature)
}
f, err := os.Open(path)
if err != nil {
......@@ -128,17 +159,44 @@ func (cr *Crawler) getContentHeader(entry *memostream.MediaEntry) (buf []byte, m
return
}
func (cr *Crawler) getBanner(entry *memostream.MediaEntry) (string, error) {
// build the ffmpeg command line which extracts a single frame as banner image
var prg string
var params []string
outputfilename := filepath.Join(cr.bannerfolder, fmt.Sprintf("%s.png", entry.Signature))
if runtime.GOOS == "windows" {
// on windows ffmpeg is called through wsl.exe, so the ffmpeg path becomes the first argument
prg = "wsl.exe"
params = append(params, cr.ffmpeg)
} else {
prg = cr.ffmpeg
}
// grab one frame at second 12 and write it as a png file
params = append(params,
"-ss", "00:00:12",
"-i", entry.URI.String(),
"-vframes", "1",
"-q:v", "2",
outputfilename)
// TODO: the command (prg, params) is only assembled here, it is not executed yet
_ = prg
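// A possible way to actually run the assembled command (sketch only, assumes
// "os/exec" is imported):
//
//   cmd := exec.Command(prg, params...)
//   if out, err := cmd.CombinedOutput(); err != nil {
//       return "", emperror.Wrapf(err, "cannot run %s %v: %s", prg, params, out)
//   }
//   return outputfilename, nil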
return "", nil
}
/*
load the first headerSize bytes of the source and send them to siegfried
*/
func (cr *Crawler) check(entry *memostream.MediaEntry, siegfried bool) ([]SFMatches, string, error) {
func (cr *Crawler) check(entry *memostream.MediaEntry, siegfried bool) ([]SFMatches, string, string, error) {
cr.log.Infof("checking %s", entry.Signature)
// ************************************
// * get the first bytes of data
// ************************************
buf, mimetype, err := cr.getContentHeader(entry)
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot read content header")
return nil, "", emperror.Wrapf(err, "cannot read content header").Error(), nil
}
// if there's no mimetype in response header try to detect
......@@ -152,15 +210,15 @@ func (cr *Crawler) check(entry *memostream.MediaEntry, siegfried bool) ([]SFMatc
// write buf to temp file
tmpfile, err := ioutil.TempFile(cr.tempDir, "siegfried")
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot create tempfile")
return nil, "", "", emperror.Wrapf(err, "cannot create tempfile")
}
defer os.Remove(tmpfile.Name()) // clean up
if _, err := tmpfile.Write(buf); err != nil {
return nil, "", emperror.Wrapf(err, "cannot write to tempfile %s", tmpfile.Name())
return nil, "", "", emperror.Wrapf(err, "cannot write to tempfile %s", tmpfile.Name())
}
if err := tmpfile.Close(); err != nil {
return nil, "", emperror.Wrapf(err, "cannot close tempfile %s", tmpfile.Name())
return nil, "", "", emperror.Wrapf(err, "cannot close tempfile %s", tmpfile.Name())
}
// ************************************
......@@ -170,7 +228,7 @@ func (cr *Crawler) check(entry *memostream.MediaEntry, siegfried bool) ([]SFMatc
if siegfried {
sfMatches, err = cr.siegfried.Get(tmpfile.Name())
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot call siegfried for file %s", tmpfile.Name())
return nil, "", "", emperror.Wrapf(err, "cannot call siegfried for file %s", tmpfile.Name())
}
mrel := MimeRelevance(mimetype)
// set the mimetype if it's a better one...
......@@ -181,10 +239,55 @@ func (cr *Crawler) check(entry *memostream.MediaEntry, siegfried bool) ([]SFMatc
}
}
}
return sfMatches, mimetype, nil
return sfMatches, mimetype, "", nil
}
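// checkList runs check() on every entry and updates status, mimetype,
// metadata, errormessage and the check/change timestamps in the database.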
func (cr *Crawler) checkList(entries []*memostream.MediaEntry) error {
for _, entry := range entries {
sfMatches, mimetype, errMsg, err := cr.check(entry, true)
if err != nil {
return emperror.Wrapf(err, "error checking entry %s", entry.Signature)
}
sqlstr := "UPDATE test.test2 SET lastcheck=NOW()"
var statusStr string
var params []interface{}
if errMsg != "" {
statusStr = memostream.MediaStatusNum[memostream.Media_Error]
if entry.Status != memostream.Media_Error {
sqlstr += ", lastchange=NOW()"
}
// return emperror.Wrapf(err, "cannot check entry %s", entry.Signature)
} else {
statusStr = memostream.MediaStatusNum[memostream.Media_OK]
meta := &Metadata{SFMatches: sfMatches}
metajson, err := json.Marshal(meta)
if err != nil {
return emperror.Wrapf(err, "cannot marshal %v", meta)
}
sqlstr += ", metadata=?"
params = append(params, string(metajson))
if mimetype != "" {
sqlstr += ", mimetype=?"
params = append(params, mimetype)
}
if entry.Status != memostream.Media_OK {
sqlstr += ", lastchange=NOW()"
}
}
sqlstr += ", status=?, errormessage=? WHERE sig=?"
params = append(params, statusStr, errMsg, entry.Signature)
if _, err := cr.db.Exec(sqlstr, params...); err != nil {
return emperror.Wrapf(err, "error executing sql %s [%v]", sqlstr, params)
}
}
return nil
}
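// CrawlNew checks all entries with status 'new', one page at a time.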
func (cr *Crawler) CrawlNew() error {
cr.log.Infof("start crawling new entities")
sqlstr := "SELECT sig AS signature, uri, access, proto AS protocol, `status` " +
"FROM test.test2 " +
"WHERE status='new' " +
......@@ -197,112 +300,71 @@ func (cr *Crawler) CrawlNew() error {
if len(entries) == 0 {
break
}
for _, entry := range entries {
sfMatches, mimetype, err := cr.check(entry, true)
sqlstr := "UPDATE test.test2 SET lastcheck=NOW()"
var statusStr string
var errMsg string
var params []interface{}
if err != nil {
statusStr = memostream.MediaStatusNum[memostream.Media_Error]
errMsg = err.Error()
if entry.Status != memostream.Media_Error {
sqlstr += ", lastchange=NOW()"
}
// return emperror.Wrapf(err, "cannot check entry %s", entry.Signature)
} else {
statusStr = memostream.MediaStatusNum[memostream.Media_OK]
meta := &Metadata{SFMatches: sfMatches}
metajson, err := json.Marshal(meta)
if err != nil {
return emperror.Wrapf(err, "cannot marshal %v", meta)
}
sqlstr += ", metadata=?"
params = append(params, string(metajson))
if mimetype != "" {
sqlstr += ", mimetype=?"
params = append(params, mimetype)
}
if entry.Status != memostream.Media_OK {
sqlstr += ", lastchange=NOW()"
}
}
sqlstr += ", status=?, errormessage=? WHERE sig=?"
params = append(params, statusStr, errMsg, entry.Signature)
if _, err := cr.db.Exec(sqlstr, params...); err != nil {
return emperror.Wrapf(err, "error executing sql %s [%v]", sqlstr, params)
}
if err := cr.checkList(entries); err != nil {
return emperror.Wrapf(err, "cannot check result list")
}
}
return nil
}
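// CrawlError re-checks entries with status 'error' whose last check is older
// than the configured crawlError/crawlErrorNew intervals.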
func (cr *Crawler) CrawlError() error {
cr.log.Infof("start crawling entities with errors")
sqlstr := "SELECT sig AS signature, uri, access, proto AS protocol, `status` " +
"FROM test.test2 " +
"WHERE status='error' " +
"AND lastcheck < ? " +
"AND (lastcheck < ? OR lastchange > ?)" +
"ORDER BY lastchange ASC"
for {
entries, err := cr.getEntries(sqlstr)
entries, err := cr.getEntries(sqlstr, time.Now().Add(-cr.crawlErrorNew), time.Now().Add(-cr.crawlError), time.Now().Add(-cr.crawlErrorNew))
if err != nil {
return emperror.Wrapf(err, "cannot get new entries")
}
if len(entries) == 0 {
break
}
for _, entry := range entries {
sfMatches, mimetype, err := cr.check(entry, true)
sqlstr := "UPDATE test.test2 SET lastcheck=NOW()"
var statusStr string
var errMsg string
var params []interface{}
if err != nil {
statusStr = memostream.MediaStatusNum[memostream.Media_Error]
errMsg = err.Error()
if entry.Status != memostream.Media_Error {
sqlstr += ", lastchange=NOW()"
}
// return emperror.Wrapf(err, "cannot check entry %s", entry.Signature)
} else {
statusStr = memostream.MediaStatusNum[memostream.Media_OK]
meta := &Metadata{SFMatches: sfMatches}
metajson, err := json.Marshal(meta)
if err != nil {
return emperror.Wrapf(err, "cannot marshal %v", meta)
}
sqlstr += ", metadata=?"
params = append(params, string(metajson))
if mimetype != "" {
sqlstr += ", mimetype=?"
params = append(params, mimetype)
}
if entry.Status != memostream.Media_OK {
sqlstr += ", lastchange=NOW()"
}
}
sqlstr += ", status=?, errormessage=? WHERE sig=?"
params = append(params, statusStr, errMsg, entry.Signature)
if _, err := cr.db.Exec(sqlstr, params...); err != nil {
return emperror.Wrapf(err, "error executing sql %s [%v]", sqlstr, params)
}
if err := cr.checkList(entries); err != nil {
return emperror.Wrapf(err, "cannot check result list")
}
}
return nil
}
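// CrawlOK re-checks entries with status 'ok' whose last check is older than
// the crawlOK interval.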
func (cr *Crawler) CrawlOK() error {
cr.log.Infof("start crawling entities without errors")
func (cr *Crawler) Runner() error {
return cr.crawl()
sqlstr := "SELECT sig AS signature, uri, access, proto AS protocol, `status` " +
"FROM test.test2 " +
"WHERE status='ok' " +
"AND lastcheck < ? " +
"ORDER BY lastcheck ASC"
for {
entries, err := cr.getEntries(sqlstr, time.Now().Add(-cr.crawlOK))
if err != nil {
return emperror.Wrapf(err, "cannot get new entries")
}
if len(entries) == 0 {
break
}
if err := cr.checkList(entries); err != nil {
return emperror.Wrapf(err, "cannot check result list")
}
}
return nil
}
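// CrawlAll runs CrawlNew, CrawlError and CrawlOK in sequence.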
func (cr *Crawler) crawl() error {
func (cr *Crawler) CrawlAll() error {
cr.log.Infof("start crawling")
if err := cr.CrawlNew(); err != nil {
return emperror.Wrap(err, "error crawling new entities")
}
if err := cr.CrawlError(); err != nil {
return emperror.Wrap(err, "error crawling error entities")
}
if err := cr.CrawlOK(); err != nil {
return emperror.Wrap(err, "error crawling entities")
}
return nil
}
......
package memocrawler
import (
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"runtime"
"strings"
)
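// FileMapper resolves the host part of file:// URIs to configured local base
// folders.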
type FileMapper struct {
mapping map[string]string
}
func NewFileMapper(mapping map[string]string) *FileMapper {
return &FileMapper{mapping:mapping}
}
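// get maps a file:// URI to a local path: the URI host selects a base folder
// from the mapping (case-insensitive) and the URI path is appended, e.g.
// file://main/sub/video.mp4 with mapping {"main": "c:/temp"} yields
// c:/temp/sub/video.mp4.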
func (fm *FileMapper) get(uri *url.URL) (string, error) {
if uri.Scheme != "file" {
return "", errors.New( fmt.Sprintf("cannot handle scheme %s: need file scheme", uri.Scheme))
}
var filename string
var ok bool
if uri.Host != "" {
filename, ok = fm.mapping[strings.ToLower(uri.Host)]
if !ok {
return "", errors.New(fmt.Sprintf("no mapping for %s", uri.Host))
}
}
filename = filepath.Join(filename, uri.Path)
filename = filepath.Clean(filename)
if runtime.GOOS == "windows" {
filename = strings.TrimPrefix(filename, string(os.PathSeparator))
}
return filename, nil
}