Django Q

Requirement

Installation

  • Install the latest version with pip:

    $ pip install django-q
    
  • Add django_q to INSTALLED_APPS in your project's settings.py:

    INSTALLED_APPS = (
        ...
        'django_q',
        ...
    )
    
  • Run Django migrations to create the database tables:

    $ python manage.py migrate
    

Settings.py

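try:
    # func_settings is a project-local helper module, not part of Django Q.
    # REDIS is presumably a dict of connection settings defined earlier in settings.py.
    from func_settings import redis_conf, redis_connect
    REDIS_CACHE = redis_connect(REDIS.get('default', {}))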

    Q_CLUSTER = {
        'name': 'hongbaoyun',
        'workers': 8,
        'recycle': 500,
        'timeout': 60,
        'compress': True,
        'cpu_affinity': 1,
        'save_limit': 250,
        'queue_limit': 500,
        'label': 'Django Q',
        'redis_conn': REDIS_CACHE,
        # 'redis': redis_conf(REDIS.get('default', {})),
    }
except ImportError:
    REDIS_CACHE = None
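
The commented-out 'redis' option can also be filled with a plain dictionary of connection settings (host, port, db, password). A minimal sketch, assuming a hypothetical redis_conf helper and hypothetical HOST/PORT/DB/PASSWORD keys in REDIS; neither is taken from the original func_settings module:

```python
def redis_conf(conf):
    """Hypothetical helper: map project settings to Redis connection keywords."""
    return {
        'host': conf.get('HOST', 'localhost'),
        'port': int(conf.get('PORT', 6379)),
        'db': int(conf.get('DB', 0)),
        'password': conf.get('PASSWORD') or None,
    }


# Assumed shape of the project-level REDIS setting (illustrative values only)
REDIS = {'default': {'HOST': '127.0.0.1', 'PORT': '6379', 'DB': 1}}

Q_CLUSTER = {
    'name': 'hongbaoyun',
    'workers': 8,
    'timeout': 60,
    # A dict here lets Django Q open its own connection
    'redis': redis_conf(REDIS.get('default', {})),
}
```

Passing a dictionary keeps settings.py free of live connection objects, which may be easier to reason about when the cluster forks worker processes.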

Usage

  • Management Commands

    • Start a cluster with:
      $ python manage.py qcluster
      
    • Monitor your clusters with:
      $ python manage.py qmonitor
      
    • Check overall statistics with:
      $ python manage.py qinfo
      
  • Tasks

    • Functions

      • test_func.py

        import time

        def django_q_test_func(value):
            # Print 1..value, one number per second
            for i in range(1, value + 1):
                print(i)
                time.sleep(1)
            return 'Success'
        
        def print_result(task):
            print(task.result)
        
    • Asynchronous Execution Only

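      # `async` is a reserved word from Python 3.7 on; newer django-q releases
      # provide the same function as `async_task`.
      from django_q.tasks import async
      
      async('test_func.django_q_test_func', 10)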
      
      • qcluster

        15:40:00 [Q] INFO Process-1:1 processing [spaghetti-yellow-ink-mirror]
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        15:40:10 [Q] INFO Processed [spaghetti-yellow-ink-mirror]
        
    • Get the Task ID and Result

      from django_q.tasks import async
      
      taskid = async('test_func.django_q_test_func', 10)
      # 0a364fb6faad47a89523d1013ca8ad59
      
      • Get Result

        In [1]: from django_q.tasks import result
        
        In [2]: result('0a364fb6faad47a89523d1013ca8ad59')
        Out[2]: 'Success'
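
result() returns None while no finished task is stored under that ID, so a caller that needs the value has to wait for it. A minimal polling sketch, assuming a hypothetical wait_for_result helper that takes the lookup function as an argument (pass django_q.tasks.result in a real project):

```python
import time


def wait_for_result(get_result, task_id, timeout=30.0, interval=0.5):
    """Poll get_result(task_id) until it returns a non-None value or timeout expires.

    get_result is any callable with the shape of django_q.tasks.result.
    Returns the result, or None if the deadline passes first.
    """
    deadline = time.time() + timeout
    while True:
        value = get_result(task_id)
        if value is not None:
            return value
        if time.time() >= deadline:
            return None
        time.sleep(interval)


# Intended usage inside a Django project (not executed here):
#   from django_q.tasks import result
#   wait_for_result(result, '0a364fb6faad47a89523d1013ca8ad59')
```

A task whose function legitimately returns None cannot be told apart from an unfinished one this way. Newer django-q releases also document a wait argument (in milliseconds) on result itself; whether the installed version has it is worth checking.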
        
    • Hook After Execution

      from django_q.tasks import async
      
      taskid = async('test_func.django_q_test_func', 10, hook='test_func.print_result')
      
      • qcluster

        15:49:57 [Q] INFO Process-1:2 processing [sad-football-fix-floor]
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        Success
        15:50:07 [Q] INFO Processed [sad-football-fix-floor]
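
The hook receives the finished task object, so it can do more than print the result. A small sketch of a hook that distinguishes success from failure, assuming the task exposes name, result and success attributes; report_result is a hypothetical name:

```python
def report_result(task):
    """Hook sketch: summarize a finished task.

    Assumes task has .name, .result and .success; on failure .result is
    expected to hold the error rather than a return value.
    """
    if task.success:
        message = 'Task {0} succeeded: {1}'.format(task.name, task.result)
    else:
        message = 'Task {0} failed: {1}'.format(task.name, task.result)
    print(message)
    return message
```

It would be registered the same way as print_result, for example with hook='test_func.report_result' if the function lives in test_func.py.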
        
  • Schedules

    • TODO

Supervisor

[program:djangoq]
command=/home/diors/env/bin/python /home/diors/work/tt4it/manage.py qcluster
autostart=true
autorestart=true
startretries=3
exitcodes=0,1,2
stopsignal=INT
stdout_logfile=/home/diors/supervisorlog/supervisor_djangoq_access.log
stderr_logfile=/home/diors/supervisorlog/supervisor_djangoq_error.log
user=diors

Admin

String vs. Instance

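A task function can be passed either as a dotted-path string or as a callable. The Django Q worker resolves it as follows:

# Get the function from the task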
logger.info(_('{} processing [{}]').format(name, task['name']))
f = task['func']
# if it's not an instance try to get it from the string
if not callable(f):
    try:
        module, func = f.rsplit('.', 1)
        m = importlib.import_module(module)
        f = getattr(m, func)
    except (ValueError, ImportError, AttributeError) as e:
        result = (e, False)
        if rollbar:
            rollbar.report_exc_info()

  • Right

    async('test_func.django_q_test_func', 10)
    
    or
    
    from test_func import django_q_test_func
    async(django_q_test_func, 10)
    
  • Wrong

    from test_func import django_q_test_func
    async('django_q_test_func', 10)
    # module, func = f.rsplit('.', 1)
    # 19:08:58 [Q] ERROR Failed [xxx] - need more than 1 value to unpack
    
  • Solution

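    name = f  # keep the original string; f is rebound below
    f = locals().get(name)
    if not f:
        # Not a local name: fall back to the dotted-path import
        module, func = name.rsplit('.', 1)
        m = importlib.import_module(module)
        f = getattr(m, func)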
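
The lookup can also be written as a standalone function that reports a bare name explicitly instead of failing on the tuple unpacking. This is a sketch for illustration, not Django Q's actual code; resolve_callable is a hypothetical name:

```python
import importlib


def resolve_callable(f):
    """Return a callable for f, which is a callable or a 'module.function' string."""
    if callable(f):
        return f
    if '.' not in f:
        # A bare name carries no module to import from
        raise ValueError("expected 'module.function', got {0!r}".format(f))
    module_name, func_name = f.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, func_name)
```

With this shape, 'test_func.django_q_test_func' and the imported function object both resolve, while 'django_q_test_func' alone raises a ValueError that names the offending string.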
    

Redis Connection from Django Q

$ python manage.py shell
Python 2.7.5 (default, Mar  9 2014, 22:15:05)
Type "copyright", "credits" or "license" for more information.

IPython 4.0.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

In [1]: from django_q.brokers.redis_broker import Redis

In [2]: r = Redis()

In [3]: r.connection
Out[3]: StrictRedis<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>

References

[1] Koed00/django-q (GitHub), A multiprocessing distributed task queue for Django
