
BaseRunner

danling.runners.BaseRunner

Backend-agnostic runner state and orchestration utilities.

BaseRunner intentionally keeps only the shared runtime contract used by concrete runners such as TorchRunner:

  • configuration and process lifecycle bootstrap
  • datasets/dataloaders/result containers
  • checkpoint/result persistence helpers
  • progress and score bookkeeping
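The score bookkeeping done by the `scores`, `best_index`, and `is_best` properties in the source can be approximated with plain dictionaries. This is a minimal sketch, assuming results are keyed by epoch index and each row maps split names to metric dicts. `extract_scores`, `pick_best_index`, and `is_latest_best` are hypothetical helpers, not danling API.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


def extract_scores(
    results: Mapping[int, Mapping[str, Any]], split: str, name: str
) -> dict[int, float]:
    """Collect results[index][split][name], skipping rows that lack it."""
    scores: dict[int, float] = {}
    for index, row in results.items():
        split_result = row.get(split)
        if isinstance(split_result, Mapping) and name in split_result:
            scores[index] = split_result[name]
    return scores


def pick_best_index(scores: Mapping[int, float], name: str) -> int:
    """Lower is better for "loss", higher otherwise; ties go to the latest index."""
    if not scores:
        return 0
    reducer = min if name == "loss" else max
    # min/max return the first of tied items, so scanning newest-first favors later epochs.
    return reducer(reversed(list(scores)), key=scores.__getitem__)


def is_latest_best(scores: Mapping[int, float], name: str, tol: float = 1e-7) -> bool:
    """True when the most recent score equals the best score within tol.

    (BaseRunner.is_best additionally returns True when there are no results at all.)
    """
    if not scores:
        return False
    latest = scores[list(scores)[-1]]
    best = scores[pick_best_index(scores, name)]
    return abs(latest - best) < tol


results = {
    0: {"val": {"acc": 0.80, "loss": 0.50}},
    1: {"val": {"acc": 0.85, "loss": 0.40}},
    2: {"val": {"acc": 0.85, "loss": 0.45}},
}
acc = extract_scores(results, "val", "acc")
loss = extract_scores(results, "val", "loss")
print(pick_best_index(acc, "acc"), is_latest_best(acc, "acc"))     # 2 True
print(pick_best_index(loss, "loss"), is_latest_best(loss, "loss"))  # 1 False
```

Preferring the latest index on ties means a later epoch that merely matches the best score is still reported as best, which keeps the "best" artifacts pointing at the most recent equivalent state.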

Concrete runners are expected to customize runtime behavior through the explicit training/checkpoint hooks below, not by overriding bootstrap internals.
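The hook-based customization style can be illustrated with a toy template-method sketch. In `BaseRunner` itself the loop methods raise `NotImplementedError`, and a concrete runner such as `TorchRunner` supplies the loop. Here `LoopRunner` plays that concrete-runner role and `MeanRunner` plays the user's subclass. Both names are hypothetical and the sketch does not reproduce danling's actual signatures.

```python
from __future__ import annotations

from collections.abc import Iterable
from typing import Any


class LoopRunner:
    """Hypothetical parent that owns the loop (the role a concrete runner plays)."""

    def __init__(self) -> None:
        self.global_step = 0
        self.history: list[float] = []

    def train_epoch(self, batches: Iterable[Any]) -> list[float]:
        # Orchestration: iterate, call the hook, update counters.
        for batch in batches:
            self.history.append(self.train_step(batch))
            self.global_step += 1
        return self.history

    def train_step(self, batch: Any) -> float:
        """Hook: subclasses return a scalar loss for one batch."""
        raise NotImplementedError


class MeanRunner(LoopRunner):
    """User runner: overrides only the step hook, not the loop or __init__."""

    def train_step(self, batch: list[float]) -> float:
        return sum(batch) / len(batch)


runner = MeanRunner()
print(runner.train_epoch([[1.0, 3.0], [2.0, 4.0]]))  # [2.0, 3.0]
print(runner.global_step)                            # 2
```

Because counters and bookkeeping stay in the parent, the subclass cannot accidentally skip them, which is the point of customizing through hooks instead of bootstrap internals.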

Construction lifecycle:

  1. Normalize config and create RunnerState.
  2. Bind workspace, containers, default FileCheckpointManager, and supervisor.
  3. Call early service hooks in order: init_distributed, init_checkpoint_manager, init_fault_tolerance, init_garbage_collection.
  4. Apply seed/determinism policy.
  5. Initialize logging, TensorBoard/W&B, print routing, signal handlers, and heartbeat.
  6. MetaRunner calls __post_init__. Concrete runners such as TorchRunner materialize models, optimizers, schedulers, and resume checkpoints there before delegating back to BaseRunner.__post_init__ for metadata persistence.
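Step 6 says `MetaRunner` calls `__post_init__` after construction. The source of `MetaRunner` is not shown here, so the following is only a minimal sketch of one way such a metaclass can work, assuming it overrides `type.__call__`. `PostInitMeta`, `Base`, `Concrete`, and `LateInit` are hypothetical names.

```python
from __future__ import annotations

from typing import Any


class PostInitMeta(type):
    """Hypothetical metaclass: run __post_init__ once __init__ has fully returned."""

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        instance = super().__call__(*args, **kwargs)  # runs __new__ and __init__
        post_init = getattr(instance, "__post_init__", None)
        if post_init is not None:
            post_init()
        return instance


class Base(metaclass=PostInitMeta):
    def __init__(self) -> None:
        self.events: list[str] = ["init"]
        self.init_distributed()  # early hook: runs while the object is partially built

    def init_distributed(self) -> None:
        self.events.append("init_distributed")

    def __post_init__(self) -> None:
        self.events.append("save_metadata")  # stands in for metadata persistence


class Concrete(Base):
    def __post_init__(self) -> None:
        self.events.append("build_model")  # materialize model/optimizer first
        super().__post_init__()            # then delegate to the base


class LateInit(Base):
    def __init__(self) -> None:
        super().__init__()
        self.events.append("late_init")  # subclass __init__ finishes before post-init


print(Concrete().events)  # ['init', 'init_distributed', 'build_model', 'save_metadata']
print(LateInit().events)  # ['init', 'init_distributed', 'late_init', 'save_metadata']
```

Calling `__post_init__` from the metaclass, rather than at the end of `Base.__init__`, guarantees that it runs only after every subclass `__init__` has completed.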

Override rule: early hooks run while the runner is only partially constructed; model/runtime hooks run in concrete __post_init__; loop hooks (train_step, evaluate_step, infer_step) run after all runtime components are bound.
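The sketch below shows why an early hook must not depend on attributes bound later in `__init__`. In `BaseRunner` the attributes available to early hooks include `config`, `workspace`, and `checkpoint_manager`, while logging and TensorBoard/W&B are set up afterward (step 5). Attributes with class-level defaults such as `logger = None` would silently read as `None` rather than raise. `Base`, `BadRunner`, `GoodRunner`, `run_name`, and `construct` are hypothetical.

```python
from __future__ import annotations

from typing import Any


class Base:
    """Hypothetical base whose __init__ calls an early hook midway through setup."""

    def __init__(self, config: dict[str, Any]) -> None:
        self.config = config                       # bound before the hook
        self.init_distributed()                    # early hook: partially built object
        self.run_name = config.get("name", "run")  # bound after the hook returns

    def init_distributed(self) -> None:
        """Default early hook: no-op."""


class BadRunner(Base):
    def init_distributed(self) -> None:
        # Reads an attribute that __init__ has not assigned yet -> AttributeError.
        self.tag = f"{self.run_name}-rank0"


class GoodRunner(Base):
    def init_distributed(self) -> None:
        # Uses only state that exists before the hook runs.
        self.backend = self.config.get("backend", "gloo")


def construct(cls: type[Base], config: dict[str, Any]) -> Base | AttributeError:
    """Build a runner, returning the AttributeError instead of raising it."""
    try:
        return cls(config)
    except AttributeError as exc:
        return exc


print(type(construct(BadRunner, {"name": "demo"})).__name__)  # AttributeError
print(construct(GoodRunner, {"backend": "nccl"}).backend)    # nccl
```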

Attributes:

Name                 Type                    Description
state                RunnerState             Checkpointable aggregate state object.
config               RunnerConfig            Runner configuration.
train_state          RunnerTrainState        Training progress counters.
elastic_state        RunnerElasticState      Torchelastic restart metadata.
rng_state            RunnerRNGState          Python/NumPy/Torch RNG snapshots.
datasets             FlatDict                Dataset mapping keyed by split.
dataloaders          FlatDict                Dataloader mapping keyed by split.
checkpoint_manager   CheckpointManager       Active checkpoint backend manager.
workspace            RunnerWorkspace         Workspace, logging, metadata, and print-routing helper.
supervisor           RunnerSupervisor        Signal, heartbeat, and garbage-collection helper.
ft                   FaultTolerance | None   Optional fault-tolerance runtime handle.
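In `__init__` (shown in the source), `config`, `train_state`, `elastic_state`, and `rng_state` are bound as references into `state` (`self.train_state = state.train`, and so on), not as copies. The sketch below uses plain dataclasses (hypothetical `TrainState`, `State`, and `Runner`, not danling's `RunnerState`) to show the consequence: in-place updates reach the checkpointable state, but rebinding an alias detaches it.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class TrainState:
    epoch: int = 0
    global_step: int = 0


@dataclass
class State:
    config: dict[str, Any] = field(default_factory=dict)
    train: TrainState = field(default_factory=TrainState)


class Runner:
    def __init__(self, config: dict[str, Any]) -> None:
        self.state = State(config=config)
        # Aliases, not copies: both names refer to the same objects.
        self.config = self.state.config
        self.train_state = self.state.train


runner = Runner({"seed": 1})
runner.train_state.global_step += 10  # in-place update through the alias
runner.config["seed"] = 2
print(runner.state.train.global_step, runner.state.config["seed"])  # 10 2

runner.train_state = TrainState(epoch=5)  # rebinding detaches the alias
print(runner.state.train.epoch)           # 0
```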

Source code in danling/runners/base_runner.py
class BaseRunner(metaclass=MetaRunner):
    """
    Backend-agnostic runner state and orchestration utilities.

    `BaseRunner` intentionally keeps only the shared runtime contract used by
    concrete runners such as `TorchRunner`:

    - configuration and process lifecycle bootstrap
    - datasets/dataloaders/result containers
    - checkpoint/result persistence helpers
    - progress and score bookkeeping

    Concrete runners are expected to customize runtime behavior through the
    explicit training/checkpoint hooks below, not by overriding bootstrap
    internals.

    **Construction lifecycle:**

    1. Normalize config and create `RunnerState`.
    2. Bind workspace, containers, default `FileCheckpointManager`, and
       supervisor.
    3. Call early service hooks in order: `init_distributed`,
       `init_checkpoint_manager`, `init_fault_tolerance`,
       `init_garbage_collection`.
    4. Apply seed/determinism policy.
    5. Initialize logging, TensorBoard/W&B, print routing, signal handlers, and
       heartbeat.
    6. `MetaRunner` calls `__post_init__`. Concrete runners such as
       `TorchRunner` materialize models, optimizers, schedulers, and resume
       checkpoints there before delegating back to `BaseRunner.__post_init__`
       for metadata persistence.

    **Override rule:** early hooks run while the runner is only partially
    constructed; model/runtime hooks run in concrete `__post_init__`; loop
    hooks (`train_step`, `evaluate_step`, `infer_step`) run after all runtime
    components are bound.

    Attributes:
        state: Checkpointable aggregate state object.
        config: Runner configuration.
        train_state: Training progress counters.
        elastic_state: Torchelastic restart metadata.
        rng_state: Python/NumPy/Torch RNG snapshots.
        datasets: Dataset mapping keyed by split.
        dataloaders: Dataloader mapping keyed by split.
        checkpoint_manager: Active checkpoint backend manager.
        workspace: Workspace, logging, metadata, and print-routing helper.
        supervisor: Signal, heartbeat, and garbage-collection helper.
        ft: Optional fault-tolerance runtime handle.
    """

    state: RunnerState
    config: RunnerConfig
    train_state: RunnerTrainState
    elastic_state: RunnerElasticState
    rng_state: RunnerRNGState

    model: Any | None = None
    ema: Any | None = None
    criterion: Callable | None = None
    optimizer: Any | None = None
    scheduler: Any | None = None

    datasets: FlatDict
    dataloaders: FlatDict
    split: str | None = None

    results: RoundDict
    meters: AverageMeters
    metrics: Any | None = None
    train_metrics: Any | None = None
    evaluate_metrics: Any | None = None

    logger: logging.Logger | None = None
    writer: Any | None = None
    wandb: Any | None = None

    checkpoint_manager: CheckpointManager
    workspace: RunnerWorkspace
    supervisor: RunnerSupervisor
    ft: FaultTolerance | None

    timestamp: str
    _print_process: int

    def __init__(self, config: RunnerConfig | Mapping[str, Any]) -> None:
        if not isinstance(config, RunnerConfig):
            config = RunnerConfig(config)

        state = RunnerState(config=config)
        self.state = state
        self.config = state.config
        self.train_state = state.train
        self.elastic_state = state.elastic
        self.rng_state = state.rng

        self.timestamp = get_time_str()
        self.workspace = RunnerWorkspace(self)
        self.name = str(self.config.get("name", f"{self.workspace.lineage}-{self.workspace.experiment}"))
        self.datasets = FlatDict()
        self.dataloaders = DataLoaderDict()
        self.results = RoundDict()
        self.meters = AverageMeters()
        self.mode = RunnerMode.train
        self.checkpoint_manager = FileCheckpointManager(self)
        self.supervisor = RunnerSupervisor(self)
        self.ft = None

        self.init_distributed()
        self.init_checkpoint_manager()
        self.init_fault_tolerance()
        self.init_garbage_collection()

        if self.config.seed is not None:
            self.set_seed()

        if self.config.deterministic:
            self.set_deterministic()

        if self.config.log:
            self.workspace.init_logging()

        if self.config.tensorboard:
            self.init_tensorboard()
        if self.config.get("wandb.enabled", False):
            self.init_wandb()

        self.workspace.init_print()
        self.init_signal_handlers()
        self.init_heartbeat()

    @property
    def world_size(self) -> int:
        """Distributed world size from environment."""

        return int(os.getenv("WORLD_SIZE", "1"))

    @property
    def rank(self) -> int:
        """Global rank from environment."""

        return int(os.getenv("RANK", "0"))

    @property
    def local_rank(self) -> int:
        """Local rank from environment."""

        return int(os.getenv("LOCAL_RANK", "0"))

    @property
    def distributed(self) -> bool:
        """Whether distributed mode is active."""

        return self.world_size > 1

    @property
    def is_main_process(self) -> bool:
        """Whether current rank is global main process."""

        return self.rank == 0

    @property
    def is_local_main_process(self) -> bool:
        """Whether current rank is local main process."""

        return self.local_rank == 0

    @cached_property
    def code_id(self) -> str | None:
        """Stable code identity for the current checkout."""

        return get_git_hash()

    @cached_property
    def config_id(self) -> str:
        """Stable semantic config identity for this runner."""

        return format(hash(self.config) & ((1 << 48) - 1), "012x")

    @property
    def id(self) -> str:
        """Stable run identity derived from code identity and semantic config."""

        if self.code_id is None:
            return self.config_id
        return f"{self.code_id}-{self.config_id}"

    def __post_init__(self) -> None:
        """Hook called after `__init__` by `MetaRunner`."""
        self.workspace.save_metadata()

    @cached_property
    def score_split(self) -> str | None:
        """Split used for best-score selection."""

        if "score_split" in self.config and self.config.score_split is not None:
            return self.config.score_split

        splits = self.evaluate_splits
        if not splits:
            return None
        for split in splits:
            if split.lower().startswith("val"):
                return split
        return splits[0]

    @property
    def scores(self) -> FlatDict | None:
        """Index-to-score mapping extracted from `score_split/score_name`."""

        if not self.results:
            return None

        score_split = self.score_split
        if score_split is None:
            return None

        scores = FlatDict()
        for index, result in self.results.items():
            if score_split not in result:
                continue
            split_result = result[score_split]
            if not isinstance(split_result, Mapping):
                continue
            if self.config.score_name not in split_result:
                continue
            scores[index] = split_result[self.config.score_name]

        return scores or None

    @property
    def best_index(self) -> int:
        """Best result index according to configured score metric."""

        scores = self.scores
        if not scores:
            return 0

        indices = list(scores.keys())
        reducer = min if self.config.score_name == "loss" else max
        # Reversed iteration makes ties resolve to the most recent index.
        return reducer(reversed(indices), key=scores.get)

    @property
    def latest_result(self) -> RoundDict | None:
        """Most recent appended result row."""

        if not self.results:
            return None

        latest_index = next(reversed(self.results))
        latest = self.results[latest_index]

        ret = RoundDict(latest)
        ret["index"] = latest_index
        return ret

    @property
    def best_result(self) -> RoundDict | None:
        """Best result row according to configured score metric."""

        if not self.results:
            return None

        best_index = self.best_index
        best = self.results[best_index]

        ret = RoundDict(best)
        ret["index"] = best_index
        return ret

    @property
    def latest_score(self) -> float | None:
        """Latest scalar score."""

        scores = self.scores
        if not scores:
            return None

        latest_index = next(reversed(scores))
        return scores[latest_index]

    @property
    def best_score(self) -> float | None:
        """Best scalar score."""

        scores = self.scores
        if not scores:
            return None

        return scores[self.best_index]

    @property
    def is_best(self) -> bool:
        """Whether latest score matches current best score.

        Returns ``True`` when there are no results yet (first iteration) or
        when comparable scalar scores are available and agree within
        tolerance. Returns ``False`` when scores cannot be resolved (e.g.,
        no `score_split`/`score_name` configured), because silently
        reporting best in that case would trigger phantom "best" checkpoint
        copies.
        """

        if not self.results:
            return True

        latest = self.latest_score
        best = self.best_score
        if latest is None or best is None:
            return False
        return abs(latest - best) < 1e-7

    def get_epoch_result(self) -> RoundDict:
        meter_result = self.meters.average()
        if self.metrics is None:
            return RoundDict(meter_result)
        merged = RoundDict(meter_result)
        for key, value in self.metrics.average().items():
            if isinstance(value, Mapping) and len(value) == 1:
                value = next(iter(value.values()))
            merged[key] = value
        return merged

    def get_step_result(self) -> RoundDict:
        meter_result = self.meters.value()
        if self.metrics is None:
            return RoundDict(meter_result)
        merged = RoundDict(meter_result)
        for key, value in self.metrics.value().items():
            if isinstance(value, Mapping) and len(value) == 1:
                value = next(iter(value.values()))
            merged[key] = value
        return merged

    def append_result(self, result: RoundDict | Mapping[str, Any], index: int | None = None) -> None:
        if index is None:
            index = self.train_state.epoch

        if not isinstance(result, RoundDict):
            result = RoundDict(result)

        if index in self.results:
            self.results[index].merge(result)
        else:
            self.results[index] = result

    def step_log(
        self,
        split: str,
        iteration: int,
        length: int | str | None = None,
        result: RoundDict[str, Any] | Mapping[str, Any] | None = None,
    ) -> RoundDict:
        if length is None:
            try:
                length = len(self.dataloaders[split]) - 1
            except (TypeError, NotImplementedError):
                length = "∞"

        if result is None:
            result = self.get_step_result()
        elif not isinstance(result, RoundDict):
            result = RoundDict(result)
        print(self.format_step_result(result, split, iteration, length))

        if self.mode == RunnerMode.train:
            self.write_result(result, split)

        return result

    def format_epoch_result(
        self,
        result: RoundDict[str, Any],
        epochs: int | None = None,
        total_epochs: int | None = None,
    ) -> str:
        epochs = self.train_state.epoch if epochs is None else epochs
        total_epochs = self.epochs if total_epochs is None else total_epochs

        prefix = ""
        if total_epochs is not None:
            # Tab-separate the prefix from the metrics, matching format_step_result.
            prefix = f"epoch [{epochs + 1}/{total_epochs}]\t"

        return f"{prefix}{self.format_result(result)}"

    def format_step_result(self, result: RoundDict[str, Any], split: str, steps: int, length: int | str) -> str:
        if self.mode == RunnerMode.train:
            prefix = f"training on {split}"
        elif self.mode == RunnerMode.evaluate:
            prefix = f"evaluating on {split}"
        elif self.mode == RunnerMode.infer:
            prefix = f"inferring on {split}"
        else:
            prefix = f"running in {self.mode} on {split}"

        return f"{prefix} [{steps}/{length}]\t{self.format_result(result)}"

    def format_result(self, result: RoundDict[str, Any], format_spec: str = ".4f") -> str:
        return format_result(result, format_spec=format_spec)

    def flatten_result(self, result: Mapping[str, Any]) -> FlatDict[str, Any]:
        flat_result = FlatDict()

        def add_score(tag: str, score: Any) -> None:
            if isinstance(score, AverageMeter):
                score = score.avg

            if isinstance(score, Mapping):
                nested = RoundDict(score)
                nested.setattr("separator", "/")
                for nested_name, nested_score in nested.dict(flatten=True).items():
                    add_score(f"{tag}/{nested_name}", nested_score)
                return

            if isinstance(score, Sequence) and not isinstance(score, (str, bytes)):
                for idx, nested_score in enumerate(score):
                    add_score(f"{tag}/{idx}", nested_score)
                return

            flat_result[tag] = score

        flattened = RoundDict(result)
        flattened.setattr("separator", "/")
        for name, score in flattened.dict(flatten=True).items():
            add_score(str(name), score)

        return flat_result

    def write_result(self, result: RoundDict[str, Any], split: str, steps: int | None = None) -> None:
        if self.writer is None and self.wandb is None:
            return

        steps = self.train_state.global_step if steps is None else steps

        flat_result = self.flatten_result(result)

        for name, score in flat_result.items():
            self.write_score(name, score, split, steps)

        if self.wandb is not None:
            payload = {f"{split}/{name}": score for name, score in flat_result.items()}
            self.wandb.log(payload, step=steps)

    def write_score(self, name: str, score: float, split: str, steps: int) -> None:
        if self.writer is not None:
            self.writer.add_scalar(f"{split}/{name}", score, steps)

    @catch
    @on_main_process
    def save_result(self) -> None:
        if not self.latest_result:
            return
        payload = {
            "name": self.name,
            "id": self.id,
            "timestamp": self.timestamp,
            "results": round(self.results, 8),
        }
        self.save(payload, os.path.join(self.workspace.dir, "results.json"), indent=4)

        latest = round(self.latest_result, 8)
        latest_payload = {"name": self.name, "id": self.id, "timestamp": self.timestamp}
        latest_payload.update(dict(latest))

        latest_path = os.path.join(self.workspace.dir, "latest.json")
        self.save(latest_payload, latest_path, indent=4)

        if self.is_best:
            shutil.copy(latest_path, os.path.join(self.workspace.dir, "best.json"))

    def auto_restore(self) -> None:
        """Auto-load resume/pretrained sources declared in config.

        Precedence:
            `config.resume` > `config.auto_resume` > `config.pretrained`.
        """

        restore_target = self._resolve_auto_restore_target()
        if restore_target is None:
            return

        restore_kind, restore_source = restore_target
        if restore_kind == "checkpoint":
            self.load_checkpoint(restore_source)
            return
        self.load_pretrained(restore_source)

    def _resolve_auto_restore_target(self) -> tuple[str, Mapping[Any, Any] | PathStr] | None:
        resume_source = self.config.get("resume")
        auto_resume = bool(self.config.get("auto_resume", False))
        pretrained_source = self.config.get("pretrained")

        specified_count = int(bool(resume_source)) + int(auto_resume) + int(bool(pretrained_source))
        if specified_count > 2:
            warn(
                "`config.resume`, `config.auto_resume`, and `config.pretrained` are all set; "
                "precedence is `resume` > `auto_resume` > `pretrained`",
                RuntimeWarning,
                stacklevel=2,
            )

        if resume_source:
            return ("checkpoint", resume_source)

        if auto_resume:
            return ("checkpoint", self._auto_resume_source())

        if pretrained_source:
            return ("pretrained", pretrained_source)

        return None

    def _auto_resume_source(self) -> str:
        backend = str(self.config.get("checkpoint.backend", "auto")).strip().lower()
        if backend == "dcp":
            return os.path.join(self.workspace.checkpoint_dir, "latest")
        return os.path.join(self.workspace.checkpoint_dir, "latest.pth")

    def init_distributed(self) -> None:
        """
        Initialize the distributed environment.

        The default is a no-op (single-process). Concrete runners override
        this hook to initialize the torch.distributed process group; see
        [`TorchRunner.init_distributed`][danling.runners.TorchRunner.init_distributed]
        for the canonical specification.
        """

    def init_checkpoint_manager(self) -> None:
        """
        Bind the runner's checkpoint manager.

        The default is a no-op — `BaseRunner.__init__` already binds the
        `FileCheckpointManager`. Concrete runners override this hook to swap
        in the backend-appropriate manager via `set_checkpoint_manager(...)`;
        see
        [`TorchRunner.init_checkpoint_manager`][danling.runners.TorchRunner.init_checkpoint_manager]
        for the canonical specification.
        """

    def init_fault_tolerance(self) -> None:
        """Initialize optional fault-tolerance runtime support."""

        self.ft = FaultTolerance(self)

    def init_heartbeat(self) -> None:
        """Configure optional background heartbeat writer."""

        self.supervisor.init_heartbeat()

    def init_garbage_collection(self) -> None:
        """Configure optional runner-managed Python GC pacing."""

        self.supervisor.init_garbage_collection()

    def init_signal_handlers(self) -> None:
        """Install runner-owned signal handlers for graceful preemption."""

        self.supervisor.init_signal_handlers()

    def prepare_for_shutdown_checkpoint(self) -> None:
        """Finalize runner state before writing a forced shutdown checkpoint."""

    def set_checkpoint_manager(self, manager: CheckpointManager) -> None:
        current = getattr(self, "checkpoint_manager", None)
        if current is manager:
            return
        if current is not None:
            current.close(timeout=0.0)
        self.checkpoint_manager = manager

    @on_main_process
    def init_tensorboard(self, *args, **kwargs) -> None:
        """Initialize tensorboard writer."""

        warn(
            "tensorboard is enabled, but this runner does not initialize a tensorboard writer",
            RuntimeWarning,
            stacklevel=2,
        )

    @on_main_process
    def init_wandb(self, *args, **kwargs) -> None:
        """Initialize Weights & Biases run for scalar logging."""

        try:
            import wandb
        except ImportError as exc:
            raise RuntimeError("wandb is enabled, but the `wandb` package is not installed") from exc

        wandb_config = self.config.wandb
        if "project" not in kwargs:
            kwargs["project"] = wandb_config.get("project") or self.workspace.lineage
        if "entity" not in kwargs and wandb_config.get("entity") is not None:
            kwargs["entity"] = wandb_config.entity
        if "group" not in kwargs:
            kwargs["group"] = wandb_config.get("group") or self.workspace.experiment
        if "name" not in kwargs:
            kwargs["name"] = wandb_config.get("name") or self.id
        if "job_type" not in kwargs and wandb_config.get("job_type") is not None:
            kwargs["job_type"] = wandb_config.job_type
        tags = wandb_config.get("tags")
        if "tags" not in kwargs and tags is not None:
            kwargs["tags"] = [tags] if isinstance(tags, str) else list(tags)
        if "dir" not in kwargs:
            kwargs["dir"] = wandb_config.get("dir") or self.workspace.dir
        if "mode" not in kwargs and wandb_config.get("mode") is not None:
            kwargs["mode"] = wandb_config.mode
        if "config" not in kwargs:
            kwargs["config"] = self.config.dict()

        self.wandb = cast(Any, wandb).init(*args, **kwargs)

    def set_seed(self, seed: int | None = None, bias: int | bool | None = None) -> int:
        """Set python/numpy RNG seeds and snapshot RNG state.

        Args:
            seed: Base seed. Defaults to `self.config.seed`.
            bias: Optional per-process bias. `None` uses `self.rank`.

        Returns:
            The process-local seed after applying bias.
        """

        base_seed = self.config.seed if seed is None else seed
        if base_seed is None:
            raise ValueError("cannot set seed: no seed is configured and no seed argument was provided")
        base_seed = int(base_seed)

        self.config.seed = base_seed

        process_seed = base_seed
        if bias is None:
            bias = self.rank
        if bias:
            process_seed += int(bias)

        random.seed(process_seed)
        if np_random is not None:
            np_random.seed(process_seed)

        self.rng_state.python = random.getstate()
        self.rng_state.numpy = np_random.get_state() if np_random is not None else None
        return process_seed

    def set_deterministic(self) -> None:
        """Enable deterministic behavior in subclass-specific backends."""

    def train(self, *args, **kwargs):
        """Run top-level training workflow."""

        raise NotImplementedError

    def train_epochs(self, *args, **kwargs):
        """Run epoch-mode training workflow."""

        raise NotImplementedError

    def train_epoch(self, *args, **kwargs):
        """Run one training epoch on a split."""

        raise NotImplementedError

    def train_steps(self, *args, **kwargs):
        """Run step-mode training workflow."""

        raise NotImplementedError

    def train_step(self, *args, **kwargs):
        """
        Run one training micro-step.

        Concrete runners define the override contract; see
        [`TorchRunner.train_step`][danling.runners.TorchRunner.train_step] for
        the canonical specification.
        """

        raise NotImplementedError

    def backward(self, loss, *args, **kwargs) -> None:
        """Run backward pass for one micro-step loss."""

        raise NotImplementedError

    def step(self, *args, **kwargs) -> None:
        """Advance optimizer/scheduler state when accumulation is ready."""

        raise NotImplementedError

    def evaluate(self, *args, **kwargs):
        """Run top-level evaluation workflow."""

        raise NotImplementedError

    def evaluate_epoch(self, *args, **kwargs):
        """Run one full evaluation epoch on a split."""

        raise NotImplementedError

    def evaluate_steps(self, *args, **kwargs):
        """Run bounded evaluation steps on a split."""

        raise NotImplementedError

    def evaluate_step(self, *args, **kwargs):
        """
        Run one evaluation step.

        Concrete runners define the override contract; see
        [`TorchRunner.evaluate_step`][danling.runners.TorchRunner.evaluate_step]
        for the canonical specification.
        """

        raise NotImplementedError

    def infer(self, *args, **kwargs):
        """Run top-level inference workflow."""

        raise NotImplementedError

    def infer_step(self, *args, **kwargs):
        """
        Run one inference step.

        Concrete runners define the override contract; see
        [`TorchRunner.infer_step`][danling.runners.TorchRunner.infer_step] for
        the canonical specification.
        """

        raise NotImplementedError

    def unwrap(self, model: Any) -> Any:
        """Return an unwrapped model object."""

        return model

    def state_dict(self, cls: type = dict) -> Mapping:
        """
        Build the backend-neutral runner checkpoint payload.

        The base payload contains semantic runner config, mutable runner
        state, RNG snapshots, and dataloader resume state. Backend runners
        extend this payload with model/optimizer/scheduler state.

        **Called when:** checkpoint managers build a payload for
        `save_checkpoint`, and fault-tolerance callbacks need a runner state
        snapshot.

        Args:
            cls: Mapping factory used for nested payloads. Backends may pass
                `dict`-like containers to preserve their serialization format.

        Returns:
            Mapping with `runner`, `state`, and `dataloaders` keys.

        **Side effects:** snapshots Python and NumPy RNG state into
        `self.rng_state` before exporting.

        !!! danger "Do not"
            - Mutate model or optimizer state here.
            - Drop the `runner` config payload; resume validation depends on it.
            - Override without calling `super()` unless you fully replace the
              checkpoint format.
        """

        self.rng_state.python = random.getstate()
        self.rng_state.numpy = np_random.get_state() if np_random is not None else None

        state = self.state.state_dict()
        if cls is not dict:
            state = cls(state)

        dataloader_state = self.dataloaders.state_dict()
        if cls is not dict:
            dataloader_state = cls(dataloader_state)

        return cls(runner=self.config.dict(), state=state, dataloaders=dataloader_state)

    def load_state_dict(self, checkpoint: Mapping[str, Any]) -> None:
        """
        Restore backend-neutral runner state from a checkpoint payload.

        This restores semantic runner state and Python/NumPy RNG state. Model,
        EMA, optimizer, scheduler, and dataloader component loading is owned by
        `load_checkpoint`.

        **Called when:** `load_checkpoint` restores a full checkpoint, and
        fault-tolerance callbacks receive a runner state payload.

        Args:
            checkpoint: Mapping produced by `state_dict` or a backend-specific
                superset of that payload.

        Raises:
            ValueError: checkpoint runner config differs semantically from the
                current runner config.

        **Side effects:** updates `self.state`, `self.train_state`,
        `self.elastic_state`, `self.rng_state`, and process RNG state.

        !!! danger "Do not"
            - Load model/optimizer/scheduler state here; use component loaders
              through `load_checkpoint`.
            - Suppress semantic config diffs unless you also update the resume
              policy deliberately.
        """

        runner_config = checkpoint.get("runner")
        if runner_config is not None:
            checkpoint_config = RunnerConfig(runner_config).canonical()
            current_config = self.config.canonical()
            semantic_diff = NestedDict(checkpoint_config).diff(current_config).dict()
            if semantic_diff:
                raise ValueError(
                    "cannot load checkpoint: runner config is semantically different from current config; "
                    f"start a new experiment or align config. diff={semantic_diff}"
                )

        state_dict = checkpoint.get("state") or {}
        self.state.load_state_dict(dict(state_dict))

        rng_state = state_dict.get("rng")
        if isinstance(rng_state, Mapping) and "python" in rng_state and self.rng_state.python is not None:
            random.setstate(self.rng_state.python)

        if (
            np_random is not None
            and isinstance(rng_state, Mapping)
            and "numpy" in rng_state
            and self.rng_state.numpy is not None
        ):
            np_random.set_state(self.rng_state.numpy)

    @staticmethod
    def _normalize_checkpoint_exclude_path(path: str) -> tuple[str, ...]:
        aliases = {
            "data_loader": "dataloaders",
            "dataloader": "dataloaders",
            "lr_scheduler": "scheduler",
        }
        parts = tuple(part for part in str(path).split(".") if part)
        if not parts:
            return ()
        return (aliases.get(parts[0], parts[0]), *parts[1:])

    def checkpoint_exclude_from_loading(self) -> tuple[tuple[str, ...], ...]:
        excluded = self.config.get("checkpoint.exclude_from_loading")
        if excluded is None:
            return ()
        if isinstance(excluded, str):
            excluded = (excluded,)
        return tuple(
            normalized for path in excluded if (normalized := self._normalize_checkpoint_exclude_path(str(path)))
        )

    @staticmethod
    def _drop_checkpoint_path(checkpoint: dict[str, Any], path: Sequence[str]) -> None:
        if not path:
            return
        key = path[0]
        if len(path) == 1:
            checkpoint.pop(key, None)
            return
        child = checkpoint.get(key)
        if isinstance(child, Mapping):
            child_copy = dict(child)
            checkpoint[key] = child_copy
            BaseRunner._drop_checkpoint_path(child_copy, path[1:])

    def _filter_checkpoint_for_loading(
        self,
        checkpoint: Mapping[str, Any],
        excluded_paths: Sequence[Sequence[str]],
    ) -> dict[str, Any]:
        filtered = dict(checkpoint)
        for path in excluded_paths:
            self._drop_checkpoint_path(filtered, path)
        return filtered

    @staticmethod
    def _is_top_level_checkpoint_excluded(excluded_paths: Sequence[Sequence[str]], *keys: str) -> bool:
        key_set = set(keys)
        return any(len(path) == 1 and path[0] in key_set for path in excluded_paths)

    def save_checkpoint(
        self,
        name: str = "latest",
        epochs: int | None = None,
        save_best: bool = True,
        last_step: bool = False,
        force: bool = False,
    ) -> None:
        """
        Persist runner state through the active checkpoint manager.

        Backend collective semantics are owned by
        `checkpoint_manager.is_collective`. File-style managers save on the
        main process only; collective managers require every rank to enter this
        method together.

        **Called when:** training loops hit checkpoint cadence, final
        `last_step` saves run, or the supervisor handles a shutdown signal.

        Args:
            name: Logical checkpoint alias, usually `"latest"` or `"best"`.
            epochs: Epoch index used for history checkpoint naming. Defaults
                to `self.train_state.epoch`.
            save_best: Whether to publish/update the best-checkpoint alias
                when `self.is_best` is true.
            last_step: Whether this save is the final save for the run.
            force: Bypass cadence checks inside the manager.

        **Side effects:** delegates to
        `self.checkpoint_manager.save_checkpoint(...)`.

        !!! danger "Do not"
            - Add a main-process guard around calls to this method; DCP-style
              managers need all ranks to participate.
            - Bypass the checkpoint manager for normal runner checkpoints.
        """

        if not (self.is_main_process or self.checkpoint_manager.is_collective):
            return
        epochs = self.train_state.epoch if epochs is None else epochs
        self.checkpoint_manager.save_checkpoint(
            name=name,
            epochs=epochs,
            save_best=save_best,
            last_step=last_step,
            force=force,
        )

    def save_seed_checkpoint(self, name: str = "seed") -> None:
        """
        Persist an initialization checkpoint for cross-topology experiments.

        Seed checkpoints are intended to be created before training advances,
        then loaded with `checkpoint.load_only=True` or `resume`/`pretrained`
        when comparing different parallel layouts from the same initial model
        state. They are saved through the final-checkpoint path, so
        `checkpoint.last_save_model_only=True` intentionally applies.
        """
        if self.train_state.global_step != 0 or self.train_state.epoch != 0:
            warn(
                "save_seed_checkpoint() is intended before training advances; "
                f"current epoch={self.train_state.epoch}, global_step={self.train_state.global_step}",
                RuntimeWarning,
                stacklevel=2,
            )
        self.save_checkpoint(name=name, epochs=0, save_best=False, last_step=True, force=True)

    def load_checkpoint(
        self,
        checkpoint: Mapping | bytes | str | os.PathLike,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Restore a full runner checkpoint.

        This is the full-state restore path: runtime state, model/EMA,
        optimizer, scheduler, and dataloader progress are restored when present
        and applicable to the current runner.

        **Called when:** users resume a run explicitly, `auto_restore` selects
        a resume source, `from_checkpoint` constructs a runner, or
        fault-tolerance callbacks restore a full runner payload.

        Args:
            checkpoint: In-memory checkpoint mapping or backend-specific path.
            *args: Forwarded to `read_checkpoint` and component loaders.
            **kwargs: Forwarded to `read_checkpoint` and component loaders.

        Raises:
            ValueError: checkpoint is missing required component state for an
                initialized component, or config validation fails.

        **Side effects:** updates runner state, model/EMA weights, optimizer,
        scheduler, dataloader progress, and `config.resume` for path inputs.

        !!! danger "Do not"
            - Use this for model-only finetuning payloads; use
              `load_pretrained` instead.
            - Override just to support a new path type; prefer overriding
              `read_checkpoint`.
        """

        ckpt = self.read_checkpoint(checkpoint, *args, **kwargs)
        excluded_paths = self.checkpoint_exclude_from_loading()
        if excluded_paths:
            if self._is_top_level_checkpoint_excluded(excluded_paths, "runner"):
                warn(
                    "`checkpoint.exclude_from_loading` contains 'runner'; "
                    "semantic runner config validation will be skipped for this load.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            ckpt = self._filter_checkpoint_for_loading(ckpt, excluded_paths)

        self.load_state_dict(ckpt)
        if not self._is_top_level_checkpoint_excluded(excluded_paths, "model", "model_parts", "module"):
            if "model" in ckpt:
                self.load_model(ckpt["model"], *args, **kwargs)
            elif "model_parts" in ckpt:
                self.load_model(ckpt["model_parts"], *args, **kwargs)
            elif self.model is not None:
                raise ValueError(
                    "cannot restore model: checkpoint has no model state\n"
                    "Use `load_pretrained` only for model-only checkpoints with model/ema payloads"
                )
        if not self._is_top_level_checkpoint_excluded(excluded_paths, "ema") and (
            self.ema is not None or "ema" in ckpt
        ):
            self.load_ema(ckpt.get("ema"), *args, **kwargs)
        if not self._is_top_level_checkpoint_excluded(excluded_paths, "optimizer") and (
            self.optimizer is not None or "optimizer" in ckpt
        ):
            self.load_optimizer(ckpt.get("optimizer"), *args, **kwargs)
        if not self._is_top_level_checkpoint_excluded(excluded_paths, "scheduler") and (
            self.scheduler is not None or "scheduler" in ckpt
        ):
            self.load_scheduler(ckpt.get("scheduler"), *args, **kwargs)
        if not self._is_top_level_checkpoint_excluded(excluded_paths, "dataloaders") and (
            self.dataloaders or "dataloaders" in ckpt
        ):
            self.load_dataloaders(ckpt.get("dataloaders"))
        if isinstance(checkpoint, (str, bytes, os.PathLike)):
            self.config.resume = os.fsdecode(checkpoint)

    @staticmethod
    def _require_checkpoint_component_state(component: str, state_dict: Any | None) -> Any:
        component_labels = {
            "ema": "EMA state",
            "optimizer": "optimizer state",
            "scheduler": "scheduler state",
        }
        if state_dict is None:
            component_label = component_labels.get(component, f"{component} state")
            raise ValueError(
                f"cannot restore {component}: checkpoint has no {component_label}\n"
                "Use `load_pretrained` for model-only checkpoints instead of `load_checkpoint`"
            )
        return state_dict

    def load_model(self, state_dict: Mapping[str, Any], *args, **kwargs) -> None:
        """Load model state."""
        if self.model is None:
            raise ValueError("cannot restore model: model is not initialized")
        self.unwrap(self.model).load_state_dict(state_dict, *args, **kwargs)

    def load_ema(self, state_dict: Mapping[str, Any] | None, *args, **kwargs) -> None:
        """Load EMA state."""
        if self.ema is None:
            return
        state_dict = self._require_checkpoint_component_state("ema", state_dict)
        self.ema.load_state_dict(state_dict, *args, **kwargs)

    def load_optimizer(self, state_dict: Mapping[str, Any] | None, *args, **kwargs) -> None:
        """Load optimizer state."""
        if self.optimizer is None:
            return
        state_dict = self._require_checkpoint_component_state("optimizer", state_dict)
        self.optimizer.load_state_dict(state_dict, *args, **kwargs)

    def load_scheduler(self, state_dict: Mapping[str, Any] | None, *args, **kwargs) -> None:
        """Load scheduler state."""
        if self.scheduler is None:
            return
        state_dict = self._require_checkpoint_component_state("scheduler", state_dict)
        self.scheduler.load_state_dict(state_dict, *args, **kwargs)

    def load_dataloaders(self, state_dict: Mapping[str, Any] | None) -> None:
        """Load dataloader progress state when the current runner has matching loaders."""
        if state_dict is None:
            return
        self.dataloaders.load_state_dict(state_dict)

    def load_pretrained(
        self,
        checkpoint: Mapping | bytes | str | os.PathLike,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Load model weights only from a checkpoint payload or path.

        When the checkpoint payload provides EMA weights (`ema`), EMA is preferred
        as the pretrained source. Otherwise `model` (or `model_parts`) is used.

        **Called when:** users initialize from pretrained weights, or
        `auto_restore` selects `config.pretrained`.

        Args:
            checkpoint: In-memory payload or backend-specific path containing
                `ema`, `model`, or `model_parts`.
            *args: Forwarded to `read_checkpoint` and `load_model`.
            **kwargs: Forwarded to `read_checkpoint` and `load_model`.

        Raises:
            ValueError: model is not initialized, or the payload has no usable
                model/EMA state.

        **Side effects:** loads model weights and updates `config.pretrained`
        for path inputs. Optimizer, scheduler, runner state, and dataloaders
        are intentionally untouched.

        !!! danger "Do not"
            - Use this to resume training state; use `load_checkpoint` for
              full-state restore.
            - Load optimizer/scheduler state in this path.
        """

        if self.model is None:
            raise ValueError("cannot load pretrained weights: model is not initialized")

        ckpt = self.read_checkpoint(checkpoint, *args, **kwargs)
        if ckpt.get("ema") is not None:
            self.load_model(ckpt["ema"], *args, **kwargs)
        elif "model" in ckpt:
            self.load_model(ckpt["model"], *args, **kwargs)
        elif "model_parts" in ckpt:
            self.load_model(ckpt["model_parts"], *args, **kwargs)
        else:
            raise ValueError(
                "cannot load pretrained weights: checkpoint has no EMA or model state\n"
                "Use `load_checkpoint` for full checkpoint restore instead of `load_pretrained`"
            )
        if isinstance(checkpoint, (str, bytes, os.PathLike)):
            self.config.pretrained = os.fsdecode(checkpoint)
        else:
            self.config.pretrained = None

    @classmethod
    def from_checkpoint(cls, checkpoint: Mapping | bytes | str | os.PathLike, *args, **kwargs) -> BaseRunner:
        """Instantiate runner from checkpoint config and restore full state."""

        config = cls.read_config(checkpoint, *args, **kwargs)
        config.resume = None
        config.auto_resume = False
        config.pretrained = None
        runner = cls(config)
        runner.load_checkpoint(checkpoint, *args, **kwargs)
        return runner

    @classmethod
    def read_config(
        cls,
        checkpoint: Mapping | bytes | str | os.PathLike,
        *args,
        **kwargs,
    ) -> RunnerConfig:
        """
        Read runner config from checkpoint mapping or file path.

        Note:
            BaseRunner only accepts file checkpoints for path input.
            Backend-specific directory checkpoints must be handled in subclasses.
        """

        if isinstance(checkpoint, Mapping):
            ckpt = checkpoint
        elif isinstance(checkpoint, (bytes, str, os.PathLike)):
            checkpoint_id = os.fspath(checkpoint)
            if os.path.isfile(checkpoint_id):
                kwargs = dict(kwargs)
                kwargs["map_location"] = "cpu"
                kwargs["weights_only"] = False
                ckpt = load(checkpoint, *args, **kwargs)
            else:
                raise ValueError(
                    f"cannot read config from checkpoint path for {cls.__name__}: path must be a file; "
                    "use a backend-specific runner for directory-style checkpoints"
                )
        else:
            raise ValueError(
                "invalid checkpoint input: expected a mapping or path, "
                f"got {type(checkpoint).__name__}: {checkpoint!r}"
            )

        if "runner" not in ckpt:
            raise ValueError(
                "cannot read runner config: checkpoint is missing key 'runner'; "
                "use from_pretrained(...) for model-only checkpoints"
            )
        return RunnerConfig(ckpt["runner"])

    @classmethod
    def from_pretrained(
        cls,
        config: RunnerConfig | Mapping[str, Any],
        checkpoint: Mapping | bytes | str | os.PathLike,
        *args,
        **kwargs,
    ) -> BaseRunner:
        """Build a runner from config and load model weights only."""

        prepared = RunnerConfig(config)
        prepared.resume = None
        prepared.auto_resume = False
        prepared.pretrained = None
        runner = cls(prepared)
        runner.load_pretrained(checkpoint, *args, **kwargs)
        return runner

    def read_checkpoint(self, checkpoint: Mapping | bytes | str | os.PathLike, *args, **kwargs) -> Mapping[str, Any]:
        """Normalize checkpoint input into an in-memory mapping payload."""
        if isinstance(checkpoint, (bytes, str, os.PathLike)):
            kwargs = dict(kwargs)
            kwargs["map_location"] = "cpu"
            kwargs["weights_only"] = False
            return load(checkpoint, *args, **kwargs)
        if isinstance(checkpoint, Mapping):
            return checkpoint
        raise ValueError(
            f"invalid checkpoint input: expected a mapping or path, got {type(checkpoint).__name__}: {checkpoint!r}"
        )

    def save(self, obj: Any, file: PathStr, main_process_only: bool = True, *args, **kwargs) -> File:
        """Save an object with optional main-process guard."""

        if not main_process_only or self.is_main_process:
            return save(obj, file, *args, **kwargs)
        return file

    def close(self, timeout: float | None = None) -> bool:
        """Finalize checkpoint/log/writer resources before shutdown."""

        if timeout is None:
            timeout = self.config.get("checkpoint.wait_timeout")

        drained = True
        close_error: Exception | None = None
        try:
            drained = self.checkpoint_manager.close(timeout=timeout)
        except Exception as exc:
            close_error = exc

        if close_error is None and not drained:
            warn("runner close: timed out while draining async checkpoints", RuntimeWarning, stacklevel=2)
            return False

        self.supervisor.restore_signal_handlers()
        writer = self.writer
        if writer is not None:
            writer.flush()
            writer.close()
            self.writer = None

        if self.wandb is not None:
            self.wandb.finish()

        self.workspace.close()
        self.supervisor.close()
        if self.ft is not None:
            self.ft.close()

        if close_error is not None:
            raise close_error
        return drained

    @property
    def mode(self) -> RunnerMode:
        return self._mode

    @mode.setter
    def mode(self, mode: str | RunnerMode) -> None:
        if isinstance(mode, str):
            mode = RunnerMode(mode)
        if getattr(self, "_mode", None) == mode:
            return
        self._mode = mode

    @property
    def batch_size(self) -> int:
        """Infer batch size from config or first dataloader."""
        batch_size = self.config.get("dataloader.batch_size")
        if batch_size is not None:
            return batch_size

        if self.dataloaders:
            loader = next(iter(self.dataloaders.values()))
            batch_size = getattr(loader, "batch_size", None)
            if batch_size is not None:
                return batch_size

        raise AttributeError("batch_size could not be inferred and is not in config")

    @staticmethod
    def _loader_length(loader: Any) -> int | None:
        try:
            return len(loader)
        except (TypeError, NotImplementedError):
            return None

    @property
    def epochs(self) -> int | None:
        """Configured epoch budget, if present."""
        return self.config.get("epochs")

    @epochs.setter
    def epochs(self, epochs: int) -> None:
        self.config.epochs = epochs

    @property
    def steps(self) -> int | None:
        """Configured/derived optimizer-step budget."""
        steps = self.config.get("steps")
        if steps is not None:
            return steps
        if self.epochs is not None and self.dataloaders:
            steps_per_epoch = 0
            for split in self.train_splits:
                split_micro_steps = self._loader_length(self.dataloaders[split])
                if split_micro_steps is None:
                    return None
                steps_per_epoch += (split_micro_steps + self.accum_steps - 1) // self.accum_steps
            return steps_per_epoch * self.epochs
        return None

    @steps.setter
    def steps(self, steps: int) -> None:
        self.config.steps = steps

    @property
    def is_step_mode(self) -> bool:
        """Whether runner is in step mode (`epochs` is unset)."""
        return self.epochs is None

    @cached_property
    def accum_steps(self) -> int:
        """Gradient accumulation steps."""
        return self.config.get("accum_steps", 1)

    @cached_property
    def precision(self) -> str | None:
        """Autocast precision mode."""
        return self.config.get("precision")

    @cached_property
    def max_grad_value(self) -> float | None:
        """Gradient value clipping threshold."""
        return self.config.get("max_grad_value")

    @cached_property
    def max_grad_norm(self) -> float | None:
        """Gradient norm clipping threshold."""
        return self.config.get("max_grad_norm")

    @cached_property
    def skip_nonfinite_grad(self) -> bool:
        """Whether to skip optimizer updates when gradients are non-finite."""
        return self.config.get("skip_nonfinite_grad", False)

    @cached_property
    def patience(self) -> int | float:
        """Early-stop patience in epoch mode."""
        return self.config.get("patience", float("inf"))

    @property
    def progress(self) -> float:
        """Normalized training progress in `[0, 1]`."""
        if self.steps is not None:
            return self.train_state.global_step / self.steps
        if self.epochs is not None:
            return self.train_state.epoch / self.epochs
        raise ValueError("cannot compute progress: neither `steps` nor `epochs` is configured")

    @property
    def train_splits(self) -> list[str]:
        """Configured or inferred training split names."""
        if "train_splits" in self.config:
            return self._sorted_unique(self.config["train_splits"])
        if self.datasets:
            inferred = [
                split
                for split, dataset in self.datasets.items()
                if split == "train" or getattr(dataset, "train", False) or getattr(dataset, "split", None) == "train"
            ]
            return self._sorted_unique(inferred)
        return []

    @property
    def evaluate_splits(self) -> list[str]:
        """Configured or inferred evaluation split names."""
        if "evaluate_splits" in self.config:
            return self._sorted_unique(self.config["evaluate_splits"])
        if self.datasets:
            train_splits = set(self.train_splits)
            return sorted(split for split in self.datasets if split not in train_splits)
        return []

    @staticmethod
    def _sorted_unique(values: Sequence[str] | str) -> list[str]:
        if isinstance(values, str):
            return [values]
        return sorted(dict.fromkeys(str(value) for value in values))

    @property
    def checkpoint_interval(self) -> int:
        """Checkpoint cadence in optimizer steps (step mode) or epochs (epoch mode)."""
        configured = self.config.get("checkpoint.interval")
        if configured is not None:
            return configured
        if self.epochs is not None:
            return 1
        if self.steps is not None:
            return max(ceil(self.steps / 20), 1)
        return 8_192

    @property
    def log_interval(self) -> int:
        """Step logging cadence."""
        configured = self.config.get("log_interval")
        if configured is not None:
            return configured
        if self.steps is not None:
            return max(ceil(self.steps / 100), 1)
        return 1_024
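The `checkpoint.exclude_from_loading` filtering in the class source above is easiest to see in isolation. The free functions below are hypothetical re-statements of `_normalize_checkpoint_exclude_path`, `_drop_checkpoint_path`, and `_filter_checkpoint_for_loading` (not danling APIs you can import). They show that a leading alias such as `lr_scheduler` maps to the canonical `scheduler` key, and that a nested drop copies each intermediate mapping, so the payload passed in is left unmodified.

```python
from collections.abc import Mapping, Sequence
from typing import Any

# Aliases copied from BaseRunner._normalize_checkpoint_exclude_path.
ALIASES = {"data_loader": "dataloaders", "dataloader": "dataloaders", "lr_scheduler": "scheduler"}


def normalize_path(path: str) -> tuple[str, ...]:
    """Split a dotted path and canonicalize its first component."""
    parts = tuple(part for part in str(path).split(".") if part)
    if not parts:
        return ()
    return (ALIASES.get(parts[0], parts[0]), *parts[1:])


def drop_path(checkpoint: dict[str, Any], path: Sequence[str]) -> None:
    """Remove `path` from `checkpoint`, copying each nested mapping on the way down."""
    if not path:
        return
    key = path[0]
    if len(path) == 1:
        checkpoint.pop(key, None)
        return
    child = checkpoint.get(key)
    if isinstance(child, Mapping):
        child_copy = dict(child)  # shallow copy so the caller's nested mapping is untouched
        checkpoint[key] = child_copy
        drop_path(child_copy, path[1:])


def filter_checkpoint(checkpoint: Mapping[str, Any], excluded: Sequence[str]) -> dict[str, Any]:
    """Return a filtered copy of `checkpoint` with every excluded path removed."""
    filtered = dict(checkpoint)
    for raw in excluded:
        if normalized := normalize_path(raw):
            drop_path(filtered, normalized)
    return filtered


original = {
    "model": {"weight": 1.0},
    "scheduler": {"last_epoch": 3},
    "dataloaders": {"train": {"index": 10}, "val": {"index": 2}},
}
filtered = filter_checkpoint(original, ["lr_scheduler", "dataloader.train"])
print(sorted(filtered))                 # ['dataloaders', 'model']
print(filtered["dataloaders"])          # {'val': {'index': 2}}
print(sorted(original["dataloaders"]))  # ['train', 'val']
```

Copying on the way down is what lets `load_checkpoint` filter a payload that may still be owned by the caller (for example an in-memory mapping) without side effects.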

world_size property

world_size: int

Distributed world size from environment.

rank property

rank: int

Global rank from environment.

local_rank property

local_rank: int

Local rank from environment.

distributed property

distributed: bool

Whether distributed mode is active.

is_main_process property

is_main_process: bool

Whether current rank is global main process.

is_local_main_process property

is_local_main_process: bool

Whether current rank is local main process.
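The rank-related properties above are documented only as coming "from environment". The sketch below assumes the torchrun conventions (`WORLD_SIZE`, `RANK`, `LOCAL_RANK`, with single-process defaults) and the usual definitions: distributed when the world size exceeds 1, main process at rank 0. danling's actual lookup may consult other variables or the process group.

```python
import os
from collections.abc import Mapping


def distributed_info(env: Mapping[str, str] = os.environ) -> dict[str, object]:
    """Derive rank flags from torchrun-style environment variables (assumed names)."""
    world_size = int(env.get("WORLD_SIZE", 1))
    rank = int(env.get("RANK", 0))
    local_rank = int(env.get("LOCAL_RANK", 0))
    return {
        "world_size": world_size,
        "rank": rank,
        "local_rank": local_rank,
        "distributed": world_size > 1,
        "is_main_process": rank == 0,
        "is_local_main_process": local_rank == 0,
    }


# Second GPU on the second node of a 2-node x 4-GPU job.
info = distributed_info({"WORLD_SIZE": "8", "RANK": "5", "LOCAL_RANK": "1"})
print(info["distributed"], info["is_main_process"], info["is_local_main_process"])
# True False False
```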

code_id cached property

code_id: str | None

Stable code identity for the current checkout.

config_id cached property

config_id: str

Stable semantic config identity for this runner.

id property

id: str

Stable run identity derived from code identity and semantic config.

score_split cached property

score_split: str | None

Split used for best-score selection.

scores property

scores: FlatDict | None

Index-to-score mapping extracted from score_split/score_name.

best_index property

best_index: int

Best result index according to configured score metric.

latest_result property

latest_result: RoundDict | None

Most recently appended result row.

best_result property

best_result: RoundDict | None

Best result row according to configured score metric.

latest_score property

latest_score: float | None

Latest scalar score.

best_score property

best_score: float | None

Best scalar score.

is_best property

is_best: bool

Whether latest score matches current best score.

Returns True when comparable scalar scores are available and the latest score agrees with the best score within tolerance. It also returns True on the first iteration (no prior results). It returns False when scores cannot be resolved (e.g., no score_split/score_name configured), because reporting best in that case would trigger phantom “best” checkpoint copies.
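The three `is_best` cases can be restated as a small pure function. This is a sketch under stated assumptions: scores arrive as a plain list in result order, `None` models the "cannot be resolved" case, and both the comparison direction (`higher_is_better`) and the tolerance (`abs_tol`) are illustrative parameters, not danling config keys.

```python
from __future__ import annotations

import math
from collections.abc import Sequence


def is_best(scores: Sequence[float] | None, higher_is_better: bool = True, abs_tol: float = 1e-7) -> bool:
    """Hypothetical restatement of the documented `is_best` rules."""
    if scores is None:
        return False  # unresolved scores: never report best (avoids phantom copies)
    if not scores:
        return True  # no prior results: the first iteration counts as best
    best = max(scores) if higher_is_better else min(scores)
    return math.isclose(scores[-1], best, abs_tol=abs_tol)


print(is_best([0.71, 0.80, 0.79]))  # False: latest 0.79 trails best 0.80
print(is_best([0.71, 0.80]))        # True
print(is_best(None))                # False
```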

batch_size property

batch_size: int

Infer batch size from config or first dataloader.

epochs property writable

epochs: int | None

Configured epoch budget, if present.

steps property writable

steps: int | None

Configured/derived optimizer-step budget.
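When `steps` is not configured, the class source listing derives it from `epochs`, the length of each training-split dataloader, and `accum_steps`, using ceiling division so a trailing partial accumulation window still counts as an optimizer step. The hypothetical helper below isolates that arithmetic, with plain integers standing in for `len(dataloader)` and `None` standing in for an unsized loader.

```python
from __future__ import annotations

from collections.abc import Mapping


def derive_steps(
    configured_steps: int | None,
    epochs: int | None,
    train_loader_lengths: Mapping[str, int | None],
    accum_steps: int = 1,
) -> int | None:
    """Configured budget wins; otherwise derive optimizer steps from epochs."""
    if configured_steps is not None:
        return configured_steps
    if epochs is None or not train_loader_lengths:
        return None
    steps_per_epoch = 0
    for micro_steps in train_loader_lengths.values():
        if micro_steps is None:  # unsized (e.g. streaming) loader: budget unknown
            return None
        # ceil(micro_steps / accum_steps) optimizer steps for this split
        steps_per_epoch += (micro_steps + accum_steps - 1) // accum_steps
    return steps_per_epoch * epochs


# 1001 micro-batches with accum_steps=4 -> 251 optimizer steps per epoch.
print(derive_steps(None, 3, {"train": 1001}, accum_steps=4))  # 753
```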

is_step_mode property

is_step_mode: bool

Whether runner is in step mode (epochs is unset).

accum_steps cached property

accum_steps: int

Gradient accumulation steps.

precision cached property

precision: str | None

Autocast precision mode.

max_grad_value cached property

max_grad_value: float | None

Gradient value clipping threshold.

max_grad_norm cached property

max_grad_norm: float | None

Gradient norm clipping threshold.

skip_nonfinite_grad cached property

skip_nonfinite_grad: bool

Whether to skip optimizer updates when gradients are non-finite.

patience cached property

patience: int | float

Early-stop patience in epoch mode.

progress property

progress: float

Normalized training progress in [0, 1].

train_splits property

train_splits: list[str]

Configured or inferred training split names.

evaluate_splits property

evaluate_splits: list[str]

Configured or inferred evaluation split names.
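Both split properties follow one small rule set in the class source: explicit config wins; otherwise `train_splits` are datasets named `train` or exposing `train=True` / `split == "train"`, and every remaining dataset becomes an evaluation split. A sketch of those rules, with a plain dict standing in for the runner config and `SimpleNamespace` objects standing in for datasets (both assumptions for illustration):

```python
from __future__ import annotations

from collections.abc import Mapping, Sequence
from types import SimpleNamespace
from typing import Any


def sorted_unique(values: Sequence[str] | str) -> list[str]:
    """A bare string is a single split; sequences are de-duplicated and sorted."""
    if isinstance(values, str):
        return [values]
    return sorted(dict.fromkeys(str(value) for value in values))


def infer_splits(datasets: Mapping[str, Any], config: Mapping[str, Any]) -> tuple[list[str], list[str]]:
    """Hypothetical helper applying the train/evaluate split rules."""
    if "train_splits" in config:
        train = sorted_unique(config["train_splits"])
    else:
        train = sorted_unique([
            split
            for split, dataset in datasets.items()
            if split == "train" or getattr(dataset, "train", False) or getattr(dataset, "split", None) == "train"
        ])
    if "evaluate_splits" in config:
        evaluate = sorted_unique(config["evaluate_splits"])
    else:
        train_set = set(train)
        evaluate = sorted(split for split in datasets if split not in train_set)
    return train, evaluate


datasets = {
    "train": SimpleNamespace(),
    "pretrain": SimpleNamespace(train=True),
    "val": SimpleNamespace(split="val"),
    "test": SimpleNamespace(),
}
print(infer_splits(datasets, {}))                         # (['pretrain', 'train'], ['test', 'val'])
print(infer_splits(datasets, {"evaluate_splits": "val"}))  # (['pretrain', 'train'], ['val'])
```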

checkpoint_interval property

checkpoint_interval: int

Checkpoint cadence in optimizer steps (step mode) or epochs (epoch mode).

log_interval property

log_interval: int

Step logging cadence.
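The defaults for the two cadence properties come from the class source listing: an explicit config value wins, epoch mode checkpoints every epoch, and step mode spreads about 20 checkpoints and about 100 log lines over the step budget. Restated as free functions over plain arguments (the function names are hypothetical):

```python
from __future__ import annotations

from math import ceil


def checkpoint_interval(configured: int | None, epochs: int | None, steps: int | None) -> int:
    """Default checkpoint cadence: epochs in epoch mode, optimizer steps in step mode."""
    if configured is not None:
        return configured
    if epochs is not None:
        return 1  # epoch mode: checkpoint every epoch
    if steps is not None:
        return max(ceil(steps / 20), 1)  # step mode: about 20 checkpoints over the budget
    return 8_192  # no budget known


def log_interval(configured: int | None, steps: int | None) -> int:
    """Default step-logging cadence."""
    if configured is not None:
        return configured
    if steps is not None:
        return max(ceil(steps / 100), 1)  # about 100 log lines over the budget
    return 1_024


print(checkpoint_interval(None, None, 10_000), log_interval(None, 10_000))  # 500 100
```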

__post_init__

Python
__post_init__() -> None

Hook called by MetaRunner after __init__. The base implementation persists workspace metadata via workspace.save_metadata().

Source code in danling/runners/base_runner.py
Python
def __post_init__(self) -> None:
    """Hook called after `__init__` by `MetaRunner`."""
    self.workspace.save_metadata()

auto_restore

Python
auto_restore() -> None

Auto-load resume/pretrained sources declared in config.

Precedence

config.resume > config.auto_resume > config.pretrained.

Source code in danling/runners/base_runner.py
Python
def auto_restore(self) -> None:
    """Auto-load resume/pretrained sources declared in config.

    Precedence:
        `config.resume` > `config.auto_resume` > `config.pretrained`.
    """

    restore_target = self._resolve_auto_restore_target()
    if restore_target is None:
        return

    restore_kind, restore_source = restore_target
    if restore_kind == "checkpoint":
        self.load_checkpoint(restore_source)
        return
    self.load_pretrained(restore_source)
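
The precedence can be illustrated with a plain mapping standing in for the config. This sketch does not reproduce _resolve_auto_restore_target. The source above does not show how auto_resume locates the most recent checkpoint, so the latest_checkpoint argument is a hypothetical stand-in. Falling through to pretrained when no checkpoint exists is also an assumption.

```python
from __future__ import annotations

from typing import Any, Mapping


def resolve_restore_target_sketch(
    config: Mapping[str, Any],
    latest_checkpoint: str | None = None,  # hypothetical: result of an auto-resume lookup
) -> tuple[str, Any] | None:
    """Return ("checkpoint", source), ("pretrained", source), or None."""
    # 1. An explicit resume source always wins and restores full state.
    if config.get("resume"):
        return "checkpoint", config["resume"]
    # 2. auto_resume restores full state from the latest checkpoint, if any.
    if config.get("auto_resume") and latest_checkpoint is not None:
        return "checkpoint", latest_checkpoint
    # 3. pretrained only loads model (or EMA) weights.
    if config.get("pretrained"):
        return "pretrained", config["pretrained"]
    return None
```

As in the source above, a ("checkpoint", ...) target goes to load_checkpoint and a ("pretrained", ...) target goes to load_pretrained.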

init_distributed

Python
init_distributed() -> None

Initialize the distributed environment.

The default is a no-op (single-process). Concrete runners override this hook to initialize the torch.distributed process group; see TorchRunner.init_distributed for the canonical specification.

Source code in danling/runners/base_runner.py
Python
def init_distributed(self) -> None:
    """
    Initialize the distributed environment.

    The default is a no-op (single-process). Concrete runners override
    this hook to initialize the torch.distributed process group; see
    [`TorchRunner.init_distributed`][danling.runners.TorchRunner.init_distributed]
    for the canonical specification.
    """

init_checkpoint_manager

Python
init_checkpoint_manager() -> None

Bind the runner’s checkpoint manager.

The default is a no-op — BaseRunner.__init__ already binds the FileCheckpointManager. Concrete runners override this hook to swap in the backend-appropriate manager via set_checkpoint_manager(...); see TorchRunner.init_checkpoint_manager for the canonical specification.

Source code in danling/runners/base_runner.py
Python
def init_checkpoint_manager(self) -> None:
    """
    Bind the runner's checkpoint manager.

    The default is a no-op — `BaseRunner.__init__` already binds the
    `FileCheckpointManager`. Concrete runners override this hook to swap
    in the backend-appropriate manager via `set_checkpoint_manager(...)`;
    see
    [`TorchRunner.init_checkpoint_manager`][danling.runners.TorchRunner.init_checkpoint_manager]
    for the canonical specification.
    """

init_fault_tolerance

Python
init_fault_tolerance() -> None

Initialize optional fault-tolerance runtime support.

Source code in danling/runners/base_runner.py
Python
def init_fault_tolerance(self) -> None:
    """Initialize optional fault-tolerance runtime support."""

    self.ft = FaultTolerance(self)

init_heartbeat

Python
init_heartbeat() -> None

Configure optional background heartbeat writer.

Source code in danling/runners/base_runner.py
Python
def init_heartbeat(self) -> None:
    """Configure optional background heartbeat writer."""

    self.supervisor.init_heartbeat()

init_garbage_collection

Python
init_garbage_collection() -> None

Configure optional runner-managed Python GC pacing.

Source code in danling/runners/base_runner.py
Python
def init_garbage_collection(self) -> None:
    """Configure optional runner-managed Python GC pacing."""

    self.supervisor.init_garbage_collection()

init_signal_handlers

Python
init_signal_handlers() -> None

Install runner-owned signal handlers for graceful preemption.

Source code in danling/runners/base_runner.py
Python
def init_signal_handlers(self) -> None:
    """Install runner-owned signal handlers for graceful preemption."""

    self.supervisor.init_signal_handlers()

prepare_for_shutdown_checkpoint

Python
prepare_for_shutdown_checkpoint() -> None

Finalize runner state before writing a forced shutdown checkpoint. The base implementation is a no-op; concrete runners may override it.

Source code in danling/runners/base_runner.py
Python
def prepare_for_shutdown_checkpoint(self) -> None:
    """Finalize runner state before writing a forced shutdown checkpoint."""

init_tensorboard

Python
init_tensorboard(*args, **kwargs) -> None

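Initialize a TensorBoard writer. The base implementation does not create a writer; it only emits a RuntimeWarning, so concrete runners that support TensorBoard must override this hook.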

Source code in danling/runners/base_runner.py
Python
@on_main_process
def init_tensorboard(self, *args, **kwargs) -> None:
    """Initialize tensorboard writer."""

    warn(
        "tensorboard is enabled, but this runner does not initialize a tensorboard writer",
        RuntimeWarning,
        stacklevel=2,
    )

init_wandb

Python
init_wandb(*args, **kwargs) -> None

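Initialize a Weights & Biases run for scalar logging. Explicit keyword arguments take precedence over config.wandb. When neither sets them, project, group, name, and dir fall back to the workspace lineage, the workspace experiment, the runner id, and the workspace directory, and config defaults to the full runner config. Raises RuntimeError if the wandb package is not installed.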

Source code in danling/runners/base_runner.py
Python
@on_main_process
def init_wandb(self, *args, **kwargs) -> None:
    """Initialize Weights & Biases run for scalar logging."""

    try:
        import wandb
    except ImportError as exc:
        raise RuntimeError("wandb is enabled, but the `wandb` package is not installed") from exc

    wandb_config = self.config.wandb
    if "project" not in kwargs:
        kwargs["project"] = wandb_config.get("project") or self.workspace.lineage
    if "entity" not in kwargs and wandb_config.get("entity") is not None:
        kwargs["entity"] = wandb_config.entity
    if "group" not in kwargs:
        kwargs["group"] = wandb_config.get("group") or self.workspace.experiment
    if "name" not in kwargs:
        kwargs["name"] = wandb_config.get("name") or self.id
    if "job_type" not in kwargs and wandb_config.get("job_type") is not None:
        kwargs["job_type"] = wandb_config.job_type
    tags = wandb_config.get("tags")
    if "tags" not in kwargs and tags is not None:
        kwargs["tags"] = [tags] if isinstance(tags, str) else list(tags)
    if "dir" not in kwargs:
        kwargs["dir"] = wandb_config.get("dir") or self.workspace.dir
    if "mode" not in kwargs and wandb_config.get("mode") is not None:
        kwargs["mode"] = wandb_config.mode
    if "config" not in kwargs:
        kwargs["config"] = self.config.dict()

    self.wandb = cast(Any, wandb).init(*args, **kwargs)

set_seed

Python
set_seed(
    seed: int | None = None, bias: int | bool | None = None
) -> int

Set Python and NumPy RNG seeds, record the base seed in config.seed, and snapshot RNG state.

Parameters:

Name  Type               Default  Description
seed  int | None         None     Base seed. Defaults to self.config.seed.
bias  int | bool | None  None     Optional per-process bias. None uses self.rank.

Returns:

Type  Description
int   The process-local seed after applying bias.

Source code in danling/runners/base_runner.py
Python
def set_seed(self, seed: int | None = None, bias: int | bool | None = None) -> int:
    """Set python/numpy RNG seeds and snapshot RNG state.

    Args:
        seed: Base seed. Defaults to `self.config.seed`.
        bias: Optional per-process bias. `None` uses `self.rank`.

    Returns:
        The process-local seed after applying bias.
    """

    base_seed = self.config.seed if seed is None else seed
    if base_seed is None:
        raise ValueError("cannot set seed: no seed is configured and no seed argument was provided")
    base_seed = int(base_seed)

    self.config.seed = base_seed

    process_seed = base_seed
    if bias is None:
        bias = self.rank
    if bias:
        process_seed += int(bias)

    random.seed(process_seed)
    if np_random is not None:
        np_random.seed(process_seed)

    self.rng_state.python = random.getstate()
    self.rng_state.numpy = np_random.get_state() if np_random is not None else None
    return process_seed
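
The per-process offset rule in set_seed can be isolated as follows. This sketch mirrors the arithmetic in the source above, and the function names are hypothetical. Note the consequence of the `if bias:` test: bias=False or bias=0 disables the offset, while bias=True adds 1 because int(True) == 1.

```python
from __future__ import annotations

import random


def process_seed_sketch(base_seed: int, rank: int, bias: int | bool | None = None) -> int:
    """Mirror set_seed's offset rule (hypothetical helper)."""
    # bias=None means "offset by this process's rank".
    if bias is None:
        bias = rank
    # A falsy bias (0 or False) leaves the base seed unchanged.
    if bias:
        return base_seed + int(bias)
    return base_seed


def first_draw(seed: int) -> float:
    """Seed the Python RNG and return its first draw."""
    random.seed(seed)
    return random.random()


# With the default bias, four ranks sharing base seed 42 get distinct streams.
rank_seeds = [process_seed_sketch(42, rank) for rank in range(4)]
```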

set_deterministic

Python
set_deterministic() -> None

Enable deterministic behavior in backend-specific libraries. The base implementation is a no-op; concrete runners override it.

Source code in danling/runners/base_runner.py
Python
def set_deterministic(self) -> None:
    """Enable deterministic behavior in subclass-specific backends."""

train

Python
train(*args, **kwargs)

Run top-level training workflow.

Source code in danling/runners/base_runner.py
Python
def train(self, *args, **kwargs):
    """Run top-level training workflow."""

    raise NotImplementedError

train_epochs

Python
train_epochs(*args, **kwargs)

Run epoch-mode training workflow.

Source code in danling/runners/base_runner.py
Python
def train_epochs(self, *args, **kwargs):
    """Run epoch-mode training workflow."""

    raise NotImplementedError

train_epoch

Python
train_epoch(*args, **kwargs)

Run one training epoch on a split.

Source code in danling/runners/base_runner.py
Python
def train_epoch(self, *args, **kwargs):
    """Run one training epoch on a split."""

    raise NotImplementedError

train_steps

Python
train_steps(*args, **kwargs)

Run step-mode training workflow.

Source code in danling/runners/base_runner.py
Python
def train_steps(self, *args, **kwargs):
    """Run step-mode training workflow."""

    raise NotImplementedError

train_step

Python
train_step(*args, **kwargs)

Run one training micro-step.

Concrete runners define the override contract; see TorchRunner.train_step for the canonical specification.

Source code in danling/runners/base_runner.py
Python
def train_step(self, *args, **kwargs):
    """
    Run one training micro-step.

    Concrete runners define the override contract; see
    [`TorchRunner.train_step`][danling.runners.TorchRunner.train_step] for
    the canonical specification.
    """

    raise NotImplementedError

backward

Python
backward(loss, *args, **kwargs) -> None

Run backward pass for one micro-step loss.

Source code in danling/runners/base_runner.py
Python
def backward(self, loss, *args, **kwargs) -> None:
    """Run backward pass for one micro-step loss."""

    raise NotImplementedError

step

Python
step(*args, **kwargs) -> None

Advance optimizer/scheduler state when accumulation is ready.

Source code in danling/runners/base_runner.py
Python
def step(self, *args, **kwargs) -> None:
    """Advance optimizer/scheduler state when accumulation is ready."""

    raise NotImplementedError
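
BaseRunner leaves step abstract, so concrete runners decide when accumulation is "ready". A common rule is to update once every accum_steps micro-steps. That rule is shown here as an assumption, not as TorchRunner's actual logic, and this sketch does not cover a trailing partial window at the end of an epoch. The helper names are hypothetical.

```python
from __future__ import annotations


def accumulation_ready(micro_step: int, accum_steps: int) -> bool:
    """Hypothetical readiness check; micro_step counts from 1."""
    if accum_steps < 1:
        raise ValueError("accum_steps must be a positive integer")
    # Update after every `accum_steps`-th micro-step.
    return micro_step % accum_steps == 0


def optimizer_update_points(num_micro_steps: int, accum_steps: int) -> list[int]:
    """Micro-steps after which an optimizer/scheduler update would run."""
    return [m for m in range(1, num_micro_steps + 1) if accumulation_ready(m, accum_steps)]
```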

evaluate

Python
evaluate(*args, **kwargs)

Run top-level evaluation workflow.

Source code in danling/runners/base_runner.py
Python
def evaluate(self, *args, **kwargs):
    """Run top-level evaluation workflow."""

    raise NotImplementedError

evaluate_epoch

Python
evaluate_epoch(*args, **kwargs)

Run one full evaluation epoch on a split.

Source code in danling/runners/base_runner.py
Python
def evaluate_epoch(self, *args, **kwargs):
    """Run one full evaluation epoch on a split."""

    raise NotImplementedError

evaluate_steps

Python
evaluate_steps(*args, **kwargs)

Run bounded evaluation steps on a split.

Source code in danling/runners/base_runner.py
Python
def evaluate_steps(self, *args, **kwargs):
    """Run bounded evaluation steps on a split."""

    raise NotImplementedError

evaluate_step

Python
evaluate_step(*args, **kwargs)

Run one evaluation step.

Concrete runners define the override contract; see TorchRunner.evaluate_step for the canonical specification.

Source code in danling/runners/base_runner.py
Python
def evaluate_step(self, *args, **kwargs):
    """
    Run one evaluation step.

    Concrete runners define the override contract; see
    [`TorchRunner.evaluate_step`][danling.runners.TorchRunner.evaluate_step]
    for the canonical specification.
    """

    raise NotImplementedError

infer

Python
infer(*args, **kwargs)

Run top-level inference workflow.

Source code in danling/runners/base_runner.py
Python
def infer(self, *args, **kwargs):
    """Run top-level inference workflow."""

    raise NotImplementedError

infer_step

Python
infer_step(*args, **kwargs)

Run one inference step.

Concrete runners define the override contract; see TorchRunner.infer_step for the canonical specification.

Source code in danling/runners/base_runner.py
Python
def infer_step(self, *args, **kwargs):
    """
    Run one inference step.

    Concrete runners define the override contract; see
    [`TorchRunner.infer_step`][danling.runners.TorchRunner.infer_step] for
    the canonical specification.
    """

    raise NotImplementedError

unwrap

Python
unwrap(model: Any) -> Any

Return an unwrapped model object.

Source code in danling/runners/base_runner.py
Python
def unwrap(self, model: Any) -> Any:
    """Return an unwrapped model object."""

    return model
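
The base implementation returns the model unchanged. A backend runner that wraps models would typically override it; for example, torch.nn.parallel.DistributedDataParallel exposes the wrapped model as .module. The sketch below is hypothetical and uses plain classes instead of PyTorch so it runs standalone. A real override should check for specific wrapper types, because an ordinary model may legitimately have a submodule named module.

```python
from typing import Any


def unwrap_sketch(model: Any) -> Any:
    """Peel nested wrappers that store the wrapped model in `.module`."""
    while hasattr(model, "module"):
        model = model.module
    return model


class Net:
    """Stand-in for a user model."""


class Wrapper:
    """Stand-in for a parallel wrapper such as DistributedDataParallel."""

    def __init__(self, module: Any) -> None:
        self.module = module


net = Net()
wrapped = Wrapper(Wrapper(net))
```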

state_dict

Python
state_dict(cls: type = dict) -> Mapping

Build the backend-neutral runner checkpoint payload.

The base payload contains semantic runner config, mutable runner state, RNG snapshots, and dataloader resume state. Backend runners extend this payload with model/optimizer/scheduler state.

Called when: checkpoint managers build a payload for save_checkpoint, and fault-tolerance callbacks need a runner state snapshot.

Parameters:

Name  Type  Default  Description
cls   type  dict     Mapping factory used for nested payloads. Backends may pass dict-like containers to preserve their serialization format.

Returns:

Type     Description
Mapping  Mapping with runner, state, and dataloaders keys.

Side effects: snapshots Python and NumPy RNG state into self.rng_state before exporting.

Do not

  • Mutate model or optimizer state here.
  • Drop the runner config payload; resume validation depends on it.
  • Override without calling super() unless you fully replace the checkpoint format.
Source code in danling/runners/base_runner.py
Python
def state_dict(self, cls: type = dict) -> Mapping:
    """
    Build the backend-neutral runner checkpoint payload.

    The base payload contains semantic runner config, mutable runner
    state, RNG snapshots, and dataloader resume state. Backend runners
    extend this payload with model/optimizer/scheduler state.

    **Called when:** checkpoint managers build a payload for
    `save_checkpoint`, and fault-tolerance callbacks need a runner state
    snapshot.

    Args:
        cls: Mapping factory used for nested payloads. Backends may pass
            `dict`-like containers to preserve their serialization format.

    Returns:
        Mapping with `runner`, `state`, and `dataloaders` keys.

    **Side effects:** snapshots Python and NumPy RNG state into
    `self.rng_state` before exporting.

    !!! danger "Do not"
        - Mutate model or optimizer state here.
        - Drop the `runner` config payload; resume validation depends on it.
        - Override without calling `super()` unless you fully replace the
          checkpoint format.
    """

    self.rng_state.python = random.getstate()
    self.rng_state.numpy = np_random.get_state() if np_random is not None else None

    state = self.state.state_dict()
    if cls is not dict:
        state = cls(state)

    dataloader_state = self.dataloaders.state_dict()
    if cls is not dict:
        dataloader_state = cls(dataloader_state)

    return cls(runner=self.config.dict(), state=state, dataloaders=dataloader_state)
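
The payload shape and the RNG snapshot can be sketched with the standard library alone. This illustration rests on several assumptions. The helper names are hypothetical, and the runner state is reduced to a plain dict. Only the Python RNG is captured (NumPy is optional in BaseRunner as well). Placing the snapshot under state["rng"] is inferred from the way load_state_dict reads it back.

```python
from __future__ import annotations

import random
from collections import OrderedDict
from typing import Any, Mapping


def make_payload(
    config: Mapping[str, Any],
    progress: Mapping[str, Any],
    loaders: Mapping[str, Any],
    cls: type = dict,
) -> Mapping[str, Any]:
    """Build a {runner, state, dataloaders} payload (hypothetical helper)."""
    # Snapshot the Python RNG immediately before exporting.
    state: Mapping[str, Any] = {**progress, "rng": {"python": random.getstate()}}
    loader_state: Mapping[str, Any] = dict(loaders)
    # As in state_dict, nested payloads use the requested mapping factory.
    if cls is not dict:
        state = cls(state)
        loader_state = cls(loader_state)
    return cls(runner=dict(config), state=state, dataloaders=loader_state)


def restore_python_rng(payload: Mapping[str, Any]) -> None:
    """Reinstate the RNG snapshot, if the payload carries one."""
    rng = payload["state"].get("rng")
    if isinstance(rng, Mapping) and rng.get("python") is not None:
        random.setstate(rng["python"])


random.seed(0)
payload = make_payload({"seed": 0}, {"global_step": 10}, {"train": {"index": 3}}, cls=OrderedDict)
expected = [random.random() for _ in range(3)]  # draws made after the snapshot
restore_python_rng(payload)
replayed = [random.random() for _ in range(3)]  # same draws after restoring
```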

load_state_dict

Python
load_state_dict(checkpoint: Mapping[str, Any]) -> None

Restore backend-neutral runner state from a checkpoint payload.

This restores semantic runner state and Python/NumPy RNG state. Model, EMA, optimizer, scheduler, and dataloader component loading is owned by load_checkpoint.

Called when: load_checkpoint restores a full checkpoint, and fault-tolerance callbacks receive a runner state payload.

Parameters:

Name        Type               Default   Description
checkpoint  Mapping[str, Any]  required  Mapping produced by state_dict or a backend-specific superset of that payload.

Raises:

Type        Description
ValueError  checkpoint runner config differs semantically from the current runner config.

Side effects: updates self.state, self.train_state, self.elastic_state, self.rng_state, and process RNG state.

Do not

  • Load model/optimizer/scheduler state here; use component loaders through load_checkpoint.
  • Suppress semantic config diffs unless you also update the resume policy deliberately.
Source code in danling/runners/base_runner.py
Python
def load_state_dict(self, checkpoint: Mapping[str, Any]) -> None:
    """
    Restore backend-neutral runner state from a checkpoint payload.

    This restores semantic runner state and Python/NumPy RNG state. Model,
    EMA, optimizer, scheduler, and dataloader component loading is owned by
    `load_checkpoint`.

    **Called when:** `load_checkpoint` restores a full checkpoint, and
    fault-tolerance callbacks receive a runner state payload.

    Args:
        checkpoint: Mapping produced by `state_dict` or a backend-specific
            superset of that payload.

    Raises:
        ValueError: checkpoint runner config differs semantically from the
            current runner config.

    **Side effects:** updates `self.state`, `self.train_state`,
    `self.elastic_state`, `self.rng_state`, and process RNG state.

    !!! danger "Do not"
        - Load model/optimizer/scheduler state here; use component loaders
          through `load_checkpoint`.
        - Suppress semantic config diffs unless you also update the resume
          policy deliberately.
    """

    runner_config = checkpoint.get("runner")
    if runner_config is not None:
        checkpoint_config = RunnerConfig(runner_config).canonical()
        current_config = self.config.canonical()
        semantic_diff = NestedDict(checkpoint_config).diff(current_config).dict()
        if semantic_diff:
            raise ValueError(
                "cannot load checkpoint: runner config is semantically different from current config; "
                f"start a new experiment or align config. diff={semantic_diff}"
            )

    state_dict = checkpoint.get("state") or {}
    self.state.load_state_dict(dict(state_dict))

    rng_state = state_dict.get("rng")
    if isinstance(rng_state, Mapping) and "python" in rng_state and self.rng_state.python is not None:
        random.setstate(self.rng_state.python)

    if (
        np_random is not None
        and isinstance(rng_state, Mapping)
        and "numpy" in rng_state
        and self.rng_state.numpy is not None
    ):
        np_random.set_state(self.rng_state.numpy)
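
The resume guard in load_state_dict compares the checkpoint's canonical config with the current one and refuses to load on any difference. The recursive comparison can be sketched with plain nested dicts. This is not NestedDict.diff, and RunnerConfig.canonical() is not reproduced, so no fields are normalized or dropped first. The function names are hypothetical, and treating a missing key as equal to None is a simplification of this sketch.

```python
from __future__ import annotations

from typing import Any, Mapping


def semantic_diff_sketch(
    old: Mapping[str, Any], new: Mapping[str, Any], prefix: str = ""
) -> dict[str, tuple[Any, Any]]:
    """Map dotted key paths to (old, new) pairs for every differing leaf."""
    diff: dict[str, tuple[Any, Any]] = {}
    for key in sorted(set(old) | set(new)):
        path = f"{prefix}.{key}" if prefix else key
        a, b = old.get(key), new.get(key)
        if isinstance(a, Mapping) and isinstance(b, Mapping):
            diff.update(semantic_diff_sketch(a, b, path))  # recurse into sections
        elif a != b:
            diff[path] = (a, b)
    return diff


def check_resume_config(checkpoint_config: Mapping[str, Any], current_config: Mapping[str, Any]) -> None:
    """Raise ValueError on any semantic difference, mirroring the guard above."""
    diff = semantic_diff_sketch(checkpoint_config, current_config)
    if diff:
        raise ValueError(f"cannot load checkpoint: runner config differs; diff={diff}")
```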

save_checkpoint

Python
save_checkpoint(
    name: str = "latest",
    epochs: int | None = None,
    save_best: bool = True,
    last_step: bool = False,
    force: bool = False,
) -> None

Persist runner state through the active checkpoint manager.

Backend collective semantics are owned by checkpoint_manager.is_collective. File-style managers save on the main process only; collective managers require every rank to enter this method together.

Called when: training loops hit checkpoint cadence, final last_step saves run, or the supervisor handles a shutdown signal.

Parameters:

Name       Type        Default   Description
name       str         'latest'  Logical checkpoint alias, usually "latest" or "best".
epochs     int | None  None      Epoch index used for history checkpoint naming. Defaults to self.train_state.epoch.
save_best  bool        True      Whether to publish/update the best-checkpoint alias when self.is_best is true.
last_step  bool        False     Whether this save is the final save for the run.
force      bool        False     Bypass cadence checks inside the manager.

Side effects: delegates to self.checkpoint_manager.save_checkpoint(...).

Do not

  • Add a main-process guard around calls to this method; DCP-style managers need all ranks to participate.
  • Bypass the checkpoint manager for normal runner checkpoints.
Source code in danling/runners/base_runner.py
Python
def save_checkpoint(
    self,
    name: str = "latest",
    epochs: int | None = None,
    save_best: bool = True,
    last_step: bool = False,
    force: bool = False,
) -> None:
    """
    Persist runner state through the active checkpoint manager.

    Backend collective semantics are owned by
    `checkpoint_manager.is_collective`. File-style managers save on the
    main process only; collective managers require every rank to enter this
    method together.

    **Called when:** training loops hit checkpoint cadence, final
    `last_step` saves run, or the supervisor handles a shutdown signal.

    Args:
        name: Logical checkpoint alias, usually `"latest"` or `"best"`.
        epochs: Epoch index used for history checkpoint naming. Defaults
            to `self.train_state.epoch`.
        save_best: Whether to publish/update the best-checkpoint alias
            when `self.is_best` is true.
        last_step: Whether this save is the final save for the run.
        force: Bypass cadence checks inside the manager.

    **Side effects:** delegates to
    `self.checkpoint_manager.save_checkpoint(...)`.

    !!! danger "Do not"
        - Add a main-process guard around calls to this method; DCP-style
          managers need all ranks to participate.
        - Bypass the checkpoint manager for normal runner checkpoints.
    """

    if not (self.is_main_process or self.checkpoint_manager.is_collective):
        return
    epochs = self.train_state.epoch if epochs is None else epochs
    self.checkpoint_manager.save_checkpoint(
        name=name,
        epochs=epochs,
        save_best=save_best,
        last_step=last_step,
        force=force,
    )
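
The guard at the top of save_checkpoint determines which ranks enter the checkpoint manager. The sketch below enumerates those ranks for both kinds of manager; it uses a hypothetical helper and assumes rank 0 is the main process. It also shows why the "Do not" rule matters. Wrapping the call in an `if is_main_process:` check would shrink the collective case to rank 0 alone. The other ranks would then never enter the save, and a collective backend would typically block waiting for them.

```python
from __future__ import annotations


def participating_ranks(world_size: int, is_collective: bool, main_rank: int = 0) -> list[int]:
    """Ranks that proceed past save_checkpoint's early return."""
    # File-style managers: only the main process writes.
    # Collective (e.g. DCP-style) managers: every rank must participate.
    return [rank for rank in range(world_size) if rank == main_rank or is_collective]
```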

save_seed_checkpoint

Python
save_seed_checkpoint(name: str = 'seed') -> None

Persist an initialization checkpoint for cross-topology experiments.

Seed checkpoints are intended to be created before training advances, then loaded with checkpoint.load_only=True or resume/pretrained when comparing different parallel layouts from the same initial model state. They are saved through the final-checkpoint path, so checkpoint.last_save_model_only=True intentionally applies.

Source code in danling/runners/base_runner.py
Python
def save_seed_checkpoint(self, name: str = "seed") -> None:
    """
    Persist an initialization checkpoint for cross-topology experiments.

    Seed checkpoints are intended to be created before training advances,
    then loaded with `checkpoint.load_only=True` or `resume`/`pretrained`
    when comparing different parallel layouts from the same initial model
    state. They are saved through the final-checkpoint path, so
    `checkpoint.last_save_model_only=True` intentionally applies.
    """
    if self.train_state.global_step != 0 or self.train_state.epoch != 0:
        warn(
            "save_seed_checkpoint() is intended before training advances; "
            f"current epoch={self.train_state.epoch}, global_step={self.train_state.global_step}",
            RuntimeWarning,
            stacklevel=2,
        )
    self.save_checkpoint(name=name, epochs=0, save_best=False, last_step=True, force=True)

load_checkpoint

Python
load_checkpoint(
    checkpoint: Mapping | bytes | str | PathLike,
    *args: Any,
    **kwargs: Any
) -> None

Restore a full runner checkpoint.

This is the full-state restore path: runtime state, model/EMA, optimizer, scheduler, and dataloader progress are restored when present and applicable to the current runner.

Called when: users resume a run explicitly, auto_restore selects a resume source, from_checkpoint constructs a runner, or fault-tolerance callbacks restore a full runner payload.

Parameters:

Name        Type                              Default   Description
checkpoint  Mapping | bytes | str | PathLike  required  In-memory checkpoint mapping or backend-specific path.
*args       Any                               ()        Forwarded to read_checkpoint and component loaders.
**kwargs    Any                               {}        Forwarded to read_checkpoint and component loaders.

Raises:

Type        Description
ValueError  checkpoint is missing required component state for an initialized component, or config validation fails.

Side effects: updates runner state, model/EMA weights, optimizer, scheduler, dataloader progress, and config.resume for path inputs.

Do not

  • Use this for model-only finetuning payloads; use load_pretrained instead.
  • Override just to support a new path type; prefer overriding read_checkpoint.
Source code in danling/runners/base_runner.py
Python
def load_checkpoint(
    self,
    checkpoint: Mapping | bytes | str | os.PathLike,
    *args: Any,
    **kwargs: Any,
) -> None:
    """
    Restore a full runner checkpoint.

    This is the full-state restore path: runtime state, model/EMA,
    optimizer, scheduler, and dataloader progress are restored when present
    and applicable to the current runner.

    **Called when:** users resume a run explicitly, `auto_restore` selects
    a resume source, `from_checkpoint` constructs a runner, or
    fault-tolerance callbacks restore a full runner payload.

    Args:
        checkpoint: In-memory checkpoint mapping or backend-specific path.
        *args: Forwarded to `read_checkpoint` and component loaders.
        **kwargs: Forwarded to `read_checkpoint` and component loaders.

    Raises:
        ValueError: checkpoint is missing required component state for an
            initialized component, or config validation fails.

    **Side effects:** updates runner state, model/EMA weights, optimizer,
    scheduler, dataloader progress, and `config.resume` for path inputs.

    !!! danger "Do not"
        - Use this for model-only finetuning payloads; use
          `load_pretrained` instead.
        - Override just to support a new path type; prefer overriding
          `read_checkpoint`.
    """

    ckpt = self.read_checkpoint(checkpoint, *args, **kwargs)
    excluded_paths = self.checkpoint_exclude_from_loading()
    if excluded_paths:
        if self._is_top_level_checkpoint_excluded(excluded_paths, "runner"):
            warn(
                "`checkpoint.exclude_from_loading` contains 'runner'; "
                "semantic runner config validation will be skipped for this load.",
                RuntimeWarning,
                stacklevel=2,
            )
        ckpt = self._filter_checkpoint_for_loading(ckpt, excluded_paths)

    self.load_state_dict(ckpt)
    if not self._is_top_level_checkpoint_excluded(excluded_paths, "model", "model_parts", "module"):
        if "model" in ckpt:
            self.load_model(ckpt["model"], *args, **kwargs)
        elif "model_parts" in ckpt:
            self.load_model(ckpt["model_parts"], *args, **kwargs)
        elif self.model is not None:
            raise ValueError(
                "cannot restore model: checkpoint has no model state\n"
                "Use `load_pretrained` only for model-only checkpoints with model/ema payloads"
            )
    if not self._is_top_level_checkpoint_excluded(excluded_paths, "ema") and (
        self.ema is not None or "ema" in ckpt
    ):
        self.load_ema(ckpt.get("ema"), *args, **kwargs)
    if not self._is_top_level_checkpoint_excluded(excluded_paths, "optimizer") and (
        self.optimizer is not None or "optimizer" in ckpt
    ):
        self.load_optimizer(ckpt.get("optimizer"), *args, **kwargs)
    if not self._is_top_level_checkpoint_excluded(excluded_paths, "scheduler") and (
        self.scheduler is not None or "scheduler" in ckpt
    ):
        self.load_scheduler(ckpt.get("scheduler"), *args, **kwargs)
    if not self._is_top_level_checkpoint_excluded(excluded_paths, "dataloaders") and (
        self.dataloaders or "dataloaders" in ckpt
    ):
        self.load_dataloaders(ckpt.get("dataloaders"))
    if isinstance(checkpoint, (str, bytes, os.PathLike)):
        self.config.resume = os.fsdecode(checkpoint)
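
The effect of checkpoint.exclude_from_loading can be approximated as follows. _filter_checkpoint_for_loading and _is_top_level_checkpoint_excluded are private and not shown here, so these are hypothetical stand-ins. In particular, the dotted-path syntax for nested entries (such as state.rng) is an assumption. The filter copies only the mappings along each excluded path. That leaves the caller's payload untouched without deep-copying large tensors.

```python
from __future__ import annotations

from typing import Any, Iterable, Mapping


def filter_checkpoint_sketch(ckpt: Mapping[str, Any], excluded_paths: Iterable[str]) -> dict[str, Any]:
    """Drop excluded entries; dotted paths address nested mappings (assumed syntax)."""
    result = dict(ckpt)
    for path in excluded_paths:
        *parents, leaf = path.split(".")
        node: dict[str, Any] | None = result
        for part in parents:
            child = node.get(part)
            if not isinstance(child, Mapping):
                node = None  # path does not exist in this checkpoint
                break
            child = dict(child)  # copy on write along the path
            node[part] = child
            node = child
        if node is not None:
            node.pop(leaf, None)
    return result


def is_top_level_excluded_sketch(excluded_paths: Iterable[str], *keys: str) -> bool:
    """True when any of `keys` is excluded as a whole top-level entry."""
    return any(path in keys for path in excluded_paths)
```

In load_checkpoint, a top-level exclusion also skips the matching component loader. That is why excluding "optimizer" does not trigger the missing-state error for an initialized optimizer.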

load_model

Python
load_model(
    state_dict: Mapping[str, Any], *args, **kwargs
) -> None

Load model state.

Source code in danling/runners/base_runner.py
Python
def load_model(self, state_dict: Mapping[str, Any], *args, **kwargs) -> None:
    """Load model state."""
    if self.model is None:
        raise ValueError("cannot restore model: model is not initialized")
    self.unwrap(self.model).load_state_dict(state_dict, *args, **kwargs)

load_ema

Python
load_ema(
    state_dict: Mapping[str, Any] | None, *args, **kwargs
) -> None

Load EMA state.

Source code in danling/runners/base_runner.py
Python
def load_ema(self, state_dict: Mapping[str, Any] | None, *args, **kwargs) -> None:
    """Load EMA state."""
    if self.ema is None:
        return
    state_dict = self._require_checkpoint_component_state("ema", state_dict)
    self.ema.load_state_dict(state_dict, *args, **kwargs)

load_optimizer

Python
load_optimizer(
    state_dict: Mapping[str, Any] | None, *args, **kwargs
) -> None

Load optimizer state.

Source code in danling/runners/base_runner.py
Python
def load_optimizer(self, state_dict: Mapping[str, Any] | None, *args, **kwargs) -> None:
    """Load optimizer state."""
    if self.optimizer is None:
        return
    state_dict = self._require_checkpoint_component_state("optimizer", state_dict)
    self.optimizer.load_state_dict(state_dict, *args, **kwargs)

load_scheduler

Python
load_scheduler(
    state_dict: Mapping[str, Any] | None, *args, **kwargs
) -> None

Load scheduler state.

Source code in danling/runners/base_runner.py
Python
def load_scheduler(self, state_dict: Mapping[str, Any] | None, *args, **kwargs) -> None:
    """Load scheduler state."""
    if self.scheduler is None:
        return
    state_dict = self._require_checkpoint_component_state("scheduler", state_dict)
    self.scheduler.load_state_dict(state_dict, *args, **kwargs)
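
load_ema, load_optimizer, and load_scheduler share one pattern. An uninitialized component is skipped, and an initialized component whose state is missing from the checkpoint is an error; load_checkpoint documents this as a ValueError. The sketch below restates that pattern. _require_checkpoint_component_state is private, so the helper names and error message here are hypothetical. Recorder is a stand-in for an optimizer or scheduler.

```python
from __future__ import annotations

from typing import Any, Mapping


def require_component_state(name: str, state_dict: Mapping[str, Any] | None) -> Mapping[str, Any]:
    """Reject a missing payload for a component that exists (hypothetical)."""
    if state_dict is None:
        raise ValueError(f"cannot restore {name}: checkpoint has no {name} state")
    return state_dict


def load_component(component: Any, name: str, state_dict: Mapping[str, Any] | None) -> None:
    """Skip uninitialized components; otherwise require and load state."""
    if component is None:
        return
    component.load_state_dict(require_component_state(name, state_dict))


class Recorder:
    """Stand-in component that records what it was asked to load."""

    def __init__(self) -> None:
        self.loaded: dict[str, Any] | None = None

    def load_state_dict(self, state_dict: Mapping[str, Any]) -> None:
        self.loaded = dict(state_dict)
```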

load_dataloaders

Python
load_dataloaders(
    state_dict: Mapping[str, Any] | None,
) -> None

Load dataloader progress state when the current runner has matching loaders.

Source code in danling/runners/base_runner.py
Python
def load_dataloaders(self, state_dict: Mapping[str, Any] | None) -> None:
    """Load dataloader progress state when the current runner has matching loaders."""
    if state_dict is None:
        return
    self.dataloaders.load_state_dict(state_dict)

load_pretrained

Python
load_pretrained(
    checkpoint: Mapping | bytes | str | PathLike,
    *args: Any,
    **kwargs: Any
) -> None

Load model weights only from a checkpoint payload or path.

When the checkpoint payload provides EMA weights (a non-None ema entry), they are preferred as the pretrained source. Otherwise model is used, falling back to model_parts.

Called when: users initialize from pretrained weights, or auto_restore selects config.pretrained.

Parameters:

Name        Type                              Default   Description
checkpoint  Mapping | bytes | str | PathLike  required  In-memory payload or backend-specific path containing ema, model, or model_parts.
*args       Any                               ()        Forwarded to read_checkpoint and load_model.
**kwargs    Any                               {}        Forwarded to read_checkpoint and load_model.

Raises:

Type        Description
ValueError  model is not initialized, or the payload has no usable model/EMA state.

Side effects: loads model weights and updates config.pretrained for path inputs. Optimizer, scheduler, runner state, and dataloaders are intentionally untouched.

Do not

  • Use this to resume training state; use load_checkpoint for full-state restore.
  • Load optimizer/scheduler state in this path.
Source code in danling/runners/base_runner.py
Python
def load_pretrained(
    self,
    checkpoint: Mapping | bytes | str | os.PathLike,
    *args: Any,
    **kwargs: Any,
) -> None:
    """
    Load model weights only from a checkpoint payload or path.

    When checkpoint payload provides EMA weights (`ema`), EMA is preferred as
    the pretrained source. Otherwise `model` is used.

    **Called when:** users initialize from pretrained weights, or
    `auto_restore` selects `config.pretrained`.

    Args:
        checkpoint: In-memory payload or backend-specific path containing
            `ema`, `model`, or `model_parts`.
        *args: Forwarded to `read_checkpoint` and `load_model`.
        **kwargs: Forwarded to `read_checkpoint` and `load_model`.

    Raises:
        ValueError: model is not initialized, or the payload has no usable
            model/EMA state.

    **Side effects:** loads model weights and updates `config.pretrained`
    for path inputs. Optimizer, scheduler, runner state, and dataloaders
    are intentionally untouched.

    !!! danger "Do not"
        - Use this to resume training state; use `load_checkpoint` for
          full-state restore.
        - Load optimizer/scheduler state in this path.
    """

    if self.model is None:
        raise ValueError("cannot load pretrained weights: model is not initialized")

    ckpt = self.read_checkpoint(checkpoint, *args, **kwargs)
    if ckpt.get("ema") is not None:
        self.load_model(ckpt["ema"], *args, **kwargs)
    elif "model" in ckpt:
        self.load_model(ckpt["model"], *args, **kwargs)
    elif "model_parts" in ckpt:
        self.load_model(ckpt["model_parts"], *args, **kwargs)
    else:
        raise ValueError(
            "cannot load pretrained weights: checkpoint has no EMA or model state\n"
            "Use `load_checkpoint` for full checkpoint restore instead of `load_pretrained`"
        )
    if isinstance(checkpoint, (str, bytes, os.PathLike)):
        self.config.pretrained = os.fsdecode(checkpoint)
    else:
        self.config.pretrained = None
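The source-selection order in load_pretrained has one subtle detail: an ema key whose value is None is skipped, while a model or model_parts key is selected as soon as it is present. The sketch below reproduces that order on plain dicts, together with how config.pretrained is recorded. select_pretrained_source and pretrained_record are hypothetical helpers written for illustration, not part of danling.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from typing import Any


def select_pretrained_source(ckpt: Mapping[str, Any]) -> tuple[str, Any]:
    """Hypothetical helper: pick the weights load_pretrained would use."""
    # EMA wins, but only when its value is not None.
    if ckpt.get("ema") is not None:
        return "ema", ckpt["ema"]
    # The remaining keys are chosen by presence alone.
    if "model" in ckpt:
        return "model", ckpt["model"]
    if "model_parts" in ckpt:
        return "model_parts", ckpt["model_parts"]
    raise ValueError("checkpoint has no EMA or model state")


def pretrained_record(checkpoint: Any) -> str | None:
    """Hypothetical helper: the value config.pretrained ends up holding."""
    if isinstance(checkpoint, (str, bytes, os.PathLike)):
        return os.fsdecode(checkpoint)  # bytes and PathLike become str
    return None  # in-memory payloads leave no path to record
```

For example, a payload of {"ema": None, "model": {...}} loads the plain model weights rather than raising.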

from_checkpoint classmethod

Python
from_checkpoint(
    checkpoint: Mapping | bytes | str | PathLike,
    *args,
    **kwargs
) -> BaseRunner

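Instantiate a runner from the config stored in the checkpoint, then restore its full state with load_checkpoint. Before construction, resume, auto_resume, and pretrained are cleared in the restored config, so the only restore that happens is the explicit load_checkpoint call.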

Source code in danling/runners/base_runner.py
Python
@classmethod
def from_checkpoint(cls, checkpoint: Mapping | bytes | str | os.PathLike, *args, **kwargs) -> BaseRunner:
    """Instantiate runner from checkpoint config and restore full state."""

    config = cls.read_config(checkpoint, *args, **kwargs)
    config.resume = None
    config.auto_resume = False
    config.pretrained = None
    runner = cls(config)
    runner.load_checkpoint(checkpoint, *args, **kwargs)
    return runner
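One plausible reason for clearing resume, auto_resume, and pretrained is that the config saved inside a checkpoint may still carry the restore settings of the run that wrote it. The toy below assumes a constructor that auto-restores when config["resume"] is set; ToyRunner is hypothetical and only records which restores happen.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


class ToyRunner:
    """Hypothetical stand-in for BaseRunner that records every restore it performs."""

    def __init__(self, config: Mapping[str, Any]) -> None:
        self.config = dict(config)
        self.loads: list[str] = []
        # Assumed behavior: a resume path in the config triggers a restore during construction.
        if self.config.get("resume"):
            self.loads.append(f"auto:{self.config['resume']}")

    def load_checkpoint(self, checkpoint: Mapping[str, Any]) -> None:
        # Stand-in for the full-state restore; it only records that it ran.
        self.loads.append("explicit")

    @classmethod
    def from_checkpoint(cls, checkpoint: Mapping[str, Any]) -> ToyRunner:
        config = dict(checkpoint["runner"])  # copy so the payload is not mutated
        # Mirror BaseRunner.from_checkpoint: disable every construction-time restore path.
        config["resume"] = None
        config["auto_resume"] = False
        config["pretrained"] = None
        runner = cls(config)
        runner.load_checkpoint(checkpoint)  # exactly one explicit full-state restore
        return runner
```

With a saved config of {"resume": "runs/old/latest.pth", ...}, building ToyRunner directly from it records an automatic restore, while from_checkpoint records only the explicit one.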

read_config classmethod

Python
read_config(
    checkpoint: Mapping | bytes | str | PathLike,
    *args,
    **kwargs
) -> RunnerConfig

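Read the runner config from a checkpoint mapping or file path. The checkpoint must contain a runner key; for model-only checkpoints, use from_pretrained instead.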

Note

BaseRunner only accepts file checkpoints for path input. Backend-specific directory checkpoints must be handled in subclasses.

Source code in danling/runners/base_runner.py
Python
@classmethod
def read_config(
    cls,
    checkpoint: Mapping | bytes | str | os.PathLike,
    *args,
    **kwargs,
) -> RunnerConfig:
    """
    Read runner config from checkpoint mapping or file path.

    Note:
        BaseRunner only accepts file checkpoints for path input.
        Backend-specific directory checkpoints must be handled in subclasses.
    """

    if isinstance(checkpoint, Mapping):
        ckpt = checkpoint
    elif isinstance(checkpoint, (bytes, str, os.PathLike)):
        checkpoint_id = os.fspath(checkpoint)
        if os.path.isfile(checkpoint_id):
            kwargs = dict(kwargs)
            kwargs["map_location"] = "cpu"
            kwargs["weights_only"] = False
            ckpt = load(checkpoint, *args, **kwargs)
        else:
            raise ValueError(
                f"cannot read config from checkpoint path for {cls.__name__}: path must be a file; "
                "use a backend-specific runner for directory-style checkpoints"
            )
    else:
        raise ValueError(
            "invalid checkpoint input: expected a mapping or path, "
            f"got {type(checkpoint).__name__}: {checkpoint!r}"
        )

    if "runner" not in ckpt:
        raise ValueError(
            "cannot read runner config: checkpoint is missing key 'runner'; "
            "use from_pretrained(...) for model-only checkpoints"
        )
    return RunnerConfig(ckpt["runner"])
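The input dispatch in read_config can be sketched with the standard library alone. In this sketch JSON files stand in for danling's load (an assumption made only so the example runs without torch), and read_runner_config is a hypothetical name that returns a plain dict instead of a RunnerConfig.

```python
from __future__ import annotations

import json
import os
from collections.abc import Mapping
from typing import Any


def read_runner_config(checkpoint: Mapping[str, Any] | str | bytes | os.PathLike) -> dict[str, Any]:
    """Hypothetical sketch of read_config's dispatch; JSON replaces danling's `load`."""
    if isinstance(checkpoint, Mapping):
        ckpt = checkpoint  # in-memory payloads are used as-is
    elif isinstance(checkpoint, (bytes, str, os.PathLike)):
        path = os.fspath(checkpoint)
        # Only single-file checkpoints are accepted; directories need a backend-specific runner.
        if not os.path.isfile(path):
            raise ValueError(f"path must be a file, got {path!r}")
        with open(path, encoding="utf-8") as f:
            ckpt = json.load(f)
    else:
        raise ValueError(f"expected a mapping or path, got {type(checkpoint).__name__}")
    if "runner" not in ckpt:
        raise ValueError("checkpoint is missing key 'runner'; use from_pretrained for model-only checkpoints")
    return dict(ckpt["runner"])
```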

from_pretrained classmethod

Python
from_pretrained(
    config: RunnerConfig | Mapping[str, Any],
    checkpoint: Mapping | bytes | str | PathLike,
    *args,
    **kwargs
) -> BaseRunner

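Build a runner from the given config and load model weights only. resume, auto_resume, and pretrained are cleared before construction so that construction does not restore anything on its own; load_pretrained then loads the weights and, for path inputs, records the path in config.pretrained.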

Source code in danling/runners/base_runner.py
Python
@classmethod
def from_pretrained(
    cls,
    config: RunnerConfig | Mapping[str, Any],
    checkpoint: Mapping | bytes | str | os.PathLike,
    *args,
    **kwargs,
) -> BaseRunner:
    """Build a runner from config and load model weights only."""

    prepared = RunnerConfig(config)
    prepared.resume = None
    prepared.auto_resume = False
    prepared.pretrained = None
    runner = cls(prepared)
    runner.load_pretrained(checkpoint, *args, **kwargs)
    return runner
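The error messages above suggest a simple rule for choosing an entry point: a payload with a runner key can rebuild the whole runner through from_checkpoint, while a model-only payload needs a caller-supplied config and from_pretrained. A full checkpoint can also be passed to from_pretrained when only its weights are wanted. suggest_entry_point is a hypothetical helper sketching that rule; it is not a danling API.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


def suggest_entry_point(ckpt: Mapping[str, Any]) -> str:
    """Hypothetical helper: name the BaseRunner constructor suited to a payload."""
    if "runner" in ckpt:
        # Full checkpoint: the saved config can rebuild the runner and restore all state.
        return "from_checkpoint"
    # Same usability test load_pretrained applies to model-only payloads.
    if ckpt.get("ema") is not None or "model" in ckpt or "model_parts" in ckpt:
        return "from_pretrained"
    raise ValueError("checkpoint has neither a runner config nor model/EMA state")
```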

read_checkpoint

Python
read_checkpoint(
    checkpoint: Mapping | bytes | str | PathLike,
    *args,
    **kwargs
) -> Mapping[str, Any]

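Normalize checkpoint input into an in-memory mapping payload. Path inputs (str, bytes, or PathLike) are loaded with map_location="cpu" and weights_only=False, overriding any values the caller passed for those keys; mappings are returned unchanged.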

Source code in danling/runners/base_runner.py
Python
def read_checkpoint(self, checkpoint: Mapping | bytes | str | os.PathLike, *args, **kwargs) -> Mapping[str, Any]:
    """Normalize checkpoint input into an in-memory mapping payload."""
    if isinstance(checkpoint, (bytes, str, os.PathLike)):
        kwargs = dict(kwargs)
        kwargs["map_location"] = "cpu"
        kwargs["weights_only"] = False
        return load(checkpoint, *args, **kwargs)
    if isinstance(checkpoint, Mapping):
        return checkpoint
    raise ValueError(
        f"invalid checkpoint input: expected a mapping or path, got {type(checkpoint).__name__}: {checkpoint!r}"
    )
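Because read_checkpoint copies kwargs before pinning map_location and weights_only, a caller's map_location="cuda:0" is silently replaced, while other keyword arguments pass through and the caller's dict is not mutated. forced_load_kwargs is a hypothetical helper isolating that step. If load forwards these options to torch.load (an assumption here), weights_only=False also permits arbitrary unpickling, so only trusted files should be read this way.

```python
from __future__ import annotations

from typing import Any


def forced_load_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: reproduce read_checkpoint's kwargs override."""
    merged = dict(kwargs)  # copy so the caller's dict is left untouched
    merged["map_location"] = "cpu"  # always load onto CPU
    merged["weights_only"] = False  # always allow full (non-weights-only) payloads
    return merged
```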

save

Python
save(
    obj: Any,
    file: PathStr,
    main_process_only: bool = True,
    *args,
    **kwargs
) -> File

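Save an object, by default only from the main process. With main_process_only=True (the default), non-main processes skip the write and return file as-is; with main_process_only=False, every process writes.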

Source code in danling/runners/base_runner.py
Python
def save(self, obj: Any, file: PathStr, main_process_only: bool = True, *args, **kwargs) -> File:
    """Save an object with optional main-process guard."""

    if (main_process_only and self.is_main_process) or not main_process_only:
        return save(obj, file, *args, **kwargs)
    return file
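The guard `(main_process_only and self.is_main_process) or not main_process_only` is equivalent to `not main_process_only or self.is_main_process`. The sketch below uses that simpler form with pickle standing in for danling's save (an assumption made so the example is self-contained); guarded_save is a hypothetical name, and the process role is passed in explicitly rather than read from a runner.

```python
from __future__ import annotations

import pickle
from typing import Any


def guarded_save(obj: Any, path: str, is_main_process: bool, main_process_only: bool = True) -> str:
    """Hypothetical sketch of BaseRunner.save's main-process guard."""
    # Write unless the guard is on and this is not the main process.
    if not main_process_only or is_main_process:
        with open(path, "wb") as f:
            pickle.dump(obj, f)  # pickle stands in for danling's `save`
    # Non-writing processes still get the path back, as in BaseRunner.save.
    return path
```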

close

Python
close(timeout: float | None = None) -> bool

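Finalize checkpoint, log, and writer resources before shutdown. Async checkpoints are drained first, waiting up to timeout (defaulting to config checkpoint.wait_timeout). If draining times out, a RuntimeWarning is issued and close returns False without releasing the remaining resources. Otherwise, signal handlers are restored, the writer is flushed and closed, the W&B run is finished, the workspace, supervisor, and fault-tolerance handle are closed, and close returns True. If the checkpoint manager raised while closing, that error is re-raised after the other resources have been released.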

Source code in danling/runners/base_runner.py
Python
def close(self, timeout: float | None = None) -> bool:
    """Finalize checkpoint/log/writer resources before shutdown."""

    if timeout is None:
        timeout = self.config.get("checkpoint.wait_timeout")

    drained = True
    close_error: Exception | None = None
    try:
        drained = self.checkpoint_manager.close(timeout=timeout)
    except Exception as exc:
        close_error = exc

    if close_error is None and not drained:
        warn("runner close: timed out while draining async checkpoints", RuntimeWarning, stacklevel=2)
        return False

    self.supervisor.restore_signal_handlers()
    writer = self.writer
    if writer is not None:
        writer.flush()
        writer.close()
        self.writer = None

    if self.wandb is not None:
        self.wandb.finish()

    self.workspace.close()
    self.supervisor.close()
    if self.ft is not None:
        self.ft.close()

    if close_error is not None:
        raise close_error
    return drained
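The ordering in close can be sketched in isolation: the drain error is deferred so cleanup still runs, and a plain timeout returns early with a warning. close_resources, drain, and closers are hypothetical names; the real method closes a fixed set of runner attributes rather than a list of callables.

```python
from __future__ import annotations

import warnings
from typing import Callable, Optional


def close_resources(
    drain: Callable[[Optional[float]], bool],
    closers: list[Callable[[], None]],
    timeout: Optional[float] = None,
) -> bool:
    """Hypothetical sketch of BaseRunner.close's drain-then-cleanup ordering."""
    drained = True
    error: Optional[Exception] = None
    try:
        drained = drain(timeout)  # wait for async checkpoint writes
    except Exception as exc:
        error = exc  # defer the error so cleanup below still runs
    if error is None and not drained:
        # Timed out: warn and leave the remaining resources open.
        warnings.warn("timed out while draining async checkpoints", RuntimeWarning, stacklevel=2)
        return False
    for closer in closers:
        closer()  # e.g. restore signal handlers, close writer, workspace, supervisor
    if error is not None:
        raise error  # surface the drain failure only after cleanup
    return drained
```

A caller that needs to know whether pending checkpoints were flushed can check the return value and retry with a longer timeout when it is False.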