Source code for ocrd_network.runtime_data
from __future__ import annotations
from typing import Dict, List
from .deployment_utils import (
create_docker_client,
create_ssh_client,
DeployType
)
__all__ = [
'DataHost',
'DataMongoDB',
'DataProcessingWorker',
'DataProcessorServer',
'DataRabbitMQ'
]
[docs]class DataHost:
def __init__(self, config: Dict) -> None:
self.address = config['address']
self.username = config['username']
self.password = config.get('password', None)
self.keypath = config.get('path_to_privkey', None)
# These flags are used to track whether a connection
# of the specified type will be required
self.needs_ssh: bool = False
self.needs_docker: bool = False
self.ssh_client = None
self.docker_client = None
# TODO: Not sure this is DS is ideal, seems off
self.data_workers: List[DataProcessingWorker] = []
self.data_servers: List[DataProcessorServer] = []
for worker in config.get('workers', []):
name = worker['name']
count = worker['number_of_instance']
deploy_type = DeployType.DOCKER if worker.get('deploy_type', None) == 'docker' else DeployType.NATIVE
if not self.needs_ssh and deploy_type == DeployType.NATIVE:
self.needs_ssh = True
if not self.needs_docker and deploy_type == DeployType.DOCKER:
self.needs_docker = True
for _ in range(count):
self.data_workers.append(DataProcessingWorker(self.address, deploy_type, name))
for server in config.get('servers', []):
name = server['name']
port = server['port']
deploy_type = DeployType.DOCKER if server.get('deploy_type', None) == 'docker' else DeployType.NATIVE
if not self.needs_ssh and deploy_type == DeployType.NATIVE:
self.needs_ssh = True
if not self.needs_docker and deploy_type == DeployType.DOCKER:
self.needs_docker = True
self.data_servers.append(DataProcessorServer(self.address, port, deploy_type, name))
# Key: processor_name, Value: list of ports
self.server_ports: dict = {}
[docs] def create_client(self, client_type: str):
if client_type not in ['docker', 'ssh']:
raise ValueError(f'Host client type cannot be of type: {client_type}')
if client_type == 'ssh':
if not self.ssh_client:
self.ssh_client = create_ssh_client(
self.address, self.username, self.password, self.keypath)
return self.ssh_client
if client_type == 'docker':
if not self.docker_client:
self.docker_client = create_docker_client(
self.address, self.username, self.password, self.keypath
)
return self.docker_client
[docs]class DataProcessingWorker:
def __init__(self, host: str, deploy_type: DeployType, processor_name: str) -> None:
self.host = host
self.deploy_type = deploy_type
self.processor_name = processor_name
# Assigned when deployed
self.pid = None
[docs]class DataProcessorServer:
def __init__(self, host: str, port: int, deploy_type: DeployType, processor_name: str) -> None:
self.host = host
self.port = port
self.deploy_type = deploy_type
self.processor_name = processor_name
# Assigned when deployed
self.pid = None
[docs]class DataMongoDB:
def __init__(self, config: Dict) -> None:
self.address = config['address']
self.port = int(config['port'])
if 'ssh' in config:
self.ssh_username = config['ssh']['username']
self.ssh_keypath = config['ssh'].get('path_to_privkey', None)
self.ssh_password = config['ssh'].get('password', None)
else:
self.ssh_username = None
self.ssh_keypath = None
self.ssh_password = None
if 'credentials' in config:
self.username = config['credentials']['username']
self.password = config['credentials']['password']
self.url = f'mongodb://{self.username}:{self.password}@{self.address}:{self.port}'
else:
self.username = None
self.password = None
self.url = f'mongodb://{self.address}:{self.port}'
self.skip_deployment = config.get('skip_deployment', False)
# Assigned when deployed
self.pid = None
[docs]class DataRabbitMQ:
def __init__(self, config: Dict) -> None:
self.address = config['address']
self.port = int(config['port'])
if 'ssh' in config:
self.ssh_username = config['ssh']['username']
self.ssh_keypath = config['ssh'].get('path_to_privkey', None)
self.ssh_password = config['ssh'].get('password', None)
else:
self.ssh_username = None
self.ssh_keypath = None
self.ssh_password = None
self.vhost = '/'
self.username = config['credentials']['username']
self.password = config['credentials']['password']
self.url = f'amqp://{self.username}:{self.password}@{self.address}:{self.port}{self.vhost}'
self.skip_deployment = config.get('skip_deployment', False)
# Assigned when deployed
self.pid = None