def _build_google_client(service, api_version, http_auth):
    """
    Build a Google API client for one service.

    :param service: name of the service to build the client for
    :type service: ``str``
    :param api_version: API version to request
    :type api_version: ``str``
    :param http_auth: initialized HTTP client, handed to ``build`` as ``http``
    :type http_auth: ``object``
    :return: the client object produced by ``build`` for ``service``
    :rtype: ``object``
    """
    return build(service, api_version, http=http_auth)
# Example source code using Python's build() (python类build()的实例源码)
def main():
    """Print the latest login events from the Admin SDK Reports API.

    Builds an ``admin``/``reports_v1`` service from the credentials returned
    by ``get_credentials()``, requests at most 10 ``login`` activities for
    all users and prints one line per activity: its time, the actor's
    e-mail and the name of its first event.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    reports = discovery.build('admin', 'reports_v1', http=authorized_http)

    print('Getting the last 10 login events')
    response = reports.activities().list(
        userKey='all', applicationName='login', maxResults=10).execute()
    logins = response.get('items', [])

    # Guard clause: nothing to list.
    if not logins:
        print('No logins found.')
        return

    print('Logins:')
    for entry in logins:
        when = entry['id']['time']
        who = entry['actor']['email']
        what = entry['events'][0]['name']
        print('{0}: {1} ({2})'.format(when, who, what))
def execute(self):
    """
    Fetch GSuite report events for the configured application.

    Authenticates, builds the ``admin``/``reports_v1`` service and lists
    activities filtered by ``self.user``, ``self.app``, ``self.s_time``,
    ``self.e_time`` and ``self.max``. The HTTP object, the service and the
    raw response are stored on ``self.http``, ``self.service`` and
    ``self.results``.

    :return: the ``items`` list of the response, or ``[]`` if it has none
    """
    logging.debug("Authenticating to GSuite")
    self.get_credentials()
    self.http = self.credentials.authorize(httplib2.Http())
    self.service = discovery.build('admin', 'reports_v1', http=self.http)

    logging.debug(
        "Retrieving %s events from: %s to %s",
        self.app,
        convert_time(self.s_time),
        convert_time(self.e_time),
    )

    activities = self.service.activities()
    query = {
        'userKey': self.user,
        'applicationName': self.app,
        'startTime': self.s_time,
        'endTime': self.e_time,
        'maxResults': self.max,
    }
    self.results = activities.list(**query).execute()
    return self.results.get('items', [])
def main():
    """Print the names and IDs of up to 10 Google Drive files.

    Builds a ``drive``/``v3`` service from the credentials returned by
    ``get_credentials()`` and lists files with a page size of 10, asking
    only for each file's ``id`` and ``name``.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    drive = discovery.build('drive', 'v3', http=authorized_http)

    listing = drive.files().list(
        pageSize=10,
        fields="nextPageToken, files(id, name)",
    ).execute()
    found = listing.get('files', [])

    if found:
        print('Files:')
        for entry in found:
            print('{0} ({1})'.format(entry['name'], entry['id']))
    else:
        print('No files found.')
def send(To, Subject, Body, Cc=None, Bcc=None, html=False, files=None):
    """Send an email through the Gmail API as the authorized user.

    :param To: iterable of recipient addresses (joined with ``', '``)
    :param Subject: subject line
    :param Body: body text of the message
    :param Cc: optional iterable of carbon-copy addresses
    :param Bcc: optional iterable of blind-carbon-copy addresses
    :param html: when true the body is attached as ``html``, else ``plain``
    :param files: optional iterable of paths of files to attach
    """
    # ``None`` sentinels instead of shared mutable list defaults.
    Cc = [] if Cc is None else Cc
    Bcc = [] if Bcc is None else Bcc
    files = [] if files is None else files

    subtype = 'html' if html else 'plain'
    message = MIMEMultipart()
    message['To'] = ', '.join(To)
    message['Subject'] = Subject
    message['Cc'] = ', '.join(Cc)
    message['Bcc'] = ', '.join(Bcc)
    message.attach(MIMEText(Body, subtype))

    for path in files:
        with open(path, "rb") as attachment:
            part = MIMEApplication(attachment.read(), Name=basename(path))
        part['Content-Disposition'] = 'attachment; filename="%s"' % basename(path)
        message.attach(part)

    # urlsafe_b64encode needs bytes on Python 3 and returns bytes, while the
    # request body must carry text; convert on both sides so the call works
    # on Python 2 and Python 3 alike.
    raw = message.as_string()
    if not isinstance(raw, bytes):
        raw = raw.encode('utf-8')
    body = {'raw': base64.urlsafe_b64encode(raw).decode('ascii')}

    credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
    authorized_http = credentials.authorize(httplib2.Http())
    service = discovery.build('gmail', 'v1', http=authorized_http)
    service.users().messages().send(userId='me', body=body).execute()
def main():
"""
Create a Google Calendar event with Quick Add from ``flags.query``.

Builds a ``calendar``/``v3`` service from the credentials returned by
``get_credentials()``. When ``sys.argv`` has more than one entry,
``flags.query`` is appended as one line to ``log.txt``. The query text is
sent to ``events().quickAdd`` on the ``primary`` calendar.

NOTE(review): this listing has lost its indentation, so it cannot be seen
here whether the ``quickAdd`` call is guarded by the ``sys.argv`` check or
runs unconditionally -- confirm against the original script.
"""
credentials = get_credentials()
http = credentials.authorize(httplib2.Http())
service = discovery.build('calendar', 'v3', http=http)
# Only log the query when at least one command-line argument was given.
if len(sys.argv) > 1:
# Append mode: earlier log entries are kept.
with open("log.txt", "a") as logfile:
logfile.write(flags.query + "\n")
# The response is bound to created_event but not used afterwards.
created_event = service.events().quickAdd(
calendarId='primary',
text=flags.query
).execute()
def image_urls_api_call(search_term, previously_captured_num, imgSize="medium"):
    """Return image URLs (face images) from a Google Custom Search.

    The API key is read from the ``GOOGLE_CUSTOM_SEARCH_KEY`` environment
    variable. Ten JPEG results of type ``face`` are requested per call.

    :param search_term: query string to search for
    :param previously_captured_num: passed as ``start`` to the search request
    :param imgSize: passed as ``imgSize`` to the search request
    :return: list of the ``link`` values of the returned items, or ``[]``
        when anything goes wrong (missing key, request failure, no items)
    """
    try:
        key = os.environ['GOOGLE_CUSTOM_SEARCH_KEY']
        service = build("customsearch", "v1", developerKey=key)
        res = service.cse().list(
            q=search_term,
            cx='018267786259991380019:ja65luoszbg',  # my custom search engine id.
            searchType='image',
            filter='1',
            num=10,
            imgSize=imgSize,
            imgType="face",
            fileType="jpg",
            start=previously_captured_num,
        ).execute()
        links = [item['link'] for item in res['items']]
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        print('Failed to get links for ' + search_term)
        return []
    print('Added links for ' + search_term)
    return links
def authenticate():
    """Authenticate to Google Calendar and publish the result as globals.

    On success the module globals ``http_auth`` (authorized HTTP object),
    ``service`` (``calendar``/``v3`` service) and ``calendar`` (the calendar
    resource for ``CALENDAR_ID``) are set. On failure the error is logged
    to the ``BoilerLogger`` logger and ``init()`` is called to retry.
    """
    global service
    global http_auth
    global calendar
    try:
        # read credentials
        credentials = ServiceAccountCredentials.from_json_keyfile_name(
            CREDENTIAL_FILE_PATH, scopes)
        # authorize and get the calendar service
        http_auth = credentials.authorize(Http())
        service = discovery.build('calendar', 'v3', http=http_auth)
        calendar = service.calendars().get(calendarId=CALENDAR_ID).execute()
    except Exception:
        # Not a bare ``except``: KeyboardInterrupt / SystemExit must not
        # trigger a retry. ``exception`` logs at ERROR level like before and
        # adds the traceback so the cause is visible.
        logging.getLogger('BoilerLogger').exception(
            'failed to authenticate to google calendar service, will retry...')
        init()
# get calendar events in a window sorted by start time
def initialize_analyticsreporting():
    """Create an authorized ``analytics``/``v4`` service object.

    Credentials come from the P12 key file named by
    ``settings.KEY_FILE_LOCATION`` for ``settings.SERVICE_ACCOUNT_EMAIL``,
    scoped with ``SCOPES``.

    Returns: the service object built against ``DISCOVERY_URI``.
    """
    service_credentials = ServiceAccountCredentials.from_p12_keyfile(
        settings.SERVICE_ACCOUNT_EMAIL,
        settings.KEY_FILE_LOCATION,
        scopes=SCOPES,
    )
    authorized_http = service_credentials.authorize(httplib2.Http())
    # Build and hand back the service object in one step.
    return build(
        'analytics', 'v4',
        http=authorized_http,
        discoveryServiceUrl=DISCOVERY_URI,
    )
async def youtube_search(q, max_results):
    """Search YouTube and return a summary dict per video result.

    Declared ``async`` because the body awaits ``youtube_date``; ``await``
    inside a plain ``def`` is a syntax error.

    :param q: search query
    :param max_results: passed as ``maxResults`` to the search request
    :return: list of dicts with the keys ``title``, ``description``,
        ``date``, ``channel_title``, ``channel_id`` and ``video_id``; items
        whose kind is not ``youtube#video`` are skipped
    """
    youtube = build(
        settings.YOUTUBE_API_SERVICE_NAME,
        settings.YOUTUBE_API_VERSION,
        developerKey=settings.DEVELOPER_KEY,
    )
    search_response = youtube.search().list(
        q=q, part="id,snippet", maxResults=max_results).execute()

    videos = []
    for search_result in search_response.get("items", []):
        if search_result["id"]["kind"] != "youtube#video":
            continue
        snippet = search_result["snippet"]
        videos.append({
            'title': snippet['title'],
            'description': snippet['description'],
            'date': await youtube_date(snippet['publishedAt']),
            'channel_title': snippet['channelTitle'],
            'channel_id': snippet['channelId'],
            'video_id': search_result["id"]["videoId"],
        })
    return videos
def get_google_service(service_type=None, version=None):
    '''
    Return a Google API service object.

    If the ``GOOGLE_SHEETS_CREDENTIALS`` environment variable is set, its
    value is handed to ``get_authenticated_service`` together with the
    service type and version. Otherwise the service is built with the
    application default credentials.

    :param service_type: the service to get (default is "sheets")
    :param version: version to use (default is "v4")
    '''
    if service_type is None:
        service_type = "sheets"
    if version is None:
        version = "v4"

    secrets = os.environ.get('GOOGLE_SHEETS_CREDENTIALS')
    if secrets is not None:
        return get_authenticated_service(secrets, service_type, version)

    credentials = GoogleCredentials.get_application_default()
    return discovery.build(service_type, version, credentials=credentials)
def login_required(f):
    """Decorator: require stored, unexpired credentials before calling ``f``.

    The wrapper redirects to ``youtube.login`` when the session holds no
    ``credentials`` entry or when the credentials' access token has expired.
    Otherwise it stores a ``youtube``/``v3`` client on ``g.youtube`` and
    calls ``f`` with the original arguments.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        stored = session.get('credentials', None)
        if stored is None:
            return redirect(url_for('youtube.login'))

        oauth_credentials = client.OAuth2Credentials.from_json(stored)
        if oauth_credentials.access_token_expired:
            return redirect(url_for('youtube.login'))

        authorized_http = oauth_credentials.authorize(httplib2.Http())
        g.youtube = build('youtube', 'v3', http=authorized_http)
        return f(*args, **kwargs)

    return decorated_function
def build_service(self):
    """
    Build the ``fusiontables``/``v2`` service and store it on ``self.SERVICE``.

    Credentials are loaded from the JSON key file at ``self.PATH_TO_KEY``.

    :raises IncorrectAccessFilePathException: if loading the key file or
        authorizing raises ``IOError``
    """
    logger.info('Build service for connect to google fusiontables server.')
    fusiontables_scopes = ['https://www.googleapis.com/auth/fusiontables']
    try:
        key_credentials = ServiceAccountCredentials.from_json_keyfile_name(
            self.PATH_TO_KEY, scopes=fusiontables_scopes)
        authorized_http = key_credentials.authorize(Http())
    except IOError:
        message = "Access key path '{0}' is incorrect.".format(self.PATH_TO_KEY)
        raise IncorrectAccessFilePathException(message)
    else:
        # Runs outside the try body, so errors from build() are not remapped.
        self.SERVICE = build('fusiontables', 'v2', http=authorized_http)
def initialize_analyticsreporting():
    """Create an authorized ``analytics``/``v4`` service object.

    Credentials come from the P12 key file at ``KEY_FILE_LOCATION`` for
    ``SERVICE_ACCOUNT_EMAIL``, scoped with ``SCOPES``.

    Returns:
      The service object built against ``DISCOVERY_URI``.
    """
    authorized_http = ServiceAccountCredentials.from_p12_keyfile(
        SERVICE_ACCOUNT_EMAIL,
        KEY_FILE_LOCATION,
        scopes=SCOPES,
    ).authorize(httplib2.Http())

    return build(
        'analytics', 'v4',
        http=authorized_http,
        discoveryServiceUrl=DISCOVERY_URI,
    )
def auth():
    """Authenticate with ``GoogleAuth`` and build a ``drive``/``v3`` service.

    Credentials are loaded from and saved back to
    ``letitrain-creds-gdrive.txt``. With no stored credentials the local
    web-server flow is run; with an expired access token the credentials
    are refreshed; otherwise they are simply authorized.

    :return: tuple ``(gauth, httpauth, service)``
    """
    creds_file = "letitrain-creds-gdrive.txt"

    gauth = GoogleAuth()
    gauth.LoadCredentialsFile(creds_file)
    if gauth.credentials is None:
        gauth.LocalWebserverAuth()
    elif gauth.access_token_expired:
        gauth.Refresh()
    else:
        gauth.Authorize()
    gauth.SaveCredentialsFile(creds_file)

    httpauth = gauth.Get_Http_Object()
    return gauth, httpauth, discovery.build('drive', 'v3', http=httpauth)
# Retrieves the information about every file
# Can either do deleted or regular files
def oauth2_drive_service():
    """
    Return a Google Drive v3 service object authorized with the user
    credentials kept in storage.

    This function does NOT run an OAuth2 flow: when nothing is stored, or
    the stored credentials are invalid, it raises. Use the separate script
    ct_gdrive_oauth2.py to obtain credentials.

    :raises Exception: 'Unauthorized Access!' when no valid credentials
        are stored
    """
    # Load whatever is stored under the credentials directory.
    storage = oauth2client.file.Storage(
        os.path.join(args.creds_dir, OAUTH2_STORAGE_CREDS_FILENAME))
    stored = storage.get()

    usable = bool(stored) and not stored.invalid
    if not usable:
        raise Exception('Unauthorized Access!')

    # Authorized HTTP object -> authorized Drive APIv3 service.
    authorized_http = stored.authorize(httplib2.Http())
    return discovery.build('drive', 'v3', http=authorized_http)
def authcheck():
    """Return the e-mail address tied to the application default credentials.

    The credentials are scoped to
    ``https://www.googleapis.com/auth/userinfo.email`` when they report that
    scoping is required, then used to query ``userinfo`` on the
    ``oauth2``/``v2`` service.

    :return: the ``email`` field of the userinfo response, or the string
        "Unable to acquire application default credentials" when the
        default credentials cannot be obtained
    """
    email_scope = 'https://www.googleapis.com/auth/userinfo.email'
    try:
        app_credentials = GoogleCredentials.get_application_default()
        if app_credentials.create_scoped_required():
            app_credentials = app_credentials.create_scoped(email_scope)
    except ApplicationDefaultCredentialsError:
        return "Unable to acquire application default credentials"

    transport = httplib2.Http()
    app_credentials.authorize(transport)
    oauth2_service = build(serviceName='oauth2', version='v2', http=transport)
    userinfo = oauth2_service.userinfo().get().execute()
    return userinfo['email']
def main():
    """Print up to 5 upcoming events from the user's primary calendar.

    Builds a ``calendar``/``v3`` service from the credentials returned by
    ``get_credentials()`` and lists single events starting from the current
    UTC time, ordered by start time.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    calendar_service = discovery.build('calendar', 'v3', http=authorized_http)

    # 'Z' indicates UTC time
    time_min = datetime.utcnow().isoformat() + 'Z'
    # NOTE(review): the banner below looks mis-encoded (non-ASCII text
    # replaced by '?'); it is kept exactly as found.
    print('???5?????????')

    listing = calendar_service.events().list(
        calendarId='primary',
        timeMin=time_min,
        maxResults=5,
        singleEvents=True,
        orderBy='startTime',
    ).execute()
    upcoming = listing.get('items', [])

    if not upcoming:
        print('No upcoming events found.')
    for entry in upcoming:
        # Prefer the timed start, fall back to the all-day date.
        begins = entry['start'].get('dateTime', entry['start'].get('date'))
        print(begins, entry['summary'])
def main():
    """Read records from stdin and persist them to a Google spreadsheet.

    Loads the JSON config named by ``flags.config``. Unless the config sets
    ``disable_collection``, a notice is logged and ``collect`` is started in
    a thread. Then a ``sheets``/``v4`` service is built, the spreadsheet
    given by the config's ``spreadsheet_id`` is fetched, the UTF-8 decoded
    standard input is handed to ``persist_lines`` and the state it returns
    is passed to ``emit_state``.
    """
    with open(flags.config) as config_file:
        config = json.load(config_file)

    collection_enabled = not config.get('disable_collection', False)
    if collection_enabled:
        logger.info('Sending version information to stitchdata.com. ' +
                    'To disable sending anonymous usage data, set ' +
                    'the config parameter "disable_collection" to true')
        threading.Thread(target=collect).start()

    authorized_http = get_credentials().authorize(httplib2.Http())
    discovery_url = ('https://sheets.googleapis.com/$discovery/rest?'
                     'version=v4')
    service = discovery.build(
        'sheets', 'v4',
        http=authorized_http,
        discoveryServiceUrl=discovery_url,
    )

    spreadsheet = get_spreadsheet(service, config['spreadsheet_id'])

    stdin_text = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
    emit_state(persist_lines(service, spreadsheet, stdin_text))
    logger.debug("Exiting normally")
def get_service(api_name, api_version, scope, key_file_location,
                service_account_email):
    """Build a service object for a Google API using a service account.

    Args:
      api_name: Name of the API to connect to.
      api_version: Version of the API to connect to.
      scope: Auth scopes to authorize for the application.
      key_file_location: Path to the service account's p12 key file.
      service_account_email: E-mail address of the service account.

    Returns:
      The service object built for the requested API.
    """
    authorized_http = ServiceAccountCredentials.from_p12_keyfile(
        service_account_email,
        key_file_location,
        scopes=scope,
    ).authorize(httplib2.Http())

    return build(api_name, api_version, http=authorized_http)
def initYoutube():
    """Return an authorized YouTube service object.

    Stored credentials are read from ``<sys.argv[0]>-oauth2.json``. When
    none are stored, or they are invalid, ``run_flow`` is run with the
    flags parsed by ``argparser``; otherwise a notice is printed.
    """
    flow = flow_from_clientsecrets(
        constants.YOUTUBE_CLIENT_SECRETS_FILE,
        message=constants.MISSING_CLIENT_SECRETS_MESSAGE,
        scope=constants.YOUTUBE_READ_WRITE_SCOPE,
        redirect_uri='http://localhost:8080/',
    )

    storage = Storage("%s-oauth2.json" % sys.argv[0])
    credentials = storage.get()

    have_valid = not (credentials is None or credentials.invalid)
    if have_valid:
        print("Found valid oauth2 json")
    else:
        flags = argparser.parse_args()
        credentials = run_flow(flow, storage, flags)

    authorized_http = credentials.authorize(httplib2.Http())
    youtube = build(
        constants.YOUTUBE_API_SERVICE_NAME,
        constants.YOUTUBE_API_VERSION,
        http=authorized_http,
    )
    print("Done authentication with Youtube")
    return youtube
def __init__(self, config, logger):
    """
    Constructor: store the configuration and build the Gmail service.

    :param config: Configuration dict; ``credentials_dir``,
        ``client_secret_file_name`` and ``credentials_file_name`` are read
    :param logger: Python logger
    """
    # Silence cache warnings emitted by the Google API library.
    cache_logger = logging.getLogger('googleapiclient.discovery_cache')
    cache_logger.setLevel(logging.ERROR)

    credentials_dir = config['credentials_dir']
    self._client_secret_file = os.path.join(
        credentials_dir, config['client_secret_file_name'])
    self._credentials_file = os.path.join(
        credentials_dir, config['credentials_file_name'])

    self._logger = logger
    self._config = config
    self._credentials = self._get_credentials()

    # Bootstrap the Gmail client service.
    authorized_http = self._credentials.authorize(httplib2.Http())
    self._service = discovery.build('gmail', 'v1', http=authorized_http)
def main():
    """Return a text listing of the next 10 events on the primary calendar.

    Builds a ``calendar``/``v3`` service from the credentials returned by
    ``get_credentials()`` and lists single events starting from the current
    UTC time, ordered by start time.

    :return: a string with a header line followed by one
        ``<start> <summary>`` line per event, or a "no events" line
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    calendar_service = discovery.build('calendar', 'v3', http=authorized_http)

    # 'Z' indicates UTC time
    time_min = datetime.datetime.utcnow().isoformat() + 'Z'
    report = ['Getting the upcoming 10 events\n']

    listing = calendar_service.events().list(
        calendarId='primary',
        timeMin=time_min,
        maxResults=10,
        singleEvents=True,
        orderBy='startTime',
    ).execute()
    upcoming = listing.get('items', [])

    if not upcoming:
        report.append('No upcoming events found.\n')
    for entry in upcoming:
        # Prefer the timed start, fall back to the all-day date.
        begins = entry['start'].get('dateTime', entry['start'].get('date'))
        report.append(begins + " " + entry['summary'] + '\n')
    return ''.join(report)
def get_youtube_vid_id(self, query, max_results=10):
    """
    Search YouTube and return the id of the first video in the results.

    :param query: search query string
    :param max_results: maximum number of results requested from the search
    :returns: the ``videoId`` of the first result whose kind is
        ``youtube#video``, or ``None`` when no result is a video
    """
    youtube = build(Connection.YOUTUBE_API_SERVICE_NAME,
                    Connection.YOUTUBE_API_VERSION,
                    developerKey=self.keys['google'])
    search_response = youtube.search().list(
        q=query,
        part="id,snippet",
        maxResults=max_results
    ).execute()

    # Scan every item: channels and playlists can precede the first video,
    # so only give up once the whole result list has been checked.
    for search_result in search_response.get("items", []):
        if search_result["id"]["kind"] == "youtube#video":
            return search_result["id"]["videoId"]
    return None
def get_training_features(potential_event, fb_event, fb_event_attending):
"""Build the training-feature tuple for one event.

Several features are computed (owner name, location, name, description,
attendee list, source list), but only the attendee list is returned: the
first ``return`` yields ``(attendee_list,)`` and the full-tuple ``return``
after it is unreachable.

NOTE(review): the early return looks deliberate (see the TODO notes), but
confirm before removing either ``return``.
"""
# Owner feature: 'id<owner id>' when the event info has an owner, else ''.
if 'owner' in fb_event['info']:
owner_name = 'id%s' % fb_event['info']['owner']['id']
else:
owner_name = ''
location = event_locations.get_address_for_fb_event(fb_event).encode('utf-8')
# Local helper: UTF-8 encode, strip punctuation, lowercase.
def strip_text(s):
return strip_punctuation(s.encode('utf8')).lower()
name = strip_text(fb_event['info'].get('name', ''))
description = strip_text(fb_event['info'].get('description', ''))
# Space-separated 'id<...>' tokens, one per attendee / per source id.
attendee_list = ' '.join(['id%s' % x['id'] for x in fb_event_attending['attending']['data']])
source_list = ' '.join('id%s' % x.id for x in potential_event.source_ids_only())
#TODO(lambert): maybe include number-of-keywords and keyword-density?
#TODO(lambert): someday write this as a proper mapreduce that reduces across languages and builds a classifier model per language?
# for now we can just grep and build sub-models per-language on my client machine.
# Only the attendee list is actually returned.
return (attendee_list,)
# Unreachable: shadowed by the return above.
return (potential_event.language, owner_name, location, name, description, attendee_list, source_list)
def __init__(self, wf):
    """Construct a new GoogleInterface.

    Checks auth status and, if unauthorized, runs the authorization step
    before building the ``calendar``/``v3`` service.

    :param wf: workflow object; its ``logger`` is stored on ``self.log``
        and it is passed to ``get_http_kw_args`` to configure the HTTP
        object
    """
    self.HTTP_INSTANCE = httplib2.Http(**get_http_kw_args(wf))
    self.log = wf.logger
    self.wf = wf
    self.CLIENT_SECRET_FILE = 'client_secret.json'
    self.APPLICATION_NAME = 'Alfred Today'
    self.SCOPES = 'https://www.googleapis.com/auth/calendar.readonly'

    credentials = self._get_credentials()
    if not credentials or credentials.invalid:
        self._authorize_google()
        # The value read above is missing or invalid; using it below would
        # fail (``None`` has no ``authorize``). Re-read the credentials now
        # that authorization has run.
        credentials = self._get_credentials()

    http = credentials.authorize(self.HTTP_INSTANCE)
    self.service = discovery.build('calendar', 'v3', http=http)
def __init__(self):
    """Parse flags, set up logging, build the calendar service and run
    ``naive_find_event_overlap``.

    ``self._flags`` is ``None`` when parsing the flags raises
    ``ImportError``.
    """
    try:
        parser = argparse.ArgumentParser(parents=[tools.argparser])
        self._flags = parser.parse_args()
        # Prevent a browser from opening
        self._flags.noauth_local_webserver = True
    except ImportError:
        self._flags = None

    ## Init logging
    self._logger = self.create_logger(logging_filename=LOGGING_FILE)
    cache_logger = logging.getLogger('googleapiclient.discovery_cache')
    cache_logger.setLevel(logging.ERROR)

    ## Get credentials, build calendar service object
    mkdir_p(CREDENTIALS_DIR)
    self._service = self.get_service(
        self.get_credentials(
            CLIENT_SECRET_FILE, SCOPES, CREDENTIALS_DIR, CREDENTIALS_FILENAME))

    self.naive_find_event_overlap()
def main():
    """Print the next 10 events on the user's primary calendar.

    Builds a ``calendar``/``v3`` service from the credentials returned by
    ``get_credentials()`` and lists single events starting from the current
    UTC time, ordered by start time.
    """
    authorized_http = get_credentials().authorize(httplib2.Http())
    calendar_service = discovery.build('calendar', 'v3', http=authorized_http)

    # 'Z' indicates UTC time
    time_min = datetime.datetime.utcnow().isoformat() + 'Z'
    print('Getting the upcoming 10 events')

    listing = calendar_service.events().list(
        calendarId='primary',
        timeMin=time_min,
        maxResults=10,
        singleEvents=True,
        orderBy='startTime',
    ).execute()
    upcoming = listing.get('items', [])

    if not upcoming:
        print('No upcoming events found.')
    for entry in upcoming:
        # Prefer the timed start, fall back to the all-day date.
        begins = entry['start'].get('dateTime', entry['start'].get('date'))
        print(begins, entry['summary'])
def main():
    """Print the next 10 events on the user's primary calendar.

    A ``calendar``/``v3`` service is built from the credentials returned by
    ``get_credentials()``; single events from the current UTC time onwards
    are requested in start-time order.
    """
    http = get_credentials().authorize(httplib2.Http())
    events_api = discovery.build('calendar', 'v3', http=http).events()

    # 'Z' indicates UTC time
    utc_now = datetime.datetime.utcnow().isoformat() + 'Z'
    print('Getting the upcoming 10 events')

    query = dict(
        calendarId='primary',
        timeMin=utc_now,
        maxResults=10,
        singleEvents=True,
        orderBy='startTime',
    )
    events = events_api.list(**query).execute().get('items', [])

    if not events:
        print('No upcoming events found.')
    for event in events:
        start_info = event['start']
        # All-day events carry 'date' instead of 'dateTime'.
        print(start_info.get('dateTime', start_info.get('date')), event['summary'])
def config_g_tasks(user, http_auth):
    """Describe a select input listing the account's task lists.

    Requests up to 10 task lists from the ``tasks``/``v1`` service and maps
    each one to a ``{"value": id, "label": title}`` option.

    :param user: not used by this function
    :param http_auth: authorized HTTP object used to build the service
    :return: dict describing a single-choice ``select`` input for the
        ``taskList`` property, with the options found (possibly empty)
    """
    tasks_service = discovery.build('tasks', 'v1', http=http_auth)
    response = tasks_service.tasklists().list(maxResults=10).execute()
    logging.debug(response)

    choices = [
        {"value": tasklist.get('id'), "label": tasklist.get('title')}
        for tasklist in response.get('items', [])
    ] if response else []

    return {
        "input": "select",
        "multi": False,
        "prop": "taskList",
        "instructions": "Choose a task list",
        "options": choices,
    }
# Fetch classes