A reproduction repo for a scheduling bug in Airflow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

There are 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.
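
The repo's actual DAG files are not reproduced here; the following is only a minimal sketch of how one of them could be declared. The dag_id, schedule and the sleep command are illustrative placeholders, not the repo's real values.

# Minimal sketch (not the repo's actual DAG file) of how DAG_1 could look.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="DAG_1",                          # hypothetical id for the hourly DAG
    start_date=datetime(2022, 2, 1),
    schedule_interval=timedelta(hours=1),    # executes every hour
    max_active_tasks=2,                      # at most 2 tasks of this DAG at once
    catchup=False,
) as dag:
    # ~240 tasks; no pool is set, so they all run in default_pool
    tasks = [
        BashOperator(task_id=f"task_{i}", bash_command="sleep 60")
        for i in range(240)
    ]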

My Airflow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set to 32 initially and left as is. default_pool_task_slot_count was set to 6 because it seemed reasonable that, if all 3 of my DAGs execute at the same time, the maximum number of tasks that can run at once is 3*2=6.
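
For reference, all four options live in the [core] section of airflow.cfg in stock Airflow 2.2; in a docker-compose setup they can equally be passed as AIRFLOW__CORE__... environment variables. This is the standard Airflow layout, not something specific to this repo:

[core]
executor = LocalExecutor
parallelism = 32
default_pool_task_slot_count = 6
default_task_weight_rule = absolute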

The problem:

Almost every time one of the DAGs is executed, no tasks from the other DAGs will start until all of the first DAG's tasks finish. That is, if the slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days before even a single task starts.

(screenshot: the_bug)

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  <TaskInstance: ...> (the 4 task-instance reprs were stripped from the paste)
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing <TaskInstance: ...> since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2
scheduler_1  | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
scheduler_1  | 
I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis (== parallelism minus the number of currently running tasks, taken from the config) as an argument. In my case that would be 32 - 2 = 30.
  2. Calculate how many free slots all Pools have. In my case that would be 6 - 2 = 4.
  3. Take the minimum and update the max_tis variable: max_tis = min(4, 30) = 4.
  4. Query the DB for task instances that are in the scheduled state and belong to unpaused DAGs whose runs are proceeding normally, ordered by DAG execution date.
  5. Limit the query to max_tis rows.
  6. Loop over each returned task instance and check whether it can run. Queue it if possible.

Now, since the tasks are ordered by execution date, the query returns tasks from the DAG that started first, which, let's say, is DAG_2. It has over 600 tasks, and the LIMIT clause returns 4 of them. None of these 4 tasks can be run, because 2 tasks of that DAG are already running and the DAG's max_active_tasks is 2. So the scheduler just waits until one of the running tasks finishes and then starts another task from the same DAG. Raising parallelism to 1000 and default_pool_task_slot_count to 999 keeps max_tis large, so the limited query also returns tasks from the other DAGs, which pass their own max_active_tasks check and get queued.
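
A simplified, self-contained model of that selection logic (this is NOT the actual Airflow code, only the ordering + LIMIT + per-DAG check it performs, using the task counts from this repro) shows the starvation:

# Simplified model of the scheduler's selection step described above.
parallelism = 32
pool_slots = 6            # default_pool_task_slot_count
max_active_tasks = 2      # per-DAG limit

running = {"DAG_2": 2, "DAG_1": 0}   # DAG_2 already occupies its 2 slots

# Steps 1-3: max_tis is the smaller of free parallelism and free pool slots.
occupied = sum(running.values())
max_tis = min(parallelism - occupied, pool_slots - occupied)   # min(30, 4) = 4

# Step 4: scheduled task instances ordered by DAG execution date.
# DAG_2's run started first, so all of its ~600 tasks sort ahead of DAG_1's.
scheduled = [("DAG_2", f"task_{i}") for i in range(600)] + \
            [("DAG_1", f"task_{i}") for i in range(240)]

# Step 5: LIMIT max_tis -- only the first 4 rows survive, all from DAG_2.
candidates = scheduled[:max_tis]

# Step 6: the per-DAG max_active_tasks check rejects every candidate.
queued = [ti for ti in candidates if running[ti[0]] < max_active_tasks]

print(candidates)   # 4 DAG_2 tasks
print(queued)       # [] -> nothing is queued, DAG_1 is starved

Re-running the sketch with parallelism = 1000 and pool_slots = 999 gives max_tis = 997, so the slice covers all 840 scheduled tasks; DAG_1's tasks then appear among the candidates, pass their own max_active_tasks check, and get queued.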

Owner: Ilya Strelnikov