Artikel ini merupakan terjemahan seri tutorial Real time Chat application built with Vue, Django, RabbitMQ and uWSGI WebSockets bagian kelima yang ditulis oleh Osaetin Daniel.
Daftar isi:
Akhirnya kita sampai di bagian pamungkas tutorial ini. Selama menulis tutorial ini penulis mendapatkan banyak hal dan penulis harap pembaca juga sama.
Di bagian 4, kita sudah mencapai tujuan utama tutorial ini yaitu membangun aplikasi web chat dengan django dan Vue. Akan tetapi kita menemukan masalah saat akan melakukan scaling aplikasi.
WebSockets
Dari tutorial terakhir, kita membahas sebentar apa itu WebScoket, apa itu bi-directional connection yang akan terus terhubung sehingga memungkinkan server berkomunikasi dengan klien dan sebaliknya.Django sendiri dikembangkan saat aplikasi web/website tidak serumit sekarang. Pada saat itu, server hanya akan merespon apa yang diminta. Misalnya kita bisa meminta "Halo mang server, saya mau artikel dari tanggal 1 Januari 2005" lalu servernya membalas "Siap bos, bentar yak!" lalu si server mengambil artikel yang diminta dan memberikan respon "Nih bos artikel yang diminta" lalu server tidur lagi untuk menunggu seseorang meminta rikues baru.
Apa yang bisa kita simpulkan dari contoh di atas adalah, komunikasi yang terjadi hanya diinisiasi oleh klien.
Ini masalah yang dihadapi oleh Django dan web framework Python lainnya karena sistem yang mereka pakai (
WSGI
) sudah terikat dengan pola komunikasi request-response
seperti ini.Oleh karena itu banyak orang mencoba menyelesaikannya dengan mencari solusi lain diantaranya:
- Menggunakan framework/server web baru (Twisted, Tornado, dll.).
- Menggunakan async engine dengan server websocket (gevent).
- Menambah fitur ke server WSGI yang sudah ada (
uWSGI
).
WSGI
.Django channels
Andrew Godwin membawa websockets ke django melaluidjango-channels
. Saat tutorial ini ditulis, pustaka ini menjadi project resmi dari django software foundation. Artinya ia tidak akan ditinggalkan dalam waktu dekat.django-channels
memperkenalkan protokol baru bernama ASGI
yang berbeda dari WSGI
. django-channels datang dengan servernya sendiri bernama Daphne
. Daphne
bisa menangani koneksi http biasa atau koneksi WebSocket.Jika memutuskan untuk menggunakan
django-channels
kita harus mempelajari APi dan method-methodnya. Kita juga harus mengubah proses deployment.Untuk melakukan scale seacara horizontal ke beberapa mesin kita membutuhkan apa yang disebut oleh
django-channels
sebagai layer Channel. Layer yang direkomendasikan adalah layer Redis
. Adalah juga layer channel RabbitMQ
juga sebuah IPC (Inter Procses Communication) Layer. Layer Channel ini adalah perekat antara django dengan server Daphne
. Redis
dan RabbitMQ
biasanya dipakai untuk melakukan scale
channel secara horizontal. Layer IPC channel lebih cepat tapi hanya
cocok untuk satu server karena semua proses menggunakan sebuah shared memory untuk komunikasinya.Ada beberapa kekurangan menggunakan channel Redis. Redis tidak memiliki dukungan TLS secara native dan dukungannya untuk Persistent queues tidak sebaik RabbitMQ.
Selain itu, karena ASGI Specification,
django-channels
mengemulasi Pub/Sub
(ia tidka memakai Pub/Sub dari Redis atau RabbitmQ) yang kurang bagus
jika kita perlu mendengarkan sebuah channel secara langsung.Kita akan membangun sistem yang mirip dengan
django-channels
tapi dengan level yang lebih rendah. Kita akan membaca queue RabbitMQ
secara langsung. uWSGI
akan mengambil peran yang mirip seperti server Daphne
.Perbedaan antara
django-channels
dan pendekatan yang kita lakukan adalah kita tidak terbatas membuat satu server WebSocket. Kita bisa saja mengganti uWSGI
dengan server WebSocket lain tanpa kesulitan yang berarti.uWSGI WebSockets
unbit (developers uWSGI) mengambil pendekatan yang berbeda, mereka memutuskan untuk mengintegrasikan WebSockets langsung kedalam uWSGI Core. uWSGI merupakan server Web WSGI dengan performa yang cukup baik. Bisa dibilang ia merupakan Server WSGI paling populer. Ia juga mendukung beberapa bahasa pemrograman sepertiPerl
, Ruby
bahkan Go
.Jika pembaca membutuhkan WebSockets tapi sudah menggunakan
uWSGI
, pembaca tidak perlu mengubah apapun. Bahkan jika menggunakan Server WSGI
lain seperti gunicorn
, kita hanya perlu menjalankan pip install uswgi
.Jika pembaca masih ingat diskusi kita diawal bagian 3 tentang RabbitMQ. Ingat dimana penulis mengatakan bagaimana RabbitMQ menjadi perekat antara
uWSGI
dengan django
.Kita perlu membuat notifikasi dan mengirimkannya ke Queue RabbitMQ dan melalui websocket pesan ini di broadcast langsung ke beberapa user.
Untuk memudahkan proses pembuatan notifikasi dan pengirimannya ke RabbitMQ, penulis membuat sebuah pustaka django bernama django-notifs.
Pasang dari PyPI.
pip install django-notifs
Lalu tambahkan ke INSTALLED_APPS
:INSTALLED_APPS = (
'django.contrib.auth',
...,
'rest_framework',
'rest_framework.authtoken',
'djoser',
# Our apps
'chat',
'notifications'
)
Memasang django-notifs
juga akan memasang pika
, pustaka untuk terhubung ke RabbitMQ.Lakukan migrasi dengan perintah
python manage.py migrate
Terakhir pasang
RabbitMQ
. Panduannya berbeda untuk tiap sistem operasi, jadi langsung saja ke halaman pemasangan untuk mengikuti panduannya.Sebelum melanjutkan, pastiakn bahwa server
RabbitMQ
sudah berjalan, jika tidak kita akan mendapat error dari pika
.Buka
views.py
dan ubah isi view ChatSessionMessageView
:from notifications.signals import notify
class ChatSessionMessageView(APIView):
...
def post(self, request, *args, **kwargs):
"""create a new message in a chat session."""
uri = kwargs['uri']
message = request.data['message']
user = request.user
chat_session = ChatSession.objects.get(uri=uri)
chat_session_message = ChatSessionMessage.objects.create(
user=user, chat_session=chat_session, message=message
)
notif_args = {
'source': user,
'source_display_name': user.get_full_name(),
'category': 'chat', 'action': 'Sent',
'obj': chat_session_message.id,
'short_description': 'You a new message', 'silent': True,
'extra_data': {'uri': chat_session.uri}
}
notify.send(sender=self.__class__, **notif_args)
return Response ({
'status': 'SUCCESS', 'uri': chat_session.uri, 'message': message,
'user': deserialize_user(user)
})
Tepat sebelum kita mengembalikan sebuah respon ke user, kita mengirim sinyal notifikasi.
Parameter
silent
artinya notifikasi tidak disimpan ke database. Dengan kata lain, kita menggunakan django-notifs
seperti event emitter. Kita juga bisa mengirim data di notifikasi menggunakan dictionary
sebagai argumen extra_data
.Notification channels
Django-notifs menggunakanchannels
untuk mengirim pesan. Itu artinya kita bisa menulis custom channel untuk mengirimkan pesan lewat email, SMS, Slack, atau apapun.Kita ingin mengirim pesan broadcast ke beberapa klien secara bersamaan. Pola komunikasi ini bernama Pub/Sub (Publish/Subscribe) dan RabbitMQ memiliki fitur ini sebagai
exchanges
.Sebuah
exchange
adalah channel yang menerima pesan dari producer (aplikasi kita) lalu mem-broadcast-nya ke beberapa queues. Ada empat tipe exchanges
yaitu direct, topic, headers dan fanout. Kita akan menggunakan fanout
karena yang paling mudah dipahami dan cocok dipakai untuk kasus kita .Berikut ilustrasi dari dokumentasi RabbitMQ tentang bagaimana fanout bekerja:
Sebelum sebuah queue menerima sebuah pesan ia harus di bound ke exchange (funout).
Untuk menerapkan pola Pub/Sub kita perlu menulis delivery channel sendiri.
Buat sebuah file dan beri nama
channels.py
"""Notification channels for django-notifs."""
from json import dumps
import pika
from notifications.channels import BaseNotificationChannel
class BroadCastWebSocketChannel(BaseNotificationChannel):
"""Fanout notification for RabbitMQ."""
def _connect(self):
"""Connect to the RabbitMQ server."""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
return connection, channel
def construct_message(self):
"""Construct the message to be sent."""
extra_data = self.notification_kwargs['extra_data']
return dumps(extra_data['message'])
def notify(self, message):
"""put the message of the RabbitMQ queue."""
connection, channel = self._connect()
uri = self.notification_kwargs['extra_data']['uri']
channel.exchange_declare(exchange=uri, exchange_type='fanout')
channel.basic_publish(exchange=uri, routing_key='', body=message)
connection.close()
Kita mengatur nama
exchange
sesuai dengan uri
sebuah sesi chat.Kita juga menyimpan pesan chat dalam sebuah dictionary. Kita memerlukan seluruh data tentang pesan-pesan tersebut bukan hanya teks pesannya saja.
Coba buat dan kirim pesan baru, maka akan ada RabbitMQ exchange berdasarkan uri dari sesi chat yang baru.
Untuk melihat daftar exchanges, jalankan perintah ini di terminal (untuk sistem *nix):
rabbitmqctl list_exchanges
Listing exchanges
amq.match headers
amq.direct direct
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
direct
amq.fanout fanout
amq.headers headers
fe662fd9de834fc fanout # our Exchange
Pembaca juga bisa melihat beberapa exchanges
bawaan.Sekarang kita akan membuat queue secara dinamis dan menautkannya ke exchange yang sudah kita buat sebelumnya sehingga mereka bisa menerima pesan tersebut.
Buat file baru bernama
websocket.py
."""Receive messages over from RabbitMQ and send them over the websocket."""
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.exchange_declare(
exchange='fe662fd9de834fc', exchange_type='fanout'
)
# exclusive means the queue should be deleted once the connection is closed
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue # random queue name generated by RabbitMQ
channel.queue_bind(exchange='fe662fd9de834fc', queue=queue_name)
print('listening for messages...')
while True:
for method_frame, _, body in channel.consume(queue_name):
try:
print(body)
except OSError as error:
print(error)
else:
# acknowledge the message
channel.basic_ack(method_frame.delivery_tag)
Sekali lagi, setelah terhubung ke
RabbitMQ
menggunakan pika
kita mendeklarasikan exchange.Mendeklarasikan sebuah exchange (atau queue) beberapa kali tidak memiliki efek tambahan, jika exchange ada sebelum
RabbitMQ
maka exchange tersebut tidak melakukan apapun.Mari lihat baris berikut lebih detail:
channel.queue_bind(exchange='fe662fd9de834fc', queue=queue_name)
Baris ini menautkan queue ke exchange. Yang dilakukannya seperti "Hei exchange, saya tertarik dengan pesan yang kamu terima. Kirimin ke saya dong."
queue_name
dibuat secara random oleh RabbitMQ
karen akita memanggil queue_declare
tanpa membuat nama.Ada beberapa cara untuk mengonsumsi pesna dari channel. Kita bisa menggunakan callback atau mengonsumsinya secara manual dengan
for loop
. Kita akan menggunakan opsi yang kedua sehingga dapat meng-handle exception
yang muncul saat mengirim pesan ke klien. Akan lebih jelas alasan
mengapa memiliih teknik yang kedua saat kita mengimplementasi WebSocket.channel.basic_ack(method_frame.delivery_tag)
memastikan bahwa klien sudah menerima pesan sehingga pesna tersebut
bisa di hapus dari queue. Jika pesan belum dipastikan, ia akan tetap ada
di dalam queue sampai queue tersebut dihapus.Sekarang jalankan file
websocket
dengan perintah:$ python websocket.py
listening for messages...
Kembali ke UI aplikasi chat dan kirimkan beberapa pesan. Penulis mengirim
"hello world"
dan "how are you doing"
lalu mendapat pesan sebagai berikut:listening for messages...
b'{"user": {"id": 1, "username": "danidee", "email": "", "first_name": "", "last_name": ""}, "message": "Hello world"}'
b'{"user": {"id": 12, "username": "daniel", "email": "", "first_name": "", "last_name": ""}, "message": "How are you doing"}'
Buka sebuah terminal baru, jalankan file websocket
dan kirimkan lagi pesan dari aplikasi chat. Seharusnya pesan-pesan baru juga akan muncul.Terakhir, kita akan mengirim pesan langsung ke user.
Dimana websocket-nya?
Websocket sudah ada di dalam objek Pythonuwsgi
. Jadi, pasang dulu uWSGI
jika belum dilakukan.$ pip install uwsgi
Kita akan membuat beberapa modifikasi ke file websocket.py
"""Receive messages over from RabbitMQ and send them over the websocket."""
import sys
import pika
import uwsgi
def application(env, start_response):
"""Setup the Websocket Server and read messages off the queue."""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
exchange = env['PATH_INFO'].replace('/', '')
channel.exchange_declare(
exchange=exchange, exchange_type='fanout'
)
# exclusive means the queue should be deleted once the connection is closed
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue # random queue name generated by RabbitMQ
channel.queue_bind(exchange=exchange, queue=queue_name)
uwsgi.websocket_handshake(
env['HTTP_SEC_WEBSOCKET_KEY'],
env.get('HTTP_ORIGIN', '')
)
def keepalive():
"""Keep the websocket connection alive (called every 30 seconds)."""
print('PING/PONG...')
try:
uwsgi.websocket_recv_nb()
connection.add_timeout(30, keepalive)
except OSError as error:
connection.close()
print(error)
sys.exit(1) # Kill process and force uWSGI to Respawn
keepalive()
while True:
for method_frame, _, body in channel.consume(queue_name):
try:
uwsgi.websocket_send(body)
except OSError as error:
print(error)
sys.exit(1) # Force uWSGI to Respawn
else:
# acknowledge the message
channel.basic_ack(method_frame.delivery_tag)
API websocket uwsgi
cukup sederhana. Kita hanya akan menggunakan tiga method:- uwsgi.websocket_handshake: Menjadi jembatan antara HTTP dengan protokol WS. Method ini membangun koneksi antara klien dengan server, jika gagal maka akan ada exception.
- uwsgi.websocket_recv_nb: Meski nama lengkap method-nya adalah
websocket receive non blocking
ia sebetulnya tidak hanya menerima pesan dalam bentuk non-blocking tapi juga membantu menjaga koneksi dengan klien dengan mengirimkanpong
ke browser (sebuah mekanisme untuk memeriksa apakah klien masih aktif). Untuk menjaga koneksi, method ini akan dipanggil setiap 30 detik, jika tidak koneksi klien dapat terputus. - uwsgi.websocket_send: Fungsi method ini sudah cukup jelas dari namanya. Alasan kita memberikan error handler karena saat koneksi ditutup dan kita mencoba mengirim pesan dengan
uwsgi.websocket_send
, akan munculOSError
. Lalu kita menutup koneksi ke RabbitMQ dan mematikan process-nya.uWSGI
akan melakukan proses restart untuk kita. Karena terjadi error, maka isi blokelse
tidak akan dipanggil sehingga pesan tetap ada diqueue
. Jika tidak terjadi error, blokelse
akan dipanggil yang artinya pesan sudah dikirim dan akan dihapus dariqueue
.
loop
berikutnya dan memanggil channel.consume
, ia akan mengirim pesan yang masih unacknowledged ditambah pesan-pesan baru di queue
. Artinya, kita tidak akan kehilangan pesan yang gagal dikirim karena masalah koneksi.Apakah pembaca memperhatiakn bahwa
uri
exchange tidak lagi di hardcoded? Kita mengambil namanya dari koneksi URL yang akan diakses oleh klien seperti ini:http://websocket-server/<uri>
Jangan risau jika belum jelas. Nanti saat kita menghubungi frontend Vue ke server WebSocket, hal-hal yang sudah kita lakukan akan mulai terlihat jelas.
Menghubungi WebSocket dengan JavaScript
Dalam konteks aplikasi web, tidak mungkin menghubungi WebSocket tanpa JavaScript. Sebagian besar browser sudah memiliki dukungan WebSocket sehingga tidak ada aplikasi tambahan yang harus kita pasang.Mari perbarui komponen
Chat
:<script>
const $ = window.jQuery
export default {
...
created () {
this.username = sessionStorage.getItem('username')
// Setup headers for all requests
$.ajaxSetup({
headers: {
'Authorization': `Token ${sessionStorage.getItem('authToken')}`
}
})
if (this.$route.params.uri) {
this.joinChatSession()
}
this.connectToWebSocket()
},
methods: {
...
postMessage (event) {
const data = {message: this.message}
$.post(`http://localhost:8000/api/chats/${this.$route.params.uri}/messages/`, data, (data) => {
this.message = '' // clear the message after sending
})
.fail((response) => {
alert(response.responseText)
})
},
joinChatSession () {
...
},
fetchChatSessionHistory () {
...
},
connectToWebSocket () {
const websocket = new WebSocket(`ws://localhost:8081/${this.$route.params.uri}`)
websocket.onopen = this.onOpen
websocket.onclose = this.onClose
websocket.onmessage = this.onMessage
websocket.onerror = this.onError
},
onOpen (event) {
console.log('Connection opened.', event.data)
},
onClose (event) {
console.log('Connection closed.', event.data)
// Try and Reconnect after five seconds
setTimeout(this.connectToWebSocket, 5000)
},
onMessage (event) {
const message = JSON.parse(event.data)
this.messages.push(message)
},
onError (event) {
alert('An error occured:', event.data)
}
}
}
</script>
Lalu mulai Server WebSocket
uWSGI
di port 8081 dan refresh browser.$ uwsgi --http :8081 --module websocket --master --processes 4
Hore!Seharusnya sekarang kita sudah bisa mengirim pesan dan melihat hasilnya secara realtime.
Masih ada satu masalah. Buka tiga tab baru (5 klien aktif). Klien terakhir tidak akan bisa terhubung karena 4 proses yang sudah kita atur telah dipakai klien lain.
CATATAN: Itulah alasan kenapa kita perlu mematikan proses yang macet dengan sys.exit(1). Proses yang macet itu belum tentu karena sistem atau koneksi, bisa saja karena user sengaja meninggalkan chat room sehingga membuat uWSGI harus menunggu beberapa saat sebelum menutup koneksi di server.
Opsi –-master akan memanggil master process yang akan memonitor proses-proses yang macet. Tanpanya, proses yang macet akan terus ada dan tidak pernah direstart sampai uWSGI dimatikan.
Asynchronous IO dan Concurrency
Mudahnya ide mengapa kita membutuhkanAsyncIO
atau cukup async
saja adalah saat kita memiliki beberapa IO bound tasks
yang perlu dijalankan. Saat operasi IO (dalam kasus kita, mengirim dan
menerima pesan), daripada menunggu pesan baru, suatu proses bisa
berganti IO bound task dan menjalankannya.Konsep sederhana ini yang membuat
NodeJS
unggul. Untuk Python dan uWSGI
agak berbeda karena memang tidak didesain sebagai asynchronous. Ada
beberapa pustaka async untuk Python. Pustaka resmi asyncio, gevent,
curio dll. uWSGI
sendiri mendukung beberapa pustaka tadi, tapi kita akan memakai gevent
.Dari pengalaman, penulis menemukaan bahwa
gevent
bekerja lebih baik dibanding asyncio
dan uGreen
. Gevent
juga memiliki banyak method yang berguna. Milsanya monkey.patch_all
yang akan mengganti sebagian besar pustaka standar dengan pustakanya gevent
sehingga bisa menulis kode synchronous yang akan dieksekusi secara asynchronous.Pasang gevent dengan pip:
$ pip install gevent
Jalankan Server WebSocket uWSGI
:$ uwsgi --http :8081 --gevent 2 --module websocket --gevent-monkey-patch --master
Kita
memulai server dengan 2 thread gevent dan sebuah process. Itu artinya
kita dapat meng-handle dua klien (naik turun antara 3 sampai 4 klien
secara random, tapi 2 klien bisa menerima pesan secara pasti), saat ada
lebih dari jumlah klien yang mampu dilayani kita akan mendapatkan pesan
di uWSGI
:async queue is full !!!
Ada dua cara untuk menyelesaikan permasalahan ini. Cara yang paling mudah ialah dengan menaikkan jumlah thread gevent. Jika kita mengganti kode
uWSGI
seperti berikut ini, maka kita dapat meng-handle 100 user bersamaan.$ uwsgi --http :8081 --gevent 100 --module websocket --gevent-monkey-patch --master
Lalu bagaimana jika ingin ada beberapa proses?
Kita juga bisa menaikkan jumlah proses lebih dari satu, misalnya di perintah berikut ini kita akan memulai server
uWSGI
dengan 4 proses.$ uwsgi --http :8081 --gevent 100 --module websocket --gevent-monkey-patch --master --processes 4
Dengan menaikkan jumlah proses, kita melipatgandakan jumlah klien yang dapat dilayani sebesar 4 * 100!
Bergantung pada spesifikasi server dan konfigurasinya, kita bisa menaikkan jumlah proses dan thread gevent sesuka hati. Tapi sebelumnya, pastikan sudah memonitor performa aplikasi karena semakin besar angka yang kita tulis semakin besar pula sumber daya yang akan dipakai yang artinya performa bisa menurun.
uWSGI
memiliki paket python bernama uwsgitop
yang bisa dipakai untuk melakukan monitoring (seperti aplikasi top
kalau di Linux).Scaling ke beberapa server
Suatu saat kita bisa memakai sumber daya server secara maksimal dan perlu melakukan scaling ke beberapa server. Karena server websocket kita tidak terikat dengan aplikasi django, maka proses ini bisa kita lakukan dengan lebih mudah karena bisa melakukan load balance beberapa server (setiap server menjalankan prosesuWSGI
dan thread gevent) dibelakang Nginx.Dengan ini, kita dapat meng-handle ribuan koneksi secara bersamaan.
Kita juga bisa menerapkan teknik clustering dan loadbalancing ke RabbitMQ jika perlu melakukan scale out. Dokumentasinya ada di https://www.rabbitmq.com/ha.html
Semoga seri tutorial ini bermaanfaat dan membantu pembaca dalam belajar.
Sumber : Membuat Aplikasi Web Realtime dengan Django, RabbitMQ dan Vue.js Bagian 5: uWSGI WebSockets
No comments:
Post a Comment