# -*- coding: utf-8 -*- from bs4 import BeautifulSoup from containerLink import call_plugin_html, call_plugin_multihtml, PluginException from containerLink import plugin_reqs from containerLink import get_plugin_tim_url from collections import OrderedDict import yaml from yaml import CLoader import yaml.parser import yaml.scanner from htmlSanitize import sanitize_html import json import html import re def correct_yaml(text): """ Inserts missing spaces after : Like width:20 => width: 20 Also gives an other way to write multiline attributes, by starting the multiline like: program: |!! (!! could be any number and any non a-z,A-Z chars and ending it by !! in first column :param text: text to convert proper yaml :return: text that is proper yaml """ lines = text.splitlines() s = "" p = re.compile("^[^ :]*:[^ ]") # kissa:istuu pm = re.compile("^[^ :]*:[ ]+\|[^ a-zA-Z]+$") # program: ||| or program: |!!! multiline = False for line in lines: if p.match(line) and not multiline: line = line.replace(':', ': ', 1) if pm.match(line): multiline = True n = 0 line, end_str = line.split("|", 1) s = s + line + "|\n" continue if multiline: if line == end_str: multiline = False continue line = " " + line s = s + line + "\n" return s def parse_yaml(text): values = {} if len(text) == 0: return False try: text = correct_yaml(text) values = yaml.load(text, Loader=CLoader) except yaml.parser.ParserError as e: return str(e) except yaml.scanner.ScannerError as e: return str(e) try: if type(values) is str: return values else: return values except KeyError: return "Missing identifier" return "Unknown error" def parse_plugin_values(nodes): plugins = [] for node in nodes: values = {} name = node['plugin'] if not len(node.text): continue try: values = parse_yaml(node.text) if type(values) is str: plugins.append({"plugin": name, 'error': "YAML is malformed: " + values}) else: plugins.append({"plugin": name, "markup": values, "taskId": node['id']}) except Exception as e: plugins.append({"plugin": name, 'error': "Unknown error: " + str(e)}) return plugins ''' def parse_plugin_values(nodes): plugins = [] for node in nodes: values = {} name = node['plugin'] if len(node.text) > 0: try: values = yaml.load(node.text, Loader=CLoader) except (yaml.parser.ParserError, yaml.scanner.ScannerError): plugins.append({"plugin": name, 'error': "YAML is malformed"}) try: if type(values) is str: plugins.append({"plugin": name, 'error': "YAML is malformed"}) else: plugins.append({"plugin": name, "markup": values, "taskId": node['id']}) except KeyError: plugins.append({"plugin": name, 'error': "Missing identifier"}) return plugins ''' def find_plugins(html_str): if not html_str.startswith('
0:
try:
values = parse_yaml(node.text) # yaml.load(node.text, Loader=CLoader)
if type(values) is str:
print("Malformed yaml string ", values)
return "YAMLERROR: Malformed string"
except Exception as e:
print("Malformed yaml string ", str(e))
return "YAMLERROR: Malformed string"
return values
'''
def get_block_yaml(block):
tree = BeautifulSoup(block)
values = None
for node in tree.find_all('pre'):
if len(node.text) > 0:
try:
values = yaml.load(node.text, Loader=CLoader)
except (yaml.parser.ParserError, yaml.scanner.ScannerError):
print("Malformed yaml string")
return "YAMLERROR: Malformed string"
return values
'''
def get_error_html(plugin_name, message):
return 'Plugin {} error: {}'.format(plugin_name, message)
def find_task_ids(blocks, doc_id):
task_ids = []
final_html_blocks = []
plugins = {}
for idx, block in enumerate(blocks):
found_plugins = find_plugins(block)
if len(found_plugins) > 0:
plugin_info = parse_plugin_values(found_plugins)
error_messages = []
for vals in plugin_info:
plugin_name = vals['plugin']
if 'error' in vals:
error_messages.append(get_error_html(plugin_name, vals['error']))
continue
if not plugin_name in plugins:
plugins[plugin_name] = OrderedDict()
task_ids.append("{}.{}".format(doc_id, vals['taskId']))
return task_ids
def pluginify(blocks, user, answer_db, doc_id, user_id, custom_state=None):
""" "Pluginifies" or sanitizes the specified HTML blocks by inspecting each block
for plugin markers, calling the corresponding plugin route if such is found. Sanitizes HTML
for each non-plugin block.
:param blocks: A list of HTML blocks to be processed.
:param user: The current user's username.
:param answer_db: A reference to the answer database.
:param doc_id: The document id.
:param user_id: The user id.
:param custom_state: Optional state that will used as the state for the plugin instead of answer database.
If this parameter is specified, the expression len(blocks) MUST be 1.
:return: Processed HTML blocks along with JavaScript, CSS stylesheet and AngularJS module dependencies.
"""
if custom_state is not None:
if len(blocks) != 1:
raise PluginException('len(blocks) must be 1 if custom state is specified')
final_html_blocks = []
plugins = {}
for idx, block in enumerate(blocks):
found_plugins = find_plugins(block)
if len(found_plugins) > 0:
assert len(found_plugins) == 1
plugin_info = parse_plugin_values(found_plugins)
vals = plugin_info[0]
plugin_name = vals['plugin']
if 'error' in vals:
final_html_blocks.append({'html': ''
'Error(s) occurred while rendering plugin.'
''
+ get_error_html(plugin_name, vals['error'])
})
continue
if plugin_name not in plugins:
plugins[plugin_name] = OrderedDict()
vals['markup']["user_id"] = user
task_id = "{}.{}".format(doc_id, vals['taskId'])
states = answer_db.getAnswers(user_id, task_id)
if custom_state is not None:
state = custom_state
else:
# Don't show state for anonymous users.
state = None if user_id == 0 or len(states) == 0 else states[0]['content']
try:
if state is not None:
state = json.loads(state)
except ValueError:
pass
plugins[plugin_name][idx] = {"markup": vals['markup'], "state": state, "taskID": task_id}
final_html_blocks.append({'html': '', # This will be filled later
'task_id': task_id})
else:
final_html_blocks.append({'html': sanitize_html(block)})
js_paths = []
css_paths = []
modules = []
for plugin_name, plugin_block_map in plugins.items():
try:
resp = plugin_reqs(plugin_name)
except PluginException as e:
for idx in plugin_block_map.keys():
final_html_blocks[idx]['html'] = get_error_html(plugin_name, str(e))
continue
try:
reqs = json.loads(resp)
except ValueError:
for idx in plugin_block_map.keys():
final_html_blocks[idx]['html'] = get_error_html(plugin_name,
'Failed to parse JSON from plugin reqs route.')
continue
plugin_js_files, plugin_css_files, plugin_modules = plugin_deps(reqs)
for src in plugin_js_files:
if src.startswith("http") or src.startswith("/"):
js_paths.append(src)
else:
path = get_plugin_tim_url(plugin_name) + "/" + src
js_paths.append(path)
for src in plugin_css_files:
if src.startswith("http") or src.startswith("/"):
css_paths.append(src)
else:
path = get_plugin_tim_url(plugin_name) + "/" + src
css_paths.append(path)
for mod in plugin_modules:
modules.append(mod)
# Remove duplicates, preserving order
js_paths = list(OrderedDict.fromkeys(js_paths))
css_paths = list(OrderedDict.fromkeys(css_paths))
modules = list(OrderedDict.fromkeys(modules))
plugin_url = get_plugin_tim_url(plugin_name)
if 'multihtml' in reqs and reqs['multihtml']:
try:
response = call_plugin_multihtml(plugin_name, json.dumps([val for _, val in plugin_block_map.items()]))
except PluginException as e:
for idx in plugin_block_map.keys():
final_html_blocks[idx]['html'] = get_error_html(plugin_name, str(e))
continue
try:
plugin_htmls = json.loads(response)
except ValueError:
for idx in plugin_block_map.keys():
final_html_blocks[idx]['html'] = get_error_html(plugin_name,
'Failed to parse plugin response from reqs route.')
continue
for idx, markup, html in zip(plugin_block_map.keys(), plugin_block_map.values(), plugin_htmls):
final_html_blocks[idx]['html'] = "{}".format(markup['taskID'],
plugin_url,
html)
else:
for idx, val in plugin_block_map.items():
try:
html = call_plugin_html(plugin_name, val['markup'], val['state'], val['taskID'])
except PluginException as e:
final_html_blocks[idx]['html'] = get_error_html(plugin_name, str(e))
continue
final_html_blocks[idx]['html'] = "{}".format(val['taskID'],
plugin_url,
html)
return final_html_blocks, js_paths, css_paths, modules
def make_browse_buttons(user_id, task_id, answer_db):
states = answer_db.getAnswers(user_id, task_id)
if len(states) > 1:
formatted = ""
content_obj = json.loads(states[len(states)-1]["content"])
if isinstance(content_obj, dict):
for key, val in content_obj.items():
formatted += key + "\n---------------\n" + str(val) + "\n\n"
elif isinstance(content_obj, list):
for v in content_obj:
formatted += "List element:" + "\n---------------\n" + str(v) + "\n\n"
else:
formatted = str(content_obj)
first = "
First answer:
{}".format(html.escape(formatted))
else:
first = ""
return """
""".format(len(states), len(states), first)
# p is json of plugin requirements of the form:
# {"js": ["js.js"], "css":["css.css"], "angularModule":["module"]}
def plugin_deps(p):
js_files = []
modules = []
css_files = []
if "css" in p:
for cssF in p['css']:
css_files.append(cssF)
if "js" in p:
for jsF in p['js']:
js_files.append(jsF)
if "angularModule" in p:
for ng in p['angularModule']:
modules.append(ng)
return js_files, css_files, modules