from . import data, analytics, strategy
from . import constants as c
from . import utils
import os, sys
from tabulate import tabulate
from operator import itemgetter
from tempfile import gettempdir
from collections import defaultdict
from itertools import product
[docs]class Commands:
cs = {}
hs = {}
[docs] @classmethod
def register(cls, help_text):
def decorator(func):
cls.cs[func.__name__] = func
cls.hs[func.__name__] = help_text
return func
return decorator
[docs]class CommandHandler:
def __init__(self, strategy_map):
# Access facilities.
self.__db = data.mongo.MongoDBManager()
self.__reader = data.retrieve.Retriever()
self._strategy_map = strategy_map
self._collections = self.__reader.list()
# Handler states.
self._current_procedure = []
self._results = {}
self.__result_counter = 0
self._imported_path = ""
[docs] def dispatch(self, command):
"""
Dispatch a user command or a sequence of commands to the correct methods.
:param str command: a top-level command matching the name of a handler method,
with parameter gathering handled by the handler method itself. This
can also be a sequence of commands separated by ";".
:returns: False if the command does not map to a handler method, True
if otherwise handler successfully executed.
"""
command = command.strip()
if command not in Commands.cs:
command_seq = [i.strip() for i in command.split(";")]
if all([i in Commands.cs for i in command_seq]):
for cmd in command_seq:
Commands.cs[cmd](self)
else:
return False
else:
Commands.cs[command](self)
return True
[docs] def set_procedure(self, procedure):
"""
Replace the current procedure. Used for parameterised execution.
:param procedure: a valid CovertMark procedure.
:returns: True if procedure set successfully, False otherwise.
"""
valid_procedure, _ = utils.validate_procedure(procedure, self._strategy_map)
if not valid_procedure:
return False
else:
self._current_procedure = procedure
return True
[docs] @Commands.register("Exit this program.")
def exit(self):
# Handled by the calling module.
return
[docs] @Commands.register("Display this help information.")
def help(self):
padding = len(max(Commands.cs, key=len)) + 1
for command in Commands.cs:
print(command.ljust(padding) + ": " + Commands.hs[command])
print("")
[docs] @Commands.register("List all available traces.")
def traces(self):
traces = self.__reader.list()
traces_tabulate, _ = utils.list_traces(traces)
print(traces_tabulate)
[docs] @Commands.register("Select and delete traces.")
def delete(self):
traces = self.__reader.list()
traces_tabulate, names = utils.list_traces(traces)
print("The following traces are currently stored in MongoDB, which were imported during strategy runs.")
print("Deleting them will make procedures files referencing MongoDB collections only unusuable.")
print(traces_tabulate)
while True:
deletes = input("Enter a Trace ID or a list of IDs (separated by ',') for deletion, or `end` to finish: ").strip()
if deletes == "end":
break
deletes = [i.strip() for i in deletes.split(',')]
if all([i.isdigit() for i in deletes]):
deletes = [int(i) for i in deletes]
else:
print("Contains invalid IDs, not deleting.")
continue
if all([i in names for i in deletes]):
confirm = input("Are you sure to delete {} trace collections? [y/N]:".format(len(deletes)))
if confirm.lower() == 'y':
for i in deletes:
self.__db.delete_collection(names[i])
print("Deletion successful.")
else:
print("Deletion cancelled.")
break
[docs] @Commands.register("Load and execute an existing benchmark procedure in json.")
def load(self):
path = input("Enter the path to the procedure json: ").strip()
procedure = utils.import_procedure(path, self._strategy_map)
if not procedure:
print(path + " does not seem to exist or is invalid.")
else:
self._current_procedure = procedure
print("Procedure has been successfully loaded, enter `execute` to run it.")
self._imported_path = path
[docs] @Commands.register("Program a new benchmark procedure to replace the current procedure.")
def new(self):
previous_inputs = defaultdict(str)
# Time saving when setting up multiple strategies for a new pcap.
print("Programming a new benchmark procedure.")
procedure = []
while True:
print("List of available strategy runs: ")
runs, indices = utils.get_strategy_runs(self._strategy_map)
print(runs)
while True:
next_run = input("Enter a Run ID to configure a run, or enter `end` to finish: ").strip()
if next_run == "end":
break
try:
next_run = int(next_run)
if 0 <= next_run <= len(indices):
break
except:
print("Invalid Run ID.")
if next_run == "end":
break
# Strategy map should have already been validated during read.
strat = self._strategy_map[indices[next_run][0]]
run_info = [i for i in strat["runs"] if i["run_order"] == indices[next_run][1]][0]
use_negative = strat["negative_input"]
print("Adding {} on {}.".format(strat["object"], run_info["run_description"]))
run = {'strategy': indices[next_run][0], 'run_order': indices[next_run][1]}
# Get filter information.
pt_filter_types = strat["pt_filters"]
if run_info["pt_filters_reverse"]:
search_pt_filter_types = [strategy.constants.FILTERS_REVERSE_MAP[i] for i in pt_filter_types]
else:
search_pt_filter_types = pt_filter_types
neg_filter_types = strat["negative_filters"]
if run_info["negative_filters_reverse"]:
search_neg_filter_types = [strategy.constants.FILTERS_REVERSE_MAP[i] for i in neg_filter_types]
else:
search_neg_filter_types = neg_filter_types
# See if we have matching traces already in the database.
print()
print(c.colours.BGC + c.colours.RED + "Configuring positive (PT) traffic for this run." + c.colours.ENDC)
pts_in_db = self.__reader.list(in_string=False, match_filters=search_pt_filter_types)
trace_id = ""
if len(pts_in_db) > 0:
pts_in_db, collections = utils.list_traces(pts_in_db)
print("The following traces already in MongoDB can be used with this strategy run: ")
print(pts_in_db)
print(c.colours.BGC + c.colours.RED + "Check directions very carefully, as the traces listed may have been imported for the opposite direction to this run, thus inappropriate to use." + c.colours.ENDC)
print("If you choose to reuse an existing trace collection here, the procedure will not be portable between computers.")
trace_id = input("Enter a trace ID to select and reuse as PT traffic, or enter nothing to skip and use a new PCAP: ").strip()
try:
trace_id = int(trace_id)
except:
trace_id = ""
# Need to configure a new PCAP and its input filters.
run['pt_filters'] = []
run['pt_collection'] = ""
run['pt_pcap'] = ""
if not isinstance(trace_id, int) or trace_id not in collections:
# Set PCAP.
while True:
pt_pcap = input("Enter the path to the PCAP file containing PT traffic [" + previous_inputs["pt_pcap"] +"]: ").strip()
if pt_pcap == "":
pt_pcap = previous_inputs["pt_pcap"]
else:
previous_inputs["pt_pcap"] = pt_pcap
if data.utils.check_file_exists(os.path.expanduser(pt_pcap)):
run["pt_pcap"] = pt_pcap
break
else:
print("The PCAP path entered does not seem to exist.")
# Set input filters.
print("Collecting PT IP filters for importing the PCAP, please enter IPv4/IPv6 addresses or subnets separated by ',':")
if data.constants.IP_SRC in pt_filter_types:
while True:
clients = input("Who are the PT clients [" + previous_inputs["pt_clients"] +"]: ").strip()
if clients == "":
clients = previous_inputs["pt_clients"].split(",")
else:
previous_inputs["pt_clients"] = clients
clients = clients.split(",")
clients = [i.strip() for i in clients]
if all([data.utils.build_subnet(i) for i in clients]):
for i in clients:
run['pt_filters'].append([i, data.constants.IP_SRC])
break
else:
print("Some addresses or subnets entered are invalid.")
if data.constants.IP_EITHER in pt_filter_types:
while True:
clients = input("Observe traffic in both directions passing through [" + previous_inputs["pt_either"] + "]: ").strip()
if clients == "":
clients = previous_inputs["pt_either"].split(",")
else:
previous_inputs["pt_either"] = clients
clients = clients.split(",")
clients = [i.strip() for i in clients]
if all([data.utils.build_subnet(i) for i in clients]):
for i in clients:
run['pt_filters'].append([i, data.constants.IP_EITHER])
break
else:
print("Some addresses or subnets entered are invalid.")
if data.constants.IP_DST in pt_filter_types:
while True:
servers = input("Who are the PT bridge servers [" + previous_inputs["pt_servers"] + "]: ").strip()
if servers == "":
servers = previous_inputs["pt_servers"].split(",")
else:
previous_inputs["pt_servers"] = servers
servers = servers.split(",")
servers = [i.strip() for i in servers]
if all([data.utils.build_subnet(i) for i in servers]):
for i in servers:
run['pt_filters'].append([i, data.constants.IP_DST])
break
else:
print("Some addresses or subnets entered are invalid.")
# We can use an existing collection instead.
else:
run['pt_collection'] = collections[trace_id]
# Now do the same for negative traffic.
print()
print(c.colours.BGC + c.colours.GREEN + "Configuring negative (innocent) validation traffic for this run." + c.colours.ENDC)
negs_in_db = self.__reader.list(in_string=False, match_filters=search_neg_filter_types)
trace_id = ""
if use_negative and len(negs_in_db) > 0:
negs_in_db, collections = utils.list_traces(negs_in_db)
print("The following traces already in MongoDB can be used with this strategy run: ")
print(negs_in_db)
print(c.colours.BGC + c.colours.GREEN + "Check directions very carefully, as the traces listed may have been imported for the opposite direction to this run, thus inappropriate to use." + c.colours.ENDC)
print("If you choose to reuse an existing trace collection here, the procedure will not be portable between computers.")
trace_id = input("Enter a trace ID to select and reuse as negative traffic, or enter nothing to skip and use a new PCAP: ").strip()
try:
trace_id = int(trace_id)
except:
trace_id = ""
# Need to configure a new PCAP and its input filters.
run['neg_filters'] = []
run['neg_collection'] = ""
run["neg_pcap"] = ""
if use_negative and (not isinstance(trace_id, int) or trace_id not in collections):
# Set PCAP.
while True:
neg_pcap = input("Enter the path to the PCAP file containing negative traffic [" + previous_inputs["neg_pcap"] + "]: ").strip()
if neg_pcap == "":
neg_pcap = previous_inputs["neg_pcap"]
else:
previous_inputs["neg_pcap"] = neg_pcap
if data.utils.check_file_exists(os.path.expanduser(neg_pcap)):
run["neg_pcap"] = neg_pcap
break
else:
print("The PCAP path entered does not seem to exist.")
# Set input filters.
print("Collecting negative IP filters for importing the PCAP, please enter IPv4/IPv6 addresses or subnets separated by ',':")
if data.constants.IP_SRC in neg_filter_types:
while True:
clients = input("Who are the innocent clients under suspicion [" + previous_inputs["neg_clients"] + "]: ").strip()
if clients == "":
clients = previous_inputs["neg_clients"].split(",")
else:
previous_inputs["neg_clients"] = clients
clients = clients.split(",")
clients = [i.strip() for i in clients]
if all([data.utils.build_subnet(i) for i in clients]):
for i in clients:
run['neg_filters'].append([i, data.constants.IP_SRC])
break
else:
print("Some addresses or subnets entered are invalid.")
if data.constants.IP_EITHER in neg_filter_types:
while True:
clients = input("Observe traffic in both directions passing through [" + previous_inputs["neg_either"] + "]: ").strip()
if clients == "":
clients = previous_inputs["neg_either"].split(",")
else:
previous_inputs["neg_either"] = clients
clients = clients.split(",")
clients = [i.strip() for i in clients]
if all([data.utils.build_subnet(i) for i in clients]):
for i in clients:
run['neg_filters'].append([i, data.constants.IP_EITHER])
break
else:
print("Some addresses or subnets entered are invalid.")
if data.constants.IP_DST in neg_filter_types:
while True:
servers = input("Who are the innocent servers under suspicion [" + previous_inputs["neg_servers"] + "]: ").strip()
if servers == "":
servers = previous_inputs["neg_servers"].split(",")
else:
previous_inputs["neg_servers"] = servers
servers = servers.split(",")
servers = [i.strip() for i in servers]
if all([data.utils.build_subnet(i) for i in servers]):
for i in servers:
run['neg_filters'].append([i, data.constants.IP_DST])
break
else:
print("Some addresses or subnets entered are invalid.")
# We can use an existing collection instead.
elif use_negative:
run['neg_collection'] = collections[trace_id]
run['user_params'] = []
if len(run_info["user_params"]) > 0:
for param in run_info["user_params"]:
while True:
val = input("Enter the {} value for runtime parameter `{}`: ".format(param[1].__name__, param[0])).strip()
try:
param_val = param[1](val)
run['user_params'].append([param[0], param_val])
break
except:
print("The value entered is invalid.")
# Ask user for a name.
user_defined_name = input("Add a custom name to this run for easy identification: ").strip()
if user_defined_name == "":
user_defined_name = "{} on {}".format(strat["object"], run_info["run_description"])
print("Defaulting name to {}.".format(user_defined_name))
run["user_defined_name"] = user_defined_name
procedure.append(run)
print("Strategy run has been successfully added to the current procedure.")
if len(procedure) > 0:
self._current_procedure = procedure
print("Procedure successfully created, enter `execute` to run it.")
else:
print("No strategy run has been programmed into the procedure, abandoning it.")
[docs] @Commands.register("Display the current procedure.")
def current(self):
print(utils.printable_procedure(self._current_procedure, self._strategy_map))
[docs] @Commands.register("Execute the current procedure.")
def execute(self):
valid_procedure, _ = utils.validate_procedure(self._current_procedure, self._strategy_map)
if len(self._current_procedure) == 0 or not valid_procedure:
print("There is no valid procedure loaded, enter `new` to create a new procedure.")
return
results, new_procedure = utils.execute_procedure(self._current_procedure, self._strategy_map, db_sub=True)
print("Execution of the current procedure is complete.")
if len(results) > 0:
# Check if there were any non-collection inputs.
original_procedure_has_explicit_inputs = False
for run in self._current_procedure:
if run["pt_collection"] == "" or run["neg_collection"] == "":
original_procedure_has_explicit_inputs = True
break
# If so, ask the user whether they want to replace it.
if original_procedure_has_explicit_inputs:
replace = input("Do you wish to use the MongoDB copy/copies instead of the PCAP(s) from now on? This will make the new procedure faster but unportable [y/N]:")
if replace.lower() == 'y':
self._current_procedure = new_procedure
print("Procedure settings replaced, `save` the procedure to file to permanently apply the changes.")
for result in results:
# Indexed by a global counter, each contains strategy module,
# run order of the strategy, the instance spawn, and the run's
# user defined name.
self._results[self.__result_counter] = [result[1]["strategy"],
result[1]["run_order"], result[0], result[1]["user_defined_name"]]
self.__result_counter += 1
else:
print("No strategy run has been successfully executed.")
[docs] @Commands.register("Save the current procedure to file.")
def save(self):
valid_procedure, _ = utils.validate_procedure(self._current_procedure, self._strategy_map)
if len(self._current_procedure) == 0 or not valid_procedure:
print("There is no valid procedure loaded, enter `new` to create a new procedure.")
return
while True:
if self._imported_path != "":
path_prompt = "Enter the export path for the procedure [" + self._imported_path + "]: "
else:
path_prompt = "Enter the export path for the procedure (e.g. ~/Documents/covertmark.json): "
out_path = input(path_prompt).strip()
if out_path == "cancel":
break
if self._imported_path != "" and out_path == "":
out_path = self._imported_path
if utils.save_procedure(out_path, self._current_procedure, self._strategy_map, overwrite=True):
print("Successfully saved current procedure to " + out_path + ".")
break
else:
print("Unable to save current procedure due to invalid output path or a permissions issue.")
[docs] @Commands.register("List results from strategy runs in this session.")
def results(self):
if len(self._results) == 0:
print("There are no results yet, enter `execute` to run the current procedure for results.")
return
print(utils.printable_results(self._results, self._strategy_map))
[docs] @Commands.register("Clear the currently stored results.")
def delresults(self):
confirm = input("Are you sure you want to delete all results in this session? [y/N]:").strip()
if confirm.lower() == "y":
self._results = {}
print("Deletion successful.")
else:
print("Deletion abandoned.")
[docs] @Commands.register("Get the Wireshark display filters for a result.")
def wireshark(self):
if len(self._results) == 0:
print("There are no results yet, enter `execute` to run the current procedure for results.")
return
print("Available results:")
print(utils.printable_results(self._results, self._strategy_map))
while True:
try:
result = input("Enter result ID to view its falsely blocked IPs' Wireshark display filter (`end` to quit): ").strip()
if result == "end":
break
else:
result = int(result)
if result not in self._results:
raise ValueError()
except:
print("Invalid result ID.")
continue
print(self._results[result][2].report_blocked_ips())
print()
[docs] @Commands.register("Compute CovertMark scores for the current results.")
def score(self):
if len(self._results) == 0:
print("There are no results yet, enter `execute` to run the current procedure for results.")
return
print("Available results:")
print(utils.printable_results(self._results, self._strategy_map))
print("The overall score will assume that all these results are for the same PT protocol involved.")
print()
results = {}
for n, result in self._results.items():
score, config = result[2]._score_performance_stats()
results[n] = {}
if 0 <= score <= 100:
for r in c.RATINGS:
if r[0] <= score < r[1]:
attrs = c.RATINGS[r]
results[n]["colour"] = attrs[0]
results[n]["explanation"] = attrs[1]
break
results[n]["score"] = round(score, 2)
results[n]["config"] = result[2].interpret_config(config)
results[n]["strategy_name"] = result[2].NAME
results[n]["run_description"] = [i for i in self._strategy_map[result[0]]["runs"] if i["run_order"] == result[1]][0]["run_description"]
results[n]["run_name"] = result[3]
results = sorted(results.items(), key=lambda x: x[1]["score"])
# The lower the score, the better the protocol. Therefore ascending.
print(c.CM_NAME + " Report:")
# Tabulate is funky with spaces when ANSI colour is involved, therefore not used.
for _, result in results:
print("-"*80)
print(result["colour"] + str(result["score"]) + c.colours.ENDC + " from " +\
result["strategy_name"] + " | " + result["run_name"])
print(c.colours.BOLD + result["explanation"] + c.colours.ENDC)
print("Best strategy configuration: " + result["config"])
print("-"*80)
overall_score = results[-1][1]["score"]
overall_colour = results[-1][1]["colour"]
overall_band = ""
for r in c.RATING_BANDS:
if r[0] <= overall_score < r[1]:
overall_band = c.RATING_BANDS[r]
break
print("Overall rating: " + overall_colour + overall_band + c.colours.ENDC)
print("The overall rating is determined by its weakest performance among all benchmark strategies.")
[docs] @Commands.register("Generate and export full CSVs of results.")
def csv(self):
if len(self._results) == 0:
print("There are no results yet, enter `execute` to run the current procedure for results.")
return
default_path = "~/Documents/"
out_path = input("Enter the directory for CSVs to be saved to [" + default_path + "]: ").strip()
if out_path == "":
out_path = default_path
if not os.path.isdir(os.path.expanduser(out_path)):
print("Invalid path.")
return
written = utils.save_csvs(self._results, out_path)
if len(written) == len(self._results):
print("All CSVs have been successfully written to " + out_path + ".")
elif len(written) < len(self._results) and len(written) > 0:
print("Some CSVs have been successfully written to " + out_path + ", but others failed.")
else:
print("Could not write the CSVs to " + out_path + ", check for strategy execution errors and permissions.")
[docs] @Commands.register("Select and plot data from results.")
def plot(self):
if len(self._results) == 0:
print("There are no results yet, enter `execute` to run the current procedure for results.")
return
# Get destination of the plots first.
default_path = "~/Documents/"
out_path = input("Enter the directory for plots to be saved to [" + default_path + "]: ").strip()
if out_path == "":
out_path = default_path
if not os.path.isdir(os.path.expanduser(out_path)):
print("Invalid path.")
return
# Save the CSVs to a temporary location by saving single CSVs.
temp_dir = os.path.join(gettempdir(), utils.random_file_name(c.CM_NAME, "csvs"))
os.makedirs(temp_dir)
csvs = {i: utils.save_csvs({i: self._results[i]}, temp_dir)[0] for i in self._results}
csv_headers = {i: self._results[i][2].make_csv().splitlines()[0] for i in self._results}
csv_names = {i: self._results[i][3] for i in self._results}
# Extract the attributes available for plotting.
y_keys = list(strategy.constants.TIME_STATS_DESCRIPTIONS.keys())
y_values = list(strategy.constants.TIME_STATS_DESCRIPTIONS.values())
attributes = defaultdict(list)
for i, header in csv_headers.items():
attrs = [x.strip() for x in header.split(",")]
for attr in attrs:
if attr not in y_values:
attributes[attr].append(i)
# Display the selection table.
headers = ("ID", "Attribute", "Results From")
contents = []
attr_id = 0
attr_id_to_attr = {}
for attr, in_results in attributes.items():
attribute_name = utils.width(attr, 25)
results_names = utils.width(",".join(list(set([self._results[i][2].NAME for i in in_results]))), 30)
contents.append((attr_id, attribute_name, results_names))
attr_id_to_attr[attr_id] = attr
attr_id += 1
print("The following attributes are available on the x-axis of the plot:")
print(tabulate(contents, headers, tablefmt="fancy_grid"))
while True:
x_attrs = input("Select a list of x-axis attribute IDs for plotting, separated by ',': ").strip()
try:
x_attrs = [int(i.strip()) for i in x_attrs.split(",")]
if len(x_attrs) < 1:
raise ValueError()
for x_attr in x_attrs:
if x_attr < 0 or x_attr >= attr_id:
raise ValueError()
break
except:
print("Invalid attribute ID(s) were entered.")
x_attrs_selected = [attr_id_to_attr[i] for i in x_attrs]
print("The following attributes are available on the y-axis of the plot:")
print(tabulate([(i, y_keys[i], y_values[i]) for i, _ in enumerate(y_keys)],
("ID", "Attribute Key", "Attribute Text"), tablefmt="fancy_grid"))
y_id_to_y = {i: y_values[i] for i, _ in enumerate(y_keys)}
# All strategies should have these information available after execution.
while True:
y_attrs = input("Select a list of y-axis attribute IDs for plotting, separated by ',': ").strip()
try:
y_attrs = [int(i.strip()) for i in y_attrs.split(",")]
if len(y_attrs) < 1:
raise ValueError()
for y_attr in y_attrs:
if y_attr < 0 or y_attr >= len(y_keys):
raise ValueError()
break
except:
print("An invalid attribute ID was entered.")
y_attrs_selected = [y_id_to_y[i] for i in y_attrs]
plot_attrs = product(x_attrs_selected, y_attrs_selected)
for plot_attr in plot_attrs:
relevant_results = attributes[plot_attr[0]]
relevant_csvs = [csvs[i] for i in relevant_results]
names = [csv_names[i] for i in relevant_results]
out_file = os.path.join(out_path, utils.random_file_name(plot_attr[0] + "_" + plot_attr[1], "png"))
out_file = out_file.replace(" ", "_")
default_title = plot_attr[1] + " by " + plot_attr[0]
print("Plotting {} against {}...".format(plot_attr[1], plot_attr[0]))
title = input("Give this plot a title [{}]: ".format(plot_attr[1])).strip()
if title == "":
title = default_title
data.plot.plot_performance(relevant_csvs, names, plot_attr[0], plot_attr[1],
show=False, img_out=out_file, title=title)
print("Saving " + out_file + "...")