def getRemoteFolderInfo(Resource_ID):
"""
Method for retrieving metadata about a file or folder on Google
Drive.
Args:
Resource_ID: string. Unique ID representing the resource
Returns:
FolderInfo: JSON Fromtted files resource.
"""
http = Authorize()
service = discovery.build('drive', 'v2', http=http)
logging.debug("DRIVEQUERY: METADATA: Requesting info for %s" % Resource_ID)
print(Resource_ID)
FolderInfo = service.files().get(fileId=Resource_ID[0]).execute()
logging.debug("DRIVEQUERY: METADATA: Recieved info for %s" % FolderInfo['title'])
return FolderInfo
python类build()的实例源码
def shortenUrl(self, URL):
"""
URL Shortener function that when combined with the uploading
script adds a new key:value to the JSON response with a much
more managable URL.
Args:
URL: string. URL parsed from JSON response
"""
http = Authorize()
service = discovery.build('urlshortener', 'v1', http=http)
url = service.url()
body = {
'longUrl': URL
}
response = url.insert(body=body).execute()
logging.debug("URLSHRINK: %s" % response)
short_url = response['id']
logging.debug("URLSHRINK: %s" % short_url)
return short_url
def shortenUrl(self, URL):
"""
URL Shortener function that when combined with the uploading
script adds a new key:value to the JSON response with a much
more managable URL.
Args:
URL: string. URL parsed from JSON response
"""
http = Authorize()
service = discovery.build('urlshortener', 'v1', http=http)
url = service.url()
body = {
'longUrl': URL
}
response = url.insert(body=body).execute()
logging.debug("URLSHRINK: %s" % response)
short_url = response['id']
logging.debug("URLSHRINK: %s" % short_url)
return short_url
def main():
credentials = get_credentials()
http = credentials.authorize(httplib2.Http())
service = discovery.build('calendar', 'v3', http=http)
now = datetime.datetime.utcnow().isoformat() + 'Z' # 'Z' indicates UTC time
print('Getting the events')
eventsResult = service.events().list(
calendarId='primary', timeMin=now, singleEvents=True,
orderBy='startTime', timeMax = "2016-11-20T22:32:00.285773Z").execute()
events = eventsResult.get('items', [])
if not events:
print('No upcoming events found.')
for event in events:
if ("Class of " in event["summary"] or "Lab of " in event["summary"]):
service.events().delete(calendarId='primary', eventId=event["id"]).execute()
print ("Deleted: ", event["summary"], event["start"])
print ("Deletion done!")
def main():
"""Shows basic usage of the Google Calendar API.
Creates a Google Calendar API service object and outputs a list of the next
10 events on the user's calendar.
"""
credentials = get_credentials()
http = credentials.authorize(httplib2.Http())
service = discovery.build('calendar', 'v3', http=http)
CALENDAR_NAME = 'csv-to-calendar'
calendarID = initCalendar(service, CALENDAR_NAME)
mat = getMatrixFromCSV('timetable.csv')
events = parseMatrixIntoEvents(mat)
for x in events:
uploadEvent(service, x, calendarID)
def get_google_api_client(module, service, user_agent_product, user_agent_version,
scopes=None, api_version='v1'):
"""
Get the discovery-based python client. Use when a cloud client is not available.
client = get_google_api_client(module, 'compute', user_agent_product=USER_AGENT_PRODUCT,
user_agent_version=USER_AGENT_VERSION)
:returns: A tuple containing the authorized client to the specified service and a
params dict {'service_account_email': '...', 'credentials_file': '...', 'project_id': ...}
:rtype: ``tuple``
"""
if not scopes:
scopes = GCP_DEFAULT_SCOPES
http_auth, conn_params = get_google_api_auth(module, scopes=scopes,
user_agent_product=user_agent_product,
user_agent_version=user_agent_version)
client = build(service, api_version, http=http_auth)
return (client, conn_params)
def Write(gs_path, data, suffix='.txt'):
"""Writes data to the cloud."""
credentials = GoogleCredentials.get_application_default()
service = discovery.build('storage', 'v1', credentials=credentials)
bucket_name, object_name = gs_path[5:].split('/', 1)
logging.info('Uploading file: %s/%s', bucket_name, object_name)
# Save the data off into a temp file.
tfile = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
tfile.write(data)
tfile.close()
# Upload the data.
logging.info('filename: %s %s %s', tfile.name, object_name, bucket_name)
req = service.objects().insert(media_body=tfile.name, name=object_name,
bucket=bucket_name)
_ = req.execute()
# Cleanup.
os.remove(tfile.name)
def google_search(key_word):
service = build("customsearch", "v1",
developerKey="??KEY")
res = service.cse().list(
q=key_word,
cx='????ID',
num=10, #Valid values are integers between 1 and 10, inclusive.
).execute()
for value in res:
#print(value)
if 'items' in value:
for results in res[value]:
#print(results)
print(results['formattedUrl'])
def build_service(self):
ok = False
logging.debug("Building %s service for %s (%s)" % (self.credential_type, self.api, self.version))
kwargs = {}
if self.credential_type == 'user':
if not self.http_auth:
self.get_http_auth()
kwargs['http'] = self.http_auth
ok = bool(self.http_auth)
else:
kwargs['credentials'] = self.credentials
ok = bool(self.credentials)
self.service = discovery.build(self.api, self.version, **kwargs)
if not ok:
logging.warning("Failed to build service for %s (%s) - Credential failure?" % (self.api, self.version))
return ok
def get_google_api_service(service_name, service_version, scopes):
'''Use Google Client API to get a Google API Service.
:param service_name: the name of requested service, e.g. 'sheets'
:param service_version: the version of requested service, e.g. 'v4'
:param scopes: the authentication requested.
:return: googleapiclient.discovery.Resource Object, tied to google
service API.
'''
try:
return discovery.build(
service_name,
service_version,
http=get_authorized_http_object(scopes)
)
except AttributeError: # config variables are missing
# todo: write as real exception
raise
def get_authenticated_service():
'''
Authorize the request and store authorization credentials.
'''
flow = flow_from_clientsecrets(CLIENT_SECRETS_FILE,
scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
message=MISSING_CLIENT_SECRETS_MESSAGE)
storage = Storage("%s-oauth2.json" % sys.argv[0])
credentials = storage.get()
if credentials is None or credentials.invalid:
credentials = run_flow(flow, storage)
# Trusted testers can download this discovery document from the
# developers page and it should be in the same directory with the code.
return build(API_SERVICE_NAME, API_VERSION,
http=credentials.authorize(httplib2.Http()))
def get_authenticated_service():
'''Authorize the request and store authorization credentials to YouTube'''
flow = flow_from_clientsecrets(CLIENT_SECRETS_FILE,
scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
message=MISSING_CLIENT_SECRETS_MESSAGE)
storage = Storage("%s-oauth2.json" % sys.argv[0])
credentials = storage.get()
if credentials is None or credentials.invalid:
credentials = run_flow(flow, storage)
# Trusted testers can download this discovery document from the
# developers page and it should be in the same directory with the code.
return build(API_SERVICE_NAME, API_VERSION,
http=credentials.authorize(httplib2.Http()))
def get_authenticated_service(self):
""" Create youtube oauth2 connection """
credentials = AccessTokenCredentials(
access_token=self.get_auth_code(),
user_agent='Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
)
return build(
'youtube', 'v3', http=credentials.authorize(httplib2.Http())
)
def connect(self ):
""" Establish connection """
config = self.configuration.get_config()
self.client = build(
config["apiServiceName"],
config["apiVersion"],
developerKey=config["developerKey"])
def create_pubsub_client(http=None):
credentials = oauth2client.GoogleCredentials.get_application_default()
if credentials.create_scoped_required():
credentials = credentials.create_scoped(PUBSUB_SCOPES)
if not http:
http = httplib2.Http()
credentials.authorize(http)
return discovery.build('pubsub', 'v1', http=http)
def _fetch_all_youtube_videos(self, playlistId):
"""
Fetches a playlist of videos from youtube
We splice the results together in no particular order
Parameters:
parm1 - (string) playlistId
Returns:
playListItem Dict
"""
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
youtube = build(YOUTUBE_API_SERVICE_NAME,
YOUTUBE_API_VERSION,
developerKey=self.youtube_api_server_key)
res = youtube.playlistItems().list(
part="snippet",
playlistId=playlistId,
maxResults="50"
).execute()
nextPageToken = res.get('nextPageToken')
while ('nextPageToken' in res):
nextPage = youtube.playlistItems().list(
part="snippet",
playlistId=playlistId,
maxResults="50",
pageToken=nextPageToken
).execute()
res['items'] = res['items'] + nextPage['items']
if 'nextPageToken' not in nextPage:
res.pop('nextPageToken', None)
else:
nextPageToken = nextPage['nextPageToken']
return res
def __init__(self, config):
self._config = config
credentials = self._get_credentials(config)
self._http = credentials.authorize(httplib2.Http(cache=".cache"))
self._service = discovery.build('drive', 'v3', http=self._http)
self._appliances = get_appliances()