Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
Cloud-native SIEM for intelligent security analytics for your entire enterprise.

Microsoft Sentinel Welcome to the Microsoft Sentinel repository! This repository contains out of the box detections, exploration queries, hunting quer

Microsoft Azure 2.9k Jan 02, 2023
Repo to demo translating colab/jupyter notebook to streamlit webapp

Repo to demo translating colab/jupyter notebook to streamlit webapp

Marisa Smith 2 Feb 02, 2022
Tool to audit and fix Python project requirements.

Requirement Auditor Utility to revise and updated python requirement files.

Luis Carlos Berrocal 1 Nov 07, 2021
Cobalt Strike Sleep Python Bridge

This project is 'bridge' between the sleep and python language. It allows the control of a Cobalt Strike teamserver through python without the need for for the standard GUI client. NOTE: This project

Cobalt Strike 140 Jan 04, 2023
Um Script De Mensagem anonimas Para linux e Termux Feito em python

Um Script De Mensagem anonimas Para linux e Termux Feito em python feito em um celular

6 Sep 09, 2021
Connect Playground - easy way to fill in your account with production-like objects

Just set of scripts to initialise accpunt with production-like data: A - Basic Distributor Account Initialization INPUT Distributor Account Token ACTI

CloudBlue 5 Jun 25, 2021
This repository contains a lot of short scripting programs implemented both in Python (Flask) and TypeScript (NodeJS).

fast-scripts This repository contains a lot of short scripting programs implemented both in Python (Flask) and TypeScript (NodeJS). In python These wi

Nahum Maurice 3 Dec 10, 2022
Explore-bikeshare-data - GitHub project as part of the Programming for Data Science with Python Nanodegree from Udacity

Date created February 10, 2022 Project Title Explore US Bikeshare Data Descripti

Thárcyla 1 Feb 14, 2022
A Python 3 client for the beanstalkd work queue

Greenstalk Greenstalk is a small and unopinionated Python client library for communicating with the beanstalkd work queue. The API provided mostly map

Justin Mayhew 67 Dec 08, 2022
WGGCommute - Adding Commute Times to WG-Gesucht Listings

WGGCommute - Adding Commute Times to WG-Gesucht Listings This is a barebones implementation of a chrome extension that can be used to add commute time

Jannis 2 Jul 20, 2022
Simple macOS StatusBar app to remind you to unplug your laptop when sufficiently charged

ChargeMon Simple macOS StatusBar app to monitor battery charge status and remind you to unplug your Mac when the battery is sufficiently charged Overv

Rhet Turnbull 5 Jan 25, 2022
Expression interpreter written in Python

Calc Interpreter An interpreter modeled after a calculator implemented in Python 3. The program currently only supports basic mathematical expressions

1 Oct 17, 2021
A modern python module including many useful features that make discord bot programming extremely easy.

discord-super-utils Documentation Secondary Documentation A modern python module including many useful features that make discord bot programming extr

106 Dec 19, 2022
A timer for bird lovers, plays a random birdcall while displaying its image and info.

Birdcall Timer A timer for bird lovers. Siriema hatchling by Junior Peres Junior Background My partner needed a customizable timer for sitting and sta

Marcelo Sanches 1 Jul 08, 2022
Standalone PyQGIS application for executing custom scripts without a QGIS GUI.

PyQGIS Standalone Script Executer Standalone PyQGIS application that is able to run a custom script, in this case Proximity.py without the need of a G

6 Sep 23, 2022
Welcome to my pod transcript search webb app!

pod_transcript_search Welcome to the pod transcript search webb app! Tech stack used: Languages used: Python (for the back-end), JavaScript (for the f

3 Feb 04, 2022
A example project's description is a high-level overview of why you’re doing a project.

A example project's description is a high-level overview of why you’re doing a project.

Nikita Matyukhin 12 Mar 23, 2022
Free components that wrap up Python into Delphi and Lazarus (FPC)

Python for Delphi (P4D) is a set of free components that wrap up the Python DLL into Delphi and Lazarus (FPC). They let you easily execute Python scri

747 Jan 02, 2023
Open-source library for analyzing the results produced by ABINIT

Package Continuous Integration Documentation About AbiPy is a python library to analyze the results produced by Abinit, an open-source program for the

ABINIT 91 Dec 09, 2022
Blender addons - A collection of Blender tools I've written for myself over the years.

gret A collection of Blender tools I've written for myself over the years. I use these daily so they should be bug-free, mostly. Feel free to take and

217 Jan 08, 2023