diff --git a/README.md b/README.md index a802f98..8eae334 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,30 @@ ------------------------------------------ ### Usage +#### Login +To utilize drive-cli, you will need to create and download your credentials from the +[Google Developer Console](https://console.developers.google.com/apis/credentials). +1. Click "+ Create Credentials" in the top bar +2. Select "OAuth client ID" +3. Select the "Other" application type, and name this something memorable (like "drive-cli") +4. You can then click the ⬇️ icon to download the OAuth JSON file. Remember where you save this. +5. You can now use the drive-cli to login. +```shell script +$ drive login --json-file ~/Downloads/client_secret.json + +********************** welcome to ********************** + ____ _ ________ ____ + / __ \_____(_) _____ / ____/ / / _/ + / / / / ___/ / | / / _ \ ______ / / / / / / + / /_/ / / / /| |/ / __/ /_____/ / /___/ /____/ / +/_____/_/ /_/ |___/\___/ \____/_____/___/ + + +******************************************************** +``` +Once logged in, you are able to use any of the calls. You must log in before you can do anything else. + + #### Clone Download a file or folder present in drive using its file id or its sharing link. In case it is a folder it gets tracked. diff --git a/drive_cli/actions.py b/drive_cli/actions.py index 65055b1..58e632c 100644 --- a/drive_cli/actions.py +++ b/drive_cli/actions.py @@ -3,17 +3,14 @@ import sys import click from pick import Picker -from httplib2 import Http -from oauth2client import file from prettytable import PrettyTable -from googleapiclient.discovery import build from prettytable import MSWORD_FRIENDLY from mimetypes import MimeTypes - dirpath = os.path.dirname(os.path.realpath(__file__)) sys.path.append(dirpath) mime = MimeTypes() +drive = utils.Drive() @click.command('view-files', short_help='filter search files and file ID for files user has access to') @@ -26,10 +23,6 @@ def view_file(name, types, pid): """ cwd = os.getcwd() flags = {"--name": [None], "--types": [None], "--pid": [None]} - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) page_token = None query = "" if name: @@ -49,7 +42,7 @@ def view_file(name, types, pid): "gif", "bmp", "txt", - "docx", + "doc", "js", "swf", "mp3", @@ -98,16 +91,16 @@ def view_file(name, types, pid): if pid: parent = click.prompt('enter the fid of parent or sharing link') flags["--pid"] = [parent] - fid = utils.get_fid(parent) + fid = drive.get_fid(parent) if (name != False) or (types != False): query += " and " query += "'" + fid + "' in parents" i = 1 while True: - response = service.files().list(q=query, - spaces='drive', - fields='nextPageToken, files(id, name,mimeType,modifiedTime)', - pageToken=page_token).execute() + response = drive.service.files().list(q=query, + spaces='drive', + fields='nextPageToken, files(id, name,mimeType,modifiedTime)', + pageToken=page_token).execute() templist = [response.get('files', [])[j:j + 25] for j in range(0, len( response.get('files', [])), 25)] @@ -123,50 +116,52 @@ def view_file(name, types, pid): page_token = response.get('nextPageToken', None) if page_token is None: break - utils.save_history([flags, "", cwd]) + drive.save_history([flags, "", cwd]) -@click.command('clone', short_help='download any file using sharing link or file ID it will be automatically tracked henceforth') +@click.command('clone', + short_help='download any file using sharing link or file ID it will be automatically tracked henceforth') @click.argument('payload') def download(payload): ''' clone: download a file/folder using either the sharing link or using the file ID for the file ''' cwd = os.getcwd() - utils.save_history([{}, payload, cwd]) + drive.save_history([{}, payload, cwd]) if payload != None: - fid = utils.get_fid(payload) + fid = drive.get_fid(payload) else: click.secho("argument error", fg='red') with click.Context(download) as ctx: click.echo(download.get_help(ctx)) sys.exit(0) - clone = utils.get_file(fid) + clone = drive.get_file(fid) click.secho("cloning into '" + clone['name'] + "' .....", fg='magenta') if clone['mimeType'] == 'application/vnd.google-apps.folder': new_dir = os.path.join(cwd, clone['name']) - utils.create_new(new_dir, fid) - utils.pull_content(new_dir, fid) + drive.create_new(new_dir, fid) + drive.pull_content(new_dir, fid) else: - utils.file_download(clone, cwd) + drive.file_download(clone, cwd) click.secho("cloning of " + clone['name'] + ' completed', fg='green') @click.command('add_remote', short_help='upload any existing file to drive') @click.option('--file', help='specify the partcular file to uploaded else entire directory is uploaded') -@click.option('--pid', help='specify particular folder id/sharing_link of the folder under which remote must must be added') +@click.option('--pid', + help='specify particular folder id/sharing_link of the folder under which remote must must be added') def create_remote(file, pid): """ add_remote: create remote equivalent for existing file/folder in local device """ cwd = os.getcwd() - utils.save_history([{"--file": [file], "--pid":[pid]}, "", cwd]) + drive.save_history([{"--file": [file], "--pid": [pid]}, "", cwd]) if pid == None: pid = 'root' if file != None: file_path = os.path.join(cwd, file) if os.path.isfile(file_path): - utils.upload_file(file, file_path, pid) + drive.upload_file(file, file_path, pid) else: click.secho("No such file exist: " + file_path, fg="red") with click.Context(create_remote) as ctx: @@ -174,10 +169,10 @@ def create_remote(file, pid): else: sep = os.sep dir_cd, name = sep.join(cwd.split(sep)[:-1]), cwd.split(sep)[-1] - child_cwd, child_id = utils.create_dir(dir_cd, pid, name) - utils.push_content(child_cwd, child_id) + child_cwd, child_id = drive.create_dir(dir_cd, pid, name) + drive.push_content(child_cwd, child_id) if pid != None: - parent_file = utils.get_file(pid) + parent_file = drive.get_file(pid) parent_name = parent_file['name'] click.secho("content added under directory " + parent_name, fg='magenta') @@ -185,33 +180,35 @@ def create_remote(file, pid): @click.command('rm', short_help='delete a particular file in drive') @click.option('--file', help='specify the partcular file to deleted else entire directory is deleted') -@click.option('--id', help='delete untracked file directly using id or sharing link, can be used even for unlinked files') +@click.option('--id', + help='delete untracked file directly using id or sharing link, can be used even for unlinked files') def delete(file, id): ''' rm: delete a particular file/folder from the directory in the remote drive ''' cwd = os.getcwd() - utils.save_history([{"--file": [file], "--id":[id]}, "", cwd]) + drive.save_history([{"--file": [file], "--id": [id]}, "", cwd]) if id == None: if file != None: file_path = os.path.join(cwd, file) if os.path.isfile(file_path): - local_dir = utils.get_child(cwd) + local_dir = drive.get_child(cwd) fid = local_dir[file] + drive.delete_file(fid) else: click.secho("No such file exist: " + file_path, fg="red") with click.Context(delete) as ctx: click.echo(delete.get_help(ctx)) cwd = file_path else: - data = utils.drive_data() + data = drive.drive_data() fid = data[cwd] data.pop(cwd, None) - utils.drive_data(data) - utils.delete_file(fid) + drive.drive_data(data) + drive.delete_file(fid) else: - fid = utils.get_fid(id) - utils.delete_file(fid) + fid = drive.get_fid(id) + drive.delete_file(fid) @click.command('ls', short_help='list out all the files present in this directory in the drive for tracked directories') @@ -220,12 +217,8 @@ def list_out(): ls: Print files belonging to a folder in the drive folder of the current directory """ cwd = os.getcwd() - utils.save_history([{}, "", cwd]) - data = utils.drive_data() - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) + drive.save_history([{}, "", cwd]) + data = drive.drive_data() page_token = None if cwd not in data.keys(): click.secho( @@ -235,11 +228,11 @@ def list_out(): click.secho('listing down files in drive ....', fg='magenta') t = PrettyTable(['Name', 'File ID', 'Type']) while True: - children = service.files().list(q=query, - spaces='drive', - fields='nextPageToken, files(id,mimeType,name)', - pageToken=page_token - ).execute() + children = drive.service.files().list(q=query, + spaces='drive', + fields='nextPageToken, files(id,mimeType,name)', + pageToken=page_token + ).execute() for child in children.get('files', []): t.add_row([child.get('name')[:25], child.get( 'id'), child.get('mimeType')]) @@ -253,14 +246,9 @@ def list_out(): @click.argument('link') def view(link): cwd = os.getcwd() - utils.save_history([{}, link, cwd]) - fid = utils.get_fid(link) - try: - utils.concat(fid) - except: - error_message = str(sys.exc_info()[1]) - click.secho(error_message, fg='red') - + drive.save_history([{}, link, cwd]) + fid = drive.get_fid(link) + drive.concat(fid) @click.command('status', short_help='list changes committed since last sync') @@ -269,30 +257,30 @@ def status(): status: get a change log of files changed since you had the last sync(push/pull/clone) ''' cwd = os.getcwd() - utils.save_history([{}, "", cwd]) - data = utils.drive_data() + drive.save_history([{}, "", cwd]) + data = drive.drive_data() if cwd not in data.keys(): click.secho( "following directory has not been tracked: \nuse drive add_remote or drive clone ", fg='red') sys.exit(0) sync_time = data[cwd]['time'] - utils.list_status(cwd, sync_time) + drive.list_status(cwd, sync_time) @click.command('pull', short_help='get latest updates from online drive of the file') def pull(): cwd = os.getcwd() - utils.save_history([{}, "", cwd]) - data = utils.drive_data() + drive.save_history([{}, "", cwd]) + data = drive.drive_data() if cwd not in data.keys(): click.secho( "following directory has not been tracked: \nuse drive add_remote or drive clone ", fg='red') sys.exit(0) fid = data[cwd]['id'] - current_root = utils.get_file(fid) + current_root = drive.get_file(fid) click.secho("checking for changes in '" + current_root['name'] + "' ....", fg='magenta') - utils.pull_content(cwd, fid) + drive.pull_content(cwd, fid) click.secho(current_root['name'] + " is up to date with drive", fg='yellow') @@ -303,17 +291,17 @@ def push(): push the latest changes from your local folder that has been added/cloned to google drive. ''' cwd = os.getcwd() - utils.save_history([{}, "", cwd]) - data = utils.drive_data() + drive.save_history([{}, "", cwd]) + data = drive.drive_data() if cwd not in data.keys(): click.secho( "following directory has not been tracked: \nuse drive add_remote or drive clone ", fg='red') sys.exit(0) fid = data[cwd]['id'] - current_root = utils.get_file(fid) + current_root = drive.get_file(fid) click.secho("checking for changes in '" + current_root['name'] + "' ....", fg='magenta') - utils.push_content(cwd, fid) + drive.push_content(cwd, fid) click.secho("Working directory is clean", fg="green") @@ -346,13 +334,9 @@ def share(fid, role, type, message): cwd = os.getcwd() flags = {"--role": [role], "--type": [type], "--message": [message]} click.secho("updating share setting.....", fg='magenta') - file_id = utils.get_fid(fid) - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - if(type == "anyone"): - if(role == "owner"): + file_id = drive.get_fid(fid) + if (type == "anyone"): + if (role == "owner"): transfer_ownership = True else: transfer_ownership = False @@ -362,11 +346,11 @@ def share(fid, role, type, message): "allowFileDiscovery": True } try: - response = service.permissions().create(body=request, - fileId=file_id, - transferOwnership=transfer_ownership, - fields='id').execute() - if(list(response.keys())[0] == "error"): + response = drive.service.permissions().create(body=request, + fileId=file_id, + transferOwnership=transfer_ownership, + fields='id').execute() + if (list(response.keys())[0] == "error"): click.secho(response["error"]["message"], fg='red') else: share_link = "https://drive.google.com/open?id=" + file_id @@ -377,39 +361,36 @@ def share(fid, role, type, message): click.secho(error_message, fg='red') else: - if(type == "user"): - email_id = click.prompt("Enter email address of user ") - email_id = email_id.split(" ") + if (type == "user"): + email_id = click.prompt("Enetr email address of user ") else: - email_id = click.prompt("Enter email address of a google group ") - email_id = email_id.split(" ") + email_id = click.prompt("Enetr email address of a google group ") flags["Email ID"] = email_id - if(role == "owner"): + if (role == "owner"): transfer_ownership = True else: transfer_ownership = False - for email in email_id: - request = { - "role": role, - "type": type, - "emailAddress": email - } - try: - response = service.permissions().create(body=request, - fileId=file_id, - emailMessage=message, - sendNotificationEmail=True, - transferOwnership=transfer_ownership, - fields='id').execute() - if(list(response.keys())[0] == "error"): - click.secho(response["error"]["message"], fg='red') - else: - click.secho("successfully shared to " + email, fg='green') - except: - error_message = str(sys.exc_info()[1]) - error_message = error_message.split('\"')[1] - click.secho(error_message, fg='red') - utils.save_history([flags, fid, cwd]) + request = { + "role": role, + "type": type, + "emailAddress": email_id + } + try: + response = drive.service.permissions().create(body=request, + fileId=file_id, + emailMessage=message, + sendNotificationEmail=True, + transferOwnership=transfer_ownership, + fields='id').execute() + if (list(response.keys())[0] == "error"): + click.secho(response["error"]["message"], fg='red') + else: + click.secho("successfully share", fg='green') + except: + error_message = str(sys.exc_info()[1]) + error_message = error_message.split('\"')[1] + click.secho(error_message, fg='red') + drive.save_history([flags, fid, cwd]) @click.command('history', short_help="view history") @@ -419,14 +400,14 @@ def history(date, clear): if clear: click.confirm('Do you want to continue?', abort=True) click.secho("deleting.....", fg='magenta') - utils.clear_history() + drive.clear_history() click.secho("successfully deleted", fg='green') cwd = os.getcwd() - utils.save_history([{"--date": [date], "--clear":["True"]}, "", cwd]) + drive.save_history([{"--date": [date], "--clear": ["True"]}, "", cwd]) else: cwd = os.getcwd() - utils.save_history([{"--date": [date], "--clear":[None]}, "", cwd]) - History = utils.get_history() + drive.save_history([{"--date": [date], "--clear": [None]}, "", cwd]) + History = drive.get_history() if date != None: if date in History: history = History[date] @@ -434,15 +415,15 @@ def history(date, clear): click.secho(date + " " + i, fg='yellow', bold=True) click.secho("working directory : " + history[i]["cwd"], bold=True) click.secho("command : " + history[i]["command"]) - if(history[i]["arg"] != ""): + if (history[i]["arg"] != ""): click.secho("argument : " + history[i]["arg"]) - if(len(history[i]["flags"]) != 0): + if (len(history[i]["flags"]) != 0): flag_val = "" for j in history[i]["flags"]: - if(history[i]["flags"][j][0] != None): + if (history[i]["flags"][j][0] != None): val = ", ".join(history[i]["flags"][j]) flag_val = flag_val + "\t" + j + " : " + val + "\n" - if(flag_val != ""): + if (flag_val != ""): click.secho("flags : ", bold=True) click.secho(flag_val) click.secho("\n") @@ -458,41 +439,39 @@ def history(date, clear): click.secho(date + " " + i, fg='yellow', bold=True) click.secho("working directory : " + history[i]["cwd"], bold=True) click.secho("command : " + history[i]["command"]) - if(history[i]["arg"] != ""): + if (history[i]["arg"] != ""): click.secho("argument : " + history[i]["arg"]) - if(len(history[i]["flags"]) != 0): + if (len(history[i]["flags"]) != 0): flag_val = "" for j in history[i]["flags"]: - if(history[i]["flags"][j][0] != None): + if (history[i]["flags"][j][0] != None): val = ", ".join(history[i]["flags"][j]) flag_val = flag_val + "\t" + j + " : " + val + "\n" - if(flag_val != ""): + if (flag_val != ""): click.secho("flags : ", bold=True) click.secho(flag_val) click.secho("\n") -@click.command('log', short_help="It allows users to see who made edits and to revert to earlier versions of the same file.") +@click.command('log', + short_help="It allows users to see who made edits and to revert to earlier versions of the same file.") @click.argument('fid') @click.option('--get', type=str, help="provide revision id to get more info ") @click.option('--delete', type=str, help="delete a particular revision") -@click.option('--save', type=str, help="To keep revision forever, even if it is no longer the head revision. If not set, the revision will be automatically purged 30 days after newer content is uploaded. ") +@click.option('--save', type=str, + help="To keep revision forever, even if it is no longer the head revision. If not set, the revision will be automatically purged 30 days after newer content is uploaded. ") def get_revision(fid, get, delete, save): ''' It allows users to see who made edits and to revert to earlier versions of the same file. ''' cwd = os.getcwd() flags = {"--get": [get], "--delete": [delete], "--save": [save]} - utils.save_history([flags, fid, cwd]) - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - if(get != None): + drive.save_history([flags, fid, cwd]) + if (get != None): click.secho("fetching....", fg='magenta') - service = build('drive', 'v2', http=creds.authorize(Http())) - file_id = utils.get_fid(fid) - response = service.revisions().get(fileId=file_id, - revisionId=get).execute() + file_id = drive.get_fid(fid) + response = drive.service.revisions().get(fileId=file_id, + revisionId=get).execute() modified_time = response["modifiedDate"].split("T") user = response["lastModifyingUser"] click.secho(click.style("File : ", fg='yellow', bold=True) + @@ -507,13 +486,13 @@ def get_revision(fid, get, delete, save): bold=True) + response["fileSize"] + "bytes") click.secho(click.style("eTag : ", fg='yellow', bold=True) + response["etag"]) - if(response["published"]): + if (response["published"]): click.secho(click.style("Published : ", fg='yellow', bold=True) + "Yes") else: click.secho(click.style("Published : ", fg='yellow', bold=True) + "No") - if(response["pinned"]): + if (response["pinned"]): click.secho(click.style( "Pinned : ", fg='yellow', bold=True) + "Yes") else: @@ -522,30 +501,27 @@ def get_revision(fid, get, delete, save): click.secho(click.style("Permission Id : ", fg='yellow', bold=True) + user["permissionId"]) - if(delete != None): + if (delete != None): click.secho("deleting.....", fg='magenta') - service = build('drive', 'v3', http=creds.authorize(Http())) - file_id = utils.get_fid(fid) - response = service.revisions().delete(fileId=file_id, - revisionId=delete).execute() + file_id = drive.get_fid(fid) + response = drive.service.revisions().delete(fileId=file_id, + revisionId=delete).execute() click.secho("revision" + delete + "successfully deleted", fg='green') - if(save != None): + if (save != None): click.secho("saving " + save + " revision premanently....", fg='magenta') - service = build('drive', 'v3', http=creds.authorize(Http())) - file_id = utils.get_fid(fid) - response = service.revisions().update(body={"keepForever": True}, - fileId=file_id, - revisionId=save).execute() + file_id = drive.get_fid(fid) + response = drive.service.revisions().update(body={"keepForever": True}, + fileId=file_id, + revisionId=save).execute() click.secho("svaed successfully", fg='green') - if(delete == None and get == None and save == None): - file_id = utils.get_fid(fid) - file_name = utils.get_file(fid)["name"] + if (delete == None and get == None and save == None): + file_id = drive.get_fid(fid) + file_name = drive.get_file(fid)["name"] click.secho("fetching revision detail of " + file_name + ".....", fg='magenta') - service = build('drive', 'v3', http=creds.authorize(Http())) - response = service.revisions().list(fileId=file_id).execute() + response = drive.service.revisions().list(fileId=file_id).execute() revisions = response["revisions"] for r in reversed(revisions): modified_time = r["modifiedTime"].split("T") @@ -559,15 +535,11 @@ def get_revision(fid, get, delete, save): def file_info(fid): click.secho("fetching....", fg='magenta') cwd = os.getcwd() - utils.save_history([{}, fid, cwd]) - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v2', http=creds.authorize(Http())) - file_id = utils.get_fid(fid) + drive.save_history([{}, fid, cwd]) + file_id = drive.get_fid(fid) t = PrettyTable(["Genreal Info", "", " "]) t.align = "l" - f = service.files().get(fileId=file_id).execute() + f = drive.service.files().get(fileId=file_id).execute() t.add_row(["", "Name", f["title"]]) t.add_row(["", "ID", f["id"]]) t.add_row(["", "Mime Type", f["mimeType"]]) @@ -579,18 +551,18 @@ def file_info(fid): t.add_row(["", "created date", date + " " + time]) t.add_row(["", "can edit", str(f["capabilities"]["canEdit"]) + "\n"]) try: - parents = service.parents().list(fileId=file_id).execute() + parents = drive.service.parents().list(fileId=file_id).execute() if len(parents["items"]) != 0: t.add_row(["Parent Info", "", " "]) for parent in parents["items"]: - parent_name = utils.get_file(parent["id"])["name"] + parent_name = drive.get_file(parent["id"])["name"] t.add_row(["", "Name", parent_name]) t.add_row(["", "ID", parent["id"]]) t.add_row(["", "Link", parent["parentLink"] + "\n"]) except: pass try: - permissions = service.permissions().list(fileId=file_id).execute() + permissions = drive.service.permissions().list(fileId=file_id).execute() t.add_row(["Permissions", "", " "]) per_num = 0 for permission in permissions["items"]: @@ -608,7 +580,7 @@ def file_info(fid): except: pass try: - revisions = service.revisions().list(fileId=file_id).execute() + revisions = drive.service.revisions().list(fileId=file_id).execute() t.add_row(["Revision", "", " "]) rev_num = 0 for rev in revisions["items"]: @@ -633,7 +605,7 @@ def file_info(fid): def drive_ignore(unttrack_file, l): cwd = os.getcwd() drive_ignore_path = os.path.join(cwd, '.driveignore') - if(len(unttrack_file) != 0): + if (len(unttrack_file) != 0): try: file = open(drive_ignore_path, 'r') files = file.readlines() @@ -653,7 +625,7 @@ def drive_ignore(unttrack_file, l): if l: click.secho("listing untracked files....", fg="magenta") - utils.save_history([{"-l": ["True"]}, " ", cwd]) + drive.save_history([{"-l": ["True"]}, " ", cwd]) if os.path.isfile(drive_ignore_path): file = open(drive_ignore_path, 'r') untracked_files = file.read() @@ -663,4 +635,4 @@ def drive_ignore(unttrack_file, l): click.secho(".driveignore file doesn't exist in " + cwd, fg="red") sys.exit(0) else: - utils.save_history([{"-l": [None]}, " ", cwd]) + drive.save_history([{"-l": [None]}, " ", cwd]) diff --git a/drive_cli/auth.py b/drive_cli/auth.py index a2726ed..9c3b9dd 100644 --- a/drive_cli/auth.py +++ b/drive_cli/auth.py @@ -3,35 +3,59 @@ import click import pyfiglet import requests +import sys from oauth2client import file, client, tools +from pathlib import Path SCOPES = 'https://www.googleapis.com/auth/drive' dirpath = os.path.dirname(os.path.realpath(__file__)) +config_path = Path.home().joinpath(".config", "drive-cli") - -def login(remote): - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() +@click.command("login", + help='Log in to your google account and authenticate the service. See README for more detailed instructions ' + 'regarding authorization.') +@click.option("--remote", + is_flag=True, + default=False, + help="Remote login in case browser is on a different machine") +@click.option("--json-file", + type=click.Path(exists=True), + default=None, + help="Specify the location of your oauth token json file.") +def login(remote=False, json_file=None): + flags = tools.argparser.parse_args(args=[]) + if remote: + flags.noauth_local_webserver = True + click.secho("Running without local webserver auth.") + if json_file: + click.secho("Using Oauth JSON file {}".format(json_file)) + token = config_path.joinpath('token.json') + store = file.Storage(str(token)) + creds = store.get() if token.is_file() else None if not creds or creds.invalid: - client_id = os.path.join(dirpath, 'oauth.json') - flow = client.flow_from_clientsecrets(client_id, SCOPES) - flags = tools.argparser.parse_args(args=[]) - if remote: - flags.noauth_local_webserver = True - creds = tools.run_flow(flow, store, flags) - click.secho( - "********************** welcome to **********************", bold=True, fg='red') - result = pyfiglet.figlet_format("Drive - CLI", font="slant") - click.secho(result, fg='yellow') - click.secho( - "********************************************************", bold=True, fg='red') + client_id = json_file or config_path.joinpath('oauth.json') + if client_id.is_file(): + store = file.Storage(str(token)) + flow = client.flow_from_clientsecrets(str(client_id), SCOPES) + creds = tools.run_flow(flow, store, flags) + store.put(creds) + else: + click.secho("Unable to find your oauth json file. Please re-run, specifying the JSON file with" + "'drive login --json-file /path/to/your/file.json'", fg="red") + sys.exit(1) + + click.secho( + "********************** welcome to **********************", bold=True, fg='red') + result = pyfiglet.figlet_format("Drive - CLI", font="slant") + click.secho(result, fg='yellow') + click.secho( + "********************************************************", bold=True, fg='red') -@click.command('login', short_help='login to your google account and authenticate the service') def loggin(): + drive = utils.Drive() cwd = os.getcwd() - utils.save_history([{}, "", cwd]) + drive.save_history([{}, "", cwd]) @click.command('logout', short_help='logout from the account logged in with') @@ -39,17 +63,21 @@ def logout(): ''' logout: logout from the account that has been logged in ''' + drive = utils.Drive() cwd = os.getcwd() - utils.save_history([{}, "", cwd]) - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - if creds: - requests.post('https://accounts.google.com/o/oauth2/revoke', - params={'token': creds.access_token}, - headers={'content-type': 'application/x-www-form-urlencoded'}) - - os.remove(token) - click.secho("Logged Out successfully\nUse:") - click.secho("drive login", bold=True, fg='green') - click.secho("to login again") + drive.save_history([{}, "", cwd]) + token = config_path.joinpath('token.json') + if not token.is_file(): + click.secho("You are not logged in", fg="red") + sys.exit(1) + else: + store = file.Storage(token) + creds = store.get() + if creds: + requests.post('https://accounts.google.com/o/oauth2/revoke', + params={'token': creds.access_token}, + headers={'content-type': 'application/x-www-form-urlencoded'}) + os.remove(str(token)) + click.secho("Logged Out successfully\nUse:") + click.secho("drive login", bold=True, fg='green') + click.secho("to login again") diff --git a/drive_cli/dcli.py b/drive_cli/dcli.py index c5563b7..cadfc83 100644 --- a/drive_cli/dcli.py +++ b/drive_cli/dcli.py @@ -9,11 +9,10 @@ @click.group() -@click.option('--remote', is_flag=True, default=False, help='remote login in case browser is on a different machine') -def cli(remote): - auth.login(remote) +def cli(*args): + pass -cli.add_command(auth.loggin) +cli.add_command(auth.login) cli.add_command(actions.view_file) diff --git a/drive_cli/utils.py b/drive_cli/utils.py index 475c54b..c5762d4 100644 --- a/drive_cli/utils.py +++ b/drive_cli/utils.py @@ -7,6 +7,7 @@ import json import time from mimetypes import MimeTypes +from pathlib import Path from pick import Picker from datetime import datetime from googleapiclient.discovery import build @@ -14,340 +15,436 @@ from httplib2 import Http from oauth2client import file - dirpath = os.path.dirname(os.path.realpath(__file__)) mime = MimeTypes() +class UnauthorizedService: + unauth_msg = ("This application has not been authorized. Please run 'drive login', " + "use 'drive login --help' for further assistance.") -def get_history(): - hist_path = os.path.join(dirpath, '.history') - if not os.path.isfile(hist_path): - with open(hist_path, 'w')as outfile: - history = {} - json.dump(history, outfile) - else: - with open(hist_path, 'r') as infile: - history = json.load(infile) - return history - - -def save_history(info): - date_time = datetime.now().strftime("%d/%m/%Y %H:%M:%S").split(" ") - date = date_time[0] - time = date_time[1] - command = sys.argv - log = {"cwd": info[2], - "command": "drive " + command[1], - "arg": info[1], - "flags": info[0] - } - hist_path = os.path.join(dirpath, '.history') - history = get_history() - if not (date in history): - history[date] = {} - history[date][time] = log - with open(hist_path, 'w') as outfile: - json.dump(history, outfile) - - -def clear_history(): - hist_path = os.path.join(dirpath, '.history') - os.remove(hist_path) + def default(self): + click.secho(self.unauth_msg, fg="red") + sys.exit(1) + def __init__(self): + for item in {'about', 'changes', 'channels', 'comments', 'drives', 'files', 'new_batch_http_request', + 'permissions', 'replies', 'revisions', 'teamdrives'}: + setattr(self, item, self.default) -def go_back(picker): - return None, -1 +class Drive: -def drive_data(*argv): - dclipath = os.path.join(dirpath, '.drivecli') - if not os.path.isfile(dclipath): - with open(dclipath, 'w')as outfile: - if(not len(argv)): - data = {} - else: - data = argv[0] - json.dump(data, outfile) - else: - if(not len(argv)): - with open(dclipath, 'r') as infile: - data = json.load(infile) + def __init__(self): + self.config_path = Path.home().joinpath(".config", "drive-cli") + self.config_path.mkdir(parents=True, exist_ok=True) + self.hist_path = self.config_path.joinpath(".history") + self.dclipath = self.config_path.joinpath(".dclipath") + token = self.config_path.joinpath('token.json') + if token.is_file(): + store = file.Storage(str(token)) + creds = store.get() + self.service = build('drive', 'v3', http=creds.authorize(Http())) else: - with open(dclipath, 'w') as outfile: - data = argv[0] + self.service = UnauthorizedService() + + def get_history(self): + if not self.hist_path.is_file(): + with self.hist_path.open('w')as outfile: + history = {} + json.dump(history, outfile) + else: + with self.hist_path.open('r') as infile: + history = json.load(infile) + return history + + def save_history(self, info): + date_time = datetime.now().strftime("%d/%m/%Y %H:%M:%S").split(" ") + date = date_time[0] + time = date_time[1] + command = sys.argv + log = {"cwd": info[2], + "command": "drive " + command[1], + "arg": info[1], + "flags": info[0] + } + history = self.get_history() + if date not in history: + history[date] = {} + history[date][time] = log + with self.hist_path.open("w") as outfile: + json.dump(history, outfile) + + def clear_history(self): + os.remove(str(self.hist_path)) + + def drive_data(self, *argv): + if not self.dclipath.is_file(): + with self.dclipath.open("w") as outfile: + if (not len(argv)): + data = {} + else: + data = argv[0] json.dump(data, outfile) - return data - - -def get_request(service, fid, mimeType): - if(re.match('^application/vnd\.google-apps\..+', mimeType)): - if(mimeType == 'application/vnd.google-apps.document'): - mimeTypes = {extension: mime.guess_type("placeholder_filename." + extension)[0] for extension - in ("pdf", - "txt", - "docx", - "zip", - "html", - "rtf", - "odt")} - elif(mimeType == 'application/vnd.google-apps.spreadsheet'): - mimeTypes = {extension: mime.guess_type("placeholder_filename." + extension)[0] for extension - in ("pdf", - "xlsx", - "zip", - "html", - "ods", - "csv", - "tsv")} - elif(mimeType == 'application/vnd.google-apps.presentation'): - mimeTypes = {extension: mime.guess_type("paceholder_filename." + extension)[0] for extension - in ("pdf", - "zip", - "html", - "pptx", - "txt")} else: - mimeTypes = {extension: mime.guess_type("paceholder_filename." + extension)[0] for extension - in ("ods", - "csv", - "pdf", - "jpg", - "png", - "gif", - "bmp", - "txt", - "docx", - "js", - "swf", - "mp3", - "zip", - "rar", - "tar", - "cab", - "html", - "htm")} - mimeTypes.update( - {'tmpl': 'text/plain', 'php': 'application/x-httpd-php', 'arj': 'application/arj'}) - promptMessage = 'Choose type to export to \n(ENTER to select, s to stop):' - title = promptMessage - options = [x for x in mimeTypes.keys()] - picker = Picker(options, title, indicator='=>', default_index=0) - picker.register_custom_handler(ord('s'), go_back) - chosen, index = picker.start() - if index != -1: - request = service.files().export_media( - fileId=fid, mimeType=mimeTypes[chosen]) - return request, str("." + chosen) + if (not len(argv)): + with self.dclipath.open("r") as infile: + data = json.load(infile) + else: + with self.dclipath.open("w") as outfile: + data = argv[0] + json.dump(data, outfile) + return data + + @staticmethod + def get_request(service, fid, mimeType): + if (re.match('^application/vnd\.google-apps\..+', mimeType)): + if (mimeType == 'application/vnd.google-apps.document'): + mimeTypes = {extension: mime.guess_type("placeholder_filename." + extension)[0] for extension + in ("pdf", + "txt", + "doc", + "zip", + "html", + "rtf", + "odt")} + elif (mimeType == 'application/vnd.google-apps.spreadsheet'): + mimeTypes = {extension: mime.guess_type("placeholder_filename." + extension)[0] for extension + in ("pdf", + "xlsx", + "zip", + "html", + "ods", + "csv", + "tsv")} + elif (mimeType == 'application/vnd.google-apps.presentation'): + mimeTypes = {extension: mime.guess_type("paceholder_filename." + extension)[0] for extension + in ("pdf", + "zip", + "html", + "pptx", + "txt")} + else: + mimeTypes = {extension: mime.guess_type("paceholder_filename." + extension)[0] for extension + in ("ods", + "csv", + "pdf", + "jpg", + "png", + "gif", + "bmp", + "txt", + "doc", + "js", + "swf", + "mp3", + "zip", + "rar", + "tar", + "cab", + "html", + "htm")} + mimeTypes.update( + {'tmpl': 'text/plain', 'php': 'application/x-httpd-php', 'arj': 'application/arj'}) + promptMessage = 'Choose type to export to \n(ENTER to select, s to stop):' + title = promptMessage + options = [x for x in mimeTypes.keys()] + picker = Picker(options, title, indicator='=>', default_index=0) + picker.register_custom_handler(ord('s'), go_back) + chosen, index = picker.start() + if index != -1: + request = service.files().export_media( + fileId=fid, mimeType=mimeTypes[chosen]) + return request, str("." + chosen) + else: + sys.exit(0) else: - sys.exit(0) - else: - request = service.files().get_media(fileId=fid) - return request, "" - - -def write_needed(dir_name, item): - drive_time = time.mktime(time.strptime( - item['modifiedTime'], '%Y-%m-%dT%H:%M:%S.%fZ')) + float(19800.00) - local_time = os.path.getmtime(dir_name) - data = drive_data() - sync_time = data[dir_name]['time'] - if(sync_time < drive_time): - if(sync_time < local_time): - input = '' - while(input != 's' and input != 'o'): - input = click.prompt("Conflict: both local and online copy of " + - dir_name + " has been modified\npress o to OVERWRITE s to SKIP") - if(input == 'o'): + request = service.files().get_media(fileId=fid) + return request, "" + + def write_needed(self, dir_name, item): + drive_time = time.mktime(time.strptime( + item['modifiedTime'], '%Y-%m-%dT%H:%M:%S.%fZ')) + float(19800.00) + local_time = os.path.getmtime(dir_name) + data = self.drive_data() + sync_time = data[dir_name]['time'] + if (sync_time < drive_time): + if (sync_time < local_time): + input = '' + while (input != 's' and input != 'o'): + input = click.prompt("Conflict: both local and online copy of " + + dir_name + " has been modified\npress o to OVERWRITE s to SKIP") + if (input == 'o'): + return True + else: return True - else: - return True - return False - - -def push_needed(drive, item_path): - drive_time = time.mktime(time.strptime( - drive['modifiedTime'], '%Y-%m-%dT%H:%M:%S.%fZ')) + float(19800.00) - local_time = os.path.getmtime(item_path) - float(19801.00) - data = drive_data() - sync_time = data[item_path]['time'] - if sync_time < local_time: - if sync_time < drive_time: - input = '' - while(input != 's' and input != 'o'): - input = click.prompt("Conflict: both local and online copy of " + - dir_name + " has been modified\npress o to OVERWRITE s to SKIP") - if(input == 'o'): + return False + + def push_needed(self, drive, item_path): + drive_time = time.mktime(time.strptime( + drive['modifiedTime'], '%Y-%m-%dT%H:%M:%S.%fZ')) + float(19800.00) + local_time = os.path.getmtime(item_path) - float(19801.00) + data = self.drive_data() + sync_time = data[item_path]['time'] + if sync_time < local_time: + if sync_time < drive_time: + input = '' + while (input != 's' and input != 'o'): + input = click.prompt("Conflict: both local and online copy of " + + "dir_name" + " has been modified\npress o to OVERWRITE s to SKIP") + # TODO Figure out what dir_name is supposed to be + if (input == 'o'): + return True + else: return True + return False + + def modified_or_created(self, sync_time, item_path): + mtime = os.path.getmtime(item_path) + data = self.drive_data() + if item_path not in data.keys(): + click.secho("created: " + item_path, fg='green') + return 1 + elif (mtime > (sync_time + 1.000)): + click.secho("changed: " + item_path, fg='blue') + return 1 + return 0 + + @staticmethod + def get_fid(inp): + if 'google' in inp: + if 'open' in inp: + fid = inp.split('=')[-1] + elif 'folders' in inp: + fid = inp.split('/')[-1] + if '?' in fid: + fid = fid.split('?')[-2] + else: + fid = inp.split('/')[-2] else: - return True - return False - - -def modified_or_created(sync_time, item_path): - mtime = os.path.getmtime(item_path) - data = drive_data() - if item_path not in data.keys(): - click.secho("created: " + item_path, fg='green') - return 1 - elif(mtime > (sync_time + 1.000)): - click.secho("changed: " + item_path, fg='blue') - return 1 - return 0 - - -def get_fid(inp): - if 'google' in inp: - if 'open' in inp: - fid = inp.split('=')[-1] - elif 'folders' in inp: - fid = inp.split('/')[-1] - if '?' in fid: - fid = fid.split('?')[-2] - else: - fid = inp.split('/')[-2] - else: - fid = inp - return fid - + fid = inp + return fid -def create_new(cwd, fid): - if not os.path.exists(cwd): - os.mkdir(cwd) - else: - click.secho( - 'file ' + cwd + ' already exists! remove the existing file and retry', fg='red') - sys.exit(0) - data = drive_data() - data[cwd] = {} - data[cwd]['id'] = fid - data[cwd]['time'] = time.time() - drive_data(data) - - -def delete_file(fid): - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v2', http=creds.authorize(Http())) - fid = fid['id'] - try: - service.files().delete(fileId=fid).execute() - except: - click.secho( - "Error Ocurred:\n make sure that you have appropriate access", fg='red') - - -def get_file(fid): - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - files = service.files().get(fileId=fid).execute() - return files - - -def get_child(cwd): - data = drive_data() - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - page_token = None - drive_lis = {} - query = "'" + data[cwd]['id'] + "' in parents" - while True: - children = service.files().list(q=query, - spaces='drive', - fields='nextPageToken, files(id,mimeType,name,modifiedTime)', - pageToken=page_token - ).execute() - for child in children.get('files', []): - drive_lis[child['name']] = child - page_token = children.get('nextPageToken', None) - if page_token is None: - break - return drive_lis - - -def get_child_id(pid, item): - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - page_token = None - query = "name = '" + item + "' and " - query = "'" + pid + "' in parents" - response = service.files().list(q=query, - spaces='drive', - fields='nextPageToken, files(id, name)', - pageToken=page_token).execute() - fils = response.get('files', [])[0] - return fils.get('id') - - -def create_dir(cwd, pid, name): - file_metadata = { - 'name': name, - 'mimeType': 'application/vnd.google-apps.folder', - 'parents': [pid] - } - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - fid = service.files().create(body=file_metadata, fields='id').execute() - fid['time'] = time.time() - full_path = os.path.join(cwd, name) - data = drive_data() - data[full_path] = fid - drive_data(data) - click.secho("Created a tracked directory", fg='magenta') - return full_path, fid['id'] - - -def file_download(item, cwd, clone=False): - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - fid = item['id'] - fname = item['name'] - fh = io.BytesIO() - click.echo("Preparing: " + click.style(fname, fg='red') + " for download") - request, ext = get_request(service, fid, item['mimeType']) - file_path = (os.path.join(cwd, fname) + ext) - if(not clone and (os.path.exists(file_path)) and (not write_needed(file_path, item))): - return - downloader = MediaIoBaseDownload(fh, request) - done = False - with click.progressbar(length=100, label='downloading file') as bar: - pstatus = 0 + def create_new(self, cwd, fid): + if not os.path.exists(cwd): + os.mkdir(cwd) + else: + click.secho( + 'file ' + cwd + ' already exists! remove the existing file and retry', fg='red') + sys.exit(0) + data = self.drive_data() + data[cwd] = {} + data[cwd]['id'] = fid + data[cwd]['time'] = time.time() + self.drive_data(data) + + def delete_file(self, fid): + fid = fid['id'] + try: + self.service.files().delete(fileId=fid).execute() + except: + click.secho( + "Error Ocurred:\n make sure that you have appropriate access", fg='red') + + def get_file(self, fid): + files = self.service.files().get(fileId=fid).execute() + return files + + def get_child(self, cwd): + data = self.drive_data() + page_token = None + drive_lis = {} + query = "'" + data[cwd]['id'] + "' in parents" + while True: + children = self.service.files().list(q=query, + spaces='drive', + fields='nextPageToken, files(id,mimeType,name,modifiedTime)', + pageToken=page_token + ).execute() + for child in children.get('files', []): + drive_lis[child['name']] = child + page_token = children.get('nextPageToken', None) + if page_token is None: + break + return drive_lis + + def get_child_id(self, pid, item): + page_token = None + query = "name = '{}' and '{}' in parents".format(item, pid) + response = self.service.files().list(q=query, + spaces='drive', + fields='nextPageToken, files(id, name)', + pageToken=page_token).execute() + fils = response.get('files', [])[0] + return fils.get('id') + + def create_dir(self, cwd, pid, name): + file_metadata = { + 'name': name, + 'mimeType': 'application/vnd.google-apps.folder', + 'parents': [pid] + } + fid = self.service.files().create(body=file_metadata, fields='id').execute() + fid['time'] = time.time() + full_path = os.path.join(cwd, name) + data = self.drive_data() + data[full_path] = fid + self.drive_data(data) + click.secho("Created a tracked directory", fg='magenta') + return full_path, fid['id'] + + def file_download(self, item, cwd, clone=False): + fid = item['id'] + fname = item['name'] + fh = io.BytesIO() + click.echo("Preparing: " + click.style(fname, fg='red') + " for download") + request, ext = self.get_request(self.service, fid, item['mimeType']) + file_path = (os.path.join(cwd, fname) + ext) + if (not clone and (os.path.exists(file_path)) and (not self.write_needed(file_path, item))): + return + downloader = MediaIoBaseDownload(fh, request) + done = False + with click.progressbar(length=100, label='downloading file') as bar: + pstatus = 0 + while done is False: + status, done = downloader.next_chunk() + status = int(status.progress() * 100) + bar.update(int(status - pstatus)) + pstatus = status + with open(file_path, 'wb') as f: + f.write(fh.getvalue()) + data = self.drive_data() + data[file_path] = {'id': item['id'], 'time': time.time()} + self.drive_data(data) + click.secho("completed download of " + fname, fg='yellow') + + def concat(self, fid): + fh = io.BytesIO() + item = self.get_file(fid) + request, ext = self.get_request(self.service, fid, item['mimeType']) + downloader = MediaIoBaseDownload(fh, request) + done = False while done is False: status, done = downloader.next_chunk() - status = int(status.progress() * 100) - bar.update(int(status - pstatus)) - pstatus = status - with open(file_path, 'wb') as f: - f.write(fh.getvalue()) - data = drive_data() - data[file_path] = {'id': item['id'], 'time': time.time()} - drive_data(data) - click.secho("completed download of " + fname, fg='yellow') - - -def concat(fid): - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - fh = io.BytesIO() - item = get_file(fid) - request, ext = get_request(service, fid, item['mimeType']) - downloader = MediaIoBaseDownload(fh, request) - done = False - while done is False: - status, done = downloader.next_chunk() - print(fh.getvalue().decode('ISO-8859-1')) + print(fh.getvalue().decode('utf-8')) + + def upload_file(self, name, path, pid): + file_mimeType = identify_mimetype(name) + file_metadata = { + 'name': name, + 'parents': [pid], + 'mimeType': file_mimeType + } + media = MediaFileUpload(path, mimetype=file_mimeType) + new_file = self.service.files().create(body=file_metadata, + media_body=media, + fields='id').execute() + data = self.drive_data() + data[path] = {'id': new_file['id'], 'time': time.time()} + self.drive_data(data) + click.secho("uploaded " + name, fg='yellow') + return new_file + + def update_file(self, name, path, fid): + file_mimeType = identify_mimetype(name) + media = MediaFileUpload(path, mimetype=file_mimeType) + new_file = self.service.files().update(fileId=fid, + media_body=media, + fields='id').execute() + data = self.drive_data() + data[path]['time'] = {'time': time.time()} + self.drive_data(data) + return new_file + + def pull_content(self, cwd, fid): + data = self.drive_data() + page_token = None + lis = [] + query = "'" + data[cwd]['id'] + "' in parents" + while True: + children = self.service.files().list(q=query, + spaces='drive', + fields='nextPageToken, files(id,mimeType,name,modifiedTime)', + pageToken=page_token + ).execute() + for child in children.get('files', []): + lis.append(child) + page_token = children.get('nextPageToken', None) + if page_token is None: + break + for item in lis: + dir_name = os.path.join(cwd, item['name']) + if (item['mimeType'] != 'application/vnd.google-apps.folder'): + if ((not os.path.exists(dir_name)) or self.write_needed(dir_name, item)): + self.file_download(item, cwd, data[cwd]['time']) + else: + if (not os.path.exists(dir_name)): + click.secho("creating: " + dir_name) + os.mkdir(dir_name) + data = self.drive_data() + data[dir_name] = {'id': item['id'], 'time': time.time()} + data = self.drive_data(data) + else: + click.secho("updating: " + dir_name) + self.pull_content(dir_name, item['id']) + data = self.drive_data() + data[cwd]['time'] = time.time() + data = self.drive_data(data) + self.drive_data(data) + + def list_status(self, cwd, sync_time): + local_lis = list_local(cwd) + changes = 0 + for item in local_lis: + item_path = os.path.join(cwd, item) + if (os.path.isdir(item_path)): + if (self.modified_or_created(sync_time, item_path)): + changes += 1 + data = self.drive_data() + if item in data.keys(): + sync_time = data[item] + else: + sync_time = float(0) + self.list_status(item_path, sync_time) + else: + changes += self.modified_or_created(sync_time, item_path) + if changes == 0: + click.secho("No changes made since the last sync") + + def push_content(self, cwd, fid): + drive_lis = self.get_child(cwd) + local_lis = list_local(cwd) + data = self.drive_data() + for item in local_lis: + item_path = os.path.join(cwd, item) + if (os.path.isdir(item_path)): + if item not in drive_lis.keys(): + child_cwd, child_id = self.create_dir(cwd, fid, item) + else: + child_cwd = os.path.join(cwd, item) + child_id = drive_lis[item]['id'] + if child_cwd not in data.keys(): + data[child_cwd] = {'id': child_id, 'time': time.time()} + data = self.drive_data(data) + self.push_content(child_cwd, child_id) + else: + item_path = os.path.join(cwd, item) + if item not in drive_lis.keys(): + click.secho("uploading " + item + " ....") + self.upload_file(item, item_path, fid) + else: + if (self.push_needed(drive_lis[item], item_path)): + click.secho("updating " + item) + cid = self.get_child_id(fid, item) + self.update_file(item, item_path, cid) + click.secho("updating of " + item + + " completed", fg='yellow') + data = self.drive_data() + data[cwd]['time'] = time.time() + self.drive_data(data) + + +def go_back(picker): + return None, -1 def identify_mimetype(name): @@ -358,98 +455,6 @@ def identify_mimetype(name): return 'application/octet-stream' -def upload_file(name, path, pid): - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - file_mimeType = identify_mimetype(name) - file_metadata = { - 'name': name, - 'parents': [pid], - 'mimeType': file_mimeType - } - if os.stat(path).st_size <= (1024 * 1024): - media = MediaFileUpload(path, mimetype=file_mimeType) - new_file = service.files().create(body=file_metadata, - media_body=media, - fields='id').execute() - else: - CHUNK_SIZE_MB = int(os.getenv("CHUNK_SIZE_MB") or 1) # MB. You may want to increase the size to a higher speed if the network restrictions allow - media = MediaFileUpload( - path, mimetype=file_mimeType, - chunksize=(1024 * 1024 * CHUNK_SIZE_MB), resumable=True) - status, new_file = None, None - req = service.files().create(body=file_metadata, - media_body=media, - fields='id') - while new_file is None: - status, new_file = req.next_chunk() - - data = drive_data() - data[path] = {'id': new_file['id'], 'time': time.time()} - drive_data(data) - click.secho("uploaded " + name, fg='yellow') - return new_file - - -def update_file(name, path, fid): - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - file_mimeType = identify_mimetype(name) - media = MediaFileUpload(path, mimetype=file_mimeType) - new_file = service.files().update(fileId=fid, - media_body=media, - fields='id').execute() - data = drive_data() - data[path]['time'] = {'time': time.time()} - drive_data(data) - return new_file - - -def pull_content(cwd, fid): - data = drive_data() - token = os.path.join(dirpath, 'token.json') - store = file.Storage(token) - creds = store.get() - service = build('drive', 'v3', http=creds.authorize(Http())) - page_token = None - lis = [] - query = "'" + data[cwd]['id'] + "' in parents" - while True: - children = service.files().list(q=query, - spaces='drive', - fields='nextPageToken, files(id,mimeType,name,modifiedTime)', - pageToken=page_token - ).execute() - for child in children.get('files', []): - lis.append(child) - page_token = children.get('nextPageToken', None) - if page_token is None: - break - for item in lis: - dir_name = os.path.join(cwd, item['name']) - if(item['mimeType'] != 'application/vnd.google-apps.folder'): - if((not os.path.exists(dir_name)) or write_needed(dir_name, item)): - file_download(item, cwd, data[cwd]['time']) - else: - if(not os.path.exists(dir_name)): - click.secho("creating: " + dir_name) - os.makedirs(dir_name) - data = drive_data() - data[dir_name] = {'id': item['id'], 'time': time.time()} - data = drive_data(data) - else: - click.secho("updating: " + dir_name) - pull_content(dir_name, item['id']) - data = drive_data() - data[cwd]['time'] = time.time() - data = drive_data(data) - drive_data(data) - - def list_local(cwd): local_lis = os.listdir(cwd) drive_ignore_path = os.path.join(cwd, '.driveignore') @@ -460,56 +465,3 @@ def list_local(cwd): local_lis.remove(f[:-1]) file.close() return local_lis - - -def list_status(cwd, sync_time): - local_lis = list_local(cwd) - changes = 0 - for item in local_lis: - item_path = os.path.join(cwd, item) - if(os.path.isdir(item_path)): - if(modified_or_created(sync_time, item_path)): - changes += 1 - data = drive_data() - if item in data.keys(): - sync_time = data[item] - else: - sync_time = float(0) - list_status(item_path, sync_time) - else: - changes += modified_or_created(sync_time, item_path) - if changes == 0: - click.secho("No changes made since the last sync") - - -def push_content(cwd, fid): - drive_lis = get_child(cwd) - local_lis = list_local(cwd) - data = drive_data() - for item in local_lis: - item_path = os.path.join(cwd, item) - if(os.path.isdir(item_path)): - if item not in drive_lis.keys(): - child_cwd, child_id = create_dir(cwd, fid, item) - else: - child_cwd = os.path.join(cwd, item) - child_id = drive_lis[item]['id'] - if child_cwd not in data.keys(): - data[child_cwd] = {'id': child_id, 'time': time.time()} - data = drive_data(data) - push_content(child_cwd, child_id) - else: - item_path = os.path.join(cwd, item) - if item not in drive_lis.keys(): - click.secho("uploading " + item + " ....") - upload_file(item, item_path, fid) - else: - if(push_needed(drive_lis[item], item_path)): - click.secho("updating " + item) - cid = get_child_id(fid, item) - update_file(item, item_path, cid) - click.secho("updating of " + item + - " completed", fg='yellow') - data = drive_data() - data[cwd]['time'] = time.time() - drive_data(data) diff --git a/tests/test_auth.py b/tests/test_auth.py index 7fd09c1..6ae347f 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,8 +1,12 @@ +import os import unittest -from drive_cli.auth import (login) + +from click.testing import CliRunner +from drive_cli.auth import (login, config_path) class TestAuth(unittest.TestCase): + runner = CliRunner() def test_auth_when_token_is_incorrect(self): ''' @@ -10,14 +14,25 @@ def test_auth_when_token_is_incorrect(self): ''' pass - def test_auth_when_token_is_correct(self): + def test_login_remote(self): ''' Test when auth token in correct refresh current encoded token for auth to pass ''' - login(remote=False) - print("DONE LOGIN") + result = self.runner.invoke(login, ["--remote"]) + assert "Running without local webserver auth." in result.output + + def test_login_no_remote(self): + """ + Tests when the remote flag is off for login. + """ + result = self.runner.invoke(login) + assert "Running without local webserver auth." not in result.output + def test_login_with_json_file(self): + oauth_file = os.path.join(config_path, "oauth.json") + result = self.runner.invoke(login, ["--json-file", oauth_file]) + assert oauth_file in result.output if __name__ == "__main__": # Test path