Commit db077db5 authored by Jürgen Enge's avatar Jürgen Enge

checker done

parent a1eeeb04
......@@ -19,7 +19,6 @@ func (d *duration) UnmarshalText(text []byte) error {
type CfgDBMySQL struct {
Dsn string
ConnMaxTimeout duration
Query string
}
type Config struct {
......@@ -30,6 +29,10 @@ type Config struct {
Addr string
JwtKey string
DB CfgDBMySQL
PageSize int
HeaderSize int
TempDir string
Siegfried string
}
func LoadConfig(filepath string) Config {
......
......@@ -5,7 +5,7 @@ import (
"database/sql"
"flag"
_ "github.com/go-sql-driver/mysql"
"gitlab.switch.ch/memoriav/memobase-2020/url-checker/main/memocrawler"
"gitlab.switch.ch/memoriav/memobase-2020/url-checker/memocrawler"
"log"
"os"
"os/signal"
......@@ -14,6 +14,10 @@ import (
"time"
)
/*
clear database: update `test2` set status="new", metadata=null,errormessage=null,mimetype=null,lastcheck=null,lastchange=null
*/
func main() {
configFile := flag.String("cfg", "./memocrawler.toml", "config file location")
flag.Parse()
......@@ -61,8 +65,10 @@ func main() {
}
defer db.Close()
cr := memocrawler.NewCrawler(db)
go cr.Runner()
cr := memocrawler.NewCrawler(db, config.HeaderSize, config.PageSize, config.TempDir, config.Siegfried)
// go cr.Runner()
cr.CrawlNew()
return
end := make(chan bool, 1)
......
package memocrawler
import (
"context"
"database/sql"
)
type Crawler struct {
db *sql.DB
}
func NewCrawler(db *sql.DB ) *Crawler {
cr := &Crawler{db:db}
return cr
}
func (cr *Crawler) Runner() error {
return cr.crawl()
}
func (cr *Crawler) crawl() error {
return nil
}
func (cr *Crawler) Shutdown( ctx context.Context ) {
}
\ No newline at end of file
......@@ -5,6 +5,10 @@ addr = "localhost:81"
certpem = "" # tls client certificate file in PEM format
keypem = "" # tls client key file in PEM format
jwtkey = "swordfish"
headersize = 5000 # number of bytes which are send to siegfried
pagesize = 100 # number of entries done by one database access
tempdir = "C:/temp/"
siegfried = "http://localhost:5138/identify/[[PATH]]?format=json"
[DB]
#if dsn is empty, the static resolver will be used
......@@ -13,6 +17,4 @@ jwtkey = "swordfish"
#dsn = ""
# should be smaller than server connection timeout to allow controlled reconnect
connMaxTimeout = "4h"
# query has to return the fields uri, access and protocol. One parameter
query = "SELECT uri, access, proto AS protocol FROM test.test2 WHERE sig = ?"
package memocrawler
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/goph/emperror"
"gitlab.switch.ch/memoriav/memobase-2020/streaming-server/memostream"
"io"
"io/ioutil"
"mime"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
)
type Crawler struct {
db *sql.DB
headerSize int
pageSize int
tempDir string
siegfried Siegfried
}
func NewCrawler(db *sql.DB, headerSize, pageSize int, tempDir, siegfried string) *Crawler {
cr := &Crawler{
db: db,
headerSize: headerSize,
pageSize: pageSize,
tempDir: tempDir,
siegfried: Siegfried{surl: siegfried},
}
return cr
}
type Metadata struct {
SFMatches []SFMatches `json:"siegfried"'`
}
/*
load all entries from query in array
*/
func (cr *Crawler) getEntries(sqlstr string) ([]*memostream.MediaEntry, error) {
entries := []*memostream.MediaEntry{}
// get 100 entries max.
rows, err := cr.db.Query(sqlstr+" LIMIT 0, ?", cr.headerSize)
if err == sql.ErrNoRows { // dataset not found
return entries, nil
}
if err != nil { // something strange happenz
return entries, emperror.Wrapf(err, "error querying %s", sqlstr)
}
defer rows.Close()
for rows.Next() {
var signature, access, protocol, uri, status string
// get data
if err := rows.Scan(&signature, &uri, &access, &protocol, &status); err != nil {
return entries, emperror.Wrapf(err, "cannot scan values")
}
me, err := memostream.NewMediaEntry(signature, uri, access, protocol, status)
if err != nil {
return entries, emperror.Wrapf(err, "cannot create MediaEntry")
}
entries = append(entries, me)
}
return entries, nil
}
func (cr *Crawler) getContentHeader(entry *memostream.MediaEntry) (buf []byte, mimetype string, err error) {
if entry.Protocol == memostream.Media_Proxy || entry.Protocol == memostream.Media_Redirect {
// build range request. we do not want to load more than needed
req, err := http.NewRequest("GET", entry.URI.String(), nil)
if err != nil {
return nil, "", emperror.Wrapf(err, "error creating request for uri", entry.URI.String())
}
req.Header.Set("Range", fmt.Sprintf("bytes=0-%d", cr.headerSize-1))
var client http.Client
resp, err := client.Do(req)
if err != nil {
return nil, "", emperror.Wrapf(err, "error querying uri")
}
// default should follow redirects
defer resp.Body.Close()
// read head of content
buf = make([]byte, cr.headerSize)
num, err := io.ReadFull(resp.Body, buf)
if err != nil && err != io.ErrUnexpectedEOF {
return nil, "", emperror.Wrapf(err, "cannot read content from url %s", entry.URI.String())
}
if num == 0 {
return nil, "", errors.New(fmt.Sprintf("no content from url %s", entry.URI.String()))
}
// ************************************
// * get mimetype from response header
// ************************************
mimetype = resp.Header.Get("Content-type")
// try to get a clean mimetype
for _, v := range strings.Split(mimetype, ",") {
t, _, err := mime.ParseMediaType(v)
if err != nil {
continue
}
mimetype = t
break
}
} else if entry.Protocol == memostream.Media_File {
path := filepath.Clean(entry.URI.Path)
if runtime.GOOS == "windows" {
path = strings.TrimLeft(path, string(filepath.Separator))
}
f, err := os.Open(path)
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot open file %s for signature %s", path, entry.Signature)
}
buf = make([]byte, cr.headerSize)
if _, err := f.Read(buf); err != nil {
return nil, "", emperror.Wrapf(err, "cannot read from file %s for signature %s", path, entry.Signature)
}
} else {
return nil, "", errors.New(fmt.Sprintf("unknown access protocol %s for signature %s", entry.Protocol, entry.Signature))
}
return
}
/*
load 500 byte from an url and send it to siegfried
*/
func (cr *Crawler) check(entry *memostream.MediaEntry, siegfried bool) ([]SFMatches, string, error) {
// ************************************
// * get the first bytes of data
// ************************************
buf, mimetype, err := cr.getContentHeader(entry)
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot read content header")
}
// if there's no mimetype in response header try to detect
if mimetype == "" {
mimetype = http.DetectContentType(buf)
}
// ************************************
// * write data into file
// ************************************
// write buf to temp file
tmpfile, err := ioutil.TempFile(cr.tempDir, "siegfried")
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot create tempfile")
}
defer os.Remove(tmpfile.Name()) // clean up
if _, err := tmpfile.Write(buf); err != nil {
return nil, "", emperror.Wrapf(err, "cannot write to tempfile %s", tmpfile.Name())
}
if err := tmpfile.Close(); err != nil {
return nil, "", emperror.Wrapf(err, "cannot close tempfile %s", tmpfile.Name())
}
// ************************************
// * ask siegfried for filetype
// ************************************
var sfMatches []SFMatches
if siegfried {
sfMatches, err = cr.siegfried.Get(tmpfile.Name())
if err != nil {
return nil, "", emperror.Wrapf(err, "cannot call siegfried for file %s", tmpfile.Name())
}
mrel := MimeRelevance(mimetype)
// set the mimetype if it's a better one...
for _, sfMatch := range sfMatches {
sfrel := MimeRelevance(sfMatch.Mime)
if sfrel > mrel {
mimetype = sfMatch.Mime
}
}
}
return sfMatches, mimetype, nil
}
func (cr *Crawler) CrawlNew() error {
sqlstr := "SELECT sig AS signature, uri, access, proto AS protocol, `status` " +
"FROM test.test2 " +
"WHERE status='new' " +
"ORDER BY creationtime ASC"
for {
entries, err := cr.getEntries(sqlstr)
if err != nil {
return emperror.Wrapf(err, "cannot get new entries")
}
if len(entries) == 0 {
break
}
for _, entry := range entries {
sfMatches, mimetype, err := cr.check(entry, true)
sqlstr := "UPDATE test.test2 SET lastcheck=NOW()"
var statusStr string
var errMsg string
var params []interface{}
if err != nil {
statusStr = memostream.MediaStatusNum[memostream.Media_Error]
errMsg = err.Error()
if entry.Status != memostream.Media_Error {
sqlstr += ", lastchange=NOW()"
}
// return emperror.Wrapf(err, "cannot check entry %s", entry.Signature)
} else {
statusStr = memostream.MediaStatusNum[memostream.Media_OK]
meta := &Metadata{SFMatches: sfMatches}
metajson, err := json.Marshal(meta)
if err != nil {
return emperror.Wrapf(err, "cannot marshal %v", meta)
}
sqlstr += ", metadata=?"
params = append(params, string(metajson))
if mimetype != "" {
sqlstr += ", mimetype=?"
params = append(params, mimetype)
}
if entry.Status != memostream.Media_OK {
sqlstr += ", lastchange=NOW()"
}
}
sqlstr += ", status=?, errormessage=? WHERE sig=?"
params = append(params, statusStr, errMsg, entry.Signature)
if _, err := cr.db.Exec(sqlstr, params...); err != nil {
return emperror.Wrapf(err, "error executing sql %s [%v]", sqlstr, params)
}
}
}
return nil
}
func (cr *Crawler) CrawlError() error {
sqlstr := "SELECT sig AS signature, uri, access, proto AS protocol, `status` " +
"FROM test.test2 " +
"WHERE status='error' " +
"ORDER BY lastchange ASC"
for {
entries, err := cr.getEntries(sqlstr)
if err != nil {
return emperror.Wrapf(err, "cannot get new entries")
}
if len(entries) == 0 {
break
}
for _, entry := range entries {
sfMatches, mimetype, err := cr.check(entry, true)
sqlstr := "UPDATE test.test2 SET lastcheck=NOW()"
var statusStr string
var errMsg string
var params []interface{}
if err != nil {
statusStr = memostream.MediaStatusNum[memostream.Media_Error]
errMsg = err.Error()
if entry.Status != memostream.Media_Error {
sqlstr += ", lastchange=NOW()"
}
// return emperror.Wrapf(err, "cannot check entry %s", entry.Signature)
} else {
statusStr = memostream.MediaStatusNum[memostream.Media_OK]
meta := &Metadata{SFMatches: sfMatches}
metajson, err := json.Marshal(meta)
if err != nil {
return emperror.Wrapf(err, "cannot marshal %v", meta)
}
sqlstr += ", metadata=?"
params = append(params, string(metajson))
if mimetype != "" {
sqlstr += ", mimetype=?"
params = append(params, mimetype)
}
if entry.Status != memostream.Media_OK {
sqlstr += ", lastchange=NOW()"
}
}
sqlstr += ", status=?, errormessage=? WHERE sig=?"
params = append(params, statusStr, errMsg, entry.Signature)
if _, err := cr.db.Exec(sqlstr, params...); err != nil {
return emperror.Wrapf(err, "error executing sql %s [%v]", sqlstr, params)
}
}
}
return nil
}
func (cr *Crawler) Runner() error {
return cr.crawl()
}
func (cr *Crawler) crawl() error {
return nil
}
func (cr *Crawler) Shutdown(ctx context.Context) {
}
......@@ -3,6 +3,7 @@ package memocrawler
import (
"github.com/op/go-logging"
"os"
"strings"
)
var _logformat = logging.MustStringFormatter(
......@@ -39,3 +40,25 @@ func CreateLogger(module string, logfile string, loglevel string) (log *logging.
return
}
/*
holistic function to give some mimetypes a relevance
*/
func MimeRelevance( mimetype string) (relevance int) {
if mimetype == "" {
return 0
}
if mimetype == "application/octet-stream" {
return 1
}
if mimetype == "text/plain" {
return 2
}
if strings.HasPrefix(mimetype, "application/") {
return 3
}
if strings.HasPrefix(mimetype, "text/") {
return 4
}
return 1000
}
\ No newline at end of file
package memocrawler
import (
"encoding/json"
"github.com/goph/emperror"
"io/ioutil"
"net/http"
"net/url"
"strings"
)
type SFIdentifier struct {
Name string `json:"name,omitempty"`
Details string `json:"details,omitempty"`
}
type SFMatches struct {
Ns string `json:"ns,omitempty"`
Id string `json:"id,omitempty"`
Format string `json:"format,omitempty"`
Version string `json:"version,omitempty"`
Mime string `json:"mime,omitempty"`
Basis string `json:"basis,omitempty"`
Warning string `json:"warning,omitempty"`
}
type SFFiles struct {
Filename string `json:"filename,omitempty"`
Filesize int64 `json:"filesize,omitempty"`
Modified string `json:"modified,omitempty"`
Errors string `json:"errors,omitempty"`
Matches []SFMatches `json:"matches,omitempty"`
}
type SF struct {
Siegfried string `json:"siegfried,omitempty"`
Scandate string `json:"scandate,omitempty"`
Signature string `json:"signature,omitempty"`
Created string `json:"created,omitempty"`
Identifiers []SFIdentifier `json:"identfiers,omitempty"`
Files []SFFiles `json:"files,omitempty"`
}
type Siegfried struct {
surl string
}
func (sf *Siegfried) Get( filename string ) ([]SFMatches, error) {
urlstring := strings.Replace(sf.surl, "[[PATH]]", url.QueryEscape(filename), -1)
resp, err := http.Get(urlstring)
if err != nil {
return nil, emperror.Wrapf(err, "cannot query siegfried - %v", urlstring)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, emperror.Wrapf(err, "status not ok - %v -> %v", urlstring, resp.Status)
}
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, emperror.Wrapf(err, "error reading body - %v", urlstring)
}
result := SF{}
err = json.Unmarshal(bodyBytes, &result)
if err != nil {
return nil, emperror.Wrapf(err, "error decoding json - %v", string(bodyBytes))
}
if len(result.Files) == 0 {
return nil, emperror.Wrapf(err, "no file in sf result - %v", string(bodyBytes))
}
return result.Files[0].Matches, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment