
DAG — Task orchestration

Last updated: 2026-05-31

The provider's dag: key is a meta-resource. From a single manifest declaration, it generates N Tasks plus the auto-injected wrapper tasks.


Trigger modes

Mode                | Trigger                                               | new_files
schedule without S3 | Snowflake CRON                                        | empty
schedule with S3    | CRON + POLL_AND_DISPATCH                              | populated
triggered           | External task + CONSUME_STREAM                        | populated
on-demand           | EXECUTE TASK (manual / CI)                            | empty
CONDITIONAL_CRON    | CRON + pinky-core.schedule condition → SKIP if false  | empty

CONDITIONAL_CRON use cases: skip on non-business days, suspend a client during a billing lapse, skip during maintenance windows. See ADR-0013.
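A CONDITIONAL_CRON condition is, in effect, a predicate evaluated at every CRON tick. The sketch below expresses the three use cases as one plain Python function. The function name, its parameters, the "RUN" return value, and the assumption that weekends count as non-business days are all hypothetical; this is not the actual pinky-core.schedule interface.

```python
from __future__ import annotations

from datetime import date, datetime


def conditional_cron_decision(
    now: datetime,
    client_id: str,
    holidays: set[date],
    suspended_clients: set[str],
    maintenance_windows: list[tuple[datetime, datetime]],
) -> str:
    """Hypothetical condition: 'SKIP' when the run should not happen, else 'RUN'."""
    # Non-business day (assumption: Saturday=5 and Sunday=6) or a declared holiday.
    if now.weekday() >= 5 or now.date() in holidays:
        return "SKIP"
    # Client suspended, e.g. during a billing lapse.
    if client_id in suspended_clients:
        return "SKIP"
    # Inside a maintenance window (start inclusive, end exclusive).
    if any(start <= now < end for start, end in maintenance_windows):
        return "SKIP"
    return "RUN"
```

In this sketch each rule short-circuits to "SKIP", so adding a new skip reason is one more guard clause rather than a change to the CRON expression.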


Auto-injected tasks

Task              | Mode          | Role
ROOT_TASK_WRAPPER | all           | Collects TaskRuntimeInfo, manages SKIP
POLL_AND_DISPATCH | schedule + S3 | Lists S3_IN, inserts tracking, passes new_files
CONSUME_STREAM    | triggered     | Reads append-only stream, passes new_files

dagv1 (the Snowflake Python DAG API) is required because the Core API requires Python to define task graphs. TaskContext is the only clean Python API for reading and writing return values between tasks without inline SQL. The dagv1 dependency is confined to the 2 wrappers: a future migration touches 2 SPs and leaves every business SP unaffected.
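One way to keep TaskContext confined to a wrapper is for the wrapper alone to read the predecessor's return value, call the business SP with a plain dict, and write the result back. The sketch assumes the wrapper needs only two TaskContext-style calls, get_predecessor_return_value and set_return_value, and models them with a Protocol so it runs without Snowflake. The JSON encoding of return values and the wrap_business_sp name are assumptions.

```python
from __future__ import annotations

import json
from typing import Any, Callable, Optional, Protocol


class ReturnValueContext(Protocol):
    """Assumed subset of TaskContext that the wrapper depends on."""

    def get_predecessor_return_value(self, task_name: Optional[str] = None) -> Optional[str]: ...

    def set_return_value(self, value: Any) -> None: ...


# Business SP contract: (session, args dict) -> result dict.
BusinessSP = Callable[[Any, dict], dict]


def wrap_business_sp(session: Any, ctx: ReturnValueContext, sp: BusinessSP, args: dict) -> None:
    """Hypothetical wrapper body: the only code that touches the task context."""
    # 'SKIP' never reaches this point: the WHEN guard stops the task first,
    # so the upstream value is either empty or a JSON object.
    raw = ctx.get_predecessor_return_value()
    upstream = json.loads(raw) if raw else {}
    # The business SP sees a plain dict (static args plus upstream values such
    # as new_files) and never imports dagv1 or TaskContext.
    result = sp(session, {**args, **upstream})
    ctx.set_return_value(json.dumps(result))
```

Swapping dagv1 for another mechanism would then mean reimplementing ReturnValueContext, with no change to any function matching BusinessSP.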


SKIP cascade

ROOT_TASK_WRAPPER returns 'SKIP' when no new files exist (CRON mode with no data). Downstream tasks guard on:

WHEN SYSTEM$GET_PREDECESSOR_RETURN_VALUE(...) != 'SKIP'
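The cascade can be modelled offline by walking the graph in topological order and applying the guard at each task. This sketch assumes that a task that does not run is seen as 'SKIP' by its own successors, so a SKIP at the root skips the whole graph. The task names and graph shape in the usage are illustrative only.

```python
from __future__ import annotations

from graphlib import TopologicalSorter
from typing import Callable

SKIP = "SKIP"


def run_graph(
    predecessors: dict[str, set[str]],
    bodies: dict[str, Callable[[], str]],
) -> dict[str, str]:
    """Return each task's return value; tasks stopped by the guard record SKIP."""
    results: dict[str, str] = {}
    # static_order() yields every task after all of its predecessors.
    for task in TopologicalSorter(predecessors).static_order():
        preds = predecessors.get(task, set())
        # Models WHEN SYSTEM$GET_PREDECESSOR_RETURN_VALUE(...) != 'SKIP'.
        if any(results[p] == SKIP for p in preds):
            results[task] = SKIP  # assumption: a guarded-out task propagates SKIP
        else:
            results[task] = bodies[task]()
    return results
```

For example, with LOAD depending on ROOT_TASK_WRAPPER and REPORT on LOAD, a root returning 'SKIP' leaves all three tasks recorded as SKIP, and the LOAD and REPORT bodies are never called.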

Backward-compatible defaults

  • continue absent → interpreted as 1 (pipeline continues), so existing SPs need no changes
  • trigger_rule absent → ALL (all predecessors must continue)
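Both defaults fit into a single predicate over the dicts returned by a task's predecessors. The function name is hypothetical, and only the ALL rule documented here is implemented; any other value raises instead of guessing its semantics.

```python
from __future__ import annotations

from typing import Optional


def should_continue(predecessor_results: list[dict], trigger_rule: Optional[str] = None) -> bool:
    """Hypothetical check applying the backward-compatible defaults."""
    rule = trigger_rule or "ALL"  # absent trigger_rule -> ALL
    # Absent "continue" -> 1, so SPs written before the flag existed keep the pipeline going.
    flags = [int(r.get("continue", 1)) == 1 for r in predecessor_results]
    if rule == "ALL":
        return all(flags)
    raise ValueError(f"trigger_rule {rule!r} is not covered by this sketch")
```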

Business SP contract

SPs receive args: dict and return a dict. They are unaware they run inside a DAG. They must be no-op safe when new_files = [] (for example in on-demand mode).
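A minimal business SP that honours this contract might look like the following. The SP name, the files_processed key, and the process_file helper are hypothetical. The point is the early return when new_files is absent or empty, which turns an on-demand run into a no-op rather than an error.

```python
from __future__ import annotations

from typing import Any


def load_new_files(session: Any, args: dict) -> dict:
    """Hypothetical business SP: args in, dict out, no knowledge of the DAG."""
    new_files = args.get("new_files") or []  # absent or [] outside S3-driven modes
    if not new_files:
        # No-op path: nothing to process, but downstream tasks may continue.
        return {"continue": 1, "files_processed": 0}
    for path in new_files:
        process_file(session, path)
    return {"continue": 1, "files_processed": len(new_files)}


def process_file(session: Any, path: str) -> None:
    """Hypothetical per-file step (for example a COPY INTO); does nothing in this sketch."""
```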


multi_client_execution: multi-tenant orchestration

When a task step declares multi_client_execution: true, TASK_WRAPPER iterates over all client schemas in the current database and calls the target SP once per client.

No CLIENT_LIST table is required. The schema structure itself is the state, consistent with ADR-0004 (zero statefile). PROVISION_CLIENT creates a schema, which is automatically included on the next run; OFFBOARD_CLIENT drops the schema, which is automatically excluded. Zero drift, zero maintenance.

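# TASK_WRAPPER: multi_client_execution path (Core API)
from snowflake.core import Root

# Schemas that never hold client data and are excluded from the loop.
NON_CLIENT_SCHEMAS = ("PUBLIC", "INFORMATION_SCHEMA")


def task_wrapper(session, args: dict):
    if args.get("multi_client_execution"):
        root = Root(session)
        db = session.get_current_database()

        # The schema listing is the client registry: no CLIENT_LIST table.
        clients = [
            s.name for s in root.databases[db].schemas.iter()
            if s.name not in NON_CLIENT_SCHEMAS
        ]

        # One call per client; client_id is injected into the target SP's args.
        for client_id in clients:
            session.call(args["target_sp"], {**args, "client_id": client_id})

        return {"continue": 1}

    # Single-tenant path: forward args unchanged to the target SP.
    return session.call(args["target_sp"], args)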

The target SP handles one client: it receives client_id in args, injected by TASK_WRAPPER. There is no wrapper SP in the client schema; the business SP in PUBLIC operates directly on the client schema.

# SP in PUBLIC: handles one client and is unaware of the loop
def process_one_client(session, args: dict) -> dict:
    client_id = args["client_id"]   # injected by TASK_WRAPPER
    # client_id comes from the schema listing, so it names an existing client schema
    session.sql(f"INSERT INTO {client_id}.results SELECT ...").collect()
    return {"continue": 1}
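Because the client list is derived from the schema listing, the fan-out loop can be exercised without Snowflake by injecting its two side-effecting calls. In this sketch, fan_out, list_schemas, and call_sp are hypothetical stand-ins for the wrapper, root.databases[db].schemas.iter(), and session.call. The loop and the PUBLIC / INFORMATION_SCHEMA exclusion mirror the TASK_WRAPPER code.

```python
from __future__ import annotations

from typing import Any, Callable, Iterable

NON_CLIENT_SCHEMAS = frozenset({"PUBLIC", "INFORMATION_SCHEMA"})


def fan_out(
    args: dict,
    list_schemas: Callable[[], Iterable[str]],
    call_sp: Callable[[str, dict], Any],
) -> dict:
    """Call args['target_sp'] once per client schema; the listing is the only state."""
    clients = [name for name in list_schemas() if name not in NON_CLIENT_SCHEMAS]
    for client_id in clients:
        call_sp(args["target_sp"], {**args, "client_id": client_id})
    return {"continue": 1}
```

Provisioning a client simply adds a name to what list_schemas returns, so the next call picks it up with no registry to update; offboarding removes it the same way.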

Manifest declaration

tasks:
  - name: PROCESS_CLIENTS
    target_sp: process_one_client   # SP in PUBLIC
    multi_client_execution: true
    trigger_rule: ALL

What this removes

No SP wrappers in client schemas. The client schema contains only data objects: tables, stages, event_table, secrets (delegated credentials). No orchestration logic.


Manifest example

dag:
  notification_type: ON_ERROR
  notify: [SUPPORT]
  schedule: "{{vars.schedule}}"

stages:
  - external_input_files   # provisions stage + injects POLL_AND_DISPATCH
  - external_output_files  # provisions stage
  - internal_exposed_files # provisions stage + VIEWER grants
  - resources_static       # provisions stage
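The auto-injected tasks table can be read as a small expansion rule over this manifest. The sketch below makes several assumptions: the manifest has already been parsed into a dict (for example from YAML), schedule-with-S3 mode is recognised by a schedule key together with the external_input_files stage, and the function name is hypothetical. Business tasks and CONSUME_STREAM (triggered mode) are not modelled.

```python
from __future__ import annotations


def injected_tasks(manifest: dict) -> list[str]:
    """Hypothetical expansion: wrapper tasks injected for a parsed manifest."""
    dag = manifest.get("dag", {})
    stages = manifest.get("stages", [])
    tasks = ["ROOT_TASK_WRAPPER"]  # injected in every mode
    # schedule with S3: the external_input_files stage brings the poller
    if dag.get("schedule") and "external_input_files" in stages:
        tasks.append("POLL_AND_DISPATCH")
    return tasks
```

Applied to the example manifest above, this sketch yields ROOT_TASK_WRAPPER followed by POLL_AND_DISPATCH.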