Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory to transfer big data between processes (see the sketch after this list)
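
The last feature is the core optimisation. The sketch below illustrates only the underlying mechanism, using Python's standard multiprocessing.shared_memory module; it is not Aqueduct's own API, which manages shared memory for you, and the array size here is purely illustrative.

import numpy as np
from multiprocessing import shared_memory

# Parent process: place a large array into a named shared-memory block.
data = np.arange(1_000_000, dtype=np.float64)
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_view = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_view[:] = data  # one copy into shared memory; nothing is pickled on transfer

# A worker process would attach to the same block by name instead of receiving a copy.
worker_shm = shared_memory.SharedMemory(name=shm.name)
worker_view = np.ndarray(data.shape, dtype=data.dtype, buffer=worker_shm.buf)
assert worker_view[42] == 42.0

worker_shm.close()
shm.close()
shm.unlink()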

Get started

A simple example of how to start with Aqueduct using aiohttp. For more complete examples, see the examples directory.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
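
With the service running (web.run_app listens on port 8080 by default), the endpoint can be called from any HTTP client. A minimal client sketch, assuming the service is reachable on localhost:

import asyncio

from aiohttp import ClientSession


async def main():
    async with ClientSession() as session:
        # the view reads the raw request body and converts it to int
        async with session.post('http://localhost:8080/sum', data=b'10') as resp:
            print(await resp.json())  # {'result': 285}


asyncio.run(main())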
    

Batching

Aqueduct supports processing tasks in batches. The default batch size is one.

import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is only needed for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time processing a batch of images due to GPU optimizations;
        this is emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[ArrayFieldTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no result has been set yet
assert not any(task.result for task in tasks_batch)
# batch handling takes about 0.16 secs, which is less than the 0.22 secs sequential processing would take
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results have been set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size is larger than the number of tasks, we can limit the batch accumulation time
# with the batch_timeout parameter to optimize processing time
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()
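
The timings in the comments follow from the emulator constants: a batch of 20 images takes IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME = 0.01 * 20 * 0.7 + 0.02 = 0.16 s, while processing the same 20 tasks one by one would take roughly 20 * 0.01 + 0.02 = 0.22 s.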

Sentry

The implementation allows you to receive log events from both the worker processes and the main process. To integrate with Sentry, write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') == 'True':
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
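
Note that the raven client shown above is deprecated in favour of sentry-sdk. A minimal equivalent sketch with sentry-sdk, which enables its logging integration by default and reports ERROR-level records as events (SENTRY_ENABLED and SENTRY_DSN are the same assumed environment variables as above):

import os

import sentry_sdk

if os.getenv('SENTRY_ENABLED') == 'True':
    # sentry-sdk hooks into the logging module by default, so ERROR-level
    # records emitted through aqueduct.logger.log are sent to Sentry as events.
    sentry_sdk.init(dsn=os.getenv('SENTRY_DSN'))
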
Owner
avito.tech
avito.ru engineering team open source projects