forked from GithubBackups/healthchecks
Handle concurrent sendalerts using QuerySet.update(). Fixes #39
This commit is contained in:
parent
c90627c6f4
commit
b5a0ff3538
@ -1,70 +1,68 @@
|
|||||||
import logging
|
|
||||||
import time
|
import time
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
from django.db import connection
|
from django.db import connection
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from hc.api.models import Check
|
from hc.api.models import Check
|
||||||
|
|
||||||
executor = ThreadPoolExecutor(max_workers=10)
|
|
||||||
logger = logging.getLogger(__name__)
|
def notify(check_id, stdout):
|
||||||
|
check = Check.objects.get(id=check_id)
|
||||||
|
|
||||||
|
tmpl = "\nSending alert, status=%s, code=%s\n"
|
||||||
|
stdout.write(tmpl % (check.status, check.code))
|
||||||
|
errors = check.send_alert()
|
||||||
|
for ch, error in errors:
|
||||||
|
stdout.write("ERROR: %s %s %s\n" % (ch.kind, ch.value, error))
|
||||||
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
help = 'Sends UP/DOWN email alerts'
|
help = 'Sends UP/DOWN email alerts'
|
||||||
|
owned = Check.objects.filter(user__isnull=False)
|
||||||
|
|
||||||
def handle_many(self):
|
def handle_one(self):
|
||||||
""" Send alerts for many checks simultaneously. """
|
""" Process a single check. """
|
||||||
query = Check.objects.filter(user__isnull=False).select_related("user")
|
|
||||||
|
|
||||||
now = timezone.now()
|
now = timezone.now()
|
||||||
going_down = query.filter(alert_after__lt=now, status="up")
|
|
||||||
going_up = query.filter(alert_after__gt=now, status="down")
|
# Look for checks that are going down
|
||||||
# Don't combine this in one query so Postgres can query using index:
|
flipped = "down"
|
||||||
checks = list(going_down.iterator()) + list(going_up.iterator())
|
q = self.owned.filter(alert_after__lt=now, status="up")
|
||||||
if not checks:
|
check = q.first()
|
||||||
|
|
||||||
|
if not check:
|
||||||
|
# If none found, look for checks that are going up
|
||||||
|
flipped = "up"
|
||||||
|
q = self.owned.filter(alert_after__gt=now, status="down")
|
||||||
|
check = q.first()
|
||||||
|
|
||||||
|
if check:
|
||||||
|
# Atomically update status to the opposite
|
||||||
|
q = Check.objects.filter(id=check.id, status=check.status)
|
||||||
|
num_updated = q.update(status=flipped)
|
||||||
|
if num_updated == 1:
|
||||||
|
# Send notifications only if status update succeeded
|
||||||
|
# (no other sendalerts process got there first)
|
||||||
|
t = Thread(target=notify, args=(check.id, self.stdout))
|
||||||
|
t.start()
|
||||||
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
futures = [executor.submit(self.handle_one, check) for check in checks]
|
|
||||||
for future in futures:
|
|
||||||
future.result()
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def handle_one(self, check):
|
|
||||||
""" Send an alert for a single check.
|
|
||||||
|
|
||||||
Return True if an appropriate check was selected and processed.
|
|
||||||
Return False if no checks need to be processed.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Save the new status. If sendalerts crashes,
|
|
||||||
# it won't process this check again.
|
|
||||||
check.status = check.get_status()
|
|
||||||
check.save()
|
|
||||||
|
|
||||||
tmpl = "\nSending alert, status=%s, code=%s\n"
|
|
||||||
self.stdout.write(tmpl % (check.status, check.code))
|
|
||||||
errors = check.send_alert()
|
|
||||||
for ch, error in errors:
|
|
||||||
self.stdout.write("ERROR: %s %s %s\n" % (ch.kind, ch.value, error))
|
|
||||||
|
|
||||||
connection.close()
|
|
||||||
return True
|
|
||||||
|
|
||||||
def handle(self, *args, **options):
|
def handle(self, *args, **options):
|
||||||
self.stdout.write("sendalerts is now running")
|
self.stdout.write("sendalerts is now running")
|
||||||
|
|
||||||
ticks = 0
|
ticks = 0
|
||||||
while True:
|
while True:
|
||||||
if self.handle_many():
|
|
||||||
ticks = 1
|
|
||||||
else:
|
|
||||||
ticks += 1
|
|
||||||
|
|
||||||
time.sleep(1)
|
while self.handle_one():
|
||||||
|
ticks = 0
|
||||||
|
|
||||||
|
ticks += 1
|
||||||
|
time.sleep(2)
|
||||||
if ticks % 60 == 0:
|
if ticks % 60 == 0:
|
||||||
formatted = timezone.now().isoformat()
|
formatted = timezone.now().isoformat()
|
||||||
self.stdout.write("-- MARK %s --" % formatted)
|
self.stdout.write("-- MARK %s --" % formatted)
|
||||||
|
|
||||||
|
connection.close()
|
||||||
|
@ -4,31 +4,10 @@ from django.utils import timezone
|
|||||||
from hc.api.management.commands.sendalerts import Command
|
from hc.api.management.commands.sendalerts import Command
|
||||||
from hc.api.models import Check
|
from hc.api.models import Check
|
||||||
from hc.test import BaseTestCase
|
from hc.test import BaseTestCase
|
||||||
from mock import patch
|
|
||||||
|
|
||||||
|
|
||||||
class SendAlertsTestCase(BaseTestCase):
|
class SendAlertsTestCase(BaseTestCase):
|
||||||
|
|
||||||
@patch("hc.api.management.commands.sendalerts.Command.handle_one")
|
|
||||||
def test_it_handles_few(self, mock):
|
|
||||||
yesterday = timezone.now() - timedelta(days=1)
|
|
||||||
names = ["Check %d" % d for d in range(0, 10)]
|
|
||||||
|
|
||||||
for name in names:
|
|
||||||
check = Check(user=self.alice, name=name)
|
|
||||||
check.alert_after = yesterday
|
|
||||||
check.status = "up"
|
|
||||||
check.save()
|
|
||||||
|
|
||||||
result = Command().handle_many()
|
|
||||||
assert result, "handle_many should return True"
|
|
||||||
|
|
||||||
handled_names = []
|
|
||||||
for args, kwargs in mock.call_args_list:
|
|
||||||
handled_names.append(args[0].name)
|
|
||||||
|
|
||||||
assert set(names) == set(handled_names)
|
|
||||||
|
|
||||||
def test_it_handles_grace_period(self):
|
def test_it_handles_grace_period(self):
|
||||||
check = Check(user=self.alice, status="up")
|
check = Check(user=self.alice, status="up")
|
||||||
# 1 day 30 minutes after ping the check is in grace period:
|
# 1 day 30 minutes after ping the check is in grace period:
|
||||||
@ -36,4 +15,4 @@ class SendAlertsTestCase(BaseTestCase):
|
|||||||
check.save()
|
check.save()
|
||||||
|
|
||||||
# Expect no exceptions--
|
# Expect no exceptions--
|
||||||
Command().handle_one(check)
|
Command().handle_one()
|
||||||
|
@ -3,7 +3,6 @@ django-ses-backend==0.1.1
|
|||||||
Django==1.10
|
Django==1.10
|
||||||
django_compressor==2.1
|
django_compressor==2.1
|
||||||
djmail==0.11.0
|
djmail==0.11.0
|
||||||
futures==3.0.3
|
|
||||||
premailer==2.9.6
|
premailer==2.9.6
|
||||||
psycopg2==2.6.1
|
psycopg2==2.6.1
|
||||||
requests==2.9.1
|
requests==2.9.1
|
||||||
|
Loading…
x
Reference in New Issue
Block a user