""" Tests for Phase 4: Query Processing & REPL """ from pysecondo.core.nested_list import atom, list_nl from pysecondo.core.types import BaseType, TupleType, RelationType from pysecondo.storage.memory import MemoryStorage from pysecondo.algebras.relation import RelationAlgebra from pysecondo.algebras.standard import StandardAlgebra from pysecondo.algebras.base import AlgebraManager from pysecondo.parser.evaluator import Evaluator from pysecondo.parser.parser import Parser, parse_query, CreateCommand, UpdateCommand, QueryCommand import sys sys.path.insert(0, '.') def test_parser_create(): """Test parsing CREATE commands""" print("Testing Parser (CREATE)...") parser = Parser() # Test create command cmd = parser.parse( 'create cities : (rel (tuple ((Name string)(Population int))))') assert isinstance(cmd, CreateCommand) assert cmd.name == "cities" assert "(rel" in cmd.type_str print(" ✓ CREATE parsing tests passed") def test_parser_update(): """Test parsing UPDATE commands""" print("Testing Parser (UPDATE)...") parser = Parser() # Test update command cmd = parser.parse( 'update cities := (("Beijing" 21540000)("Shanghai" 24280000))') assert isinstance(cmd, UpdateCommand) assert cmd.name == "cities" assert "Beijing" in cmd.value print(" ✓ UPDATE parsing tests passed") def test_parser_query(): """Test parsing QUERY commands""" print("Testing Parser (QUERY)...") parser = Parser() # Test query command cmd = parser.parse('query cities feed count') assert isinstance(cmd, QueryCommand) assert cmd.expression == "cities feed count" # Test arithmetic query cmd = parser.parse('query 5 + 3') assert isinstance(cmd, QueryCommand) assert cmd.expression == "5 + 3" print(" ✓ QUERY parsing tests passed") def test_parser_expressions(): """Test expression tokenization""" print("Testing Expression Tokenization...") parser = Parser() # Simple identifier tokens = parser.parse_expression("cities") assert tokens == ["cities"] # Operator chain tokens = parser.parse_expression("cities feed consume") assert tokens == ["cities", "feed", "consume"] # Arithmetic tokens = parser.parse_expression("5 + 3") assert tokens == ["5", "+", "3"] # Complex expression tokens = parser.parse_expression("cities feed filter true consume") assert tokens == ["cities", "feed", "filter", "true", "consume"] print(" ✓ Expression tokenization tests passed") def test_evaluator_arithmetic(): """Test evaluating arithmetic expressions""" print("Testing Evaluator (Arithmetic)...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) evaluator = Evaluator(algebra_manager, storage) # Test: 5 + 3 tokens = ["5", "+", "3"] value, value_type = evaluator.evaluate(tokens) assert value.value == 8 # Test: 10 - 4 tokens = ["10", "-", "4"] value, value_type = evaluator.evaluate(tokens) assert value.value == 6 # Test: 6 * 7 tokens = ["6", "*", "7"] value, value_type = evaluator.evaluate(tokens) assert value.value == 42 print(" ✓ Arithmetic evaluation tests passed") def test_evaluator_identifiers(): """Test evaluating identifiers""" print("Testing Evaluator (Identifiers)...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) evaluator = Evaluator(algebra_manager, storage) # Store a value storage.create_object("x", atom(42), BaseType.INT) # Test: x tokens = ["x"] value, value_type = evaluator.evaluate(tokens) assert value.value == 42 print(" ✓ Identifier evaluation tests passed") def test_evaluator_relations(): """Test evaluating relation expressions""" print("Testing Evaluator (Relations)...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) algebra_manager.register_algebra( "RelationAlgebra", RelationAlgebra(storage)) evaluator = Evaluator(algebra_manager, storage) # Create a relation cities_type = RelationType(TupleType([])) storage.create_object("cities", list_nl(), cities_type) # Test: cities feed tokens = ["cities", "feed"] value, value_type = evaluator.evaluate(tokens) print(" ✓ Relation evaluation tests passed") def test_end_to_end(): """Test end-to-end query execution""" print("Testing End-to-End Queries...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) algebra_manager.register_algebra( "RelationAlgebra", RelationAlgebra(storage)) parser = Parser() evaluator = Evaluator(algebra_manager, storage) # Create relation create_cmd = parser.parse( 'create cities : (rel (tuple ((Name string)(Population int))))') assert isinstance(create_cmd, CreateCommand) # Insert data (using storage directly for simplicity) from pysecondo.core.types import parse_type cities_type = parse_type('(rel (tuple ((Name string)(Population int))))') cities_data = list_nl( list_nl("Beijing", 21540000), list_nl("Shanghai", 24280000), list_nl("Guangzhou", 14040000), ) storage.create_object("cities", cities_data, cities_type) # Query: cities feed count query_cmd = parser.parse('query cities feed count') assert isinstance(query_cmd, QueryCommand) tokens = parser.parse_expression(query_cmd.expression) value, value_type = evaluator.evaluate(tokens) assert value.value == 3 print(" ✓ End-to-end query tests passed") if __name__ == "__main__": print("=" * 50) print("Phase 4: Query Processing & REPL Tests") print("=" * 50) test_parser_create() test_parser_update() test_parser_query() test_parser_expressions() test_evaluator_arithmetic() test_evaluator_identifiers() test_evaluator_relations() test_end_to_end() print("\n" + "=" * 50) print("All Phase 4 tests passed! ✓") print("=" * 50)