258 lines
7.9 KiB
Python

"""
##### PiHole
Display information from the PiHole API
```ini
[variable_name]
platform = pihole
host = 192.168.1.101
password = {{ PiHole password }}
value_template = {{ value_template }}
```
> **Returns:** `value_template` as rendered string
| Variable | Required | Description | Options |
|-----------------|----------|-----------------------------------------------------------------|-------------------|
| [variable_name] | Yes | Name for the data source. | [variable_name] |
| platform | Yes | Name of the platform. | pihole |
| host | Yes | Host of the PiHole | host |
| password | Yes | Password for the PiHole | password |
| value_template | Yes | Jinja template for how the returned data from API is displayed. | jinja template |
<br />
###### **Available fields for value_template**
* domain_count
* queries
* blocked
* ads_percentage
* unique_domains
* forwarded
* cached
* total_clients
* unique_clients
* total_queries
* gravity_last_updated
> **Working example:**
>```ini
> [pihole-data]
> platform = pihole
> host = 192.168.1.101
> password = password123
> value_template = Ads Blocked Today: {{ blocked }}<br>Status: {{ status }}<br>Queries today: {{ queries }}
>
> [PiHole]
> prefix = http://
> url = 192.168.1.101
> icon = static/images/apps/pihole.png
> description = A black hole for Internet advertisements
> open_in = new_tab
> data_sources = pihole-data
>```
"""
from flask import render_template_string
import requests
import time
import hashlib
def inApiLink(ip, endpoint):
return "http://" + str(ip) + "/admin/scripts/pi-hole/php/" + str(endpoint) + ".php"
class Auth(object):
def __init__(self, password):
# PiHole's web token is just a double sha256 hash of the utf8 encoded password
self.token = hashlib.sha256(
hashlib.sha256(str(password).encode()).hexdigest().encode()
).hexdigest()
self.auth_timestamp = time.time()
class PiHole(object):
# Takes in an ip address of a pihole server
def __init__(self, ip_address):
self.ip_address = ip_address
self.auth_data = None
self.refresh()
self.pw = None
def refresh(self):
rawdata = requests.get(
"http://" + self.ip_address + "/admin/api.php?summary"
).json()
if self.auth_data != None:
topdevicedata = requests.get(
"http://"
+ self.ip_address
+ "/admin/api.php?getQuerySources=25&auth="
+ self.auth_data.token
).json()
self.top_devices = topdevicedata["top_sources"]
self.forward_destinations = requests.get(
"http://"
+ self.ip_address
+ "/admin/api.php?getForwardDestinations&auth="
+ self.auth_data.token
).json()
self.query_types = requests.get(
"http://"
+ self.ip_address
+ "/admin/api.php?getQueryTypes&auth="
+ self.auth_data.token
).json()["querytypes"]
# Data that is returned is now parsed into vars
self.status = rawdata["status"]
self.domain_count = rawdata["domains_being_blocked"]
self.queries = rawdata["dns_queries_today"]
self.blocked = rawdata["ads_blocked_today"]
self.ads_percentage = rawdata["ads_percentage_today"]
self.unique_domains = rawdata["unique_domains"]
self.forwarded = rawdata["queries_forwarded"]
self.cached = rawdata["queries_cached"]
self.total_clients = rawdata["clients_ever_seen"]
self.unique_clients = rawdata["unique_clients"]
self.total_queries = rawdata["dns_queries_all_types"]
self.gravity_last_updated = rawdata["gravity_last_updated"]
def refreshTop(self, count):
if self.auth_data == None:
print("Unable to fetch top items. Please authenticate.")
exit(1)
rawdata = requests.get(
"http://"
+ self.ip_address
+ "/admin/api.php?topItems="
+ str(count)
+ "&auth="
+ self.auth_data.token
).json()
self.top_queries = rawdata["top_queries"]
self.top_ads = rawdata["top_ads"]
def getGraphData(self):
rawdata = requests.get(
"http://" + self.ip_address + "/admin/api.php?overTimeData10mins"
).json()
return {
"domains": rawdata["domains_over_time"],
"ads": rawdata["ads_over_time"],
}
def authenticate(self, password):
self.auth_data = Auth(password)
self.pw = password
# print(self.auth_data.token)
def getAllQueries(self):
if self.auth_data == None:
print("Unable to get queries. Please authenticate")
exit(1)
return requests.get(
"http://"
+ self.ip_address
+ "/admin/api.php?getAllQueries&auth="
+ self.auth_data.token
).json()["data"]
def enable(self):
if self.auth_data == None:
print("Unable to enable pihole. Please authenticate")
exit(1)
requests.get(
"http://"
+ self.ip_address
+ "/admin/api.php?enable&auth="
+ self.auth_data.token
)
def disable(self, seconds):
if self.auth_data == None:
print("Unable to disable pihole. Please authenticate")
exit(1)
requests.get(
"http://"
+ self.ip_address
+ "/admin/api.php?disable="
+ str(seconds)
+ "&auth="
+ self.auth_data.token
)
def getVersion(self):
return requests.get(
"http://" + self.ip_address + "/admin/api.php?versions"
).json()
def getDBfilesize(self):
if self.auth_data == None:
print("Please authenticate")
exit(1)
return float(
requests.get(
"http://"
+ self.ip_address
+ "/admin/api_db.php?getDBfilesize&auth="
+ self.auth_data.token
).json()["filesize"]
)
def getList(self, list):
return requests.get(
inApiLink(self.ip_address, "get") + "?list=" + str(list)
).json()
def add(self, list, domain):
if self.auth_data == None:
print("Please authenticate")
exit(1)
with requests.session() as s:
s.get(
"http://" + str(self.ip_address) + "/admin/scripts/pi-hole/php/add.php"
)
requests.post(
"http://" + str(self.ip_address) + "/admin/scripts/pi-hole/php/add.php",
data={"list": list, "domain": domain, "pw": self.pw},
).text
def sub(self, list, domain):
if self.auth_data == None:
print("Please authenticate")
exit(1)
with requests.session() as s:
s.get(
"http://" + str(self.ip_address) + "/admin/scripts/pi-hole/php/sub.php"
)
requests.post(
"http://" + str(self.ip_address) + "/admin/scripts/pi-hole/php/sub.php",
data={"list": list, "domain": domain, "pw": self.pw},
).text
class Platform:
def __init__(self, *args, **kwargs):
# parse the user's options from the config entries
for key, value in kwargs.items():
self.__dict__[key] = value
self.pihole = PiHole(self.host)
def process(self):
self.pihole.refresh()
value_template = render_template_string(
self.value_template, **self.pihole.__dict__
)
return value_template