DAG — Task orchestration
Last updated: 2026-05-31
The provider's dag: key is a meta-resource: it generates N Tasks + auto-injected wrappers from
a single manifest declaration.
Trigger modes
| Mode | Trigger | new_files |
|---|---|---|
| schedule without S3 | Snowflake CRON | empty |
| schedule with S3 | CRON + POLL_AND_DISPATCH | populated |
| triggered | External task + CONSUME_STREAM | populated |
| on-demand | EXECUTE TASK manual / CI | empty |
| CONDITIONAL_CRON | CRON + pinky-core.schedule condition → SKIP if false | empty |
CONDITIONAL_CRON use cases: skip on non-business days, suspend a client during billing lapse,
skip during maintenance windows. See ADR-0013.
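As an illustration of the use cases above, a CONDITIONAL_CRON condition can be pictured as a pure predicate evaluated before the DAG body runs. This is a minimal sketch only: `should_run`, the `holidays` set, the `maintenance_windows` tuples and the `client_active` flag are hypothetical names chosen for the example, not the pinky-core.schedule API.

```python
from datetime import date, datetime


def should_run(now, holidays=frozenset(), maintenance_windows=(), client_active=True):
    """Return True if the scheduled run should proceed, False if it should SKIP.

    All parameters are illustrative assumptions, not part of pinky-core.
    """
    if not client_active:          # e.g. client suspended during a billing lapse
        return False
    if now.weekday() >= 5:         # Saturday (5) or Sunday (6): non-business day
        return False
    if now.date() in holidays:     # explicit non-business dates
        return False
    for start, end in maintenance_windows:
        if start <= now < end:     # inside a maintenance window
            return False
    return True


if __name__ == "__main__":
    monday = datetime(2026, 6, 1, 9, 0)
    print(should_run(monday))
    print(should_run(monday, holidays={date(2026, 6, 1)}))
```

Keeping the condition side-effect free means a false result maps directly onto the SKIP path without touching any business SP.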
Auto-injected tasks
| Task | Mode | Role |
|---|---|---|
| ROOT_TASK_WRAPPER | all | Collects TaskRuntimeInfo, manages SKIP |
| POLL_AND_DISPATCH | schedule + S3 | Lists S3_IN, inserts tracking, passes new_files |
| CONSUME_STREAM | triggered | Reads append-only stream, passes new_files |
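The "list, insert tracking, pass new_files" role of POLL_AND_DISPATCH boils down to a diff between the stage listing and what has already been tracked. The sketch below shows only that diff, in plain Python; `detect_new_files` and its two arguments are hypothetical, and the real stage listing and tracking table are not described in this document.

```python
def detect_new_files(listed, tracked):
    """Return paths present in the listing but not yet tracked, in listing order.

    `listed` stands in for the S3_IN listing and `tracked` for the paths already
    recorded in the tracking table (both are assumptions for this example).
    """
    seen = set(tracked)
    new_files = []
    for path in listed:
        if path not in seen:
            new_files.append(path)
            seen.add(path)  # also drops duplicates within the listing itself
    return new_files


if __name__ == "__main__":
    print(detect_new_files(["a.csv", "b.csv", "c.csv"], ["a.csv"]))
```

An empty result corresponds to the "no new files" case that leads to SKIP in CRON mode.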
dagv1 (Snowflake Python DAG API) is required because the Core API imposes Python for task graphs.
TaskContext is the only clean Python API to read/write return values between tasks without inline SQL.
The dagv1 dependency is confined to the 2 wrappers: a future migration would touch those 2 SPs
and no business SPs.
SKIP cascade
ROOT_TASK_WRAPPER returns 'SKIP' when no new files exist (CRON mode with no data).
Downstream tasks guard on:
WHEN SYSTEM$GET_PREDECESSOR_RETURN_VALUE(...) != 'SKIP'
Backward-compatible defaults
- `continue` absent → interpreted as `1` (pipeline continues); existing SPs unchanged
- `trigger_rule` absent → `ALL` (all predecessors must continue)
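The two defaults above can be sketched as a small decision function. This is an illustration under assumptions, not the wrapper's actual code: `continues` and `should_execute` are hypothetical helpers, only the `ALL` rule is covered because it is the only one described here, and treating a non-dict return as "continue" is a guess made for the example.

```python
def continues(ret):
    """Decide whether one predecessor's return value lets the pipeline continue."""
    if ret == "SKIP":
        return False
    if isinstance(ret, dict):
        # Backward-compatible default: a missing `continue` key counts as 1.
        return ret.get("continue", 1) == 1
    return True  # assumption: any other return value is treated as "continue"


def should_execute(predecessor_returns, trigger_rule=None):
    """Apply the trigger rule; an absent rule defaults to ALL."""
    rule = trigger_rule or "ALL"
    if rule != "ALL":
        raise ValueError(f"trigger_rule {rule!r} is not covered by this sketch")
    return all(continues(r) for r in predecessor_returns)


if __name__ == "__main__":
    print(should_execute([{"rows": 10}, {"continue": 1}]))
    print(should_execute([{"continue": 0}, {}]))
```

An existing SP that returns a dict without a `continue` key therefore keeps the pipeline running, which is what makes the defaults backward compatible.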
Business SP contract
SPs receive args: dict, return dict. They are unaware they run inside a DAG.
They must be no-op safe on new_files = [] (on-demand mode).
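A business SP that honours this contract might look like the sketch below. The SP name `load_new_files`, the helper SP `LOAD_ONE_FILE` and the `files_loaded` key are hypothetical; returning `{"continue": 1}` on an empty list is an assumption for the example, not a documented rule. `session.call` is the standard Snowpark way to invoke a stored procedure.

```python
def load_new_files(session, args: dict) -> dict:
    """Load each new file once; do nothing when no files were passed."""
    new_files = args.get("new_files") or []
    if not new_files:
        # On-demand mode: nothing to load, so return without touching any table.
        return {"continue": 1, "files_loaded": 0}
    for path in new_files:
        # LOAD_ONE_FILE is a hypothetical SP standing in for the real load logic.
        session.call("LOAD_ONE_FILE", path)
    return {"continue": 1, "files_loaded": len(new_files)}
```

The SP reads only from `args` and returns a dict, so it behaves the same whether it is invoked by a wrapper or called directly.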
multi_client_execution: multi-tenant orchestration
When a task step declares multi_client_execution: true, TASK_WRAPPER iterates over all
client schemas in the current database and calls the target SP once per client.
No CLIENT_LIST table required. The schema structure itself is the state — consistent with
ADR-0004 (zero statefile). PROVISION_CLIENT creates a schema → automatically included on next run.
OFFBOARD_CLIENT drops the schema → automatically excluded. Zero drift, zero maintenance.
# TASK_WRAPPER: multi_client_execution path (Core API)
from snowflake.core import Root


def task_wrapper(session, args):
    if args.get("multi_client_execution"):
        root = Root(session)
        db = session.get_current_database()
        # Every schema except the system ones is treated as a client.
        clients = [
            s.name for s in root.databases[db].schemas.iter()
            if s.name not in ("PUBLIC", "INFORMATION_SCHEMA")
        ]
        # Call the target SP once per client, injecting client_id into args.
        for client_id in clients:
            session.call(args["target_sp"], {**args, "client_id": client_id})
        return {"continue": 1}
    else:
        # Single-tenant path: forward args unchanged and return the SP's result.
        return session.call(args["target_sp"], args)
The target SP handles one client. It receives client_id in args, injected by TASK_WRAPPER.
No wrapper SP in the client schema — the business SP in PUBLIC operates directly on the client schema.
# SP in PUBLIC: handles one client, unaware of the loop
def process_one_client(session, args: dict) -> dict:
    client_id = args["client_id"]  # injected by TASK_WRAPPER
    session.sql(f"INSERT INTO {client_id}.results SELECT ...").collect()
    return {"continue": 1}
Manifest declaration
tasks:
  - name: PROCESS_CLIENTS
    target_sp: process_one_client   # SP in PUBLIC
    multi_client_execution: true
    trigger_rule: ALL
What this removes
No SP wrappers in client schemas. The client schema contains only data objects: tables, stages, event_table, secrets (delegated credentials). No orchestration logic.
Manifest example
dag:
  notification_type: ON_ERROR
  notify: [SUPPORT]
  schedule: "{{vars.schedule}}"
  stages:
    - external_input_files     # provisions stage + injects POLL_AND_DISPATCH
    - external_output_files    # provisions stage
    - internal_exposed_files   # provisions stage + VIEWER grants
    - resources_static         # provisions stage