Reimplement decision close job

main
Jan Dittberner 2 years ago
parent 01b95f2253
commit 68d6f4bcdc

@ -43,6 +43,8 @@ type mailConfig struct {
SMTPHost string `yaml:"host"`
SMTPPort int `yaml:"port"`
NotificationSenderAddress string `yaml:"notification_sender_address"`
NoticeMailAddress string `yaml:"notice_mail_address"`
VoteNoticeMailAddress string `yaml:"vote_notice_mail_address"`
}
type httpTimeoutConfig struct {
@ -53,19 +55,17 @@ type httpTimeoutConfig struct {
}
type Config struct {
NoticeMailAddress string `yaml:"notice_mail_address"`
VoteNoticeMailAddress string `yaml:"vote_notice_mail_address"`
DatabaseFile string `yaml:"database_file"`
ClientCACertificates string `yaml:"client_ca_certificates"`
ServerCert string `yaml:"server_certificate"`
ServerKey string `yaml:"server_key"`
CookieSecret string `yaml:"cookie_secret"`
CsrfKey string `yaml:"csrf_key"`
BaseURL string `yaml:"base_url"`
HTTPAddress string `yaml:"http_address,omitempty"`
HTTPSAddress string `yaml:"https_address,omitempty"`
MailConfig mailConfig `yaml:"mail_server"`
Timeouts httpTimeoutConfig `yaml:"timeouts,omitempty"`
DatabaseFile string `yaml:"database_file"`
ClientCACertificates string `yaml:"client_ca_certificates"`
ServerCert string `yaml:"server_certificate"`
ServerKey string `yaml:"server_key"`
CookieSecret string `yaml:"cookie_secret"`
CsrfKey string `yaml:"csrf_key"`
BaseURL string `yaml:"base_url"`
HTTPAddress string `yaml:"http_address,omitempty"`
HTTPSAddress string `yaml:"https_address,omitempty"`
MailConfig *mailConfig `yaml:"mail_server"`
Timeouts *httpTimeoutConfig `yaml:"timeouts,omitempty"`
}
type configKey int
@ -84,7 +84,7 @@ func parseConfig(ctx context.Context, configFile string) (context.Context, error
config := &Config{
HTTPAddress: "127.0.0.1:8000",
HTTPSAddress: "127.0.0.1:8433",
Timeouts: httpTimeoutConfig{
Timeouts: &httpTimeoutConfig{
Idle: httpIdleTimeout,
ReadHeader: httpReadHeaderTimeout,
Read: httpReadTimeout,

@ -115,7 +115,6 @@ func (r *RemindVotersJob) Stop() {
func (app *application) NewRemindVotersJob(
rescheduleChannel chan Job,
notificationChannel chan NotificationMail,
) Job {
return &RemindVotersJob{
infoLog: app.infoLog,
@ -123,31 +122,85 @@ func (app *application) NewRemindVotersJob(
voters: app.voters,
decisions: app.decisions,
reschedule: rescheduleChannel,
notify: notificationChannel,
notify: app.mailNotifier.notifyChannel,
}
}
type closeDecisionsJob struct{}
type closeDecisionsJob struct {
timer *time.Timer
infoLog *log.Logger
errorLog *log.Logger
decisions *models.DecisionModel
reschedule chan Job
notify chan NotificationMail
}
func (c *closeDecisionsJob) Schedule() {
// TODO implement me
panic("implement me")
var (
nextDue *time.Time
err error
)
nextDue, err = c.decisions.GetNextPendingDecisionDue()
if err != nil {
c.errorLog.Printf("could not get next pending due date")
c.Stop()
return
}
if nextDue == nil {
c.infoLog.Printf("no next planned execution of CloseDecisionsJob")
c.Stop()
return
}
c.infoLog.Printf("scheduling CloseDecisionsJob for %s", nextDue)
when := time.Until(nextDue.Add(time.Second))
if c.timer == nil {
c.timer = time.AfterFunc(when, c.Run)
return
}
c.timer.Reset(when)
}
func (c *closeDecisionsJob) Run() {
// TODO implement me
panic("implement me")
c.infoLog.Printf("running CloseDecisionsJob")
results, err := c.decisions.CloseDecisions()
if err != nil {
c.errorLog.Printf("closing decisions failed: %v", err)
}
for _, res := range results {
c.notify <- &ClosedDecisionNotification{decision: res}
}
c.reschedule <- c
}
func (c *closeDecisionsJob) Stop() {
// TODO implement me
panic("implement me")
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
}
func NewCloseDecisionsJob() Job {
// TODO implement real job
return &closeDecisionsJob{}
func (app *application) NewCloseDecisionsJob(
rescheduleChannel chan Job,
) Job {
return &closeDecisionsJob{
infoLog: app.infoLog,
errorLog: app.errorLog,
decisions: app.decisions,
reschedule: rescheduleChannel,
notify: app.mailNotifier.notifyChannel,
}
}
type JobScheduler struct {
@ -167,8 +220,8 @@ func (app *application) NewJobScheduler() {
rescheduleChannel: rescheduleChannel,
}
app.jobScheduler.addJob(NewCloseDecisionsJob())
app.jobScheduler.addJob(app.NewRemindVotersJob(rescheduleChannel, app.mailNotifier.notifyChannel))
app.jobScheduler.addJob(app.NewCloseDecisionsJob(rescheduleChannel))
app.jobScheduler.addJob(app.NewRemindVotersJob(rescheduleChannel))
}
func (js *JobScheduler) Schedule(quitChannel chan struct{}) {

@ -47,7 +47,7 @@ type application struct {
ctx context.Context
jobScheduler *JobScheduler
mailNotifier *MailNotifier
mailConfig mailConfig
mailConfig *mailConfig
baseURL string
}

@ -12,11 +12,6 @@ import (
"gopkg.in/mail.v2"
)
type headerData struct {
name string
value []string
}
type recipientData struct {
field, address, name string
}
@ -25,12 +20,12 @@ type NotificationContent struct {
template string
data interface{}
subject string
headers []headerData
headers map[string][]string
recipients []recipientData
}
type NotificationMail interface {
GetNotificationContent() *NotificationContent
GetNotificationContent(*mailConfig) *NotificationContent
}
type MailNotifier struct {
@ -53,7 +48,7 @@ func (app *application) StartMailNotifier() {
for {
select {
case notification := <-app.mailNotifier.notifyChannel:
content := notification.GetNotificationContent()
content := notification.GetNotificationContent(app.mailConfig)
mailText, err := content.buildMail(app.baseURL)
if err != nil {
@ -63,6 +58,8 @@ func (app *application) StartMailNotifier() {
}
m := mail.NewMessage()
m.SetHeaders(content.headers)
m.SetAddressHeader("From", app.mailNotifier.senderAddress, "CAcert board voting system")
for _, recipient := range content.recipients {
@ -71,10 +68,6 @@ func (app *application) StartMailNotifier() {
m.SetHeader("Subject", content.subject)
for _, header := range content.headers {
m.SetHeader(header.name, header.value...)
}
m.SetBody("text/plain", mailText.String())
if err = app.mailNotifier.dialer.DialAndSend(m); err != nil {
@ -120,7 +113,7 @@ type RemindVoterNotification struct {
decisions []models.Decision
}
func (r RemindVoterNotification) GetNotificationContent() *NotificationContent {
func (r RemindVoterNotification) GetNotificationContent(*mailConfig) *NotificationContent {
return &NotificationContent{
template: "remind_voter_mail.txt",
data: struct {
@ -131,3 +124,28 @@ func (r RemindVoterNotification) GetNotificationContent() *NotificationContent {
recipients: []recipientData{{"To", r.voter.Reminder, r.voter.Name}},
}
}
type ClosedDecisionNotification struct {
decision *models.ClosedDecision
}
func (c *ClosedDecisionNotification) GetNotificationContent(mc *mailConfig) *NotificationContent {
return &NotificationContent{
template: "closed_motion_mail.txt",
data: c.decision,
subject: fmt.Sprintf("Re: %s - %s - finalised", c.decision.Decision.Tag, c.decision.Decision.Title),
headers: c.getHeaders(),
recipients: []recipientData{c.getRecipient(mc)},
}
}
func (c *ClosedDecisionNotification) getHeaders() map[string][]string {
return map[string][]string{
"References": {fmt.Sprintf("<%s>", c.decision.Decision.Tag)},
"In-Reply-To": {fmt.Sprintf("<%s>", c.decision.Decision.Tag)},
}
}
func (c *ClosedDecisionNotification) getRecipient(mc *mailConfig) recipientData {
return recipientData{field: "To", address: mc.NoticeMailAddress, name: "CAcert board mailing list"}
}

@ -18,6 +18,8 @@ limitations under the License.
package models
import (
"database/sql"
"errors"
"fmt"
"log"
"time"
@ -108,6 +110,30 @@ func (v VoteChoice) String() string {
return unknownVariant
}
type VoteSums struct {
Ayes, Nayes, Abstains int
}
func (v *VoteSums) VoteCount() int {
return v.Ayes + v.Nayes + v.Abstains
}
func (v *VoteSums) TotalVotes() int {
return v.Ayes + v.Nayes
}
func (v *VoteSums) CalculateResult(quorum int, majority float32) (VoteStatus, string) {
if v.VoteCount() < quorum {
return voteStatusDeclined, fmt.Sprintf("Needed quorum of %d has not been reached.", quorum)
}
if (float32(v.Ayes) / float32(v.TotalVotes())) < majority {
return voteStatusDeclined, fmt.Sprintf("Needed majority of %0.2f%% has not been reached.", majority)
}
return voteStatusApproved, "Quorum and majority have been reached"
}
type Decision struct {
ID int64 `db:"id"`
Proposed time.Time
@ -123,6 +149,12 @@ type Decision struct {
VoteType VoteType
}
type ClosedDecision struct {
Decision *Decision
VoteSums *VoteSums
Reasoning string
}
type DecisionModel struct {
DB *sqlx.DB
InfoLog *log.Logger
@ -164,14 +196,23 @@ VALUES (:proposed, :proponent, :title, :content, :votetype, :status, :due, :prop
return id, nil
}
func (m *DecisionModel) CloseDecisions() error {
rows, err := m.DB.NamedQuery(`
func (m *DecisionModel) CloseDecisions() ([]*ClosedDecision, error) {
tx, err := m.DB.Beginx()
if err != nil {
return nil, fmt.Errorf("could not start transaction: %w", err)
}
defer func(tx *sqlx.Tx) {
_ = tx.Rollback()
}(tx)
rows, err := tx.NamedQuery(`
SELECT decisions.id, decisions.tag, decisions.proponent, decisions.proposed, decisions.title, decisions.content,
decisions.votetype, decisions.status, decisions.due, decisions.modified
FROM decisions
WHERE decisions.status=0 AND :now > due`, struct{ Now time.Time }{Now: time.Now().UTC()})
if err != nil {
return fmt.Errorf("fetching closable decisions failed: %w", err)
return nil, fmt.Errorf("fetching closable decisions failed: %w", err)
}
defer func(rows *sqlx.Rows) {
@ -183,31 +224,143 @@ WHERE decisions.status=0 AND :now > due`, struct{ Now time.Time }{Now: time.Now
for rows.Next() {
decision := &Decision{}
if err = rows.StructScan(decision); err != nil {
return fmt.Errorf("scanning row failed: %w", err)
return nil, fmt.Errorf("scanning row failed: %w", err)
}
if rows.Err() != nil {
return fmt.Errorf("row error: %w", err)
return nil, fmt.Errorf("row error: %w", err)
}
decisions = append(decisions, decision)
}
results := make([]*ClosedDecision, 0, len(decisions))
var decisionResult *ClosedDecision
for _, decision := range decisions {
m.InfoLog.Printf("found closable decision %s", decision.Tag)
if err = m.Close(decision.Tag); err != nil {
return fmt.Errorf("closing decision %s failed: %w", decision.Tag, err)
if decisionResult, err = m.CloseDecision(tx, decision); err != nil {
return nil, fmt.Errorf("closing decision %s failed: %w", decision.Tag, err)
}
results = append(results, decisionResult)
}
if err = tx.Commit(); err != nil {
return nil, fmt.Errorf("could not commit transaction: %w", err)
}
return nil
return results, nil
}
func (m *DecisionModel) Close(tag string) error {
panic("not implemented")
func (m *DecisionModel) CloseDecision(tx *sqlx.Tx, d *Decision) (*ClosedDecision, error) {
quorum, majority := d.VoteType.QuorumAndMajority()
var (
voteSums *VoteSums
err error
reasoning string
)
if voteSums, err = m.GetVoteSums(tx, d); err != nil {
return nil, fmt.Errorf("getting vote sums failed: %w", err)
}
d.Status, reasoning = voteSums.CalculateResult(quorum, majority)
result, err := m.DB.NamedExec(
`UPDATE decisions SET status=:status, modified=CURRENT_TIMESTAMP WHERE id=:id`,
d,
)
if err != nil {
return nil, fmt.Errorf("could not execute update query: %w", err)
}
affectedRows, err := result.RowsAffected()
if err != nil {
return nil, fmt.Errorf("could not get affected rows count: %w", err)
}
if affectedRows != 1 {
return nil, fmt.Errorf("unexpected number of rows %d instead of 1", affectedRows)
}
m.InfoLog.Printf("decision %s closed with result %s: reasoning '%s'", d.Tag, d.Status, reasoning)
return &ClosedDecision{d, voteSums, reasoning}, nil
}
func (m *DecisionModel) FindUnVotedDecisionsForVoter(v *Voter) ([]Decision, error) {
func (m *DecisionModel) FindUnVotedDecisionsForVoter(_ *Voter) ([]Decision, error) {
panic("not implemented")
}
func (m *DecisionModel) GetVoteSums(tx *sqlx.Tx, d *Decision) (*VoteSums, error) {
voteRows, err := tx.NamedQuery(
`SELECT vote, COUNT(vote) FROM votes WHERE decision=$1 GROUP BY vote`,
d.ID,
)
if err != nil {
return nil, fmt.Errorf("fetching vote sums for motion %s failed: %w", d.Tag, err)
}
defer func(voteRows *sqlx.Rows) {
_ = voteRows.Close()
}(voteRows)
sums := &VoteSums{}
for voteRows.Next() {
var (
vote VoteChoice
count int
)
if err = voteRows.Err(); err != nil {
return nil, fmt.Errorf("could not fetch vote sums for motion %s: %w", d.Tag, err)
}
if err = voteRows.Scan(&vote, &count); err != nil {
return nil, fmt.Errorf("could not parse row for vote sums of motion %s: %w", d.Tag, err)
}
switch vote {
case voteAye:
sums.Ayes = count
case voteNaye:
sums.Nayes = count
case voteAbstain:
sums.Abstains = count
}
}
return sums, nil
}
func (m *DecisionModel) GetNextPendingDecisionDue() (*time.Time, error) {
row := m.DB.QueryRow(`SELECT due FROM decisions WHERE status=0 ORDER BY due LIMIT 1`, nil)
if row == nil {
return nil, errors.New("no row returned")
}
if err := row.Err(); err != nil {
return nil, fmt.Errorf("could not retrieve row for next pending decision: %w", err)
}
var due time.Time
if err := row.Scan(&due); err != nil {
if errors.Is(err, sql.ErrNoRows) {
m.InfoLog.Print("no pending decisions")
return nil, nil
}
return nil, fmt.Errorf("parsing result failed: %w", err)
}
return &due, nil
}

@ -15,7 +15,9 @@ import (
"github.com/stretchr/testify/require"
)
func TestDecisionModel_Create(t *testing.T) {
func prepareTestDb(t *testing.T) (*sqlx.DB, *log.Logger) {
t.Helper()
testDir := t.TempDir()
db, err := sql.Open("sqlite3", path.Join(testDir, "test.sqlite"))
@ -28,6 +30,12 @@ func TestDecisionModel_Create(t *testing.T) {
err = internal.InitializeDb(dbx.DB, logger)
require.NoError(t, err)
return dbx, logger
}
func TestDecisionModel_Create(t *testing.T) {
dbx, logger := prepareTestDb(t)
dm := models.DecisionModel{DB: dbx, InfoLog: logger}
v := &models.Voter{
@ -40,3 +48,35 @@ func TestDecisionModel_Create(t *testing.T) {
assert.NoError(t, err)
assert.NotEmpty(t, id)
}
func TestDecisionModel_GetNextPendingDecisionDue(t *testing.T) {
dbx, logger := prepareTestDb(t)
dm := models.DecisionModel{DB: dbx, InfoLog: logger}
var (
nextDue *time.Time
err error
)
nextDue, err = dm.GetNextPendingDecisionDue()
assert.NoError(t, err)
assert.Empty(t, nextDue)
v := &models.Voter{
ID: 1, // sqlite does not check referential integrity. Might fail with a foreign key index.
Name: "test voter",
Reminder: "test+voter@example.com",
}
due := time.Now().Add(10 * time.Minute)
_, err = dm.Create(v, models.VoteTypeMotion, "test motion", "I move that we should test more", time.Now(), due)
require.NoError(t, err)
nextDue, err = dm.GetNextPendingDecisionDue()
assert.NoError(t, err)
assert.NotEmpty(t, nextDue)
assert.Equal(t, due.UTC(), *nextDue)
}

Loading…
Cancel
Save