package progress

import (
	"context"
	"io"
	"sort"
	"sync"
	"time"

	"github.com/pkg/errors"
)

// Progress package provides utility functions for using the context to capture
// progress of a running function. All progress items written contain an ID
// that is used to collapse unread messages.

type contextKeyT string

var contextKey = contextKeyT("buildkit/util/progress")

// FromContext returns a progress writer from a context.
func FromContext(ctx context.Context, opts ...WriterOption) (Writer, bool, context.Context) {
	v := ctx.Value(contextKey)
	pw, ok := v.(*progressWriter)
	if !ok {
		if pw, ok := v.(*MultiWriter); ok {
			return pw, true, ctx
		}
		return &noOpWriter{}, false, ctx
	}
	pw = newWriter(pw)
	for _, o := range opts {
		o(pw)
	}
	ctx = context.WithValue(ctx, contextKey, pw)
	return pw, true, ctx
}

type WriterOption func(Writer)

// NewContext returns a new context and a progress reader that captures all
// progress items writtern to this context. Last returned parameter is a closer
// function to signal that no new writes will happen to this context.
func NewContext(ctx context.Context) (Reader, context.Context, func()) {
	pr, pw, cancel := pipe()
	ctx = WithProgress(ctx, pw)
	return pr, ctx, cancel
}

func WithProgress(ctx context.Context, pw Writer) context.Context {
	return context.WithValue(ctx, contextKey, pw)
}

func WithMetadata(key string, val interface{}) WriterOption {
	return func(w Writer) {
		if pw, ok := w.(*progressWriter); ok {
			pw.meta[key] = val
		}
		if pw, ok := w.(*MultiWriter); ok {
			pw.meta[key] = val
		}
	}
}

type Controller interface {
	Start(context.Context) (context.Context, func(error))
	Status(id string, action string) func()
}

type Writer interface {
	Write(id string, value interface{}) error
	Close() error
}

type Reader interface {
	Read(context.Context) ([]*Progress, error)
}

type Progress struct {
	ID        string
	Timestamp time.Time
	Sys       interface{}
	meta      map[string]interface{}
}

type Status struct {
	Action    string
	Current   int
	Total     int
	Started   *time.Time
	Completed *time.Time
}

type progressReader struct {
	ctx     context.Context
	cond    *sync.Cond
	mu      sync.Mutex
	writers map[*progressWriter]struct{}
	dirty   map[string]*Progress
}

func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error) {
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-done:
		case <-ctx.Done():
			pr.mu.Lock()
			pr.cond.Broadcast()
			pr.mu.Unlock()
		}
	}()
	pr.mu.Lock()
	for {
		select {
		case <-ctx.Done():
			pr.mu.Unlock()
			return nil, ctx.Err()
		default:
		}
		dmap := pr.dirty
		if len(dmap) == 0 {
			select {
			case <-pr.ctx.Done():
				if len(pr.writers) == 0 {
					pr.mu.Unlock()
					return nil, io.EOF
				}
			default:
			}
			pr.cond.Wait()
			continue
		}
		pr.dirty = make(map[string]*Progress)
		pr.mu.Unlock()

		out := make([]*Progress, 0, len(dmap))
		for _, p := range dmap {
			out = append(out, p)
		}

		sort.Slice(out, func(i, j int) bool {
			return out[i].Timestamp.Before(out[j].Timestamp)
		})

		return out, nil
	}
}

func (pr *progressReader) append(pw *progressWriter) {
	pr.mu.Lock()
	defer pr.mu.Unlock()

	select {
	case <-pr.ctx.Done():
		return
	default:
		pr.writers[pw] = struct{}{}
	}
}

func pipe() (*progressReader, *progressWriter, func()) {
	ctx, cancel := context.WithCancel(context.Background())
	pr := &progressReader{
		ctx:     ctx,
		writers: make(map[*progressWriter]struct{}),
		dirty:   make(map[string]*Progress),
	}
	pr.cond = sync.NewCond(&pr.mu)
	go func() {
		<-ctx.Done()
		pr.mu.Lock()
		pr.cond.Broadcast()
		pr.mu.Unlock()
	}()
	pw := &progressWriter{
		reader: pr,
	}
	return pr, pw, cancel
}

func newWriter(pw *progressWriter) *progressWriter {
	meta := make(map[string]interface{})
	for k, v := range pw.meta {
		meta[k] = v
	}
	pw = &progressWriter{
		reader: pw.reader,
		meta:   meta,
	}
	pw.reader.append(pw)
	return pw
}

type progressWriter struct {
	done   bool
	reader *progressReader
	meta   map[string]interface{}
}

func (pw *progressWriter) Write(id string, v interface{}) error {
	if pw.done {
		return errors.Errorf("writing %s to closed progress writer", id)
	}
	return pw.writeRawProgress(&Progress{
		ID:        id,
		Timestamp: time.Now(),
		Sys:       v,
		meta:      pw.meta,
	})
}

func (pw *progressWriter) WriteRawProgress(p *Progress) error {
	meta := p.meta
	if len(pw.meta) > 0 {
		meta = map[string]interface{}{}
		for k, v := range p.meta {
			meta[k] = v
		}
		for k, v := range pw.meta {
			if _, ok := meta[k]; !ok {
				meta[k] = v
			}
		}
	}
	p.meta = meta
	return pw.writeRawProgress(p)
}

func (pw *progressWriter) writeRawProgress(p *Progress) error {
	pw.reader.mu.Lock()
	pw.reader.dirty[p.ID] = p
	pw.reader.cond.Broadcast()
	pw.reader.mu.Unlock()
	return nil
}

func (pw *progressWriter) Close() error {
	pw.reader.mu.Lock()
	delete(pw.reader.writers, pw)
	pw.reader.mu.Unlock()
	pw.reader.cond.Broadcast()
	pw.done = true
	return nil
}

func (p *Progress) Meta(key string) (interface{}, bool) {
	v, ok := p.meta[key]
	return v, ok
}

type noOpWriter struct{}

func (pw *noOpWriter) Write(_ string, _ interface{}) error {
	return nil
}

func (pw *noOpWriter) Close() error {
	return nil
}