A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Script para generar automatización de registro de formularios IEEH

Formularios_IEEH Script para generar automatización de registro de formularios IEEH Corresponde a un conjunto de script en python que permiten la auto

vhevia11 1 Jan 06, 2022
One destination for all the developer's learning resources.

DevResources One destination for all the developer's learning resources. Find all of your learning resources under one roof and add your own. Live ✨ Y

Gaurav Sharma 33 Oct 21, 2022
A faster copy of nell's comet nuker

Astro a faster copy of nell's comet nuker also nell uses external libraries like it's cocaine man never learned to use ansi color codes (ily nell) (On

horrid 8 Aug 15, 2022
A simple calculator that can add, subtract, multiply or divide depending upon the input from the user

Calculator A simple calculator that can add, subtract, multiply or divide depending upon the input from the user. In this example, we should have the

Jayesh Mali 1 Dec 27, 2021
UFDR2DIR - A script to convert a Cellebrite UFDR to the original file structure

UFDR2DIR A script to convert a Cellebrite UFDR to it's original file and directo

DFIRScience 25 Oct 24, 2022
BloodCheck enables Red and Blue Teams to manage multiple Neo4j databases and run Cypher queries against a BloodHound dataset.

BloodCheck BloodCheck enables Red and Blue Teams to manage multiple Neo4j databases and run Cypher queries against a BloodHound dataset. Installation

Mr B0b 16 Nov 05, 2021
A collection of resources on neural rendering.

awesome neural rendering A collection of resources on neural rendering. Contributing If you think I have missed out on something (or) have any suggest

1.8k Dec 30, 2022
Zeus is an open source flight intellingence tool which supports more than 13,000+ airlines and 250+ countries.

Zeus Zeus is an open source flight intellingence tool which supports more than 13,000+ airlines and 250+ countries. Any flight worldwide, at your fing

DeVickey 1 Oct 22, 2021
Dump Data from FTDI Serial Port to Binary File on MacOS

Dump Data from FTDI Serial Port to Binary File on MacOS

pandy song 1 Nov 24, 2021
PatZilla is a modular patent information research platform and data integration toolkit with a modern user interface and access to multiple data sources.

PatZilla is a modular patent information research platform and data integration toolkit with a modern user interface and access to multiple data sources.

IP Tools 68 Dec 14, 2022
Render reMarkable documents to PDF

rmrl: reMarkable Rendering Library rmrl is a Python library for rendering reMarkable documents to PDF files. It takes the original PDF document and th

Robert Schroll 95 Dec 25, 2022
Tools for analyzing Java JVM gc log files

gc_log This package consists of two separate utilities useful for : gc_log_visualizer.py regionsize.py GC Log Visualizer This was updated to run under

Brad Schoening 0 Jan 04, 2022
Integration between the awesome window manager and the firefox web browser.

Integration between the awesome window manager and the firefox web browser.

contribuewwt 3 Feb 02, 2022
Stocks Trading News Alert Using Python

Stocks-Trading-News-Alert-Using-Python Ever Thought of Buying Shares of your Dream Company, When their stock price got down? But It is not possible to

Ayush Verma 3 Jul 29, 2022
A python script to turn tabs into spaces the right way.

detab A python script to turn tabs into spaces the right way. detab turns all tabs into spaces, not just leading tabs. Not all tabs have the same leng

1 Jan 26, 2022
Estimating the potential photovoltaic production of buildings (in Berlin)

The following people contributed equally to this repository (in alphabetical order): Daniel Bumke JJX Corstiaen Versteegh This repository is forked on

Daniel Bumke 6 Feb 18, 2022
The LiberaPay archive module for the SeanPM life archive project.

By: Top README.md Read this article in a different language Sorted by: A-Z Sorting options unavailable ( af Afrikaans Afrikaans | sq Shqiptare Albania

Sean P. Myrick V19.1.7.2 1 Aug 26, 2022
A simple package for interacting with the 9kw.eu anti-captcha service.

Welcome to captcha9kw’s documentation! captcha9kw is a smallish Python package for making use of the 9kw.eu services, including solving of interactive

2 Feb 26, 2022
Pre-1.0 door/chest sound injector for Minecraft

doorjector Pre-1.0 door/chest sound injector for Minecraft. While the game is running, doorjector hotswaps the new sounds for the old right before the

Sam 1 Nov 20, 2021
A free and open-source chess improvement app that combines the power of Lichess and Anki.

A free and open-source chess improvement app that combines the power of Lichess and Anki. Chessli Project Activity & Issue Tracking PyPI Build & Healt

93 Nov 23, 2022