Conhecendo o problema
No desenvolvimento comercial, muitos casos de uso de aprendizado de máquina implicam em uma arquitetura multilocatário e exigem o treinamento de um modelo separado para cada cliente e / ou usuário.
Por exemplo, considere a previsão de compras e demanda de certos produtos usando aprendizado de máquina. Se você administra uma rede de lojas de varejo, pode usar os dados do histórico de compras do cliente e a demanda total desses produtos para prever custos e volumes de compra para cada loja individualmente.
Na maioria das vezes, em tais casos, para implantar modelos, você escreve um serviço Flask e o coloca em um contêiner do Docker. Existem muitos exemplos de servidores de aprendizado de máquina de modelo único, mas quando se trata de implementar vários modelos, o desenvolvedor tem poucas opções disponíveis para resolver o problema.
Em aplicativos multilocatários, o número de inquilinos não é conhecido com antecedência e pode ser praticamente ilimitado - em um momento você pode ter apenas um cliente, e em outro momento você pode servir modelos separados para cada usuário para milhares de usuários. É aqui que as limitações da abordagem de implantação padrão começam a surgir:
Se implantarmos um contêiner Docker para cada cliente, acabaremos com um aplicativo muito grande e caro que será muito difícil de gerenciar.
Um único container, em cuja imagem existem todos os modelos, também não funciona para nós, porque milhares de modelos podem funcionar no servidor e novos modelos são adicionados em tempo de execução.
Decisão
, . , Airflow S3, ML — .
ML — , : -> .
, :
Model — , ; SklearnModel, TensorFlowModel, MyCustomModel . .
ModelInfoRepository — , userid -> modelid. , SQAlchemyModelInfoRepository.
ModelRepository — , ID. FileSystemRepository, S3Repository .
from abc import ABC
class Model(ABC):
@abstractmethod
def predict(self, data: pd.DataFrame) -> np.ndarray:
raise NotImplementedError
class ModelInfoRepository(ABC):
@abstractmethod
def get_model_id_by_user_id(self, user_id: str) -> str:
raise NotImplementedError
class ModelRepository(ABC):
@abstractmethod
def get_model(self, model_id: str) -> Model:
raise NotImplementedError
, sklearn, Amazon S3 userid -> modelid, .
class SklearnModel(Model):
def __init__(self, model):
self.model = model
def predict(self, data: pd.DataFrame):
return self.model.predict(data)
class SQAlchemyModelInfoRepository(ModelInfoRepository):
def __init__(self, sqalchemy_session: Session):
self.session = sqalchemy_session
def get_model_id_by_user_id(user_id: str) -> str:
# implementation goes here, query a table in any Database
class S3ModelRepository(ModelRepository):
def __init__(self, s3_client):
self.s3_client = s3_client
def get_model(self, model_id: str) -> Model:
# load and deserialize pickle from S3, implementation goes here
:
def make_app(model_info_repository: ModelInfoRepository,
model_repsitory: ModelRepository) -> Flask:
app = Flask("multi-model-server")
@app.predict("/predict/<user_id>")
def predict(user_id):
model_id = model_info_repository.get_model_id_by_user_id(user_id)
model = model_repsitory.get_model(model_id)
data = pd.DataFrame(request.json())
predictions = model.predict(data)
return jsonify(predictions.tolist())
return app
, Flask ; sklearn TensorFlow S3 , Flask .
, , . , . cachetools:
from cachetools import Cache
class CachedModelRepository(ModelRepository):
def __init__(self, model_repository: ModelRepository, cache: Cache):
self.model_repository = model_repository
self.cache = cache
@abstractmethod
def get_model(self, model_id: str) -> Model:
if model_id not in self.cache:
self.cache[model_id] = self.model_repository.get_model(model_id)
return self.cache[model_id]
:
from cachetools import LRUCache
model_repository = CachedModelRepository(
S3ModelRepository(s3_client),
LRUCache(max_size=10)
)
- , . , , MLOps . . , . №4 Google: , - .