DanLing Runner¶
DanLing runners are now Torch-only and focus on scalable distributed training.
The new design borrows from TorchTitan’s Trainer philosophy:
- A single, explicit training lifecycle
- Backend-agnostic extension through
Runner, with opt-in stack-specific subclasses when needed - Backend-specific distributed logic in dedicated subclass runners
- Minimal shared contracts in
BaseRunner
Core APIs¶
RunnerConfig:chanfig-based long-term configuration surfaceRunner: stack-selecting backend-agnostic entrypoint (ddp/torch,deepspeed/ds,parallel; defaultddp)TorchRunner: shared Torch training core with explicit epoch-mode and step-mode loopsTorchRunner/ParallelRunner/DeepSpeedRunner: class-based stack runners
Parallel topology is configured through semantic axes:
| YAML | |
|---|---|
Data loading and metric reduction are scoped to the logical data domain.
Middle pipeline stages automatically use step-only proxy loaders so pipeline schedules
preserve step count and dataloader state without consuming real input batches locally.
Optimizer control decisions, such as non-finite-gradient skip, are reduced over
the optimizer domain covering all configured parallel axes so stages/shards do
not diverge on step/skip boundaries.
TorchRunner-based stacks build torchdata.stateful_dataloader.StatefulDataLoader
by default so train dataloader progress is checkpointable across restart paths.
Explicit user-provided dataloaders are not rewrapped.
ParallelRunner exposes topology-, pipeline-, and FSDP-aware extension points:
build_topology: validate/construct ordered parallel axes from config + world stateinit_model_parallel_groups: initialize model-parallel process groups/device meshparallelize_model: apply model-specific TP/CP/EP transforms before compile/FSDP wrappingbuild_pipeline_schedule: buildtorch.distributed.pipeliningschedule from configbind_pipeline_modules: bind local model parts to pipeline stagesapply_activation_checkpointing: wrap local model parts before compile/FSDP wrappingfsdp_kwargs/build_mixed_precision_policy/build_offload_policy: customize FSDP wrapping policybuild_optimizer: deduplicate parameters across local model parts before optimizer build
If any of parallel.axes.tensor, parallel.axes.context, parallel.axes.expert,
or parallel.axes.expert_tensor is greater than 1, the model must either expose
model.parallelize(parallel_context) or the runner must override
parallelize_model. DanLing intentionally does not silently no-op model-parallel
axes, because TP/CP/EP transforms are model-architecture specific.
BaseRunner contract¶
BaseRunner is inheritance-first and keeps only cross-backend functionality.
There is no plugin/spec registry layer.
Construction is intentionally direct: subclasses set concrete components in
__init__, call super().__init__(config), then assign model/data/optimizer
objects. MetaRunner invokes __post_init__ after subclass construction so
concrete runners can materialize models, optimizers, checkpoint state, and
metadata once user-owned attributes are present.
Execution API stubs in BaseRunner (all overridable):
train/train_epoch/train_steps/train_stepevaluate/evaluate_epoch/evaluate_steps/evaluate_stepinfer
Step-mode semantics:
traindispatches totrain_stepswhenepochsis unset (is_step_mode=True).TorchRunnerkeeps epoch-mode and step-mode loops separate on purpose to keep control flow local and override points explicit.evaluate_steps(split, steps=None)is always bounded-step evaluation. Whenstepsis omitted, it defaults tomax(steps_budget // 20, 1).
Checkpoint/score contracts (all overridable):
load_modelread_checkpointload_optimizer/load_schedulerload_state_dictscoresbest_index
Launch modes¶
- Scratch:
runner = Runner(config) - Resume:
runner = Runner.from_checkpoint(checkpoint)orrunner.load_checkpoint(checkpoint) - Finetune:
runner = Runner.from_pretrained(config, checkpoint)orrunner.load_pretrained(checkpoint)
Config source hints:
config.auto_resume: when enabled, auto-resume from backend latest checkpoint sourceconfig.resume: source used for full-state resume workflowsconfig.pretrained: source used for model-only initialization workflowsconfig.checkpoint: checkpoint policy only (backend/async/interval/retention), never a source path
Source precedence:
resume>auto_resume>pretrained
Checkpoint semantics¶
The user-facing checkpoint contract is intentionally small:
latest: the most recent persisted full-state checkpoint.best: the best persisted full-state checkpoint, updated only whensave_best=Trueand the current score is best.- History checkpoints: periodic snapshots created by the training loop.
- epoch mode:
ckpt-e{completed_epoch:06d} - step mode:
ckpt-s{global_step:012d} checkpoint.keep_latest_k: number of framework-generated historical checkpoints to retain.0disables automatic retention pruning. When enabled, currentlatestandbesttargets are protected.
Periodic checkpoint attempts are controlled by checkpoint_interval.
When checkpoint_interval is unset, runner defaults are used by mode (epochs: every epoch, steps: budget/20).
Forced checkpoints, including final and graceful-shutdown checkpoints, bypass the interval and update latest.
Backend storage is an implementation detail:
- File backend stores
latest.pth,best.pth, andckpt-*.pth.bestand history files are updated fromlatestvia hardlink-first aliasing with copy fallback. It is intended for local, single-process, and small-run workflows. In async mode it snapshots live tensors before background writes; use DCP for distributed or large-model training. - DCP backend exposes the same logical names (
latest,best,ckpt-*) but stores immutable physical target directories plus pointer files. This avoids overwriting checkpoints while async writers or external readers may still hold references. - DeepSpeed backend stores engine checkpoint tag directories plus pointer files for the same logical names.
- Retention policy is enforced by DanLing-managed file and DCP checkpoint managers.
Rank participation contract:
BaseRunner.save_checkpointis main-process only and backs file-style single-writer saves.TorchRunner.save_checkpointbypasses that main-process guard whencheckpoint.backend="dcp"; all ranks must participate in DCP saves.DeepSpeedRunner.save_checkpointuses DeepSpeed engine checkpointing; all ranks participate in those saves.- When
config.ft.enabled=TrueunderTorchRunner/DDP orParallelRunnerFSDP, full DCP saves are gated to the TorchFT participating replica group. Other replica groups save only their per-replica dataloader state.
Checkpoint persistence internals are implemented under danling.runners.checkpoints:
CheckpointManager: base contractFileCheckpointManager: default async filesystem manager with reliable history/best queueing plus latest-wins coalescing for non-critical savesTorchDistributedCheckpointManager: Torch DCP backend used whencheckpoint.backend="dcp"inTorchRunner
ParallelRunner forces checkpoint.backend="dcp" by default.
Dataloader state¶
Full-state checkpoints include split-keyed dataloader progress under the top-level
dataloaders payload.
TorchRunner.build_dataloaders()usesStatefulDataLoaderfor datasets that have not already been materialized.- User-provided dataloaders participate when they implement
state_dict()andload_state_dict(). - Non-stateful dataloaders are skipped rather than wrapped or special-cased.
BaseRunner.load_checkpoint()restores dataloaders after model/EMA/optimizer/scheduler state.- DCP checkpoints carry dataloader state in the main checkpoint payload.
- With TorchFT enabled, per-replica dataloader checkpoints take precedence over the main checkpoint dataloader payload during restore.
ParallelRunnermiddle-stage proxy loaders delegatestate_dict()andload_state_dict()to the real first-stage loader while yielding placeholders locally.
TorchFT¶
DanLing has an opt-in TorchFT integration for replicated-weight runners:
- enable with
config.ft.enabled=True - configure replica layout with:
ft.replica_idft.group_sizeft.min_replica_sizeft.process_groupft.process_group_timeout_ms- requires the external
torchftpackage and a TorchFT lighthouse setup
Current scope:
- supported:
TorchRunner/DDP,ParallelRunnerFSDP without tensor/pipeline axes - not yet supported:
ParallelRunnertensor/pipeline runs,DeepSpeedRunner
Current launch contract:
- launch one
torchrunper replica group WORLD_SIZEis replica-local data-parallel size, whileft.group_sizecounts replica groups- a single global
torchrunspanning all replica groups is not a supported TorchFT launch mode
When TorchFT is enabled:
- dataloader sharding and seed bias are expanded across replica groups using
ft.replica_idandft.group_size - FSDP2 replicated-dimension all-reduce hooks use TorchFT-managed process groups
- loss/normalizer logging reductions use the TorchFT managed replica group when available
- per-replica dataloader checkpoints are enabled automatically through the DCP checkpoint manager and win over main-checkpoint dataloader state on restore
Distributed state-dict invariants:
- Parallel distributed runs require DCP state-dict APIs for model/optimizer restore.
- Checkpoint restore order is model -> EMA -> optimizer -> scheduler -> dataloaders.
- File checkpoint fallback is only for non-distributed/basic flows.
Async policy is configured by checkpoint.async_mode:
disabled: synchronous checkpoint writesasync: asynchronous uploadsasync_with_pinned_mem: process-based async uploads with staging hook support (maybe_wait_for_staging)
When checkpoint.async_mode is unset, DanLing falls back to legacy checkpoint_async (True -> async, False -> disabled).
Directory hierarchy¶
DanLing now uses a stable run layout with per-attempt timestamps:
workspace_root/lineage[-code_id]/id
lineage: top-level lineage namespacecode_id: git code identity (<short_sha>for clean trees,<short_sha>-d<diff_sha10>when dirty) appended to lineage when availableexperiment: experiment namespaceconfig_id: deterministic hash of canonical config (hash(config)derived)id: stable run identifier, defaults tocode_id-config_idwhen git metadata is available andconfig_idotherwise-
timestamp: per-attempt runtime timestamp By default: -
dirpoints toworkspace_root/lineage[-code_id]/id - checkpoints and result snapshots are stored at
dir - logs are stored as
dir/logs/{timestamp}.log - tensorboard logs are stored as
dir/tensorboard/{timestamp} - W&B local run files default to
dir/wandb/when W&B logging is enabled - reproducibility artifacts are stored at
dir/metadata: config.full.yamlconfig.canonical.yamlconfig.diff.yaml(diff against baseline/default config)git.yamlgit.diff- logging is initialized on the global main process only
id is stable across retries for the same run. timestamp changes for each attempt.
Runner state layout¶
BaseRunner uses a small checkpointed runtime boundary:
runner: config snapshot used for semantic resume validationstate:RunnerStatetraining metadata (train,elastic,rng)dataloaders: split-keyed progress for stateful dataloaders
Concrete runners and checkpoint managers add model/EMA/optimizer/scheduler payloads
around that shared boundary. Runtime metadata (timestamp, mode) and live
trainable/data/metric objects (model, optimizer, datasets, dataloaders,
results, meters, etc.) stay as direct runner attributes for minimal
indirection in hot paths.
Runner Construction¶
TorchRunner is explicit-first: define components in your runner __init__.
Example:
For class-based stack presets:
Pipeline-aware hooks¶
TorchRunner exposes core training hooks that can be overridden:
train_context
ParallelRunner wires pipeline execution through build_pipeline_schedule and
bind_pipeline_modules, and routes train/eval/infer paths based on whether a
pipeline schedule is active.
When parallel.axes.pipeline > 1 and no explicit schedule is provided, ParallelRunner now
builds a torch.distributed.pipelining schedule internally from
config.parallel.pipeline_schedule (default: "1F1B"). The default is intentionally
non-interleaved for current pipeline+FSDP stability, with migration to
"Interleaved1F1B" planned after upstream issue
pytorch/pytorch#164756 is resolved.
Platform support¶
Supported runner stacks:
| Stack | Checkpoint path | Stateful dataloaders | TorchFT runtime |
|---|---|---|---|
torch / ddp |
file default, DCP opt-in | default for built loaders | supported |
parallel |
DCP default | default plus pipeline proxy state delegation | FSDP-only supported |
deepspeed / ds |
DeepSpeed engine checkpoint, file-style aliases | default for built loaders via client state | not supported |