320 lines
10 KiB
Python
320 lines
10 KiB
Python
|
|
"""
|
||
|
|
Expression Evaluator for PySECONDO
|
||
|
|
|
||
|
|
Evaluates query expressions using the algebra system.
|
||
|
|
Handles identifier lookup and operator execution.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import re
|
||
|
|
from typing import List, Any, Tuple
|
||
|
|
from pysecondo.core.types import Type, BaseType
|
||
|
|
from pysecondo.core.nested_list import NestedList, atom, list_nl
|
||
|
|
from pysecondo.algebras.base import AlgebraManager
|
||
|
|
from pysecondo.storage.memory import MemoryStorage
|
||
|
|
|
||
|
|
|
||
|
|
class Evaluator:
|
||
|
|
"""
|
||
|
|
Expression evaluator
|
||
|
|
|
||
|
|
Evaluates expressions like:
|
||
|
|
- identifier
|
||
|
|
- identifier feed consume
|
||
|
|
- identifier feed count
|
||
|
|
- 5 + 3
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(
|
||
|
|
self,
|
||
|
|
algebra_manager: AlgebraManager,
|
||
|
|
storage: MemoryStorage
|
||
|
|
):
|
||
|
|
self.algebra_manager = algebra_manager
|
||
|
|
self.storage = storage
|
||
|
|
|
||
|
|
def evaluate(self, tokens: List[str]) -> Tuple[NestedList, Type]:
|
||
|
|
"""
|
||
|
|
Evaluate a tokenized expression
|
||
|
|
|
||
|
|
Args:
|
||
|
|
tokens: List of tokens from parser
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Tuple of (value, type)
|
||
|
|
|
||
|
|
Raises:
|
||
|
|
ValueError: If evaluation fails
|
||
|
|
"""
|
||
|
|
if not tokens:
|
||
|
|
raise ValueError("Empty expression")
|
||
|
|
|
||
|
|
# Handle single token
|
||
|
|
if len(tokens) == 1:
|
||
|
|
return self.evaluate_single(tokens[0])
|
||
|
|
|
||
|
|
# Check if first token is a unary operator (prefix notation like "not false")
|
||
|
|
unary_ops = {'feed', 'consume', 'count', 'filter', 'not'}
|
||
|
|
if tokens[0] in unary_ops:
|
||
|
|
op_name = tokens[0]
|
||
|
|
if len(tokens) < 2:
|
||
|
|
raise ValueError(
|
||
|
|
f"Unary operator {op_name} requires an operand")
|
||
|
|
|
||
|
|
if op_name == 'filter':
|
||
|
|
# filter needs a predicate
|
||
|
|
if len(tokens) < 3:
|
||
|
|
raise ValueError("filter requires a predicate")
|
||
|
|
operand_token = tokens[1]
|
||
|
|
pred_token = tokens[2]
|
||
|
|
operand_value, operand_type = self.evaluate_single(
|
||
|
|
operand_token)
|
||
|
|
pred_value, pred_type = self.evaluate_single(pred_token)
|
||
|
|
return self.execute_operator(
|
||
|
|
op_name,
|
||
|
|
[operand_value, pred_value],
|
||
|
|
[operand_type, pred_type]
|
||
|
|
)
|
||
|
|
else:
|
||
|
|
# Simple unary operator
|
||
|
|
operand_token = tokens[1]
|
||
|
|
operand_value, operand_type = self.evaluate_single(
|
||
|
|
operand_token)
|
||
|
|
return self.execute_operator(
|
||
|
|
op_name,
|
||
|
|
[operand_value],
|
||
|
|
[operand_type]
|
||
|
|
)
|
||
|
|
|
||
|
|
# Start with first token (value or identifier)
|
||
|
|
current_value, current_type = self.evaluate_single(tokens[0])
|
||
|
|
i = 1
|
||
|
|
|
||
|
|
# Process operator chain
|
||
|
|
while i < len(tokens):
|
||
|
|
op_name = tokens[i]
|
||
|
|
operator = self.algebra_manager.get_operator(op_name)
|
||
|
|
|
||
|
|
if operator is None:
|
||
|
|
# Try to evaluate as a single token (might be an identifier)
|
||
|
|
try:
|
||
|
|
right_value, right_type = self.evaluate_single(op_name)
|
||
|
|
current_value, current_type = right_value, right_type
|
||
|
|
i += 1
|
||
|
|
continue
|
||
|
|
except:
|
||
|
|
raise ValueError(f"Unknown operator: {op_name}")
|
||
|
|
|
||
|
|
# Check if operator is unary or binary
|
||
|
|
# Unary operators: feed, consume, count, filter, not
|
||
|
|
unary_ops = {'feed', 'consume', 'count', 'filter', 'not'}
|
||
|
|
binary_ops = {'+', '-', '*', '/', '<', '>', '=', '!=', '<=', '>=',
|
||
|
|
'and', 'or'}
|
||
|
|
|
||
|
|
if op_name in unary_ops:
|
||
|
|
# Unary operator: apply to current value
|
||
|
|
if op_name == 'filter':
|
||
|
|
# filter needs a predicate argument
|
||
|
|
if i + 1 >= len(tokens):
|
||
|
|
raise ValueError("filter requires a predicate")
|
||
|
|
pred_token = tokens[i + 1]
|
||
|
|
pred_value, pred_type = self.evaluate_single(pred_token)
|
||
|
|
current_value, current_type = self.execute_operator(
|
||
|
|
op_name,
|
||
|
|
[current_value, pred_value],
|
||
|
|
[current_type, pred_type]
|
||
|
|
)
|
||
|
|
i += 2
|
||
|
|
else:
|
||
|
|
current_value, current_type = self.execute_operator(
|
||
|
|
op_name,
|
||
|
|
[current_value],
|
||
|
|
[current_type]
|
||
|
|
)
|
||
|
|
i += 1
|
||
|
|
elif op_name in binary_ops:
|
||
|
|
# Binary operator: need right operand
|
||
|
|
if i + 1 >= len(tokens):
|
||
|
|
raise ValueError(
|
||
|
|
f"Binary operator {op_name} requires right operand")
|
||
|
|
|
||
|
|
right_token = tokens[i + 1]
|
||
|
|
right_value, right_type = self.evaluate_single(right_token)
|
||
|
|
|
||
|
|
current_value, current_type = self.execute_operator(
|
||
|
|
op_name,
|
||
|
|
[current_value, right_value],
|
||
|
|
[current_type, right_type]
|
||
|
|
)
|
||
|
|
i += 2
|
||
|
|
else:
|
||
|
|
# Assume unary
|
||
|
|
current_value, current_type = self.execute_operator(
|
||
|
|
op_name,
|
||
|
|
[current_value],
|
||
|
|
[current_type]
|
||
|
|
)
|
||
|
|
i += 1
|
||
|
|
|
||
|
|
return current_value, current_type
|
||
|
|
|
||
|
|
def evaluate_single(self, token: str) -> Tuple[NestedList, Type]:
|
||
|
|
"""
|
||
|
|
Evaluate a single token
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Tuple of (value, type)
|
||
|
|
"""
|
||
|
|
# Boolean (check before identifier since 'true'/'false' are valid identifiers)
|
||
|
|
if token.lower() == 'true':
|
||
|
|
return atom(True), BaseType.BOOL
|
||
|
|
if token.lower() == 'false':
|
||
|
|
return atom(False), BaseType.BOOL
|
||
|
|
|
||
|
|
# String
|
||
|
|
if self.is_string(token):
|
||
|
|
value = atom(token[1:-1]) # Remove quotes
|
||
|
|
return value, BaseType.STRING
|
||
|
|
|
||
|
|
# Number
|
||
|
|
if self.is_number(token):
|
||
|
|
return self.parse_number(token)
|
||
|
|
|
||
|
|
# Nested list value
|
||
|
|
if token.startswith('(') and token.endswith(')'):
|
||
|
|
return self.parse_nested_list(token)
|
||
|
|
|
||
|
|
# Identifier (check last since it matches many patterns)
|
||
|
|
if self.is_identifier(token):
|
||
|
|
return self.lookup_identifier(token)
|
||
|
|
|
||
|
|
raise ValueError(f"Cannot evaluate token: {token}")
|
||
|
|
|
||
|
|
def lookup_identifier(self, name: str) -> Tuple[NestedList, Type]:
|
||
|
|
"""Look up an identifier in storage"""
|
||
|
|
value = self.storage.get_object(name)
|
||
|
|
if value is None:
|
||
|
|
raise ValueError(f"Unknown identifier: {name}")
|
||
|
|
|
||
|
|
obj_type = self.storage.get_type(name)
|
||
|
|
return value, obj_type
|
||
|
|
|
||
|
|
def parse_number(self, token: str) -> Tuple[NestedList, Type]:
|
||
|
|
"""Parse a number token"""
|
||
|
|
try:
|
||
|
|
if '.' in token:
|
||
|
|
value = atom(float(token))
|
||
|
|
return value, BaseType.REAL
|
||
|
|
else:
|
||
|
|
value = atom(int(token))
|
||
|
|
return value, BaseType.INT
|
||
|
|
except ValueError:
|
||
|
|
raise ValueError(f"Invalid number: {token}")
|
||
|
|
|
||
|
|
def parse_nested_list(self, token: str) -> Tuple[NestedList, Type]:
|
||
|
|
"""
|
||
|
|
Parse a nested list token
|
||
|
|
|
||
|
|
This is a simplified parser that handles basic nested lists.
|
||
|
|
"""
|
||
|
|
# Remove outer parentheses
|
||
|
|
inner = token[1:-1].strip()
|
||
|
|
|
||
|
|
if not inner:
|
||
|
|
# Empty list
|
||
|
|
value = list_nl()
|
||
|
|
return value, BaseType.INT # Default type
|
||
|
|
|
||
|
|
# Try to parse as list of values
|
||
|
|
# For simplicity, just handle comma-separated values
|
||
|
|
parts = self.split_list(inner)
|
||
|
|
|
||
|
|
values = []
|
||
|
|
for part in parts:
|
||
|
|
part = part.strip()
|
||
|
|
if self.is_number(part):
|
||
|
|
val, _ = self.parse_number(part)
|
||
|
|
values.append(val)
|
||
|
|
elif self.is_string(part):
|
||
|
|
val, _ = self.evaluate_single(part)
|
||
|
|
values.append(val)
|
||
|
|
else:
|
||
|
|
# Assume it's a nested list
|
||
|
|
val, _ = self.parse_nested_list(part)
|
||
|
|
values.append(val)
|
||
|
|
|
||
|
|
value = list_nl(*values)
|
||
|
|
# Type inference would happen here
|
||
|
|
return value, BaseType.INT
|
||
|
|
|
||
|
|
def split_list(self, s: str) -> List[str]:
|
||
|
|
"""
|
||
|
|
Split a list string into parts
|
||
|
|
|
||
|
|
Handles nested parentheses correctly.
|
||
|
|
"""
|
||
|
|
parts = []
|
||
|
|
current = []
|
||
|
|
depth = 0
|
||
|
|
|
||
|
|
for char in s:
|
||
|
|
if char == '(':
|
||
|
|
depth += 1
|
||
|
|
current.append(char)
|
||
|
|
elif char == ')':
|
||
|
|
depth -= 1
|
||
|
|
current.append(char)
|
||
|
|
elif char in ' \t' and depth == 0:
|
||
|
|
if current:
|
||
|
|
parts.append(''.join(current))
|
||
|
|
current = []
|
||
|
|
else:
|
||
|
|
current.append(char)
|
||
|
|
|
||
|
|
if current:
|
||
|
|
parts.append(''.join(current))
|
||
|
|
|
||
|
|
return parts
|
||
|
|
|
||
|
|
def execute_operator(
|
||
|
|
self,
|
||
|
|
op_name: str,
|
||
|
|
args: List[NestedList],
|
||
|
|
arg_types: List[Type]
|
||
|
|
) -> Tuple[NestedList, Type]:
|
||
|
|
"""
|
||
|
|
Execute an operator
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Tuple of (result_value, result_type)
|
||
|
|
"""
|
||
|
|
operator = self.algebra_manager.get_operator(op_name)
|
||
|
|
|
||
|
|
if operator is None:
|
||
|
|
raise ValueError(f"Unknown operator: {op_name}")
|
||
|
|
|
||
|
|
# Type check
|
||
|
|
result_type = operator.type_map(arg_types)
|
||
|
|
|
||
|
|
# Execute
|
||
|
|
if operator.value_map is None:
|
||
|
|
raise ValueError(f"Operator {op_name} has no value mapping")
|
||
|
|
|
||
|
|
result = operator.value_map(args)
|
||
|
|
return result, result_type
|
||
|
|
|
||
|
|
def is_identifier(self, token: str) -> bool:
|
||
|
|
"""Check if token is an identifier"""
|
||
|
|
return bool(re.match(r'^[a-zA-Z_]\w*$', token))
|
||
|
|
|
||
|
|
def is_number(self, token: str) -> bool:
|
||
|
|
"""Check if token is a number"""
|
||
|
|
try:
|
||
|
|
float(token)
|
||
|
|
return True
|
||
|
|
except ValueError:
|
||
|
|
return False
|
||
|
|
|
||
|
|
def is_string(self, token: str) -> bool:
|
||
|
|
"""Check if token is a string literal"""
|
||
|
|
return token.startswith('"') and token.endswith('"')
|