How can I use Tornado and Redis asynchronously?

#1
I'm trying to work out how to use Redis and Tornado asynchronously. I found tornado-redis, but I need more than just adding a `yield` to the code.

I have the following code:

    import redis
    import tornado.ioloop
    import tornado.web

    class WaiterHandler(tornado.web.RequestHandler):

        @tornado.web.asynchronous
        def get(self):
            client = redis.StrictRedis(port=6279)
            pubsub = client.pubsub()
            pubsub.subscribe('test_channel')

            # listen() blocks, so nothing else is served while this loop runs
            for item in pubsub.listen():
                if item['type'] == 'message':
                    print(item['channel'])
                    print(item['data'])

            self.write(item['data'])
            self.finish()


    class GetHandler(tornado.web.RequestHandler):

        def get(self):
            self.write("Hello world")


    application = tornado.web.Application([
        (r"/", GetHandler),
        (r"/wait", WaiterHandler),
    ])

    if __name__ == '__main__':
        application.listen(8888)
        print('running')
        tornado.ioloop.IOLoop.instance().start()


I need to be able to access the `/` URL and get "Hello world" while a request is still pending on `/wait`.
How can I do it?


#2
You should not use Redis pub/sub in the main Tornado thread, as it will block the IO loop. You can handle the long polling from web clients in the main thread, but you should create a separate thread for listening to Redis. You can then use `ioloop.add_callback()` and/or a `queue.Queue` (`Queue.Queue` on Python 2) to communicate with the main thread when you receive messages.
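
A minimal sketch of that hand-off, under some assumptions: it uses only the standard library, so asyncio's `loop.call_soon_threadsafe` stands in for Tornado's `ioloop.add_callback()`, and a plain iterator stands in for `pubsub.listen()`. The names `listen_in_thread`, `collect` and `run_demo` are hypothetical.

```python
import asyncio
import threading


def listen_in_thread(source, loop, on_message):
    """Drain a blocking message source in a worker thread.

    `source` is any blocking iterable. With redis-py it would be
    `pubsub.listen()` (assumption: not exercised in this sketch).
    """
    def worker():
        for message in source:
            # Do not touch loop state from this thread directly;
            # schedule the callback on the loop's own thread instead.
            loop.call_soon_threadsafe(on_message, message)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


async def collect(source, expected):
    """Gather `expected` messages delivered by the listener thread."""
    loop = asyncio.get_running_loop()
    received = []
    done = asyncio.Event()

    def on_message(message):
        # Runs on the event loop thread, so no lock is needed here.
        received.append(message)
        if len(received) >= expected:
            done.set()

    listen_in_thread(source, loop, on_message)
    await asyncio.wait_for(done.wait(), timeout=5)
    return received


def run_demo():
    # Stand-in for pubsub.listen(); real messages would be dicts.
    fake_messages = iter(["first", "second", "third"])
    return asyncio.run(collect(fake_messages, expected=3))
```

The event loop never calls the blocking source itself, so other handlers keep being served while the worker thread waits for messages.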

#3
You need to use a Redis client that is compatible with Tornado's IOLoop.

There are a few of them available: toredis, brukva, etc.

toredis includes a pub/sub example.


#4
Okay, so here's my example of how I would do it with GET requests.

I added two main components:

The first is a simple threaded pub/sub listener which appends new messages to a local list object.
I also added list accessors to the class, so you can read from the listener thread as if you were reading from a regular list. As far as your `WebRequest` is concerned, you're just reading data from a local list object. This returns immediately and doesn't block the current request from completing or future requests from being accepted and processed.

    class OpenChannel(threading.Thread):
        def __init__(self, channel, host = None, port = None):
            threading.Thread.__init__(self)
            self.lock = threading.Lock()
            self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
            self.pubsub = self.redis.pubsub()
            self.pubsub.subscribe(channel)

            self.output = []

        # lets implement basic getter methods on self.output, so you can access it like a regular list
        def __getitem__(self, item):
            with self.lock:
                return self.output[item]

        # only called on Python 2; Python 3 routes slices to __getitem__
        def __getslice__(self, start, stop = None, step = None):
            with self.lock:
                return self.output[start:stop:step]

        def __str__(self):
            with self.lock:
                return self.output.__str__()

        # thread loop
        def run(self):
            for message in self.pubsub.listen():
                with self.lock:
                    self.output.append(message['data'])

        def stop(self):
            # Python 2 only: relies on a private, name-mangled Thread method
            self._Thread__stop()

The second is the `ApplicationMixin` class. This is a secondary class that your web request class inherits from in order to add functionality and attributes. In this case it checks whether a channel listener already exists for the requested channel, creates one if none is found, and returns the listener handle to the `WebRequest`.

    # add a method to the application that will return existing channels
    # or create non-existing ones and then return them
    class ApplicationMixin(object):
        def GetChannel(self, channel, host = None, port = None):
            if channel not in self.application.channels:
                self.application.channels[channel] = OpenChannel(channel, host, port)
                self.application.channels[channel].start()
            return self.application.channels[channel]

The `WebRequest` class now treats the listener as if it were a static list (bearing in mind that you need to give `self.write` a string):

    class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
        @tornado.web.asynchronous
        def get(self, channel):
            # get the channel
            channel = self.GetChannel(channel)
            # write out its entire contents as a list
            self.write('{}'.format(channel[:]))
            self.finish() # not necessary?

Finally, after the application is created, I added an empty dictionary as an attribute:

    # add a dictionary containing channels to your application
    application.channels = {}

I also added some cleanup of the running threads for when you exit the application:

    # clean up the subscribed channels
    for channel in application.channels:
        application.channels[channel].stop()
        application.channels[channel].join()
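
The `stop()` above relies on a private method that exists only on Python 2 and does not interrupt a blocking `listen()`. One alternative is a cooperative stop: poll with a timeout and check a `threading.Event` between polls. The sketch below assumes a pubsub-like object with a `get_message(timeout=...)` method (redis-py's `PubSub` provides one); `StoppableListener` and `FakePubSub` are hypothetical names, and `FakePubSub` is only a stand-in so the sketch runs without a Redis server.

```python
import threading
import time


class StoppableListener(threading.Thread):
    """Polls a pubsub-like object until stop() is called."""

    def __init__(self, pubsub, poll_interval=0.1):
        super().__init__(daemon=True)
        self._pubsub = pubsub
        self._poll_interval = poll_interval
        # Not named _stop: threading.Thread uses that name internally.
        self._stop_requested = threading.Event()
        self.lock = threading.Lock()
        self.output = []

    def run(self):
        while not self._stop_requested.is_set():
            # Returns None when nothing arrived within the timeout,
            # which gives the loop a chance to notice the stop flag.
            message = self._pubsub.get_message(timeout=self._poll_interval)
            if message and message.get('type') == 'message':
                with self.lock:
                    self.output.append(message['data'])

    def stop(self):
        self._stop_requested.set()


class FakePubSub:
    """Stand-in for a real pubsub object (assumption for this sketch)."""

    def __init__(self, messages):
        self._messages = list(messages)

    def get_message(self, timeout=0):
        if self._messages:
            return {'type': 'message', 'data': self._messages.pop(0)}
        time.sleep(timeout)
        return None
```

With this shape, `stop()` followed by `join()` returns within roughly one poll interval instead of hanging on a blocked read.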


**The complete code:**

    import threading
    import redis
    import tornado.ioloop
    import tornado.web


    class OpenChannel(threading.Thread):
        def __init__(self, channel, host = None, port = None):
            threading.Thread.__init__(self)
            self.lock = threading.Lock()
            self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
            self.pubsub = self.redis.pubsub()
            self.pubsub.subscribe(channel)

            self.output = []

        # lets implement basic getter methods on self.output, so you can access it like a regular list
        def __getitem__(self, item):
            with self.lock:
                return self.output[item]

        # only called on Python 2; Python 3 routes slices to __getitem__
        def __getslice__(self, start, stop = None, step = None):
            with self.lock:
                return self.output[start:stop:step]

        def __str__(self):
            with self.lock:
                return self.output.__str__()

        # thread loop
        def run(self):
            for message in self.pubsub.listen():
                with self.lock:
                    self.output.append(message['data'])

        def stop(self):
            # Python 2 only: relies on a private, name-mangled Thread method
            self._Thread__stop()


    # add a method to the application that will return existing channels
    # or create non-existing ones and then return them
    class ApplicationMixin(object):
        def GetChannel(self, channel, host = None, port = None):
            if channel not in self.application.channels:
                self.application.channels[channel] = OpenChannel(channel, host, port)
                self.application.channels[channel].start()
            return self.application.channels[channel]

    class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
        @tornado.web.asynchronous
        def get(self, channel):
            # get the channel
            channel = self.GetChannel(channel)
            # write out its entire contents as a list
            self.write('{}'.format(channel[:]))
            self.finish() # not necessary?


    class GetHandler(tornado.web.RequestHandler):

        def get(self):
            self.write("Hello world")


    application = tornado.web.Application([
        (r"/", GetHandler),
        (r"/channel/(?P<channel>\S+)", ReadChannel),
    ])


    # add a dictionary containing channels to your application
    application.channels = {}


    if __name__ == '__main__':
        application.listen(8888)
        print('running')
        try:
            tornado.ioloop.IOLoop.instance().start()
        except KeyboardInterrupt:
            pass

        # clean up the subscribed channels
        for channel in application.channels:
            application.channels[channel].stop()
            application.channels[channel].join()

#5
For Python >= 3.5 (the code uses `async`/`await`), I would advise you to use aioredis.
I have not tested the code below, but it should look something like this:

    import tornado.httpserver
    import tornado.ioloop
    import tornado.web

    import aioredis


    class WaiterHandler(tornado.web.RequestHandler):

        async def get(self):
            # Assumes the aioredis 1.x API; 'localhost' is a placeholder host
            client = await aioredis.create_redis(('localhost', 6279), encoding="utf-8")
            result = None
            try:
                # subscribe() returns a list with one Channel per name
                ch, = await client.subscribe('test_channel')
                # awaiting suspends only this request, not the IOLoop
                if await ch.wait_message():
                    result = await ch.get(encoding="utf-8")
                    print(result)
            finally:
                client.close()
                await client.wait_closed()

            if result is not None:
                self.write(result)


    class GetHandler(tornado.web.RequestHandler):

        def get(self):
            self.write("Hello world")


    application = tornado.web.Application([
        (r"/", GetHandler),
        (r"/wait", WaiterHandler),
    ])

    if __name__ == '__main__':
        print('running')
        # needed on Tornado 4.x; asyncio is already the default on Tornado 5+
        tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
        server = tornado.httpserver.HTTPServer(application)
        server.bind(8888)
        # zero means creating as many processes as there are cores.
        server.start(0)
        tornado.ioloop.IOLoop.instance().start()
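
Why awaiting helps can be seen without Redis or Tornado. In the sketch below, an `asyncio.Queue` is a hypothetical stand-in for the Redis channel, and the two coroutines stand in for the `/wait` and `/` handlers; all names are illustrative assumptions.

```python
import asyncio


async def wait_handler(channel: asyncio.Queue) -> str:
    # Suspends here until a message arrives; the loop stays free meanwhile.
    return await channel.get()


async def hello_handler() -> str:
    return "Hello world"


async def demo():
    channel = asyncio.Queue()  # stand-in for a Redis channel
    pending = asyncio.create_task(wait_handler(channel))
    await asyncio.sleep(0)  # let the /wait stand-in start and suspend

    # The second "request" completes while the first is still pending.
    hello = await hello_handler()
    still_waiting = not pending.done()

    # Publishing a message finally lets the waiting request finish.
    await channel.put("published")
    return hello, still_waiting, await pending
```

Calling `asyncio.run(demo())` shows the second handler answering while the first one is still suspended on the channel.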

