Files
tailnet/funnel.go
NeoMody d2ed395249 Fix critical/high/medium bugs from Codex GPT-5.4 review
Critical:
- derpclient.go: Close() no longer nils d.clients map — prevents panic
  from concurrent reconnect() writing to nil map. Also added d.closed
  check after successful Connect() in reconnect().
- control.go: HTTP upgrade response now parsed via bufio.Reader +
  http.ReadResponse. Unconsumed bytes (Noise handshake data in same TCP
  segment) preserved via bufferedConn wrapper.

High:
- control.go: loadOrGenerateKeys no longer early-returns after loading
  machine key — always validates/generates node key too.
- outbound.go: Renamed isNew→exists for clarity (logic was correct but
  confusing). Added rebuildPeers() for full MapResponse.Peers snapshots
  on reconnect — removes stale peers, adds new ones to WireGuard.

Medium:
- disco.go: LocalEndpoints() now filters out 0.0.0.0/[::] wildcard
  addresses — only returns concrete STUN/LAN endpoints for control.
- disco.go: handlePing DERP pong reply now falls back to defaultDERP
  when peer.homeDERP is 0 (matching sendCallMeMaybe behavior).
- control.go: Hostinfo.OS uses runtime.GOOS instead of hardcoded "linux".
- funnel.go: Bidirectional proxy uses TCP half-close (CloseWrite) so
  io.Copy goroutines don't hang when one direction completes.
2026-04-02 22:46:54 +08:00

326 lines
7.6 KiB
Go

package tailnet
import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"net/netip"
"sync"
"time"
"golang.zx2c4.com/wireguard/tun/netstack"
)
// FunnelOptions configures a Funnel endpoint.
type FunnelOptions struct {
// Port is the public-facing port (443, 8443, 10000).
Port uint16
// Protocol is "tcp", "tls-terminated-tcp", or "https".
Protocol string
// LocalAddr is the backend target, e.g. "127.0.0.1:3000".
LocalAddr string
// TerminateTLS, if true, means we terminate TLS and forward
// plaintext to LocalAddr. Only used with Protocol "tls-terminated-tcp".
TerminateTLS bool
// TLSConfig is optional TLS configuration for TLS termination.
TLSConfig *tls.Config
}
// FunnelServer manages Funnel listeners on the netstack TUN.
type FunnelServer struct {
mu sync.Mutex
tnet *netstack.Net
listeners map[uint16]*funnelListener
logf func(format string, args ...any)
}
type funnelListener struct {
port uint16
opts FunnelOptions
listener net.Listener
cancel context.CancelFunc
}
// NewFunnelServer creates a FunnelServer that listens on the given netstack.
func NewFunnelServer(tnet *netstack.Net, logf func(format string, args ...any)) *FunnelServer {
return &FunnelServer{
tnet: tnet,
listeners: make(map[uint16]*funnelListener),
logf: logf,
}
}
// EnableFunnel starts listening on a port and forwarding connections to a local target.
func (fs *FunnelServer) EnableFunnel(ctx context.Context, opts FunnelOptions) error {
if opts.Port == 0 {
return fmt.Errorf("funnel: port is required")
}
if opts.LocalAddr == "" {
return fmt.Errorf("funnel: local address is required")
}
fs.mu.Lock()
if _, ok := fs.listeners[opts.Port]; ok {
fs.mu.Unlock()
return fmt.Errorf("funnel: port %d already in use", opts.Port)
}
fs.mu.Unlock()
// Listen on the netstack TUN address
addr := &net.TCPAddr{Port: int(opts.Port)}
ln, err := fs.tnet.ListenTCP(addr)
if err != nil {
return fmt.Errorf("funnel: listen on port %d: %w", opts.Port, err)
}
ctx, cancel := context.WithCancel(ctx)
fl := &funnelListener{
port: opts.Port,
opts: opts,
listener: ln,
cancel: cancel,
}
fs.mu.Lock()
fs.listeners[opts.Port] = fl
fs.mu.Unlock()
fs.logf("funnel: listening on :%d → %s (%s)", opts.Port, opts.LocalAddr, opts.Protocol)
go fs.acceptLoop(ctx, fl)
return nil
}
// DisableFunnel stops listening on a port.
func (fs *FunnelServer) DisableFunnel(port uint16) error {
fs.mu.Lock()
fl, ok := fs.listeners[port]
if !ok {
fs.mu.Unlock()
return fmt.Errorf("funnel: port %d not active", port)
}
delete(fs.listeners, port)
fs.mu.Unlock()
fl.cancel()
fl.listener.Close()
fs.logf("funnel: stopped on :%d", port)
return nil
}
// ActivePorts returns the list of active Funnel ports.
func (fs *FunnelServer) ActivePorts() []uint16 {
fs.mu.Lock()
defer fs.mu.Unlock()
ports := make([]uint16, 0, len(fs.listeners))
for p := range fs.listeners {
ports = append(ports, p)
}
return ports
}
// Close stops all Funnel listeners.
func (fs *FunnelServer) Close() error {
fs.mu.Lock()
listeners := fs.listeners
fs.listeners = make(map[uint16]*funnelListener)
fs.mu.Unlock()
for _, fl := range listeners {
fl.cancel()
fl.listener.Close()
}
return nil
}
// acceptLoop accepts incoming connections and proxies them.
func (fs *FunnelServer) acceptLoop(ctx context.Context, fl *funnelListener) {
for {
conn, err := fl.listener.Accept()
if err != nil {
select {
case <-ctx.Done():
return
default:
fs.logf("funnel :%d: accept error: %v", fl.port, err)
return
}
}
go fs.handleConn(ctx, fl, conn)
}
}
// handleConn handles a single Funnel connection by proxying to the local target.
func (fs *FunnelServer) handleConn(ctx context.Context, fl *funnelListener, inbound net.Conn) {
defer inbound.Close()
// Optional TLS termination
if fl.opts.TerminateTLS && fl.opts.TLSConfig != nil {
tlsConn := tls.Server(inbound, fl.opts.TLSConfig)
if err := tlsConn.HandshakeContext(ctx); err != nil {
fs.logf("funnel :%d: TLS handshake failed: %v", fl.port, err)
return
}
inbound = tlsConn
}
// Connect to local target
var dialer net.Dialer
dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
outbound, err := dialer.DialContext(dialCtx, "tcp", fl.opts.LocalAddr)
if err != nil {
fs.logf("funnel :%d: dial %s failed: %v", fl.port, fl.opts.LocalAddr, err)
return
}
defer outbound.Close()
// Bidirectional copy with half-close. When one direction finishes,
// close the write side of the other so the copy goroutine unblocks.
done := make(chan struct{}, 2)
go func() {
io.Copy(outbound, inbound)
if tc, ok := outbound.(*net.TCPConn); ok {
tc.CloseWrite()
}
done <- struct{}{}
}()
go func() {
io.Copy(inbound, outbound)
if tc, ok := inbound.(*net.TCPConn); ok {
tc.CloseWrite()
}
done <- struct{}{}
}()
for i := 0; i < 2; i++ {
select {
case <-done:
case <-ctx.Done():
return
}
}
}
// EnableFunnel starts a Funnel endpoint on the Outbound.
// It sets IngressEnabled in Hostinfo and starts the TCP proxy.
func (o *Outbound) EnableFunnel(ctx context.Context, opts FunnelOptions) error {
o.mu.Lock()
if o.tnet == nil {
o.mu.Unlock()
return fmt.Errorf("outbound not started")
}
if o.funnel == nil {
o.funnel = NewFunnelServer(o.tnet, o.logf)
}
o.mu.Unlock()
// Re-register with IngressEnabled
if err := o.reRegisterWithIngress(ctx, true); err != nil {
return fmt.Errorf("funnel: re-register: %w", err)
}
return o.funnel.EnableFunnel(ctx, opts)
}
// DisableFunnel stops a Funnel endpoint.
func (o *Outbound) DisableFunnel(ctx context.Context, port uint16) error {
o.mu.Lock()
f := o.funnel
o.mu.Unlock()
if f == nil {
return fmt.Errorf("funnel not initialized")
}
if err := f.DisableFunnel(port); err != nil {
return err
}
// If no more active ports, disable ingress
if len(f.ActivePorts()) == 0 {
o.reRegisterWithIngress(ctx, false)
}
return nil
}
// FunnelAddrs returns the public Funnel addresses for this node.
// These are the Tailscale DNS names that external clients can connect to.
func (o *Outbound) FunnelAddrs() []string {
o.mu.Lock()
defer o.mu.Unlock()
nm := o.control.NetMap()
if nm == nil || nm.Node == nil {
return nil
}
var addrs []string
name := nm.Node.Name
if name == "" {
return nil
}
// Strip trailing dot
if name[len(name)-1] == '.' {
name = name[:len(name)-1]
}
if o.funnel != nil {
for _, port := range o.funnel.ActivePorts() {
if port == 443 {
addrs = append(addrs, fmt.Sprintf("https://%s", name))
} else {
addrs = append(addrs, fmt.Sprintf("https://%s:%d", name, port))
}
}
}
return addrs
}
// reRegisterWithIngress re-registers with the control plane, toggling IngressEnabled.
func (o *Outbound) reRegisterWithIngress(ctx context.Context, ingress bool) error {
o.logf("funnel: re-registering with IngressEnabled=%v", ingress)
// Build a new register request with updated Hostinfo
regReq := o.control.buildRegisterRequest(ingress)
body, err := encodeJSON(regReq)
if err != nil {
return err
}
resp, err := o.control.noise.Post(ctx, "/machine/register", body)
if err != nil {
return err
}
defer resp.Body.Close()
respBody, err := readLimitedBody(resp)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("re-register: HTTP %d: %s", resp.StatusCode, respBody)
}
o.logf("funnel: re-registered successfully, ingress=%v", ingress)
return nil
}
// SelfTailscaleIPs returns this node's Tailscale IP addresses.
func (o *Outbound) SelfTailscaleIPs() []netip.Addr {
o.mu.Lock()
defer o.mu.Unlock()
var addrs []netip.Addr
for _, pfx := range o.selfAddrs {
addrs = append(addrs, pfx.Addr())
}
return addrs
}