Source code for pax.MongoDB_ClientMaker
import logging
import time
import re
import os
import pymongo
try:
from monary import Monary # noqa
except Exception:
pass # Let's hope we're not the event builder.
try:
from mongo_proxy import MongoProxy # noqa
except Exception:
# MongoDB Proxy did not load, falling back to vanilla pymongo
[docs] def dummy(x, **kwargs):
return x
MongoProxy = dummy
[docs]class ClientMaker:
"""Helper class to create MongoDB clients
On __init__, you can specify options that will be used to format mongodb uri's,
in particular user, password, host and port.
"""
def __init__(self, config):
if 'password' not in config:
config['password'] = os.environ.get('MONGO_PASSWORD')
if not config['password']:
raise ValueError("Please provide the mongo password in the environment variable MONGO_PASSWORD")
# Select only relevant config options, so we can just pass this to .format later.
self.config = {k: config[k] for k in ('user', 'password', 'host', 'port')}
self.log = logging.getLogger('Mongo client maker')
[docs] def get_client(self, database_name=None, uri=None, monary=False, host=None, autoreconnect=False, **kwargs):
"""Get a Mongoclient. Returns Mongo database object.
If you provide a mongodb connection string uri, we will insert user & password into it,
otherwise one will be built from the configuration settings.
If database_name=None, will connect to the default database of the uri. database=something
overrides event the uri's specification of a database.
host is special magic for split_hosts
kwargs will be passed to pymongo.mongoclient/Monary
"""
# Format of URI we should eventually send to mongo
full_uri_format = 'mongodb://{user}:{password}@{host}:{port}/{database}'
if uri is None:
# We must construct the entire URI from the settings
uri = full_uri_format.format(database=database_name, **self.config)
else:
# A URI was given. We expect it to NOT include user and password:
result = parse_passwordless_uri(uri)
_host, port, _database_name = result
if result is not None:
if not host:
host = _host
if database_name is None:
database_name = _database_name
uri = full_uri_format.format(database=database_name, host=host, port=port,
user=self.config['user'], password=self.config['password'])
else:
# Some other URI was provided. Just try it and hope for the best
pass
if monary:
# Be careful enabling this debug log statement, it's useful but prints the password in the uri
# self.log.debug("Connecting to Mongo via monary using uri %s" % uri)
# serverselection option makes the C driver retry if it can't connect;
# since we often make new monary connections this is useful to protect against brief network hickups.
client = Monary(uri + '?serverSelectionTryOnce=false&serverSelectionTimeoutMS=60000', **kwargs)
self.log.debug("Succesfully connected via monary (probably...)")
return client
else:
# Be careful enabling this debug log statement, it's useful but prints the password in the uri
# self.log.debug("Connecting to Mongo using uri %s" % uri)
client = pymongo.MongoClient(uri, **kwargs)
client.admin.command('ping') # raises pymongo.errors.ConnectionFailure on failure
self.log.debug("Successfully pinged client")
if autoreconnect:
# Wrap the client in a magic object that retries autoreconnect exceptions
client = MongoProxy(client, disconnect_on_timeout=False, wait_time=180)
return client
[docs]class PersistentRunsDBConnection:
"""Helper class for maitaining a persistent collection to the XENON1T runs database"""
def __init__(self, clientmaker_config):
self.log = logging.getLogger(self.__class__.__name__)
self.clientmaker = ClientMaker(clientmaker_config)
self._connect()
def _connect(self):
self.client = self.clientmaker.get_client('run', autoreconnect=True)
self.db = self.client['run']
self.collection = self.db['runs_new']
self.pipeline_status_collection = self.db['pipeline_status']
[docs] def check(self):
"""Checks that the runs db connection we currently have is alive. If not, we try to re-acquire it forever."""
while True:
try:
self.client.admin.command('ping')
return
except Exception as e:
self.log.fatal("Exception pinging runs db: %s: %s" % (type(e), str(e)))
try:
self._connect()
except Exception as e:
self.log.fatal("Could not re-acquire runs db connection: %s %s. Trying again in ten seconds." % (
type(e), str(e)))
time.sleep(10)
[docs]def parse_passwordless_uri(uri):
"""Return host, port, database_name"""
uri_pattern = r'mongodb://([^:]+):(\d+)/(\w+)'
m = re.match(uri_pattern, uri)
if m:
# URI was provided, but without user & pass.
return m.groups()
else:
# Some other URI was provided. Just try it and hope for the best
print("Unexpected Mongo URI %s, expected format %s." % (uri, uri_pattern))
return None