508 lines
12 KiB
Go
508 lines
12 KiB
Go
package deadlinks
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"code.betamike.com/mediocregopher/mediocre-go-lib/miter"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
migrate "github.com/rubenv/sql-migrate"
|
|
)
|
|
|
|
// Store keeps track of the current status of all discovered Resources, and
|
|
// links between them. A Resource which is neither pinned nor linked to from
|
|
// another Resource is considered to not exist.
|
|
//
|
|
// An implementation of Store must be thread-safe.
|
|
type Store interface {
|
|
// GetByStatus returns all Resources with the given Status.
|
|
GetByStatus(ResourceStatus) miter.Iterator[Resource]
|
|
|
|
// GetURLsByLastChecked returns the URLs of all Resources with LastChecked
|
|
// values older than the given timestamp, or which have never been checked.
|
|
GetURLsByLastChecked(olderThan time.Time) miter.Iterator[URL]
|
|
|
|
// SetPinned overwrites the set of pinned URLs with the given one.
|
|
SetPinned(context.Context, []URL) error
|
|
|
|
// Update updates the Resource identified by the given URL with the given
|
|
// arguments.
|
|
//
|
|
// Update returns an error if the URL has not been pinned nor referenced as
|
|
// an outgoing URL of a different Resource.
|
|
Update(
|
|
ctx context.Context,
|
|
now time.Time,
|
|
url URL,
|
|
status ResourceStatus,
|
|
errorString string,
|
|
outgoing []URL,
|
|
) error
|
|
|
|
// GC will garbage collect the store, removing any orphaned Resources.
|
|
GC(context.Context) error
|
|
}
|
|
|
|
var migrations = &migrate.MemoryMigrationSource{Migrations: []*migrate.Migration{
|
|
{
|
|
Id: "1",
|
|
Up: []string{
|
|
`CREATE TABLE urls (
|
|
id INTEGER NOT NULL PRIMARY KEY,
|
|
url TEXT NOT NULL,
|
|
UNIQUE(url)
|
|
)`,
|
|
|
|
`CREATE TABLE resources (
|
|
url_id INTEGER NOT NULL PRIMARY KEY,
|
|
status INTEGER NOT NULL DEFAULT 0,
|
|
pinned INTEGER NOT NULL,
|
|
last_checked INTEGER NOT NULL DEFAULT 0,
|
|
error_string TEXT NOT NULL DEFAULT '',
|
|
FOREIGN KEY(url_id) REFERENCES urls(id) ON DELETE CASCADE
|
|
)`,
|
|
|
|
`CREATE TABLE links (
|
|
from_url_id INTEGER NOT NULL,
|
|
to_url_id INTEGER NOT NULL,
|
|
FOREIGN KEY(from_url_id) REFERENCES urls(id) ON DELETE CASCADE,
|
|
FOREIGN KEY(to_url_id) REFERENCES urls(id) ON DELETE CASCADE,
|
|
PRIMARY KEY(from_url_id, to_url_id)
|
|
)`,
|
|
|
|
`CREATE INDEX links_outgoing_idx ON links (to_url_id)`,
|
|
},
|
|
},
|
|
}}
|
|
|
|
// SQLiteStoreOpts are optional fields which can be provided to
// NewSQLiteStore. A nil SQLiteStoreOpts is equivalent to an empty one.
type SQLiteStoreOpts struct {
	// Path to the database file to use.
	//
	// Defaults to ":memory:", indicating an in-memory database will be used.
	Path string
}

// withDefaults returns a copy of the options in which every unset field has
// been replaced by its default value.
//
// The receiver may be nil. It is never modified: defaults are applied to a
// copy, so an options value owned by the caller is left exactly as it was
// passed in.
func (o *SQLiteStoreOpts) withDefaults() *SQLiteStoreOpts {
	var out SQLiteStoreOpts
	if o != nil {
		out = *o
	}

	if out.Path == "" {
		out.Path = ":memory:"
	}

	return &out
}
|
|
|
|
type SQLiteStore struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
var _ Store = (*SQLiteStore)(nil)
|
|
|
|
// NewSQLiteStore returns a Store implementation which uses an in-memory SQLite
|
|
// db.
|
|
func NewSQLiteStore(o *SQLiteStoreOpts) *SQLiteStore {
|
|
o = o.withDefaults()
|
|
|
|
db, err := sql.Open("sqlite3", o.Path+"?_foreign_keys=1")
|
|
if err != nil {
|
|
panic(fmt.Errorf("opening sqlite in memory: %w", err))
|
|
}
|
|
|
|
// go-sqlite doesn't support multiple go-routines, this is equivalent to
|
|
// wrapping each call to the db in a mutex.
|
|
db.SetMaxOpenConns(1)
|
|
|
|
if _, err := migrate.Exec(db, "sqlite3", migrations, migrate.Up); err != nil {
|
|
panic(fmt.Errorf("running migrations: %w", err))
|
|
}
|
|
|
|
return &SQLiteStore{db}
|
|
}
|
|
|
|
// Close cleans up all resources held by the SQLiteStore store, if any. It must
|
|
// be the final method call to the SQLiteStore.
|
|
func (s *SQLiteStore) Close() error {
|
|
return s.db.Close()
|
|
}
|
|
|
|
// iterate is a helper which can be used to read the results of a query in
|
|
// chunks, producing a single unified Iterator.
|
|
//
|
|
// iterate assumes that the rows are being scanned ordered by their row ID,
|
|
// which must be a number, and that each query call has some kind of limit
|
|
// applied.
|
|
func iterate[T any](
|
|
db interface {
|
|
QueryContext(context.Context, string, ...any) (*sql.Rows, error)
|
|
},
|
|
query string,
|
|
mkArgs func(minRowID int) []any,
|
|
scan func(*sql.Rows) (T, int, error), // returns scanned value and its rowID
|
|
) miter.Iterator[T] {
|
|
var (
|
|
zero T
|
|
minRowID = -1
|
|
res, resBase []T
|
|
|
|
pop = func() T {
|
|
r := res[0]
|
|
res = res[1:]
|
|
return r
|
|
}
|
|
)
|
|
|
|
return miter.FromFunc(func(ctx context.Context) (T, error) {
|
|
if len(res) > 0 {
|
|
return pop(), nil
|
|
}
|
|
|
|
res = resBase
|
|
|
|
rows, err := db.QueryContext(ctx, query, mkArgs(minRowID)...)
|
|
if err != nil {
|
|
return zero, fmt.Errorf("executing query: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var r T
|
|
if r, minRowID, err = scan(rows); err != nil {
|
|
return zero, fmt.Errorf("scanning row: %w", err)
|
|
}
|
|
|
|
res = append(res, r)
|
|
}
|
|
|
|
resBase = res[:0]
|
|
|
|
if len(res) == 0 {
|
|
return zero, miter.ErrEnd
|
|
}
|
|
|
|
return pop(), nil
|
|
})
|
|
}
|
|
|
|
const getByStatusLimit = 16
|
|
|
|
// GetByStatus implements the method for the Store interface.
|
|
func (s *SQLiteStore) GetByStatus(status ResourceStatus) miter.Iterator[Resource] {
|
|
const query = `
|
|
WITH
|
|
incoming(url_id, urls) AS (
|
|
SELECT
|
|
to_url_id,
|
|
GROUP_CONCAT(url, char(0))
|
|
FROM links
|
|
JOIN urls ON (urls.id = links.from_url_id)
|
|
GROUP BY to_url_id
|
|
),
|
|
outgoing(url_id, urls) AS (
|
|
SELECT
|
|
from_url_id,
|
|
GROUP_CONCAT(url, char(0))
|
|
FROM links
|
|
JOIN urls ON (urls.id = links.to_url_id)
|
|
GROUP BY from_url_id
|
|
)
|
|
SELECT
|
|
resources.url_id,
|
|
url,
|
|
status,
|
|
pinned,
|
|
last_checked,
|
|
error_string,
|
|
incoming.urls,
|
|
outgoing.urls
|
|
FROM resources
|
|
JOIN urls ON (urls.id = resources.url_id)
|
|
LEFT JOIN incoming ON (incoming.url_id = resources.url_id)
|
|
LEFT JOIN outgoing ON (outgoing.url_id = resources.url_id)
|
|
WHERE status = ?
|
|
AND (pinned OR incoming.urls IS NOT NULL)
|
|
AND resources.url_id > ?
|
|
ORDER BY resources.url_id ASC
|
|
LIMIT ?`
|
|
|
|
return iterate(
|
|
s.db,
|
|
query,
|
|
func(minRowID int) []any {
|
|
return []any{status, minRowID, getByStatusLimit}
|
|
},
|
|
func(rows *sql.Rows) (Resource, int, error) {
|
|
var (
|
|
r Resource
|
|
rowID int
|
|
err error
|
|
lastChecked int64
|
|
incoming, outgoing sql.NullString
|
|
)
|
|
|
|
if err := rows.Scan(
|
|
&rowID,
|
|
&r.URL,
|
|
&r.Status,
|
|
&r.Pinned,
|
|
&lastChecked,
|
|
&r.ErrorString,
|
|
&incoming,
|
|
&outgoing,
|
|
); err != nil {
|
|
return Resource{}, 0, fmt.Errorf("calling Scan: %w", err)
|
|
}
|
|
|
|
if lastChecked != 0 {
|
|
r.LastChecked = time.Unix(lastChecked, 0).UTC()
|
|
}
|
|
|
|
if incoming.String != "" {
|
|
if r.IncomingLinkURLs, err = parseURLs(
|
|
strings.Split(incoming.String, "\x00"),
|
|
); err != nil {
|
|
return Resource{}, 0, fmt.Errorf(
|
|
"parsing incoming links: %w", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
if outgoing.String != "" {
|
|
if r.OutgoingLinkURLs, err = parseURLs(
|
|
strings.Split(outgoing.String, "\x00"),
|
|
); err != nil {
|
|
return Resource{}, 0, fmt.Errorf(
|
|
"parsing outgoing links: %w", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
return r, rowID, nil
|
|
},
|
|
)
|
|
}
|
|
|
|
const getURLsByLastCheckedLimit = 64
|
|
|
|
// GetURLsByLastChecked implements the method for the Store interface.
|
|
func (s *SQLiteStore) GetURLsByLastChecked(
|
|
olderThan time.Time,
|
|
) miter.Iterator[URL] {
|
|
const query = `
|
|
WITH
|
|
incoming(url_id, urls) AS (
|
|
SELECT to_url_id, COUNT(1)
|
|
FROM links
|
|
GROUP BY to_url_id
|
|
)
|
|
SELECT resources.url_id, url
|
|
FROM resources
|
|
JOIN urls ON (urls.id = resources.url_id)
|
|
LEFT JOIN incoming ON (incoming.url_id = resources.url_id)
|
|
WHERE last_checked < ?
|
|
AND (pinned OR incoming.urls IS NOT NULL)
|
|
AND resources.url_id > ?
|
|
ORDER BY resources.url_id ASC
|
|
LIMIT ?`
|
|
|
|
return iterate(
|
|
s.db,
|
|
query,
|
|
func(minRowID int) []any {
|
|
return []any{
|
|
olderThan.Unix(), minRowID, getURLsByLastCheckedLimit,
|
|
}
|
|
},
|
|
func(rows *sql.Rows) (URL, int, error) {
|
|
var (
|
|
urlID int
|
|
urlStr string
|
|
)
|
|
|
|
if err := rows.Scan(&urlID, &urlStr); err != nil {
|
|
return "", 0, fmt.Errorf("scanning url: %w", err)
|
|
}
|
|
|
|
url, err := ParseURL(urlStr)
|
|
if err != nil {
|
|
return "", 0, fmt.Errorf(
|
|
"parsing url %q from db: %w", urlStr, err,
|
|
)
|
|
}
|
|
|
|
return url, urlID, nil
|
|
},
|
|
)
|
|
}
|
|
|
|
func (s *SQLiteStore) touch(ctx context.Context, urls []URL, pinned bool) (
|
|
[]int, error,
|
|
) {
|
|
var (
|
|
urlsQueryParams = make([]any, len(urls))
|
|
resourcesQueryParams = make([]any, 0, (len(urls)*2)+1)
|
|
ids = make([]int, 0, len(urls))
|
|
)
|
|
|
|
for i := range urls {
|
|
urlsQueryParams[i] = urls[i]
|
|
}
|
|
|
|
urlsQuery := `
|
|
INSERT INTO urls (url)
|
|
VALUES ` + joinRepeated("(?)", ",", len(urls)) + `
|
|
ON CONFLICT DO UPDATE SET url=url
|
|
RETURNING id`
|
|
|
|
rows, err := s.db.QueryContext(ctx, urlsQuery, urlsQueryParams...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("inserting into urls: %w", err)
|
|
}
|
|
|
|
for range urls {
|
|
if !rows.Next() {
|
|
return nil, errors.Join(
|
|
errors.New("expected a returned row"), rows.Close(),
|
|
)
|
|
}
|
|
|
|
var id int
|
|
if err := rows.Scan(&id); err != nil {
|
|
return nil, errors.Join(
|
|
fmt.Errorf("scanning return from insert into urls: %w", err),
|
|
rows.Close(),
|
|
)
|
|
}
|
|
|
|
resourcesQueryParams = append(resourcesQueryParams, id, pinned)
|
|
ids = append(ids, id)
|
|
}
|
|
|
|
rows.Close()
|
|
|
|
resourcesQuery := `
|
|
INSERT INTO resources (url_id, pinned)
|
|
VALUES ` + joinRepeated("(?,?)", ",", len(urls)) + `
|
|
ON CONFLICT DO UPDATE SET pinned = ?`
|
|
|
|
resourcesQueryParams = append(resourcesQueryParams, pinned)
|
|
|
|
_, err = s.db.ExecContext(ctx, resourcesQuery, resourcesQueryParams...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("inserting into resources: %w", err)
|
|
}
|
|
|
|
return ids, nil
|
|
}
|
|
|
|
// SetPinned implements the method for the Store interface.
|
|
func (s *SQLiteStore) SetPinned(ctx context.Context, urls []URL) error {
|
|
_, err := s.db.ExecContext(ctx, `UPDATE resources SET pinned = 0`)
|
|
if err != nil {
|
|
return fmt.Errorf("unsetting pinned on all resources: %w", err)
|
|
}
|
|
|
|
if _, err := s.touch(ctx, urls, true); err != nil {
|
|
return fmt.Errorf("pinning resources: %w", err)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Update implements the method for the Store interface.
|
|
func (s *SQLiteStore) Update(
|
|
ctx context.Context,
|
|
now time.Time,
|
|
url URL,
|
|
status ResourceStatus,
|
|
errorString string,
|
|
outgoing []URL,
|
|
) error {
|
|
const resourcesQuery = `
|
|
UPDATE resources
|
|
SET
|
|
status = ?,
|
|
last_checked = ?,
|
|
error_string = ?
|
|
WHERE url_id = (SELECT id FROM urls WHERE url = ?)
|
|
RETURNING url_id`
|
|
|
|
var urlID int
|
|
|
|
err := s.db.QueryRowContext(
|
|
ctx, resourcesQuery, status, now.Unix(), errorString, url,
|
|
).Scan(&urlID)
|
|
if err != nil {
|
|
return fmt.Errorf("inserting into resources: %w", err)
|
|
}
|
|
|
|
_, err = s.db.ExecContext(
|
|
ctx, `DELETE FROM links WHERE from_url_id = ?`, urlID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting from links: %w", err)
|
|
}
|
|
|
|
if len(outgoing) == 0 {
|
|
return nil
|
|
}
|
|
|
|
outgoingIDs, err := s.touch(ctx, outgoing, false)
|
|
if err != nil {
|
|
return fmt.Errorf("touching outgoing links: %w", err)
|
|
}
|
|
|
|
linksQueryParams := make([]any, 0, len(outgoingIDs)*2)
|
|
for i := range outgoingIDs {
|
|
linksQueryParams = append(linksQueryParams, urlID, outgoingIDs[i])
|
|
}
|
|
|
|
linksQuery := `
|
|
INSERT INTO links (from_url_id, to_url_id)
|
|
VALUES ` + joinRepeated("(?,?)", ",", len(outgoing)) + `
|
|
ON CONFLICT DO NOTHING`
|
|
|
|
_, err = s.db.ExecContext(ctx, linksQuery, linksQueryParams...)
|
|
if err != nil {
|
|
return fmt.Errorf("inserting into links: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GC implements the method for the Store interface.
|
|
func (s *SQLiteStore) GC(ctx context.Context) error {
|
|
const query = `
|
|
WITH orphans AS (
|
|
SELECT url_id FROM resources
|
|
LEFT JOIN links ON (links.to_url_id = resources.url_id)
|
|
WHERE pinned = 0 AND from_url_id IS NULL
|
|
)
|
|
DELETE FROM urls WHERE id IN orphans
|
|
`
|
|
|
|
for {
|
|
res, err := s.db.ExecContext(ctx, query)
|
|
if err != nil {
|
|
return fmt.Errorf("performing delete: %w", err)
|
|
} else if n, err := res.RowsAffected(); err != nil {
|
|
return fmt.Errorf("determining rows affected: %w", err)
|
|
} else if n == 0 {
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// joinRepeated returns n copies of str joined together by sep, e.g.
// joinRepeated("(?)", ",", 3) returns "(?),(?),(?)".
//
// If n is zero or negative the empty string is returned.
func joinRepeated(str, sep string, n int) string {
	if n <= 0 {
		return ""
	}

	// One leading str followed by n-1 (sep + str) pairs, so no trailing
	// separator ever has to be trimmed off.
	return str + strings.Repeat(sep+str, n-1)
}
|