""" Tests for Phase 3: Relation Algebra """ from pysecondo.core.nested_list import atom, list_nl from pysecondo.core.types import BaseType, TupleType, RelationType, Attribute from pysecondo.query_processor import QueryProcessor from pysecondo.storage.memory import MemoryStorage from pysecondo.algebras.relation import RelationAlgebra from pysecondo.algebras.standard import StandardAlgebra from pysecondo.algebras.base import AlgebraManager import sys sys.path.insert(0, '.') def test_create_relation(): """Test creating relations""" print("Testing Create Relation...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) algebra_manager.register_algebra( "RelationAlgebra", RelationAlgebra(storage) ) qp = QueryProcessor(algebra_manager, storage) # Create a relation qp.execute_create( "cities", "(rel (tuple ((Name string)(Population int))))" ) # Check it exists assert storage.object_exists("cities") obj_type = storage.get_type("cities") assert isinstance(obj_type, RelationType) print(" ✓ Create relation tests passed") def test_update_relation(): """Test updating relations with data""" print("Testing Update Relation...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) algebra_manager.register_algebra( "RelationAlgebra", RelationAlgebra(storage) ) qp = QueryProcessor(algebra_manager, storage) # Create and populate a relation qp.execute_create( "cities", "(rel (tuple ((Name string)(Population int))))" ) # Insert data cities_data = list_nl( list_nl("Beijing", 21540000), list_nl("Shanghai", 24280000), list_nl("Guangzhou", 14040000), ) qp.execute_update("cities", cities_data) # Verify data cities = qp.lookup_identifier("cities") assert len(cities) == 3 print(" ✓ Update relation tests passed") def test_feed_consume(): """Test feed and consume operators""" print("Testing Feed and Consume...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) algebra_manager.register_algebra( "RelationAlgebra", RelationAlgebra(storage) ) qp = QueryProcessor(algebra_manager, storage) # Create and populate a relation qp.execute_create( "cities", "(rel (tuple ((Name string)(Population int))))" ) cities_data = list_nl( list_nl("Beijing", 21540000), list_nl("Shanghai", 24280000), list_nl("Guangzhou", 14040000), ) qp.execute_update("cities", cities_data) # Test feed: relation -> stream cities = qp.lookup_identifier("cities") cities_type = qp.get_identifier_type("cities") feed_op = algebra_manager.get_operator("feed") stream = feed_op.value_map([cities]) assert len(stream) == 3 # Test consume: stream -> relation consume_op = algebra_manager.get_operator("consume") result = consume_op.value_map([stream]) assert len(result) == 3 print(" ✓ Feed and consume tests passed") def test_count(): """Test count operator""" print("Testing Count Operator...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) algebra_manager.register_algebra( "RelationAlgebra", RelationAlgebra(storage) ) qp = QueryProcessor(algebra_manager, storage) # Create and populate a relation qp.execute_create( "cities", "(rel (tuple ((Name string)(Population int))))" ) cities_data = list_nl( list_nl("Beijing", 21540000), list_nl("Shanghai", 24280000), list_nl("Guangzhou", 14040000), list_nl("Shenzhen", 17560000), ) qp.execute_update("cities", cities_data) # Test count cities = qp.lookup_identifier("cities") feed_op = algebra_manager.get_operator("feed") count_op = algebra_manager.get_operator("count") stream = feed_op.value_map([cities]) count_result = count_op.value_map([stream]) assert count_result.value == 4 print(" ✓ Count operator tests passed") def test_filter(): """Test filter operator""" print("Testing Filter Operator...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) algebra_manager.register_algebra( "RelationAlgebra", RelationAlgebra(storage) ) qp = QueryProcessor(algebra_manager, storage) # Create and populate a relation qp.execute_create( "cities", "(rel (tuple ((Name string)(Population int))))" ) cities_data = list_nl( list_nl("Beijing", 21540000), list_nl("Shanghai", 24280000), list_nl("Guangzhou", 14040000), list_nl("Shenzhen", 17560000), ) qp.execute_update("cities", cities_data) # Test filter with true (pass all) cities = qp.lookup_identifier("cities") feed_op = algebra_manager.get_operator("feed") filter_op = algebra_manager.get_operator("filter") stream = feed_op.value_map([cities]) filtered = filter_op.value_map([stream, atom(True)]) assert len(filtered) == 4 # Test filter with false (pass none) filtered = filter_op.value_map([stream, atom(False)]) assert len(filtered) == 0 print(" ✓ Filter operator tests passed") def test_query_pipeline(): """Test a complete query pipeline""" print("Testing Query Pipeline...") storage = MemoryStorage() algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) algebra_manager.register_algebra( "RelationAlgebra", RelationAlgebra(storage) ) qp = QueryProcessor(algebra_manager, storage) # Setup: create cities relation qp.execute_create( "cities", "(rel (tuple ((Name string)(Population int))))" ) cities_data = list_nl( list_nl("Beijing", 21540000), list_nl("Shanghai", 24280000), list_nl("Guangzhou", 14040000), list_nl("Shenzhen", 17560000), list_nl("Hangzhou", 12200000), ) qp.execute_update("cities", cities_data) # Query: cities feed count # Equivalent to: SELECT COUNT(*) FROM cities cities = qp.lookup_identifier("cities") feed_op = algebra_manager.get_operator("feed") count_op = algebra_manager.get_operator("count") stream = feed_op.value_map([cities]) count = count_op.value_map([stream]) assert count.value == 5 # Query: cities feed filter[true] consume # Equivalent to: SELECT * FROM cities filter_op = algebra_manager.get_operator("filter") consume_op = algebra_manager.get_operator("consume") stream = feed_op.value_map([cities]) filtered = filter_op.value_map([stream, atom(True)]) result = consume_op.value_map([filtered]) assert len(result) == 5 print(" ✓ Query pipeline tests passed") def test_type_checking(): """Test type checking for relation operators""" print("Testing Type Checking for Relations...") algebra_manager = AlgebraManager() algebra_manager.register_algebra("StandardAlgebra", StandardAlgebra()) storage = MemoryStorage() algebra_manager.register_algebra( "RelationAlgebra", RelationAlgebra(storage) ) # Test feed type mapping feed_op = algebra_manager.get_operator("feed") tuple_type = TupleType([ Attribute("Name", BaseType.STRING), Attribute("Population", BaseType.INT) ]) rel_type = RelationType(tuple_type) result_type = feed_op.type_map([rel_type]) assert result_type == tuple_type # Test consume type mapping consume_op = algebra_manager.get_operator("consume") result_type = consume_op.type_map([tuple_type]) assert isinstance(result_type, RelationType) assert result_type.tuple_type == tuple_type # Test count type mapping count_op = algebra_manager.get_operator("count") result_type = count_op.type_map([tuple_type]) assert result_type == BaseType.INT print(" ✓ Type checking tests passed") if __name__ == "__main__": print("=" * 50) print("Phase 3: Relation Algebra Tests") print("=" * 50) test_create_relation() test_update_relation() test_feed_consume() test_count() test_filter() test_query_pipeline() test_type_checking() print("\n" + "=" * 50) print("All Phase 3 tests passed! ✓") print("=" * 50)