@ -1,5 +1,5 @@
/ *
Copyright 2022 CAcert Inc .
Copyright CAcert Inc .
SPDX - License - Identifier : Apache - 2.0
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
@ -82,21 +82,24 @@ type Client struct {
config * config . ClientConfig
signerInfo * SignerInfo
knownCACertificates map [ string ] * CACertificateInfo
commandSources [ ] CommandSource
responseSinks map [ messages . ResponseCode ] ResponseSink
sync . Mutex
}
func ( c * Client ) Run (
ctx context . Context , callback <- chan interface { } , handler protocol . ClientHandler ,
ctx context . Context , callback <- chan any , handler protocol . ClientHandler ,
commands chan * protocol . Command ,
) error {
const componentCount = 4
protocolErrors , framerErrors := make ( chan error ) , make ( chan error )
protocolErrors , framerErrors , sourceErrors := make ( chan error ) , make ( chan error ) , make ( chan error )
subCtx , cancel := context . WithCancel ( ctx )
wg := sync . WaitGroup { }
wg . Add ( componentCount )
wg . Add ( len ( c . commandSources ) )
commands := make ( chan * protocol . Command , c . config . CommandChannelCapacity )
fromSigner := make ( chan [ ] byte )
toSigner := make ( chan [ ] byte )
@ -107,6 +110,8 @@ func (c *Client) Run(
c . logger . Info ( "shutdown complete" )
} ( )
c . RunSources ( subCtx , & wg , sourceErrors )
go func ( f protocol . Framer ) {
defer wg . Done ( )
@ -171,6 +176,12 @@ func (c *Client) Run(
return fmt . Errorf ( "error from protocol: %w" , err )
}
return nil
case err := <- sourceErrors :
if err != nil {
return fmt . Errorf ( "error from command source: %w" , err )
}
return nil
}
}
@ -205,10 +216,15 @@ func (c *Client) Close() error {
type commandGenerator func ( context . Context , chan <- * protocol . Command ) error
func ( c * Client ) commandLoop ( ctx context . Context , commands chan * protocol . Command , callback <- chan interface { } ) {
func ( c * Client ) commandLoop ( ctx context . Context , commands chan * protocol . Command , callback <- chan any ) {
healthTimer := time . NewTimer ( c . config . HealthStart )
fetchCRLTimer := time . NewTimer ( c . config . FetchCRLStart )
nextCommands := make ( chan * protocol . Command )
defer func ( ) {
close ( commands )
c . logger . Info ( "command loop stopped" )
} ( )
for {
select {
@ -216,57 +232,54 @@ func (c *Client) commandLoop(ctx context.Context, commands chan *protocol.Comman
return
case callbackData := <- callback :
go func ( ) {
err := c . handleCallback ( ctx , nextC ommands, callbackData )
err := c . handleCallback ( ctx , c ommands, callbackData )
if err != nil {
c . logger . WithError ( err ) . Error ( "callback handling failed" )
}
} ( )
case <- fetchCRLTimer . C :
go c . scheduleRequiredCRLFetches ( ctx , nextC ommands)
go c . scheduleRequiredCRLFetches ( ctx , c ommands)
fetchCRLTimer . Reset ( c . config . FetchCRLInterval )
case <- healthTimer . C :
go c . scheduleHealthCheck ( ctx , nextC ommands)
go c . scheduleHealthCheck ( ctx , c ommands)
healthTimer . Reset ( c . config . HealthInterval )
case nextCommand , ok := <- nextCommands :
if ! ok {
return
}
commands <- nextCommand
c . logger . WithFields ( map [ string ] interface { } {
"command" : nextCommand . Announce ,
"buffer length" : len ( commands ) ,
} ) . Trace ( "sent command" )
}
}
}
type ErrNoResponseSink struct {
msg string
}
func ( e ErrNoResponseSink ) Error ( ) string {
return fmt . Sprintf ( "no response sink for %s response found" , e . msg )
}
func ( c * Client ) handleCallback (
ctx context . Context ,
newCommands chan <- * protocol . Command ,
data interface { } ,
data any ,
) error {
var handler commandGenerator
var (
handler commandGenerator
err error
)
switch d := data . ( type ) {
case SignerInfo :
handler = c . updateSignerInfo ( d )
case * messages . CAInfoResponse :
handler = c . updateCAInformation ( d )
case * messages . FetchCRLResponse :
handler = c . updateCRL ( d )
case * protocol . Response :
handler , err = c . handleResponse ( d )
if err != nil {
return err
}
default :
return fmt . Errorf ( "unknown callback data of type %T" , d ata )
return fmt . Errorf ( "unknown callback data of type %T" , d )
}
if err := handler ( ctx , newCommands ) ; err != nil {
return err
}
return nil
return handler ( ctx , newCommands )
}
func ( c * Client ) updateSignerInfo (
@ -609,6 +622,94 @@ func (c *Client) setLastKnownCRL(caName string, number *big.Int) {
caInfo . LastKnownCRL = number
}
type CommandSource interface {
Run ( context . Context ) error
}
type ResponseSink interface {
SupportedResponses ( ) [ ] messages . ResponseCode
HandleResponse ( context . Context , * messages . ResponseAnnounce , any ) error
NotifyError ( ctx context . Context , requestID , message string ) error
}
func ( c * Client ) RegisterCommandSource ( source CommandSource ) {
c . commandSources = append ( c . commandSources , source )
}
func ( c * Client ) RegisterResponseSink ( sink ResponseSink ) {
for _ , code := range sink . SupportedResponses ( ) {
c . responseSinks [ code ] = sink
}
}
func ( c * Client ) handleResponse ( r * protocol . Response ) ( commandGenerator , error ) {
var handler commandGenerator
switch payload := r . Response . ( type ) {
case * messages . CAInfoResponse :
handler = c . updateCAInformation ( payload )
case * messages . FetchCRLResponse :
handler = c . updateCRL ( payload )
case * messages . ErrorResponse :
handler = func ( ctx context . Context , _ chan <- * protocol . Command ) error {
for _ , sink := range c . responseSinks {
if err := sink . NotifyError ( ctx , r . Announce . ID , payload . Message ) ; err != nil {
return fmt . Errorf ( "error from response sink: %w" , err )
}
}
return nil
}
case * messages . SignCertificateResponse :
sink , ok := c . responseSinks [ messages . RespSignCertificate ]
if ! ok {
return nil , ErrNoResponseSink { "sign certificate" }
}
handler = func ( ctx context . Context , _ chan <- * protocol . Command ) error {
if err := sink . HandleResponse ( ctx , r . Announce , payload ) ; err != nil {
return fmt . Errorf ( "error from response sink: %w" , err )
}
return nil
}
case * messages . SignOpenPGPResponse :
sink , ok := c . responseSinks [ messages . RespSignOpenPGP ]
if ! ok {
return nil , ErrNoResponseSink { "sign openpgp" }
}
handler = func ( ctx context . Context , _ chan <- * protocol . Command ) error {
if err := sink . HandleResponse ( ctx , r . Announce , payload ) ; err != nil {
return fmt . Errorf ( "error from response sink: %w" , err )
}
return nil
}
default :
return nil , fmt . Errorf ( "unhandled response %s" , payload )
}
return handler , nil
}
func ( c * Client ) RunSources ( ctx context . Context , wg * sync . WaitGroup , errorChan chan error ) {
for _ , source := range c . commandSources {
go func ( s CommandSource ) {
defer wg . Done ( )
err := s . Run ( ctx )
if err != nil {
c . logger . WithError ( err ) . Error ( "command source failed" )
errorChan <- err
}
c . logger . Info ( "command source stopped" )
} ( source )
}
}
func New (
cfg * config . ClientConfig ,
logger * logrus . Logger ,
@ -623,6 +724,8 @@ func New(
framer : cobsFramer ,
config : cfg ,
knownCACertificates : make ( map [ string ] * CACertificateInfo ) ,
responseSinks : make ( map [ messages . ResponseCode ] ResponseSink ) ,
commandSources : make ( [ ] CommandSource , 0 ) ,
}
err = client . setupConnection ( & serial . Config {