Commit 09220a2c authored by Juergen Enge's avatar Juergen Enge

url format check moved to content check functionality

parent dc2eab67
......@@ -82,8 +82,8 @@ func NewCrawler(
/*
load all entries from query in array
*/
func (cr *Crawler) getEntries(sqlstr string, args ...interface{}) ([]*memostream.MediaEntry, error) {
entries := []*memostream.MediaEntry{}
func (cr *Crawler) getEntries(sqlstr string, args ...interface{}) ([]*MediaEntry, error) {
entries := []*MediaEntry{}
sqlstr += " LIMIT 0, ?"
args = append(args, cr.pageSize)
......@@ -102,7 +102,7 @@ func (cr *Crawler) getEntries(sqlstr string, args ...interface{}) ([]*memostream
if err := rows.Scan(&signature, &uri, &access, &protocol, &status); err != nil {
return nil, emperror.Wrapf(err, "cannot scan values")
}
me, err := memostream.NewMediaEntry(signature, uri, access, protocol, status, "", "", 0, 0, 0, "", "")
me, err := NewMediaEntry(signature, uri, access, protocol, status, "", "", 0, 0, 0, "", "")
if err != nil {
return nil, emperror.Wrapf(err, "cannot create MediaEntry")
}
......@@ -195,7 +195,7 @@ func (cr *Crawler) CrawlError() error {
func (cr *Crawler) CrawlOK() error {
cr.log.Infof("start crawling entities without errors")
var lastFirst *memostream.MediaEntry
var lastFirst *MediaEntry
sqlstr := fmt.Sprintf("SELECT sig AS signature, uri, access, proto AS protocol, `status` "+
"FROM %s.entities "+
"WHERE status='ok' "+
......
......@@ -12,16 +12,66 @@ package memocrawler
import (
"container/list"
"errors"
"fmt"
"github.com/op/go-logging"
"gitlab.switch.ch/memoriav/memobase-2020/services/streaming-server/pkg/memostream"
"sync"
"time"
)
// Represents the data needed to stream media object
type MediaEntry struct {
Signature string
URI string
Protocol memostream.MediaProtocol
Access memostream.MediaAccess
Status memostream.MediaStatus
Type string
Mimetype string
Width, Height, Duration int64
ManifestV2, ManifestV3 string
}
func NewMediaEntry(signature, uri, access, protocol, status, t, mimetype string, width, height, length int64, manifestV2, manifestV3 string) (*MediaEntry, error) {
p, ok := memostream.MediaProtocolString[protocol]
// invalid data in database
if !ok {
return nil, errors.New(fmt.Sprintf("invalid protocol value %s", protocol))
}
a, ok := memostream.MediaAccessString[access]
// invalid data in database
if !ok {
return nil, errors.New(fmt.Sprintf("invalid access value %s", access))
}
s, ok := memostream.MediaStatusString[status]
// invalid data in database
if !ok {
return nil, errors.New(fmt.Sprintf("invalid statuss value %s", status))
}
return &MediaEntry{
Signature: signature,
URI: uri,
Protocol: p,
Access: a,
Status: s,
Type: t,
Mimetype: mimetype,
Width: width,
Height: height,
Duration: length,
ManifestV2: manifestV2,
ManifestV3: manifestV3,
}, nil
}
type Job struct {
ID string
cr *Crawler
entry *memostream.MediaEntry
entry *MediaEntry
}
func (j *Job) GetID() string {
......
......@@ -18,6 +18,7 @@ import (
"gitlab.switch.ch/memoriav/memobase-2020/services/streaming-server/pkg/memostream"
"io"
"net/http"
"net/url"
"time"
)
......@@ -39,8 +40,17 @@ func NewWorker(id int, cr *Crawler, jobQueue *JobQueue) Worker {
}
}
func (w Worker) checkFile(entry *memostream.MediaEntry) error {
rsc, _, err := w.cr.mapping.Open(entry.URI)
func (w Worker) checkFile(entry *MediaEntry) error {
// Create url and check uri syntax
url, err := url.Parse(entry.URI)
if err != nil {
return emperror.Wrapf(err, "cannot parse uri %s", entry.URI)
}
if url == nil {
return emperror.Wrapf(err, "url from uri is nil %s", entry.URI)
}
rsc, _, err := w.cr.mapping.Open(*url)
if err != nil {
return emperror.Wrapf(err, "cannot open file")
}
......@@ -49,26 +59,35 @@ func (w Worker) checkFile(entry *memostream.MediaEntry) error {
bbuf := bytes.NewBuffer(buf)
written, err := io.CopyN(bbuf, rsc, int64(w.cr.headerSize))
if err != nil {
return emperror.Wrapf(err, "cannot read from %s", entry.URI.String())
return emperror.Wrapf(err, "cannot read from %s", url.String())
}
if written != int64(w.cr.headerSize) {
return fmt.Errorf("could not read %v bytes from %s - %v bytes read", w.cr.headerSize, entry.URI.String(), written)
return fmt.Errorf("could not read %v bytes from %s - %v bytes read", w.cr.headerSize, url.String(), written)
}
return nil
}
func (w Worker) checkURL(entry *memostream.MediaEntry) error {
func (w Worker) checkURL(entry *MediaEntry) error {
// Create url and check uri syntax
url, err := url.Parse(entry.URI)
if err != nil {
return emperror.Wrapf(err, "cannot parse uri %s", entry.URI)
}
if url == nil {
return emperror.Wrapf(err, "url from uri is nil %s", entry.URI)
}
client := &http.Client{}
ctx, _ := context.WithTimeout(context.Background(), timeout)
req, err := http.NewRequestWithContext(ctx, "GET", entry.URI.String(), nil)
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
if err != nil {
return emperror.Wrapf(err, "cannot create GET request for %s", entry.URI.String())
return emperror.Wrapf(err, "cannot create GET request for %s", url.String())
}
// we'll try a range request
req.Header.Set("Range", fmt.Sprintf("bytes=0-%v", w.cr.headerSize-1))
resp, err := client.Do(req)
if err != nil {
return emperror.Wrapf(err, "cannot execute GET request for %s", entry.URI.String())
return emperror.Wrapf(err, "cannot execute GET request for %s", url.String())
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
......@@ -78,10 +97,10 @@ func (w Worker) checkURL(entry *memostream.MediaEntry) error {
bbuf := bytes.NewBuffer(buf)
written, err := io.CopyN(bbuf, resp.Body, int64(w.cr.headerSize))
if err != nil {
return emperror.Wrapf(err, "cannot read from %s", entry.URI.String())
return emperror.Wrapf(err, "cannot read from %s", url.String())
}
if written != int64(w.cr.headerSize) {
return fmt.Errorf("could not read %v bytes from %s - %v bytes read", w.cr.headerSize, entry.URI.String(), written)
return fmt.Errorf("could not read %v bytes from %s - %v bytes read", w.cr.headerSize, url.String(), written)
}
return nil
}
......@@ -104,7 +123,7 @@ func (w Worker) stop() {
}
func (w Worker) Do(job *Job) (err error) {
w.cr.log.Debugf("#%03d [%s] checking %s - %s", w.id, job.entry.Signature, memostream.MediaProtocolNum[job.entry.Protocol], job.entry.URI.String())
w.cr.log.Debugf("#%03d [%s] checking %s - %s", w.id, job.entry.Signature, memostream.MediaProtocolNum[job.entry.Protocol], job.entry.URI)
switch job.entry.Protocol {
case memostream.Media_File:
err = w.checkFile(job.entry)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment