This caused strings with the '%' character in them to be interpreted by Python's ConfigParser. This behavior is not needed for DashMachine; the goal is to read exactly what the user types. Setting interpolation to None fixes this. See: https://docs.python.org/3/library/configparser.html?highlight=configparser#interpolation-of-values
255 lines
9.4 KiB
Python
255 lines
9.4 KiB
Python
import os
|
|
import json
|
|
from configparser import ConfigParser
|
|
from dashmachine.main.models import Apps, Groups, DataSources, DataSourcesArgs, Tags
|
|
from dashmachine.user_system.models import User
|
|
from dashmachine.user_system.utils import (
|
|
hash_and_cache_password,
|
|
get_cached_password,
|
|
clean_auth_cache,
|
|
)
|
|
from dashmachine.settings_system.models import Settings
|
|
from dashmachine.paths import user_data_folder
|
|
from dashmachine import db
|
|
|
|
|
|
def row2dict(row):
    """Return a plain ``dict`` snapshot of a mapped row.

    Each entry in ``row.__table__.columns`` contributes one item: the key is
    the column's ``name`` and the value is ``str()`` of the attribute of that
    name on *row*. Because every value goes through ``str()``, a ``None``
    attribute is returned as the string ``"None"``.
    """
    return {
        column.name: str(getattr(row, column.name))
        for column in row.__table__.columns
    }
|
|
|
|
|
|
def _write_config(config):
    """Write *config* back to ``config.ini`` in the user data folder.

    The file is opened in a ``with`` block so the handle is flushed and closed
    as soon as the write finishes instead of being left to the garbage
    collector.
    """
    with open(os.path.join(user_data_folder, "config.ini"), "w") as config_file:
        config.write(config_file)


def _with_required_roles(roles):
    """Return *roles* with any missing built-in role appended.

    *roles* is the comma-separated string from the ``[Settings]`` section.
    ``admin``, ``user`` and ``public_user`` are each appended (as ``",name"``)
    when absent. Membership is tested against the parsed list entries, not the
    raw string, so ``public_user`` does not count as ``user``.
    """
    present = {role.strip() for role in roles.split(",")}
    for required in ("admin", "user", "public_user"):
        if required not in present:
            roles += f",{required}"
    return roles


def read_config():
    """Rebuild the database tables from ``config.ini``.

    All existing ``DataSources``, ``DataSourcesArgs``, ``Apps``, ``Settings``,
    ``Groups``, ``Tags`` and ``User`` rows are deleted, then every section of
    the file is turned into a row. The kind of row is decided per section:

    * the section named ``Settings`` -> ``Settings``
    * a section with a ``role`` key -> ``User``
    * a section with a ``roles`` key -> ``Groups``
    * a section with a ``platform`` key -> ``DataSources`` (+ its args)
    * anything else -> ``Apps``

    Sections are processed in file order and app sections look up their groups
    and data sources in the database, so those must appear before the apps
    that reference them.

    The config file itself is rewritten along the way: user ``password`` and
    ``confirm_password`` values are blanked after processing, and the first
    user is promoted to ``admin`` if no admin exists yet.

    Returns:
        dict: ``{"msg": "success", "settings": <row2dict of Settings>}`` on
        success, or ``{"msg": "Invalid Config: ..."}`` describing the first
        problem found. On an error raised while processing sections, rows
        from earlier sections have already been committed.
    """
    config = ConfigParser(interpolation=None)
    try:
        config.read(os.path.join(user_data_folder, "config.ini"))
    except Exception as e:
        return {"msg": f"Invalid Config: {e}."}

    # The Settings section is read unconditionally further down and its row is
    # part of the return value, so reject a config without it before any
    # existing data is deleted.
    if not config.has_section("Settings"):
        return {"msg": "Invalid Config: Settings section is missing."}

    # Detach apps from their data sources before the bulk deletes.
    for ds in DataSources.query.all():
        ds.apps.clear()
    DataSources.query.delete()
    DataSourcesArgs.query.delete()
    Apps.query.delete()
    Settings.query.delete()
    Groups.query.delete()
    Tags.query.delete()
    User.query.delete()

    for section in config.sections():
        section_cfg = config[section]

        # Settings creation
        if section == "Settings":
            settings = Settings()
            settings.theme = section_cfg.get("theme", "light")
            settings.accent = section_cfg.get("accent", "orange")
            settings.background = section_cfg.get("background", "None")

            if "roles" in section_cfg:
                settings.roles = _with_required_roles(section_cfg["roles"])
            else:
                settings.roles = "admin,user,public_user"

            settings.home_access_groups = section_cfg.get(
                "home_access_groups", "admin_only"
            )
            settings.settings_access_groups = section_cfg.get(
                "settings_access_groups", "admin_only"
            )
            settings.custom_app_title = section_cfg.get(
                "custom_app_title", "DashMachine"
            )
            settings.sidebar_default = section_cfg.get("sidebar_default", "open")
            settings.tags_expanded = section_cfg.get("tags_expanded", "True")

            db.session.add(settings)
            db.session.commit()

        # User creation
        elif "role" in section_cfg:
            user = User()
            user.username = section
            user.role = section_cfg["role"]
            user.sidebar_default = section_cfg.get("sidebar_default", None)
            user.theme = section_cfg.get("theme", None)
            user.accent = section_cfg.get("accent", None)
            user.tags_expanded = section_cfg.get("tags_expanded", None)
            user.password = ""

            # There must always be an admin: promote this user when none has
            # been stored yet, and persist that change to the config file.
            if not User.query.filter_by(role="admin").first() and user.role != "admin":
                print(
                    f"Invalid Config: admin user not specified, or not specified first. {user.username} role set to admin"
                )
                user.role = "admin"
                config.set(section, "role", "admin")
                _write_config(config)

            # Commit first so that user.id is available for the password cache.
            db.session.add(user)
            db.session.commit()

            new_password = section_cfg.get("password", None)
            if new_password:
                # A new password is only accepted when it matches its
                # confirmation; otherwise the stored password stays "".
                if new_password == section_cfg.get("confirm_password", None):
                    password = hash_and_cache_password(new_password, user.id)
                    user.password = password
                    db.session.merge(user)
                    db.session.commit()
            else:
                password = get_cached_password(user.id)
                if password == "error":
                    # NOTE(review): the message says 'admin' is used by
                    # default, but the value stored below is whatever
                    # get_cached_password returned - confirm the intent.
                    print(
                        f"Invalid Config: Password for {user.username} must be specified. Using 'admin' by default"
                    )
                user.password = password
                db.session.merge(user)
                db.session.commit()

            # Never leave plain-text passwords in the config file.
            config.set(section, "password", "")
            config.set(section, "confirm_password", "")
            _write_config(config)

        # Groups creation
        elif "roles" in section_cfg:
            group = Groups()
            group.name = section
            group.roles = section_cfg["roles"]
            db.session.add(group)
            db.session.commit()

        # Data source creation
        elif "platform" in section_cfg:
            data_source = DataSources()
            data_source.name = section
            data_source.platform = section_cfg["platform"]
            db.session.add(data_source)
            db.session.commit()

            # Every remaining key of the section becomes a data source arg.
            for key, value in section_cfg.items():
                if key not in ["name", "platform"]:
                    arg = DataSourcesArgs()
                    arg.key = key
                    arg.value = value
                    arg.data_source = data_source
                    db.session.add(arg)
                    db.session.commit()

        else:
            # App creation
            app = Apps()
            app.name = section
            app.type = section_cfg.get("type", "app")

            app.prefix = section_cfg.get("prefix", None)
            if app.type == "app" and not app.prefix:
                return {"msg": f"Invalid Config: {section} does not contain prefix."}

            app.url = section_cfg.get("url", None)
            if app.type == "app" and not app.url:
                return {"msg": f"Invalid Config: {section} does not contain url."}

            app.icon = section_cfg.get("icon", None)
            app.sidebar_icon = section_cfg.get("sidebar_icon", None)
            app.description = section_cfg.get("description", None)
            app.open_in = section_cfg.get("open_in", "this_tab")
            app.urls = section_cfg.get("urls", None)

            if "groups" in section_cfg:
                # Every referenced group must already exist in the database.
                for group_name in section_cfg["groups"].split(","):
                    if not Groups.query.filter_by(name=group_name.strip()).first():
                        return {
                            "msg": f"Invalid Config: {section} contains at group that is not defined."
                        }
                app.groups = section_cfg["groups"]
            else:
                app.groups = None

            # Tags creation
            if "tags" in section_cfg:
                app.tags = section_cfg["tags"]
                for tag in app.tags.split(","):
                    tag = tag.strip()
                    if not Tags.query.filter_by(name=tag).first():
                        tag_db = Tags(name=tag)
                        db.session.add(tag_db)
                        db.session.commit()
                        # Default sort position is the tag's own id, which is
                        # only known after the first commit.
                        tag_db.sort_pos = tag_db.id
                        db.session.merge(tag_db)
                        db.session.commit()
            else:
                # Once any tag exists, apps without tags go under "Untagged".
                if Tags.query.first():
                    app.tags = "Untagged"
                    if not Tags.query.filter_by(name="Untagged").first():
                        tag_db = Tags(name="Untagged")
                        db.session.add(tag_db)
                        db.session.commit()
                else:
                    app.tags = None

            db.session.add(app)
            db.session.commit()

            if "data_sources" in section_cfg:
                for config_ds in section_cfg["data_sources"].split(","):
                    db_ds = DataSources.query.filter_by(name=config_ds.strip()).first()
                    if db_ds:
                        app.data_sources.append(db_ds)
                        db.session.merge(app)
                        db.session.commit()
                    else:
                        return {
                            "msg": f"Invalid Config: {section} has a data_source variable that doesn't exist."
                        }

    # "admin_only" is the default access group used by Settings, so make sure
    # it exists even when the config does not define it.
    group = Groups.query.filter_by(name="admin_only").first()
    if not group:
        group = Groups()
        group.name = "admin_only"
        group.roles = "admin"
        db.session.add(group)
        db.session.commit()

    tags_settings = config["Settings"].get("tags", None)
    if tags_settings:
        # The value is a comma-separated run of JSON objects; mark the commas
        # between objects so that commas inside an object are not split on.
        tags_settings = tags_settings.replace("},{", "}%,%{").split("%,%")

        for tag_setting in tags_settings:
            tag_json = json.loads(tag_setting)
            tag = Tags.query.filter_by(name=tag_json.get("name", None)).first()
            if tag:
                icon = tag_json.get("icon", None)
                if icon:
                    tag.icon = icon
                # Apply sort_pos based on its own presence, independent of
                # whether an icon was supplied.
                sort_pos = tag_json.get("sort_pos", None)
                if sort_pos is not None:
                    tag.sort_pos = sort_pos
                db.session.merge(tag)
                db.session.commit()

    clean_auth_cache()

    # With no users configured at all, fall back to admin/admin.
    if not User.query.first():
        user = User()
        user.username = "admin"
        user.role = "admin"
        user.password = ""
        db.session.add(user)
        db.session.commit()
        user.password = hash_and_cache_password("admin", user.id)
        db.session.merge(user)
        db.session.commit()

    return {"msg": "success", "settings": row2dict(settings)}
|