# Python source, 83 lines, 2.3 KiB
"""
|
|
In-Memory Storage for PySECONDO
|
|
|
|
Simplified storage implementation (no Berkeley DB)
|
|
Stores objects in memory dictionaries
|
|
"""
|
|
|
|
from typing import Dict, Optional
|
|
from pysecondo.core.nested_list import NestedList
|
|
from pysecondo.core.types import Type
|
|
|
|
|
|
class MemoryStorage:
    """
    Simple in-memory storage for database objects

    This is a simplified version of SECONDO's storage manager.
    In the real SECONDO, Berkeley DB is used for persistence.

    Every object is registered under its name in two parallel
    dictionaries: ``objects`` (name -> value) and ``types`` (name -> type).
    Values are stored by reference; no copy is made.
    """

    def __init__(self):
        """Initialize empty storage"""
        self.objects: Dict[str, NestedList] = {}  # name -> value
        self.types: Dict[str, Type] = {}  # name -> type

    def create_object(self, name: str, value: NestedList, obj_type: Type) -> None:
        """
        Create a new object in storage

        Args:
            name: Object name
            value: Nested list value
            obj_type: Type of the object

        Raises:
            ValueError: If an object with this name is already stored
        """
        if name in self.objects:
            raise ValueError(f"Object '{name}' already exists")

        self.objects[name] = value
        self.types[name] = obj_type

    def update_object(self, name: str, value: NestedList, obj_type: Type) -> None:
        """
        Update an existing object

        Both the stored value and the stored type are replaced.

        Args:
            name: Object name
            value: New nested list value
            obj_type: Type of the object

        Raises:
            ValueError: If no object with this name is stored
        """
        if name not in self.objects:
            raise ValueError(f"Object '{name}' does not exist")

        self.objects[name] = value
        self.types[name] = obj_type

    def get_object(self, name: str) -> Optional[NestedList]:
        """Get object value by name (None if the name is unknown)"""
        return self.objects.get(name)

    def get_type(self, name: str) -> Optional[Type]:
        """Get object type by name (None if the name is unknown)"""
        return self.types.get(name)

    def delete_object(self, name: str) -> None:
        """Delete object from storage (unknown names are ignored)"""
        # pop() with a default removes the entry when present and is a
        # no-op otherwise, so deleting a missing object never raises.
        self.objects.pop(name, None)
        self.types.pop(name, None)

    def list_objects(self) -> list[str]:
        """List all object names"""
        return list(self.objects)

    def object_exists(self, name: str) -> bool:
        """Check if object exists"""
        return name in self.objects

    def clear(self) -> None:
        """Clear all objects"""
        self.objects.clear()
        self.types.clear()