CovertMark.strategy.strategy module

class CovertMark.strategy.strategy.DetectionStrategy(pt_pcap, negative_pcap=None, recall_pcap=None, debug=False)[source]

Bases: abc.ABC

An abstract class of a pluggable transport detection strategy, including parsing of positive and negative test traces, positive case splitting, performing analytics, and reporting results. Implement this class to produce individual strategies.

DESCRIPTION = 'A description of this strategy here.'
NAME = 'Default Strategy'
RUN_CONFIG_DESCRIPTION = []
clean_up_mongo()[source]

Deletes the temporary MongoDB collection used to store traces. No further runs can be carried out afterwards, so call this only at the end of execution.

config_specific_penalisation(config_set)[source]

Given a specific config and its score computed from TPR, FPR and positive run execution time (weighted by the strategy’s assigned weights), return the penalisation as a proportion of that score. This allows consideration of run config parameters that would adversely affect censor performance in live operations but do not increase execution time in CovertMark. Override this method if config-specific penalisation is required.

Parameters:config_set (tuple) – a tuple of arbitrary configuration as determined by the implementing strategy.
Returns:a float number between 0 and 1 as the proportion of penalty applied based on the run parameters. This should be the proportion of score to be deducted, rather than a scaling factor.
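As an illustration of the "proportion to be deducted" convention, the following is a minimal standalone sketch rather than CovertMark code. The (window_size, threshold) layout of config_set, the linear rule, the 0.5 cap, and the names window_penalty and apply_penalty are all assumptions made for this example.

```python
def window_penalty(config_set, max_window=60):
    """Return the proportion of the score to deduct for a hypothetical
    (window_size, threshold) config.

    Assumed rule: larger windows force a live censor to buffer more
    packets, so the penalty grows linearly with window size, capped at 0.5.
    """
    window_size, _threshold = config_set
    clamped = max(0, min(window_size, max_window))
    return 0.5 * clamped / max_window


def apply_penalty(score, penalty):
    # The penalty is the share of the score that is removed,
    # not a factor the score is multiplied by.
    return score * (1 - penalty)
```

With these assumptions, a penalty of 0.25 turns a score of 80 into 60, whereas returning 0.75 would wrongly deduct three quarters of the score.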
debug_print(message)[source]

Prints a debug message to the console, useful for debugging. Appends the strategy name and timestamp automatically.

destroy_traces()[source]

Erase imported positive, negative and recall traces (if any) from memory while preserving statistical states. This allows light-weight storage of the strategy instance for further analysis. The strategy will no longer be usable after this.

in_negative_filter(ip)[source]

Check membership in the negative filter subnets.

Parameters:ip (str) – input IP or subnet.
Returns:True if the IP or subnet specified is in the negative input filter; False otherwise, or if the input is invalid.
in_positive_filter(ip)[source]

Check membership in the positive filter subnets.

Parameters:ip (str) – input IP or subnet.
Returns:True if the IP or subnet specified is in the positive input filter; False otherwise, or if the input is invalid.
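The membership checks described for in_positive_filter() and in_negative_filter() can be approximated with the standard library. This is a sketch under assumptions, not the library's implementation: the helper in_filter and its filter_subnets argument are hypothetical, and the real methods read subnets set through set_case_membership() instead.

```python
import ipaddress


def in_filter(ip, filter_subnets):
    """Return True if `ip` (an address or subnet string) lies within any
    subnet in `filter_subnets`; return False for invalid input."""
    try:
        candidate = ipaddress.ip_network(ip, strict=False)
    except ValueError:
        return False  # invalid input is treated as "not in the filter"
    for subnet in filter_subnets:
        network = ipaddress.ip_network(subnet, strict=False)
        # subnet_of() raises TypeError across IP versions, so compare versions first.
        if candidate.version == network.version and candidate.subnet_of(network):
            return True
    return False
```

A single address is handled as a /32 (or /128) network, so addresses and subnets go through the same comparison.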
interpret_config(config_set)[source]

Interpret as a string a configuration passed into run_on_positive() or run_on_negative(), for user reporting. Override this method to customise the reporting string.

Parameters:config_set (tuple) – a tuple of arbitrary configuration as determined by the implementing strategy.
Returns:a string interpreting the config set passed in.
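A possible override might look like the standalone function below. The (window_size, threshold) layout of config_set is an assumption for illustration only; each strategy defines its own tuple.

```python
def interpret_config(config_set):
    """Describe a hypothetical (window_size, threshold) config for reports."""
    window_size, threshold = config_set
    return "window size {}, threshold {}".format(window_size, threshold)
```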
load()[source]

Load parsed or stored packets from their trace collections. Call this method when the strategy is ready to load its traces. Call this method again after calling set_strategic_filter() to set a new strategic filter, as afterwards traces need to be reloaded based on the new filter.

make_csv()[source]

Return a CSV containing the performance metrics for each run (potentially) with different config sets. Each element of the run config set will be supplied in a separate column. This CSV can also be used for plotting in data.plot. For the ease of plotting very small values, values entered into the CSV will be converted into percentages.

Returns:a csv containing all performance stats.
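The shape of such a CSV can be sketched as follows. This is a hedged illustration rather than the actual output format: the column names, the records dictionary keyed by config tuple, and the standalone function signature are assumptions.

```python
import csv
import io


def make_csv(records, config_headers):
    """Build a CSV string with one column per config element and the
    rates converted into percentages."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(list(config_headers) + ["TPR (%)", "FPR (%)"])
    for config, stats in records.items():
        # Rates are stored as proportions in [0, 1]; report them as percentages.
        rates = [round(stats["TPR"] * 100, 4), round(stats["FPR"] * 100, 4)]
        writer.writerow(list(config) + rates)
    return buffer.getvalue()
```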
negative_run(**kwargs)[source]

Perform PT detection strategy on negative test packets to test for False Positive rate. Available data:
  • _neg_collection_total – the number of negative packets in the collection under input filter (but regardless of the strategic filter).
  • _neg_packets – all negative test packets under strategic filter.
  • _negative_unique_ips – a set of unique IPs seen in negative packets.
Assign to _strategic_states if information needs to be stored between runs or carried over into positive test runs. Add to _negative_blocked_ips to tally blocked IPs for reporting. Implement this method; simply return 0 if no negative trace is required.

Returns:False positive identification rate as your strategy interprets.
positive_run(**kwargs)[source]

Perform PT detection strategy on positive test packets. Available data:
  • _pt_collection_total – the number of positive packets in the collection under input filter (but regardless of the strategic filter).
  • _pt_packets – all positive test packets under strategic filter.
  • _pt_test_packets and _pt_validation_packets – available if _pt_split is True (split into test and validation packets).
Assign to _strategic_states if information needs to be stored between runs or carried over into negative test runs. Implement this method.

Returns:True positive identification rate as your strategy interprets.
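To make "rate as your strategy interprets" concrete, here is a toy standalone rule. It is not a real detection strategy: the payload-length criterion is invented for illustration, and the packet layout (a dict with a tcp_info entry holding a payload) is assumed from the strategic filter examples.

```python
def positive_rate(packets, min_payload_len):
    """Return the fraction of packets flagged by a toy payload-length rule."""
    if not packets:
        return 0.0
    flagged = 0
    for packet in packets:
        tcp_info = packet.get("tcp_info")
        # Non-TCP packets (tcp_info is None) are never flagged.
        if tcp_info and len(tcp_info.get("payload", b"")) >= min_payload_len:
            flagged += 1
    return flagged / len(packets)
```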
recall_run(**kwargs)[source]

Perform a recall on unseen positive packets specified in _recall_packets. You should carry over the best parameters obtained from positive and negative runs, or a best classifier, through _strategic_states or subclass variables. It is assumed that after the recall input filter and the strategic filter, all packets in _recall_packets are positive packets unseen during prior positive and negative runs. _recall_subnets should have been set during _load_into_memory().

Returns:the positive recall rate.
register_performance_stats(config, time=None, TPR=None, FPR=None, ip_block_rate=None)[source]

Register timed performance metrics for each specific configuration. This should be called after each individual operation cycle of the strategy.

Parameters:
  • config (tuple) – a consistently-styled tuple containing configurations such as window size and threshold. Passing the same tuple in different method calls allows the TPR and FPR values (below) to be set separately.
  • time (float) – if not None, update the execution time of positive run.
  • TPR (float) – if not None, update the true positive rate of the performance record specified by config. Float between 0 and 1.
  • FPR (float) – if not None, update the false positive rate of the performance record specified by config. Float between 0 and 1.
  • ip_block_rate (float) – if not None, update the rate of falsely blocked IPs of the strategy execution specified by config. Float between 0 and 1.
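The "update only what is not None" behaviour can be sketched with a plain dictionary keyed by the config tuple. PerformanceLog and its register method are hypothetical stand-ins for this example and say nothing about how CovertMark stores its records internally.

```python
class PerformanceLog:
    """Hypothetical stand-in for a strategy's performance records."""

    RATE_FIELDS = ("TPR", "FPR", "ip_block_rate")

    def __init__(self):
        self.records = {}

    def register(self, config, time=None, TPR=None, FPR=None, ip_block_rate=None):
        # One record per config tuple; separate calls update separate fields.
        record = self.records.setdefault(
            config, {"time": None, "TPR": None, "FPR": None, "ip_block_rate": None}
        )
        updates = {"time": time, "TPR": TPR, "FPR": FPR, "ip_block_rate": ip_block_rate}
        for field, value in updates.items():
            if value is None:
                continue  # None means "leave this field unchanged"
            if field in self.RATE_FIELDS and not 0 <= value <= 1:
                raise ValueError("{} must be between 0 and 1".format(field))
            record[field] = value
        return record
```

In this sketch, calling register((30, 0.1), time=1.5, TPR=0.9) after a positive run and register((30, 0.1), FPR=0.02) after a negative run fills in a single record for that config.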
report_blocked_ips()[source]

Return a Wireshark-compatible filter expression to allow viewing blocked packets in Wireshark. Useful for studying false positives. Override this method if needed, drawing data from _negative_blocked_ips as set in negative_run().

Returns:a Wireshark-compatible filter expression string.
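One way to build such an expression is to join ip.addr comparisons with Wireshark's || operator. The exact expression the library produces is not documented here, so treat this standalone function and its name as an assumption.

```python
def blocked_ips_filter(blocked_ips):
    """Join blocked IPs into a Wireshark display filter expression."""
    # Sorting keeps the output stable when the input is a set.
    return " || ".join("ip.addr == {}".format(ip) for ip in sorted(blocked_ips))
```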
run(**kwargs)[source]

The entry point of the strategy.

run_on_negative(config, **kwargs)[source]

Optionally test the detection strategy on negative client packets. Call this instead of negative_run().

Parameters:config (tuple) – a consistently-formatted tuple containing configurations such as window size and threshold for performance indexing in records. It should be sufficiently specific to distinguish individual runs of the same configuration, as otherwise performance records for the config will be overwritten between runs.
run_on_positive(config, **kwargs)[source]

Test the detection strategy on positive PT packets. Call this instead of positive_run(), as positive executions need to be timed for performance statistics.

Parameters:config (tuple) – a consistently-formatted tuple containing configurations such as window size and threshold for performance indexing in records. It should be sufficiently specific to distinguish individual runs of the same configuration, as otherwise performance records for the config will be overwritten between runs.
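The reason for going through a wrapper can be illustrated with a small timing helper. timed_run is a hypothetical name, and the real wrapper also records results against config, which this sketch leaves out.

```python
import time


def timed_run(run, **kwargs):
    """Call `run(**kwargs)` and return (rate, elapsed_seconds)."""
    start = time.perf_counter()
    rate = run(**kwargs)
    elapsed = time.perf_counter() - start
    return rate, elapsed
```

Calling the underlying run method directly would still produce a rate, but no execution time would be captured for the performance record.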
run_on_recall(**kwargs)[source]

Wrapper for the optional recall_run(), testing a trained classifier on positive recall packets.

run_strategy(**kwargs)[source]

Run the detection strategy. See other methods for detailed syntax of IP and strategic filters. Override if custom procedures are required, such as adjusting a positive run after each negative run. Do not call this method directly; use run() as the entry point.

Returns:tuple(_true_positive_rate, _false_positive_rate)
set_case_membership(positive_filters, negative_filters)[source]

Set an internal list of positive and negative subnets for membership checking with in_positive_filter() and in_negative_filter(). This is useful if a mixed pcap needs to be parsed into _pt_packets only. If only one of the two needs to be set, pass None for the other parameter.

Parameters:
  • positive_filters (list) – list of input filters covering PT traffic.
  • negative_filters (list) – list of negative filters covering innocent traffic.
set_strategic_filter(new_filter={})[source]

Packets not related to the PT in the positive case should already have been removed from positive packets when parsing the pcap file (e.g. _parse_PT_packets()). If this strategy only wants to examine certain packets in the traces, such as those with TLS payloads only, they should be specified here in the strategic filter. The syntax follows MongoDB queries on the packet syntax. (For packet syntax see load_packet_info().) Implement this method by assigning to _strategic_packet_filter(). Optionally, call this method again between positive and negative runs to adjust the filter as necessary. load() should be called again after each change of filter to reload the positive and negative traces with the new filter.

Parameters:new_filter (dict) – MongoDB trace querying filter. Examples:
  • Only examine TCP packets: {"tcp_info": {"$ne": None}}
  • Only examine TCP packets with non-empty payload: {"tcp_info": {"$ne": None}, "tcp_info.payload": {"$ne": b''}}
setup(pt_ip_filters=[], negative_ip_filters=[], pt_collection=None, negative_collection=None, test_recall=False, recall_ip_filters=[], recall_collection=None)[source]

Set up the analysis strategy with filters and any existing collection names. To skip parsing traces again and use existing collections in MongoDB, both pt_collection and negative_collection need to be set to valid names. Recall is used only for evaluating the strategy itself and is not intended for users.

Parameters:
  • pt_ip_filters (list) – input IP filters for positive test packets.
  • negative_ip_filters (list) – input IP filters for negative test packets.
  • pt_collection (str) – set pt_collection to be the name of an existing collection in MongoDB to skip parsing again.
  • negative_collection (str) – set negative_collection to be the name of an existing collection in MongoDB to skip parsing again.
  • test_recall (bool) – if True, the strategy will also test the classifier on unseen positive recall packets to cross validate.
  • recall_ip_filters (list) – input IP filter for recall test packets.
  • recall_collection (str) – set recall_collection to be the name of an existing collection in MongoDB to skip parsing again.
split_pt(split_ratio=0.7)[source]

Gatekeeper method for test_validation_split(), ensuring that it is called after traces have been loaded from MongoDB into memory. Performs an implicit trace load if not yet loaded. Call this method to perform a split. Do not override this method, but override test_validation_split() below.

Parameters:split_ratio (float) – the proportion of positive packets used as test rather than validation in a split.
test_validation_split(split_ratio)[source]

Perform a split of positive test packets into test and validation sets if required by the strategy. Override this method if a split is required; otherwise, keep it returning a tuple of empty lists.

Parameters:split_ratio (float) – passed in from split_pt()
Returns:tuple(test_packets, validation_packets)
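A simple ordered split by ratio might look like the sketch below. It is a standalone function named split_packets that takes the packets as an argument, which is an assumption for this example; an override of test_validation_split() would instead work on the strategy's loaded positive packets, and may prefer to shuffle first.

```python
def split_packets(packets, split_ratio=0.7):
    """Split packets into (test_packets, validation_packets) by ratio,
    preserving the original order."""
    cut = round(len(packets) * split_ratio)
    return packets[:cut], packets[cut:]
```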