A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
The Open edX platform, the software that powers edX!

This is the core repository of the Open edX software. It includes the LMS (student-facing, delivering courseware), and Studio (course authoring) compo

edX 6.2k Jan 01, 2023
UdemyPy is a bot that hourly looks for Udemy free courses and post them in my Telegram Channel: Free Courses.

UdemyPy UdemyPy is a bot that hourly looks for Udemy free courses and post them in my Telegram Channel: Free Courses. How does it work? For publishing

88 Dec 25, 2022
The blancmange curve can be visually built up out of triangle wave functions if the infinite sum is approximated by finite sums of the first few terms.

Blancmange-curve The blancmange curve can be visually built up out of triangle wave functions if the infinite sum is approximated by finite sums of th

Shankar Mahadevan L 1 Nov 30, 2021
Tool that adds githuh profile views to ur acc

Tool that adds githuh profile views to ur acc

Lamp 2 Nov 28, 2021
ARK sõidueksami Matrixi bot

ARK Sõidueksami bot Küsib ARK-i lehelt uusimad eksami ajad ja saadab sõnumi Matrixi kanali Dev setup Linux python3 -m venv venv source venv/bin/activa

Arti Zirk 3 Jun 15, 2021
Turn crypto miner on/off depending on powerwall charge level

Mining Crypto with Tesla Solar and Powerwalls This script turns a crypto miner on and off when the Tesla Powerwall level drops/rises above a certain t

Matt 1 Nov 09, 2021
Launcher program to select which version of the Q-Sys software to launch.

QSC-QSYS Launcher Launcher program to select which version of the Q-Sys software to launch. Instructions To use the application simply save the "Q-Sys

Zach Lisko 2 Sep 28, 2022
Time python - Códigos para auxiliar e mostrar formas de como fazer um relógio e manipular o seu tempo

Time_python Códigos para auxiliar e mostrar formas de como fazer um relógio e manipular o seu tempo. Bibliotecas Nestes foram usadas bibliotecas nativ

Eduardo Henrique 1 Jan 03, 2022
All exercises done during the Python 3 course in the Video Course (World 1, 2 and 3)

Python3-cursoemvideo-exercises - All exercises done during the Python 3 course in the Video Course (World 1, 2 and 3)

Renan Barbosa 3 Jan 17, 2022
Taking the fight to the establishment.

Throwdown Taking the fight to the establishment. Wat? I wanted a simple markdown interpreter in python and/or javascript to output html for my website

Trevor van Hoof 1 Feb 01, 2022
A simple BrainF**k compiler written in Python

bf-comp A simple BrainF**k compiler written in Python. What else were you looking for?

1 Jan 09, 2022
A proof-of-concept package manager for Cairo contracts/libraries

glyph A proof-of-concept package manager for Cairo contracts/libraries. Distribution through pypi. Installation through existing package managers -- p

Sam Barnes 11 Jun 06, 2022
Video Stream is an Advanced Telegram Bot that's allow you to play Video & Music on Telegram Group Video Chat

Video Stream is an Advanced Telegram Bot that's allow you to play Video & Music on Telegram Group Video Chat 📊 Stats 🧪 Get SESSION_NAME from below:

dark phoenix 12 May 08, 2022
Buildium-to-stessa - Automation to assist in converting Buildium transactions into Stessa format

Buildium Transactions - Stessa Transactions There is currently no third-party i

Austin Comstock 4 Apr 17, 2022
msgqywx 使用企业微信的应用消息推送实时信息

msgqywx 使用企业微信的应用消息推送实时信息

Demon Finch 8 Dec 18, 2022
CPython extension implementing Shared Transactional Memory with native-looking interface

CPython extension implementing Shared Transactional Memory with native-looking interface

21 Jul 22, 2022
The Playwright Workshop for TAU: The Homecoming

tau-playwright-workshop This repository contains the instructions and example code for the Playwright workshop for TAU: The Homecoming on December 1,

Pandy Knight 134 Dec 30, 2022
Interactivity Lab: Household Pulse Explorable

Interactivity Lab: Household Pulse Explorable Goal: Build an interactive application that incorporates fundamental Streamlit components to offer a cur

1 Feb 10, 2022
Declarative and extensible library for configuration & code separation

ClassyConf ClassyConf is the configuration architecture solution for perfectionists with deadlines. It provides a declarative way to define settings f

83 Dec 07, 2022
Procedural modeling of fruit and sandstorm in Blender (bpy).

SandFruit Procedural modelling of fruit and sandstorm. Created by Adriana Arcia and Maya Boateng. Last updated December 19, 2020 Goal & Inspiration Ou

Adriana Arcia 2 Mar 20, 2022