Implante vários modelos de aprendizado de máquina em um único servidor

Conhecendo o problema

No desenvolvimento comercial, muitos casos de uso de aprendizado de máquina implicam em uma arquitetura multilocatário e exigem o treinamento de um modelo separado para cada cliente e / ou usuário.





Por exemplo, considere a previsão de compras e demanda de certos produtos usando aprendizado de máquina. Se você administra uma rede de lojas de varejo, pode usar os dados do histórico de compras do cliente e a demanda total desses produtos para prever custos e volumes de compra para cada loja individualmente.





Na maioria das vezes, em tais casos, para implantar modelos, você escreve um serviço Flask e o coloca em um contêiner do Docker. Existem muitos exemplos de servidores de aprendizado de máquina de modelo único, mas quando se trata de implementar vários modelos, o desenvolvedor tem poucas opções disponíveis para resolver o problema.





Em aplicativos multilocatários, o número de inquilinos não é conhecido com antecedência e pode ser praticamente ilimitado - em um momento você pode ter apenas um cliente, e em outro momento você pode servir modelos separados para cada usuário para milhares de usuários. É aqui que as limitações da abordagem de implantação padrão começam a surgir:





  • Se implantarmos um contêiner Docker para cada cliente, acabaremos com um aplicativo muito grande e caro que será muito difícil de gerenciar.





  • Um único container, em cuja imagem existem todos os modelos, também não funciona para nós, porque milhares de modelos podem funcionar no servidor e novos modelos são adicionados em tempo de execução.





Decisão

, . , Airflow S3, ML — .





ML — , : -> .





, :





  • Model — , ; SklearnModel, TensorFlowModel, MyCustomModel . .





  • ModelInfoRepository — , userid -> modelid. , SQAlchemyModelInfoRepository.





  • ModelRepository — , ID. FileSystemRepository, S3Repository .





from abc import ABC


class Model(ABC):
    @abstractmethod
    def predict(self, data: pd.DataFrame) -> np.ndarray:
        raise NotImplementedError
 

class ModelInfoRepository(ABC):
    @abstractmethod
    def get_model_id_by_user_id(self, user_id: str) -> str:
        raise NotImplementedError
 

class ModelRepository(ABC):
    @abstractmethod
    def get_model(self, model_id: str) -> Model:
        raise NotImplementedError
      
      



, sklearn, Amazon S3 userid -> modelid, .





class SklearnModel(Model):
    def __init__(self, model):
        self.model = model
 

    def predict(self, data: pd.DataFrame):
        return self.model.predict(data)
 

class SQAlchemyModelInfoRepository(ModelInfoRepository):
    def __init__(self, sqalchemy_session: Session):
        self.session = sqalchemy_session
 

    def get_model_id_by_user_id(user_id: str) -> str:
        # implementation goes here, query a table in any Database

      
class S3ModelRepository(ModelRepository):
    def __init__(self, s3_client):
        self.s3_client = s3_client
 

    def get_model(self, model_id: str) -> Model:
        # load and deserialize pickle from S3, implementation goes here
      
      



:





def make_app(model_info_repository: ModelInfoRepository,
    				 model_repsitory: ModelRepository) -> Flask:
    app = Flask("multi-model-server")
    
    @app.predict("/predict/<user_id>")
    def predict(user_id):
        model_id = model_info_repository.get_model_id_by_user_id(user_id)
 
        model = model_repsitory.get_model(model_id)
 
        data = pd.DataFrame(request.json())
 
        predictions = model.predict(data)
 
        return jsonify(predictions.tolist())
 
    return app
      
      



, Flask ; sklearn TensorFlow S3 , Flask .





, , . , . cachetools:





from cachetools import Cache
 
class CachedModelRepository(ModelRepository):
    def __init__(self, model_repository: ModelRepository, cache: Cache):
        self.model_repository = model_repository
        self.cache = cache
 
    @abstractmethod
    def get_model(self, model_id: str) -> Model:
        if model_id not in self.cache:
            self.cache[model_id] = self.model_repository.get_model(model_id)
        return self.cache[model_id]
      
      



:





from cachetools import LRUCache
 
model_repository = CachedModelRepository(
    S3ModelRepository(s3_client),
    LRUCache(max_size=10)
)
      
      



- , . , , MLOps . . , . №4 Google: , - .








All Articles