Cloud Composer DAG Job

The Cloud Composer DAG job type triggers an Apache Airflow DAG run in a Google Cloud Composer environment via the Airflow REST API (POST /dags/{dag_id}/dagRuns). The DAG is identified by its dag_id, which Polysync stores in the Job's External Id.

This job type is supported on the Google Cloud Composer platform.

Required job fields

  • External Id — the Airflow dag_id.
  • Job Type — Cloud Composer DAG (set automatically on import).

Job discovery

Polysync discovers DAGs by calling GET /dags?limit=100&offset=0&only_active=false against the Composer environment's Airflow web server, paging through the results with offset / limit. For each DAG, it then calls GET /dags/{dag_id}/details to import the declared params schema (with type information).
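The offset / limit paging described above can be sketched as follows. This is a minimal illustration, not Polysync's actual importer: iter_dags and fake_fetch are hypothetical names, the HTTP call is replaced by a stand-in function, and the sketch assumes the Airflow list response carries a "dags" array and a "total_entries" count.

```python
from typing import Callable, Dict, Iterator, List

# Hypothetical fetcher type: takes (limit, offset) and returns the decoded JSON
# body of GET /dags, assumed to hold a "dags" list and a "total_entries" count.
FetchPage = Callable[[int, int], Dict]


def iter_dags(fetch_page: FetchPage, limit: int = 100) -> Iterator[Dict]:
    """Yield every DAG entry by advancing offset until all entries are seen."""
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        dags: List[Dict] = page.get("dags", [])
        yield from dags
        offset += len(dags)
        total = page.get("total_entries")
        # Stop on an empty page, or once the reported total has been reached.
        if not dags or (total is not None and offset >= total):
            break


# Stand-in for the real HTTP call, serving 250 fake DAGs for illustration.
_FAKE = [{"dag_id": f"dag_{i}"} for i in range(250)]


def fake_fetch(limit: int, offset: int) -> Dict:
    return {"dags": _FAKE[offset:offset + limit], "total_entries": len(_FAKE)}


dag_ids = [d["dag_id"] for d in iter_dags(fake_fetch)]
```

In a real client, fetch_page would issue the authenticated GET request, and each yielded dag_id would then be used for the follow-up /details call.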

Parameter handling

Input and Input&Output parameters are passed in the conf dict of the DAG run request:

{
  "dag_run_id": "polysync_<unique>",
  "conf": {
    "<param-1>": <typed-value>,
    "<param-2>": <typed-value>
  }
}

Direction      Sent in conf   Updated from response
Input          Yes            No
Output         No             No (not supported)
Input&Output   Yes            No (not supported)

Values are typed via the Polysync Data Type and serialized as JSON tokens (string, int, bool, JSON object, JSON array). The DAG receives them in dag_run.conf (Airflow 2.x).

Output parameters are not supported: Airflow returns the conf dict unchanged on the run status response, so there is nothing for Polysync to read back. Persist DAG outputs to GCS, BigQuery, or XCom and read them from a downstream Task.
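As a rough sketch of how the request body could be assembled, the snippet below keeps only Input and Input&Output values and serializes them as typed JSON. The function name, the (name, direction, value) tuple shape, the sample parameters, and the use of uuid4 for the unique suffix are all assumptions made for illustration; the source only states that the dag_run_id has the form polysync_<unique>.

```python
import json
import uuid
from typing import Any, Dict, List, Tuple

# Directions whose values are sent in conf, per the description above.
SENT_DIRECTIONS = {"Input", "Input&Output"}


def build_dag_run_body(params: List[Tuple[str, str, Any]]) -> Dict[str, Any]:
    """Build the dagRuns request body from (name, direction, value) triples."""
    conf = {
        name: value
        for name, direction, value in params
        if direction in SENT_DIRECTIONS
    }
    # uuid4 is an assumption; the source only says the suffix is unique.
    return {"dag_run_id": f"polysync_{uuid.uuid4().hex}", "conf": conf}


body = build_dag_run_body([
    ("table", "Input", "sales.orders"),
    ("batch_size", "Input", 500),
    ("dry_run", "Input&Output", False),
    ("row_count", "Output", None),  # Output parameters are never sent
])

# json.dumps keeps the JSON types: a string, an integer, and a boolean.
payload = json.dumps(body)
```

Inside the DAG, these values would then be available under dag_run.conf with the same keys and types.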

Execution flow

  1. Polysync sends the body above to POST /dags/{dag_id}/dagRuns. The composite RunId is the dag_run_id.

  2. Status is polled via GET /dags/{dag_id}/dagRuns/{dag_run_id} and decoded:

    Airflow state                      Polysync status
    success                            Success
    failed / upstream_failed           Failed
    running                            Running
    up_for_retry / up_for_reschedule   Running
    queued / scheduled                 Starting
    (other)                            Unknown

  3. Cancel is supported via PATCH /dags/{dag_id}/dagRuns/{dag_run_id} with body { "state": "failed" }, after which Airflow stops scheduling further tasks on the run.
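The state decoding in step 2 can be expressed as a simple lookup, with a polling loop around it. This is only a sketch: decode_state and wait_for_run are hypothetical names, the HTTP call is replaced by a callable that returns the Airflow state string, and treating Success and Failed as the terminal statuses (along with the poll interval and limit) is an assumption.

```python
import time
from typing import Callable

# Mapping taken from the state table in step 2.
_STATE_TO_STATUS = {
    "success": "Success",
    "failed": "Failed",
    "upstream_failed": "Failed",
    "running": "Running",
    "up_for_retry": "Running",
    "up_for_reschedule": "Running",
    "queued": "Starting",
    "scheduled": "Starting",
}

# Assumption: polling stops once one of these statuses is reached.
_TERMINAL = {"Success", "Failed"}


def decode_state(airflow_state: str) -> str:
    """Translate an Airflow state into a Polysync status; unmapped states are Unknown."""
    return _STATE_TO_STATUS.get(airflow_state, "Unknown")


def wait_for_run(get_state: Callable[[], str],
                 interval_s: float = 5.0,
                 max_polls: int = 120) -> str:
    """Poll get_state until a terminal status is seen or max_polls is exhausted."""
    status = "Unknown"
    for _ in range(max_polls):
        status = decode_state(get_state())
        if status in _TERMINAL:
            return status
        time.sleep(interval_s)
    return status


# Simulated sequence of states, standing in for repeated GET requests.
_states = iter(["queued", "running", "running", "success"])
result = wait_for_run(lambda: next(_states), interval_s=0)
```

In a real client, get_state would issue GET /dags/{dag_id}/dagRuns/{dag_run_id} and return the state field of the response.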

Monitor URL

{airflowWebServerUrl}/dags/{dag_id}/grid?dag_run_id={dag_run_id}
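A small sketch of filling in this template is shown below. The monitor_url helper and the example host are hypothetical, and percent-encoding the identifiers is a defensive assumption rather than documented Polysync behavior; it matters mainly when a dag_run_id contains characters such as ":" or "+".

```python
from urllib.parse import quote


def monitor_url(airflow_web_server_url: str, dag_id: str, dag_run_id: str) -> str:
    """Fill the Monitor URL template, percent-encoding both identifiers."""
    base = airflow_web_server_url.rstrip("/")
    return (
        f"{base}/dags/{quote(dag_id, safe='')}"
        f"/grid?dag_run_id={quote(dag_run_id, safe='')}"
    )


# Hypothetical host and run id, used only to show the encoding.
url = monitor_url(
    "https://airflow.example.com/",
    "daily_load",
    "polysync_2024-01-01T00:00:00+00:00",
)
```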

Best practices

  • Declare DAG params with an explicit type so the Polysync importer seeds the parameter schema with the correct Data Types.
  • Use Input&Output only when downstream Polysync Tasks need to chain the same value — DAG runs themselves don't update Polysync outputs.
  • For IAP-protected environments, populate the Google IAP Client Id platform attribute so Polysync requests an OIDC token with the matching audience.

Troubleshooting

  • HTTP 404 on execute — the dag_id was deleted or paused. Re-run Get Pipelines to refresh, or unpause the DAG in Airflow.
  • HTTP 401 — IAP authentication failed; verify the Google IAP Client Id and confirm that the service account has the IAP-secured Web App User role.
  • DAG stuck in queued — Composer scheduler is unhealthy or DAG concurrency limits are blocking. Inspect Airflow's scheduler logs.