deadlinks/store.go
2023-12-28 15:40:07 +01:00

439 lines
11 KiB
Go

package deadlinks
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"
"code.betamike.com/mediocregopher/mediocre-go-lib/miter"
_ "github.com/mattn/go-sqlite3"
migrate "github.com/rubenv/sql-migrate"
)
// Store keeps track of the current status of all discovered Resources, and
// links between them. A Resource which is neither pinned nor linked to from
// another Resource is considered to not exist.
//
// An implementation of Store must be thread-safe.
type Store interface {
	// GetByStatus returns all Resources with the given Status.
	GetByStatus(ResourceStatus) miter.Iterator[Resource]

	// GetURLsByLastChecked returns the URLs of all Resources with LastChecked
	// values older than the given timestamp, or which have never been checked.
	GetURLsByLastChecked(olderThan time.Time) miter.Iterator[URL]

	// SetPinned overwrites the set of pinned URLs with the given one.
	SetPinned(context.Context, []URL) error

	// Update updates the Resource identified by the given URL with the given
	// arguments.
	//
	// Update returns an error if the URL has not been pinned nor referenced as
	// an outgoing URL of a different Resource.
	//
	// In the SQLiteStore implementation in this file, now is recorded as the
	// Resource's last-checked time (with second precision), status and
	// errorString overwrite the stored values, and outgoing replaces the
	// previously recorded set of outgoing links.
	Update(
		ctx context.Context,
		now time.Time,
		url URL,
		status ResourceStatus,
		errorString string,
		outgoing []URL,
	) error

	// GC will garbage collect the store, removing any orphaned Resources.
	GC(context.Context) error
}
// migrations is the in-memory list of schema migrations which NewSQLiteStore
// applies to the database before returning a store.
//
// Every foreign key is declared ON DELETE CASCADE, so deleting a row from urls
// also removes its resources row and every links row which references it in
// either direction. This only takes effect while SQLite foreign key
// enforcement is enabled on the connection.
var migrations = &migrate.MemoryMigrationSource{Migrations: []*migrate.Migration{
	{
		Id: "1",
		Up: []string{
			// urls assigns an integer id to each distinct URL string.
			`CREATE TABLE urls (
id INTEGER NOT NULL PRIMARY KEY,
url TEXT NOT NULL,
UNIQUE(url)
)`,
			// resources holds the per-URL check state. status and last_checked
			// default to 0 and error_string to '' for rows which have been
			// inserted but never updated.
			`CREATE TABLE resources (
url_id INTEGER NOT NULL PRIMARY KEY,
status INTEGER NOT NULL DEFAULT 0,
pinned INTEGER NOT NULL,
last_checked INTEGER NOT NULL DEFAULT 0,
error_string TEXT NOT NULL DEFAULT '',
FOREIGN KEY(url_id) REFERENCES urls(id) ON DELETE CASCADE
)`,
			// links holds one row per directed (from, to) pair of URLs.
			`CREATE TABLE links (
from_url_id INTEGER NOT NULL,
to_url_id INTEGER NOT NULL,
FOREIGN KEY(from_url_id) REFERENCES urls(id) ON DELETE CASCADE,
FOREIGN KEY(to_url_id) REFERENCES urls(id) ON DELETE CASCADE,
PRIMARY KEY(from_url_id, to_url_id)
)`,
			// The primary key already covers lookups by from_url_id; this index
			// covers lookups by to_url_id.
			`CREATE INDEX links_outgoing_idx ON links (to_url_id)`,
		},
	},
}}
// SQLiteStoreOpts are optional fields which can be provided to NewSQLiteStore.
// A nil SQLiteStoreOpts is equivalent to an empty one.
type SQLiteStoreOpts struct {
	// Path to the database file to use.
	//
	// Defaults to ":memory:", indicating an in-memory database will be used.
	Path string
}

// withDefaults returns a copy of o in which every unset field has been
// populated with its default value. It is safe to call on a nil receiver, and
// it never modifies the struct which o points to, so callers can reuse their
// options value without it being changed underneath them.
func (o *SQLiteStoreOpts) withDefaults() *SQLiteStoreOpts {
	var out SQLiteStoreOpts
	if o != nil {
		out = *o
	}

	if out.Path == "" {
		out.Path = ":memory:"
	}

	return &out
}
// SQLiteStore is a Store implementation which keeps its state in a SQLite
// database accessed through database/sql. Use NewSQLiteStore to create one.
type SQLiteStore struct {
	db *sql.DB
}

// Compile-time assertion that *SQLiteStore satisfies the Store interface.
var _ Store = (*SQLiteStore)(nil)
// NewSQLiteStore returns a Store implementation backed by the SQLite database
// at o.Path. When o is nil or o.Path is empty an in-memory database is used.
//
// The database is opened with foreign key enforcement enabled (the schema
// relies on ON DELETE CASCADE), and all schema migrations are applied before
// the store is returned.
//
// NewSQLiteStore panics if the database cannot be opened or migrated.
func NewSQLiteStore(o *SQLiteStoreOpts) *SQLiteStore {
	o = o.withDefaults()

	// If the configured path already carries DSN parameters then ours must be
	// appended with '&'; a second '?' would be parsed as part of the previous
	// parameter's value.
	sep := "?"
	if strings.Contains(o.Path, "?") {
		sep = "&"
	}

	db, err := sql.Open("sqlite3", o.Path+sep+"_foreign_keys=1")
	if err != nil {
		panic(fmt.Errorf("opening sqlite db %q: %w", o.Path, err))
	}

	if _, err := migrate.Exec(db, "sqlite3", migrations, migrate.Up); err != nil {
		panic(fmt.Errorf("running migrations: %w", err))
	}

	return &SQLiteStore{db}
}
// Close releases the database handle held by the SQLiteStore and reports any
// error from doing so. No other method may be called on the SQLiteStore once
// Close has been called.
func (s *SQLiteStore) Close() error {
	closeErr := s.db.Close()
	return closeErr
}
// GetByStatus implements the method for the Store interface.
//
// Each returned Resource carries the URLs of its incoming and outgoing links.
// These are aggregated in SQL into single NUL-separated strings and split
// apart again here. Resources which are neither pinned nor the target of at
// least one link are excluded from the results.
//
// The query is issued from within the miter.Lazily callback. The underlying
// rows are closed as soon as iteration ends or any error is returned. An error
// encountered while advancing through the rows (including cancellation of the
// query's context) is returned as such, rather than being reported as the end
// of iteration.
func (s *SQLiteStore) GetByStatus(status ResourceStatus) miter.Iterator[Resource] {
	const query = `
WITH
incoming(url_id, urls) AS (
SELECT
to_url_id,
GROUP_CONCAT(url, char(0))
FROM links
JOIN urls ON (urls.id = links.from_url_id)
GROUP BY to_url_id
),
outgoing(url_id, urls) AS (
SELECT
from_url_id,
GROUP_CONCAT(url, char(0))
FROM links
JOIN urls ON (urls.id = links.to_url_id)
GROUP BY from_url_id
)
SELECT
url,
status,
pinned,
last_checked,
error_string,
incoming.urls,
outgoing.urls
FROM resources
JOIN urls ON (urls.id = resources.url_id)
LEFT JOIN incoming ON (incoming.url_id = resources.url_id)
LEFT JOIN outgoing ON (outgoing.url_id = resources.url_id)
WHERE status = ?
AND (pinned OR incoming.urls IS NOT NULL)`

	return miter.Lazily(func(ctx context.Context) (miter.Iterator[Resource], error) {
		rows, err := s.db.QueryContext(ctx, query, status)
		if err != nil {
			return nil, fmt.Errorf("executing query: %w", err)
		}

		return miter.FromFunc(func(ctx context.Context) (Resource, error) {
			var (
				r                  Resource
				lastChecked        int64
				incoming, outgoing sql.NullString
				// Local error so that the enclosing closure's err is never
				// written to from here.
				err error
			)

			if !rows.Next() {
				// Next returns false both when the rows are exhausted and
				// when advancing failed; only the former is a clean end.
				if err := rows.Err(); err != nil {
					return Resource{}, errors.Join(
						rows.Close(), fmt.Errorf("iterating rows: %w", err),
					)
				}
				return Resource{}, errors.Join(rows.Close(), miter.ErrEnd)
			}

			if err := rows.Scan(
				&r.URL,
				&r.Status,
				&r.Pinned,
				&lastChecked,
				&r.ErrorString,
				&incoming,
				&outgoing,
			); err != nil {
				return Resource{}, errors.Join(
					rows.Close(), fmt.Errorf("scanning row: %w", err),
				)
			}

			// A stored value of 0 means the resource has never been checked,
			// in which case LastChecked is left as the zero time.
			if lastChecked != 0 {
				r.LastChecked = time.Unix(lastChecked, 0).UTC()
			}

			// The aggregated columns are NULL (and so scan to an empty
			// string) for resources without links in that direction.
			if incoming.String != "" {
				if r.IncomingLinkURLs, err = parseURLs(
					strings.Split(incoming.String, "\x00"),
				); err != nil {
					return Resource{}, errors.Join(
						rows.Close(), fmt.Errorf("parsing incoming links: %w", err),
					)
				}
			}

			if outgoing.String != "" {
				if r.OutgoingLinkURLs, err = parseURLs(
					strings.Split(outgoing.String, "\x00"),
				); err != nil {
					return Resource{}, errors.Join(
						rows.Close(), fmt.Errorf("parsing outgoing links: %w", err),
					)
				}
			}

			return r, nil
		}), nil
	})
}
// GetURLsByLastChecked implements the method for the Store interface.
//
// olderThan is compared with second precision against the stored last-checked
// unix timestamp. Resources which have never been updated have a stored
// timestamp of 0, and so match any olderThan which is after the unix epoch.
// Resources which are neither pinned nor the target of at least one link are
// excluded from the results.
//
// The underlying rows are closed as soon as iteration ends or any error is
// returned. An error encountered while advancing through the rows (including
// cancellation of the query's context) is returned as such, rather than being
// reported as the end of iteration.
func (s *SQLiteStore) GetURLsByLastChecked(
	olderThan time.Time,
) miter.Iterator[URL] {
	const query = `
WITH
incoming(url_id, urls) AS (
SELECT to_url_id, COUNT(1)
FROM links
GROUP BY to_url_id
)
SELECT url
FROM resources
JOIN urls ON (urls.id = resources.url_id)
LEFT JOIN incoming ON (incoming.url_id = resources.url_id)
WHERE last_checked < ?
AND (pinned OR incoming.urls IS NOT NULL)`

	return miter.Lazily(func(ctx context.Context) (miter.Iterator[URL], error) {
		rows, err := s.db.QueryContext(ctx, query, olderThan.Unix())
		if err != nil {
			return nil, fmt.Errorf("executing query: %w", err)
		}

		return miter.FromFunc(func(ctx context.Context) (URL, error) {
			if !rows.Next() {
				// Next returns false both when the rows are exhausted and
				// when advancing failed; only the former is a clean end.
				if err := rows.Err(); err != nil {
					return "", errors.Join(
						rows.Close(), fmt.Errorf("iterating rows: %w", err),
					)
				}
				return "", errors.Join(rows.Close(), miter.ErrEnd)
			}

			var urlStr string
			if err := rows.Scan(&urlStr); err != nil {
				return "", errors.Join(
					rows.Close(), fmt.Errorf("scanning url: %w", err),
				)
			}

			url, err := ParseURL(urlStr)
			if err != nil {
				return "", errors.Join(
					rows.Close(),
					fmt.Errorf("parsing url %q from db: %w", urlStr, err),
				)
			}

			return url, nil
		}), nil
	})
}
// touch ensures that both a urls row and a resources row exist for every one
// of the given URLs, and returns the urls ids of those rows (one per given
// URL; no particular order should be relied upon).
//
// If pinned is true then every touched resource is marked as pinned. If pinned
// is false then newly created resources are unpinned, while resources which
// already exist keep their current pinned state; touching a resource because
// it was discovered as a link must not silently un-pin it.
//
// Calling touch with no URLs is a no-op which returns no ids.
func (s *SQLiteStore) touch(ctx context.Context, urls []URL, pinned bool) (
	[]int, error,
) {
	// The VALUES lists below cannot be built for zero rows.
	if len(urls) == 0 {
		return nil, nil
	}

	var (
		urlsQueryParams      = make([]any, len(urls))
		resourcesQueryParams = make([]any, 0, (len(urls)*2)+1)
		ids                  = make([]int, 0, len(urls))
	)

	for i := range urls {
		urlsQueryParams[i] = urls[i]
	}

	// The no-op DO UPDATE makes RETURNING yield the id of URLs which already
	// existed, as well as of those which were newly inserted.
	urlsQuery := `
INSERT INTO urls (url)
VALUES ` + joinRepeated("(?)", ",", len(urls)) + `
ON CONFLICT DO UPDATE SET url=url
RETURNING id`

	rows, err := s.db.QueryContext(ctx, urlsQuery, urlsQueryParams...)
	if err != nil {
		return nil, fmt.Errorf("inserting into urls: %w", err)
	}

	for range urls {
		if !rows.Next() {
			// Include the iteration error, if any, so that the underlying
			// cause of the missing row isn't lost.
			return nil, errors.Join(
				errors.New("expected a returned row"), rows.Err(), rows.Close(),
			)
		}

		var id int
		if err := rows.Scan(&id); err != nil {
			return nil, errors.Join(
				fmt.Errorf("scanning return from insert into urls: %w", err),
				rows.Close(),
			)
		}

		resourcesQueryParams = append(resourcesQueryParams, id, pinned)
		ids = append(ids, id)
	}

	// Close before issuing the next statement so that the rows don't hold on
	// to their connection for any longer than necessary.
	if err := rows.Close(); err != nil {
		return nil, fmt.Errorf("closing rows from insert into urls: %w", err)
	}

	// `pinned OR ?` only ever raises the pinned flag of an existing resource,
	// it never clears it.
	resourcesQuery := `
INSERT INTO resources (url_id, pinned)
VALUES ` + joinRepeated("(?,?)", ",", len(urls)) + `
ON CONFLICT DO UPDATE SET pinned = pinned OR ?`

	resourcesQueryParams = append(resourcesQueryParams, pinned)

	_, err = s.db.ExecContext(ctx, resourcesQuery, resourcesQueryParams...)
	if err != nil {
		return nil, fmt.Errorf("inserting into resources: %w", err)
	}

	return ids, nil
}
// SetPinned implements the method for the Store interface.
//
// All resources are first marked as unpinned, and then the given URLs are
// created (if they don't already exist) and marked as pinned. Passing an empty
// set of URLs leaves every resource unpinned.
//
// The two steps are separate statements and are not wrapped in a transaction.
func (s *SQLiteStore) SetPinned(ctx context.Context, urls []URL) error {
	if _, err := s.db.ExecContext(
		ctx, `UPDATE resources SET pinned = 0`,
	); err != nil {
		return fmt.Errorf("unsetting pinned on all resources: %w", err)
	}

	// With no URLs there is nothing to insert or pin, so skip touch entirely.
	if len(urls) == 0 {
		return nil
	}

	if _, err := s.touch(ctx, urls, true); err != nil {
		return fmt.Errorf("pinning resources: %w", err)
	}

	return nil
}
// Update implements the method for the Store interface.
//
// The resource's status, last-checked time (now, with second precision) and
// error string are overwritten. Its previously recorded outgoing links are
// then deleted and replaced with links to the given outgoing URLs, each of
// which is created as a resource if it doesn't already exist.
//
// If no resource exists for url then the returned error wraps sql.ErrNoRows.
//
// The individual statements are not wrapped in a transaction.
func (s *SQLiteStore) Update(
	ctx context.Context,
	now time.Time,
	url URL,
	status ResourceStatus,
	errorString string,
	outgoing []URL,
) error {
	// RETURNING hands back the resource's id for use in the links statements
	// below, and yields no row at all if the url is unknown.
	const resourcesQuery = `
UPDATE resources
SET
status = ?,
last_checked = ?,
error_string = ?
WHERE url_id = (SELECT id FROM urls WHERE url = ?)
RETURNING url_id`

	var urlID int
	err := s.db.QueryRowContext(
		ctx, resourcesQuery, status, now.Unix(), errorString, url,
	).Scan(&urlID)
	if err != nil {
		return fmt.Errorf("updating resource %q: %w", url, err)
	}

	_, err = s.db.ExecContext(
		ctx, `DELETE FROM links WHERE from_url_id = ?`, urlID,
	)
	if err != nil {
		return fmt.Errorf("deleting from links: %w", err)
	}

	// The VALUES list below cannot be built for zero rows.
	if len(outgoing) == 0 {
		return nil
	}

	outgoingIDs, err := s.touch(ctx, outgoing, false)
	if err != nil {
		return fmt.Errorf("touching outgoing links: %w", err)
	}

	linksQueryParams := make([]any, 0, len(outgoingIDs)*2)
	for i := range outgoingIDs {
		linksQueryParams = append(linksQueryParams, urlID, outgoingIDs[i])
	}

	// Size the placeholder list from the ids actually being bound. DO NOTHING
	// tolerates the same URL appearing more than once in outgoing.
	linksQuery := `
INSERT INTO links (from_url_id, to_url_id)
VALUES ` + joinRepeated("(?,?)", ",", len(outgoingIDs)) + `
ON CONFLICT DO NOTHING`

	_, err = s.db.ExecContext(ctx, linksQuery, linksQueryParams...)
	if err != nil {
		return fmt.Errorf("inserting into links: %w", err)
	}

	return nil
}
// GC implements the method for the Store interface.
//
// An orphan is a resource which is not pinned and which no link points to.
// Orphans are removed by deleting their urls row. The delete is repeated until
// a pass affects no rows, since removing one orphan can leave the resources it
// linked to orphaned in turn.
func (s *SQLiteStore) GC(ctx context.Context) error {
	const query = `
WITH orphans AS (
SELECT url_id FROM resources
LEFT JOIN links ON (links.to_url_id = resources.url_id)
WHERE pinned = 0 AND from_url_id IS NULL
)
DELETE FROM urls WHERE id IN orphans
`

	for {
		result, err := s.db.ExecContext(ctx, query)
		if err != nil {
			return fmt.Errorf("performing delete: %w", err)
		}

		deleted, err := result.RowsAffected()
		if err != nil {
			return fmt.Errorf("determining rows affected: %w", err)
		}

		// A pass which deleted nothing means no orphans remain.
		if deleted == 0 {
			return nil
		}
	}
}
// joinRepeated returns str repeated n times with sep placed between each
// repetition, e.g. joinRepeated("(?)", ",", 3) returns "(?),(?),(?)". It
// returns an empty string if n is zero or negative.
func joinRepeated(str, sep string, n int) string {
	if n <= 0 {
		return ""
	}

	// n-1 separated repetitions followed by a final one without a trailing
	// separator.
	return strings.Repeat(str+sep, n-1) + str
}