from .. import analytics, data
from . import constants
import os
from abc import ABC, abstractmethod
from datetime import date, datetime
from collections import defaultdict
from timeit import default_timer
from math import log1p
[docs]class DetectionStrategy(ABC):
"""
An abstract class of a pluggable transport detection strategy, including
parsing of positive and negative test traces, positive case splitting,
performing analytics, and reporting results. Implement this class to produce
individual strategies.
"""
NAME = "Default Strategy"
DESCRIPTION = "A description of this strategy here."
_DEBUG_PREFIX = "DefaultStrategy" # For prefixing debug messages only.
RUN_CONFIG_DESCRIPTION = [] # A list of strings representing the fixed format
# of configuration for each run initiated in
# self.positive_run and self._negative_run.
def __init__(self, pt_pcap, negative_pcap=None, recall_pcap=None, debug=False):
self.__debug_on = debug
self.__pt_pcap = pt_pcap
if negative_pcap is not None:
self.__negative_pcap = negative_pcap
else:
self.__negative_pcap = None
if recall_pcap is not None:
self.__recall_pcap = recall_pcap
else:
self.__recall_pcap = None
self.__reader = data.retrieve.Retriever()
# MongoDB collections.
self._pt_collection = None
self._neg_collection = None
self._recall_collection = None
self._pt_collection_total = 0
self._neg_collection_total = 0
self._recall_collection_total = 0
# Lists of traces to be loaded.
self._packets_parsed = False
self._packets_loaded = False
self._pt_packets = []
self._pt_test_packets = []
self._pt_validation_packets = []
self._pt_split = False
self._neg_packets = []
self._recall_packets = []
self._positive_subnets = []
self._negative_subnets = []
self._recall_subnets = []
# The strategic filter to examine a subset of loaded packets.
self._strategic_packet_filter = {}
# The strategy's internal states.
self._strategic_states = {}
self._true_positive_rate = None
self._false_positive_rate = None
self._false_positive_blocked_rate = 0
self._negative_unique_ips = 0
self._negative_blocked_ips = set([])
self._recall_rate = None
self._test_recall = False
# For debug outputs, overwrite if required.
self.DEBUG = debug
# For scoring runs.
# The top level dictionary is arbitrarily indexed to allow subsequent
# amendments of records from the same run configuration.
# {'time': execution_time, 'TPR': True Positive Rate, 'FPR': False Positive Rate,
# 'block_rate': False Positive IP Block Rate}
# 'time' records the positive execution time, as negative validation is
# normally not required during live DPI operations.
self._time_statistics = {}
# For windowing-based strategies only.
self._window_size = 25
self._target_ip_occurrences = defaultdict(int)
def _parse_PT_packets(self, pt_filters):
"""
Parse positive test packets stored in the PCAP file.
N.B. Filters at this stage are intended to be used to remove unrelated
packets accidentally captured in the process, so that they do not affect
testing/training of positive case analysis. If the analysis strategy
only examines a subset of all PT traffic (e.g. packets with TLS payloads),
its filters should be set separately in :meth:`set_strategic_filter`.
This separation is by design so that source/destination import filtering
can be changed dynamically at each run based on the actual pcap, while
strategic filtering is expected to remain unchanged.
:param list pt_filters: Parser filters for PT trace parsing. Presented as a
list of tuples to parse upstream or downstream packets only. e.g.
[('192.168.0.42', data.constants.IP_SRC),
('13.32.68.100', data.constants.IP_DST)]
For filter matching rules, see :meth:`~CovertMark.data.parser.PCAPParser.set_ip_filter`.
For an empty (allow-all) filter, use `{}`.
:returns: True if a non-zero amount of packets were parsed, False otherwise.
"""
assert(all([i.isalnum() or i in [".", "_", "-", " "] for i in self.NAME]))
self.__pt_parser = data.parser.PCAPParser(self.__pt_pcap)
self.__pt_parser.set_ip_filter(pt_filters)
self.set_case_membership(pt_filters, None)
desp = self.NAME + " positive packets from " + os.path.basename(self.__pt_pcap)
self._pt_collection = self.__pt_parser.load_and_insert_new(description=desp)
if self._pt_collection:
return True
else:
return False
def _parse_negative_packets(self, negative_filters):
"""
Parse negative test packets stored in the PCAP file.
:param list negative_filters: same format as in :meth:`_parse_PT_packets`.
Allow-all by default.
:returns: True if a non-zero amount of packets were parsed, False otherwise.
"""
assert(all([i.isalnum() or i in [".", "_", "-", " "] for i in self.NAME]))
self.__neg_parser = data.parser.PCAPParser(self.__negative_pcap)
self.__neg_parser.set_ip_filter(negative_filters)
self.set_case_membership(None, negative_filters)
desp = self.NAME + " negative packets from " + os.path.basename(self.__negative_pcap)
self._neg_collection = self.__neg_parser.load_and_insert_new(description=desp)
if self._neg_collection:
return True
else:
return False
def _parse_recall_packets(self, recall_filters):
"""
Parse positive recall test packets stored in the PCAP file.
:param list recall_filters: same format as in :meth:`_parse_PT_packets`.
Allow-all by default.
:returns: True if a non-zero amount of packets were parsed, False otherwise.
"""
assert(all([i.isalnum() or i in [".", "_", "-", " "] for i in self.NAME]))
self.__recall_parser = data.parser.PCAPParser(self.__recall_pcap)
self.__recall_parser.set_ip_filter(recall_filters)
desp = self.NAME + " positive recall packets from " + os.path.basename(self.__recall_pcap)
self._recall_collection = self.__recall_parser.load_and_insert_new(description=desp)
if self._recall_collection:
return True
else:
return False
def _load_into_memory(self):
"""
Load parsed positive (and if set, negative) test traces from MongoDB
into runtime memory for analysis, applying :attr:`_strategic_filter` to
both.
:returns: True if successfully loaded, False otherwise.
"""
self.__reader.select(self._pt_collection)
self.debug_print("- Retrieving from {}...".format(self.__reader.current()))
self._pt_packets = self.__reader.retrieve(trace_filter=self._strategic_packet_filter)
self._pt_collection_total = self.__reader.count(trace_filter={})
if len(self._pt_packets) == 0:
return False
# Reload positive filters.
pt_filters = self.__reader.get_input_filters()
if pt_filters:
pt_clients = [i[0] for i in pt_filters]
self.debug_print("- Automatically setting the corresponding input filters for positive clients: {}".format(str(pt_clients)))
self.set_case_membership(pt_filters, None)
else:
self.debug_print("Input filters attached to the positive collection do not exist or are invalid, must be manually set with set_case_membership().")
# If no negative pcap parsed, we skip it.
if self._neg_collection is not None:
self.__reader.select(self._neg_collection)
self.debug_print("- Retrieving from {}...".format(self.__reader.current()))
self._neg_packets = self.__reader.retrieve(trace_filter=self._strategic_packet_filter)
self._neg_collection_total = self.__reader.count(trace_filter={})
# Record distinct destination IP addresses for stat reporting.
self._negative_unique_ips = self.__reader.distinct('dst')
if len(self._neg_packets) == 0:
return False
# Reload negative filters.
neg_filters = self.__reader.get_input_filters()
if neg_filters:
neg_clients = [i[0] for i in neg_filters]
self.debug_print("- Automatically setting the corresponding input filters for negative clients: {}".format(str(neg_clients)))
self.set_case_membership(None, neg_filters)
else:
self.debug_print("Input filters attached to the negative collection do not exist or are invalid, must be manually set with set_case_membership().")
# If no recall pcap parsed, we finish here.
if self._recall_collection is None:
self._packets_loaded = True
return True
self.__reader.select(self._recall_collection)
self.debug_print("- Retrieving from {}...".format(self.__reader.current()))
self._recall_packets = self.__reader.retrieve(trace_filter=self._strategic_packet_filter)
self._recall_collection_total = self.__reader.count(trace_filter={})
# Set recall subnets.
recall_filters = self.__reader.get_input_filters()
if recall_filters:
self._recall_subnets = [data.utils.build_subnet(i[0]) for i in recall_filters if i[1] in [data.constants.IP_SRC, data.constants.IP_EITHER]]
self.debug_print("Automatically set the corresponding input filters for recall clients: {}.".format(str([i[0] for i in recall_filters])))
if len(self._recall_packets) == 0:
return False
self.debug_print("Positive recall packets loaded.")
self._packets_loaded = True
return True
[docs] def set_case_membership(self, positive_filters, negative_filters):
"""
Set an internal list of positive and negative subnets for membership
checking with :meth:`in_positive_filter` and :meth:`in_negative_filter`. This
is useful if a mixed pcap needs to be parsed into :attr:`_pt_packets` only.
If only one of the two needs to be set, pass in None in the corresponding
other parameter.
:param list positive_filters: list of input filters covering PT traffic.
:param list negative_filters: list of negative filters covering innocent traffic.
"""
if positive_filters:
positive_subnets = [data.utils.build_subnet(i[0]) for i in positive_filters]
if all(positive_subnets):
self._positive_filters = positive_filters
self._positive_subnets = positive_subnets
if negative_filters:
negative_subnets = [data.utils.build_subnet(i[0]) for i in negative_filters]
if all(negative_subnets):
self._negative_filters = negative_filters
self._negative_subnets = negative_subnets
return True
[docs] def in_positive_filter(self, ip):
"""
Check membership in the positive filter subnets.
:param str ip: input IP or subnet.
:returns: True if IP or subnet specified is in the positive input filter,
False otherwise, or if input invalid.
"""
ip_subnet = data.utils.build_subnet(ip)
if not ip_subnet:
return False
for i in self._positive_subnets:
if i.overlaps(ip_subnet):
return True
return False
[docs] def in_negative_filter(self, ip):
"""
Check membership in the negative filter subnets.
:param str ip: input IP or subnet.
:returns: True if IP or subnet specified is in the negative input filter,
False otherwise, or if input invalid.
"""
ip_subnet = data.utils.build_subnet(ip)
if not ip_subnet:
return False
for i in self._negative_subnets:
if i.overlaps(ip_subnet):
return True
return False
[docs] def run_on_positive(self, config, **kwargs):
"""
Test the detection strategy on positive PT packets, call this instead of
:meth:`positive_run`, due to the need of timing positive executions for
performance statistics.
:param tuple config: a consistently-formatted tuple containing configurations
such as window size and threshold for performance indexing in records. It
should be sufficiently specific to distinguish individual runs of the
same configuration, as otherwise performance records for the config
will be overwritten between runs.
"""
if not self._pt_collection:
return False
if not self._packets_loaded:
self._load_into_memory()
if config is None:
return False
if (isinstance(config, list) or isinstance(config, tuple)):
assert(len(config) == len(self.RUN_CONFIG_DESCRIPTION))
time_start = default_timer()
tpr = self.positive_run(**kwargs)
duration = default_timer() - time_start
self._true_positive_rate = tpr
self.register_performance_stats(config, time=duration, TPR=tpr)
return self._true_positive_rate
[docs] def run_on_negative(self, config, **kwargs):
"""
Optionally test the detection strategy on negative client packets, call this
instead of :meth:`negative_run`.
:param tuple config: a consistently-formatted tuple containing configurations
such as window size and threshold for performance indexing in records. It
should be sufficiently specific to distinguish individual runs of the
same configuration, as otherwise performance records for the config
will be overwritten between runs.
"""
if not self._neg_collection:
return False
if not self._packets_loaded:
self._load_into_memory()
if config is None:
return False
if (isinstance(config, list) or isinstance(config, tuple)):
assert(len(config) == len(self.RUN_CONFIG_DESCRIPTION))
# Clear the falsely block set.
self._negative_blocked_ips = set()
fpr = self.negative_run(**kwargs)
self._false_positive_rate = fpr
self._false_positive_blocked_rate = float(len(self._negative_blocked_ips)) / self._negative_unique_ips
self.register_performance_stats(config, FPR=fpr, ip_block_rate=self._false_positive_blocked_rate)
return self._false_positive_rate
[docs] def run_on_recall(self, **kwargs):
"""
Wrapper for the optional :meth:`recall_run`, testing a trained classifier
on positive recall packets.
"""
if not self._recall_collection:
return False
if not self._packets_loaded:
self._load_into_memory()
self._recall_rate = self.recall_run(**kwargs)
return self._recall_rate
def _score_performance_stats(self):
"""
Based on the execution time, TPR, and FPR of strategy runs, score the
effectiveness of this strategy in identifying the input PT.
:returns: a floating point score between 0 and 100 for this strategy,
and the config underwhich this was achieved.
"""
# Filter out records yielding unacceptable TPR or FPR values.
acceptables = list(filter(lambda x: x[1]['TPR'] >= constants.TPR_BOUNDARY \
and x[1]['FPR'] <= constants.FPR_BOUNDARY and all(x[1]), self._time_statistics.items()))
acceptable_runs = [i[1] for i in acceptables]
acceptable_configs = [i[0] for i in acceptables]
# If invalid values or no acceptable runs, this strategy scores zero.
if len(acceptable_runs) < 1:
return 0, None
for i in acceptable_runs:
if not (0 <= i['TPR'] <= 1) or not (0 <= i['FPR'] <= 1):
return 0, None
# Penalise runs for their differences from best TPR/FPR and time values.
best_tpr = max([i['TPR'] for i in acceptable_runs])
worst_time = max([i['time'] for i in acceptable_runs])
scaled_times = [i['time'] / worst_time for i in acceptable_runs]
best_scaled_time = min(scaled_times)
tpr_penalties = [log1p((best_tpr - i['TPR'])*100) for i in acceptable_runs]
fpr_penalties = [log1p((max(0, i['FPR'] - constants.FPR_TARGET))*100) for i in acceptable_runs] # Hard target for FPR.
time_penalties = [log1p((i - best_scaled_time)*100) for i in scaled_times]
# For IP falsely blocked rate, penalise from zero.
block_rate_penalties = [log1p(i['block_rate']*100) for i in acceptable_runs]
# Calculate weighted penalties across all metrics.
overall_penalties = []
for i in range(len(tpr_penalties)):
overall_penalties.append(tpr_penalties[i] * constants.PENALTY_WEIGHTS[0] + \
fpr_penalties[i] * constants.PENALTY_WEIGHTS[1] + \
time_penalties[i] * constants.PENALTY_WEIGHTS[2] + \
block_rate_penalties[i] * constants.PENALTY_WEIGHTS[3])
# Now find out the minimum penalty required to reach the acceptable
# TPR and FPR performance, and calculate the scores accordingly.
scores = [(log1p(100) - i) / log1p(100) * 100 for i in overall_penalties]
# Apply strategy-specific penalisation.
strategy_penalised_scores = []
for i, score in enumerate(scores):
# Clip the penalty proportion to between 0 and 1.
strategy_penalty = sorted([0, self.config_specific_penalisation(acceptable_configs[i]), 1])[1]
strategy_penalised_scores.append(score * (1-strategy_penalty))
best_score = max(strategy_penalised_scores)
best_config = acceptable_configs[strategy_penalised_scores.index(max(strategy_penalised_scores))]
return best_score, best_config
[docs] def split_pt(self, split_ratio=0.7):
"""
Gatekeeper method for :meth:`test_validation_split`, ensuring that it is
called after traces have been loaded from MongoDB into memory. Performs
an implicit trace load if not yet loaded. Call this method to perform
a split.
Do not override this method, but override :meth:`test_validation_split` below.
:param float split_ratio: the proportion of positive packets used as test
rather than validation in a split.
"""
if not self._packets_loaded:
self._load_into_memory()
splits = self.test_validation_split(split_ratio)
if splits and isinstance(splits, tuple):
test, validation = splits
# Only validate split if a non-empty split has been performed.
if len(test) > 0 or len(validation) > 0:
self._pt_test_packets = test
self._pt_validation_packets = validation
self._pt_split = True
[docs] def debug_print(self, message):
"""
Prints a debug message to the console, useful for debugging. Appends the
strategy name and timestamp automatically.
"""
if self.__debug_on == False:
return
msg = "[" + self._DEBUG_PREFIX + "] " + str(datetime.now()) +" : " + message
print(msg)
[docs] def setup(self, pt_ip_filters=[], negative_ip_filters=[], pt_collection=None,
negative_collection=None, test_recall=False, recall_ip_filters=[],
recall_collection=None):
"""
Set up the analysis strategy with filters and any existing collection names.
To skip parsing traces again and use existing collections in MongoDB,
both `pt_collection` and `negative_collection` need to be set to valid names.
Recall used for evaluation of strategy itself only, not for user's use.
:param list pt_ip_filters: input IP filters for positive test packets.
:param list negative_ip_filters: input IP filters for negative test packets.
:param str pt_collection: set pt_collection to be the name of an existing
collection in MongoDB to skip parsing again.
:param str negative_collection: set negative_collection to be the name of an
existing collection in MongoDB to skip parsing again.
:param bool test_recall: if True, the strategy will also test the classifier
on unseen positive recall packets to cross validate.
:param list recall_ip_filters: input IP filter for recall test packets.
:param str recall_collection: set recall_collection to be the name of an
existing collection in MongoDB to skip parsing again.
"""
self.debug_print("Executing detection strategy: " + self.NAME)
self.debug_print(self.DESCRIPTION)
reparsing_positive = True
if not self.__negative_pcap:
reparsing_negative = False
else:
reparsing_negative = True
if pt_collection is not None:
if self.__reader.select(pt_collection):
reparsing_positive = False
self._pt_collection = pt_collection
self.debug_print("Loading existing PT trace...")
else:
self.debug_print("Re-parsing PT PCAP file as {} does not exist in MongoDB...".format(pt_collection))
if reparsing_positive:
self.debug_print("- Parsing PT PCAP...")
if self._parse_PT_packets(pt_ip_filters):
self.debug_print("Parsed PCAP file according to input positive IP filters.")
else:
raise RuntimeError("! Failure to parse positive PCAP files.")
if negative_collection is not None:
if self.__reader.select(negative_collection):
reparsing_negative = False
self._neg_collection = negative_collection
self.debug_print("Loading existing negative trace...")
else:
self.debug_print("Re-parsing negative trace as {} does not exist in MongoDB...".format(negative_collection))
if reparsing_negative:
self.debug_print("- Parsing negative PCAP...")
if self._parse_negative_packets(negative_ip_filters):
self.debug_print("Parsed PCAP file according to input negative IP filters.")
else:
raise RuntimeError("! Failure to parse negative PCAP file.")
if test_recall:
self._test_recall = True
self.debug_print("This run will test the positive recall of the best classifier.")
if self.__reader.select(recall_collection):
self._recall_collection = recall_collection
self.debug_print("Loading existing recall trace...")
else:
self.debug_print("- Attempting to parse recall PCAP as specified recall collection does not exist.")
if self._parse_recall_packets(recall_ip_filters):
self.debug_print("Parsed PCAP file according to input recall IP filters.")
else:
raise RuntimeError("! Failure to parse recall PCAP file.")
self._packets_parsed = True
[docs] def load(self):
"""
Load parsed or stored packets from their trace collections.
Call this method when it is ready to load traces from memory. Call this
method again after calling :meth:`set_strategic_filter` to set a new
strategic filter, as afterwards traces need to be reloaded based on the new
filter.
"""
self.debug_print("- Setting strategic filter...")
self.set_strategic_filter()
self.debug_print("Strategy filter on traces from MongoDB: {}".format(self._strategic_packet_filter))
self.debug_print("- Loading packets according to the initial strategic filter...")
self._load_into_memory()
self.debug_print("Positive: {} packets, examining {}.".format(self._pt_collection_total, len(self._pt_packets)))
self.debug_print("Negative: {} packets, examining {}.".format(self._neg_collection_total, len(self._neg_packets)))
self.debug_print("Positive Recall: {} packets, examining {}.".format(self._recall_collection_total, len(self._recall_packets)))
[docs] def run(self, **kwargs):
"""
The entry point of the strategy.
"""
if not self._packets_parsed:
raise RuntimeError("Use self.setup(...) to set up the strategy before running.")
if not self._packets_loaded:
self.debug_print("- Loading traces...")
self.load()
self.run_strategy(**kwargs)
[docs] def clean_up_mongo(self):
"""
Deletes the temporary MongoDB collection used to store traces. This
prevents further runs from being carried out, therefore to be used at
the end of execution only.
"""
self.__pt_parser.clean_up(self._pt_collection)
if self._neg_collection is not None:
self.__neg_parser.clean_up(self._neg_collection)
self._packets_parsed = False
[docs] def make_csv(self):
"""
Return a CSV containing the performance metrics for each run (potentially)
with different config sets. Each element of the run config set will be
supplied in a separate column. This CSV can also be used for plotting in
data.plot. For the ease of plotting very small values, values entered
into the CSV will be converted into percentages.
:returns: a csv containing all performance stats.
"""
csv_str = ""
title = ""
for desp in self.RUN_CONFIG_DESCRIPTION:
title += desp + ","
time_stats_keys = sorted(constants.TIME_STATS_DESCRIPTIONS.keys())
for stat in time_stats_keys:
title += constants.TIME_STATS_DESCRIPTIONS[stat] + ","
title = title[:-1] + "\n" # Remove the trailing comma.
csv_str += title
for config in self._time_statistics:
for i in config:
csv_str += str(i) + ","
for stat in time_stats_keys:
if constants.TIME_STATS_IS_FRACTION[stat]:
csv_str += "{:0.5f},".format(self._time_statistics[config][stat]*100)
else:
csv_str += "{:0.5f},".format(self._time_statistics[config][stat])
csv_str = csv_str[:-1] + "\n" # Remove the trailing comma again.
return csv_str
[docs] def destroy_traces(self):
"""
Erase imported positive, negative and recall traces (if any) from memory,
but preserving statistical states. This allows light-weight storage of the
strategy instance for further analysis. The strategy will no longer be
usable after this.
"""
self._packets_parsed = False
self._pt_packets = []
self._pt_test_packets = []
self._pt_validation_packets = []
self._neg_packets = []
self._recall_packets = []
# ========================To be implemented below==========================#
[docs] def run_strategy(self, **kwargs):
"""
Run the detection strategy. See other methods for detailed syntax of
IP and strategic filters. Override if custom procedures required, such
as adjusting a positive run after each negative run.
Do *not* call this method, use :meth:`run` as entry point.
:returns: tuple(:attr:`_true_positive_rate`, :attr:`_false_positive_rate`)
"""
self.debug_print("- Running detection strategy on positive test packets...")
self._true_positive_rate = self.run_on_positive(())
self.debug_print("Reported true positive rate: {}".format(self._true_positive_rate))
if self._neg_collection is not None:
self.debug_print("- Validating detection strategy on negative test packets...")
self._false_positive_rate = self.run_on_negative(())
self.debug_print("Reported false positive rate: {}".format(self._false_positive_rate))
self.debug_print("False positive IPs blocked rate: {}".format(self._false_positive_blocked_rate))
if self._test_recall:
self.debug_print("- Validating best strategy on positive recall packets...")
self._recall_rate = self.run_on_recall()
self.debug_print("Reported positive recall rate: {}".format(self._recall_rate))
self.register_performance_stats((), TPR=self._true_positive_rate,
FPR=self._false_positive_rate, ip_block_rate=self._false_positive_blocked_rate)
return (self._true_positive_rate, self._false_positive_rate)
[docs] @abstractmethod
def set_strategic_filter(self, new_filter={}):
"""
While packets not related to the PT in the positive case should have
been removed from positive packets when parsing the pcap file
(e.g. :meth:`_parse_PT_packets`), if this strategy only wants to examine
certain packets in the traces, such as those with TLS payloads only,
they should be specified here in the strategic filter. The syntax follows
MongoDB queries on the packet syntax.
(For packet syntax see :meth:`~CovertMark.data.parser.PCAPParser.load_packet_info`)
Implement this method by assigning to :meth:`_strategic_packet_filter`,
optionally you can call this method again between positve and negative
runs to adjust the filter as necessary with a new filter.
:meth:`load` should be called again after each change of filter to reload
the postive and negative traces with the new filter.
:param dict new_filter: MongoDB trace querying filter, examples:
- Only examine TCP packets: {"tcp_info": {"$ne": None}}
- Only examine TCP packets with non-empty payload:
{"tcp_info": {"$ne": None}, "tcp_info.payload": {"$ne": b''}}
"""
self._strategic_packet_filter = new_filter
[docs] def test_validation_split(self, split_ratio):
"""
Perform a split of positive test packets into test and validation sets if
required by the strategy. Override this method if split required, otherwise,
keep it returning a tuple of empty lists as followed.
:param float split_ratio: passed in from :meth:`split_pt`
:returns: tuple(test_packets, validation_packets)
"""
return ([], [])
[docs] @abstractmethod
def positive_run(self, **kwargs):
"""
Perform PT detection strategy on positive test packets.
Available data:
- The number of positive packets in the collection under input filter (but
regardless of the strategic filter):
--- :attr:`_pt_collection_total`
- All positive test packets under strategic filter:
--- :attr:`_pt_packets`
- If :attr:`_pt_split` is True (split into test and validation packets)
--- :attr:`_pt_test_packets`
--- :attr:`_pt_validation_packets`
Assign to :attr:`_strategic_states` if information needs to be stored
between runs or carried over into negative test runs.
Implement this method.
:returns: True positive identification rate as your strategy interprets.
"""
return 0
[docs] def negative_run(self, **kwargs):
"""
Perform PT detection strategy on negative test packets to test for False
Positive rate.
Available data:
- The number of negative packets in the collection under input filter (but
regardless of the strategic filter):
--- :attr:`_neg_collection_total`
- All negative test packets under strategic filter:
--- :attr:`_neg_packets`
- A set of unique IPs seen in negative packets:
--- :attr:`_negative_unique_ips`
Assign to :attr:`_strategic_states` if information needs to be stored
between runs or carried over into positive test runs.
Add to :attr:`_negative_blocked_ips` to tally blocked IPs for reporting.
Implement this method, simply return 0 if no negative trace required.
:returns: False positive identification rate as your strategy interprets.
"""
return 0
[docs] def recall_run(self, **kwargs):
"""
Perform a recall on unseen positive packets specified in :attr:`_recall_packets`.
You should carry over best parameters obtained from positive and negative
runs or a best classifier through :attr:`_strategic_states` or subclass
variables.
It is assumed that after the recall input filter and the strategic filter,
all packets in :attr:`_recall_packets` are positive packets unseen during
positve and negative runs prior. :attr:`_recall_subnets` should have been
set during :meth:`_load_into_memory`
:returns: the positive recall rate.
"""
return 0
[docs] def report_blocked_ips(self):
"""
Return a Wireshark-compatible filter expression to allow viewing blocked
packets in Wireshark. Useful for studying false positives. Override
this method if needed, draw data from :attr:`_negative_blocked_ips` as set
above.
:returns: a Wireshark-compatible filter expression string.
"""
return ""
[docs] def interpret_config(self, config_set):
"""
Interpret as string a configuration passed into :meth:`run_on_positive`
or :meth:`run_on_negative`, for user reporting.
Override this method to customise reporting string.
:param tuple config_set: a tuple of arbitrary configuration as determined by
the implementing strategy.
:returns: a string interpreting the config set passed in.
"""
config_string = ""
for i in range(len(config_set)):
config_string += str(i) + ": " + config_set[i]
if i < len(config_set) - 1:
config_string += ", "
else:
config_string += "."
return config_string
[docs] def config_specific_penalisation(self, config_set):
"""
Given a specific config and its score computated based on TPR, FPR and
positive run execution time (weighted based on the strategy's assigned
weights), return a percentage of penalisation. This allows consideration
of run config parameters that would adversely affect censor performance
in live operations, but will not increase execution time in CovertMark.
Override this method if config-specific penalisation required.
:param tuple config_set: a tuple of arbitrary configuration as determined by
the implementing strategy.
:returns: a float number between 0 and 1 as the proportion of penalty
applied based on the run parameters. This should be the proportion of
score to be deducted, rather than a scaling factor.
"""
return 0