From 68d6f4bcdc718ca57418d8cea329e79e189218ee Mon Sep 17 00:00:00 2001 From: Jan Dittberner Date: Sat, 21 May 2022 13:51:17 +0200 Subject: [PATCH] Reimplement decision close job --- cmd/boardvoting/config.go | 28 ++--- cmd/boardvoting/jobs.go | 83 +++++++++++--- cmd/boardvoting/main.go | 2 +- cmd/boardvoting/notifications.go | 44 +++++--- internal/models/decisions.go | 175 ++++++++++++++++++++++++++++-- internal/models/decisions_test.go | 42 ++++++- 6 files changed, 319 insertions(+), 55 deletions(-) diff --git a/cmd/boardvoting/config.go b/cmd/boardvoting/config.go index 2adb35a..16434e8 100644 --- a/cmd/boardvoting/config.go +++ b/cmd/boardvoting/config.go @@ -43,6 +43,8 @@ type mailConfig struct { SMTPHost string `yaml:"host"` SMTPPort int `yaml:"port"` NotificationSenderAddress string `yaml:"notification_sender_address"` + NoticeMailAddress string `yaml:"notice_mail_address"` + VoteNoticeMailAddress string `yaml:"vote_notice_mail_address"` } type httpTimeoutConfig struct { @@ -53,19 +55,17 @@ type httpTimeoutConfig struct { } type Config struct { - NoticeMailAddress string `yaml:"notice_mail_address"` - VoteNoticeMailAddress string `yaml:"vote_notice_mail_address"` - DatabaseFile string `yaml:"database_file"` - ClientCACertificates string `yaml:"client_ca_certificates"` - ServerCert string `yaml:"server_certificate"` - ServerKey string `yaml:"server_key"` - CookieSecret string `yaml:"cookie_secret"` - CsrfKey string `yaml:"csrf_key"` - BaseURL string `yaml:"base_url"` - HTTPAddress string `yaml:"http_address,omitempty"` - HTTPSAddress string `yaml:"https_address,omitempty"` - MailConfig mailConfig `yaml:"mail_server"` - Timeouts httpTimeoutConfig `yaml:"timeouts,omitempty"` + DatabaseFile string `yaml:"database_file"` + ClientCACertificates string `yaml:"client_ca_certificates"` + ServerCert string `yaml:"server_certificate"` + ServerKey string `yaml:"server_key"` + CookieSecret string `yaml:"cookie_secret"` + CsrfKey string `yaml:"csrf_key"` + BaseURL string `yaml:"base_url"` + HTTPAddress string `yaml:"http_address,omitempty"` + HTTPSAddress string `yaml:"https_address,omitempty"` + MailConfig *mailConfig `yaml:"mail_server"` + Timeouts *httpTimeoutConfig `yaml:"timeouts,omitempty"` } type configKey int @@ -84,7 +84,7 @@ func parseConfig(ctx context.Context, configFile string) (context.Context, error config := &Config{ HTTPAddress: "127.0.0.1:8000", HTTPSAddress: "127.0.0.1:8433", - Timeouts: httpTimeoutConfig{ + Timeouts: &httpTimeoutConfig{ Idle: httpIdleTimeout, ReadHeader: httpReadHeaderTimeout, Read: httpReadTimeout, diff --git a/cmd/boardvoting/jobs.go b/cmd/boardvoting/jobs.go index 6d56bfc..99d20d3 100644 --- a/cmd/boardvoting/jobs.go +++ b/cmd/boardvoting/jobs.go @@ -115,7 +115,6 @@ func (r *RemindVotersJob) Stop() { func (app *application) NewRemindVotersJob( rescheduleChannel chan Job, - notificationChannel chan NotificationMail, ) Job { return &RemindVotersJob{ infoLog: app.infoLog, @@ -123,31 +122,85 @@ func (app *application) NewRemindVotersJob( voters: app.voters, decisions: app.decisions, reschedule: rescheduleChannel, - notify: notificationChannel, + notify: app.mailNotifier.notifyChannel, } } -type closeDecisionsJob struct{} +type closeDecisionsJob struct { + timer *time.Timer + infoLog *log.Logger + errorLog *log.Logger + decisions *models.DecisionModel + reschedule chan Job + notify chan NotificationMail +} func (c *closeDecisionsJob) Schedule() { - // TODO implement me - panic("implement me") + var ( + nextDue *time.Time + err error + ) + + nextDue, err = c.decisions.GetNextPendingDecisionDue() + if err != nil { + c.errorLog.Printf("could not get next pending due date") + + c.Stop() + + return + } + + if nextDue == nil { + c.infoLog.Printf("no next planned execution of CloseDecisionsJob") + c.Stop() + + return + } + + c.infoLog.Printf("scheduling CloseDecisionsJob for %s", nextDue) + when := time.Until(nextDue.Add(time.Second)) + + if c.timer == nil { + c.timer = time.AfterFunc(when, c.Run) + + return + } + + c.timer.Reset(when) } func (c *closeDecisionsJob) Run() { - // TODO implement me - panic("implement me") + c.infoLog.Printf("running CloseDecisionsJob") + + results, err := c.decisions.CloseDecisions() + if err != nil { + c.errorLog.Printf("closing decisions failed: %v", err) + } + + for _, res := range results { + c.notify <- &ClosedDecisionNotification{decision: res} + } + + c.reschedule <- c } func (c *closeDecisionsJob) Stop() { - // TODO implement me - panic("implement me") + if c.timer != nil { + c.timer.Stop() + c.timer = nil + } } -func NewCloseDecisionsJob() Job { - // TODO implement real job - - return &closeDecisionsJob{} +func (app *application) NewCloseDecisionsJob( + rescheduleChannel chan Job, +) Job { + return &closeDecisionsJob{ + infoLog: app.infoLog, + errorLog: app.errorLog, + decisions: app.decisions, + reschedule: rescheduleChannel, + notify: app.mailNotifier.notifyChannel, + } } type JobScheduler struct { @@ -167,8 +220,8 @@ func (app *application) NewJobScheduler() { rescheduleChannel: rescheduleChannel, } - app.jobScheduler.addJob(NewCloseDecisionsJob()) - app.jobScheduler.addJob(app.NewRemindVotersJob(rescheduleChannel, app.mailNotifier.notifyChannel)) + app.jobScheduler.addJob(app.NewCloseDecisionsJob(rescheduleChannel)) + app.jobScheduler.addJob(app.NewRemindVotersJob(rescheduleChannel)) } func (js *JobScheduler) Schedule(quitChannel chan struct{}) { diff --git a/cmd/boardvoting/main.go b/cmd/boardvoting/main.go index cddaac3..6c651a1 100644 --- a/cmd/boardvoting/main.go +++ b/cmd/boardvoting/main.go @@ -47,7 +47,7 @@ type application struct { ctx context.Context jobScheduler *JobScheduler mailNotifier *MailNotifier - mailConfig mailConfig + mailConfig *mailConfig baseURL string } diff --git a/cmd/boardvoting/notifications.go b/cmd/boardvoting/notifications.go index 8952627..597199a 100644 --- a/cmd/boardvoting/notifications.go +++ b/cmd/boardvoting/notifications.go @@ -12,11 +12,6 @@ import ( "gopkg.in/mail.v2" ) -type headerData struct { - name string - value []string -} - type recipientData struct { field, address, name string } @@ -25,12 +20,12 @@ type NotificationContent struct { template string data interface{} subject string - headers []headerData + headers map[string][]string recipients []recipientData } type NotificationMail interface { - GetNotificationContent() *NotificationContent + GetNotificationContent(*mailConfig) *NotificationContent } type MailNotifier struct { @@ -53,7 +48,7 @@ func (app *application) StartMailNotifier() { for { select { case notification := <-app.mailNotifier.notifyChannel: - content := notification.GetNotificationContent() + content := notification.GetNotificationContent(app.mailConfig) mailText, err := content.buildMail(app.baseURL) if err != nil { @@ -63,6 +58,8 @@ func (app *application) StartMailNotifier() { } m := mail.NewMessage() + + m.SetHeaders(content.headers) m.SetAddressHeader("From", app.mailNotifier.senderAddress, "CAcert board voting system") for _, recipient := range content.recipients { @@ -71,10 +68,6 @@ func (app *application) StartMailNotifier() { m.SetHeader("Subject", content.subject) - for _, header := range content.headers { - m.SetHeader(header.name, header.value...) - } - m.SetBody("text/plain", mailText.String()) if err = app.mailNotifier.dialer.DialAndSend(m); err != nil { @@ -120,7 +113,7 @@ type RemindVoterNotification struct { decisions []models.Decision } -func (r RemindVoterNotification) GetNotificationContent() *NotificationContent { +func (r RemindVoterNotification) GetNotificationContent(*mailConfig) *NotificationContent { return &NotificationContent{ template: "remind_voter_mail.txt", data: struct { @@ -131,3 +124,28 @@ func (r RemindVoterNotification) GetNotificationContent() *NotificationContent { recipients: []recipientData{{"To", r.voter.Reminder, r.voter.Name}}, } } + +type ClosedDecisionNotification struct { + decision *models.ClosedDecision +} + +func (c *ClosedDecisionNotification) GetNotificationContent(mc *mailConfig) *NotificationContent { + return &NotificationContent{ + template: "closed_motion_mail.txt", + data: c.decision, + subject: fmt.Sprintf("Re: %s - %s - finalised", c.decision.Decision.Tag, c.decision.Decision.Title), + headers: c.getHeaders(), + recipients: []recipientData{c.getRecipient(mc)}, + } +} + +func (c *ClosedDecisionNotification) getHeaders() map[string][]string { + return map[string][]string{ + "References": {fmt.Sprintf("<%s>", c.decision.Decision.Tag)}, + "In-Reply-To": {fmt.Sprintf("<%s>", c.decision.Decision.Tag)}, + } +} + +func (c *ClosedDecisionNotification) getRecipient(mc *mailConfig) recipientData { + return recipientData{field: "To", address: mc.NoticeMailAddress, name: "CAcert board mailing list"} +} diff --git a/internal/models/decisions.go b/internal/models/decisions.go index 1a42427..87b49ef 100644 --- a/internal/models/decisions.go +++ b/internal/models/decisions.go @@ -18,6 +18,8 @@ limitations under the License. package models import ( + "database/sql" + "errors" "fmt" "log" "time" @@ -108,6 +110,30 @@ func (v VoteChoice) String() string { return unknownVariant } +type VoteSums struct { + Ayes, Nayes, Abstains int +} + +func (v *VoteSums) VoteCount() int { + return v.Ayes + v.Nayes + v.Abstains +} + +func (v *VoteSums) TotalVotes() int { + return v.Ayes + v.Nayes +} + +func (v *VoteSums) CalculateResult(quorum int, majority float32) (VoteStatus, string) { + if v.VoteCount() < quorum { + return voteStatusDeclined, fmt.Sprintf("Needed quorum of %d has not been reached.", quorum) + } + + if (float32(v.Ayes) / float32(v.TotalVotes())) < majority { + return voteStatusDeclined, fmt.Sprintf("Needed majority of %0.2f%% has not been reached.", majority) + } + + return voteStatusApproved, "Quorum and majority have been reached" +} + type Decision struct { ID int64 `db:"id"` Proposed time.Time @@ -123,6 +149,12 @@ type Decision struct { VoteType VoteType } +type ClosedDecision struct { + Decision *Decision + VoteSums *VoteSums + Reasoning string +} + type DecisionModel struct { DB *sqlx.DB InfoLog *log.Logger @@ -164,14 +196,23 @@ VALUES (:proposed, :proponent, :title, :content, :votetype, :status, :due, :prop return id, nil } -func (m *DecisionModel) CloseDecisions() error { - rows, err := m.DB.NamedQuery(` +func (m *DecisionModel) CloseDecisions() ([]*ClosedDecision, error) { + tx, err := m.DB.Beginx() + if err != nil { + return nil, fmt.Errorf("could not start transaction: %w", err) + } + + defer func(tx *sqlx.Tx) { + _ = tx.Rollback() + }(tx) + + rows, err := tx.NamedQuery(` SELECT decisions.id, decisions.tag, decisions.proponent, decisions.proposed, decisions.title, decisions.content, decisions.votetype, decisions.status, decisions.due, decisions.modified FROM decisions WHERE decisions.status=0 AND :now > due`, struct{ Now time.Time }{Now: time.Now().UTC()}) if err != nil { - return fmt.Errorf("fetching closable decisions failed: %w", err) + return nil, fmt.Errorf("fetching closable decisions failed: %w", err) } defer func(rows *sqlx.Rows) { @@ -183,31 +224,143 @@ WHERE decisions.status=0 AND :now > due`, struct{ Now time.Time }{Now: time.Now for rows.Next() { decision := &Decision{} if err = rows.StructScan(decision); err != nil { - return fmt.Errorf("scanning row failed: %w", err) + return nil, fmt.Errorf("scanning row failed: %w", err) } if rows.Err() != nil { - return fmt.Errorf("row error: %w", err) + return nil, fmt.Errorf("row error: %w", err) } decisions = append(decisions, decision) } + results := make([]*ClosedDecision, 0, len(decisions)) + + var decisionResult *ClosedDecision + for _, decision := range decisions { m.InfoLog.Printf("found closable decision %s", decision.Tag) - if err = m.Close(decision.Tag); err != nil { - return fmt.Errorf("closing decision %s failed: %w", decision.Tag, err) + if decisionResult, err = m.CloseDecision(tx, decision); err != nil { + return nil, fmt.Errorf("closing decision %s failed: %w", decision.Tag, err) } + + results = append(results, decisionResult) + } + + if err = tx.Commit(); err != nil { + return nil, fmt.Errorf("could not commit transaction: %w", err) } - return nil + return results, nil } -func (m *DecisionModel) Close(tag string) error { - panic("not implemented") +func (m *DecisionModel) CloseDecision(tx *sqlx.Tx, d *Decision) (*ClosedDecision, error) { + quorum, majority := d.VoteType.QuorumAndMajority() + + var ( + voteSums *VoteSums + err error + reasoning string + ) + + if voteSums, err = m.GetVoteSums(tx, d); err != nil { + return nil, fmt.Errorf("getting vote sums failed: %w", err) + } + + d.Status, reasoning = voteSums.CalculateResult(quorum, majority) + + result, err := m.DB.NamedExec( + `UPDATE decisions SET status=:status, modified=CURRENT_TIMESTAMP WHERE id=:id`, + d, + ) + + if err != nil { + return nil, fmt.Errorf("could not execute update query: %w", err) + } + + affectedRows, err := result.RowsAffected() + if err != nil { + return nil, fmt.Errorf("could not get affected rows count: %w", err) + } + + if affectedRows != 1 { + return nil, fmt.Errorf("unexpected number of rows %d instead of 1", affectedRows) + } + + m.InfoLog.Printf("decision %s closed with result %s: reasoning '%s'", d.Tag, d.Status, reasoning) + + return &ClosedDecision{d, voteSums, reasoning}, nil } -func (m *DecisionModel) FindUnVotedDecisionsForVoter(v *Voter) ([]Decision, error) { +func (m *DecisionModel) FindUnVotedDecisionsForVoter(_ *Voter) ([]Decision, error) { panic("not implemented") } + +func (m *DecisionModel) GetVoteSums(tx *sqlx.Tx, d *Decision) (*VoteSums, error) { + voteRows, err := tx.NamedQuery( + `SELECT vote, COUNT(vote) FROM votes WHERE decision=$1 GROUP BY vote`, + d.ID, + ) + if err != nil { + return nil, fmt.Errorf("fetching vote sums for motion %s failed: %w", d.Tag, err) + } + + defer func(voteRows *sqlx.Rows) { + _ = voteRows.Close() + }(voteRows) + + sums := &VoteSums{} + + for voteRows.Next() { + var ( + vote VoteChoice + count int + ) + + if err = voteRows.Err(); err != nil { + return nil, fmt.Errorf("could not fetch vote sums for motion %s: %w", d.Tag, err) + } + + if err = voteRows.Scan(&vote, &count); err != nil { + return nil, fmt.Errorf("could not parse row for vote sums of motion %s: %w", d.Tag, err) + } + + switch vote { + case voteAye: + sums.Ayes = count + case voteNaye: + sums.Nayes = count + case voteAbstain: + sums.Abstains = count + } + } + + return sums, nil +} + +func (m *DecisionModel) GetNextPendingDecisionDue() (*time.Time, error) { + row := m.DB.QueryRow(`SELECT due FROM decisions WHERE status=0 ORDER BY due LIMIT 1`, nil) + + if row == nil { + return nil, errors.New("no row returned") + } + + if err := row.Err(); err != nil { + return nil, fmt.Errorf("could not retrieve row for next pending decision: %w", err) + } + + var due time.Time + + if err := row.Scan(&due); err != nil { + if errors.Is(err, sql.ErrNoRows) { + m.InfoLog.Print("no pending decisions") + + return nil, nil + } + + return nil, fmt.Errorf("parsing result failed: %w", err) + } + + return &due, nil +} diff --git a/internal/models/decisions_test.go b/internal/models/decisions_test.go index 8ef7f9a..c237983 100644 --- a/internal/models/decisions_test.go +++ b/internal/models/decisions_test.go @@ -15,7 +15,9 @@ import ( "github.com/stretchr/testify/require" ) -func TestDecisionModel_Create(t *testing.T) { +func prepareTestDb(t *testing.T) (*sqlx.DB, *log.Logger) { + t.Helper() + testDir := t.TempDir() db, err := sql.Open("sqlite3", path.Join(testDir, "test.sqlite")) @@ -28,6 +30,12 @@ func TestDecisionModel_Create(t *testing.T) { err = internal.InitializeDb(dbx.DB, logger) require.NoError(t, err) + return dbx, logger +} + +func TestDecisionModel_Create(t *testing.T) { + dbx, logger := prepareTestDb(t) + dm := models.DecisionModel{DB: dbx, InfoLog: logger} v := &models.Voter{ @@ -40,3 +48,35 @@ func TestDecisionModel_Create(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, id) } + +func TestDecisionModel_GetNextPendingDecisionDue(t *testing.T) { + dbx, logger := prepareTestDb(t) + + dm := models.DecisionModel{DB: dbx, InfoLog: logger} + + var ( + nextDue *time.Time + err error + ) + + nextDue, err = dm.GetNextPendingDecisionDue() + assert.NoError(t, err) + assert.Empty(t, nextDue) + + v := &models.Voter{ + ID: 1, // sqlite does not check referential integrity. Might fail with a foreign key index. + Name: "test voter", + Reminder: "test+voter@example.com", + } + + due := time.Now().Add(10 * time.Minute) + + _, err = dm.Create(v, models.VoteTypeMotion, "test motion", "I move that we should test more", time.Now(), due) + require.NoError(t, err) + + nextDue, err = dm.GetNextPendingDecisionDue() + assert.NoError(t, err) + assert.NotEmpty(t, nextDue) + + assert.Equal(t, due.UTC(), *nextDue) +}