msgviewer: simplify filter and pager command handling

Refactor the filtering and paging logic to use several fewer goroutines.
Fixes data race condition with the timing of filter -> pager ->
terminal, can be found when switching message views fast.

Check if filter -> pager process is currently running before calling it
to start again. Fixes data race between fetching message body and
terminal starting. Both can initiate the copying process, and for long
running filters and fast message fetching, it is possible to have
fetched a message but still be running the filter when the terminal
starts.

Move StripAnsi to it's own file in lib/parse, similar to the hyperlinks
parser.

Signed-off-by: Tim Culverhouse <tim@timculverhouse.com>
Acked-by: Robin Jarry <robin@jarry.cc>
This commit is contained in:
Tim Culverhouse 2022-10-17 15:16:09 -05:00 committed by Robin Jarry
parent 7647dfb8b4
commit 556f346f96
2 changed files with 67 additions and 92 deletions

37
lib/parse/ansi.go Normal file
View file

@ -0,0 +1,37 @@
package parse
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"regexp"
"git.sr.ht/~rjarry/aerc/logging"
)
var ansi = regexp.MustCompile("\x1B\\[[0-?]*[ -/]*[@-~]")
// StripAnsi strips ansi escape codes from the reader
func StripAnsi(r io.Reader) io.Reader {
buf := bytes.NewBuffer(nil)
scanner := bufio.NewScanner(r)
scanner.Buffer(nil, 1024*1024*1024)
for scanner.Scan() {
line := scanner.Bytes()
line = ansi.ReplaceAll(line, []byte(""))
_, err := buf.Write(line)
if err != nil {
logging.Warnf("failed write ", err)
}
_, err = buf.Write([]byte("\n"))
if err != nil {
logging.Warnf("failed write ", err)
}
}
if err := scanner.Err(); err != nil {
fmt.Fprintf(os.Stderr, "failed to read line: %v\n", err)
}
return buf
}

View file

@ -1,14 +1,13 @@
package widgets package widgets
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"os/exec" "os/exec"
"regexp"
"strings" "strings"
"sync/atomic"
"github.com/danwakefield/fnmatch" "github.com/danwakefield/fnmatch"
"github.com/gdamore/tcell/v2" "github.com/gdamore/tcell/v2"
@ -25,8 +24,6 @@ import (
"git.sr.ht/~rjarry/aerc/models" "git.sr.ht/~rjarry/aerc/models"
) )
var ansi = regexp.MustCompile("\x1B\\[[0-?]*[ -/]*[@-~]")
var _ ProvidesMessages = (*MessageViewer)(nil) var _ ProvidesMessages = (*MessageViewer)(nil)
type MessageViewer struct { type MessageViewer struct {
@ -513,15 +510,17 @@ type PartViewer struct {
pagerin io.WriteCloser pagerin io.WriteCloser
part *models.BodyStructure part *models.BodyStructure
showHeaders bool showHeaders bool
sink io.WriteCloser
source io.Reader source io.Reader
term *Terminal term *Terminal
grid *ui.Grid grid *ui.Grid
uiConfig *config.UIConfig uiConfig *config.UIConfig
copying int32
links []string links []string
} }
const copying int32 = 1
func NewPartViewer(acct *AccountView, conf *config.AercConfig, func NewPartViewer(acct *AccountView, conf *config.AercConfig,
msg lib.MessageView, part *models.BodyStructure, msg lib.MessageView, part *models.BodyStructure,
index []int, index []int,
@ -529,7 +528,6 @@ func NewPartViewer(acct *AccountView, conf *config.AercConfig,
var ( var (
filter *exec.Cmd filter *exec.Cmd
pager *exec.Cmd pager *exec.Cmd
pipe io.WriteCloser
pagerin io.WriteCloser pagerin io.WriteCloser
term *Terminal term *Terminal
) )
@ -582,10 +580,7 @@ func NewPartViewer(acct *AccountView, conf *config.AercConfig,
fmt.Sprintf("AERC_MIME_TYPE=%s", mime)) fmt.Sprintf("AERC_MIME_TYPE=%s", mime))
filter.Env = append(filter.Env, filter.Env = append(filter.Env,
fmt.Sprintf("AERC_FILENAME=%s", part.FileName())) fmt.Sprintf("AERC_FILENAME=%s", part.FileName()))
if pipe, err = filter.StdinPipe(); err != nil { if pagerin, err = pager.StdinPipe(); err != nil {
return nil, err
}
if pagerin, _ = pager.StdinPipe(); err != nil {
return nil, err return nil, err
} }
if term, err = NewTerminal(pager); err != nil { if term, err = NewTerminal(pager); err != nil {
@ -610,7 +605,6 @@ func NewPartViewer(acct *AccountView, conf *config.AercConfig,
pagerin: pagerin, pagerin: pagerin,
part: part, part: part,
showHeaders: conf.Viewer.ShowHeaders, showHeaders: conf.Viewer.ShowHeaders,
sink: pipe,
term: term, term: term,
grid: grid, grid: grid,
uiConfig: acct.UiConfig(), uiConfig: acct.UiConfig(),
@ -635,27 +629,37 @@ func (pv *PartViewer) UpdateScreen() {
} }
func (pv *PartViewer) attemptCopy() { func (pv *PartViewer) attemptCopy() {
if pv.source == nil || pv.pager == nil || pv.pager.Process == nil { if pv.source == nil ||
pv.filter == nil ||
atomic.LoadInt32(&pv.copying) == copying {
return return
} }
if pv.filter != nil { atomic.StoreInt32(&pv.copying, copying)
pv.copyFilterOutToPager() // delayed until we write to the sink pv.writeMailHeaders()
if strings.EqualFold(pv.part.MIMEType, "text") {
pv.source = parse.StripAnsi(pv.hyperlinks(pv.source))
}
pv.filter.Stdin = pv.source
pv.filter.Stdout = pv.pagerin
pv.filter.Stderr = pv.pagerin
err := pv.filter.Start()
if err != nil {
logging.Errorf("error running filter: %v", err)
return
} }
go func() { go func() {
defer logging.PanicHandler() defer logging.PanicHandler()
defer atomic.StoreInt32(&pv.copying, 0)
pv.writeMailHeaders() err = pv.filter.Wait()
if strings.EqualFold(pv.part.MIMEType, "text") {
// if the content is plain we can strip ansi control chars
pv.copySourceToSinkStripAnsi()
} else {
// if it's binary we have to rely on the filter to be sane
_, err := io.Copy(pv.sink, pv.source)
if err != nil { if err != nil {
logging.Warnf("failed to copy: %v", err) logging.Errorf("error waiting for filter: %v", err)
return
} }
err = pv.pagerin.Close()
if err != nil {
logging.Errorf("error closing pager pipe: %v", err)
return
} }
pv.sink.Close()
}() }()
} }
@ -703,72 +707,6 @@ func (pv *PartViewer) hyperlinks(r io.Reader) (reader io.Reader) {
return reader return reader
} }
func (pv *PartViewer) copyFilterOutToPager() {
stdout, _ := pv.filter.StdoutPipe()
stderr, _ := pv.filter.StderrPipe()
err := pv.filter.Start()
if err != nil {
logging.Warnf("failed to start filter: %v", err)
}
ch := make(chan interface{})
go func() {
defer logging.PanicHandler()
_, err := io.Copy(pv.pagerin, stdout)
if err != nil {
pv.err = err
pv.Invalidate()
}
stdout.Close()
ch <- nil
}()
go func() {
defer logging.PanicHandler()
_, err := io.Copy(pv.pagerin, stderr)
if err != nil {
pv.err = err
pv.Invalidate()
}
stderr.Close()
ch <- nil
}()
go func() {
defer logging.PanicHandler()
<-ch
<-ch
err := pv.filter.Wait()
if err != nil {
logging.Warnf("failed to wait for the filter process: %v", err)
}
pv.pagerin.Close()
// If the pager command doesn't keep the terminal running, we
// risk not drawing the screen until user input unless we
// invalidate after writing
pv.Invalidate()
}()
}
func (pv *PartViewer) copySourceToSinkStripAnsi() {
scanner := bufio.NewScanner(pv.hyperlinks(pv.source))
// some people send around huge html without any newline in between
// this did overflow the default 64KB buffer of bufio.Scanner.
// If something can't fit in a GB there's no hope left
scanner.Buffer(nil, 1024*1024*1024)
for scanner.Scan() {
text := scanner.Text()
text = ansi.ReplaceAllString(text, "")
_, err := io.WriteString(pv.sink, text+"\n")
if err != nil {
logging.Warnf("failed write ", err)
}
}
if err := scanner.Err(); err != nil {
fmt.Fprintf(os.Stderr, "failed to read line: %v\n", err)
}
}
var noFilterConfiguredCommands = [][]string{ var noFilterConfiguredCommands = [][]string{
{":open<enter>", "Open using the system handler"}, {":open<enter>", "Open using the system handler"},
{":save<space>", "Save to file"}, {":save<space>", "Save to file"},