opensextant.FlexPat
# -*- coding: utf-8 -*-
import os
import re

from opensextant import TextMatch, Extractor, reduce_matches


def resource_for(resource_name):
    """
    Resolve a file located in the ``resources`` folder of the installed opensextant package.

    :param resource_name: name of a file in your resource path; Default: opensextant/resources/NAME
    :return: file path.
    :raise FileNotFoundError: if the file does not exist in the resources folder.
    """
    import opensextant
    libdir = os.path.dirname(opensextant.__file__)
    fpath = os.path.join(libdir, "resources", resource_name)
    if os.path.exists(fpath):
        return fpath
    # FileNotFoundError is an Exception subclass, so callers using `except Exception` still work.
    raise FileNotFoundError("FileNotFound: Resource not found where expected at " + fpath)


def class_for(full_classname, instantiate=True):
    """
    Import the module named by a dotted class path and look up the class in it.

    :param full_classname: dotted path, e.g., "package.module.ClassName"
    :param instantiate: True if you wish the found class return an instance.
    :return: Class obj or Class instance.
    :raise AttributeError: if the module has no attribute by that class name.
    """
    from importlib import import_module
    modname, _, clsname = full_classname.rpartition('.')
    mod = import_module(modname)
    # A default is required here; a plain getattr() raises before any "not found" check can run.
    clz = getattr(mod, clsname, None)
    if clz is None:
        raise AttributeError("Class not found for " + full_classname)
    return clz() if instantiate else clz


class RegexPattern:
    """
    One compiled RULE from a patterns configuration file.
    """

    def __init__(self, fam, pid, desc):
        self.family = fam
        self.id = pid
        self.description = desc
        # Pattern
        self.regex = None
        # Ordered group-name list with slots in pattern.
        self.regex_groups = []
        self.version = None
        self.enabled = False
        self.match_classname = None
        self.match_class = None

    def __str__(self):
        return "{}, Pattern: {}".format(self.id, self.regex)


class PatternMatch(TextMatch):
    """
    A general Pattern-based TextMatch.
    This Python variation consolidates PoliMatch (patterns-of-life = poli) ideas in the Java API.
    """

    UPPER_CASE = 1
    LOWER_CASE = 2
    FOUND_CASE = 0

    def __init__(self, *args, pattern_id=None, label=None, match_groups=None):
        """
        :param args: positional arguments handed to TextMatch; the extractor passes text, start, end.
        :param pattern_id: ID of the rule behind this match, formatted "FAMILY-RULE"
        :param label: label handed to TextMatch; the extractor passes the pattern family.
        :param match_groups: list of slot tuples (group_name, value, start, end); may be None.
        """
        TextMatch.__init__(self, *args, label=label)
        # Normalized text match is NONE until normalize() is run.
        self.textnorm = None
        self.pattern_id = pattern_id
        self.case = PatternMatch.FOUND_CASE
        self.match_groups = match_groups
        self.variant_id = None
        self.is_valid = True
        self.confidence = -1
        # PERFORMANCE flag: omit = True to never return the match value.
        # It could be filtered out and returned. But omit means we never see it.
        self.omit = False

        # Optionally -- back fill as much surrounding text as you want for
        # normalizer/validator routines. Use pre_text, post_text
        self.pre_text = None
        self.post_text = None
        if self.pattern_id and "-" in self.pattern_id:
            # Variant is whatever follows the first hyphen of "FAMILY-RULE".
            self.variant_id = self.pattern_id.split("-", 1)[1]

    def __str__(self):
        return f"({self.label}) {self.text}"

    def copy_attrs(self, arr):
        """
        Default copy of match group slots. Does not work for every situation.
        Slots whose value is empty or missing are not copied.

        :param arr: slot names to copy into self.attrs
        :return:
        """
        for k in arr:
            val = self.get_value(k)
            if val:
                self.attrs[k] = val

    def add_surrounding_text(self, text, text_len, length=16):
        """
        Given this match's span and the text it was derived from,
        populate pre_text, post_text with some # of chars specified by length.

        :param text: The text in which this match was found.
        :param text_len: the length of the text buffer. (avoid repeating len(text))
        :param length: the pre/post text length to attach.
        :return:
        """
        if self.start > 0:
            self.pre_text = text[max(self.start - length, 0):self.start]
        if self.end > 0:
            self.post_text = text[self.end:min(self.end + length, text_len)]

    def attributes(self):
        """
        Render domain details to meaningful exported view of the data.

        :return: dict with "method" (the pattern ID) plus one entry per match group slot.
        """
        default_attrs = {"method": self.pattern_id}
        # match_groups defaults to None in the constructor; treat that as "no slots".
        for k, v, _x1, _x2 in self.match_groups or ():
            default_attrs[k] = v
        return default_attrs

    def normalize(self):
        """
        Set textnorm to the stripped match text, re-cased according to self.case.
        Nothing is set if the match has no text.
        """
        if not self.text:
            return

        self.textnorm = self.text.strip()
        if self.case == PatternMatch.UPPER_CASE:
            self.textnorm = self.textnorm.upper()
        elif self.case == PatternMatch.LOWER_CASE:
            self.textnorm = self.textnorm.lower()

    def get_value(self, k):
        """
        Get Slot value -- returns first one.

        :param k: slot (group) name
        :return: value of the first slot named k, or None if there is no such slot.
        """
        if not self.match_groups:
            return None
        grp = get_slot(self.match_groups, k)
        if grp:
            # tuple is group_name, value, start, end. Return value:
            return grp[1]
        return None


def get_slot(grps, k):
    """
    Given array of match groups, return the first group whose name is k.

    :param grps: list of tuples (group_name, value, start, end); None is treated as empty.
    :param k: group name to look for
    :return: tuple matching, or None
    """
    for g in grps or ():
        key, _v, _x1, _x2 = g
        if key == k:
            return g
    return None


class PatternTestCase:
    """
    One TEST statement from a patterns configuration file.
    """

    def __init__(self, tid, family, text):
        self.id = tid
        self.family = family
        self.text = text
        self.true_positive = True


def get_config_file(cfg, modfile):
    """
    Locate a resource file that is collocated with the python module, e.g., get_config_file("file.cfg", __file__)

    :param cfg: file name of the resource
    :param modfile: path of the module the resource sits next to
    :return: path of the resource
    :raise FileNotFoundError: if the resource is not in the module's folder.
    """
    pkgdir = os.path.dirname(os.path.abspath(modfile))
    patterns_file = os.path.join(pkgdir, cfg)
    if os.path.exists(patterns_file):
        return patterns_file
    raise FileNotFoundError("No such file {} at {}".format(cfg, patterns_file))


class RegexPatternManager:
    """
    RegexPatternManager is the patterns configuration file parser.
    See documentation: https://opensextant.github.io/Xponents/doc/Patterns.md

    """

    def __init__(self, patterns_cfg, module_file=None, debug=False, testing=False):
        """
        :param patterns_cfg: patterns config file: a path, or a name resolved via module_file or resource_for()
        :param module_file: if given, patterns_cfg is looked up in the folder of this module file.
        :param debug: True to record setup messages in self.config_messages
        :param testing: True to load the TEST statements as test cases.
        """
        self.families = set()
        self.patterns = {}
        self.patterns_file = patterns_cfg
        if module_file:
            # Resolve this absolute path now.
            self.patterns_file = get_config_file(patterns_cfg, module_file)

        # Set by _initialize() to the path the configuration was actually read from.
        self.patterns_file_path = None
        self.test_cases = []
        self.matcher_classes = {}
        # CUSTOM: For mapping Python and Java classes internally. Experimental.
        self.match_class_registry = {}
        self.testing = testing
        self.debug = debug
        # Pattern setup and validation messages; only filled when debug is True.
        self.config_messages = []
        self._initialize()

    def get_pattern(self, pid):
        return self.patterns.get(pid)

    def create_pattern(self, fam, rule, desc):
        """ Override pattern class creation as needed.
        """
        return RegexPattern(fam, "{}-{}".format(fam, rule), desc)

    def create_testcase(self, tid, fam, text):
        return PatternTestCase(tid, fam, text)

    def validate_pattern(self, repat):
        """Default validation is True
        Override this if necessary, e.g., pattern implementation has additional metadata
        """
        return repat is not None

    def enable_all(self):
        for pat in self.patterns.values():
            pat.enabled = True

    def disable_all(self):
        for pat in self.patterns.values():
            pat.enabled = False

    def set_enabled(self, some: str, flag: bool):
        """
        set family enabled or not
        :param some: prefix of a family or family-variant
        :param flag: bool setting
        :return:
        """
        for pat in self.patterns.values():
            if pat.id.startswith(some):
                pat.enabled = flag

    def _initialize(self):
        """
        Parse the patterns configuration file and compile each RULE into a pattern.

        :raise FileNotFoundError: if the configuration file is not found.
        :raise Exception: on a duplicate rule, an undefined slot or an invalid pattern.
        :return:
        """
        self.patterns = {}
        self.config_messages = []

        # the # RULE statements as name and a sequence of DEFINES and regex bits
        defines = {}
        rules = {}
        # Preserve order
        rule_order = []

        config_fpath = self.patterns_file
        if not os.path.exists(self.patterns_file):
            config_fpath = resource_for(self.patterns_file)

        # By now we have tried the given file path, inferred a path local to the calling module
        # and lastly tried a resource folder in opensextant/resource/ data.
        if not os.path.exists(config_fpath):
            raise FileNotFoundError("Tried various absolute and inferred paths for the file '{}'".format(
                os.path.basename(self.patterns_file)))
        self.patterns_file_path = config_fpath

        with open(config_fpath, "r", encoding="UTF-8") as fh:
            testcount = 0
            for line in fh:
                stmt = line.strip()
                # NOTE: DEFINE and RULE are tested on the raw line, so they must start in column 0;
                # TEST and CLASS are tested on the stripped statement.
                if line.startswith("#DEFINE"):
                    # #DEFINE<tab><defineName><tab><definePattern>
                    fields = re.split("[\t ]+", stmt, 2)
                    defines[fields[1]] = fields[2]
                elif line.startswith("#RULE"):
                    # #RULE<tab><rule_fam><tab><rule_id><tab><pattern>
                    fields = re.split("[\t ]+", stmt, 3)

                    fam = fields[1]
                    ruleEnum = fields[2]
                    rulePattern = fields[3]
                    ruleKey = fam + "-" + ruleEnum

                    # if already a rule by that name, error
                    if ruleKey in rules:
                        raise Exception("FlexPat Config Error - Duplicate rule name " + ruleEnum)

                    rules[ruleKey] = rulePattern
                    rule_order.append(ruleKey)
                elif self.testing and stmt.startswith("#TEST"):
                    fields = re.split("[\t ]+", stmt, 3)
                    testcount += 1

                    fam = fields[1]
                    ruleEnum = fields[2]
                    testtext = fields[3].strip().replace("$NL", "\n")
                    ruleKey = fam + "-" + ruleEnum

                    # testcount is a count of all tests, not just test within a rule family
                    testKey = "{}#{}".format(ruleKey, testcount)
                    self.test_cases.append(self.create_testcase(testKey, fam, testtext))
                elif stmt.startswith("#CLASS"):
                    fields = re.split("[\t ]+", stmt, 2)
                    fam = fields[1]
                    self.matcher_classes[fam] = fields[2]

        elementPattern = re.compile("<[a-zA-Z0-9_]+>")

        for tmpkey in rule_order:
            tmpRulePattern = rules[tmpkey]
            fam, rule_name = tmpkey.split("-", 1)
            self.families.add(fam)

            pat = self.create_pattern(fam, rule_name, "No Description yet...")
            if fam in self.matcher_classes:
                # Best effort: a class that cannot be loaded is reported and the
                # pattern falls back to generic matches.
                try:
                    pat.match_classname = self.matcher_classes.get(fam)
                    if pat.match_classname in self.match_class_registry:
                        # rename. Map Java class to a Python class.
                        pat.match_classname = self.match_class_registry[pat.match_classname]

                    # Do not instantiate, just find the class named in config file.
                    pat.match_class = class_for(pat.match_classname, instantiate=False)
                except Exception as err:
                    print(err)

            # find all of the element definitions within the pattern
            for groupNum, m in enumerate(elementPattern.finditer(tmpRulePattern), 1):
                elementName = m.group()[1:-1]
                pat.regex_groups.append(elementName)

                if self.debug:
                    self.config_messages.append("\n\t")
                    self.config_messages.append(
                        "{} {} = {}".format(groupNum, elementName, defines.get(elementName)))

            # dict.fromkeys() de-duplicates while keeping first-seen order, so the
            # substitution order is the same on every run (a set's order is not).
            for slot_name in dict.fromkeys(pat.regex_groups):
                if slot_name not in defines:
                    raise Exception("Slot definition is not DEFINED for " + slot_name)

                # NOTE: Use of parens, "(expr)", is required to create groups within a pattern.
                tmpDefPattern = "({})".format(defines[slot_name])
                tmpDefSlot = "<{}>".format(slot_name)
                # Replaces all.
                tmpRulePattern = tmpRulePattern.replace(tmpDefSlot, tmpDefPattern)

            if self.debug:
                self.config_messages.append("\nrulepattern=" + tmpRulePattern)

            pat.regex = re.compile(tmpRulePattern, re.IGNORECASE)
            pat.enabled = True
            self.patterns[pat.id] = pat
            if not self.validate_pattern(pat):
                raise Exception("Invalid Pattern " + str(pat))

        if self.debug:
            self.config_messages.append("\nFound # of PATTERNS={}".format(len(self.patterns)))


def _digest_sub_groups(m, pattern_groups):
    """
    Reorganize regex groups internally.

    :param m: regex match object
    :param pattern_groups: ordered list of groups as they appear in RE
    :return: array of tuples, one per regex group: (group, value, start, end)
    :raise Exception: if the match has more groups than pattern_groups has names.
    """
    glen = len(pattern_groups)
    slots = []
    for count, found in enumerate(m.groups()):
        # ">=" because count is a zero-based index into pattern_groups.
        if count >= glen:
            raise Exception("Unexpected -- more slots found than groups in pattern.")
        slots.append((pattern_groups[count], found, m.start(count + 1), m.end(count + 1)))
    return slots


class PatternExtractor(Extractor):
    """
    Discussion: Read first https://opensextant.github.io/Xponents/doc/Patterns.md

    Example:
    ```
    from opensextant.extractors.poli import PatternsOfLifeManager
    from opensextant.FlexPat import PatternExtractor

    # INIT
    #=====================
    # Invoke a particular REGEX rule set, here poli_patterns.cfg
    # @see https://github.com/OpenSextant/Xponents/blob/master/Core/src/main/resources/poli_patterns.cfg
    mgr = PatternsOfLifeManager("poli_patterns.cfg")
    pex = PatternExtractor(mgr)

    # DEV/TEST
    #=====================
    # "default_tests()" is useful to run during development and
    # encourages you to capture critical pattern variants in your "TEST" data.
    # Look at your pass/fail situations -- what test cases are failing your rule?
    test_results = pex.default_tests()
    print("TEST RESULTS")
    for result in test_results:
        print(repr(result))

    # RUN
    #=====================
    real_results = pex.extract(".... text blob 1-800-123-4567...")
    print("REAL RESULTS")
    for result in real_results:
        print(repr(result))
        print("\tRAW DICT:", render_match(result))
    ```
    """

    def __init__(self, pattern_manager):
        """
        invoke RegexPatternManager(your_cfg_file) or implement a custom RegexPatternManager (rare).
        NOTE - `PatternsOfLifeManager` is a particular subclass of RegexPatternManager because
        it is manipulating the input patterns config file which is shared with the Java demo.
        The `CLASS` names unfortunately are specific to Python or Java.

        :param pattern_manager: RegexPatternManager
        """
        Extractor.__init__(self)
        self.id = "xpx"
        self.name = "Xponents Pattern Extractor"
        self.pattern_manager = pattern_manager

    def extract(self, text, **kwargs):
        """ Default Extractor API. """
        return self.extract_patterns(text, **kwargs)

    def extract_patterns(self, text, **kwargs):
        """
        Given some text input, apply all relevant pattern families against the text.
        Surrounding text is added to each match for post-processing.

        :param text: text to search
        :param kwargs: "features" = list of pattern families to apply; default is all families.
        :return: list of matches; redundant ones are marked filtered_out, not removed.
        :raise Exception: if a requested family is not in the pattern manager.
        """
        features = kwargs.get("features")
        if not features:
            features = self.pattern_manager.families

        tlen = len(text)
        results = []
        for fam in features:
            if fam not in self.pattern_manager.families:
                raise Exception("Uknown Pattern Family " + fam)

            for pat in self.pattern_manager.patterns.values():
                if pat.family != fam or not pat.enabled:
                    continue

                match_class = pat.match_class or PatternMatch
                for m in pat.regex.finditer(text):
                    match = match_class(m.group(), m.start(), m.end(),
                                        pattern_id=pat.id,
                                        label=pat.family,
                                        match_groups=_digest_sub_groups(m, pat.regex_groups))
                    # surrounding text may be used by normalization and validation
                    match.add_surrounding_text(text, tlen, length=20)
                    if pat.match_class:
                        # Only matches of a configured CLASS are normalized and may omit themselves.
                        match.normalize()
                        if match.omit:
                            continue
                    results.append(match)

        # Determine if any matches are redundant. Mark redundancies as "filtered out".
        reduce_matches(results)
        for r in results:
            if r.is_duplicate or r.is_submatch:
                r.filtered_out = True

        return results

    def default_tests(self, scope="rule"):
        """
        Default Tests run all TEST cases for each RULE in patterns config.
        TESTs marked with a 'FAIL' comment are intended to return 0 matches or only matches that are filtered out.
        Otherwise a TEST is intended to return 1 or more matches.

        By default, this runs each test and observes only results that were triggered by that rule being tested.
        If scope is "ruleset" then any results from any rule will be allowed.
        "rule" scope is much better for detailed rule development as it tells you if your rule tests are testing the
        right thing.

        Runs the default tests on the provided configuration. Plenty of debug printed to screen.
        But returns the test results as an array, e.g., to write to CSV for review.
        This uses PatternExtractor.extract_patterns() to avoid any collision with the generic use
        of Extractor.extract() parent method.

        :param scope: rule or ruleset. Rule scope means only results for rule test case are evaluated.
            ruleset scope means that all results for a test are evaluated.
        :return: test results array; Each result represents a TEST case run against a RULE
        """
        test_results = []
        for t in self.pattern_manager.test_cases:
            expect_valid_match = "FAIL" not in t.text
            print("Test", t.family, t.text)
            found = self.extract_patterns(t.text, features=[t.family])
            output = [m for m in found
                      if scope != "rule" or t.id.startswith(m.pattern_id)]

            # A match counts if it was not filtered out, or was filtered out only as a duplicate.
            counted = sum(1 for m in output if not m.filtered_out or m.is_duplicate)

            # True positive: a match was expected and at least one counted.
            # True negative: no match was expected and none counted.
            # Anything else is a false positive or false negative, i.e., a failed test.
            success = (counted > 0) == expect_valid_match
            test_results.append({"TEST": t.id,
                                 "TEXT": t.text,
                                 "MATCHES": output,
                                 "PASS": success})

        return test_results


def print_test(result: dict):
    """ print the structure from default_tests()
    """
    if not result:
        return

    tid = result["TEST"]
    txt = result["TEXT"]
    res = result["PASS"]
    matches = "<None>"
    if result["MATCHES"]:
        matches = ";".join([match.text for match in result["MATCHES"]])
    print(f"TEST: {tid}, TEXT: {txt} PASS:{res}\tMATCHES: {matches}")
def resource_for(resource_name):
    """
    Resolve a file located in the ``resources`` folder of the installed opensextant package.

    :param resource_name: name of a file in your resource path; Default: opensextant/resources/NAME
    :return: file path.
    :raise FileNotFoundError: if the file does not exist in the resources folder.
    """
    import opensextant
    libdir = os.path.dirname(opensextant.__file__)
    fpath = os.path.join(libdir, "resources", resource_name)
    if os.path.exists(fpath):
        return fpath
    # FileNotFoundError is an Exception subclass, so callers using `except Exception` still work.
    raise FileNotFoundError("FileNotFound: Resource not found where expected at " + fpath)
:param resource_name: name of a file in your resource path; Default: opensextant/resources/NAME :return: file path.
def class_for(full_classname, instantiate=True):
    """
    Import the module named by a dotted class path and look up the class in it.

    :param full_classname: dotted path, e.g., "package.module.ClassName"
    :param instantiate: True if you wish the found class return an instance.
    :return: Class obj or Class instance.
    :raise AttributeError: if the module has no attribute by that class name.
    """
    from importlib import import_module
    modname, _, clsname = full_classname.rpartition('.')
    mod = import_module(modname)
    # A default is required here; a plain getattr() raises before any "not found" check can run.
    clz = getattr(mod, clsname, None)
    if clz is None:
        raise AttributeError("Class not found for " + full_classname)
    return clz() if instantiate else clz
:param full_classname: :param instantiate: True if you wish the found class return an instance. :return: Class obj or Class instance.
class PatternMatch(TextMatch):
    """
    A general Pattern-based TextMatch.
    This Python variation consolidates PoliMatch (patterns-of-life = poli) ideas in the Java API.
    """

    UPPER_CASE = 1
    LOWER_CASE = 2
    FOUND_CASE = 0

    def __init__(self, *args, pattern_id=None, label=None, match_groups=None):
        """
        :param args: positional arguments handed to TextMatch.
        :param pattern_id: ID of the rule behind this match; text after the first "-" becomes variant_id.
        :param label: label handed to TextMatch.
        :param match_groups: list of slot tuples (group_name, value, start, end); may be None.
        """
        TextMatch.__init__(self, *args, label=label)
        # Normalized text match is NONE until normalize() is run.
        self.textnorm = None
        self.pattern_id = pattern_id
        self.case = PatternMatch.FOUND_CASE
        self.match_groups = match_groups
        self.variant_id = None
        self.is_valid = True
        self.confidence = -1
        # PERFORMANCE flag: omit = True to never return the match value.
        # It could be filtered out and returned. But omit means we never see it.
        self.omit = False

        # Optionally -- back fill as much surrounding text as you want for
        # normalizer/validator routines. Use pre_text, post_text
        self.pre_text = None
        self.post_text = None
        if self.pattern_id and "-" in self.pattern_id:
            self.variant_id = self.pattern_id.split("-", 1)[1]

    def __str__(self):
        return f"({self.label}) {self.text}"

    def copy_attrs(self, arr):
        """
        Default copy of match group slots. Does not work for every situation.
        Slots whose value is empty or missing are not copied.

        :param arr: slot names to copy into self.attrs
        :return:
        """
        for k in arr:
            val = self.get_value(k)
            if val:
                self.attrs[k] = val

    def add_surrounding_text(self, text, text_len, length=16):
        """
        Given this match's span and the text it was derived from,
        populate pre_text, post_text with some # of chars specified by length.

        :param text: The text in which this match was found.
        :param text_len: the length of the text buffer. (avoid repeating len(text))
        :param length: the pre/post text length to attach.
        :return:
        """
        if self.start > 0:
            self.pre_text = text[max(self.start - length, 0):self.start]
        if self.end > 0:
            self.post_text = text[self.end:min(self.end + length, text_len)]

    def attributes(self):
        """
        Render domain details to meaningful exported view of the data.

        :return: dict with "method" (the pattern ID) plus one entry per match group slot.
        """
        default_attrs = {"method": self.pattern_id}
        # match_groups defaults to None in the constructor; treat that as "no slots".
        for k, v, _x1, _x2 in self.match_groups or ():
            default_attrs[k] = v
        return default_attrs

    def normalize(self):
        """
        Set textnorm to the stripped match text, re-cased according to self.case.
        Nothing is set if the match has no text.
        """
        if not self.text:
            return

        self.textnorm = self.text.strip()
        if self.case == PatternMatch.UPPER_CASE:
            self.textnorm = self.textnorm.upper()
        elif self.case == PatternMatch.LOWER_CASE:
            self.textnorm = self.textnorm.lower()

    def get_value(self, k):
        """
        Get Slot value -- returns first one.

        :param k: slot (group) name
        :return: value of the first slot named k, or None if there is no such slot.
        """
        # match_groups defaults to None in the constructor; there is nothing to search then.
        if not self.match_groups:
            return None
        grp = get_slot(self.match_groups, k)
        if grp:
            # tuple is group_name, value, start, end. Return value:
            return grp[1]
        return None
A general Pattern-based TextMatch. This Python variation consolidates PoliMatch (patterns-of-life = poli) ideas in the Java API.
def copy_attrs(self, arr):
    """
    Copy the named match group slots into self.attrs.
    Slots whose value is empty or missing are skipped. Does not work for every situation.

    :param arr: slot names to copy
    :return:
    """
    for slot_name in arr:
        slot_value = self.get_value(slot_name)
        if not slot_value:
            continue
        self.attrs[slot_name] = slot_value
Default copy of match group slots. Does not work for every situation. :param arr: :return:
def add_surrounding_text(self, text, text_len, length=16):
    """
    Attach the text just before and just after this match as pre_text and post_text.
    pre_text is left untouched when the match starts at offset 0.

    :param text: The text in which this match was found.
    :param text_len: the length of the text buffer. (avoid repeating len(text))
    :param length: the number of characters to take on each side.
    :return:
    """
    if self.start > 0:
        # Clamp the window to the start of the buffer.
        left = max(self.start - length, 0)
        self.pre_text = text[left:self.start]
    if self.end > 0:
        # Clamp the window to the end of the buffer.
        right = min(self.end + length, text_len)
        self.post_text = text[self.end:right]
Given this match's span and the text it was derived from, populate pre_text, post_text with some # of chars specified by length.
:param text: The text in which this match was found. :param text_len: the length of the text buffer. (avoid repeating len(text)) :param length: the pre/post text length to attach. :return:
def attributes(self):
    """
    Render domain details to meaningful exported view of the data.

    :return: dict with "method" (the pattern ID) plus one entry per match group slot.
    """
    default_attrs = {"method": self.pattern_id}
    # match_groups may be None; treat that as "no slots" rather than failing.
    for slot_name, slot_value, _start, _end in self.match_groups or ():
        default_attrs[slot_name] = slot_value
    return default_attrs
Render domain details to meaningful exported view of the data. :return:
def normalize(self):
    """
    Set textnorm to the stripped match text, upper- or lower-cased when self.case asks for it.
    Nothing is set if the match has no text.
    """
    if not self.text:
        return

    cleaned = self.text.strip()
    if self.case == PatternMatch.UPPER_CASE:
        cleaned = cleaned.upper()
    elif self.case == PatternMatch.LOWER_CASE:
        cleaned = cleaned.lower()
    self.textnorm = cleaned
Optional, but recommended routine to normalize the matched data. That is, parse fields, uppercase, streamline punctuation, etc. As well, given such normalization result, this is the opportunity to additionally validate the match. :return:
def get_value(self, k):
    """
    Get Slot value -- returns first one.

    :param k: slot (group) name
    :return: value of the first slot named k, or None if there is no such slot.
    """
    # match_groups may be None; there is nothing to search then.
    if not self.match_groups:
        return None
    grp = get_slot(self.match_groups, k)
    if grp:
        # tuple is group_name, value, start, end. Return value:
        return grp[1]
    return None
Get Slot value -- returns first one. :param k: :return:
Inherited Members
def get_slot(grps, k):
    """
    Given array of match groups, return the first group whose name is k.

    :param grps: list of tuples (group_name, value, start, end); None is treated as empty.
    :param k: group name to look for
    :return: tuple matching, or None
    """
    for g in grps or ():
        key, _v, _x1, _x2 = g
        if key == k:
            return g
    return None
Given an array of match groups, return the first group whose key matches. :param grps: :param k: :return: tuple matching.
def get_config_file(cfg, modfile):
    """
    Find a resource file that sits in the same folder as a python module,
    e.g., get_config_file("file.cfg", __file__)

    :param cfg: file name of the resource
    :param modfile: path of the module the resource sits next to
    :return: path of the resource
    :raise FileNotFoundError: if the resource is not in the module's folder.
    """
    module_dir = os.path.dirname(os.path.abspath(modfile))
    candidate = os.path.join(module_dir, cfg)
    if not os.path.exists(candidate):
        raise FileNotFoundError(f"No such file {cfg} at {candidate}")
    return candidate
Locate a resource file that is collocated with the python module, e.g., get_config_file("file.cfg", __file__) :param cfg: :param modfile: :return:
class RegexPatternManager:
    """
    RegexPatternManager is the patterns configuration file parser.
    See documentation: https://opensextant.github.io/Xponents/doc/Patterns.md

    """

    def __init__(self, patterns_cfg, module_file=None, debug=False, testing=False):
        """
        :param patterns_cfg: patterns config file: a path, or a name resolved via module_file or resource_for()
        :param module_file: if given, patterns_cfg is looked up in the folder of this module file.
        :param debug: True to record setup messages in self.config_messages
        :param testing: True to load the TEST statements as test cases.
        """
        self.families = set()
        self.patterns = {}
        self.patterns_file = patterns_cfg
        if module_file:
            # Resolve this absolute path now.
            self.patterns_file = get_config_file(patterns_cfg, module_file)

        # Set by _initialize() to the path the configuration was actually read from.
        self.patterns_file_path = None
        self.test_cases = []
        self.matcher_classes = {}
        # CUSTOM: For mapping Python and Java classes internally. Experimental.
        self.match_class_registry = {}
        self.testing = testing
        self.debug = debug
        # Pattern setup and validation messages; only filled when debug is True.
        self.config_messages = []
        self._initialize()

    def get_pattern(self, pid):
        return self.patterns.get(pid)

    def create_pattern(self, fam, rule, desc):
        """ Override pattern class creation as needed.
        """
        return RegexPattern(fam, "{}-{}".format(fam, rule), desc)

    def create_testcase(self, tid, fam, text):
        return PatternTestCase(tid, fam, text)

    def validate_pattern(self, repat):
        """Default validation is True
        Override this if necessary, e.g., pattern implementation has additional metadata
        """
        return repat is not None

    def enable_all(self):
        for pat in self.patterns.values():
            pat.enabled = True

    def disable_all(self):
        for pat in self.patterns.values():
            pat.enabled = False

    def set_enabled(self, some: str, flag: bool):
        """
        set family enabled or not
        :param some: prefix of a family or family-variant
        :param flag: bool setting
        :return:
        """
        for pat in self.patterns.values():
            if pat.id.startswith(some):
                pat.enabled = flag

    def _initialize(self):
        """
        Parse the patterns configuration file and compile each RULE into a pattern.

        :raise FileNotFoundError: if the configuration file is not found.
        :raise Exception: on a duplicate rule, an undefined slot or an invalid pattern.
        :return:
        """
        self.patterns = {}
        self.config_messages = []

        # the # RULE statements as name and a sequence of DEFINES and regex bits
        defines = {}
        rules = {}
        # Preserve order
        rule_order = []

        config_fpath = self.patterns_file
        if not os.path.exists(self.patterns_file):
            config_fpath = resource_for(self.patterns_file)

        # By now we have tried the given file path, inferred a path local to the calling module
        # and lastly tried a resource folder in opensextant/resource/ data.
        if not os.path.exists(config_fpath):
            raise FileNotFoundError("Tried various absolute and inferred paths for the file '{}'".format(
                os.path.basename(self.patterns_file)))
        self.patterns_file_path = config_fpath

        with open(config_fpath, "r", encoding="UTF-8") as fh:
            testcount = 0
            for line in fh:
                stmt = line.strip()
                # NOTE: DEFINE and RULE are tested on the raw line, so they must start in column 0;
                # TEST and CLASS are tested on the stripped statement.
                if line.startswith("#DEFINE"):
                    # #DEFINE<tab><defineName><tab><definePattern>
                    fields = re.split("[\t ]+", stmt, 2)
                    defines[fields[1]] = fields[2]
                elif line.startswith("#RULE"):
                    # #RULE<tab><rule_fam><tab><rule_id><tab><pattern>
                    fields = re.split("[\t ]+", stmt, 3)

                    fam = fields[1]
                    ruleEnum = fields[2]
                    rulePattern = fields[3]
                    ruleKey = fam + "-" + ruleEnum

                    # if already a rule by that name, error
                    if ruleKey in rules:
                        raise Exception("FlexPat Config Error - Duplicate rule name " + ruleEnum)

                    rules[ruleKey] = rulePattern
                    rule_order.append(ruleKey)
                elif self.testing and stmt.startswith("#TEST"):
                    fields = re.split("[\t ]+", stmt, 3)
                    testcount += 1

                    fam = fields[1]
                    ruleEnum = fields[2]
                    testtext = fields[3].strip().replace("$NL", "\n")
                    ruleKey = fam + "-" + ruleEnum

                    # testcount is a count of all tests, not just test within a rule family
                    testKey = "{}#{}".format(ruleKey, testcount)
                    self.test_cases.append(self.create_testcase(testKey, fam, testtext))
                elif stmt.startswith("#CLASS"):
                    fields = re.split("[\t ]+", stmt, 2)
                    fam = fields[1]
                    self.matcher_classes[fam] = fields[2]

        elementPattern = re.compile("<[a-zA-Z0-9_]+>")

        for tmpkey in rule_order:
            tmpRulePattern = rules[tmpkey]
            fam, rule_name = tmpkey.split("-", 1)
            self.families.add(fam)

            pat = self.create_pattern(fam, rule_name, "No Description yet...")
            if fam in self.matcher_classes:
                # Best effort: a class that cannot be loaded is reported and the
                # pattern is left without a match class.
                try:
                    pat.match_classname = self.matcher_classes.get(fam)
                    if pat.match_classname in self.match_class_registry:
                        # rename. Map Java class to a Python class.
                        pat.match_classname = self.match_class_registry[pat.match_classname]

                    # Do not instantiate, just find the class named in config file.
                    pat.match_class = class_for(pat.match_classname, instantiate=False)
                except Exception as err:
                    print(err)

            # find all of the element definitions within the pattern
            for groupNum, m in enumerate(elementPattern.finditer(tmpRulePattern), 1):
                elementName = m.group()[1:-1]
                pat.regex_groups.append(elementName)

                if self.debug:
                    self.config_messages.append("\n\t")
                    self.config_messages.append(
                        "{} {} = {}".format(groupNum, elementName, defines.get(elementName)))

            # dict.fromkeys() de-duplicates while keeping first-seen order, so the
            # substitution order is the same on every run (a set's order is not).
            for slot_name in dict.fromkeys(pat.regex_groups):
                if slot_name not in defines:
                    raise Exception("Slot definition is not DEFINED for " + slot_name)

                # NOTE: Use of parens, "(expr)", is required to create groups within a pattern.
                tmpDefPattern = "({})".format(defines[slot_name])
                tmpDefSlot = "<{}>".format(slot_name)
                # Replaces all.
                tmpRulePattern = tmpRulePattern.replace(tmpDefSlot, tmpDefPattern)

            if self.debug:
                self.config_messages.append("\nrulepattern=" + tmpRulePattern)

            pat.regex = re.compile(tmpRulePattern, re.IGNORECASE)
            pat.enabled = True
            self.patterns[pat.id] = pat
            if not self.validate_pattern(pat):
                raise Exception("Invalid Pattern " + str(pat))

        if self.debug:
            self.config_messages.append("\nFound # of PATTERNS={}".format(len(self.patterns)))
RegexPatternManager is the patterns configuration file parser. See documentation: https://opensextant.github.io/Xponents/doc/Patterns.md
def create_pattern(self, fam, rule, desc):
    """
    Build the pattern object for a single rule.
    Override this to return a specialized pattern class as needed.

    :param fam: pattern family name
    :param rule: rule name within the family
    :param desc: description text for the pattern
    :return: a RegexPattern whose id is "<fam>-<rule>"
    """
    pattern_id = f"{fam}-{rule}"
    return RegexPattern(fam, pattern_id, desc)
Override pattern class creation as needed.
def validate_pattern(self, repat):
    """
    Default validation: any pattern object that is not None is accepted.
    Override this if necessary, e.g., when a pattern implementation carries
    additional metadata that must be checked.

    :param repat: the pattern object to validate
    :return: False when repat is None, True otherwise
    """
    if repat is None:
        return False
    return True
Default validation is True. Override this if necessary, e.g., pattern implementation has additional metadata.
def set_enabled(self, some: str, flag: bool):
    """
    Enable or disable every pattern whose id begins with the given prefix.

    :param some: prefix of a family or family-variant pattern id
    :param flag: value assigned to the `enabled` attribute of each matching pattern
    :return: None
    """
    for pattern in self.patterns.values():
        if pattern.id.startswith(some):
            pattern.enabled = flag
set family enabled or not :param some: prefix of a family or family-variant :param flag: bool setting :return:
class PatternExtractor(Extractor):
    """
    Extractor that applies the regex patterns held by a pattern manager to text.

    Discussion: Read first https://opensextant.github.io/Xponents/doc/Patterns.md

    Example:
    ```
    from opensextant.extractors.poli import PatternsOfLifeManager
    from opensextant.FlexPat import PatternExtractor

    # INIT
    #=====================
    # Invoke a particular REGEX rule set, here poli_patterns.cfg
    # @see https://github.com/OpenSextant/Xponents/blob/master/Core/src/main/resources/poli_patterns.cfg
    mgr = PatternsOfLifeManager("poli_patterns.cfg")
    pex = PatternExtractor(mgr)

    # DEV/TEST
    #=====================
    # "default_tests()" is useful to run during development and
    # encourages you to capture critical pattern variants in your "TEST" data.
    # Look at your pass/fail situations -- what test cases are failing your rule?
    test_results = pex.default_tests()
    print("TEST RESULTS")
    for result in test_results:
        print(repr(result))

    # RUN
    #=====================
    real_results = pex.extract(".... text blob 1-800-123-4567...")
    print("REAL RESULTS")
    for result in real_results:
        print(repr(result))
        print("\tRAW DICT:", render_match(result))
    ```
    """

    def __init__(self, pattern_manager):
        """
        invoke RegexPatternManager(your_cfg_file) or implement a custom RegexPatternManager (rare).
        NOTE - `PatternsOfLifeManager` is a particular subclass of RegexPatternManager because
        it is manipulating the input patterns config file which is shared with the Java demo.
        The `CLASS` names unfortunately are specific to Python or Java.

        :param pattern_manager: RegexPatternManager
        """
        Extractor.__init__(self)
        self.id = "xpx"
        self.name = "Xponents Pattern Extractor"
        self.pattern_manager = pattern_manager

    def extract(self, text, **kwargs):
        """ Default Extractor API; delegates to extract_patterns(). """
        return self.extract_patterns(text, **kwargs)

    def extract_patterns(self, text, **kwargs):
        """
        Given some text input, apply all relevant pattern families against the text.
        Surrounding text is added to each match for post-processing.

        :param text: text to search
        :param kwargs: optional "features" = iterable of pattern family names to apply;
            when absent or empty, every family known to the pattern manager is applied.
        :return: list of matches. Redundant matches (duplicates, submatches) remain in the
            list but are flagged with filtered_out = True.
        :raise Exception: if a requested family is not known to the pattern manager.
        """
        features = kwargs.get("features")
        if not features:
            features = self.pattern_manager.families

        tlen = len(text)
        results = []
        for fam in features:
            if fam not in self.pattern_manager.families:
                raise Exception("Unknown Pattern Family " + fam)

            for pat in self.pattern_manager.patterns.values():
                if pat.family != fam or not pat.enabled:
                    continue

                for m in pat.regex.finditer(text):
                    digested_groups = _digest_sub_groups(m, pat.regex_groups)
                    if pat.match_class:
                        domainObj = pat.match_class(m.group(), m.start(), m.end(),
                                                    pattern_id=pat.id,
                                                    label=pat.family,
                                                    match_groups=digested_groups)
                        # surrounding text may be used by normalization and validation
                        domainObj.add_surrounding_text(text, tlen, length=20)
                        domainObj.normalize()
                        # normalize() may set omit, meaning the match is never returned.
                        if not domainObj.omit:
                            results.append(domainObj)
                    else:
                        # No domain class configured for this family: emit a generic match as-is.
                        genericObj = PatternMatch(m.group(), m.start(), m.end(),
                                                  pattern_id=pat.id,
                                                  label=pat.family,
                                                  match_groups=digested_groups)
                        genericObj.add_surrounding_text(text, tlen, length=20)
                        results.append(genericObj)

        # Determine if any matches are redundant. Mark redundancies as "filtered out".
        reduce_matches(results)
        for r in results:
            if r.is_duplicate or r.is_submatch:
                r.filtered_out = True

        return results

    def default_tests(self, scope="rule"):
        """
        Default Tests run all TEST cases for each RULE in patterns config.
        TESTs whose text contains 'FAIL' are intended to return 0 matches or only matches
        that are filtered out. Otherwise a TEST is intended to return 1 or more matches.

        By default, this runs each test and observes only results that were triggered by that rule being tested.
        If scope is "ruleset" then any results from any rule will be allowed.
        "rule" scope is much better for detailed rule development as it tells you if your rule tests are testing the
        right thing.

        Each test is printed to screen as it runs; the results are returned as an array,
        e.g., to write to CSV for review.
        This uses PatternExtractor.extract_patterns() to avoid any collision with the generic use
        of Extractor.extract() parent method.

        :param scope: rule or ruleset. Rule scope means only results for rule test case are evaluated.
            ruleset scope means that all results for a test are evaluated.
        :return: test results array; Each result is a dict with keys TEST, TEXT, MATCHES, PASS
            and represents a TEST case run against a RULE
        """
        test_results = []
        for t in self.pattern_manager.test_cases:
            expect_valid_match = "FAIL" not in t.text
            print("Test", t.family, t.text)
            output1 = self.extract_patterns(t.text, features=[t.family])

            # Test ids look like "<family>-<rule>#<count>". Compare the rule portion exactly:
            # a prefix comparison would credit hits from rule "X-1" to tests of rule "X-10".
            rule_id = t.id.rsplit("#", 1)[0]
            output = []
            for m in output1:
                if scope == "rule" and m.pattern_id != rule_id:
                    continue
                output.append(m)

            # A match counts if it was not filtered out, or if it was filtered only as a duplicate.
            # True Positive = at least one counted match, AND a match was expected.
            # False Positive = at least one counted match, AND a match was NOT expected.
            counted = 0
            for m in output:
                if not m.filtered_out or m.is_duplicate:
                    counted += 1

            # Pass on a true positive or true negative; fail on a false positive or false negative.
            success = (counted > 0) == expect_valid_match
            test_results.append({"TEST": t.id,
                                 "TEXT": t.text,
                                 "MATCHES": output,
                                 "PASS": success})

        return test_results
Discussion: Read first https://opensextant.github.io/Xponents/doc/Patterns.md
Example:
from opensextant.extractors.poli import PatternsOfLifeManager
from opensextant.FlexPat import PatternExtractor
# INIT
#=====================
# Invoke a particular REGEX rule set, here poli_patterns.cfg
# @see https://github.com/OpenSextant/Xponents/blob/master/Core/src/main/resources/poli_patterns.cfg
mgr = PatternsOfLifeManager("poli_patterns.cfg")
pex = PatternExtractor(mgr)
# DEV/TEST
#=====================
# "default_tests()" is useful to run during development and
# encourages you to capture critical pattern variants in your "TEST" data.
# Look at your pass/fail situations -- what test cases are failing your rule?
test_results = pex.default_tests()
print("TEST RESULTS")
for result in test_results:
print(repr(result))
# RUN
#=====================
real_results = pex.extract(".... text blob 1-800-123-4567...")
print("REAL RESULTS")
for result in real_results:
print(repr(result))
print(" RAW DICT:", render_match(result))
def __init__(self, pattern_manager):
    """
    Set up the extractor around a pattern manager.

    Pass RegexPatternManager(your_cfg_file), or a custom RegexPatternManager subclass (rare).
    NOTE - `PatternsOfLifeManager` is one such subclass; it manipulates the input patterns
    config file, which is shared with the Java demo. The `CLASS` names in that file are
    unfortunately specific to Python or Java.

    :param pattern_manager: RegexPatternManager
    """
    Extractor.__init__(self)
    # Assigned after the parent initializer has run.
    self.pattern_manager = pattern_manager
    self.name = "Xponents Pattern Extractor"
    self.id = "xpx"
invoke RegexPatternManager(your_cfg_file) or implement a custom RegexPatternManager (rare).
NOTE - PatternsOfLifeManager is a particular subclass of RegexPatternManager because
it is manipulating the input patterns config file which is shared with the Java demo.
The CLASS names unfortunately are specific to Python or Java.
:param pattern_manager: RegexPatternManager
def extract(self, text, **kwargs):
    """
    Default Extractor API entry point.

    :param text: text to search
    :param kwargs: passed through unchanged to extract_patterns()
    :return: whatever extract_patterns() returns
    """
    matches = self.extract_patterns(text, **kwargs)
    return matches
Default Extractor API.
def extract_patterns(self, text, **kwargs):
    """
    Given some text input, apply all relevant pattern families against the text.
    Surrounding text is added to each match for post-processing.

    :param text: text to search
    :param kwargs: optional "features" = iterable of pattern family names to apply;
        when absent or empty, every family known to the pattern manager is applied.
    :return: list of matches. Redundant matches (duplicates, submatches) remain in the
        list but are flagged with filtered_out = True.
    :raise Exception: if a requested family is not known to the pattern manager.
    """
    features = kwargs.get("features")
    if not features:
        features = self.pattern_manager.families

    tlen = len(text)
    results = []
    for fam in features:
        if fam not in self.pattern_manager.families:
            raise Exception("Unknown Pattern Family " + fam)

        for pat in self.pattern_manager.patterns.values():
            if pat.family != fam or not pat.enabled:
                continue

            for m in pat.regex.finditer(text):
                digested_groups = _digest_sub_groups(m, pat.regex_groups)
                if pat.match_class:
                    domainObj = pat.match_class(m.group(), m.start(), m.end(),
                                                pattern_id=pat.id,
                                                label=pat.family,
                                                match_groups=digested_groups)
                    # surrounding text may be used by normalization and validation
                    domainObj.add_surrounding_text(text, tlen, length=20)
                    domainObj.normalize()
                    # normalize() may set omit, meaning the match is never returned.
                    if not domainObj.omit:
                        results.append(domainObj)
                else:
                    # No domain class configured for this family: emit a generic match as-is.
                    genericObj = PatternMatch(m.group(), m.start(), m.end(),
                                              pattern_id=pat.id,
                                              label=pat.family,
                                              match_groups=digested_groups)
                    genericObj.add_surrounding_text(text, tlen, length=20)
                    results.append(genericObj)

    # Determine if any matches are redundant. Mark redundancies as "filtered out".
    reduce_matches(results)
    for r in results:
        if r.is_duplicate or r.is_submatch:
            r.filtered_out = True

    return results
Given some text input, apply all relevant pattern families against the text. Surrounding text is added to each match for post-processing. :param text: :param kwargs: :return:
def default_tests(self, scope="rule"):
    """
    Default Tests run all TEST cases for each RULE in patterns config.
    TESTs whose text contains 'FAIL' are intended to return 0 matches or only matches
    that are filtered out. Otherwise a TEST is intended to return 1 or more matches.

    By default, this runs each test and observes only results that were triggered by that rule being tested.
    If scope is "ruleset" then any results from any rule will be allowed.
    "rule" scope is much better for detailed rule development as it tells you if your rule tests are testing the
    right thing.

    Each test is printed to screen as it runs; the results are returned as an array,
    e.g., to write to CSV for review.
    This uses PatternExtractor.extract_patterns() to avoid any collision with the generic use
    of Extractor.extract() parent method.

    :param scope: rule or ruleset. Rule scope means only results for rule test case are evaluated.
        ruleset scope means that all results for a test are evaluated.
    :return: test results array; Each result is a dict with keys TEST, TEXT, MATCHES, PASS
        and represents a TEST case run against a RULE
    """
    test_results = []
    for t in self.pattern_manager.test_cases:
        expect_valid_match = "FAIL" not in t.text
        print("Test", t.family, t.text)
        output1 = self.extract_patterns(t.text, features=[t.family])

        # Test ids look like "<family>-<rule>#<count>". Compare the rule portion exactly:
        # a prefix comparison would credit hits from rule "X-1" to tests of rule "X-10".
        rule_id = t.id.rsplit("#", 1)[0]
        output = []
        for m in output1:
            if scope == "rule" and m.pattern_id != rule_id:
                continue
            output.append(m)

        # A match counts if it was not filtered out, or if it was filtered only as a duplicate.
        # True Positive = at least one counted match, AND a match was expected.
        # False Positive = at least one counted match, AND a match was NOT expected.
        counted = 0
        for m in output:
            if not m.filtered_out or m.is_duplicate:
                counted += 1

        # Pass on a true positive or true negative; fail on a false positive or false negative.
        success = (counted > 0) == expect_valid_match
        test_results.append({"TEST": t.id,
                             "TEXT": t.text,
                             "MATCHES": output,
                             "PASS": success})

    return test_results
Default Tests run all TEST cases for each RULE in patterns config. TESTs marked with a 'FAIL' comment are intended to return 0 matches or only matches that are filtered out. Otherwise a TEST is intended to return 1 or more matches.
By default, this runs each test and observes only results that were triggered by that rule being tested. If scope is "ruleset" then any results from any rule will be allowed. "rule" scope is much better for detailed rule development as it tells you if your rule tests are testing the right thing.
Runs the default tests on the provided configuration. Plenty of debug printed to screen. But returns the test results as an array, e.g., to write to CSV for review. This uses PatternExtractor.extract_patterns() to avoid any collision with the generic use of Extractor.extract() parent method. :param scope: rule or ruleset. Rule scope means only results for rule test case are evaluated. ruleset scope means that all results for a test are evaluated. :return: test results array; Each result represents a TEST case run against a RULE
def print_test(result: dict):
    """
    Print a one-line summary of a single result dict as produced by default_tests().

    :param result: dict with keys TEST, TEXT, PASS and MATCHES; nothing is printed
        when it is empty or None.
    """
    if result:
        tid = result["TEST"]
        txt = result["TEXT"]
        res = result["PASS"]
        found = result["MATCHES"]
        # Matched text values are joined with ';'; a placeholder stands in when there are none.
        matches = ";".join(item.text for item in found) if found else "<None>"
        print(f"TEST: {tid}, TEXT: {txt} PASS:{res}\tMATCHES: {matches}")
print the structure from default_tests()