platypush/platypush/plugins/google/credentials.py

148 lines
4.6 KiB
Python

import argparse
import os
import re
import sys
import textwrap as tw
from typing import List, Optional
import httplib2
from oauth2client import client
from oauth2client import tools
from oauth2client.file import Storage
from platypush.config import Config
credentials_dir = os.path.join(Config.get_workdir(), "credentials", "google")
default_secrets_file = os.path.join(credentials_dir, "client_secret.json")
"""Default path for the Google API client secrets file"""
def _parse_scopes(*scopes: str) -> List[str]:
return sorted(
{
t.split("/")[-1].strip()
for scope in scopes
for t in re.split(r"[\s,]", scope)
if t
}
)
def get_credentials_filename(*scopes: str):
parsed_scopes = _parse_scopes(*scopes)
scope_name = "-".join([scope.split("/")[-1] for scope in parsed_scopes])
os.makedirs(credentials_dir, exist_ok=True)
matching_scope_file = next(
iter(
os.path.join(credentials_dir, scopes_file)
for scopes_file in {
os.path.basename(file)
for file in os.listdir(credentials_dir)
if file.endswith(".json")
}
if not set(parsed_scopes).difference(
set(scopes_file.split(".json")[0].split("-"))
)
),
None,
)
if matching_scope_file:
return matching_scope_file
return os.path.join(credentials_dir, scope_name + ".json")
def get_credentials(scope: str, secrets_file: Optional[str] = None):
scopes = _parse_scopes(scope)
credentials_file = get_credentials_filename(*scopes)
# If we don't have a credentials file for the required set of scopes, but we have a secrets file,
# then try and generate the credentials file from the stored secrets.
if (
not os.path.isfile(credentials_file)
and secrets_file
and os.path.isfile(secrets_file)
and (os.getenv("DISPLAY") or os.getenv("BROWSER"))
):
args = []
generate_credentials(secrets_file, scope, *args)
assert os.path.isfile(credentials_file), tw.dedent(
f"""
Credentials file {credentials_file} not found. Generate it through:
python -m platypush.plugins.google.credentials "{','.join(scopes)}" {
secrets_file or '/path/to/client_secret.json'
}
[--auth_host_name AUTH_HOST_NAME]
[--noauth_local_webserver]
[--auth_host_port [AUTH_HOST_PORT [AUTH_HOST_PORT ...]]]
[--logging_level [DEBUG,INFO,WARNING,ERROR,CRITICAL]]
Specify --noauth_local_webserver if you're running this script on a headless machine.
You will then get an authentication URL on the logs.
Otherwise, the URL will be opened in the available browser.
"""
)
store = Storage(credentials_file)
credentials = store.get()
if not credentials or credentials.invalid:
credentials.refresh(httplib2.Http())
return credentials
def generate_credentials(client_secret_path: str, scope: str, *args: str):
scopes = _parse_scopes(scope)
credentials_file = get_credentials_filename(*scopes)
store = Storage(credentials_file)
scope = ' '.join(
f'https://www.googleapis.com/auth/{scope}' for scope in _parse_scopes(scope)
)
flow = client.flow_from_clientsecrets(client_secret_path, scope)
flow.user_agent = "Platypush"
flow.access_type = "offline" # type: ignore
flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args(args) # type: ignore
tools.run_flow(flow, store, flags)
print("Storing credentials to", credentials_file)
print(
"\nIf this is not the working directory of your Platypush instance, \n"
"then move the generated credentials file to WORKDIR/credentials/google"
)
def main():
"""
Generates a Google API credentials file given client secret JSON and scopes.
Usage::
python -m platypush.plugins.google.credentials \
[spaces/comma-separated list of scopes] \
[client_secret.json location]
"""
args = sys.argv[1:]
scope = (
args.pop(0) if args else input("Space/comma separated list of OAuth scopes: ")
).strip()
if args:
client_secret_path = args.pop(0)
elif os.path.isfile(default_secrets_file):
client_secret_path = default_secrets_file
else:
client_secret_path = input("Google credentials JSON file location: ")
client_secret_path = os.path.abspath(os.path.expanduser(client_secret_path)).strip()
generate_credentials(client_secret_path, scope, *args)
if __name__ == "__main__":
main()
# vim:sw=4:ts=4:et: