Python FolderFetcherPipeline Example

说明

python folderfetcherpipeline示例是从最受好评的开源项目中提取的实现代码,你可以参考下面示例的使用方式。

编程语言: Python

命名空间/包名称: pipelinefolder_fetcher_pipeline

示例#1
文件: folder_fetcher_pipeline_test.py项目: michaelcupino/scapes

  def testRun_folderWithOneDocAndFolder(self,
      mockOAuth2CredentialsFromJsonMethod):
    # TODO(michaelcupino): Move test json files into a test_utils package.
    http = HttpMockSequence([
      # Recursion level 0: Has 1 folder
      ({'status': '200'}, open(datafile(
          'test-drive-children-list-1-item-a.json'), 'rb').read()),
      # Recursion level 1: Has 0 folders
      ({'status': '200'}, open(datafile('test-drive-children-list-empty.json'),
          'rb').read()),
      # Recursion level 1: Has 1 document (1vN98...)
      ({'status': '200'}, open(datafile(
          'test-drive-children-list-1-item-a.json'), 'rb').read()),
      # Recursion level 0: Has 1 document (abc123)
      ({'status': '200'}, open(datafile(
          'test-drive-children-list-1-item-b.json'), 'rb').read()),
    ])
    mockCredentials = MagicMock(name='mockCredentials')
    mockCredentials.authorize = MagicMock(return_value=http)
    mockOAuth2CredentialsFromJsonMethod.return_value = mockCredentials

    pipeline = FolderFetcherPipeline('/folder123', None)
    pipeline.start_test()
    result = pipeline.outputs.default.value

    self.assertEqual(['abc123', '1vN98-jz7tx_mal-p_gn-vbQLH7Yq1-yi7Lc7Zw8Uy60'],
        result)

示例#2
文件: folder_fetcher_pipeline_test.py项目: michaelcupino/scapes

  def testRun_docPageToken(self,
      mockOAuth2CredentialsFromJsonMethod):
    # TODO(michaelcupino): Move test json files into a test_utils package.
    http = HttpMockSequence([
      # Recursion level 0: Has 0 folders
      ({'status': '200'}, open(datafile('test-drive-children-list-empty.json'),
          'rb').read()),
      # Recursion level 0: Has 1 document (1o0L3...) with a page token
      ({'status': '200'}, open(datafile(
          'test-drive-children-list-1-item-page.json'), 'rb').read()),
      # Recursion level 0: Has 1 document (1vN98...) with no page token
      ({'status': '200'}, open(datafile(
          'test-drive-children-list-1-item-a.json'), 'rb').read()),
    ])
    mockCredentials = MagicMock(name='mockCredentials')
    mockCredentials.authorize = MagicMock(return_value=http)
    mockOAuth2CredentialsFromJsonMethod.return_value = mockCredentials

    pipeline = FolderFetcherPipeline('/folder123', None)
    pipeline.start_test()
    result = pipeline.outputs.default.value

    expected = [
      '1o0L3UOiubzeBZNFEyKzZrUfVdWZ4K4-WeINn83VkwAI',
      '1vN98-jz7tx_mal-p_gn-vbQLH7Yq1-yi7Lc7Zw8Uy60',
    ]
    self.assertEqual(expected, result)

示例#3
文件: folder_fetcher_pipeline_test.py项目: michaelcupino/scapes

  def testRun_emptyFolder(self, mockOAuth2CredentialsFromJsonMethod):
    # TODO(michaelcupino): Move test json files into a test_utils package.
    http = HttpMockSequence([
      ({'status': '200'}, open(datafile('test-drive-children-list-empty.json'),
          'rb').read()),
      ({'status': '200'}, open(datafile('test-drive-children-list-empty.json'),
          'rb').read()),
    ])
    mockCredentials = MagicMock(name='mockCredentials')
    mockCredentials.authorize = MagicMock(return_value=http)
    mockOAuth2CredentialsFromJsonMethod.return_value = mockCredentials

    pipeline = FolderFetcherPipeline('/folder123', None)
    pipeline.start_test()
    result = pipeline.outputs.default.value

    self.assertEqual([], result)

展开阅读全文