"""
Expression Evaluator for PySECONDO

Evaluates query expressions using the algebra system.
Handles identifier lookup and operator execution.
"""

import math
import re
from typing import List, Any, Tuple

from pysecondo.core.types import Type, BaseType
from pysecondo.core.nested_list import NestedList, atom, list_nl
from pysecondo.algebras.base import AlgebraManager
from pysecondo.storage.memory import MemoryStorage


class Evaluator:
    """
    Expression evaluator

    Evaluates expressions like:
    - identifier
    - identifier feed consume
    - identifier feed count
    - 5 + 3
    """

    # Operator names that may appear in prefix position (e.g. "not false").
    _UNARY_OPS = frozenset({'feed', 'consume', 'count', 'filter', 'not'})

    # Operator names that consume the token to their right as second operand.
    _BINARY_OPS = frozenset({
        '+', '-', '*', '/', '<', '>', '=', '!=', '<=', '>=', 'and', 'or',
    })

    # Compiled once; used by is_identifier.
    _IDENTIFIER_RE = re.compile(r'[a-zA-Z_]\w*')

    def __init__(
        self,
        algebra_manager: AlgebraManager,
        storage: MemoryStorage
    ):
        """
        Create an evaluator

        Args:
            algebra_manager: Used to resolve operator names to operators
            storage: Used to resolve identifiers to stored objects
        """
        self.algebra_manager = algebra_manager
        self.storage = storage

    def evaluate(self, tokens: List[str]) -> Tuple[NestedList, Type]:
        """
        Evaluate a tokenized expression

        The expression is evaluated strictly left to right; there is no
        operator precedence. An expression may start with a prefix unary
        operator ("not false", "filter rel pred"); any tokens following the
        prefix application are applied to its result as a normal operator
        chain.

        Args:
            tokens: List of tokens from parser

        Returns:
            Tuple of (value, type)

        Raises:
            ValueError: If evaluation fails
        """
        if not tokens:
            raise ValueError("Empty expression")

        # Handle single token
        if len(tokens) == 1:
            return self.evaluate_single(tokens[0])

        first = tokens[0]
        if first in self._UNARY_OPS:
            # Prefix notation. len(tokens) >= 2 is guaranteed here, so the
            # operand token always exists.
            operand_value, operand_type = self.evaluate_single(tokens[1])
            if first == 'filter':
                # filter needs a predicate in addition to its operand
                if len(tokens) < 3:
                    raise ValueError("filter requires a predicate")
                pred_value, pred_type = self.evaluate_single(tokens[2])
                value, value_type = self.execute_operator(
                    first,
                    [operand_value, pred_value],
                    [operand_type, pred_type]
                )
                next_index = 3
            else:
                value, value_type = self.execute_operator(
                    first, [operand_value], [operand_type]
                )
                next_index = 2
        else:
            # Start with first token (value or identifier)
            value, value_type = self.evaluate_single(first)
            next_index = 1

        # Apply whatever follows instead of silently dropping it.
        return self._apply_chain(value, value_type, tokens, next_index)

    def _apply_chain(
        self,
        value: NestedList,
        value_type: Type,
        tokens: List[str],
        start: int
    ) -> Tuple[NestedList, Type]:
        """
        Apply the postfix/infix operator chain tokens[start:] to a value

        Args:
            value: Current value the chain is applied to
            value_type: Type of the current value
            tokens: Full token list of the expression
            start: Index of the first token of the chain

        Returns:
            Tuple of (value, type) after the whole chain has been applied

        Raises:
            ValueError: If a token is neither a known operator nor an
                evaluable value, or an operator lacks its argument
        """
        i = start
        while i < len(tokens):
            op_name = tokens[i]

            if self.algebra_manager.get_operator(op_name) is None:
                # Not an operator: try it as a plain value/identifier.
                # NOTE: on success it REPLACES the current value; this
                # mirrors the evaluator's long-standing behaviour.
                try:
                    value, value_type = self.evaluate_single(op_name)
                except Exception as exc:
                    # Any failure is reported as an unknown operator, but
                    # KeyboardInterrupt/SystemExit are no longer swallowed.
                    raise ValueError(
                        f"Unknown operator: {op_name}") from exc
                i += 1
                continue

            if op_name == 'filter':
                # filter takes the following token as its predicate
                if i + 1 >= len(tokens):
                    raise ValueError("filter requires a predicate")
                pred_value, pred_type = self.evaluate_single(tokens[i + 1])
                value, value_type = self.execute_operator(
                    op_name,
                    [value, pred_value],
                    [value_type, pred_type]
                )
                i += 2
            elif op_name in self._BINARY_OPS:
                # Binary operator: need right operand
                if i + 1 >= len(tokens):
                    raise ValueError(
                        f"Binary operator {op_name} requires right operand")
                right_value, right_type = self.evaluate_single(tokens[i + 1])
                value, value_type = self.execute_operator(
                    op_name,
                    [value, right_value],
                    [value_type, right_type]
                )
                i += 2
            else:
                # Known unary operators and any other registered operator
                # are applied to the current value alone.
                value, value_type = self.execute_operator(
                    op_name, [value], [value_type]
                )
                i += 1

        return value, value_type

    def evaluate_single(self, token: str) -> Tuple[NestedList, Type]:
        """
        Evaluate a single token

        Checks are ordered from most to least specific: boolean, string
        literal, number, nested list, identifier.

        Args:
            token: The token to evaluate

        Returns:
            Tuple of (value, type)

        Raises:
            ValueError: If the token matches none of the known forms, or
                is an identifier that is not present in storage
        """
        # Boolean (check before identifier since 'true'/'false' are valid
        # identifiers)
        if token.lower() == 'true':
            return atom(True), BaseType.BOOL
        if token.lower() == 'false':
            return atom(False), BaseType.BOOL

        # String
        if self.is_string(token):
            value = atom(token[1:-1])  # Remove quotes
            return value, BaseType.STRING

        # Number
        if self.is_number(token):
            return self.parse_number(token)

        # Nested list value
        if token.startswith('(') and token.endswith(')'):
            return self.parse_nested_list(token)

        # Identifier (check last since it matches many patterns)
        if self.is_identifier(token):
            return self.lookup_identifier(token)

        raise ValueError(f"Cannot evaluate token: {token}")

    def lookup_identifier(self, name: str) -> Tuple[NestedList, Type]:
        """
        Look up an identifier in storage

        Args:
            name: Name of the stored object

        Returns:
            Tuple of (value, type) as reported by the storage

        Raises:
            ValueError: If storage returns None for the name
        """
        value = self.storage.get_object(name)
        if value is None:
            raise ValueError(f"Unknown identifier: {name}")

        obj_type = self.storage.get_type(name)
        return value, obj_type

    def parse_number(self, token: str) -> Tuple[NestedList, Type]:
        """
        Parse a number token

        Tokens containing a decimal point or an exponent are parsed as
        REAL, everything else as INT.

        Args:
            token: Textual number

        Returns:
            Tuple of (atom, BaseType.REAL) or (atom, BaseType.INT)

        Raises:
            ValueError: If the token is not a valid finite number
        """
        try:
            if any(marker in token for marker in '.eE'):
                number = float(token)
                if not math.isfinite(number):
                    # e.g. "1e999" overflows to inf; not a usable REAL
                    raise ValueError("non-finite value")
                return atom(number), BaseType.REAL
            return atom(int(token)), BaseType.INT
        except ValueError as exc:
            raise ValueError(f"Invalid number: {token}") from exc

    def parse_nested_list(self, token: str) -> Tuple[NestedList, Type]:
        """
        Parse a nested list token

        This is a simplified parser that handles basic nested lists whose
        elements are whitespace-separated numbers, string literals,
        booleans or further parenthesised lists.

        Args:
            token: Text of the form "( ... )"

        Returns:
            Tuple of (list value, BaseType.INT). The type is a placeholder;
            no type inference is performed.

        Raises:
            ValueError: If the token is not parenthesised or contains an
                element that cannot be parsed
        """
        if len(token) < 2 or not (
                token.startswith('(') and token.endswith(')')):
            raise ValueError(f"Invalid nested list: {token}")

        # Remove outer parentheses
        inner = token[1:-1].strip()

        if not inner:
            # Empty list
            return list_nl(), BaseType.INT  # Default type

        values = []
        for part in self.split_list(inner):
            part = part.strip()
            if part.startswith('(') and part.endswith(')'):
                element, _ = self.parse_nested_list(part)
            elif (part.lower() in ('true', 'false')
                    or self.is_number(part)
                    or self.is_string(part)):
                element, _ = self.evaluate_single(part)
            else:
                # Refuse to guess: anything else used to be mangled into
                # nested empty lists.
                raise ValueError(f"Cannot parse list element: {part}")
            values.append(element)

        # Type inference would happen here
        return list_nl(*values), BaseType.INT

    def split_list(self, s: str) -> List[str]:
        """
        Split a list string into parts

        Splits on whitespace that is outside parentheses and outside
        double-quoted strings, so nested lists and string literals with
        spaces stay in one piece.

        Args:
            s: Contents of a list without its outer parentheses

        Returns:
            The top-level elements, in order, as strings
        """
        parts = []
        current = []
        depth = 0
        in_string = False

        for char in s:
            if char == '"':
                in_string = not in_string
                current.append(char)
            elif in_string:
                # Inside a string literal nothing is structural
                current.append(char)
            elif char == '(':
                depth += 1
                current.append(char)
            elif char == ')':
                depth -= 1
                current.append(char)
            elif char.isspace() and depth == 0:
                if current:
                    parts.append(''.join(current))
                    current = []
            else:
                current.append(char)

        if current:
            parts.append(''.join(current))

        return parts

    def execute_operator(
        self,
        op_name: str,
        args: List[NestedList],
        arg_types: List[Type]
    ) -> Tuple[NestedList, Type]:
        """
        Execute an operator

        The operator's type mapping is run before its value mapping.

        Args:
            op_name: Name of the operator to look up
            args: Argument values
            arg_types: Argument types, parallel to args

        Returns:
            Tuple of (result_value, result_type)

        Raises:
            ValueError: If the operator is unknown or has no value mapping
        """
        operator = self.algebra_manager.get_operator(op_name)
        if operator is None:
            raise ValueError(f"Unknown operator: {op_name}")

        # Type check
        result_type = operator.type_map(arg_types)

        # Execute
        if operator.value_map is None:
            raise ValueError(f"Operator {op_name} has no value mapping")

        result = operator.value_map(args)

        return result, result_type

    def is_identifier(self, token: str) -> bool:
        """Check if token is an identifier (letter or '_' then word chars)"""
        return self._IDENTIFIER_RE.fullmatch(token) is not None

    def is_number(self, token: str) -> bool:
        """
        Check if token is a number

        float() also accepts "nan", "inf" and "infinity"; those contain no
        digit and are rejected here so they can be used as identifiers.
        """
        try:
            float(token)
        except ValueError:
            return False
        return any(ch.isdigit() for ch in token)

    def is_string(self, token: str) -> bool:
        """Check if token is a string literal (opening and closing quote)"""
        return (
            len(token) >= 2
            and token.startswith('"')
            and token.endswith('"')
        )