Critical:
- derpclient.go: Close() no longer sets the d.clients map to nil; this prevents a panic when a concurrent reconnect() writes to a nil map. Also added a d.closed check after a successful Connect() in reconnect().
- control.go: The HTTP upgrade response is now parsed via bufio.Reader + http.ReadResponse. Unconsumed bytes (Noise handshake data arriving in the same TCP segment) are preserved via a bufferedConn wrapper.

High:
- control.go: loadOrGenerateKeys no longer returns early after loading the machine key; it always validates/generates the node key too.
- outbound.go: Renamed isNew→exists for clarity (the logic was correct but confusing). Added rebuildPeers() for full MapResponse.Peers snapshots on reconnect; it removes stale peers and adds new ones to WireGuard.

Medium:
- disco.go: LocalEndpoints() now filters out the 0.0.0.0/[::] wildcard addresses and only returns concrete STUN/LAN endpoints for control.
- disco.go: The handlePing DERP pong reply now falls back to defaultDERP when peer.homeDERP is 0 (matching sendCallMeMaybe behavior).
- control.go: Hostinfo.OS uses runtime.GOOS instead of a hardcoded "linux".
- funnel.go: The bidirectional proxy uses TCP half-close (CloseWrite) so the io.Copy goroutines don't hang when one direction completes.
326 lines
7.6 KiB
Go
326 lines
7.6 KiB
Go
package tailnet
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/netip"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.zx2c4.com/wireguard/tun/netstack"
|
|
)
|
|
|
|
// FunnelOptions describes a single Funnel endpoint: which port to expose
// and which local backend receives the traffic.
type FunnelOptions struct {
	// Port is the public-facing port (443, 8443, 10000).
	// The zero value is rejected by EnableFunnel.
	Port uint16

	// Protocol is one of "tcp", "tls-terminated-tcp", or "https".
	Protocol string

	// LocalAddr is the backend that connections are forwarded to,
	// e.g. "127.0.0.1:3000". It must not be empty.
	LocalAddr string

	// TerminateTLS requests that TLS be terminated here and plaintext be
	// forwarded to LocalAddr. Only used with Protocol "tls-terminated-tcp".
	// It has an effect only when TLSConfig is also set.
	TerminateTLS bool

	// TLSConfig is the optional TLS configuration used when terminating TLS.
	TLSConfig *tls.Config
}
|
|
|
|
// FunnelServer manages Funnel listeners on the netstack TUN.
|
|
type FunnelServer struct {
|
|
mu sync.Mutex
|
|
tnet *netstack.Net
|
|
listeners map[uint16]*funnelListener
|
|
logf func(format string, args ...any)
|
|
}
|
|
|
|
type funnelListener struct {
|
|
port uint16
|
|
opts FunnelOptions
|
|
listener net.Listener
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewFunnelServer creates a FunnelServer that listens on the given netstack.
|
|
func NewFunnelServer(tnet *netstack.Net, logf func(format string, args ...any)) *FunnelServer {
|
|
return &FunnelServer{
|
|
tnet: tnet,
|
|
listeners: make(map[uint16]*funnelListener),
|
|
logf: logf,
|
|
}
|
|
}
|
|
|
|
// EnableFunnel starts listening on a port and forwarding connections to a local target.
|
|
func (fs *FunnelServer) EnableFunnel(ctx context.Context, opts FunnelOptions) error {
|
|
if opts.Port == 0 {
|
|
return fmt.Errorf("funnel: port is required")
|
|
}
|
|
if opts.LocalAddr == "" {
|
|
return fmt.Errorf("funnel: local address is required")
|
|
}
|
|
|
|
fs.mu.Lock()
|
|
if _, ok := fs.listeners[opts.Port]; ok {
|
|
fs.mu.Unlock()
|
|
return fmt.Errorf("funnel: port %d already in use", opts.Port)
|
|
}
|
|
fs.mu.Unlock()
|
|
|
|
// Listen on the netstack TUN address
|
|
addr := &net.TCPAddr{Port: int(opts.Port)}
|
|
ln, err := fs.tnet.ListenTCP(addr)
|
|
if err != nil {
|
|
return fmt.Errorf("funnel: listen on port %d: %w", opts.Port, err)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
fl := &funnelListener{
|
|
port: opts.Port,
|
|
opts: opts,
|
|
listener: ln,
|
|
cancel: cancel,
|
|
}
|
|
|
|
fs.mu.Lock()
|
|
fs.listeners[opts.Port] = fl
|
|
fs.mu.Unlock()
|
|
|
|
fs.logf("funnel: listening on :%d → %s (%s)", opts.Port, opts.LocalAddr, opts.Protocol)
|
|
|
|
go fs.acceptLoop(ctx, fl)
|
|
return nil
|
|
}
|
|
|
|
// DisableFunnel stops listening on a port.
|
|
func (fs *FunnelServer) DisableFunnel(port uint16) error {
|
|
fs.mu.Lock()
|
|
fl, ok := fs.listeners[port]
|
|
if !ok {
|
|
fs.mu.Unlock()
|
|
return fmt.Errorf("funnel: port %d not active", port)
|
|
}
|
|
delete(fs.listeners, port)
|
|
fs.mu.Unlock()
|
|
|
|
fl.cancel()
|
|
fl.listener.Close()
|
|
fs.logf("funnel: stopped on :%d", port)
|
|
return nil
|
|
}
|
|
|
|
// ActivePorts returns the list of active Funnel ports.
|
|
func (fs *FunnelServer) ActivePorts() []uint16 {
|
|
fs.mu.Lock()
|
|
defer fs.mu.Unlock()
|
|
ports := make([]uint16, 0, len(fs.listeners))
|
|
for p := range fs.listeners {
|
|
ports = append(ports, p)
|
|
}
|
|
return ports
|
|
}
|
|
|
|
// Close stops all Funnel listeners.
|
|
func (fs *FunnelServer) Close() error {
|
|
fs.mu.Lock()
|
|
listeners := fs.listeners
|
|
fs.listeners = make(map[uint16]*funnelListener)
|
|
fs.mu.Unlock()
|
|
|
|
for _, fl := range listeners {
|
|
fl.cancel()
|
|
fl.listener.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// acceptLoop accepts incoming connections and proxies them.
|
|
func (fs *FunnelServer) acceptLoop(ctx context.Context, fl *funnelListener) {
|
|
for {
|
|
conn, err := fl.listener.Accept()
|
|
if err != nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
fs.logf("funnel :%d: accept error: %v", fl.port, err)
|
|
return
|
|
}
|
|
}
|
|
go fs.handleConn(ctx, fl, conn)
|
|
}
|
|
}
|
|
|
|
// handleConn handles a single Funnel connection by proxying to the local target.
|
|
func (fs *FunnelServer) handleConn(ctx context.Context, fl *funnelListener, inbound net.Conn) {
|
|
defer inbound.Close()
|
|
|
|
// Optional TLS termination
|
|
if fl.opts.TerminateTLS && fl.opts.TLSConfig != nil {
|
|
tlsConn := tls.Server(inbound, fl.opts.TLSConfig)
|
|
if err := tlsConn.HandshakeContext(ctx); err != nil {
|
|
fs.logf("funnel :%d: TLS handshake failed: %v", fl.port, err)
|
|
return
|
|
}
|
|
inbound = tlsConn
|
|
}
|
|
|
|
// Connect to local target
|
|
var dialer net.Dialer
|
|
dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
outbound, err := dialer.DialContext(dialCtx, "tcp", fl.opts.LocalAddr)
|
|
if err != nil {
|
|
fs.logf("funnel :%d: dial %s failed: %v", fl.port, fl.opts.LocalAddr, err)
|
|
return
|
|
}
|
|
defer outbound.Close()
|
|
|
|
// Bidirectional copy with half-close. When one direction finishes,
|
|
// close the write side of the other so the copy goroutine unblocks.
|
|
done := make(chan struct{}, 2)
|
|
go func() {
|
|
io.Copy(outbound, inbound)
|
|
if tc, ok := outbound.(*net.TCPConn); ok {
|
|
tc.CloseWrite()
|
|
}
|
|
done <- struct{}{}
|
|
}()
|
|
go func() {
|
|
io.Copy(inbound, outbound)
|
|
if tc, ok := inbound.(*net.TCPConn); ok {
|
|
tc.CloseWrite()
|
|
}
|
|
done <- struct{}{}
|
|
}()
|
|
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case <-done:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// EnableFunnel starts a Funnel endpoint on the Outbound.
|
|
// It sets IngressEnabled in Hostinfo and starts the TCP proxy.
|
|
func (o *Outbound) EnableFunnel(ctx context.Context, opts FunnelOptions) error {
|
|
o.mu.Lock()
|
|
if o.tnet == nil {
|
|
o.mu.Unlock()
|
|
return fmt.Errorf("outbound not started")
|
|
}
|
|
if o.funnel == nil {
|
|
o.funnel = NewFunnelServer(o.tnet, o.logf)
|
|
}
|
|
o.mu.Unlock()
|
|
|
|
// Re-register with IngressEnabled
|
|
if err := o.reRegisterWithIngress(ctx, true); err != nil {
|
|
return fmt.Errorf("funnel: re-register: %w", err)
|
|
}
|
|
|
|
return o.funnel.EnableFunnel(ctx, opts)
|
|
}
|
|
|
|
// DisableFunnel stops a Funnel endpoint.
|
|
func (o *Outbound) DisableFunnel(ctx context.Context, port uint16) error {
|
|
o.mu.Lock()
|
|
f := o.funnel
|
|
o.mu.Unlock()
|
|
if f == nil {
|
|
return fmt.Errorf("funnel not initialized")
|
|
}
|
|
|
|
if err := f.DisableFunnel(port); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If no more active ports, disable ingress
|
|
if len(f.ActivePorts()) == 0 {
|
|
o.reRegisterWithIngress(ctx, false)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// FunnelAddrs returns the public Funnel addresses for this node.
|
|
// These are the Tailscale DNS names that external clients can connect to.
|
|
func (o *Outbound) FunnelAddrs() []string {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
|
|
nm := o.control.NetMap()
|
|
if nm == nil || nm.Node == nil {
|
|
return nil
|
|
}
|
|
|
|
var addrs []string
|
|
name := nm.Node.Name
|
|
if name == "" {
|
|
return nil
|
|
}
|
|
// Strip trailing dot
|
|
if name[len(name)-1] == '.' {
|
|
name = name[:len(name)-1]
|
|
}
|
|
|
|
if o.funnel != nil {
|
|
for _, port := range o.funnel.ActivePorts() {
|
|
if port == 443 {
|
|
addrs = append(addrs, fmt.Sprintf("https://%s", name))
|
|
} else {
|
|
addrs = append(addrs, fmt.Sprintf("https://%s:%d", name, port))
|
|
}
|
|
}
|
|
}
|
|
return addrs
|
|
}
|
|
|
|
// reRegisterWithIngress re-registers with the control plane, toggling IngressEnabled.
|
|
func (o *Outbound) reRegisterWithIngress(ctx context.Context, ingress bool) error {
|
|
o.logf("funnel: re-registering with IngressEnabled=%v", ingress)
|
|
|
|
// Build a new register request with updated Hostinfo
|
|
regReq := o.control.buildRegisterRequest(ingress)
|
|
|
|
body, err := encodeJSON(regReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp, err := o.control.noise.Post(ctx, "/machine/register", body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBody, err := readLimitedBody(resp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.StatusCode != 200 {
|
|
return fmt.Errorf("re-register: HTTP %d: %s", resp.StatusCode, respBody)
|
|
}
|
|
|
|
o.logf("funnel: re-registered successfully, ingress=%v", ingress)
|
|
return nil
|
|
}
|
|
|
|
// SelfTailscaleIPs returns this node's Tailscale IP addresses.
|
|
func (o *Outbound) SelfTailscaleIPs() []netip.Addr {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
var addrs []netip.Addr
|
|
for _, pfx := range o.selfAddrs {
|
|
addrs = append(addrs, pfx.Addr())
|
|
}
|
|
return addrs
|
|
}
|