|
|
|
@ -91,44 +91,79 @@ type Client struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *Client) Run(ctx context.Context) error {
|
|
|
|
|
protocolErrors := make(chan error)
|
|
|
|
|
framerErrors := make(chan error)
|
|
|
|
|
const componentCount = 4
|
|
|
|
|
|
|
|
|
|
protocolErrors, framerErrors := make(chan error), make(chan error)
|
|
|
|
|
|
|
|
|
|
subCtx, cancel := context.WithCancel(ctx)
|
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
|
wg.Add(componentCount)
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
|
cancel()
|
|
|
|
|
c.logger.Info("context canceled, waiting for shutdown of components")
|
|
|
|
|
wg.Wait()
|
|
|
|
|
c.logger.Info("shutdown complete")
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
go func(f protocol.Framer) {
|
|
|
|
|
framerErrors <- f.ReadFrames(ctx, c.port, c.in)
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
err := f.ReadFrames(subCtx, c.port, c.in)
|
|
|
|
|
|
|
|
|
|
c.logger.Info("frame reading stopped")
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case framerErrors <- err:
|
|
|
|
|
case <-subCtx.Done():
|
|
|
|
|
}
|
|
|
|
|
}(c.framer)
|
|
|
|
|
|
|
|
|
|
go func(f protocol.Framer) {
|
|
|
|
|
framerErrors <- f.WriteFrames(ctx, c.port, c.out)
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
err := f.WriteFrames(subCtx, c.port, c.out)
|
|
|
|
|
|
|
|
|
|
c.logger.Info("frame writing stopped")
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case framerErrors <- err:
|
|
|
|
|
case <-subCtx.Done():
|
|
|
|
|
}
|
|
|
|
|
}(c.framer)
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
clientProtocol := protocol.NewClient(c.handler, c.commands, c.in, c.out, c.logger)
|
|
|
|
|
|
|
|
|
|
protocolErrors <- clientProtocol.Handle(ctx)
|
|
|
|
|
err := clientProtocol.Handle(subCtx)
|
|
|
|
|
|
|
|
|
|
c.logger.Info("client protocol stopped")
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case protocolErrors <- err:
|
|
|
|
|
case <-subCtx.Done():
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
ctx, cancelCommandLoop := context.WithCancel(ctx)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
c.commandLoop(subCtx)
|
|
|
|
|
|
|
|
|
|
go c.commandLoop(ctx)
|
|
|
|
|
c.logger.Info("client command loop stopped")
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
cancelCommandLoop()
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
case err := <-framerErrors:
|
|
|
|
|
cancelCommandLoop()
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error from framer: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
case err := <-protocolErrors:
|
|
|
|
|
cancelCommandLoop()
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error from protocol: %w", err)
|
|
|
|
|
}
|
|
|
|
|