Pull-Based Streaming: Software Pacing, Startup Buffering, and the State-Struct Pattern - Part 2 of 6

Part 2 of 6: Audio/Video Streaming with Swift and GStreamer

In Part 1, we built a GStreamer pipeline that produces encoded video frames and decoded audio samples. Two appsink elements sit at the end of the pipeline, waiting for someone to pull data from them. This post covers how those pull loops work, how they pace output to real-time speed, and how 1,000+ lines of async streaming logic stay maintainable through a pattern we call the state-struct.

Why Pull-Based?

GStreamer offers two models for getting data out of a pipeline. In the push model, you attach a callback to the appsink and GStreamer calls you whenever a buffer is ready. In the pull model, you call gst_app_sink_try_pull_sample on your own schedule.

We use the pull model for two reasons. First, it gives us control over pacing. The pipeline produces data as fast as the encoder can emit it; someone needs to regulate when frames actually leave the server. Second, Swift's structured concurrency maps cleanly onto pull loops. Each loop is a Task that runs for the lifetime of the session, pulling samples and yielding them into an AsyncStream that the networking layer consumes.

                    GStreamer Pipeline
                          │
             ┌────────────┴────────────┐
             ▼                         ▼
        ┌─────────┐             ┌─────────┐
        │ Video   │             │ Audio   │
        │ AppSink │             │ AppSink │
        └────┬────┘             └────┬────┘
             │                       │
      ┌──────▼──────┐        ┌──────▼──────┐
      │ Video Pull  │        │ Audio Pull  │
      │   Loop      │        │   Loop      │
      │  (Task)     │        │  (Task)     │
      └──────┬──────┘        └──────┬──────┘
             │                       │
      ┌──────▼──────┐        ┌──────▼──────┐
      │ AsyncStream │        │ AsyncStream │
      │  (video)    │        │  (audio)    │
      └──────┬──────┘        └──────┬──────┘
             │                       │
             └────────┬──────────────┘
                      ▼
              WebSocket Send Tasks
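
The loop-plus-stream shape in the diagram can be sketched in a few lines. This is a minimal, self-contained model, not the production loop: `EncodedFrame`, `pullNextSample`, and the buffering capacity are illustrative stand-ins for the real sample pulling and mapping.

```swift
import Foundation

// Sketch of the pull-loop shape: a Task pulls on its own schedule and yields
// into an AsyncStream. `pullNextSample` stands in for
// gst_app_sink_try_pull_sample plus buffer mapping.
struct EncodedFrame { let pts: Double; let data: Data }

func makeVideoStream(
    pullNextSample: @escaping () -> EncodedFrame?,
    isRunning: @escaping () -> Bool
) -> AsyncStream<EncodedFrame> {
    AsyncStream(bufferingPolicy: .bufferingNewest(180)) { continuation in
        let task = Task {
            while isRunning() && !Task.isCancelled {
                guard let frame = pullNextSample() else {
                    // Nil pull: the appsink had nothing ready; back off briefly.
                    try? await Task.sleep(nanoseconds: 2_000_000)
                    continue
                }
                continuation.yield(frame)
            }
            continuation.finish()
        }
        continuation.onTermination = { _ in task.cancel() }
    }
}
```

The `onTermination` hook matters: if the consumer goes away, the producing Task is cancelled rather than pulling forever.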

The State-Struct Pattern

The video pull loop is approximately 1,000 lines long. The audio pull loop is over 1,100. Without structure, these would be unreadable. The pattern we use to manage this complexity is simple: every piece of mutable state that a loop iteration might read or modify lives in a dedicated struct, and each phase of the loop is a small helper function that takes that struct as inout.

private struct VideoPullLoopState {
    var initSent: Bool
    var sampleCount: UInt64 = 0
    var nilPullCount: UInt64 = 0
    var startupBoostCompleted: Bool
    var startupBoostFramesEmitted: UInt64 = 0
    var streamYieldEnqueuedCount: UInt64 = 0
    var streamYieldDroppedCount: UInt64 = 0
    let startupBoostDurationSeconds: Double
    // ... ~15 more fields
}

func pullVideoSamplesLoop() async {
    var state = VideoPullLoopState(
        initSent: hasSentInitialStreamInitialization,
        startupBoostCompleted: startupTransportBoostDurationSeconds <= 0,
        startupBoostDurationSeconds: startupTransportBoostDurationSeconds
    )

    while isRunning {
        guard let sample = pullSampleFromAppSink() else {
            handleNilVideoPull(state: &state)
            continue
        }

        ensureStreamInitialization(state: &state, sample: sample)

        guard let buffer = gst_sample_get_buffer(sample),
              let pts = resolveVideoPTS(buffer: buffer) else { continue }

        if evaluateVideoStartupGates(state: &state, pts: pts) { continue }

        trackVideoFrontier(presentationTimestamp: pts)
        establishVideoStartupAnchor(state: &state, pts: pts, sample: sample)

        let waitTime = computeVideoPacingWait(state: &state, pts: pts)
        if waitTime > 0.001 {
            try? await Task.sleep(nanoseconds: UInt64(waitTime * 1_000_000_000))
        }

        let data = copyBufferBytes(buffer)   // map the GstBuffer and copy its bytes
        let avccData = convertAnnexBToAVCC(data)
        emitVideoFrame(state: &state, data: avccData, pts: pts)
    }
}

Each helper is 20 to 40 lines. Each one reads from and writes to state and to the actor's own properties. The main loop reads like a pipeline: pull, validate, gate, anchor, pace, convert, emit. If something goes wrong with pacing, you look at computeVideoPacingWait. If startup is broken, you look at evaluateVideoStartupGates. You do not re-read a 1,000-line function to find the relevant section.

The audio loop follows the same structure with its own state struct:

private struct AudioPullLoopState {
    enum TimelineMode { case pts, frameCount }
    enum PayloadMode { case pcmFloat32, aacAdts }
    enum GateAction { case pass, drop, hold }

    var audioFormatQueried = false
    var payloadMode: PayloadMode = .pcmFloat32
    var audioTimelineMode: TimelineMode
    var startupAudioAlignmentResolved = false
    var startupBoostCompleted: Bool

    // Coalescing
    var pendingAudioPayload = Data()
    var pendingAudioStartPTS: Double?
    let coalescedAudioWallTimeTarget: TimeInterval = 0.200  // 200ms
    // ... ~25 more fields
}

Software Pacing

The pipeline runs with sync=false on both appsinks, meaning GStreamer does not pace output to real time. The encoder will produce frames as fast as it can. If we forwarded frames at that rate, the client's buffer would fill instantly and we would have no control over network utilization.

Instead, each pull loop computes a wall-clock target for each frame and sleeps until that target arrives. The calculation is straightforward:

                  Wall Clock
                  ──────────────────────────────────▶
                  │                                 │
   Pacing Start   │          relativePTS            │  targetTime
   ───────────────┼────────────────────────────────▶│───
                  │                                 │
                  │  targetTime = pacingStart       │
                  │             + relativePTS       │
                  │                                 │
                  │  if now < targetTime:           │
                  │      sleep(targetTime - now)    │

In code:

private func computeVideoPacingWait(
    state: inout VideoPullLoopState,
    presentationTimestamp: Double
) -> Double {
    // Initialize pacing on first frame
    if !videoPacingInitialized {
        videoPacingFirstPTS = presentationTimestamp
        videoPacingStartTime = Date()
        videoPacingInitialized = true
    }

    let relativePTS = presentationTimestamp - videoPacingFirstPTS

    // ... startup boost check (see below) ...

    let targetTime = videoPacingStartTime!.addingTimeInterval(relativePTS)
    let now = Date()
    if now < targetTime {
        let waitTime = targetTime.timeIntervalSince(now)
        if waitTime > 0.001 {
            return waitTime
        }
    }
    return 0
}

This produces output at exactly 1x real-time speed. A frame with PTS 5.0 seconds into the stream is emitted 5.0 seconds after pacing started. The approach has a few important properties:

  • It is self-correcting. If a sleep runs slightly long, the next frame's target time is unchanged; the loop catches up naturally.
  • It tolerates variable encode time. The encoder may take 8ms for one frame and 25ms for the next. The sleep adjusts accordingly.
  • It decouples pacing from the pipeline clock. GStreamer's internal clock is not involved; the server's wall clock drives timing.

The audio loop uses the same logic with its own audioPacingStartTime and audioPacingFirstPTS.
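
The self-correcting property falls out of the arithmetic: every target is anchored to the pacing start, never to when the previous frame was actually emitted. A standalone model (names and values illustrative) makes this easy to verify:

```swift
// Standalone model of the pacing arithmetic: a frame's wall-clock target
// depends only on the pacing anchor and its PTS, never on when the previous
// frame actually left the loop.
func pacingWait(now: Double, pacingStart: Double,
                firstPTS: Double, pts: Double) -> Double {
    let target = pacingStart + (pts - firstPTS)
    return max(0, target - now)
}

// Example: pacing started at wall time 100 with firstPTS 0.
// A frame with PTS 5.0 handled at wall time 104 waits 1.0s.
// If that sleep overruns to 105.3, the next frame (PTS 5.04) has a target
// of 105.04, already in the past, so it waits 0 and the loop catches up.
```

No error accumulates across frames because nothing carries over from one iteration's sleep to the next.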

Startup Buffering and the Burst Phase

When a client starts playback (or seeks to a new position), it needs several seconds of data before it can begin rendering. If the server paces from the first frame, the client must wait in real time for those seconds to accumulate. The user would stare at a spinner for 2 to 3 seconds before video appears.

The solution is a startup boost: a burst phase where pacing sleeps are skipped entirely, flooding the client's buffer as fast as the encoder and network allow. Once enough data has been sent, the server transitions to real-time pacing.

  Time ──────────────────────────────────────────────────▶

  ┌─────────────────────┐┌──────────────────────────────┐
  │   Startup Boost     ││     Real-Time Pacing         │
  │   (no sleeps)       ││     (sleep to 1x)            │
  │                     ││                              │
  │  Frames emitted at  ││  Frames emitted at           │
  │  encode speed       ││  presentation rate           │
  └─────────────────────┘└──────────────────────────────┘
         5 seconds                rest of stream

The boost window defaults to 5 seconds. But there is a subtlety: if the network is slower than the encoder, the output AsyncStream fills up and starts dropping frames. Dropping encoded frames during startup defeats the entire purpose. So the boost is queue-pressure aware:

if !state.startupBoostCompleted {
    let reachedBoostWindow = relativePTS >= state.startupBoostDurationSeconds
    let queuePressureActive = isStartupBoostQueuePressureActive(
        remainingCapacity: startupVideoStreamRemainingCapacity,
        configuredCapacity: startupVideoStreamBufferCapacity
    )

    if !reachedBoostWindow && !queuePressureActive {
        state.startupBoostFramesEmitted += 1
        return 0  // No wait; emit at full speed
    } else {
        state.startupBoostCompleted = true
        // Re-anchor pacing so we don't inherit a multi-second lead
        videoPacingStartTime = Date().addingTimeInterval(-relativePTS)
    }
}

Queue pressure is detected by checking how full the AsyncStream buffer is:

func isStartupBoostQueuePressureActive(
    remainingCapacity: Int?,
    configuredCapacity: Int
) -> Bool {
    guard configuredCapacity > 0, let remainingCapacity else { return false }
    let fillRatio = 1.0 - (Double(max(0, remainingCapacity)) / Double(configuredCapacity))
    return fillRatio >= startupTransportBoostQueueFillHighWatermark  // 0.80
}

When the stream buffer reaches 80% capacity, the boost ends and real-time pacing takes over. The re-anchoring step is critical: we set videoPacingStartTime to now - relativePTS so that the pacing reference point aligns with the current wall clock. Without this, the transition from boost to real-time pacing would calculate a massive sleep (the server is now 5 seconds "ahead" of real time), and the client would freeze.
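
The effect of re-anchoring can be checked with plain numbers. This sketch assumes a 5-second boost window that took 1.2 seconds of wall time to emit; the values are illustrative:

```swift
// After the burst, relativePTS is ~5.0 but only ~1.2s of wall time passed.
// Without re-anchoring, the next pacing target sits ~3.8s in the future;
// with it, pacing resumes immediately.
let burstWallDuration = 1.2     // wall seconds the burst actually took
let relativePTS = 5.0           // media seconds emitted during the burst

let originalStart = 0.0         // pacing anchor set at the first frame
let now = originalStart + burstWallDuration

let naiveTarget = originalStart + relativePTS          // 3.8s ahead of now: freeze
let reanchoredStart = now - relativePTS                // shift the anchor back
let reanchoredTarget = reanchoredStart + relativePTS   // equals now: no stall
```

After re-anchoring, subsequent frames pace normally from the current wall clock, as if the burst had played out in real time.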

Transport Buffer Sizing

The AsyncStream buffer capacities are not fixed constants. They are computed from the source media's frame rate and the startup boost window:

let startupWindowWithSafety = boostWindowSeconds + safetySeconds  // 5 + 2 = 7s
let framesPerSecond = sourceFrameRate  // e.g., 23.976

let computedVideoCapacity = Int(ceil(startupWindowWithSafety * framesPerSecond))
let extraHeadroom = max(12, Int(ceil(framesPerSecond * 0.5)))
let targetVideoCapacity = min(max(computedVideoCapacity + extraHeadroom, 60), 900)

A 24 fps source gets a capacity of about 180 frames. A 60 fps source gets about 450. The headroom term adds at least half a second of extra capacity to absorb timing jitter during the burst-to-pacing transition.

Audio capacity follows the same logic, adjusted for packet duration (24ms for raw GStreamer buffers, 200ms for coalesced packets).
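
Under the same sizing rule, the audio computation swaps frame rate for packets per second derived from the packet duration. A sketch (function name, defaults, and clamp bounds are assumptions mirroring the video values above):

```swift
import Foundation

// Audio capacity under the same rule as video: packets per second comes from
// the packet duration instead of the frame rate. Bounds are illustrative.
func audioStreamCapacity(packetDurationSeconds: Double,
                         boostWindowSeconds: Double = 5.0,
                         safetySeconds: Double = 2.0) -> Int {
    let packetsPerSecond = 1.0 / packetDurationSeconds
    let computed = Int(ceil((boostWindowSeconds + safetySeconds) * packetsPerSecond))
    let headroom = max(12, Int(ceil(packetsPerSecond * 0.5)))
    return min(max(computed + headroom, 60), 900)
}
```

With 24ms raw buffers this yields a few hundred slots; with 200ms coalesced packets the computed value is small enough that the lower clamp takes over.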

PTS Resolution and Interpolation

Not every GStreamer buffer carries a valid PTS. Some encoders emit frames with PTS = GST_CLOCK_TIME_NONE (represented as UInt64.max in the C API). When this happens, we interpolate from the last valid PTS and the expected frame duration:

private func resolveVideoPTS(
    buffer: UnsafeMutablePointer<GstBuffer>
) -> Double? {
    let pts = buffer.pointee.pts

    if pts == UInt64.max {
        // Invalid PTS; interpolate
        if let frameDuration = videoExpectedFrameDuration, videoLastValidPTS > 0 {
            let interpolated = videoLastValidPTS + frameDuration
            videoLastValidPTS = interpolated
            return interpolated
        }
        return nil  // Cannot interpolate yet
    }

    let timestamp = Double(pts) / Double(GST_SECOND_NS)

    // Learn frame duration from first two valid PTS values
    if videoLastValidPTS > 0 && videoExpectedFrameDuration == nil {
        let duration = timestamp - videoLastValidPTS
        if duration > 0 && duration < 0.5 {
            videoExpectedFrameDuration = duration
        }
    }

    videoLastValidPTS = timestamp
    return timestamp
}

The audio loop has a more elaborate version of this. Some containers (particularly legacy AVI with MPEG Layer audio) produce timestamps that jump backwards or drift relative to the expected sample duration. The audio PTS resolver can switch dynamically from raw PTS mode to a synthetic frame-count timeline, where each buffer's PTS is computed from the previous buffer's PTS plus its known duration:

if hasBackwardJump || hasTimingDrift {
    state.audioTimelineMode = .frameCount
    presentationTimestamp = lastPTS + lastFrameDuration  // Synthetic
}

This dual-mode approach means the server adapts to whatever the demuxer produces without needing per-container special cases in the timeline logic.
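
A condensed model of that switching logic is below. The 50ms drift tolerance, the field names, and the permanent one-way switch are illustrative simplifications, not the production values:

```swift
import Foundation

// Condensed model of the dual-mode audio timeline: trust raw PTS until it
// jumps backwards or drifts, then switch to a synthetic frame-count timeline.
struct AudioTimeline {
    enum Mode { case pts, frameCount }
    var mode: Mode = .pts
    var lastPTS: Double = 0
    var frameDuration: Double = 0.024   // known per-buffer duration

    mutating func resolve(rawPTS: Double?) -> Double {
        let synthetic = lastPTS + frameDuration
        switch mode {
        case .frameCount:
            lastPTS = synthetic
            return synthetic
        case .pts:
            guard let raw = rawPTS else {
                lastPTS = synthetic      // missing PTS: interpolate
                return synthetic
            }
            let backwardJump = raw < lastPTS
            let drift = abs(raw - synthetic) > 0.050
            if backwardJump || drift {
                mode = .frameCount       // one-way switch to synthetic timeline
                lastPTS = synthetic
                return synthetic
            }
            lastPTS = raw
            return raw
        }
    }
}
```

Once switched, buffer durations alone drive the timeline, so a demuxer that keeps emitting broken timestamps can no longer disturb audio timing.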

Startup Gating: The Keyframe Requirement

The video pull loop enforces two startup gates before it begins emitting frames.

Gate 1: Keyframe requirement. The first emitted frame must be a keyframe (H.264 IDR or HEVC CRA/IDR). The client's hardware decoder needs a keyframe to initialize its reference frame buffer. If the pipeline starts mid-GOP after a seek, the first frames will be delta frames that point at reference pictures the decoder never received. The gate drops them until a keyframe arrives.

Gate 2: BDMV pre-IDR gap. For Blu-ray content, a pre-scan (covered in Part 4) determines how many seconds of corrupt pre-IDR video the H.264 decoder will produce. The gate drops frames for that duration. This is specific to transport streams where the seek lands between the first video PES packet and the first IDR.
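
Reduced to their decision core, the two gates compose like this. The struct and field names are assumptions; in the real loop the keyframe test comes from the buffer's delta-unit flag:

```swift
// Decision core of the two video startup gates. `preIDRGapSeconds` is the
// corrupt-output window measured by the BDMV pre-scan (0 when not applicable).
struct VideoStartupGate {
    let preIDRGapSeconds: Double
    var startPTS: Double?
    var keyframeSeen = false

    // Returns true when the frame must be dropped before emission starts.
    mutating func shouldDrop(pts: Double, isKeyframe: Bool) -> Bool {
        if startPTS == nil { startPTS = pts }
        // Gate 2: discard the window of corrupt pre-IDR output.
        if pts < startPTS! + preIDRGapSeconds { return true }
        // Gate 1: the first emitted frame must be a keyframe.
        if !keyframeSeen {
            guard isKeyframe else { return true }
            keyframeSeen = true
        }
        return false
    }
}
```

Once a keyframe has passed, the gate becomes a no-op for the rest of the session.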

The Audio Gate

The audio loop has its own startup gate that prevents audio from running ahead of video. This matters because audio decoding is much faster than video decoding; without a gate, the server might emit 3 seconds of audio before the first video keyframe even appears.

The gate uses a three-phase decision:

private func evaluateAudioGate(
    state: inout AudioPullLoopState,
    rawPTS: Double?
) -> AudioPullLoopState.GateAction {
    // Phase 1: Video frontier not yet exposed
    guard let videoFrontierPTS = startupVideoFrontierAbsolutePTS else {
        return .hold   // Wait for video to report any progress
    }

    // Phase 2: Audio ahead of video frontier
    if let audioPTS = rawPTS,
       audioPTS > videoFrontierPTS + 0.100 {   // 100ms tolerance
        return .hold   // Wait for video to catch up
    }

    // Phase 3: Video keyframe not yet established
    if startupVideoRelativeOffset == nil {
        return .drop   // Discard pre-keyframe audio
    }

    return .pass
}

When the gate returns .hold, the audio loop saves the current sample for reuse on the next iteration and sleeps for 1ms before trying again. It does not drop the sample; it waits for video to advance. When the gate returns .drop, the sample is discarded. The gate also tracks the earliest raw audio PTS it observes during gating (startupAudioObservedOriginPTS), which the alignment logic in Part 4 uses to synchronize audio to video.

Annex B to AVCC Conversion

GStreamer's H.264 and HEVC parsers output NAL units in Annex B format, where each NAL is preceded by a 3-byte (0x000001) or 4-byte (0x00000001) start code. Apple's VideoToolbox expects AVCC format, where each NAL is preceded by a 4-byte big-endian length. The pull loop converts on the fly:

  Annex B:    00 00 00 01 [NAL data ...] 00 00 01 [NAL data ...]
                  │                          │
                  ▼                          ▼
  AVCC:       [4-byte len] [NAL data ...] [4-byte len] [NAL data ...]

During conversion, we also strip SPS/PPS/VPS parameter sets from non-initialization frames. The client receives those once in the StreamInitialization message; including them in every keyframe would waste bandwidth.
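
A minimal converter along these lines can be written against Foundation's Data. This is an illustrative sketch, not the production routine; it handles both start-code lengths but omits the parameter-set stripping described above:

```swift
import Foundation

// Replace each Annex B start code (3- or 4-byte) with a 4-byte big-endian
// length prefix, producing AVCC framing.
func annexBToAVCC(_ annexB: Data) -> Data {
    let bytes = [UInt8](annexB)

    // Locate the payload range of every NAL unit.
    var nalRanges: [Range<Int>] = []
    var currentStart: Int? = nil
    var i = 0
    while i + 2 < bytes.count {
        if bytes[i] == 0, bytes[i + 1] == 0, bytes[i + 2] == 1 {
            // A preceding zero byte means this was a 4-byte start code.
            let codeStart = (i > 0 && bytes[i - 1] == 0) ? i - 1 : i
            if let start = currentStart {
                nalRanges.append(start..<codeStart)
            }
            currentStart = i + 3
            i += 3
        } else {
            i += 1
        }
    }
    if let start = currentStart {
        nalRanges.append(start..<bytes.count)
    }

    // Emit [4-byte length][NAL data] for each unit.
    var avcc = Data()
    for range in nalRanges {
        var length = UInt32(range.count).bigEndian
        withUnsafeBytes(of: &length) { avcc.append(contentsOf: $0) }
        avcc.append(contentsOf: bytes[range])
    }
    return avcc
}
```

A production version would also inspect each NAL's type byte to drop SPS/PPS/VPS units on the way through.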

Realtime Failure Detection

Not all media can be transcoded in real time. A 4K HEVC source decoded in software and re-encoded to H.264 may only manage 18 fps on certain hardware. The pull loop monitors this:

func shouldStopForRealtimeTranscodeFailure(
    normalizedPTS: Double,
    elapsed: Double,
    sampleCount: UInt64
) -> Bool {
    guard elapsed >= 12.0 else { return false }          // Wait 12s minimum
    guard normalizedPTS >= 4.0 else { return false }     // Need 4s of video progress
    guard sampleCount >= 100 else { return false }       // Need 100 frames

    let realtimeRatio = normalizedPTS / elapsed
    return realtimeRatio < 0.70    // Below 70% of real-time
}

The guards prevent false positives during startup (where the burst phase makes the ratio look artificially high) and during brief encoder stalls. If the ratio stays below 0.70 for a sustained period, the session terminates with an error message explaining that the server cannot keep up.

Audio Packet Coalescing

GStreamer's audio decoders emit small buffers, typically 20-32ms each. At 48 kHz stereo Float32, a 32ms buffer is about 12 KB, arriving at roughly 31 packets per second. Each packet carries overhead: protobuf serialization, WebSocket framing, and network stack processing.

The audio pull loop optionally coalesces these into larger bundles:

if state.enableAudioPacketCoalescing {
    if state.pendingAudioBatchWallStart == nil {
        state.pendingAudioBatchWallStart = Date()
    }
    state.pendingAudioPayload.append(data)
    state.pendingAudioFrameCount += frameCount

    let wallElapsed = Date().timeIntervalSince(state.pendingAudioBatchWallStart!)
    if wallElapsed >= state.coalescedAudioWallTimeTarget
        || state.pendingAudioPayload.count >= 2_097_152 {
        flushPendingAudioSample(state: &state)
    }
}

This reduces the packet rate from ~31/sec to ~5/sec. The flush triggers on wall time (200ms) or payload size (2 MB). A PTS discontinuity greater than 3ms also forces an immediate flush to avoid merging non-contiguous audio.
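
The three flush triggers can be collected into one predicate. Thresholds mirror the values above; the function name and parameters are illustrative:

```swift
import Foundation

// The three coalescing flush triggers as one predicate: wall-time target,
// payload size cap, and PTS discontinuity between consecutive buffers.
func shouldFlushAudioBatch(wallElapsed: TimeInterval,
                           pendingBytes: Int,
                           ptsGapSeconds: Double) -> Bool {
    if wallElapsed >= 0.200 { return true }          // 200ms wall-time target
    if pendingBytes >= 2_097_152 { return true }     // 2 MB payload cap
    if abs(ptsGapSeconds) > 0.003 { return true }    // non-contiguous audio
    return false
}
```

Keeping the decision in one pure function makes the coalescing policy testable without a running pipeline.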

What Comes Next

We now have two pull loops producing paced, timestamped data: encoded video NAL units and processed audio samples. Both are yielded into AsyncStreams that the networking layer consumes and serializes as Protocol Buffer messages for the WebSocket connection.

But we have been carefully avoiding a topic: the audio processing chain and its signal-processing pitfalls. The next post digs into the audio side of the pipeline, from DC offset removal to the decoder that outputs silence instead of sound.